Commit 40ae356e authored by CARDOSI Paul

Separate task graph from work execution.

Squashed commit messages:
Draft.

Draft 2.

Draft 3.

Deadlock.

Lock worker lock and compute engine lock in the same order everywhere.

Forward SpRuntime nbThreads argument to SpComputeEngine constructor. Remove debug printouts.

Lock all locks in scoped ordered lock through a unique_lock instance.

Implement svg trace function in SpTaskGraph. Implement stopAllThreads function in SpRuntime. Use plain mutexes in SpScopedOrderedLock.

Allow compute engine pointer of worker to be changed by main thread.

Remove unnecessary boolean variable.

Optimize for the case where workers just keep popping the next task from the same compute engine.

Begin work on detachWorkers function.

Begin remodeling code to allow workers to be detached.

Deadlock.

Working version.

Remove unnecessary shouldBeMigrated member variable.

Properly stop workers and compute engine.

Fix broken svg trace generation function.

Remove unused classes.

Add compute engine and task graph unit tests....
parent 6a09d462
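The squashed messages above describe the refactoring: a task graph no longer drives execution itself; an SpComputeEngine owns a team of SpWorker threads, schedules the ready tasks, and can migrate or detach workers at runtime. As a rough orientation only, here is a minimal usage sketch of that API. It is not part of the commit, and the task-graph side is only hinted at in a comment because the SpTaskGraph interface does not appear in the excerpt below.

#include <utility>
#include "Compute/SpComputeEngine.hpp"
#include "Compute/SpWorker.hpp"

int main() {
    // One engine with the default worker team, one with two extra CPU workers.
    SpComputeEngine mainCe(SpWorker::createDefaultWorkerTeam());
    SpComputeEngine auxCe(SpWorker::createATeamOfNCpuWorkers(2));

    // A task graph would be registered with an engine before tasks are
    // submitted, e.g. mainCe.addGraph(&tg) -- tg is hypothetical here.

    // Move up to two idle CPU workers from mainCe to auxCe.
    mainCe.sendWorkersTo(&auxCe, SpWorker::SpWorkerType::CPU_WORKER, 2, false);

    // Detached workers are handed back to the caller and idle until they are
    // added to an engine again.
    auto spareWorkers = auxCe.detachWorkers(SpWorker::SpWorkerType::CPU_WORKER, 1, true);
    auxCe.addWorkers(std::move(spareWorkers));

    // Engines stop and join their workers explicitly or on destruction.
    mainCe.stopIfNotAlreadyStopped();
    return 0;
}

// ===== SpComputeEngine.cpp =====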
#include <optional>
#include <mutex>
#include "SpComputeEngine.hpp"
#include "TaskGraph/SpAbstractTaskGraph.hpp"
#include "Tasks/SpAbstractTask.hpp"
#include "SpWorker.hpp"
void SpComputeEngine::addGraph(SpAbstractTaskGraph* tg) {
if(tg) {
tg->setComputeEngine(this);
taskGraphs.push_back(tg);
}
}
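// Sets the stop flag of every worker under the engine lock, wakes them all
// through ceCondVar, then joins their threads. Only the first call has an effect.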
void SpComputeEngine::stopIfNotAlreadyStopped() {
if(!hasBeenStopped) {
{
std::unique_lock<std::mutex> computeEngineLock(ceMutex);
for(auto& w : workers) {
w->setStopFlag(true);
}
}
ceCondVar.notify_all();
for(auto& w : workers) {
w->waitForThread();
}
hasBeenStopped = true;
}
}
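// ===== SpComputeEngine.hpp =====
// The compute engine owns a team of SpWorker, the ready-task scheduler and the
// state used to migrate workers between engines.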
#ifndef SPCOMPUTEENGINE_HPP
#define SPCOMPUTEENGINE_HPP
#include <memory>
#include <optional>
#include <utility>
#include <algorithm>
#include <iterator>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include "Compute/SpWorker.hpp"
#include "Schedulers/SpPrioScheduler.hpp"
#include "Utils/small_vector.hpp"
class SpAbstractTaskGraph;
class SpComputeEngine {
private:
small_vector<std::unique_ptr<SpWorker>> workers;
small_vector<SpAbstractTaskGraph*> taskGraphs;
std::mutex ceMutex;
std::condition_variable ceCondVar;
std::mutex migrationMutex;
std::condition_variable migrationCondVar;
SpPrioScheduler prioSched;
std::atomic<long int> nbWorkersToMigrate;
std::atomic<long int> migrationSignalingCounter;
SpWorker::SpWorkerType workerTypeToMigrate;
SpComputeEngine* ceToMigrateTo;
long int nbAvailableCpuWorkers;
long int nbAvailableGpuWorkers;
long int totalNbCpuWorkers;
long int totalNbGpuWorkers;
bool hasBeenStopped;
private:
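// The empty critical section below is intentional: locking and releasing ceMutex
// before notifying guarantees that a worker which has already evaluated its wait
// predicate but has not yet blocked on ceCondVar cannot miss the wake-up.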
void wakeUpWaitingWorkers() {
{
std::unique_lock<std::mutex> computeEngineLock(ceMutex);
}
ceCondVar.notify_all();
}
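// Worker-migration protocol: publish how many workers of which type must leave and
// where they should go, wake every worker, then block until the last migrating
// worker acknowledges through migrationSignalingCounter. Workers whose compute
// engine pointer no longer refers to this engine are finally moved out of the
// worker list and returned to the caller.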
auto sendWorkersToInternal(SpComputeEngine *otherCe, const SpWorker::SpWorkerType wt, const long int maxCount, const bool allowBusyWorkersToBeDetached) {
small_vector<std::unique_ptr<SpWorker>> res;
using iter_t = small_vector<std::unique_ptr<SpWorker>>::iterator;
auto computeNbWorkersToDetach =
[&]() {
auto compute =
[](long int nbTotal, long int nbWaiting, const bool allowBusyWorToBeDeta, const long int max) {
if(allowBusyWorToBeDeta) {
return std::min(nbTotal, max);
} else {
return std::min(nbWaiting, max);
}
};
switch(wt) {
case SpWorker::SpWorkerType::CPU_WORKER:
return compute(totalNbCpuWorkers, nbAvailableCpuWorkers, allowBusyWorkersToBeDetached, maxCount);
case SpWorker::SpWorkerType::GPU_WORKER:
return compute(totalNbGpuWorkers, nbAvailableGpuWorkers, allowBusyWorkersToBeDetached, maxCount);
default:
return static_cast<long int>(0);
}
};
const auto nbWorkersToDetach =
[&]()
{
std::unique_lock<std::mutex> computeEngineLock(ceMutex);
auto result = computeNbWorkersToDetach();
if(result > 0) {
workerTypeToMigrate = wt;
ceToMigrateTo = otherCe;
migrationSignalingCounter.store(result, std::memory_order_relaxed);
nbWorkersToMigrate.store(result, std::memory_order_release);
}
return result;
}();
if(nbWorkersToDetach > 0) {
ceCondVar.notify_all();
{
std::unique_lock<std::mutex> migrationLock(migrationMutex);
migrationCondVar.wait(migrationLock, [&](){ return !(migrationSignalingCounter.load(std::memory_order_acquire) > 0); });
}
auto startIt = std::move_iterator<iter_t>(workers.begin());
auto endIt = std::move_iterator<iter_t>(workers.end());
auto eraseStartPosIt = std::remove_if(startIt, endIt,
[&](std::unique_ptr<SpWorker>&& wPtr) {
if(wPtr->getComputeEngine() != this) {
res.push_back(std::move(wPtr));
return true;
} else {
return false;
}
});
workers.erase(eraseStartPosIt.base(), workers.end());
std::unique_lock<std::mutex> computeEngineLock(ceMutex);
updateWorkerCounters<true, true>(wt, -nbWorkersToDetach);
}
return res;
}
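// Takes ownership of inWorkers and updates the per-type totals. When
// bindAndStartWorkers is true the workers are bound to this engine and their
// threads are started (addWorkers path); migration reuses already running
// threads, so sendWorkersTo passes false.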
template <const bool bindAndStartWorkers>
void addWorkersInternal(small_vector_base<std::unique_ptr<SpWorker>>&& inWorkers) {
for(auto& w : inWorkers) {
updateWorkerCounters<true,false>(w->getType(), +1);
if constexpr(bindAndStartWorkers) {
w->bindTo(this);
w->start();
}
}
if(workers.empty()) {
workers = std::move(inWorkers);
} else {
workers.reserve(workers.size() + inWorkers.size());
std::move(std::begin(inWorkers), std::end(inWorkers), std::back_inserter(workers));
}
}
bool areThereAnyWorkersToMigrate() const {
return nbWorkersToMigrate.load(std::memory_order_acquire) > 0;
}
bool areThereAnyReadyTasks() const {
return prioSched.getNbTasks() > 0;
}
bool areWorkersToMigrateOfType(SpWorker::SpWorkerType inWt) {
return workerTypeToMigrate == inWt;
}
SpAbstractTask* getTask() {
return prioSched.pop();
}
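// Adjusts the total and/or available worker counters for the given worker type.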
template <const bool updateTotalCounter, const bool updateAvailableCounter>
void updateWorkerCounters(const SpWorker::SpWorkerType inWt, const long int addend) {
switch(inWt) {
case SpWorker::SpWorkerType::CPU_WORKER:
if constexpr(updateTotalCounter) {
totalNbCpuWorkers += addend;
}
if constexpr(updateAvailableCounter) {
nbAvailableCpuWorkers += addend;
}
break;
case SpWorker::SpWorkerType::GPU_WORKER:
if constexpr(updateTotalCounter) {
totalNbGpuWorkers += addend;
}
if constexpr(updateAvailableCounter) {
nbAvailableGpuWorkers += addend;
}
break;
default:
break;
}
}
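// Called from a worker thread that found nothing to do: mark it as available,
// sleep on ceCondVar until it is stopped, asked to migrate or a task becomes
// ready, then mark it as busy again.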
void wait(SpWorker& worker) {
std::unique_lock<std::mutex> ceLock(ceMutex);
updateWorkerCounters<false, true>(worker.getType(), +1);
ceCondVar.wait(ceLock, [&]() { return worker.hasBeenStopped() || areThereAnyWorkersToMigrate() || areThereAnyReadyTasks();});
updateWorkerCounters<false, true>(worker.getType(), -1);
}
auto getCeToMigrateTo() {
return ceToMigrateTo;
}
auto fetchDecNbOfWorkersToMigrate() {
return nbWorkersToMigrate.fetch_sub(1, std::memory_order_relaxed);
}
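// The empty critical section pairs with the wait in sendWorkersToInternal: the
// waiter either sees the updated counter or is already blocked when notify_one
// is issued, so the notification cannot be lost.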
void notifyMigrationFinished() {
{
std::unique_lock<std::mutex> migrationLock(migrationMutex);
}
migrationCondVar.notify_one();
}
auto fetchDecMigrationSignalingCounter() {
return migrationSignalingCounter.fetch_sub(1, std::memory_order_release);
}
friend void SpWorker::start();
friend void SpWorker::waitOnCe(SpComputeEngine*);
public:
explicit SpComputeEngine(small_vector_base<std::unique_ptr<SpWorker>>&& inWorkers = SpWorker::createDefaultWorkerTeam())
: workers(), taskGraphs(), ceMutex(), ceCondVar(), migrationMutex(), migrationCondVar(), prioSched(), nbWorkersToMigrate(0),
migrationSignalingCounter(0), workerTypeToMigrate(SpWorker::SpWorkerType::CPU_WORKER), ceToMigrateTo(nullptr), nbAvailableCpuWorkers(0),
nbAvailableGpuWorkers(0), totalNbCpuWorkers(0), totalNbGpuWorkers(0), hasBeenStopped(false) {
addWorkers(std::move(inWorkers));
}
~SpComputeEngine() {
stopIfNotAlreadyStopped();
}
void addGraph(SpAbstractTaskGraph* tg);
void pushTask(SpAbstractTask* t) {
prioSched.push(t);
wakeUpWaitingWorkers();
}
void pushTasks(small_vector_base<SpAbstractTask*>& tasks) {
prioSched.pushTasks(tasks);
wakeUpWaitingWorkers();
}
size_t getCurrentNbOfWorkers() const {
return workers.size();
}
void addWorkers(small_vector_base<std::unique_ptr<SpWorker>>&& inWorkers) {
addWorkersInternal<true>(std::move(inWorkers));
}
void sendWorkersTo(SpComputeEngine* otherCe, const SpWorker::SpWorkerType wt, const size_t maxCount, const bool allowBusyWorkersToBeDetached) {
if(otherCe && otherCe != this) {
otherCe->addWorkersInternal<false>(sendWorkersToInternal(otherCe, wt, maxCount, allowBusyWorkersToBeDetached));
}
}
auto detachWorkers(const SpWorker::SpWorkerType wt, const size_t maxCount, const bool allowBusyWorkersToBeDetached) {
return sendWorkersToInternal(nullptr, wt, maxCount, allowBusyWorkersToBeDetached);
}
void stopIfNotAlreadyStopped();
};
#endif
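// ===== SpWorker.cpp =====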
#include "Compute/SpWorker.hpp"
#include "Compute/SpComputeEngine.hpp"
#include "TaskGraph/SpAbstractTaskGraph.hpp"
std::atomic<long int> SpWorker::totalNbThreadsCreated = 1;
void SpWorker::start() {
if(!t.joinable()) {
t = std::thread([&]() {
SpUtils::SetThreadId(threadId);
while(!stopFlag.load(std::memory_order_relaxed)) {
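// Worker main loop: (1) honor a pending migration request, (2) execute one ready
// task, (3) otherwise sleep on the compute engine, or idle-wait when the worker
// is not bound to any engine.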
SpComputeEngine* saveCe = nullptr;
// Using memory order acquire on ce.load to form release/acquire pair
// I think we could use memory order consume as all the code that follows depends on the load of ce (through saveCe).
if((saveCe = ce.load(std::memory_order_acquire))) {
if(saveCe->areThereAnyWorkersToMigrate()) {
if(saveCe->areWorkersToMigrateOfType(wt)) {
auto previousNbOfWorkersToMigrate = saveCe->fetchDecNbOfWorkersToMigrate();
if(previousNbOfWorkersToMigrate > 0) {
SpComputeEngine* newCe = saveCe->getCeToMigrateTo();
ce.store(newCe, std::memory_order_relaxed);
auto previousMigrationSignalingCounterVal = saveCe->fetchDecMigrationSignalingCounter();
if(previousMigrationSignalingCounterVal == 1) {
saveCe->notifyMigrationFinished();
}
continue;
}
}
}
if(saveCe->areThereAnyReadyTasks()){
SpAbstractTask* task = saveCe->getTask();
if(task) {
SpAbstractTaskGraph* atg = task->getAbstractTaskGraph();
atg->preTaskExecution(task);
execute(task);
atg->postTaskExecution(task);
continue;
}
}
waitOnCe(saveCe);
} else {
idleWait();
}
}
});
}
}
void SpWorker::waitOnCe(SpComputeEngine* inCe) {
inCe->wait(*this);
}
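// ===== SpWorker.hpp =====
// A worker wraps one OS thread that pulls tasks from the compute engine it is
// currently bound to.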
#ifndef SPWORKER_HPP
#define SPWORKER_HPP
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <thread>
#include <memory>
#include <cassert>
#include "Utils/SpModes.hpp"
#include "Utils/SpUtils.hpp"
#include "Tasks/SpAbstractTask.hpp"
#include "Utils/small_vector.hpp"
class SpComputeEngine;
class SpWorker {
public:
enum class SpWorkerType {
CPU_WORKER,
GPU_WORKER
};
static std::atomic<long int> totalNbThreadsCreated;
static auto createATeamOfNCpuWorkers(const int nbCpuWorkers) {
small_vector<std::unique_ptr<SpWorker>> res;
res.reserve(nbCpuWorkers);
for(int i = 0; i < nbCpuWorkers; i++) {
res.emplace_back(std::make_unique<SpWorker>(SpWorker::SpWorkerType::CPU_WORKER));
}
return res;
}
static auto createDefaultWorkerTeam() {
return createATeamOfNCpuWorkers(SpUtils::DefaultNumThreads());
}
private:
const SpWorkerType wt;
std::mutex workerMutex;
std::condition_variable workerConditionVariable;
std::atomic<bool> stopFlag;
std::atomic<SpComputeEngine*> ce;
long int threadId;
std::thread t;
private:
void setStopFlag(const bool inStopFlag) {
stopFlag.store(inStopFlag, std::memory_order_relaxed);
}
bool hasBeenStopped() const {
return stopFlag.load(std::memory_order_relaxed);
}
SpComputeEngine* getComputeEngine() const {
return ce.load(std::memory_order_relaxed);
}
void execute(SpAbstractTask *task) {
switch(this->getType()) {
case SpWorkerType::CPU_WORKER:
task->execute(SpCallableType::CPU);
break;
case SpWorkerType::GPU_WORKER:
task->execute(SpCallableType::GPU);
break;
default:
assert(false && "Worker is of unknown type.");
}
}
void waitForThread() {
t.join();
}
void stop() {
if(t.joinable()) {
// Unconditionally request a stop and wake the worker in case it is parked in
// idleWait(); otherwise the join below could block forever on a worker that is
// not bound to any compute engine.
{
std::unique_lock<std::mutex> workerLock(workerMutex);
stopFlag.store(true, std::memory_order_relaxed);
}
workerConditionVariable.notify_one();
waitForThread();
}
}
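// Publishes the compute engine pointer under workerMutex and notifies, so a
// worker parked in idleWait() wakes up and starts pulling work from inCe.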
void bindTo(SpComputeEngine* inCe) {
if(inCe) {
{
std::unique_lock workerLock(workerMutex);
ce.store(inCe, std::memory_order_release);
}
workerConditionVariable.notify_one();
}
}
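// Sleep until the worker is bound to a compute engine or asked to stop.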
void idleWait() {
std::unique_lock<std::mutex> workerLock(workerMutex);
workerConditionVariable.wait(workerLock, [&]() { return stopFlag.load(std::memory_order_relaxed) || ce.load(std::memory_order_relaxed); });
}
void waitOnCe(SpComputeEngine* inCe);
friend class SpComputeEngine;
public:
explicit SpWorker(const SpWorkerType inWt) :
wt(inWt), workerMutex(), workerConditionVariable(),
stopFlag(false), ce(nullptr), threadId(0), t() {
threadId = totalNbThreadsCreated.fetch_add(1, std::memory_order_relaxed);
}
SpWorker(const SpWorker& other) = delete;
SpWorker(SpWorker&& other) = delete;
SpWorker& operator=(const SpWorker& other) = delete;
SpWorker& operator=(SpWorker&& other) = delete;
~SpWorker() {
stop();
}
SpWorkerType getType() const {
return wt;
}
void start();
};
#endif
@@ -7,6 +7,7 @@
#include <fstream>
#include <cmath>
#include <iterator>
#include "Tasks/SpAbstractTask.hpp"
#include "Utils/SpTimePoint.hpp"
@@ -16,12 +17,38 @@
namespace SpSvgTrace {
inline void GenerateTrace(const std::string& outputFilename, const std::list<SpAbstractTask*>& tasksFinished,
const SpTimePoint& startingTime, const int nbThreads, const bool showDependences) {
const SpTimePoint& startingTime, const bool showDependences) {
std::ofstream svgfile(outputFilename);
if(svgfile.is_open() == false){
throw std::invalid_argument("Cannot open filename : " + outputFilename);
}
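// Thread ids are no longer assumed to form a contiguous 1..nbThreads range
// (workers now receive global ids and can migrate between compute engines), so
// the distinct ids of the threads that actually ran tasks are collected and
// mapped to consecutive vertical slots.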
const auto threadIds =
[&]() {
std::vector<long int> res;
res.reserve(tasksFinished.size());
std::transform(std::begin(tasksFinished), std::end(tasksFinished), std::back_inserter(res),
[](SpAbstractTask* task){
return task->getThreadIdComputer();
});
std::sort(std::begin(res), std::end(res));
res.erase(std::unique(std::begin(res), std::end(res)), std::end(res));
return res;
}();
const int nbThreads = static_cast<int>(threadIds.size());
const auto threadIdsToVerticalSlotPosMap =
[&]() {
std::unordered_map<long int, long int> mapping;
for(long int i=0; i < static_cast<long int>(threadIds.size()); i++) {
mapping[threadIds[i]] = i+1;
}
return mapping;
}();
const long int vsizeperthread = std::max(100, std::min(200, 2000/nbThreads));
const long int vmargin = 100;
@@ -70,22 +97,23 @@ inline void GenerateTrace(const std::string& outputFilename, const std::list<SpA
"\" font-size=\"30\" fill=\"black\">" << label << "</text>\n";
}
for(int idxThread = 0 ; idxThread < nbThreads ; ++idxThread){
for(auto idxThread : threadIds) {
auto yPos = threadIdsToVerticalSlotPosMap.at(idxThread);
svgfile << " <rect width=\"" << hdimtime << "\" height=\"" << vsizeperthread
<< "\" x=\"" << hmargin << "\" y=\"" << idxThread*(vsizeperthread+threadstrock) + vmargin << "\" style=\"fill:white;stroke:black;stroke-width:" << threadstrock << "\" />\n";
<< "\" x=\"" << hmargin << "\" y=\"" << (yPos-1)*(vsizeperthread+threadstrock) + vmargin << "\" style=\"fill:white;stroke:black;stroke-width:" << threadstrock << "\" />\n";
const std::string label = "Thread " + std::to_string(idxThread);
svgfile << "<text x=\"" << hmargin/2 << "\" y=\"" << idxThread*(vsizeperthread+threadstrock) + vmargin + label.size()*30 - vsizeperthread/2 <<
svgfile << "<text x=\"" << hmargin/2 << "\" y=\"" << (yPos-1)*(vsizeperthread+threadstrock) + vmargin + label.size()*30 - vsizeperthread/2 <<
"\" font-size=\"30\" fill=\"black\" transform=\"rotate(-90, " << hmargin/2 << " "
<< idxThread*(vsizeperthread+threadstrock) + vmargin + label.size()*30 - vsizeperthread/2 << ")\">" << label << "</text>\n";
<< (yPos-1)*(vsizeperthread+threadstrock) + vmargin + label.size()*30 - vsizeperthread/2 << ")\">" << label << "</text>\n";
}
std::unordered_map<std::string, std::string> colors;
for(const auto& atask : tasksFinished){
const long int idxThreadComputer = atask->getThreadIdComputer();
const long int ypos_start = (idxThreadComputer-1)*(vsizeperthread+threadstrock) + threadstrock/2 + vmargin;
const long int ypos_start = (threadIdsToVerticalSlotPosMap.at(idxThreadComputer)-1)*(vsizeperthread+threadstrock) + threadstrock/2 + vmargin;
const long int ypos_end = ypos_start + vsizeperthread - threadstrock;
const double taskStartTime = startingTime.differenceWith(atask->getStartingTime());
const double taskEndTime = startingTime.differenceWith(atask->getEndingTime());
@@ -139,7 +167,7 @@ inline void GenerateTrace(const std::string& outputFilename, const std::list<SpA
for(const auto& atask : tasksFinished){
atask->getDependences(&deps);
const long int ypos_start = (atask->getThreadIdComputer()-1)*(vsizeperthread+threadstrock) + threadstrock/2 + vmargin + vsizeperthread/2;
const long int ypos_start = (threadIdsToVerticalSlotPosMap.at(atask->getThreadIdComputer())-1)*(vsizeperthread+threadstrock) + threadstrock/2 + vmargin + vsizeperthread/2;
const double taskEndTime = startingTime.differenceWith(atask->getEndingTime());
const long int xpos_start = static_cast<long int>(double(hdimtime)*taskEndTime/duration) + hmargin;
@@ -147,7 +175,7 @@ inline void GenerateTrace(const std::string& outputFilename, const std::list<SpA
for(const auto& taskDep : deps){
if(alreadyExist.find(taskDep) == alreadyExist.end()){
const long int ypos_end = (taskDep->getThreadIdComputer()-1)*(vsizeperthread+threadstrock) + threadstrock/2 + vmargin + vsizeperthread/2;
const long int ypos_end = (threadIdsToVerticalSlotPosMap.at(taskDep->getThreadIdComputer())-1)*(vsizeperthread+threadstrock) + threadstrock/2 + vmargin + vsizeperthread/2;
const long int depstrocke = 1;
const double taskStartTime = startingTime.differenceWith(taskDep->getStartingTime());
const long int xpos_end = static_cast<long int>(double(hdimtime)*taskStartTime/duration) + hmargin;
@@ -7,11 +7,12 @@
#include <vector>
#include <queue>
#include <utility>
#include "Tasks/SpAbstractTask.hpp"
#include "Utils/SpPriority.hpp"
#include "Utils/small_vector.hpp"
#include "Speculation/SpSpeculativeModel.hpp"
class SpPrioScheduler{
struct ComparePrio{
@@ -47,13 +48,21 @@ public:
tasksReady.push(newTask);
return 1;