Commit 4c77b652 authored by EYRAUD-DUBOIS Lionel's avatar EYRAUD-DUBOIS Lionel

Merge branch 'comms' into 'master'

handle communications, related to #19

Additional code to handle communications. Contains: 
- parsing logic to get information about handles required/produces by each task from `task.rec`
- change to availSequence to allow ALAP insertion
- CommSequence to handle comms, ItemManager to handle data availability
- new option `comms` in GreedyAlgorithm to activate all this

Notably missing: 
- better way to specify bandwidth (for now it is a global value with `--bw` option)
- for now default location for all initial items is memory node 0
- include this in (some) other algorithms

See merge request !4
parents 4989b525 84b4aa45
......@@ -51,16 +51,16 @@ set(PROG_SRC
add_executable(pmtool ${PROG_SRC})
target_link_libraries(pmtool core)
target_link_libraries(pmtool schedulers)
target_link_libraries(pmtool ${LIBREC_LIBRARIES})
if(CPLEX_FOUND)
target_link_libraries(pmtool bounds)
target_link_libraries(pmtool ${CPLEX_LIBRARIES})
endif()
target_link_libraries(pmtool core)
target_link_libraries(pmtool ${LIBREC_LIBRARIES})
if(THREADS_FOUND)
target_link_libraries(pmtool Threads::Threads)
endif()
......
......@@ -69,6 +69,24 @@ by adding the `rep` (reproduce) algorithm to the command line, with the option `
```
pmtool tasks.rec -a rep:key=rl
```
+ `Handles`, `Modes` and `Sizes` allow to specify which data this task
requires and produces. All three are space-separated lists, which
should be the same size. `Handles` contains strings which are
identifiers of data objects. `Modes` can be either `R`, `W` or `RW`,
and indicates whether the corresponding data object is read,
written, or both. `Sizes` are integers and indicate the size of this
object. The parser performs *versioning* of these data objects
according to the precedence specified by the `DependsOn` field: a
data object read by a task is matched to the last data object
produced with the same handle, in a topological ordering of the
dependencies.
It is expected, but not checked, that tasks which write on a data
object have dependencies with all tasks writing or reading the same
data object. It is also expected that data accessed in read mode
have the same size as the data produced by the previous task which
wrote to this handle. Tasks which write to a handle may change its
size.
Because of internal behavior of StarPU, this format allows *virtual* tasks to be added to the instance. A task is virtual
if it has no `Name` field. `JobId` is still compulsory for virtual tasks. Dependencies from real tasks go *through* virtual tasks
......
......@@ -38,11 +38,12 @@ static const int opt_no_dependencies = 11;
static const int opt_no_header = 12;
static const int opt_rev_dep = 13;
static const int opt_best = 14;
static const int opt_thread = 15;
static const int opt_tags = 16;
static const int opt_submit = 17;
static const int opt_export_type = 18;
static const int opt_bw = 15;
static const int opt_thread = 16;
static const int opt_tags = 17;
static const int opt_submit = 18;
static const int opt_export_type = 19;
static const int opt_export_order = 20;
static struct option long_options[] = {
......@@ -62,10 +63,12 @@ static struct option long_options[] = {
{"help", no_argument, 0, 'h'},
{"default", optional_argument, 0, 'd'},
{"best", optional_argument, 0, opt_best},
{"bw", required_argument, 0, opt_bw},
{"threads", optional_argument, 0, opt_thread},
{"use-tags", no_argument, 0, opt_tags},
{"submit-order", no_argument, 0, opt_submit},
{"export-type", no_argument, 0, opt_export_type},
{"export-order", no_argument, 0, opt_export_order},
{0, 0, 0, 0 }
};
......@@ -82,8 +85,11 @@ ProgramOptions::ProgramOptions() {
optRevDep = false;
outputBest = false;
nbThreads = 1;
useSubmitOrder = false;
appendTags = false;
useSubmitOrder = false;
appendTags = false;
outputTypeInExport = false;
workerOrderInExport = false;
platform_bw = 1;
}
void ProgramOptions::parse(int argc, char** argv) {
......@@ -169,6 +175,9 @@ void ProgramOptions::parse(int argc, char** argv) {
if(optarg)
outputBestFile = s_optarg;
break ;
case opt_bw:
platform_bw = atof(optarg);
break;
case opt_thread:
if(optarg)
nbThreads = stoi(s_optarg);
......@@ -184,6 +193,9 @@ void ProgramOptions::parse(int argc, char** argv) {
case opt_export_type:
outputTypeInExport = true;
break;
case opt_export_order:
workerOrderInExport = true;
break;
case '?':
case 'h':
usage();
......
......@@ -369,7 +369,10 @@ Greedy algorithm with pre-allocated tasks. Requires a `file` option
which renumbers resources.
+ `key`
Uses pre-allocation shared with share or export
option. Often `KEY[alloc]` actually.
option. Often `KEY[alloc]` actually.
+ `comms`
If `yes`, takes communication into account (if they are
defined in the `tasks.rec` input file). Default is `no`.
+ `stealer` (default -1)
specifies a resource type which is allowed to steal from others,
by number. CAREFUL of `-t` option, which renumbers resources.
......@@ -525,6 +528,17 @@ often:
When exporting in `.rec` format with the `export=` option of algorithms, specify
all workers of this type instead of the particular worker.
* `export-order`
When exporting in `.rec` format with the `export=` option of algorithms, specify
the ordering of tasks on workers in addition to the allocation to workers. This is
not compatible with `--export-type`.
* `--bw <BW>`
Specify the bandwidth used for communications. Unit is not
specified, since it depends on the unit used to specify data sizes
(in the `Sizes` fields of the tasks) and on the unit used to specify
task execution times. Default value is 1.
* `-r <repartFile>`
Output a summary of the repartition of task types on resource types
in the repartFile. If there are only two types of resources, the
......
......@@ -3,7 +3,8 @@ set(CORE_SRC
instance.cpp
RecFileReader.cpp
schedAction.cpp
CommSequence.cpp
util.cpp
SynchroMap.cpp)
SynchroMap.cpp ItemManager.cpp ../include/ItemManager.h)
add_library(core ${CORE_SRC})
#include "CommSequence.h"
#include "util.h"
#include <iostream>
using namespace std;
CommSequence::CommSequence(Instance &ins): bandwidth(ins.bandwidth), nbWT(ins.totalWorkers),
outgoing(nbWT), incoming(nbWT) {
}
double reserveALAP(vector<AvailSequence*> &seqs, double from, double len, double goal = -1) {
vector<timeSeq::iterator> its;
vector<double> startTimes(seqs.size());
for(unsigned int i = 0; i < seqs.size(); i++) {
its.push_back(seqs[i]->getAvail(from, len, startTimes[i]));
}
double finish = getMax(startTimes) + len;
finish = max(finish, goal);
// cout << "reserveALAP: " << from << " len=" << len << " goal=" << goal << " finish=" << finish << endl;
for(unsigned int i = 0; i < seqs.size(); i++) {
AvailSequence* seq = seqs[i];
its[i] = seq->getAvailLatest(from, len, finish, startTimes[i], its[i]);
seq->insertBusy(its[i], startTimes[i], len);
}
return finish;
}
double CommSequence::newTransfer(double now, int source, int dest, int size, double goal) {
vector<AvailSequence*> seqs;
seqs.push_back(&outgoing[source]);
seqs.push_back(&incoming[dest]);
double len = size / bandwidth;
return (reserveALAP(seqs, now, len, goal));
}
//
// Created by eyraud on 18/12/18.
//
#include <ItemManager.h>
#include <iostream>
#include "ItemManager.h"
using namespace std;
ItemManager::ItemManager(Instance & instance) : ins(&instance), comms(instance),
dataAvailTimes(ins->itemSizes.size()), sourceLocation(ins->itemSizes.size(), -1) {
vector<bool> isProduced(ins->itemSizes.size(), false);
for(auto &p : ins->itemsProduced) {
for (int &i: p)
isProduced[i] = true;
}
for(uint i = 0; i < ins->itemSizes.size(); ++i) {
dataAvailTimes[i].resize(ins->nbMemoryNodes, -1);
if(! isProduced[i]) {
// This is where I assume that all
// input data are at location 0 at
// the start.
sourceLocation[i] = 0;
dataAvailTimes[i][sourceLocation[i]] = 0;
}
}
}
double ItemManager::sendItemTo(int item, int dstMemNode, double currentTime) {
if(dataAvailTimes[item][dstMemNode] < 0) {
int src = sourceLocation[item]; // Ideally, I could choose the source location that minimizes commEndTime ?
if (src < 0 || src >= ins->nbMemoryNodes) {
cerr << "sendItemTo for item " << ins->itemNames[item] << " to " << dstMemNode << " at " << currentTime << ": sourceLocation is invalid " << src << endl;
throw(1);
}
double commEndTime = comms.newTransfer(dataAvailTimes[item][src], src, dstMemNode, ins->itemSizes[item], currentTime);
dataAvailTimes[item][dstMemNode] = commEndTime;
return commEndTime;
// if(verbosity >= 4)
// cout << "Greedy: for task " << chosenTask << " on " << idle << " node " << dst
// << ": item " << ins.itemNames[item] << " is on " << src << " at "
// << dataAvailTimes[item] << ", comm. end delay is " << commEndTime << endl;
} else {
return currentTime;
}
}
void ItemManager::produceItemOn(int item, int memNode, double currentTime) {
if(sourceLocation[item] >= 0) {
cerr << "ItemManager: item " << item << " (" << ins->itemNames[item] << ") produced several times: was on " << sourceLocation[item] << ", produced on " << memNode << endl;
throw(1);
}
// cerr << "ItemManager: Producing item " << item << " (" << ins->itemNames[item] << ") : was on " << sourceLocation[item] << ", produced on " << memNode << endl;
dataAvailTimes[item][memNode] = currentTime;
sourceLocation[item] = memNode;
}
......@@ -12,6 +12,7 @@ extern "C" {
#include "instance.h"
using namespace std;
#define C_TEXT( text ) ((char*)std::string( text ).c_str())
static string recordValueOpt(const rec_record_t record, const char* field) {
......@@ -23,20 +24,28 @@ static string recordValueOpt(const rec_record_t record, const char* field) {
return res;
}
}
static string recordValue(const rec_record_t record, const char* field) {
int nb = rec_record_get_num_fields_by_name(record, field);
if(nb == 0) {
cerr << "RecFileReader: mandatory field " << field << " missing at line " << rec_record_location_str(record) << endl;
throw(1);
}
else {
} else {
string res(rec_field_value(rec_record_get_field_by_name(record, field, 0)));
return res;
return res;
}
}
template <class T, typename Func = identity>
static void getVector(const rec_record_t &record, const char* &field,
vector<T> &result, Func &&func = Func()) {
string line = recordValueOpt(record, field);
if(line != "") {
vector<string> words = split(line, ' ');
for(auto& w: words) result.push_back(func(w));
}
}
class recTask {
......@@ -51,16 +60,23 @@ public:
static const char* recWorkerId;
static const char* recStartTime;
static const char* recEndTime;
static const char* recHandles;
static const char* recModes;
static const char* recSizes;
static int outputCount;
static void recReaderInit() {
if(recName == NULL) {
recName = rec_parse_field_name_str("Name");
recFootprint = rec_parse_field_name_str("Footprint");
recJobId = rec_parse_field_name_str("JobId");
recSubmitOrder = rec_parse_field_name_str("SubmitOrder");
recEstimatedTime = rec_parse_field_name_str("EstimatedTime");
recDependsOn = rec_parse_field_name_str("DependsOn");
recTag = rec_parse_field_name_str("Tag");
recName = rec_parse_field_name_str(C_TEXT("Name"));
recFootprint = rec_parse_field_name_str(C_TEXT("Footprint"));
recJobId = rec_parse_field_name_str(C_TEXT("JobId"));
recSubmitOrder = rec_parse_field_name_str(C_TEXT("SubmitOrder"));
recEstimatedTime = rec_parse_field_name_str(C_TEXT("EstimatedTime"));
recDependsOn = rec_parse_field_name_str(C_TEXT("DependsOn"));
recTag = rec_parse_field_name_str(C_TEXT("Tag"));
recHandles = rec_parse_field_name_str(C_TEXT("Handles"));
recModes = rec_parse_field_name_str(C_TEXT("Modes"));
recSizes = rec_parse_field_name_str(C_TEXT("Sizes"));
recWorkerId = rec_parse_field_name_str("WorkerId");
recStartTime= rec_parse_field_name_str("StartTime");
recEndTime = rec_parse_field_name_str("EndTime");
......@@ -84,6 +100,27 @@ public:
double startTime, endTime;
// Communication informations
typedef enum {ModeR, ModeRW, ModeW} modes_t;
vector<string> handles;
vector<modes_t> modes;
vector<int> sizes;
private:
static modes_t strToMode(string m) {
if(m == "R") return ModeR;
if(m=="RW") return ModeRW;
if(m=="W") return ModeW;
cerr << "Unknown mode " << m << ", sorry" << endl;
throw(1);
}
static int stoi(string s) {
return std::stoi(s);
}
public:
recTask(const rec_record_t record, int id) : internalId(id) {
name = recordValueOpt(record, recName);
convertedDeps = false;
......@@ -96,11 +133,25 @@ public:
outputId = outputCount;
++outputCount;
}
tag = recordValueOpt(record, recTag);
tag = recordValueOpt(record, recTag);
jobId = stoi(recordValue(record, recJobId));
getVector(record, recDependsOn, dependsOn, stoi);
getVector(record, recHandles, handles);
getVector(record, recModes, modes, strToMode);
getVector(record, recSizes, sizes, stoi);
if( isReal && ((handles.size() != modes.size()) || (modes.size() != sizes.size()))) {
cerr << "RecReader: data information from jobId " << jobId << " is inconsistent: "
<< handles.size() << " Handles, " << modes.size() << " Modes, " << sizes.size() << " Sizes" << endl;
throw(1);
}
string submitOrderStr = recordValueOpt(record, recSubmitOrder);
if (submitOrderStr != "")
submitOrder = stoi(submitOrderStr);
jobId = stoi(recordValue(record, recJobId));
string deps = recordValueOpt(record, recDependsOn);
if(deps != "") {
vector<string> splitDeps = split(deps, ' ');
......@@ -149,12 +200,16 @@ const char* recTask::recJobId = NULL;
const char* recTask::recSubmitOrder = NULL;
const char* recTask::recEstimatedTime = NULL;
const char* recTask::recDependsOn = NULL;
const char* recTask::recTag = NULL;
const char* recTask::recTag = NULL;
const char* recTask::recHandles = NULL;
const char* recTask::recModes = NULL;
const char* recTask::recSizes = NULL;
const char* recTask::recWorkerId = NULL;
const char* recTask::recStartTime = NULL;
const char* recTask::recEndTime = NULL;
int recTask::outputCount = 0;
void RecFileInstance::readFromFile(const string inputFile, unordered_map<string, vector<double> > timings,
bool useSubmitOrder, bool appendTags) {
......@@ -232,6 +287,7 @@ void RecFileInstance::readFromFile(const string inputFile, unordered_map<string,
execTimes[i].push_back(pairs.second[i]);
}
nbTaskTypes = taskTypeNames.size();
for(auto& intID: realTasks) {
recTask &t = tasks[intID];
vector<int> deps;
......@@ -248,7 +304,8 @@ void RecFileInstance::readFromFile(const string inputFile, unordered_map<string,
}
nbTasks = taskTypes.size();
// Cleanup taskTypeNames if possible
// Cleanup taskTypeNames (name:footprint) if possible (remove
// footprint if no other typeName has the same name)
for(int i = 0; i < nbTaskTypes; i++) {
string n = split(taskTypeNames[i], ':')[0];
bool other=false;
......@@ -260,6 +317,51 @@ void RecFileInstance::readFromFile(const string inputFile, unordered_map<string,
taskTypeNames[i] = n;
}
map<string, pair<int, int> > itemVersionAndIndex;
// Update the information about data items
computeTopOrder();
itemsRequired.resize(nbTasks);
itemsProduced.resize(nbTasks);
for(int & tID: topOrder) {
recTask & t = tasks[realTasks[tID]];
vector<int>& req = itemsRequired[tID];
vector<int>& prod = itemsProduced[tID];
for(int hindex = 0, nbItems = t.handles.size(); hindex < nbItems; hindex++) {
string h = t.handles[hindex];
recTask::modes_t mode = t.modes[hindex];
int version = 0;
int index = itemNames.size();
if(itemVersionAndIndex.count(h) > 0) {
auto data = itemVersionAndIndex[h];
version = data.first;
index = data.second;
} else {
itemNames.push_back(h + ":" + to_string(version));
itemSizes.push_back(t.sizes[hindex]);
itemVersionAndIndex.emplace(h, make_pair(version, index));
}
if(mode == recTask::ModeR || mode == recTask::ModeRW) {
req.push_back(index);
if((t.sizes[hindex] != itemSizes[index]) && (mode == recTask::ModeR)) {
cerr << "RecReader: Warning: Data #" << hindex << " of task " << t.jobId << " (handle " << h << ")"
<< " has size " << t.sizes[hindex] << " but last produced handle has size " << itemSizes[index] << endl;
}
}
if(mode == recTask::ModeW || mode == recTask::ModeRW) {
auto &data = itemVersionAndIndex[h];
++data.first;
data.second = itemNames.size();
itemNames.push_back(h + ":" + to_string(data.first));
itemSizes.push_back(t.sizes[hindex]);
prod.push_back(data.second);
}
}
}
int v = 0;
for(int n: nbWorkers) {
vector<int> ids;
......@@ -321,7 +423,18 @@ RecFileInstance::RecFileInstance(const string inputFile, const string platformFi
}
nbWorkerTypes = nbWorkers.size();
totalWorkers = getSum(nbWorkers);
int memNodes = 1;
int workerIndex = 0;
for(int k = 0; k < nbWorkerTypes; ++k) {
int memnode = 0;
if(workerNames[k].compare(0, 3, "cpu") != 0)
memnode = memNodes++;
vector<int> nodes(nbWorkers[k], memnode);
memoryNodes.push_back(nodes);
}
nbMemoryNodes = memNodes;
rec_rset_t timingSet = rec_db_get_rset_by_type(platform, "timing");
if(!timingSet) {
cerr << "RecFileInstance: no 'timing' record set\n"<< endl;
......
......@@ -47,14 +47,19 @@ Instance::Instance(const string input_file, int convertIndices) {
}
totalWorkers = static_cast<unsigned int>(s);
int v = 0;
for(int n: nbWorkers) {
for(int k = 0; k < nbWorkerTypes; k++) {
int n = nbWorkers[k];
vector<int> ids;
for(int i = 0; i < n; i++)
vector<int> nodes(n, k); // Size n, all equal to k
for(int i = 0; i < n; i++) {
ids.push_back(v + i);
}
v += n;
workerIDs.push_back(ids);
memoryNodes.push_back(nodes);
}
execTimes.resize(nbWorkerTypes);
nbMemoryNodes = nbWorkerTypes;
for(i = 0; i < nbWorkerTypes; i++) {
readArray(input, execTimes[i]);
......@@ -75,6 +80,9 @@ Instance::Instance(const string input_file, int convertIndices) {
isIndependent = false;
}
itemsRequired.resize(nbTasks);
itemsProduced.resize(nbTasks);
int nbTaskNames = readArray(input, taskIDs);
if(nbTaskNames != -1 && nbTaskNames != 0 && nbTaskNames != nbTasks) {
cerr << "Task Names do not have the correct length: " << nbTaskNames << " and not " << nbTasks << endl;
......@@ -125,7 +133,7 @@ void Instance::display(int verbosity) {
cout << taskTypeNames << endl;
for(i = 0; i < (int) nbWorkers.size(); i++){
if(workerNames.size() > 0) cout << workerNames[i] << " \t";
cout << execTimes[i] << " " << workerIDs[i] << endl;
cout << execTimes[i] << " " << workerIDs[i] << " " << memoryNodes[i] << endl;
}
if(verbosity >= 4) {
cout << taskTypes << endl;
......@@ -199,7 +207,8 @@ void Instance::mergeWorkerTypes(const vector<int> indicesToMerge) {
vector<int> newNbWorkers;
vector< vector<double> > newExecTimes;
vector<vector<int> > newWorkerIDs;
vector<vector<int> > newMemoryNodes;
int replacementNB = 0;
vector<double> replacementExecTimes(nbTaskTypes, 0);
......@@ -222,21 +231,25 @@ void Instance::mergeWorkerTypes(const vector<int> indicesToMerge) {
newNbWorkers.push_back(nbWorkers[i]);
newExecTimes.push_back(execTimes[i]);
newWorkerIDs.push_back(workerIDs[i]);
newMemoryNodes.push_back(memoryNodes[i]);
}
}
vector<int> replacementIDs;
vector<int> replacementMemNodes;
for(int i = 0; i < (int) indicesToMerge.size(); i++) {
replacementIDs.insert(replacementIDs.end(), workerIDs[i].begin(), workerIDs[i].end());
replacementMemNodes.insert(replacementMemNodes.end(), memoryNodes[i].begin(), memoryNodes[i].end());
}
newWorkerIDs.push_back(replacementIDs);
newMemoryNodes.push_back(replacementMemNodes);
newNbWorkers.push_back(replacementNB);
newExecTimes.push_back(replacementExecTimes);
nbWorkers = newNbWorkers;
execTimes = newExecTimes;
workerIDs = newWorkerIDs;
memoryNodes = newMemoryNodes;
nbWorkerTypes = nbWorkers.size();
......
......@@ -105,8 +105,16 @@ string ExportToString::getResult() {
/* ExportAlloc: exports in .rec format */
ExportAlloc::ExportAlloc(string filename, Instance* ins, bool submitOrder, bool outputType)
: output(filename), instance(ins), submitOrder(submitOrder), outputType(outputType) {
ExportAlloc::ExportAlloc(string filename, Instance* ins, bool submitOrder, bool outputType, bool workerOrder)
: output(filename), instance(ins), submitOrder(submitOrder), outputType(outputType), workerOrder(workerOrder) {
if (workerOrder) {
if (!outputType) {
workerTaskCount.resize(instance->totalWorkers, 0);
} else {
cerr << "Export: workerOrder is not compatible with outputType, ignoring." << endl;
workerOrder = false;
}
}
}
void ExportAlloc::onSchedule(int i, int w, double s, double f) {
......@@ -136,6 +144,10 @@ void ExportAlloc::onSchedule(int i, int w, double s, double f) {
output << endl;
} else {
output << "SpecificWorker: " << instance->workerIDs[type][index] << endl;
if (workerOrder) {
output << "Workerorder: " << workerTaskCount[w] << endl;
++workerTaskCount[w];
}
}
......
#ifndef COMSEQUENCE_H
#define COMSEQUENCE_H
#include <vector>
#include "availSequence.h"
#include "instance.h"
// Implements a not so greedy way of handling communication resources
// New transfers can only start after any ongoing transfer with which they are incompatible.
// Interference is handled with availSequence
class CommSequence {
protected:
double bandwidth;
int nbWT;
// This handles contention.
// 1st approx: each transfer starts ASAP on its source & dest resource
// Transfer end is when both are done.
// 2nd approx: transfers are preemptible (malleable when BW values are heterogeneous ?)
// ending time = max availability, on other resources it starts ALAP ?
// Yes, but then how does it interact with incompatibility ?
// Easy way: no incompatibility, only contention. With
// availSequences. Transfers start ALAP. LastMile model ? With
// homogeneous BW to start. So transfers use 2 resources, and use
// them at 100%.
std::vector<AvailSequence> outgoing;
std::vector<AvailSequence> incoming;
public:
CommSequence(Instance& ins);
// Returns ending time
// If provided, goal means: do not finish earlier than this time.
// schedule the transfer, trying to end at "goal"
// if "goal" is too early, end as soon as possible
// In all cases, schedule all parts of the transfer to finish at the same time, ALAP.