...
 
Commits (30)
......@@ -44,16 +44,16 @@ set(PROG_SRC
add_executable(pmtool ${PROG_SRC})
target_link_libraries(pmtool core)
target_link_libraries(pmtool schedulers)
target_link_libraries(pmtool ${LIBREC_LIBRARIES})
if(CPLEX_FOUND)
target_link_libraries(pmtool bounds)
target_link_libraries(pmtool ${CPLEX_LIBRARIES})
endif()
target_link_libraries(pmtool core)
target_link_libraries(pmtool ${LIBREC_LIBRARIES})
if(THREADS_FOUND)
target_link_libraries(pmtool Threads::Threads)
endif()
......
......@@ -66,6 +66,24 @@
```
pmtool tasks.rec -a rep:key=rl
```
+ `Handles`, `Modes` and `Sizes` allow to specify which data this task
requires and produces. All three are space-separated lists, which
should be the same size. `Handles` contains strings which are
identifiers of data objects. `Modes` can be either `R`, `W` or `RW`,
and indicates whether the corresponding data object is read,
written, or both. `Sizes` are integers and indicate the size of this
object. The parser performs *versioning* of these data objects
according to the precedence specified by the `DependsOn` field: a
data object read by a task is matched to the last data object
produced with the same handle, in a topological ordering of the
dependencies.
It is expected, but not checked, that tasks which write on a data
object have dependencies with all tasks writing or reading the same
data object. It is also expected that data accessed in read mode
have the same size as the data produced by the previous task which
wrote to this handle. Tasks which write to a handle may change its
size.
Because of internal behavior of StarPU, this format allows *virtual* tasks to be added to the instance. A task is virtual
if it has no `Name` field. `JobId` is still compulsory for virtual tasks. Dependencies from real tasks go *through* virtual tasks
......
......@@ -35,10 +35,10 @@ static const int opt_no_dependencies = 11;
static const int opt_no_header = 12;
static const int opt_rev_dep = 13;
static const int opt_best = 14;
static const int opt_thread = 15;
static const int opt_tags = 16;
static const int opt_submit = 17;
static const int opt_bw = 15;
static const int opt_thread = 16;
static const int opt_tags = 17;
static const int opt_submit = 18;
static struct option long_options[] = {
......@@ -58,9 +58,10 @@ static struct option long_options[] = {
{"help", no_argument, 0, 'h'},
{"default", optional_argument, 0, 'd'},
{"best", optional_argument, 0, opt_best},
{"bw", required_argument, 0, opt_bw},
{"threads", optional_argument, 0, opt_thread},
{"use-tags", no_argument, 0, opt_tags},
{"submit-order", no_argument, 0, opt_submit},
{"submit-order", no_argument, 0, opt_submit},
{0, 0, 0, 0 }
};
......@@ -164,18 +165,21 @@ void ProgramOptions::parse(int argc, char** argv) {
if(optarg)
outputBestFile = s_optarg;
break ;
case opt_bw:
platform_bw = atof(optarg);
break;
case opt_thread:
if(optarg)
nbThreads = stoi(s_optarg);
else
nbThreads = -1;
break;
case opt_tags:
appendTags = true;
break;
case opt_submit:
useSubmitOrder = true;
break;
case opt_tags:
appendTags = true;
break;
case opt_submit:
useSubmitOrder = true;
break;
case '?':
case 'h':
usage();
......
......@@ -360,7 +360,10 @@ Greedy algorithm with pre-allocated tasks. Requires a `file` option
which renumbers resources.
+ `key`
Uses pre-allocation shared with share or export
option. Often `KEY[alloc]` actually.
option. Often `KEY[alloc]` actually.
+ `comms`
If `yes`, takes communication into account (if they are
defined in the `tasks.rec` input file). Default is `no`.
+ `stealer` (default -1)
specifies a resource type which is allowed to steal from others,
by number. CAREFUL of `-t` option, which renumbers resources.
......@@ -509,6 +512,12 @@ often:
* `submit-order`
Use the `SubmitOrder` field in `instance.rec` input files instead of `JobId`.
* `--bw <BW>`
Specify the bandwidth used for communications. Unit is not
specified, since it depends on the unit used to specify data sizes
(in the `Sizes` fields of the tasks) and on the unit used to specify
task execution times. Default value is 1.
* `-r <repartFile>`
Output a summary of the repartition of task types on resource types
in the repartFile. If there are only two types of resources, the
......
......@@ -3,6 +3,7 @@ set(CORE_SRC
instance.cpp
RecFileReader.cpp
schedAction.cpp
CommSequence.cpp
util.cpp
SynchroMap.cpp)
add_library(core ${CORE_SRC})
......
#include "CommSequence.h"
#include "util.h"
#include <iostream>
using namespace std;
CommSequence::CommSequence(Instance &ins): bandwidth(ins.bandwidth), nbWT(ins.totalWorkers),
outgoing(nbWT), incoming(nbWT) {
}
double reserveALAP(vector<AvailSequence*> &seqs, double from, double len, double goal = -1) {
vector<timeSeq::iterator> its;
vector<double> startTimes(seqs.size());
for(unsigned int i = 0; i < seqs.size(); i++) {
its.push_back(seqs[i]->getAvail(from, len, startTimes[i]));
}
double finish = getMax(startTimes) + len;
finish = max(finish, goal);
// cout << "reserveALAP: " << from << " len=" << len << " goal=" << goal << " finish=" << finish << endl;
for(unsigned int i = 0; i < seqs.size(); i++) {
AvailSequence* seq = seqs[i];
its[i] = seq->getAvailLatest(from, len, finish, startTimes[i], its[i]);
seq->insertBusy(its[i], startTimes[i], len);
}
return finish;
}
double CommSequence::newTransfer(double now, int source, int dest, int size, double goal) {
vector<AvailSequence*> seqs;
seqs.push_back(&outgoing[source]);
seqs.push_back(&incoming[dest]);
double len = size / bandwidth;
return (reserveALAP(seqs, now, len, goal));
}
......@@ -12,6 +12,7 @@ extern "C" {
#include "instance.h"
using namespace std;
#define C_TEXT( text ) ((char*)std::string( text ).c_str())
static string recordValueOpt(const rec_record_t record, const char* field) {
......@@ -23,20 +24,28 @@ static string recordValueOpt(const rec_record_t record, const char* field) {
return res;
}
}
static string recordValue(const rec_record_t record, const char* field) {
int nb = rec_record_get_num_fields_by_name(record, field);
if(nb == 0) {
cerr << "RecFileReader: mandatory field " << field << " missing at line " << rec_record_location_str(record) << endl;
throw(1);
}
else {
} else {
string res(rec_field_value(rec_record_get_field_by_name(record, field, 0)));
return res;
return res;
}
}
template <class T, typename Func = identity>
static void getVector(const rec_record_t &record, const char* &field,
vector<T> &result, Func &&func = Func()) {
string line = recordValueOpt(record, field);
if(line != "") {
vector<string> words = split(line, ' ');
for(auto& w: words) result.push_back(func(w));
}
}
class recTask {
......@@ -51,16 +60,23 @@ public:
static const char* recWorkerId;
static const char* recStartTime;
static const char* recEndTime;
static const char* recHandles;
static const char* recModes;
static const char* recSizes;
static int outputCount;
static void recReaderInit() {
if(recName == NULL) {
recName = rec_parse_field_name_str("Name");
recFootprint = rec_parse_field_name_str("Footprint");
recJobId = rec_parse_field_name_str("JobId");
recSubmitOrder = rec_parse_field_name_str("SubmitOrder");
recEstimatedTime = rec_parse_field_name_str("EstimatedTime");
recDependsOn = rec_parse_field_name_str("DependsOn");
recTag = rec_parse_field_name_str("Tag");
recName = rec_parse_field_name_str(C_TEXT("Name"));
recFootprint = rec_parse_field_name_str(C_TEXT("Footprint"));
recJobId = rec_parse_field_name_str(C_TEXT("JobId"));
recSubmitOrder = rec_parse_field_name_str(C_TEXT("SubmitOrder"));
recEstimatedTime = rec_parse_field_name_str(C_TEXT("EstimatedTime"));
recDependsOn = rec_parse_field_name_str(C_TEXT("DependsOn"));
recTag = rec_parse_field_name_str(C_TEXT("Tag"));
recHandles = rec_parse_field_name_str(C_TEXT("Handles"));
recModes = rec_parse_field_name_str(C_TEXT("Modes"));
recSizes = rec_parse_field_name_str(C_TEXT("Sizes"));
recWorkerId = rec_parse_field_name_str("WorkerId");
recStartTime= rec_parse_field_name_str("StartTime");
recEndTime = rec_parse_field_name_str("EndTime");
......@@ -84,6 +100,27 @@ public:
double startTime, endTime;
// Communication informations
typedef enum {ModeR, ModeRW, ModeW} modes_t;
vector<string> handles;
vector<modes_t> modes;
vector<int> sizes;
private:
static modes_t strToMode(string m) {
if(m == "R") return ModeR;
if(m=="RW") return ModeRW;
if(m=="W") return ModeW;
cerr << "Unknown mode " << m << ", sorry" << endl;
throw(1);
}
static int stoi(string s) {
return std::stoi(s);
}
public:
recTask(const rec_record_t record, int id) : internalId(id) {
name = recordValueOpt(record, recName);
convertedDeps = false;
......@@ -96,11 +133,25 @@ public:
outputId = outputCount;
++outputCount;
}
tag = recordValueOpt(record, recTag);
tag = recordValueOpt(record, recTag);
jobId = stoi(recordValue(record, recJobId));
getVector(record, recDependsOn, dependsOn, stoi);
getVector(record, recHandles, handles);
getVector(record, recModes, modes, strToMode);
getVector(record, recSizes, sizes, stoi);
if( isReal && ((handles.size() != modes.size()) || (modes.size() != sizes.size()))) {
cerr << "RecReader: data information from jobId " << jobId << " is inconsistent: "
<< handles.size() << " Handles, " << modes.size() << " Modes, " << sizes.size() << " Sizes" << endl;
throw(1);
}
string submitOrderStr = recordValueOpt(record, recSubmitOrder);
if (submitOrderStr != "")
submitOrder = stoi(submitOrderStr);
jobId = stoi(recordValue(record, recJobId));
string deps = recordValueOpt(record, recDependsOn);
if(deps != "") {
vector<string> splitDeps = split(deps, ' ');
......@@ -149,12 +200,16 @@ const char* recTask::recJobId = NULL;
const char* recTask::recSubmitOrder = NULL;
const char* recTask::recEstimatedTime = NULL;
const char* recTask::recDependsOn = NULL;
const char* recTask::recTag = NULL;
const char* recTask::recTag = NULL;
const char* recTask::recHandles = NULL;
const char* recTask::recModes = NULL;
const char* recTask::recSizes = NULL;
const char* recTask::recWorkerId = NULL;
const char* recTask::recStartTime = NULL;
const char* recTask::recEndTime = NULL;
int recTask::outputCount = 0;
void RecFileInstance::readFromFile(const string inputFile, unordered_map<string, vector<double> > timings,
bool useSubmitOrder, bool appendTags) {
......@@ -232,6 +287,7 @@ void RecFileInstance::readFromFile(const string inputFile, unordered_map<string,
execTimes[i].push_back(pairs.second[i]);
}
nbTaskTypes = taskTypeNames.size();
for(auto& intID: realTasks) {
recTask &t = tasks[intID];
vector<int> deps;
......@@ -248,7 +304,8 @@ void RecFileInstance::readFromFile(const string inputFile, unordered_map<string,
}
nbTasks = taskTypes.size();
// Cleanup taskTypeNames if possible
// Cleanup taskTypeNames (name:footprint) if possible (remove
// footprint if no other typeName has the same name)
for(int i = 0; i < nbTaskTypes; i++) {
string n = split(taskTypeNames[i], ':')[0];
bool other=false;
......@@ -260,6 +317,51 @@ void RecFileInstance::readFromFile(const string inputFile, unordered_map<string,
taskTypeNames[i] = n;
}
map<string, pair<int, int> > itemVersionAndIndex;
// Update the information about data items
computeTopOrder();
itemsRequired.resize(nbTasks);
itemsProduced.resize(nbTasks);
for(int & tID: topOrder) {
recTask & t = tasks[realTasks[tID]];
vector<int>& req = itemsRequired[tID];
vector<int>& prod = itemsProduced[tID];
for(int hindex = 0, nbItems = t.handles.size(); hindex < nbItems; hindex++) {
string h = t.handles[hindex];
recTask::modes_t mode = t.modes[hindex];
int version = 0;
int index = itemNames.size();
if(itemVersionAndIndex.count(h) > 0) {
auto data = itemVersionAndIndex[h];
version = data.first;
index = data.second;
} else {
itemNames.push_back(h + ":" + to_string(version));
itemSizes.push_back(t.sizes[hindex]);
itemVersionAndIndex.emplace(h, make_pair(version, index));
}
if(mode == recTask::ModeR || mode == recTask::ModeRW) {
req.push_back(index);
if((t.sizes[hindex] != itemSizes[index]) && (mode == recTask::ModeR)) {
cerr << "RecReader: Warning: Data #" << hindex << " of task " << t.jobId << " (handle " << h << ")"
<< " has size " << t.sizes[hindex] << " but last produced handle has size " << itemSizes[index] << endl;
}
}
if(mode == recTask::ModeW || mode == recTask::ModeRW) {
auto &data = itemVersionAndIndex[h];
++data.first;
data.second = itemNames.size();
itemNames.push_back(h + ":" + to_string(data.first));
itemSizes.push_back(t.sizes[hindex]);
prod.push_back(data.second);
}
}
}
int v = 0;
for(int n: nbWorkers) {
vector<int> ids;
......@@ -321,6 +423,16 @@ RecFileInstance::RecFileInstance(const string inputFile, const string platformFi
}
nbWorkerTypes = nbWorkers.size();
totalWorkers = getSum(nbWorkers);
int memNodes = 1;
int workerIndex = 0;
for(int k = 0; k < nbWorkerTypes; ++k) {
int memnode = 0;
if(workerNames[k].compare(0, 3, "cpu") != 0)
memnode = memNodes++;
vector<int> nodes(nbWorkers[k], memnode);
memoryNodes.push_back(nodes);
}
rec_rset_t timingSet = rec_db_get_rset_by_type(platform, "timing");
if(!timingSet) {
......
......@@ -47,12 +47,16 @@ Instance::Instance(const string input_file, int convertIndices) {
}
totalWorkers = static_cast<unsigned int>(s);
int v = 0;
for(int n: nbWorkers) {
for(int k = 0; k < nbWorkerTypes; k++) {
int n = nbWorkers[k];
vector<int> ids;
for(int i = 0; i < n; i++)
vector<int> nodes(n, k); // Size n, all equal to k
for(int i = 0; i < n; i++) {
ids.push_back(v + i);
}
v += n;
workerIDs.push_back(ids);
memoryNodes.push_back(nodes);
}
execTimes.resize(nbWorkerTypes);
......@@ -125,7 +129,7 @@ void Instance::display(int verbosity) {
cout << taskTypeNames << endl;
for(i = 0; i < (int) nbWorkers.size(); i++){
if(workerNames.size() > 0) cout << workerNames[i] << " \t";
cout << execTimes[i] << " " << workerIDs[i] << endl;
cout << execTimes[i] << " " << workerIDs[i] << " " << memoryNodes[i] << endl;
}
if(verbosity >= 4) {
cout << taskTypes << endl;
......@@ -199,7 +203,8 @@ void Instance::mergeWorkerTypes(const vector<int> indicesToMerge) {
vector<int> newNbWorkers;
vector< vector<double> > newExecTimes;
vector<vector<int> > newWorkerIDs;
vector<vector<int> > newMemoryNodes;
int replacementNB = 0;
vector<double> replacementExecTimes(nbTaskTypes, 0);
......@@ -222,21 +227,25 @@ void Instance::mergeWorkerTypes(const vector<int> indicesToMerge) {
newNbWorkers.push_back(nbWorkers[i]);
newExecTimes.push_back(execTimes[i]);
newWorkerIDs.push_back(workerIDs[i]);
newMemoryNodes.push_back(memoryNodes[i]);
}
}
vector<int> replacementIDs;
vector<int> replacementMemNodes;
for(int i = 0; i < (int) indicesToMerge.size(); i++) {
replacementIDs.insert(replacementIDs.end(), workerIDs[i].begin(), workerIDs[i].end());
replacementMemNodes.insert(replacementMemNodes.end(), memoryNodes[i].begin(), memoryNodes[i].end());
}
newWorkerIDs.push_back(replacementIDs);
newMemoryNodes.push_back(replacementMemNodes);
newNbWorkers.push_back(replacementNB);
newExecTimes.push_back(replacementExecTimes);
nbWorkers = newNbWorkers;
execTimes = newExecTimes;
workerIDs = newWorkerIDs;
memoryNodes = newMemoryNodes;
nbWorkerTypes = nbWorkers.size();
......
#ifndef COMSEQUENCE_H
#define COMSEQUENCE_H
#include <vector>
#include "availSequence.h"
#include "instance.h"
// Implements a not so greedy way of handling communication resources
// New transfers can only start after any ongoing transfer with which they are incompatible.
// Interference is handled with availSequence
class CommSequence {
protected:
double bandwidth;
int nbWT;
// This handles contention.
// 1st approx: each transfer starts ASAP on its source & dest resource
// Transfer end is when both are done.
// 2nd approx: transfers are preemptible (malleable when BW values are heterogeneous ?)
// ending time = max availability, on other resources it starts ALAP ?
// Yes, but then how does it interact with incompatibility ?
// Easy way: no incompatibility, only contention. With
// availSequences. Transfers start ALAP. LastMile model ? With
// homogeneous BW to start. So transfers use 2 resources, and use
// them at 100%.
std::vector<AvailSequence> outgoing;
std::vector<AvailSequence> incoming;
public:
CommSequence(Instance& ins);
// Returns ending time
// If provided, goal means: do not finish earlier than this time.
// schedule the transfer, trying to end at "goal"
// if "goal" is too early, end as soon as possible
// In all cases, schedule all parts of the transfer to finish at the same time, ALAP.
double newTransfer(double now, int source, int dest, int size, double goal = -1.0);
};
#endif
......@@ -11,7 +11,8 @@
class GreedyAlgorithm : public Algorithm {
protected:
// RankComputer ranker;
bool doComms;
int verbosity;
virtual int chooseTask(int worker, double now) = 0;
virtual void onTaskPush(int task, double now) = 0;
......
#ifndef GREEDYFILE_H
#define GREEDYFILE_H
#include "algorithm.h"
#include "instance.h"
#include "GreedyAlgorithm.h"
#include <vector>
#include <string>
class GreedyRanker : public RankComputer {
protected:
intCompare* oneCmp(Instance& ins, std::string subRankOpt);
std::vector<int> preAssign;
public:
GreedyRanker(const AlgOptions & options) : RankComputer(options) {}
TaskSet* makeSet(Instance & ins, std::vector<int> alloc);
};
class GreedyFile : public GreedyAlgorithm {
protected:
std::string file;
std::vector<TaskSet*> queues;
std::vector<int> preAssign;
GreedyRanker ranker;
Instance* ins;
int stealerType;
bool spoliate;
bool pick;
std::string stealerName;
std::vector<bool> isStealer;
public:
GreedyFile(const AlgOptions & options);
double compute(Instance & instance, SchedAction* action);
int chooseTask(int worker, double now);
void onTaskPush(int task);
std::string name();
};
#endif
......@@ -19,8 +19,10 @@ class GreedyPerType : public GreedyAlgorithm {
bool spoliate;
bool pick;
std::string stealerName;
protected:
std::vector<bool> isStealer;
protected:
Instance* ins;
std::vector<TaskSet*> queues;
virtual TaskSet* makeQueue();
......
......@@ -40,6 +40,7 @@ class ProgramOptions {
bool noHeader = false;
bool optRevDep = false;
bool outputBest;
double platform_bw = 1;
std::string outputBestFile;
int nbThreads;
bool appendTags;
......
......@@ -16,7 +16,8 @@ typedef std::list<Period> timeSeq;
class AvailSequence {
static constexpr double epsilon = 1e-9;
/* Guarantee: seq always contains an infinitely long period (end = -1) at the end */
timeSeq seq;
......@@ -24,6 +25,14 @@ class AvailSequence {
AvailSequence(double start = 0);
timeSeq::iterator getAvail(double from, double len, double& start);
timeSeq::iterator getAvailLatest(double from, double len, double latest,
double& start, timeSeq::iterator &i);
timeSeq::iterator getAvailLatest(double from, double len, double latest,
double& start) {
timeSeq::iterator r = seq.begin();
return getAvailLatest(from, len, latest, start, r);
}
void insertBusy(timeSeq::iterator, double start, double len);
void insertBusy(double start, double len);
void display(std::ostream&);
......
......@@ -29,6 +29,48 @@ public:
std::vector<std::string> taskTypeNames;
std::vector<std::string> taskIDs;
// Information about communications
// Invariants: each task type uses the same data sizes
//
// Need to identify data items, give them sizes (maybe they have types ?)
// tasks depend on data items, produce items
// EACH ITEM IS PRODUCED ONCE ! Converted from *PU data by versioning
// Need location information about initial data
// [data.rec only gives MPI owners, and coordinates]
// Need information about which worker is in which memory node
// Need bandwidth information
// [Available in StarPU, but not yet in .rec format]
// Questions: I think I cannot handle commutative stuff
// Reading data:
// Each task has Handles and Modes
// Following a topological ordering, create a new version each time a task writes a handle
// Tasks depend on the last version of the Handle
// Starting state:
// constant bandwidth as command-line argument [Hmm. Not clean, though. ]
// Memory node = worker type [otherwise I am not sure how to handle it in schedulers]
// Initial data is all on CPUs
// No item type, I don't think it's needed, store items directly
// If necessary: remove the itemNames, that's a lot of strings.
// itemSizes repeats a lot of information, but not more than if it was itemTypes + typeSizes,
// and less indirection...
std::vector<int> itemSizes;
std::vector<std::string> itemNames;
std::vector<std::vector<int> > itemsRequired;
// itemsRequired[i][j] = index of jth item that task i requires
std::vector<std::vector<int> > itemsProduced;
// itemsProduced[i][j] = index of jth item that task i produces
std::vector<std::vector<int>> memoryNodes;
// This vector works like workerIDs: length is nbWorkerTypes, memoryNodes[t] specifies
// the memory nodes of all workers of type t
double bandwidth;
std::string inputFile;
std::string platformFile;
......
......@@ -88,6 +88,21 @@ T getMax(std::vector<T> v) {
return m;
}
template <class T, class R, typename Func>
T getMax(std::vector<T> v, Func func) {
if(v.size() == 0) return -1;
T best = v[0];
R bestScore = func(best);
for(auto &x: v) {
R score = func(x);
if(score > bestScore){
best = x;
bestScore = score;
}
}
return best;
}
template <class T> T getSum(std::vector<T> v){
return getSum<T, T>(v, [] (T a) { return a; });
}
......@@ -100,4 +115,11 @@ template <class T, class R, typename Func> R getSum(std::vector<T> v, Func func)
}
struct identity {
template<typename U>
constexpr auto operator()(U&& v) const noexcept
-> decltype(std::forward<U>(v))
{
return std::forward<U>(v);
}
};
......@@ -261,6 +261,8 @@ int main(int argc, char** argv) {
else
instance = new Instance(input_file, progOpt.convertIndices);
instance->bandwidth = progOpt.platform_bw;
if(progOpt.noDependencies)
instance->removeDependencies();
if(progOpt.optRevDep)
......
#include "GreedyAlgorithm.h"
#include "CommSequence.h"
#include "util.h"
#include <set>
#include <iostream>
......@@ -12,11 +13,18 @@ using namespace std;
GreedyAlgorithm::GreedyAlgorithm(const AlgOptions & options) {
verbosity = options.asInt("verbosity", 0);
verbosity = options.asInt("verbosity", 0);
doComms = options.asString("comms", "no") == "yes";
}
double GreedyAlgorithm::compute(Instance& ins, SchedAction* action) {
CommSequence comms(ins);
vector<double> dataAvailTimes(ins.itemSizes.size(), 0);
vector<int> locations(ins.itemSizes.size(), 0); // CAREFUL HERE !! ASSUMING
// THAT DATA IS ON LOCATION 0 AT THE START
// IS NOT NICE !!
// Compute ranks
// TaskSet* readyTasks = ranker.makeSet(ins);
......@@ -65,14 +73,34 @@ double GreedyAlgorithm::compute(Instance& ins, SchedAction* action) {
}
if(chosenTask >= 0) {
// Check that all dependencies are satisfied
for(int k : ins.dependencies[chosenTask])
if(!finishedTasks[k]) {
cerr << "Greedy: chosenTask is " << chosenTask << " but its dependency " << k << " is not finished" << endl;
throw(1);
}
double finishTime = currentTime + ins.execWorker(idle, chosenTask);
double startTime = currentTime;
if(doComms) {
// Compute a finish time which takes communication into account.
for(int &item: ins.itemsRequired[chosenTask]) {
int src = locations[item];
int dst = ins.memoryNodes[t][index];
if(src != dst) {
double commEndTime = comms.newTransfer(dataAvailTimes[item], src, dst, ins.itemSizes[item], currentTime);
startTime = max(startTime, commEndTime);
if(verbosity >= 4)
cout << "Greedy: for task " << chosenTask << " on " << idle << " node " << dst
<< ": item " << ins.itemNames[item] << " is on " << src << " at "
<< dataAvailTimes[item] << ", comm. end delay is " << commEndTime << endl;
}
}
}
double finishTime = startTime + ins.execWorker(idle, chosenTask);
if(verbosity >= 4)
cout << "Greedy: starting " << chosenTask << " of type " << ins.taskTypes[chosenTask] << " at " << currentTime << " to end at " << finishTime << " on worker " << idle << " of type " << t << endl;
cout << "Greedy: starting " << chosenTask << " of type " << ins.taskTypes[chosenTask] << " at " << startTime << " to end at " << finishTime << " on worker " << idle << " of type " << t << endl;
endTimesWorkers[idle] = finishTime;
runningTasks[idle] = chosenTask;
......@@ -116,11 +144,20 @@ double GreedyAlgorithm::compute(Instance& ins, SchedAction* action) {
for(int w = 0; w < nbWorkers; w++)
if((runningTasks[w] != -1) && (endTimesWorkers[w] <= currentTime)) {
int i = runningTasks[w];
int index;
int wType = ins.getType(w, &index);
if(verbosity >= 4)
cout << "Greedy: Finishing task " << i << " on worker " << w << " at time " << currentTime << endl;
if(action != NULL)
action->onSchedule(runningTasks[w], w,
endTimesWorkers[w] - ins.execWorker(w, runningTasks[w]), endTimesWorkers[w]);
if(doComms) {
for(int& item: ins.itemsProduced[i]) {
locations[item] = ins.memoryNodes[wType][index];
dataAvailTimes[item] = currentTime;
cout << ins.itemNames[item] << " " << locations[item] << " " << locations[item] << " " << currentTime << " " << currentTime << " " << i << endl;
}
}
runningTasks[w] = -1;
if(finishedTasks[i]) {
cerr << "Greedy: task " << i << " was already finished !" << endl;
......
#include "GreedyFile.h"
#include <fstream>
#include <iostream>
using namespace std;
intCompare* GreedyRanker::oneCmp(Instance& ins, string subRankOpt) {
vector<double> w;
string old(subRankOpt);
bool reverse = isReverse(subRankOpt);
if(subRankOpt == "alloc") {
w = ins.computePreAllocRank(preAssign);
return (new rankCompare(w, reverse));
} else {
return RankComputer::oneCmp(ins, old);
}
}
TaskSet* GreedyRanker::makeSet(Instance & ins, vector<int> alloc) {
preAssign = alloc;
return RankComputer::makeSet(ins);
}
GreedyFile::GreedyFile(const AlgOptions& options): GreedyAlgorithm(options), ranker(options) {
file = options.asString("file", "");
if(file == "") {
cerr << "GreedyFile: option 'file' is mandatory" << endl;
throw(1);
}
stealerType = options.asInt("stealer", -1);
stealerName = options.asString("stealerName");
if(stealerName != "" && stealerType != -1) {
cerr << "GreedyFile: warning: specified both stealer and stealerName, stealer has precedence." <<endl;
}
spoliate = options.asString("spoliate", "yes") == "yes";
pick = options.asString("pick", "no") == "yes";
}
string GreedyFile::name() {
return "greedy";
}
double GreedyFile::compute(Instance & instance, SchedAction* action) {
ins = &instance;
if(file.substr(0, 4) == "int@") {
vector<int>* values = (vector<int>*) ins->extraData[file.substr(4)];
if(!values) {
cerr << "GreedyFile: could not find shared allocation " << file << ". Did you really use the share=<...> option in a previous algorithm?" << endl;
throw(1);
}
preAssign.resize(ins->nbTasks);
for(int i = 0; i < ins->nbTasks; i++)
preAssign[i] = (*values)[i];
} else {
ifstream inputStream(file);
if(!inputStream) {
cerr << "GreedyFile: cannot open " << file << endl;
throw(1);
}
preAssign.resize(ins->nbTasks);
for(int i = 0; i < ins->nbTasks; i++) {
inputStream >> preAssign[i] >> ws;
if(preAssign[i] < 0 || preAssign[i] >= ins->nbWorkerTypes) {
cerr << "In '" << file << "', index " << i << ": value " << preAssign[i] << "invalid" << endl;
throw(1);
}
}
}
isStealer.resize(ins->nbWorkerTypes, false);
if(stealerType != -1)
isStealer[stealerType] = true;
if(stealerName != "" && stealerType == -1) {
int nbStealers = 0;
for(int i = 0; i < ins->nbWorkerTypes; i++)
if(ins->workerNames[i].compare(0, stealerName.size(), stealerName) == 0){
isStealer[i] = true; ++nbStealers;
}
if(nbStealers == 0) {
cerr << "GreedyFile: warning: could not find stealer name " << stealerName << endl;
} else if(verbosity >= 3)
cerr << "GreedyFile: stealerName " << stealerName << " found " << nbStealers << endl;
}
queues.clear();
for(int i = 0; i < ins->nbWorkerTypes; i++)
queues.push_back(ranker.makeSet(instance, preAssign));
return GreedyAlgorithm::compute(instance, action);
}
int GreedyFile::chooseTask(int worker, double now) {
int wType = ins->getType(worker);
if(queues[wType]->empty()){
if(isStealer[wType]) {
int bestTask = -1;
int chosenVictim = -1;
if(pick) {
// Try to steal first, and then try to spoliate
// if no task is found and spoliate is true.
for(int t = 0; t < ins->nbWorkerTypes; t++) {
if(t != wType && ! queues[t]->empty()) {
int candidateTask = queues[t]->front();
// With my API I cannot look at all tasks :'(
if(ins->isValidType(wType, candidateTask)) {
double taskAF = ins->execType(t, candidateTask) / ins->execType(wType, candidateTask);
double bestAF = ins->execType(t, bestTask) / ins->execType(wType, bestTask);
if(taskAF > 1) {
if(bestTask == -1 || (taskAF > bestAF) ||
( (taskAF == bestAF) && queues[wType] -> compare(candidateTask, bestTask))) {
bestTask = candidateTask;
chosenVictim = t;
}
}
}
}
}
if(bestTask != -1) {
queues[chosenVictim]->eraseFront();
return bestTask;
}
}
if(spoliate) {
for(int victim = 0; victim < ins->totalWorkers; victim++) {
int victimType = ins->getType(victim);
// ins->getType not efficient, but easier to read
if(!isStealer[victimType] && runningTasks[victim] != -1
&& ins->isValidType(wType, runningTasks[victim])
&& now + ins->execType(wType, runningTasks[victim]) < endTimesWorkers[victim]) {
int task = runningTasks[victim];
double taskAF = ins->execType(victimType, task)/ins->execType(wType, task);
double bestAF = ins->execType(victimType, bestTask)/ins->execType(wType, bestTask);
if(bestTask == -1
|| (taskAF > bestAF)
|| ((taskAF == bestAF)
&& queues[wType]->compare(task, bestTask))) {
bestTask = task;
chosenVictim = victim;
}
}
}
if(bestTask != -1) {
runningTasks[chosenVictim] = -1;
return bestTask;
}
}
}
return -1;
}
int result = queues[wType]->front();
queues[wType]->eraseFront();
return result;
}
void GreedyFile::onTaskPush(int task) {
queues[preAssign[task]]->insert(task);
}
......@@ -20,31 +20,36 @@ GreedyPerType::GreedyPerType(const AlgOptions& options): GreedyAlgorithm(options
}
double GreedyPerType::compute(Instance & instance, SchedAction* action) {
ins = &instance;
ins = &instance;
if(stealerName != "" && stealerType == -1) {
for(int i = 0; i < ins->nbWorkerTypes; i++)
if(ins->workerNames[i].compare(0, stealerName.size(), stealerName) == 0){
stealerType = i; break;
}
if(stealerType == -1) {
cerr << "PreAllocatedGreedy: warning: could not find stealer name " << stealerName << endl;
} else if(verbosity >= 3)
cerr << "PreAllocatedGreedy: stealerName " << stealerName << " number " << stealerType << endl;
}
isStealer.resize(ins->nbWorkerTypes, false);
if(stealerType != -1)
isStealer[stealerType] = true;
if(stealerName != "" && stealerType == -1) {
int nbStealers = 0;
for(int i = 0; i < ins->nbWorkerTypes; i++)
if(ins->workerNames[i].compare(0, stealerName.size(), stealerName) == 0){
isStealer[i] = true; ++nbStealers;
}
if(nbStealers == 0) {
cerr << "PreAllocatedGreedy: warning: could not find stealer name " << stealerName << endl;
} else if(verbosity >= 3)
cerr << "PreAllocatedGreedy: stealerName " << stealerName << " found " << nbStealers << endl;
}
queues.clear();
for(int i = 0; i < ins->nbWorkerTypes; i++)
queues.push_back(makeQueue());
queues.clear();
for(int i = 0; i < ins->nbWorkerTypes; i++)
queues.push_back(makeQueue());
return GreedyAlgorithm::compute(instance, action);
return GreedyAlgorithm::compute(instance, action);
}
int GreedyPerType::chooseTask(int worker, double now) {
int wType = ins->getType(worker);
if(queues[wType]->empty()){
if(wType == stealerType) {
if(isStealer[wType]) {
int bestTask = -1;
int chosenVictim = -1;
......@@ -79,7 +84,7 @@ int GreedyPerType::chooseTask(int worker, double now) {
for(int victim = 0; victim < ins->totalWorkers; victim++) {
int victimType = ins->getType(victim);
// ins->getType not efficient, but easier to read
if(victimType != wType && runningTasks[victim] != -1
if(!isStealer[wType] && runningTasks[victim] != -1
&& ins->isValidType(wType, runningTasks[victim])
&& now + ins->execType(wType, runningTasks[victim]) < endTimesWorkers[victim]) {
int task = runningTasks[victim];
......
#include "availSequence.h"
#include <limits>
#include <ostream>
#include <iostream>
using namespace std;
AvailSequence::AvailSequence(double start) {
// Period p();
......@@ -24,8 +26,55 @@ timeSeq::iterator AvailSequence::getAvail(double from, double len, double& start
return i;
}
timeSeq::iterator AvailSequence::getAvailLatest(double from, double len, double latest,
double& start, timeSeq::iterator &i) {
timeSeq::iterator res = i;
bool found = false;
for(; i != seq.end(); i++)
if(i->end >= from) break;
if(i->start + len > latest) {
cerr << "getAvailLatest " << from << " " << len << " " << latest
<< ": starting interval is too late:" << i->start + len << endl;
throw(150);
}
if(from + len > latest) {
cerr << "getAvailLatest " << from << " " << len << " " << latest
<< ": earliest time is too late:" << i->start + len << endl;
throw(150);
}
// cerr << "gAL: " << std::min(i->end, latest) << " " << std::max(from, i->start) << " " << len << " " << std::min(i->end, latest) - std::max(from, i->start) << " " << (len + std::max(from, i->start) <= std::min(i->end, latest)) << endl;
// This looks highly unstable: for unknown reason, if I write
// len <= min - max, this does not detect equality correctly.
if(std::max(from, i->start) + len <= std::min(i->end, latest)) {
res = i;
start = std::min(i->end, latest) - len;
found = true;
}
for(i++; i != seq.end(); i++) {
if(i->start > latest) break;
if(i->start + len <= std::min(i->end, latest)) {
start = std::min(i->end, latest) - len;
res = i;
found = true;
}
}
if(!found) {
cerr << "getAvailLatest " << from << " " << len << " " << latest
<< ": no interval found, ended at " << i->start << " " << i->end << endl;
display(cerr);
throw(44);
}
return res;
}
void AvailSequence::insertBusy(timeSeq::iterator i, double start, double len) {
/* Assumes i != seq.end(), i->start <= start, i->end >= start + len */
if((i == seq.end()) || (i->start > start + epsilon) || (i->end + epsilon < start + len)) {
cerr << "AvailSequence::insertBusy: invalid arguments. Start " << start
<< ", len " << len << ", interval [" << i->start << "," << i->end << "]" << endl;
throw(12);
}
Period orig = *i;
auto j = seq.erase(i);
if(orig.start < start) {
......