Attention une mise à jour du serveur va être effectuée le lundi 17 mai entre 13h et 13h30. Cette mise à jour va générer une interruption du service de quelques minutes.

Commit 4d949a78 authored by EYRAUD-DUBOIS Lionel's avatar EYRAUD-DUBOIS Lionel

Working parallel version !

parent 048ef627
......@@ -29,6 +29,7 @@ static const int opt_no_dependencies = 11;
static const int opt_no_header = 12;
static const int opt_rev_dep = 13;
static const int opt_best = 14;
static const int opt_thread = 15;
static struct option long_options[] = {
......@@ -48,6 +49,7 @@ static struct option long_options[] = {
{"help", no_argument, 0, 'h'},
{"default", optional_argument, 0, 'd'},
{"best", optional_argument, 0, opt_best},
{"threads", optional_argument, 0, opt_thread},
{0, 0, 0, 0 }
};
......@@ -63,6 +65,7 @@ ProgramOptions::ProgramOptions() {
noHeader = false;
optRevDep = false;
outputBest = false;
nbThreads = 1;
}
void ProgramOptions::parse(int argc, char** argv) {
......@@ -143,6 +146,12 @@ void ProgramOptions::parse(int argc, char** argv) {
if(optarg)
outputBestFile = s_optarg;
break ;
case opt_thread:
if(optarg)
nbThreads = stoi(s_optarg);
else
nbThreads = -1;
break;
case '?':
case 'h':
usage();
......
......@@ -129,6 +129,9 @@ int recTask::outputCount = 0;
void RecFileInstance::readFromFile(const string inputFile, unordered_map<string, vector<double> > timings) {
this->inputFile = inputFile;
if(recTask::recName == NULL) recTask::recReaderInit();
char* inputFileC = new char[inputFile.length() + 1];
......@@ -218,6 +221,8 @@ RecFileInstance::RecFileInstance(const string inputFile) {
}
RecFileInstance::RecFileInstance(const string inputFile, const string platformFile) {
this->platformFile = platformFile;
char* platformFileC = new char[platformFile.length() + 1];
strcpy(platformFileC, platformFile.c_str());
FILE* platformFILE = fopen(platformFileC, "r");
......
......@@ -5,6 +5,8 @@
#include <cmath>
using namespace std;
#include <mutex>
#include "instance.h"
#include "util.h"
......@@ -21,6 +23,7 @@ Instance::Instance() {
Instance::Instance(const string input_file, int convertIndices) {
// cout << "Opening " << input_file << "..." << endl;
this->inputFile = input_file;
ifstream input(input_file);
int i;
......@@ -230,23 +233,29 @@ void Instance::mergeWorkerTypes(const vector<int> indicesToMerge) {
/* Accessors for static analysis */
vector<int> Instance::getTopOrder() {
lock_guard<mutex> lock(mtx);
computeTopOrder();
return topOrder;
}
vector< vector<int> > Instance::getAncestors() {
lock_guard<mutex> lock(mtx);
computeAncestors();
return ancestors;
}
vector< vector<int> > Instance::getRevDependencies() {
lock_guard<mutex> lock(mtx);
computeRevDependencies();
return revDep;
}
vector<double> Instance::computeRanks(vector<double> wbar, int to, int from) {
computeRevDependencies();
computeTopOrder();
{
lock_guard<mutex> lock(mtx);
computeRevDependencies();
computeTopOrder();
}
vector<double> rank(nbTasks, -std::numeric_limits<double>::infinity());
if(to == -1) to = topOrder[nbTasks -1];
if(from == -1) from = topOrder[0];
......@@ -397,7 +406,6 @@ void Instance::computeTopOrder() {
if(topOrder.size() != 0)
return;
vector<int> computed(nbTasks, 0);
topOrder.resize(nbTasks);
int i, l = 0;
......
......@@ -46,7 +46,7 @@ ExportSchedule::ExportSchedule(Instance* ins, string name)
}
ExportToFile::ExportToFile(string filename, Instance* ins, bool header, string name): ExportSchedule(ins, name), f(new ofstream(filename)) {
ExportToFile::ExportToFile(string filename, Instance* ins, bool header, string name): ExportSchedule(ins, name), f(new ofstream(filename, ios::app)) {
output = f;
if(header) outputHeader();
}
......@@ -123,7 +123,7 @@ ExportAlloc::~ExportAlloc() {
UtilAnalysis::UtilAnalysis(Instance* _ins, string saveFile) :
ins(_ins), repartition(_ins->nbWorkerTypes,
std::vector<int>(ins->nbTaskTypes, 0)),
output(saveFile) {
output(saveFile, ios::app) {
}
UtilAnalysis::~UtilAnalysis(){
......
......@@ -48,7 +48,7 @@ class ProgramOptions {
bool optRevDep = false;
bool outputBest;
std::string outputBestFile;
int nbThreads;
std::set<std::string> optionKeys;
std::vector<fullAlg> algs;
......
......@@ -6,7 +6,7 @@
#include <limits>
#include "SynchroMap.h"
#include <unordered_map>
#include <mutex>
class Instance {
public:
......@@ -20,11 +20,16 @@ public:
int nbWorkerTypes;
SynchroMap extraData;
std::mutex mtx;
// Those are optional, only printed if available.
std::vector<std::string> workerNames;
std::vector<std::string> taskTypeNames;
std::vector<std::string> taskIDs;
std::string inputFile;
std::string platformFile;
// Computed afterwards
public:
std::vector< std::vector<int> > revDep;
......
#include <time.h>
//#include <queue>
//#include <list>
#include <vector>
//#include <map>
#include <set>
#include <string>
#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
#include "instance.h"
#include "ProgramOptions.h"
#include "algorithm.h"
#include "schedAction.h"
#include "ReproduceAlgorithm.h"
using namespace std;
class AlgResult {
public:
double makespan;
double milliseconds;
string name;
string key; // Is this better ? Not super clean, but well.
/// Might even be randomly generated, why not.
};
void computeAlg(ProgramOptions& progOpt, Instance* ins,
fullAlg& algAndOpt, AlgResult* result) {
Algorithm* alg = algAndOpt.first;
auto opts = algAndOpt.second;
ActionSequence seq;
result->key = progOpt.buildName(alg->name(), opts, true);
result->name = progOpt.buildName(alg->name(), opts);
InternalShare saveResult(result->key, ins);
seq.add(&saveResult);
ExportSchedule* localExport = NULL;
if(opts.isPresent("save")) {
localExport = new ExportToFile(opts.asString("save"), ins, true);
seq.add(localExport);
}
InternalShare* share = NULL;
if(opts.isPresent("share")) {
share = new InternalShare(opts.asString("share"), ins);
seq.add(share);
}
ExportAlloc* exportAlloc = NULL;
if(opts.isPresent("export")) {
exportAlloc = new ExportAlloc(opts.asString("export"), ins);
seq.add(exportAlloc);
}
ExportBubble * exportBubble = NULL;
if(opts.isPresent("bubble")) {
exportBubble = new ExportBubble(opts.asString("bubble"), ins, opts.asInt("bubbleType"));
seq.add(exportBubble);
}
auto start = chrono::steady_clock::now();
try {
result->makespan = alg->compute(*ins, &seq);
auto time = chrono::duration_cast<chrono::milliseconds>(chrono::steady_clock::now() - start);
result->milliseconds = time.count();
//cout << input_file << " " << name << " " << result << " " << time.count();
// if(bestAlgorithm == NULL || bestAlgorithmValue > result) {
// bestAlgorithm = &(*it);
// bestAlgorithmValue = result;
// isBestSoFar = true;
// if(bestExport){
// if(bestSchedule) free(bestSchedule);
// bestSchedule = bestExport;
// }
// }
} catch (int e) {
auto time = chrono::duration_cast<chrono::milliseconds>(chrono::steady_clock::now() - start);
result->makespan = -1;
result->milliseconds = time.count();
}
if(localExport) free(localExport);
if(exportAlloc) free(exportAlloc);
if(share) {
share->finish();
free(share);
}
if(exportBubble) {
exportBubble->finish();
free(exportBubble);
}
saveResult.finish();
}
void displayAlgResult(ProgramOptions& progOpt, Instance* instance,
fullAlg& algAndOpt, AlgResult& result) {
cout << instance->inputFile;
if(instance->platformFile != "")
cout << " " << instance->platformFile;
cout << " " << result.name;
if(result.makespan > 0)
cout << " " << result.makespan;
else
cout << " " << "NA";
cout << " " << result.milliseconds;
if(progOpt.saveFile != ""
|| progOpt.repartitionFile != "") {
// Need to reproduce this run to record the new data and/or save to disk
ActionSequence seq;
ExportSchedule* globalExport = NULL;
UtilAnalysis* utilAnalyser = NULL;
if(progOpt.saveFile != "") {
globalExport = new ExportToFile(progOpt.saveFile, instance, true, result.key);
seq.add(globalExport);
}
if(progOpt.repartitionFile != "") {
utilAnalyser = new UtilAnalysis(instance, progOpt.repartitionFile);
seq.add(utilAnalyser);
}
AlgOptions opt;
opt["key"] = result.key;
ReproduceAlgorithm rep(opt);
rep.compute(*instance, &seq);
if(utilAnalyser){
if(instance->nbWorkerTypes == 2) {
std::vector<std::vector<int> > r = utilAnalyser->getRepartition();
for(int i = 0; i < (int) r.size(); i++) {
double usage = 0.0;
double otherUsage = 0.0;
for(int j = 0; j < (int) r[i].size(); j++){
usage += r[i][j] * instance->execTimes[i][j];
otherUsage += r[i][j] * instance->execTimes[1-i][j];
}
cout << " " << usage << " " << (i == 0 ? usage / otherUsage : otherUsage / usage);
}
} else {
cout << " " << "NA" << " " << "NA";
cout << " " << "NA" << " " << "NA";
}
string prefix = instance->inputFile + " ";
if(instance->platformFile != "")
prefix += instance->platformFile + " ";
utilAnalyser->write(prefix + result.name);
free(utilAnalyser);
}
if(globalExport) free(globalExport);
}
cout << endl;
}
mutex controlMutex;
void computeThread(int idx, ProgramOptions* progOpt, Instance* ins,
vector<AlgResult>* results, vector<bool>* started) {
bool finished = false;
unsigned nbAlgs = started->size();
while(! finished) {
// First find some work to do
int i = 0;
{
lock_guard<mutex> lock(controlMutex);
for(i = 0; i < nbAlgs; i++)
if(!started->at(i)){
(*started)[i] = true;
break;
}
}
if(i == nbAlgs) {
// No work found, exit.
finished = true;
break;
}
computeAlg(*progOpt, ins, progOpt->algs[i], &(results->at(i)));
}
}
int main(int argc, char** argv) {
......@@ -41,6 +219,14 @@ int main(int argc, char** argv) {
ofstream save(progOpt.outputBestFile);
save.close();
}
if(progOpt.saveFile != "") {
ofstream save(progOpt.saveFile);
save.close();
}
if(progOpt.repartitionFile != "") {
ofstream save(progOpt.repartitionFile);
save.close();
}
for(string &input_file: progOpt.inputFiles) {
for(string &platform_file : progOpt.platformFiles) {
......@@ -50,8 +236,9 @@ int main(int argc, char** argv) {
auto start = chrono::steady_clock::now();
if(progOpt.verbosity >= 4)
cerr << "Reading input file '" << input_file << "' with platform file '" << platform_file << "'...";
if(dotPosition != string::npos
&& input_file.compare(dotPosition, string::npos, ".rec") == 0) {
if( (dotPosition != string::npos
&& input_file.compare(dotPosition, string::npos, ".rec") == 0)
|| platform_file != "") {
if(platform_file == "")
instance = new RecFileInstance(input_file);
else
......@@ -107,151 +294,58 @@ int main(int argc, char** argv) {
}
ActionSequence seq;
ExportSchedule* globalExport = NULL;
UtilAnalysis* utilAnalyser = NULL;
if(progOpt.saveFile != "") {
globalExport = new ExportToFile(progOpt.saveFile, instance, true, "NA");
seq.add(globalExport);
}
if(progOpt.repartitionFile != "") {
utilAnalyser = new UtilAnalysis(instance, progOpt.repartitionFile);
seq.add(utilAnalyser);
}
double bestAlgorithmValue = std::numeric_limits<double>::infinity();
fullAlg* bestAlgorithm = NULL;
ExportToString* bestSchedule = NULL;
// Iterate through algorithm list, execute all
for(auto it = progOpt.algs.begin(); it < progOpt.algs.end(); it++) {
auto opts = it->second;
ExportSchedule* localExport = NULL;
if(opts.isPresent("save")) {
localExport = new ExportToFile(opts.asString("save"), instance, true);
seq.add(localExport);
}
vector<AlgResult> results(progOpt.algs.size());
ExportToString* bestExport = NULL;
if(progOpt.outputBestFile != ""){
bestExport = new ExportToString(instance, true, input_file
+ (platform_file == ""?"":":"+platform_file));
seq.add(bestExport);
if(progOpt.nbThreads > 1) {
// Test and warn if nbThreads > #available threads
thread ths[progOpt.nbThreads];
vector<bool> started(progOpt.algs.size(), false);
for(int k = 0; k < progOpt.nbThreads; k++) {
ths[k] = thread(computeThread, k, &progOpt, instance,
&results, &started);
}
InternalShare* share = NULL;
if(opts.isPresent("share")) {
share = new InternalShare(opts.asString("share"), instance);
seq.add(share);
for (auto& th : ths) th.join();
for(uint i = 0; i < progOpt.algs.size(); i++) {
displayAlgResult(progOpt, instance, progOpt.algs[i], results[i]);
}
ExportAlloc* exportAlloc = NULL;
if(opts.isPresent("export")) {
exportAlloc = new ExportAlloc(opts.asString("export"), instance);
seq.add(exportAlloc);
}
ExportBubble * exportBubble = NULL;
if(opts.isPresent("bubble")) {
exportBubble = new ExportBubble(opts.asString("bubble"), instance, opts.asInt("bubbleType"));
seq.add(exportBubble);
} else {
for(uint i = 0; i < progOpt.algs.size(); i++) {
computeAlg(progOpt, instance, progOpt.algs[i], &results[i]);
displayAlgResult(progOpt, instance, progOpt.algs[i], results[i]);
}
string name = progOpt.buildName(it->first->name(), it->second);
// In save files, always use raw names, please.
if(globalExport)
globalExport->changeName(progOpt.buildName(it->first->name(), it->second, true));
auto start = chrono::steady_clock::now();
bool isBestSoFar = false;
try {
double result = it->first->compute(*instance, &seq);
auto time = chrono::duration_cast<chrono::milliseconds>(chrono::steady_clock::now() - start);
cout << input_file << " " << name << " " << result << " " << time.count();
if(bestAlgorithm == NULL || bestAlgorithmValue > result) {
bestAlgorithm = &(*it);
bestAlgorithmValue = result;
isBestSoFar = true;
if(bestExport){
if(bestSchedule) free(bestSchedule);
bestSchedule = bestExport;
}
}
if(utilAnalyser){
if(instance->nbWorkerTypes == 2) {
std::vector<std::vector<int> > r = utilAnalyser->getRepartition();
for(int i = 0; i < (int) r.size(); i++) {
double usage = 0.0;
double otherUsage = 0.0;
for(int j = 0; j < (int) r[i].size(); j++){
usage += r[i][j] * instance->execTimes[i][j];
otherUsage += r[i][j] * instance->execTimes[1-i][j];
}
cout << " " << usage << " " << (i == 0 ? usage / otherUsage : otherUsage / usage);
}
} else {
cout << " " << "NA" << " " << "NA";
cout << " " << "NA" << " " << "NA";
}
utilAnalyser->write(input_file + " " + name);
}
} catch (int e) {
auto time = chrono::duration_cast<chrono::milliseconds>(chrono::steady_clock::now() - start);
}
cout << input_file << " " << name << " " << "NA" << " " << time.count();
}
cout << endl;
if(bestExport) {
seq.remove(bestExport);
if(!isBestSoFar)
free(bestExport);
}
if(localExport){
seq.remove(localExport);
free(localExport);
}
if(exportAlloc) {
seq.remove(exportAlloc);
free(exportAlloc);
}
if(share) {
share->finish();
seq.remove(share);
free(share);
}
if(exportBubble) {
exportBubble->finish();
seq.remove(exportBubble);
free(exportBubble);
}
}
if(globalExport) free(globalExport);
if(progOpt.outputBest) {
double bestAlgorithmValue = std::numeric_limits<double>::infinity();
int bestAlgIdx = -1;
for(uint idx = 0; idx < progOpt.algs.size(); idx++) {
if( results[idx].makespan > 0
&& (bestAlgIdx == -1
|| results[idx].makespan < bestAlgorithmValue) ) {
bestAlgIdx = idx;
bestAlgorithmValue = results[idx].makespan;
}
}
if(bestBound != NULL)
cerr << "Best bound: " << bestBoundValue << " with " << buildNameRawLocal(bestBound) << endl;
cerr << "Best schedule: " << bestAlgorithmValue << " with " << buildNameRawLocal(bestAlgorithm) << endl;
if(bestBound != NULL)
if(bestAlgIdx > 0)
cerr << "Best schedule: " << bestAlgorithmValue << " with " << buildNameRawLocal(&progOpt.algs[bestAlgIdx]) << endl;
if(bestBound != NULL && bestAlgIdx > 0)
cerr << "Gap: " << (bestAlgorithmValue - bestBoundValue) / bestBoundValue << endl;
}
if(progOpt.outputBestFile != "" && bestSchedule) {
ofstream save(progOpt.outputBestFile, ios::out | ios::app);
save << bestSchedule->getResult();
save.close();
if(progOpt.outputBestFile != "" && bestAlgIdx > 0) {
string name = instance->inputFile;
if(instance->platformFile != "")
name += ":" + instance->platformFile;
ExportToFile exp(progOpt.outputBestFile, instance, false, name);
AlgOptions opt;
opt.insert("key", results[bestAlgIdx].key);
ReproduceAlgorithm alg(opt);
alg.compute(*instance, &exp);
}
}
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment