Commit 8a23d14c authored by Quentin Khan

First version of StarPU adaptive algorithm

parent 1c44fa04
#ifndef SCALFMM_STARPU_ALGO_HPP_
#define SCALFMM_STARPU_ALGO_HPP_
#include <algorithm>
#include <cmath> // Used to round box differences
#include <functional>
#include <map>
#include <memory>
#include <vector>
#include <unordered_map>
#include <starpu/1.2/starpu.h>
#include "FStarPUAdaptiveCodelets.hpp"
#include "Core/FCoreCommon.hpp"
#include "Containers/FTreeCoordinate.hpp"
#include "Utils/FAlgorithmTimers.hpp"
#include "Kernels/FKernelConcepts.hpp"
template<class _Tree, class _Kernel>
class FAdaptiveStarPU : public FAlgorithmInterface, public FAlgorithmTimers {
public:
using tree_t = _Tree;
using kernel_t = _Kernel;
using FReal = typename tree_t::FReal;
using node_t = typename tree_t::node_t;
private:
tree_t& _tree;
kernel_t& _ref_kernel;
/// Vector of kernel copies, one per StarPU worker
std::vector<std::unique_ptr<kernel_t>> _kernels;
starpu_codelet P2M_cl;
starpu_codelet M2M_cl;
starpu_codelet M2L_cl;
starpu_codelet L2L_cl;
starpu_codelet L2P_cl;
starpu_codelet P2P_cl;
starpu_codelet M2P_cl;
starpu_codelet P2L_cl;
struct node_data_handles;
std::unordered_map<node_t*, node_data_handles> _data_handles;
public:
FAdaptiveStarPU(tree_t& tree, kernel_t& kernel) :
_tree(tree),
_ref_kernel(kernel),
_kernels()
{}
std::string name() const override {
return "StarPU adaptive algorithm";
}
std::string description() const override {
return "";
}
using FAlgorithmInterface::execute;
/** \brief Run specific steps of the algorithm
*
* \param operations Specifies the algorithm operations to run, see
* FFmmOperations.
*/
void execute(const unsigned int operations) override {
this->run(operations);
}
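/** \brief Run the FMM passes selected in \p operations
 *
 * Sets up the per-worker kernels and the StarPU runtime, runs and times
 * each requested pass, then shuts StarPU down.
 */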
void run(int operations = FFmmNearAndFarFields) {
this->setup_kernel(&(this->_ref_kernel));
this->setup_starpu();
auto run = [this](const char* name, void (FAdaptiveStarPU::*op)()) {
Timers[name].tic();
(this->*op)();
Timers[name].tac();
std::cout << " " << name << ": " << Timers[name].elapsed() << std::endl;
};
if(operations & FFmmP2P) {
run("P2P", &FAdaptiveStarPU::u_list_step);
}
if(operations & FFmmP2M) {
run("P2M", &FAdaptiveStarPU::source_to_up);
}
if(operations & FFmmP2L) {
run("P2L", &FAdaptiveStarPU::x_list_step);
}
if(operations & FFmmM2M) {
run("M2M", &FAdaptiveStarPU::up_to_up);
}
if(operations & FFmmM2L) {
run("M2L", &FAdaptiveStarPU::v_list_step);
}
if(operations & FFmmM2P) {
run("M2P", &FAdaptiveStarPU::w_list_step);
}
if(operations & FFmmL2L) {
run("L2L", &FAdaptiveStarPU::down_to_down);
}
if(operations & FFmmL2P) {
run("L2P", &FAdaptiveStarPU::down_to_target);
}
this->cleanup_starpu();
this->cleanup_kernel(&this->_ref_kernel);
}
private:
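/* The following overload sets rely on expression SFINAE: the kernel's
 * optional setup / cleanup / fuse_results hooks are called when they exist,
 * and the variadic fallbacks turn the calls into no-ops otherwise. */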
template<class K>
auto setup_kernel(K* ker) -> decltype(ker->setup(_tree)) {
ker->setup(_tree);
}
template<class K>
auto setup_kernel(K* ker) -> decltype(ker->setup()) {
ker->setup();
}
void setup_kernel(...) {}
template<class K>
auto cleanup_kernel(K* ker) -> decltype(ker->cleanup(_tree)){
return ker->cleanup(_tree);
}
template<class K>
auto cleanup_kernel(K* ker) -> decltype(ker->cleanup()){
return ker->cleanup();
}
void cleanup_kernel(...) {}
template<class K>
auto fuse_kernel_results(K* ref, K* other) -> decltype(ref->fuse_results(*other)) {
return ref->fuse_results(*other);
}
void fuse_kernel_results(...) {}
/**
* \brief Data handle for StarPU dependency management
*/
struct node_data_handles {
node_data_handles(const node_data_handles&) = delete;
node_data_handles& operator=(const node_data_handles&) = delete;
starpu_data_handle_t symbolic;
starpu_data_handle_t multipole;
starpu_data_handle_t local_exp;
starpu_data_handle_t particles;
bool registered_particles = false;
node_data_handles(node_t* n) : registered_particles(false) {
starpu_variable_data_register(
&(this->symbolic),
STARPU_MAIN_RAM,
reinterpret_cast<uintptr_t>(&(n->getSymbolicData())),
sizeof(n->getSymbolicData()));
starpu_variable_data_register(
&(this->multipole),
STARPU_MAIN_RAM,
reinterpret_cast<uintptr_t>(&(n->getData()->multipole)),
sizeof(n->getData()->multipole));
starpu_variable_data_register(
&(this->local_exp),
STARPU_MAIN_RAM,
reinterpret_cast<uintptr_t>(&(n->getData()->local_expansion)),
sizeof(n->getData()->local_expansion));
if(n->is_leaf()) {
starpu_variable_data_register(
&(this->particles),
STARPU_MAIN_RAM,
reinterpret_cast<uintptr_t>(n->getParticleContainer()),
sizeof(*(n->getParticleContainer())));
this->registered_particles = true;
}
}
~node_data_handles() {
starpu_data_unregister(this->symbolic);
starpu_data_unregister(this->multipole);
starpu_data_unregister(this->local_exp);
if(this->registered_particles) {
starpu_data_unregister(this->particles);
this->registered_particles = false;
}
}
};
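/** \brief Codelet and task body building one kernel copy per StarPU worker
 *
 * Each worker gets its own copy of the reference kernel so that operator
 * tasks never share mutable kernel state.
 */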
starpu_codelet setup_worker_kernel_cl;
static void setup_worker_kernel(void**, void* cl_arg) {
FAdaptiveStarPU* algo = (FAdaptiveStarPU*) cl_arg;
auto& ref_kernel = algo->_ref_kernel;
auto& kernels = algo->_kernels;
int id = starpu_worker_get_id();
std::cout << "initialising worker " << id << '\n';
kernels[id].reset(new kernel_t(ref_kernel));
algo->setup_kernel(kernels[id].get());
}
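/** \brief Initialise StarPU, the per-worker kernels, the operator codelets
 * and the per-node data handles.
 */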
void setup_starpu() {
if(0 != starpu_init(NULL)) {
std::cerr << "StarPU could not be initialized.\n";
std::exit(EXIT_FAILURE);
}
this->_kernels.resize(starpu_worker_get_count());
std::cerr << "StarPU worker count: " << starpu_worker_get_count() << "\n";
starpu_codelet_init(&this->setup_worker_kernel_cl);
setup_worker_kernel_cl.cpu_funcs[0] = setup_worker_kernel;
for(unsigned int i = 0; i < starpu_worker_get_count(); ++i) {
starpu_task* task = starpu_task_create();
task->cl = &this->setup_worker_kernel_cl;
task->cl_arg = this;
task->execute_on_a_specific_worker = true;
task->workerid = i;
task->synchronous = true;
this->submit_starpu_task(task);
}
this->init_P2M_codelet();
this->init_M2M_codelet();
this->init_M2L_codelet();
this->init_L2L_codelet();
this->init_L2P_codelet();
this->init_P2P_codelet();
this->init_M2P_codelet();
this->init_P2L_codelet();
for(auto& node : _tree.pre_order_walk()) {
this->_data_handles.emplace(&node, &node);
}
}
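/** \brief Codelet and task body tearing down the per-worker kernels
 *
 * Each worker's results are fused back into the reference kernel before its
 * kernel copy is cleaned up.
 */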
starpu_codelet cleanup_worker_kernel_cl;
static void cleanup_worker_kernel(void**, void* cl_arg) {
FAdaptiveStarPU* algo = (FAdaptiveStarPU*) cl_arg;
auto& ref_kernel = algo->_ref_kernel;
auto& kernels = algo->_kernels;
int id = starpu_worker_get_id();
std::cout << "cleaning up worker " << id << '\n';
algo->fuse_kernel_results(&ref_kernel, kernels[id].get());
algo->cleanup_kernel(kernels[id].get());
}
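/** \brief Wait for all tasks, unregister the per-node data handles, fuse and
 * clean up the per-worker kernels and shut StarPU down.
 */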
void cleanup_starpu() {
starpu_task_wait_for_all();
this->_data_handles.clear();
starpu_codelet_init(&this->cleanup_worker_kernel_cl);
cleanup_worker_kernel_cl.cpu_funcs[0] = cleanup_worker_kernel;
for(unsigned int i = 0; i < starpu_worker_get_count(); ++i) {
starpu_task* task = starpu_task_create();
task->cl = &this->cleanup_worker_kernel_cl;
task->cl_arg = this;
task->execute_on_a_specific_worker = true;
task->workerid = i;
task->synchronous = true;
this->submit_starpu_task(task);
}
starpu_shutdown();
}
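/** \brief Submit a task to StarPU and report submission failures */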
void submit_starpu_task(starpu_task* task) {
const auto res = starpu_task_submit(task);
if(0 != res) {
// Not all codelets set a name; guard against streaming a null pointer
const char* name = task->cl->cpu_funcs_name[0];
std::cerr << (name ? name : "A task") << " was not submitted...\n";
if(res == -ENODEV) {
std::cerr << "No device found to execute task.\n";
}
}
}
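/** \brief Get the kernel copy owned by the calling StarPU worker */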
static kernel_t* get_starpu_worker_kernel(FAdaptiveStarPU* algo) {
int worker_id = starpu_worker_get_id();
return algo->_kernels[worker_id].get();
}
// P2M
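/** \brief P2M pass: submit one task per non-empty leaf */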
void source_to_up() {
for(node_t* leaf : this->_tree.leaves()) {
if(leaf->getParticleContainer()->size() != 0) {
node_data_handles& leaf_handles = this->_data_handles.at(leaf);
starpu_task* task = starpu_task_create();
task->cl = &(this->P2M_cl);
task->handles[0] = leaf_handles.multipole;
task->handles[1] = leaf_handles.particles;
task->cl_arg = this;
task->cl_arg_size = sizeof(this);
this->submit_starpu_task(task);
}
}
}
void init_P2M_codelet() {
starpu_codelet_init(&this->P2M_cl);
this->P2M_cl.cpu_funcs[0] = P2M_cpu;
this->P2M_cl.cpu_funcs_name[0] = {const_cast<char*>("P2M task")};
this->P2M_cl.nbuffers = 2; // {0: leaf multipole, 1: leaf particles}
this->P2M_cl.modes[0] = STARPU_RW;
this->P2M_cl.modes[1] = STARPU_R;
}
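/** \brief P2M task body, buffers: {0: leaf multipole, 1: leaf particles} */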
static void P2M_cpu(void** buffers, void* cl_arg) {
typename tree_t::node_t::data_t::multipole_t* leaf_multipole =
(typename tree_t::node_t::data_t::multipole_t*)
STARPU_VARIABLE_GET_PTR(buffers[0]);
typename tree_t::node_t::particle_container_t* particle_container =
(typename tree_t::node_t::particle_container_t*)
STARPU_VARIABLE_GET_PTR(buffers[1]);
auto algo = (FAdaptiveStarPU<tree_t, kernel_t>*) cl_arg;
kernel_t* ker = get_starpu_worker_kernel(algo);
ker->P2M(leaf_multipole, particle_container);
}
// M2M
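/** \brief M2M pass: submit one task per internal node
 *
 * The buffers are the node symbolic data, the node multipole and one
 * multipole per existing child; dynamic handles are allocated when the
 * buffer count exceeds STARPU_NMAXBUFS.
 */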
void up_to_up() {
for(node_t& node : _tree.post_order_walk()) {
if(! node.is_leaf()) {
starpu_task* task = starpu_task_create();
task->cl = &(this->M2M_cl);
task->cl_arg = this;
task->cl_arg_size = sizeof(this);
auto children = node.getChildren();
starpu_data_handle_t* handles = task->handles;
starpu_data_access_mode* modes = task->modes;
// Setup handle array to be used and buffer count
task->nbuffers = 2;
// Get total handle count
for(auto child : children) {
task->nbuffers += (child != nullptr);
}
if(task->nbuffers > STARPU_NMAXBUFS) {
// Allocate StarPU dynamic handles when the buffer count exceeds the static array size
// TODO: check those are freed by StarPU
task->dyn_handles = (starpu_data_handle_t*)
std::malloc(task->nbuffers * sizeof(starpu_data_handle_t));
task->dyn_modes = (starpu_data_access_mode*)
std::malloc(task->nbuffers * sizeof(starpu_data_access_mode));
handles = task->dyn_handles;
modes = task->dyn_modes;
}
// Set parent multipole handle
handles[0] = this->_data_handles.at(&node).symbolic;
modes[0] = STARPU_R;
handles[1] = this->_data_handles.at(&node).multipole;
modes[1] = STARPU_RW;
// Set children multipole handles
for(std::size_t i = 0, j = 2; i < node_t::child_count; ++i) {
if(nullptr != children[i]) {
handles[j] = this->_data_handles.at(children[i]).multipole;
modes[j] = STARPU_R;
++j;
}
}
this->submit_starpu_task(task);
}
}
}
void init_M2M_codelet() {
starpu_codelet_init(&this->M2M_cl);
this->M2M_cl.cpu_funcs[0] = M2M_cpu;
this->M2M_cl.cpu_funcs_name[0] = {const_cast<char*>("M2M task")};
this->M2M_cl.nbuffers = STARPU_VARIABLE_NBUFFERS;
}
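/** \brief M2M task body, buffers: {0: node symbolic data, 1: node multipole},
 * then one multipole per existing child */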
static void M2M_cpu(void** buffers, void* cl_arg) {
using symb_data_t = typename tree_t::node_t::symbolic_data_t;
using multipole_t = typename tree_t::node_t::data_t::multipole_t;
const starpu_task* const current_task = starpu_task_get_current();
std::size_t buffer_count = current_task->nbuffers;
const symb_data_t* node_symbolic =
(symb_data_t*) STARPU_VARIABLE_GET_PTR(buffers[0]);
multipole_t* node_multipole =
(multipole_t*) STARPU_VARIABLE_GET_PTR(buffers[1]);
multipole_t* children_multipole[node_t::child_count] = {};
// Children buffer indices start at 2
for(std::size_t i = 2; i < buffer_count; ++i) {
children_multipole[i-2] =
(typename tree_t::node_t::data_t::multipole_t*)
STARPU_VARIABLE_GET_PTR(buffers[i]);
}
auto algo = (FAdaptiveStarPU<tree_t, kernel_t>*) cl_arg;
kernel_t* ker = get_starpu_worker_kernel(algo);
ker->M2M(node_multipole, children_multipole,
static_cast<int>(node_symbolic->depth));
}
// M2L
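/** \brief M2L pass: submit one task per node, skipping empty leaves
 *
 * V items that are empty leaves are filtered out; dynamic handles are always
 * used because the V list size is not bounded by STARPU_NMAXBUFS.
 */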
void v_list_step() {
for(node_t& node : _tree.in_order_walk()) {
// Empty leaves have an empty multipole, we skip them
if(node.is_leaf() && node.getParticleContainer()->size() == 0) {
continue;
}
starpu_task* task = starpu_task_create();
task->cl = &(this->M2L_cl);
task->cl_arg = this;
task->cl_arg_size = sizeof(this);
// Returns true if the V item must be used
auto pick_item = [](const node_t* v) {
return ! v->is_leaf() || v->getParticleContainer()->size() != 0;
};
auto v_item_count = std::count_if(node.V.begin(), node.V.end(), pick_item);
// V-items symbolic data and multipole, plus the local expansion and
// its symbolic data.
task->nbuffers = 2 + (2 * static_cast<int>(v_item_count));
task->dyn_handles = (starpu_data_handle_t*)
std::malloc(task->nbuffers * sizeof(starpu_data_handle_t));
task->dyn_modes = (starpu_data_access_mode*)
std::malloc(task->nbuffers * sizeof(starpu_data_access_mode));
task->dyn_handles[0] = this->_data_handles.at(&node).symbolic;
task->dyn_handles[1] = this->_data_handles.at(&node).local_exp;
task->dyn_modes[0] = STARPU_R;
task->dyn_modes[1] = STARPU_RW;
std::size_t j = 2;
for(node_t* v_item : node.V) {
if(! pick_item(v_item)) {
continue;
}
task->dyn_handles[j+0] = this->_data_handles.at(v_item).symbolic;
task->dyn_handles[j+1] = this->_data_handles.at(v_item).multipole;
task->dyn_modes[j+0] = STARPU_R;
task->dyn_modes[j+1] = STARPU_R; // V-item multipoles are only read by M2L
j += 2;
}
this->submit_starpu_task(task);
}
}
void init_M2L_codelet() {
starpu_codelet_init(&this->M2L_cl);
this->M2L_cl.cpu_funcs[0] = M2L_cpu;
this->M2L_cl.cpu_funcs_name[0] = {const_cast<char*>("M2L task")};
this->M2L_cl.nbuffers = STARPU_VARIABLE_NBUFFERS;
}
// Buffer layout: {0: node symbolic data, 1: node local expansion}, then one {symbolic data, multipole} pair per V item
static void M2L_cpu(void** buffers, void* cl_arg) {
using symb_data_t = typename tree_t::node_t::symbolic_data_t;
using multipole_t = typename tree_t::node_t::data_t::multipole_t;
using local_exp_t = typename tree_t::node_t::data_t::local_expansion_t;
const starpu_task* const current_task = starpu_task_get_current();
std::size_t buffer_count = current_task->nbuffers;
const symb_data_t* node_symbolic =
(symb_data_t*) STARPU_VARIABLE_GET_PTR(buffers[0]);
local_exp_t* node_local_exp =
(local_exp_t*) STARPU_VARIABLE_GET_PTR(buffers[1]);
std::vector<const multipole_t*> multipoles;
std::vector<int> offset_indices;
// two buffers per V item, first is for the local exp
multipoles.reserve((buffer_count-2) / 2);
offset_indices.reserve((buffer_count-2) / 2);
for(std::size_t i = 2; i < buffer_count; i += 2) {
const symb_data_t* v_item_symb = (symb_data_t*) STARPU_VARIABLE_GET_PTR(buffers[i]);
offset_indices.emplace_back(compute_box_offset_index(*node_symbolic, *v_item_symb, 3));
multipoles.emplace_back((multipole_t*) STARPU_VARIABLE_GET_PTR(buffers[i+1]));
}
auto algo = (FAdaptiveStarPU<tree_t, kernel_t>*) cl_arg;
kernel_t* ker = get_starpu_worker_kernel(algo);
ker->M2L(node_local_exp, multipoles.data(), offset_indices.data(),
static_cast<int>(multipoles.size()),
static_cast<int>(node_symbolic->depth));
}
// P2L
void x_list_step() {
/* NOTE: the X list and W list are complementary: if A is in X(B) then B
* is in W(A).
*
* We loop over the leaves first to detect the empty ones early on.
*/
for(node_t* leaf : _tree.leaves()) {
if(leaf->getParticleContainer()->size() == 0) {
continue;
}
for(node_t* w_item : leaf->W) {
if(w_item->is_leaf() && w_item->getParticleContainer()->size() == 0) {
continue;
}
starpu_task* task = starpu_task_create();
task->cl = &(this->P2L_cl);
task->handles[0] = this->_data_handles.at(w_item).local_exp;
task->handles[1] = this->_data_handles.at(leaf).particles;
task->cl_arg = this;
task->cl_arg_size = sizeof(this);
this->submit_starpu_task(task);
}
}
}
void init_P2L_codelet() {
starpu_codelet_init(&this->P2L_cl);
this->P2L_cl.cpu_funcs[0] = P2L_cpu;
this->P2L_cl.cpu_funcs_name[0] = {const_cast<char*>("P2L task")};
this->P2L_cl.nbuffers = 2; // {0: node local expansion, 1: leaf particles}
this->P2L_cl.modes[0] = STARPU_RW;
this->P2L_cl.modes[1] = STARPU_R;
}
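/** \brief P2L task body, buffers: {0: node local expansion, 1: leaf particles} */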
static void P2L_cpu(void** buffers, void* cl_arg) {
typename tree_t::node_t::data_t::local_expansion_t* leaf_local_exp =
(typename tree_t::node_t::data_t::local_expansion_t*)
STARPU_VARIABLE_GET_PTR(buffers[0]);
typename tree_t::node_t::particle_container_t* particle_container =
(typename tree_t::node_t::particle_container_t*)
STARPU_VARIABLE_GET_PTR(buffers[1]);
auto algo = (FAdaptiveStarPU<tree_t, kernel_t>*) cl_arg;
kernel_t* ker = get_starpu_worker_kernel(algo);
ker->P2L(leaf_local_exp, particle_container);
}
// L2L
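/** \brief L2L pass: submit one task per internal node
 *
 * The task setup mirrors the M2M pass, propagating the parent local
 * expansion towards the children.
 */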
void down_to_down() {
for(node_t& node : _tree.pre_order_walk()) {
if(node.is_leaf()) {
continue;
}
starpu_task* task = starpu_task_create();
task->cl = &(this->L2L_cl);
task->cl_arg = this;
task->cl_arg_size = sizeof(this);
auto children = node.getChildren();
starpu_data_handle_t* handles = task->handles;
starpu_data_access_mode* modes = task->modes;
// Setup handle array to be used and buffer count
task->nbuffers = 2;
// Get total handle count
for(auto child : children) {
task->nbuffers += (child != nullptr);