Commit 123c779e authored by CARDOSI Paul's avatar CARDOSI Paul

Merge branch 'split-workers-from-runtime-at-interface-level' into 'master'

Separate task graph from work execution.

See merge request bramas/spetabaru!27
parents 6a09d462 40ae356e
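What the split means in practice: a task graph no longer owns its execution resources. An SpComputeEngine owns a team of SpWorker threads, and graphs are attached to whichever engine should execute them. A minimal usage sketch of the new interface follows; SpTaskGraph is a hypothetical concrete subclass of SpAbstractTaskGraph, while the rest is the API introduced by this merge.

// Sketch only: SpTaskGraph stands in for a concrete SpAbstractTaskGraph.
SpComputeEngine ce(SpWorker::createATeamOfNCpuWorkers(4)); // the engine owns 4 CPU workers
SpTaskGraph tg;                // hypothetical graph type
ce.addGraph(&tg);              // tasks submitted through tg will run on ce's workers
// ... build and submit tasks through tg ...
ce.stopIfNotAlreadyStopped();  // joins the worker threads; ~SpComputeEngine() does this too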
#include <optional>
#include <mutex>

#include "SpComputeEngine.hpp"
#include "TaskGraph/SpAbstractTaskGraph.hpp"
#include "Tasks/SpAbstractTask.hpp"
#include "SpWorker.hpp"

void SpComputeEngine::addGraph(SpAbstractTaskGraph* tg) {
    if(tg) {
        tg->setComputeEngine(this);
        taskGraphs.push_back(tg);
    }
}

void SpComputeEngine::stopIfNotAlreadyStopped() {
    if(!hasBeenStopped) {
        {
            std::unique_lock<std::mutex> computeEngineLock(ceMutex);
            for(auto& w : workers) {
                w->setStopFlag(true);
            }
        }
        ceCondVar.notify_all();
        for(auto& w : workers) {
            w->waitForThread();
        }
        hasBeenStopped = true;
    }
}
#ifndef SPCOMPUTEENGINE_HPP
#define SPCOMPUTEENGINE_HPP

#include <memory>
#include <optional>
#include <utility>
#include <algorithm>
#include <iterator>
#include <atomic>
#include <mutex>
#include <condition_variable>

#include "Compute/SpWorker.hpp"
#include "Schedulers/SpPrioScheduler.hpp"
#include "Utils/small_vector.hpp"
class SpAbstractTaskGraph;

class SpComputeEngine {
private:
    small_vector<std::unique_ptr<SpWorker>> workers;
    small_vector<SpAbstractTaskGraph*> taskGraphs;
    std::mutex ceMutex;
    std::condition_variable ceCondVar;
    std::mutex migrationMutex;
    std::condition_variable migrationCondVar;
    SpPrioScheduler prioSched;
    std::atomic<long int> nbWorkersToMigrate;
    std::atomic<long int> migrationSignalingCounter;
    SpWorker::SpWorkerType workerTypeToMigrate;
    SpComputeEngine* ceToMigrateTo;
    long int nbAvailableCpuWorkers;
    long int nbAvailableGpuWorkers;
    long int totalNbCpuWorkers;
    long int totalNbGpuWorkers;
    bool hasBeenStopped;

private:
    void wakeUpWaitingWorkers() {
        {
            // Acquiring and releasing the mutex before notifying ensures a worker
            // that has already checked its wait predicate cannot miss the
            // notification while it is falling asleep on ceCondVar.
            std::unique_lock<std::mutex> workerLock(ceMutex);
        }
        ceCondVar.notify_all();
    }
    auto sendWorkersToInternal(SpComputeEngine* otherCe, const SpWorker::SpWorkerType wt, const long int maxCount,
                               const bool allowBusyWorkersToBeDetached) {
        small_vector<std::unique_ptr<SpWorker>> res;
        using iter_t = small_vector<std::unique_ptr<SpWorker>>::iterator;

        auto computeNbWorkersToDetach =
        [&]() {
            auto compute =
            [](const long int nbTotal, const long int nbWaiting, const bool allowBusyWorToBeDeta, const long int max) {
                if(allowBusyWorToBeDeta) {
                    return std::min(nbTotal, max);
                } else {
                    return std::min(nbWaiting, max);
                }
            };

            switch(wt) {
                case SpWorker::SpWorkerType::CPU_WORKER:
                    return compute(totalNbCpuWorkers, nbAvailableCpuWorkers, allowBusyWorkersToBeDetached, maxCount);
                case SpWorker::SpWorkerType::GPU_WORKER:
                    return compute(totalNbGpuWorkers, nbAvailableGpuWorkers, allowBusyWorkersToBeDetached, maxCount);
                default:
                    return static_cast<long int>(0);
            }
        };

        const auto nbWorkersToDetach =
        [&]() {
            std::unique_lock<std::mutex> computeEngineLock(ceMutex);
            auto result = computeNbWorkersToDetach();
            if(result > 0) {
                // Publish the migration request: workers poll nbWorkersToMigrate with
                // an acquire load, which pairs with this release store.
                workerTypeToMigrate = wt;
                ceToMigrateTo = otherCe;
                migrationSignalingCounter.store(result, std::memory_order_relaxed);
                nbWorkersToMigrate.store(result, std::memory_order_release);
            }
            return result;
        }();

        if(nbWorkersToDetach > 0) {
            ceCondVar.notify_all();

            {
                // Block until the last migrating worker has rebound itself.
                std::unique_lock<std::mutex> migrationLock(migrationMutex);
                migrationCondVar.wait(migrationLock, [&]() { return !(migrationSignalingCounter.load(std::memory_order_acquire) > 0); });
            }

            // Move out the workers that no longer belong to this engine:
            // std::remove_if over move iterators lets the predicate steal the
            // matching unique_ptrs while the remaining ones are compacted in place.
            auto startIt = std::move_iterator<iter_t>(workers.begin());
            auto endIt = std::move_iterator<iter_t>(workers.end());

            auto eraseStartPosIt = std::remove_if(startIt, endIt,
            [&](std::unique_ptr<SpWorker>&& wPtr) {
                if(wPtr->getComputeEngine() != this) {
                    res.push_back(std::move(wPtr));
                    return true;
                } else {
                    return false;
                }
            });

            workers.erase(eraseStartPosIt.base(), workers.end());

            std::unique_lock<std::mutex> computeEngineLock(ceMutex);
            updateWorkerCounters<true, true>(wt, -nbWorkersToDetach);
        }

        return res;
    }
    template <const bool bindAndStartWorkers>
    void addWorkersInternal(small_vector_base<std::unique_ptr<SpWorker>>&& inWorkers) {
        for(auto& w : inWorkers) {
            updateWorkerCounters<true, false>(w->getType(), +1);
            if constexpr(bindAndStartWorkers) {
                w->bindTo(this);
                w->start();
            }
        }

        if(workers.empty()) {
            workers = std::move(inWorkers);
        } else {
            workers.reserve(workers.size() + inWorkers.size());
            std::move(std::begin(inWorkers), std::end(inWorkers), std::back_inserter(workers));
        }
    }
    bool areThereAnyWorkersToMigrate() const {
        return nbWorkersToMigrate.load(std::memory_order_acquire) > 0;
    }

    bool areThereAnyReadyTasks() const {
        return prioSched.getNbTasks() > 0;
    }

    bool areWorkersToMigrateOfType(SpWorker::SpWorkerType inWt) const {
        return workerTypeToMigrate == inWt;
    }

    SpAbstractTask* getTask() {
        return prioSched.pop();
    }
    template <const bool updateTotalCounter, const bool updateAvailableCounter>
    void updateWorkerCounters(const SpWorker::SpWorkerType inWt, const long int addend) {
        switch(inWt) {
            case SpWorker::SpWorkerType::CPU_WORKER:
                if constexpr(updateTotalCounter) {
                    totalNbCpuWorkers += addend;
                }
                if constexpr(updateAvailableCounter) {
                    nbAvailableCpuWorkers += addend;
                }
                break;
            case SpWorker::SpWorkerType::GPU_WORKER:
                if constexpr(updateTotalCounter) {
                    totalNbGpuWorkers += addend;
                }
                if constexpr(updateAvailableCounter) {
                    nbAvailableGpuWorkers += addend;
                }
                break;
            default:
                break;
        }
    }
    void wait(SpWorker& worker) {
        std::unique_lock<std::mutex> ceLock(ceMutex);
        updateWorkerCounters<false, true>(worker.getType(), +1);
        ceCondVar.wait(ceLock, [&]() { return worker.hasBeenStopped() || areThereAnyWorkersToMigrate() || areThereAnyReadyTasks(); });
        updateWorkerCounters<false, true>(worker.getType(), -1);
    }

    auto getCeToMigrateTo() {
        return ceToMigrateTo;
    }

    auto fetchDecNbOfWorkersToMigrate() {
        return nbWorkersToMigrate.fetch_sub(1, std::memory_order_relaxed);
    }

    void notifyMigrationFinished() {
        {
            // Same lock/unlock-before-notify idiom as in wakeUpWaitingWorkers:
            // it prevents the engine waiting on migrationCondVar from missing the wake-up.
            std::unique_lock<std::mutex> migrationLock(migrationMutex);
        }
        migrationCondVar.notify_one();
    }

    auto fetchDecMigrationSignalingCounter() {
        return migrationSignalingCounter.fetch_sub(1, std::memory_order_release);
    }
    friend void SpWorker::start();
    friend void SpWorker::waitOnCe(SpComputeEngine*);

public:
    explicit SpComputeEngine(small_vector_base<std::unique_ptr<SpWorker>>&& inWorkers = SpWorker::createDefaultWorkerTeam())
        : workers(), taskGraphs(), ceMutex(), ceCondVar(), migrationMutex(), migrationCondVar(), prioSched(), nbWorkersToMigrate(0),
          migrationSignalingCounter(0), workerTypeToMigrate(SpWorker::SpWorkerType::CPU_WORKER), ceToMigrateTo(nullptr), nbAvailableCpuWorkers(0),
          nbAvailableGpuWorkers(0), totalNbCpuWorkers(0), totalNbGpuWorkers(0), hasBeenStopped(false) {
        addWorkers(std::move(inWorkers));
    }

    ~SpComputeEngine() {
        stopIfNotAlreadyStopped();
    }

    void addGraph(SpAbstractTaskGraph* tg);

    void pushTask(SpAbstractTask* t) {
        prioSched.push(t);
        wakeUpWaitingWorkers();
    }

    void pushTasks(small_vector_base<SpAbstractTask*>& tasks) {
        prioSched.pushTasks(tasks);
        wakeUpWaitingWorkers();
    }

    size_t getCurrentNbOfWorkers() const {
        return workers.size();
    }

    void addWorkers(small_vector_base<std::unique_ptr<SpWorker>>&& inWorkers) {
        addWorkersInternal<true>(std::move(inWorkers));
    }

    void sendWorkersTo(SpComputeEngine* otherCe, const SpWorker::SpWorkerType wt, const size_t maxCount, const bool allowBusyWorkersToBeDetached) {
        if(otherCe && otherCe != this) {
            otherCe->addWorkersInternal<false>(sendWorkersToInternal(otherCe, wt, maxCount, allowBusyWorkersToBeDetached));
        }
    }

    auto detachWorkers(const SpWorker::SpWorkerType wt, const size_t maxCount, const bool allowBusyWorkersToBeDetached) {
        return sendWorkersToInternal(nullptr, wt, maxCount, allowBusyWorkersToBeDetached);
    }

    void stopIfNotAlreadyStopped();
};

#endif
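Since the migration interface above is new in this merge, a short usage sketch may help; it relies only on the public members shown above, with illustrative worker counts.

// Sketch: redistributing workers between two engines at runtime.
SpComputeEngine ce1(SpWorker::createATeamOfNCpuWorkers(4));
SpComputeEngine ce2(SpWorker::createATeamOfNCpuWorkers(2));

// Hand up to two idle CPU workers over to ce2 (busy ones stay, last flag is false).
ce1.sendWorkersTo(&ce2, SpWorker::SpWorkerType::CPU_WORKER, 2, false);

// Detach one CPU worker, busy or not, and take ownership of it;
// it idles until it is rebound to an engine through addWorkers.
auto loose = ce1.detachWorkers(SpWorker::SpWorkerType::CPU_WORKER, 1, true);
ce2.addWorkers(std::move(loose));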
#include "Compute/SpWorker.hpp"
#include "Compute/SpComputeEngine.hpp"
#include "TaskGraph/SpAbstractTaskGraph.hpp"
std::atomic<long int> SpWorker::totalNbThreadsCreated = 1;
void SpWorker::start() {
if(!t.joinable()) {
t = std::thread([&]() {
SpUtils::SetThreadId(threadId);
while(!stopFlag.load(std::memory_order_relaxed)) {
SpComputeEngine* saveCe = nullptr;
// Using memory order acquire on ce.load to form release/acquire pair
// I think we could use memory order consume as all the code that follows depends on the load of ce (through saveCe).
if((saveCe = ce.load(std::memory_order_acquire))) {
if(saveCe->areThereAnyWorkersToMigrate()) {
if(saveCe->areWorkersToMigrateOfType(wt)) {
auto previousNbOfWorkersToMigrate = saveCe->fetchDecNbOfWorkersToMigrate();
if(previousNbOfWorkersToMigrate > 0) {
SpComputeEngine* newCe = saveCe->getCeToMigrateTo();
ce.store(newCe, std::memory_order_relaxed);
auto previousMigrationSignalingCounterVal = saveCe->fetchDecMigrationSignalingCounter();
if(previousMigrationSignalingCounterVal == 1) {
saveCe->notifyMigrationFinished();
}
continue;
}
}
}
if(saveCe->areThereAnyReadyTasks()){
SpAbstractTask* task = saveCe->getTask();
if(task) {
SpAbstractTaskGraph* atg = task->getAbstractTaskGraph();
atg->preTaskExecution(task);
execute(task);
atg->postTaskExecution(task);
continue;
}
}
waitOnCe(saveCe);
} else {
idleWait();
}
}
});
}
}
void SpWorker::waitOnCe(SpComputeEngine* inCe) {
inCe->wait(*this);
}
#ifndef SPWORKER_HPP
#define SPWORKER_HPP

#include <mutex>
#include <atomic>
#include <condition_variable>
#include <thread>
#include <memory>
#include <cassert>

#include "Utils/SpModes.hpp"
#include "Utils/SpUtils.hpp"
#include "Tasks/SpAbstractTask.hpp"
#include "Utils/small_vector.hpp"
class SpComputeEngine;

class SpWorker {
public:
    enum class SpWorkerType {
        CPU_WORKER,
        GPU_WORKER
    };

    static std::atomic<long int> totalNbThreadsCreated;

    static auto createATeamOfNCpuWorkers(const int nbCpuWorkers) {
        small_vector<std::unique_ptr<SpWorker>> res;
        res.reserve(nbCpuWorkers);

        for(int i = 0; i < nbCpuWorkers; i++) {
            res.emplace_back(std::make_unique<SpWorker>(SpWorker::SpWorkerType::CPU_WORKER));
        }

        return res;
    }

    static auto createDefaultWorkerTeam() {
        return createATeamOfNCpuWorkers(SpUtils::DefaultNumThreads());
    }

private:
    const SpWorkerType wt;
    std::mutex workerMutex;
    std::condition_variable workerConditionVariable;
    std::atomic<bool> stopFlag;
    std::atomic<SpComputeEngine*> ce;
    long int threadId;
    std::thread t;

private:
    void setStopFlag(const bool inStopFlag) {
        stopFlag.store(inStopFlag, std::memory_order_relaxed);
    }

    bool hasBeenStopped() const {
        return stopFlag.load(std::memory_order_relaxed);
    }

    SpComputeEngine* getComputeEngine() const {
        return ce.load(std::memory_order_relaxed);
    }
    void execute(SpAbstractTask* task) {
        switch(this->getType()) {
            case SpWorkerType::CPU_WORKER:
                task->execute(SpCallableType::CPU);
                break;
            case SpWorkerType::GPU_WORKER:
                task->execute(SpCallableType::GPU);
                break;
            default:
                assert(false && "Worker is of unknown type.");
        }
    }

    void waitForThread() {
        t.join();
    }
    void stop() {
        if(t.joinable()) {
            // Always raise the flag and notify: a worker sitting in idleWait()
            // sleeps on workerConditionVariable and would otherwise never wake,
            // leaving waitForThread() blocked forever.
            {
                std::unique_lock<std::mutex> workerLock(workerMutex);
                stopFlag.store(true, std::memory_order_relaxed);
            }
            workerConditionVariable.notify_one();
            waitForThread();
        }
    }
    void bindTo(SpComputeEngine* inCe) {
        if(inCe) {
            {
                std::unique_lock<std::mutex> workerLock(workerMutex);
                // Release store: pairs with the acquire load of ce in the worker loop.
                ce.store(inCe, std::memory_order_release);
            }
            workerConditionVariable.notify_one();
        }
    }

    void idleWait() {
        std::unique_lock<std::mutex> workerLock(workerMutex);
        workerConditionVariable.wait(workerLock, [&]() { return stopFlag.load(std::memory_order_relaxed) || ce.load(std::memory_order_relaxed); });
    }

    void waitOnCe(SpComputeEngine* inCe);

    friend class SpComputeEngine;

public:
    explicit SpWorker(const SpWorkerType inWt) :
        wt(inWt), workerMutex(), workerConditionVariable(),
        stopFlag(false), ce(nullptr), threadId(0), t() {
        threadId = totalNbThreadsCreated.fetch_add(1, std::memory_order_relaxed);
    }

    SpWorker(const SpWorker& other) = delete;
    SpWorker(SpWorker&& other) = delete;
    SpWorker& operator=(const SpWorker& other) = delete;
    SpWorker& operator=(SpWorker&& other) = delete;

    ~SpWorker() {
        stop();
    }

    SpWorkerType getType() const {
        return wt;
    }

    void start();
};

#endif
@@ -7,6 +7,7 @@
 #include <fstream>
 #include <cmath>
 #include <iterator>
+#include "Tasks/SpAbstractTask.hpp"
 #include "Utils/SpTimePoint.hpp"
@@ -16,12 +17,38 @@
 namespace SpSvgTrace {
 inline void GenerateTrace(const std::string& outputFilename, const std::list<SpAbstractTask*>& tasksFinished,
-                          const SpTimePoint& startingTime, const int nbThreads, const bool showDependences) {
+                          const SpTimePoint& startingTime, const bool showDependences) {
     std::ofstream svgfile(outputFilename);
     if(svgfile.is_open() == false){
         throw std::invalid_argument("Cannot open filename : " + outputFilename);
     }
+    const auto threadIds =
+    [&]() {
+        std::vector<long int> res;
+        res.reserve(tasksFinished.size());
+        std::transform(std::begin(tasksFinished), std::end(tasksFinished), std::back_inserter(res),
+        [](SpAbstractTask* task){
+            return task->getThreadIdComputer();
+        });
+        std::sort(std::begin(res), std::end(res));
+        res.erase(std::unique(std::begin(res), std::end(res)), std::end(res));
+        return res;
+    }();
+    const int nbThreads = static_cast<int>(threadIds.size());
+    const auto threadIdsToVerticalSlotPosMap =
+    [&]() {
+        std::unordered_map<long int, long int> mapping;
+        for(long int i = 0; i < static_cast<long int>(threadIds.size()); i++) {
+            mapping[threadIds[i]] = i+1;
+        }
+        return mapping;
+    }();
     const long int vsizeperthread = std::max(100, std::min(200, 2000/nbThreads));
     const long int vmargin = 100;
@@ -70,22 +97,23 @@ inline void GenerateTrace(const std::string& outputFilename, const std::list<SpA
                "\" font-size=\"30\" fill=\"black\">" << label << "</text>\n";
     }
-    for(int idxThread = 0 ; idxThread < nbThreads ; ++idxThread){
+    for(auto idxThread : threadIds) {
+        auto yPos = threadIdsToVerticalSlotPosMap.at(idxThread);
         svgfile << " <rect width=\"" << hdimtime << "\" height=\"" << vsizeperthread
-                << "\" x=\"" << hmargin << "\" y=\"" << idxThread*(vsizeperthread+threadstrock) + vmargin << "\" style=\"fill:white;stroke:black;stroke-width:" << threadstrock << "\" />\n";
+                << "\" x=\"" << hmargin << "\" y=\"" << (yPos-1)*(vsizeperthread+threadstrock) + vmargin << "\" style=\"fill:white;stroke:black;stroke-width:" << threadstrock << "\" />\n";
         const std::string label = "Thread " + std::to_string(idxThread);
-        svgfile << "<text x=\"" << hmargin/2 << "\" y=\"" << idxThread*(vsizeperthread+threadstrock) + vmargin + label.size()*30 - vsizeperthread/2 <<
+        svgfile << "<text x=\"" << hmargin/2 << "\" y=\"" << (yPos-1)*(vsizeperthread+threadstrock) + vmargin + label.size()*30 - vsizeperthread/2 <<
                 "\" font-size=\"30\" fill=\"black\" transform=\"rotate(-90, " << hmargin/2 << " "
-                << idxThread*(vsizeperthread+threadstrock) + vmargin + label.size()*30 - vsizeperthread/2 << ")\">" << label << "</text>\n";
+                << (yPos-1)*(vsizeperthread+threadstrock) + vmargin + label.size()*30 - vsizeperthread/2 << ")\">" << label << "</text>\n";
     }
     std::unordered_map<std::string, std::string> colors;
     for(const auto& atask : tasksFinished){
         const long int idxThreadComputer = atask->getThreadIdComputer();
-        const long int ypos_start = (idxThreadComputer-1)*(vsizeperthread+threadstrock) + threadstrock/2 + vmargin;
+        const long int ypos_start = (threadIdsToVerticalSlotPosMap.at(idxThreadComputer)-1)*(vsizeperthread+threadstrock) + threadstrock/2 + vmargin;
         const long int ypos_end = ypos_start + vsizeperthread - threadstrock;
         const double taskStartTime = startingTime.differenceWith(atask->getStartingTime());
         const double taskEndTime = startingTime.differenceWith(atask->getEndingTime());
@@ -139,7 +167,7 @@ inline void GenerateTrace(const std::string& outputFilename, const std::list<SpA
     for(const auto& atask : tasksFinished){
         atask->getDependences(&deps);
-        const long int ypos_start = (atask->getThreadIdComputer()-1)*(vsizeperthread+threadstrock) + threadstrock/2 + vmargin + vsizeperthread/2;
+        const long int ypos_start = (threadIdsToVerticalSlotPosMap.at(atask->getThreadIdComputer())-1)*(vsizeperthread+threadstrock) + threadstrock/2 + vmargin + vsizeperthread/2;
         const double taskEndTime = startingTime.differenceWith(atask->getEndingTime());
         const long int xpos_start = static_cast<long int>(double(hdimtime)*taskEndTime/duration) + hmargin;
@@ -147,7 +175,7 @@ inline void GenerateTrace(const std::string& outputFilename, const std::list<SpA
     for(const auto& taskDep : deps){
         if(alreadyExist.find(taskDep) == alreadyExist.end()){
-            const long int ypos_end = (taskDep->getThreadIdComputer()-1)*(vsizeperthread+threadstrock) + threadstrock/2 + vmargin + vsizeperthread/2;
+            const long int ypos_end = (threadIdsToVerticalSlotPosMap.at(taskDep->getThreadIdComputer())-1)*(vsizeperthread+threadstrock) + threadstrock/2 + vmargin + vsizeperthread/2;
             const long int depstrocke = 1;
             const double taskStartTime = startingTime.differenceWith(taskDep->getStartingTime());
             const long int xpos_end = static_cast<long int>(double(hdimtime)*taskStartTime/duration) + hmargin;
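The motivation for the remapping above: once workers are numbered globally (SpWorker::totalNbThreadsCreated spans all engines) and can migrate between engines, the thread ids recorded on finished tasks are sparse rather than the contiguous 1..nbThreads range the old code assumed, so GenerateTrace now drops its nbThreads parameter and compacts the ids it actually encounters into consecutive vertical slots. A standalone sketch of that compaction, with made-up id values:

#include <algorithm>
#include <unordered_map>
#include <vector>

int main() {
    // Ids harvested from finished tasks: global, sparse, with duplicates.
    std::vector<long int> ids = {7, 3, 7, 12, 3};
    std::sort(ids.begin(), ids.end());
    ids.erase(std::unique(ids.begin(), ids.end()), ids.end()); // {3, 7, 12}

    // Same idea as threadIdsToVerticalSlotPosMap: id -> slot in 1..N.
    std::unordered_map<long int, long int> slot;
    for(long int i = 0; i < static_cast<long int>(ids.size()); ++i) {
        slot[ids[i]] = i + 1; // 3 -> 1, 7 -> 2, 12 -> 3
    }
}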
@@ -7,11 +7,12 @@
 #include <vector>
 #include <queue>
 #include <utility>
 #include "Tasks/SpAbstractTask.hpp"
 #include "Utils/SpPriority.hpp"
+#include "Utils/small_vector.hpp"
 #include "Speculation/SpSpeculativeModel.hpp"
 class SpPrioScheduler{
     struct ComparePrio{
@@ -47,13 +48,21 @@ public:
         tasksReady.push(newTask);
         return 1;
     }