Commit 0c3f83af authored by BRAMAS Berenger's avatar BRAMAS Berenger
Browse files

add support for the redux for the P2P

parent 23b9e49c
......@@ -21,6 +21,7 @@
#include <starpu.h>
#include "../StarPUUtils/FStarPUUtils.hpp"
#include "../StarPUUtils/FStarPUFmmPriorities.hpp"
#include "../StarPUUtils/FStarPUReduxCpu.hpp"
#ifdef STARPU_USE_CPU
#include "../StarPUUtils/FStarPUCpuWrapper.hpp"
......@@ -102,6 +103,12 @@ protected:
starpu_codelet p2p_cl_in;
starpu_codelet p2p_cl_inout;
#ifdef STARPU_USE_REDUX
starpu_codelet p2p_redux_init;
starpu_codelet p2p_redux_perform;
starpu_codelet p2p_redux_read;
#endif
#ifdef STARPU_USE_CPU
StarPUCpuWrapperClass cpuWrapper;
#endif
......@@ -154,6 +161,8 @@ public:
FStarPUFmmPriorities::Controller().init(&conf, tree->getHeight(), inKernels);
FAssertLF(starpu_init(&conf) == 0);
starpu_malloc_set_align(32);
starpu_pthread_mutex_t initMutex;
starpu_pthread_mutex_init(&initMutex, NULL);
#ifdef STARPU_USE_CPU
......@@ -310,6 +319,9 @@ protected:
if( operationsToProceed & FFmmP2P ) directPass();
if( operationsToProceed & FFmmL2P && !directOnly) mergePass();
#ifdef STARPU_USE_REDUX
if( operationsToProceed & FFmmL2P && !directOnly) readParticle();
#endif
starpu_task_wait_for_all();
starpu_pause();
......@@ -446,7 +458,11 @@ protected:
l2p_cl.modes[0] = STARPU_R;
l2p_cl.modes[1] = STARPU_R;
l2p_cl.modes[2] = STARPU_R;
#ifdef STARPU_USE_REDUX
l2p_cl.modes[3] = STARPU_REDUX;
#else
l2p_cl.modes[3] = starpu_data_access_mode(STARPU_RW|STARPU_COMMUTE_IF_SUPPORTED);
#endif
l2p_cl.name = "l2p_cl";
memset(&p2p_cl_in, 0, sizeof(p2p_cl_in));
......@@ -470,7 +486,11 @@ protected:
#endif
p2p_cl_in.nbuffers = 2;
p2p_cl_in.modes[0] = STARPU_R;
#ifdef STARPU_USE_REDUX
p2p_cl_in.modes[1] = STARPU_REDUX;
#else
p2p_cl_in.modes[1] = starpu_data_access_mode(STARPU_RW|STARPU_COMMUTE_IF_SUPPORTED);
#endif
p2p_cl_in.name = "p2p_cl_in";
memset(&p2p_cl_inout, 0, sizeof(p2p_cl_inout));
#ifdef STARPU_USE_CPU
......@@ -493,9 +513,17 @@ protected:
#endif
p2p_cl_inout.nbuffers = 4;
p2p_cl_inout.modes[0] = STARPU_R;
#ifdef STARPU_USE_REDUX
p2p_cl_inout.modes[1] = STARPU_REDUX;
#else
p2p_cl_inout.modes[1] = starpu_data_access_mode(STARPU_RW|STARPU_COMMUTE_IF_SUPPORTED);
#endif
p2p_cl_inout.modes[2] = STARPU_R;
#ifdef STARPU_USE_REDUX
p2p_cl_inout.modes[3] = STARPU_REDUX;
#else
p2p_cl_inout.modes[3] = starpu_data_access_mode(STARPU_RW|STARPU_COMMUTE_IF_SUPPORTED);
#endif
p2p_cl_inout.name = "p2p_cl_inout";
memset(&m2l_cl_in, 0, sizeof(m2l_cl_in));
......@@ -548,6 +576,50 @@ protected:
m2l_cl_inout.modes[2] = STARPU_R;
m2l_cl_inout.modes[3] = STARPU_R;
m2l_cl_inout.name = "m2l_cl_inout";
#ifdef STARPU_USE_REDUX
memset(&p2p_redux_init, 0, sizeof(p2p_redux_init));
#ifdef STARPU_USE_CPU
p2p_redux_init.cpu_funcs[0] = FStarPUReduxCpu::InitData<typename ParticleGroupClass::ParticleDataType>;
p2p_redux_init.where |= STARPU_CPU;
#endif
p2p_redux_init.nbuffers = 1;
p2p_redux_init.modes[0] = STARPU_RW;
p2p_redux_init.name = "p2p_redux_init";
memset(&p2p_redux_perform, 0, sizeof(p2p_redux_perform));
#ifdef STARPU_USE_CPU
p2p_redux_perform.cpu_funcs[0] = FStarPUReduxCpu::ReduceData<typename ParticleGroupClass::ParticleDataType>;
p2p_redux_perform.where |= STARPU_CPU;
#endif
p2p_redux_perform.nbuffers = 2;
p2p_redux_perform.modes[0] = STARPU_RW;
p2p_redux_perform.modes[1] = STARPU_R;
p2p_redux_perform.name = "p2p_redux_perform";
memset(&p2p_redux_read, 0, sizeof(p2p_redux_read));
#ifdef STARPU_USE_CPU
if(originalCpuKernel->supportL2P(FSTARPU_CPU_IDX)){
p2p_redux_read.cpu_funcs[0] = FStarPUReduxCpu::EmptyCodelet<typename ParticleGroupClass::ParticleDataType>;
p2p_redux_read.where |= STARPU_CPU;
}
#endif
#ifdef SCALFMM_ENABLE_CUDA_KERNEL
if(originalCpuKernel->supportL2P(FSTARPU_CUDA_IDX)){
p2p_redux_read.cuda_funcs[0] = FStarPUReduxCpu::EmptyCodelet<typename ParticleGroupClass::ParticleDataType>;
p2p_redux_read.where |= STARPU_CUDA;
}
#endif
#ifdef SCALFMM_ENABLE_OPENCL_KERNEL
if(originalCpuKernel->supportL2P(FSTARPU_OPENCL_IDX)){
p2p_redux_read.opencl_funcs[0] = FStarPUReduxCpu::EmptyCodelet<typename ParticleGroupClass::ParticleDataType>;
p2p_redux_read.where |= STARPU_OPENCL;
}
#endif
p2p_redux_read.nbuffers = 1;
p2p_redux_read.modes[0] = STARPU_R;
p2p_redux_read.name = "p2p_redux_read";
#endif
}
/** dealloc in a starpu way all the defined handles */
......@@ -601,10 +673,15 @@ protected:
(uintptr_t)containers->getRawBuffer(), containers->getBufferSizeInByte());
starpu_variable_data_register(&particleHandles[idxGroup].down, 0,
(uintptr_t)containers->getRawAttributesBuffer(), containers->getAttributesBufferSizeInByte());
particleHandles[idxGroup].intervalSize = int(containers->getNumberOfLeavesInBlock());
#ifdef STARPU_USE_REDUX
starpu_data_set_reduction_methods(particleHandles[idxGroup].down, &p2p_redux_perform,
&p2p_redux_init);
#else
#ifdef STARPU_SUPPORT_ARBITER
starpu_data_assign_arbiter(particleHandles[idxGroup].down, arbiterGlobal);
#endif
#endif // STARPU_SUPPORT_ARBITER
#endif // STARPU_USE_REDUX
particleHandles[idxGroup].intervalSize = int(containers->getNumberOfLeavesInBlock());
}
}
}
......@@ -1100,9 +1177,17 @@ protected:
STARPU_VALUE, &particleHandles[idxGroup].intervalSize, sizeof(int),
STARPU_PRIORITY, FStarPUFmmPriorities::Controller().getInsertionPosP2PExtern(),
STARPU_R, particleHandles[idxGroup].symb,
#ifdef STARPU_USE_REDUX
STARPU_REDUX, particleHandles[idxGroup].down,
#else
(STARPU_RW|STARPU_COMMUTE_IF_SUPPORTED), particleHandles[idxGroup].down,
#endif
STARPU_R, particleHandles[interactionid].symb,
#ifdef STARPU_USE_REDUX
STARPU_REDUX, particleHandles[interactionid].down,
#else
(STARPU_RW|STARPU_COMMUTE_IF_SUPPORTED), particleHandles[interactionid].down,
#endif
#ifdef STARPU_USE_TASK_NAME
STARPU_NAME, p2pOuterTaskNames.get(),
#endif
......@@ -1117,7 +1202,11 @@ protected:
STARPU_VALUE, &particleHandles[idxGroup].intervalSize, sizeof(int),
STARPU_PRIORITY, FStarPUFmmPriorities::Controller().getInsertionPosP2P(),
STARPU_R, particleHandles[idxGroup].symb,
#ifdef STARPU_USE_REDUX
STARPU_REDUX, particleHandles[idxGroup].down,
#else
(STARPU_RW|STARPU_COMMUTE_IF_SUPPORTED), particleHandles[idxGroup].down,
#endif
#ifdef STARPU_USE_TASK_NAME
STARPU_NAME, p2pTaskNames.get(),
#endif
......@@ -1146,7 +1235,11 @@ protected:
STARPU_R, cellHandles[tree->getHeight()-1][idxGroup].symb,
STARPU_R, cellHandles[tree->getHeight()-1][idxGroup].down,
STARPU_R, particleHandles[idxGroup].symb,
#ifdef STARPU_USE_REDUX
STARPU_REDUX, particleHandles[idxGroup].down,
#else
(STARPU_RW|STARPU_COMMUTE_IF_SUPPORTED), particleHandles[idxGroup].down,
#endif
#ifdef STARPU_USE_TASK_NAME
STARPU_NAME, l2pTaskNames.get(),
#endif
......@@ -1155,6 +1248,25 @@ protected:
FLOG( FLog::Controller << "\t\t L2P in " << timer.tacAndElapsed() << "s\n" );
}
#ifdef STARPU_USE_REDUX
void readParticle(){
FLOG( FTic timer; );
FAssertLF(cellHandles[tree->getHeight()-1].size() == particleHandles.size());
for(int idxGroup = 0 ; idxGroup < tree->getNbParticleGroup() ; ++idxGroup){
starpu_insert_task(&p2p_redux_read,
STARPU_PRIORITY, FStarPUFmmPriorities::Controller().getInsertionPosL2P(),
STARPU_R, particleHandles[idxGroup].down,
#ifdef STARPU_USE_TASK_NAME
STARPU_NAME, "read-particle",
#endif
0);
}
}
#endif
};
#endif // FGROUPTASKSTARPUALGORITHM_HPP
......@@ -3,6 +3,7 @@
#define FSTARPUFMMPRIORITIES_HPP
#include "../../Utils/FGlobal.hpp"
#include "FStarPUUtils.hpp"
#include "FStarPUKernelCapacities.hpp"
......@@ -180,11 +181,13 @@ public:
heteroprio->prio_mapping_per_arch_index[FSTARPU_CPU_IDX][cpuCountPrio++] = insertionPositionP2P;
heteroprio->buckets[insertionPositionP2P].valide_archs |= STARPU_CPU;
}
#ifndef STARPU_USE_REDUX
if( capacities->supportP2PExtern(FSTARPU_CPU_IDX)){
FLOG( FLog::Controller << "\t CPU prio P2P Extern " << cpuCountPrio << " bucket " << insertionPositionP2PExtern << "\n" );
heteroprio->prio_mapping_per_arch_index[FSTARPU_CPU_IDX][cpuCountPrio++] = insertionPositionP2PExtern;
heteroprio->buckets[insertionPositionP2PExtern].valide_archs |= STARPU_CPU;
}
#endif
if(capacities->supportM2L(FSTARPU_CPU_IDX)){
const int prioM2LAtLevel = getInsertionPosM2L(treeHeight-1);
FLOG( FLog::Controller << "\t CPU prio M2L " << cpuCountPrio << " bucket " << prioM2LAtLevel << "\n" );
......@@ -196,6 +199,13 @@ public:
heteroprio->prio_mapping_per_arch_index[FSTARPU_CPU_IDX][cpuCountPrio++] = insertionPositionL2P;
heteroprio->buckets[insertionPositionL2P].valide_archs |= STARPU_CPU;
}
#ifdef STARPU_USE_REDUX
if( capacities->supportP2PExtern(FSTARPU_CPU_IDX)){
FLOG( FLog::Controller << "\t CPU prio P2P Extern " << cpuCountPrio << " bucket " << insertionPositionP2PExtern << "\n" );
heteroprio->prio_mapping_per_arch_index[FSTARPU_CPU_IDX][cpuCountPrio++] = insertionPositionP2PExtern;
heteroprio->buckets[insertionPositionP2PExtern].valide_archs |= STARPU_CPU;
}
#endif
heteroprio->nb_prio_per_arch_index[FSTARPU_CPU_IDX] = unsigned(cpuCountPrio);
FLOG( FLog::Controller << "\t CPU Priorities: " << cpuCountPrio << "\n" );
}
......
......@@ -65,6 +65,10 @@
/////////////////////////////////////////////////////
#define STARPU_USE_REDUX
/////////////////////////////////////////////////////
enum FStarPUTypes{
// First will be zero
#ifdef STARPU_USE_CPU
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment