Commit 31427e57 authored by PIACIBELLO Cyrille's avatar PIACIBELLO Cyrille

FFmmAlgorithmProc works again

parent 499701b4
@@ -38,1314 +38,1357 @@
#include "FCoreCommon.hpp"
/**
* @author Berenger Bramas (berenger.bramas@inria.fr)
* @class FFmmAlgorithmThreadProc
* @brief Threaded FMM algorithm distributed with MPI.
* Please read the license
*
* This class is a threaded FMM algorithm with MPI.
* It simply iterates on a tree and calls the kernels with the right arguments.
* It uses the inspector-executor model:
* it iterates on the tree and builds an array, then works in parallel on this array.
*
* Of course this class does not deallocate pointers given as arguments.
*
* Threaded & based on the inspector-executor model
* schedule(runtime), e.g.:
*   export OMP_NUM_THREADS=2
*   export OMPI_CXX=`which g++-4.4`
*   mpirun -np 2 valgrind --suppressions=/usr/share/openmpi/openmpi-valgrind.supp \
*     --tool=memcheck --leak-check=yes --show-reachable=yes --num-callers=20 --track-fds=yes \
*     ./Tests/testFmmAlgorithmProc ../Data/testLoaderSmall.fma.tmp
*/
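/**
 * Usage sketch (illustrative only: the setup below and names such as app.global(),
 * boxWidth or boxCenter are assumptions about the surrounding test code, not part
 * of this class):
 *
 *   FMpi app(argc, argv);                        // MPI environment
 *   OctreeClass tree(height, subHeight, boxWidth, boxCenter);
 *   // ... each process inserts its local particles into the tree ...
 *   KernelClass kernels(height, boxWidth, boxCenter);
 *   FFmmAlgorithmThreadProc<OctreeClass, CellClass, ContainerClass, KernelClass, LeafClass>
 *       algo(app.global(), &tree, &kernels);
 *   algo.execute();                              // P2M, M2M, M2L, L2L, then P2P/L2P
 */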
template<class OctreeClass, class CellClass, class ContainerClass, class KernelClass, class LeafClass>
class FFmmAlgorithmThreadProc : protected FAssertable , public FAbstractAlgorithm {
static const int MaxSizePerCell = 1024;
OctreeClass* const tree; //< The octree to work on
KernelClass** kernels; //< The kernels
const FMpi::FComm& comm; //< MPI comm
typename OctreeClass::Iterator* iterArray; //< Array of iterators used to process the tree in parallel
int numberOfLeafs; //< Number of leaves in the local tree
const int MaxThreads; //< the max number of thread allowed by openmp
const int nbProcess; //< Number of process
const int idProcess; //< Id of current process
const int OctreeHeight; //<Height of the tree
/** An interval is the Morton index interval
* that a proc uses (it holds data in this interval)
*/
struct Interval{
MortonIndex min;
MortonIndex max;
};
/** My interval */
Interval*const intervals;
/** All process intervals */
Interval*const workingIntervalsPerLevel;
/** Get an interval from proc id and level */
Interval& getWorkingInterval( int level, int proc){
return workingIntervalsPerLevel[OctreeHeight * proc + level];
}
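/** The per-level intervals of all procs are stored flattened:
* entry (OctreeHeight * proc + level) is the interval owned by proc at that level. */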
public:
/** Get current proc interval at level */
Interval& getWorkingInterval( int level){
return getWorkingInterval(level, idProcess);
}
/** Does the current proc have some work at this level? */
bool hasWorkAtLevel( int level){
return idProcess == 0 || (getWorkingInterval(level, idProcess - 1).max) < (getWorkingInterval(level, idProcess).max);
}
/** The constructor needs the octree and the kernels used for the computation
* @param inTree the octree to work on
* @param inKernels the kernels to call
* An assertion is raised if one of the arguments is null
*/
FFmmAlgorithmThreadProc(const FMpi::FComm& inComm, OctreeClass* const inTree, KernelClass* const inKernels)
: tree(inTree) , kernels(0), comm(inComm), iterArray(nullptr),numberOfLeafs(0),
MaxThreads(omp_get_max_threads()), nbProcess(inComm.processCount()), idProcess(inComm.processId()),
OctreeHeight(tree->getHeight()),intervals(new Interval[inComm.processCount()]),
workingIntervalsPerLevel(new Interval[inComm.processCount() * tree->getHeight()]){
fassert(tree, "tree cannot be null", __LINE__, __FILE__);
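// One kernel instance per OpenMP thread: the kernel may keep internal state or
// work buffers, so each thread gets its own copy of the user-provided kernel.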
this->kernels = new KernelClass*[MaxThreads];
for(int idxThread = 0 ; idxThread < MaxThreads ; ++idxThread){
this->kernels[idxThread] = new KernelClass(*inKernels);
}
FLOG(FLog::Controller << "FFmmAlgorithmThreadProc\n");
FLOG(FLog::Controller << "Max threads = " << MaxThreads << ", Procs = " << nbProcess << ", I am " << idProcess << ".\n");
}
/** Default destructor */
virtual ~FFmmAlgorithmThreadProc(){
for(int idxThread = 0 ; idxThread < MaxThreads ; ++idxThread){
delete this->kernels[idxThread];
}
delete [] this->kernels;
delete [] intervals;
delete [] workingIntervalsPerLevel;
}
/**
* To execute the fmm algorithm
* Call this function to run the complete algorithm
*/
void execute(const unsigned operationsToProceed = FFmmNearAndFarFields){
FTRACE( FTrace::FFunction functionTrace( __FUNCTION__, "Fmm" , __FILE__ , __LINE__ ) );
// Count leaf
this->numberOfLeafs = 0;
{
FTRACE( FTrace::FRegion regionTrace( "Preprocess" , __FUNCTION__ , __FILE__ , __LINE__) );
Interval myLastInterval;
{
typename OctreeClass::Iterator octreeIterator(tree);
octreeIterator.gotoBottomLeft();
myLastInterval.min = octreeIterator.getCurrentGlobalIndex();
do{
++this->numberOfLeafs;
} while(octreeIterator.moveRight());
myLastInterval.max = octreeIterator.getCurrentGlobalIndex();
}
iterArray = new typename OctreeClass::Iterator[numberOfLeafs];
fassert(iterArray, "iterArray bad alloc", __LINE__, __FILE__);
// We get the min/max indexes from each procs
FMpi::MpiAssert( MPI_Allgather( &myLastInterval, sizeof(Interval), MPI_BYTE, intervals, sizeof(Interval), MPI_BYTE, comm.getComm()), __LINE__ );
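// Interval is plain-old-data (two MortonIndex), so it can be exchanged as a raw
// MPI_BYTE buffer; afterwards intervals[p] holds the leaf-level interval of proc p.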
Interval*const myIntervals = new Interval[OctreeHeight];
myIntervals[OctreeHeight - 1] = myLastInterval;
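// Moving one level up in the octree drops 3 bits (one per dimension) of the Morton
// index, so the parent interval is simply the child interval shifted right by 3.
// E.g. a leaf at index 0b101'011'110 has parent 0b101'011 and grandparent 0b101.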
for(int idxLevel = OctreeHeight - 2 ; idxLevel >= 0 ; --idxLevel){
myIntervals[idxLevel].min = myIntervals[idxLevel+1].min >> 3;
myIntervals[idxLevel].max = myIntervals[idxLevel+1].max >> 3;
}
if(idProcess != 0){
typename OctreeClass::Iterator octreeIterator(tree);
octreeIterator.gotoBottomLeft();
octreeIterator.moveUp();
// Morton index of the parent of the previous proc's last leaf
MortonIndex currentLimit = intervals[idProcess-1].max >> 3;
for(int idxLevel = OctreeHeight - 2 ; idxLevel >= 1 ; --idxLevel){
while(octreeIterator.getCurrentGlobalIndex() <= currentLimit){
if( !octreeIterator.moveRight() ) break;
}
myIntervals[idxLevel].min = octreeIterator.getCurrentGlobalIndex();
octreeIterator.moveUp();
currentLimit >>= 3;
}
}
// We get the min/max indexes from each procs
FMpi::MpiAssert( MPI_Allgather( myIntervals, int(sizeof(Interval)) * OctreeHeight, MPI_BYTE,
workingIntervalsPerLevel, int(sizeof(Interval)) * OctreeHeight, MPI_BYTE, comm.getComm()), __LINE__ );
delete[] myIntervals;
}
// run;
if(operationsToProceed & FFmmP2M) bottomPass();
if(operationsToProceed & FFmmM2M) upwardPass();
if(operationsToProceed & FFmmM2L) transferPass();
if(operationsToProceed & FFmmL2L) downardPass();
if((operationsToProceed & FFmmP2P) || (operationsToProceed & FFmmL2P)) directPass();
// delete array
delete [] iterArray;
iterArray = 0;
}
private:
/////////////////////////////////////////////////////////////////////////////
// P2M
/////////////////////////////////////////////////////////////////////////////
/** P2M Bottom Pass */
void bottomPass(){
FTRACE( FTrace::FFunction functionTrace(__FUNCTION__, "Fmm" , __FILE__ , __LINE__) );
FLOG( FLog::Controller.write("\tStart Bottom Pass\n").write(FLog::Flush) );
FLOG(FTic counterTime);
typename OctreeClass::Iterator octreeIterator(tree);
// Iterate on leafs
octreeIterator.gotoBottomLeft();
int leafs = 0;
do{
iterArray[leafs++] = octreeIterator;
} while(octreeIterator.moveRight());
FLOG(FTic computationCounter);
#pragma omp parallel
{
KernelClass * const myThreadkernels = kernels[omp_get_thread_num()];
#pragma omp for nowait
for(int idxLeafs = 0 ; idxLeafs < leafs ; ++idxLeafs){
myThreadkernels->P2M( iterArray[idxLeafs].getCurrentCell() , iterArray[idxLeafs].getCurrentListSrc());
}
}
FLOG(computationCounter.tac());
FLOG( FLog::Controller << "\tFinished (@Bottom Pass (P2M) = " << counterTime.tacAndElapsed() << "s)\n" );
FLOG( FLog::Controller << "\t\t Computation : " << computationCounter.elapsed() << " s\n" );
}
/////////////////////////////////////////////////////////////////////////////
// Upward
/////////////////////////////////////////////////////////////////////////////
/** M2M */
void upwardPass(){
FTRACE( FTrace::FFunction functionTrace(__FUNCTION__, "Fmm" , __FILE__ , __LINE__) );
FLOG( FLog::Controller.write("\tStart Upward Pass\n").write(FLog::Flush); );
FLOG(FTic counterTime);
FLOG(FTic computationCounter);
FLOG(FTic prepareCounter);
FLOG(FTic waitCounter);
// Start from leaf level - 1
typename OctreeClass::Iterator octreeIterator(tree);
octreeIterator.gotoBottomLeft();
octreeIterator.moveUp();
typename OctreeClass::Iterator avoidGotoLeftIterator(octreeIterator);
// This variable is the proc responsible
// for the shared cells
int sendToProc = idProcess;
// There are a maximum of 8-1 sends and 8-1 receptions
MPI_Request requests[14];
MPI_Status status[14];
// Maximum data per message is 8 serialized cells plus one state byte:
FBufferWriter sendBuffer;
const int recvBufferOffset = 8 * MaxSizePerCell + 1;
FBufferReader recvBuffer(nbProcess * recvBufferOffset);
CellClass recvBufferCells[8];
int firstProcThatSend = idProcess + 1;
// for each levels
for(int idxLevel = OctreeHeight - 2 ; idxLevel > 1 ; --idxLevel ){
// No more work for me
if(idProcess != 0
&& getWorkingInterval((idxLevel+1), idProcess).max <= getWorkingInterval((idxLevel+1), idProcess - 1).max){
break;
}
// copy cells to work with
int numberOfCells = 0;
// for each cells
do{
iterArray[numberOfCells++] = octreeIterator;
} while(octreeIterator.moveRight());
avoidGotoLeftIterator.moveUp();
octreeIterator = avoidGotoLeftIterator;
// We may need to send something
int iterRequests = 0;
int cellsToSend = -1;
while(iterArray[cellsToSend+1].getCurrentGlobalIndex() < getWorkingInterval(idxLevel, idProcess).min){
++cellsToSend;
}
FTRACE( FTrace::FRegion regionTrace( "Preprocess" , __FUNCTION__ , __FILE__ , __LINE__) );
FLOG(prepareCounter.tic());
if(idProcess != 0
&& (getWorkingInterval((idxLevel+1), idProcess).min >>3) <= (getWorkingInterval((idxLevel+1), idProcess - 1).max >>3)){
char state = 0;
sendBuffer.write(char(0));
const CellClass* const* const child = iterArray[cellsToSend].getCurrentChild();
for(int idxChild = 0 ; idxChild < 8 ; ++idxChild){
if( child[idxChild] && getWorkingInterval((idxLevel+1), idProcess).min <= child[idxChild]->getMortonIndex() ){
child[idxChild]->serializeUp(sendBuffer);
state = char(state | (0x1 << idxChild));
}
}
sendBuffer.writeAt(0,state);
while( sendToProc && iterArray[cellsToSend].getCurrentGlobalIndex() == getWorkingInterval(idxLevel , sendToProc - 1).max){
--sendToProc;
}
MPI_Isend(sendBuffer.data(), sendBuffer.getSize(), MPI_BYTE, sendToProc, FMpi::TagFmmM2M, comm.getComm(), &requests[iterRequests++]);
}
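// Message layout for this M2M exchange: one state byte whose bit i says whether
// child i follows, then the serialized data of those children in increasing order.
// The receiving side below reads back the state byte to rebuild its child array.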
// We may need to receive something
bool hasToReceive = false;
int endProcThatSend = firstProcThatSend;
if(idProcess != nbProcess - 1){
while(firstProcThatSend < nbProcess
&& (getWorkingInterval((idxLevel+1), firstProcThatSend).max) < (getWorkingInterval((idxLevel+1), idProcess).max)){
++firstProcThatSend;
}
if(firstProcThatSend < nbProcess &&
(getWorkingInterval((idxLevel+1), firstProcThatSend).min >>3) <= (getWorkingInterval((idxLevel+1) , idProcess).max>>3) ){
endProcThatSend = firstProcThatSend;
while( endProcThatSend < nbProcess &&
(getWorkingInterval((idxLevel+1) ,endProcThatSend).min >>3) <= (getWorkingInterval((idxLevel+1) , idProcess).max>>3)){
++endProcThatSend;
}
if(firstProcThatSend != endProcThatSend){
hasToReceive = true;
for(int idxProc = firstProcThatSend ; idxProc < endProcThatSend ; ++idxProc ){
MPI_Irecv(&recvBuffer.data()[idxProc * recvBufferOffset], recvBufferOffset, MPI_BYTE,
idxProc, FMpi::TagFmmM2M, comm.getComm(), &requests[iterRequests++]);
}
}
}
}
FLOG(prepareCounter.tac());
FTRACE( regionTrace.end() );
// Compute
const int endIndex = (hasToReceive?numberOfCells-1:numberOfCells);
FLOG(computationCounter.tic());
#pragma omp parallel
{
KernelClass& myThreadkernels = (*kernels[omp_get_thread_num()]);
#pragma omp for nowait
for( int idxCell = cellsToSend + 1 ; idxCell < endIndex ; ++idxCell){
myThreadkernels.M2M( iterArray[idxCell].getCurrentCell() , iterArray[idxCell].getCurrentChild(), idxLevel);
}
}
FLOG(computationCounter.tac());
// Are we sending or waiting anything?
if(iterRequests){
FLOG(waitCounter.tic());
MPI_Waitall( iterRequests, requests, status);
FLOG(waitCounter.tac());
// we were receiving data
if( hasToReceive ){
CellClass* currentChild[8];
memcpy(currentChild, iterArray[numberOfCells - 1].getCurrentChild(), 8 * sizeof(CellClass*));
// retrieve data and merge my children with the children coming from other procs
for(int idxProc = firstProcThatSend ; idxProc < endProcThatSend ; ++idxProc){
recvBuffer.seek(idxProc * recvBufferOffset);
int state = int(recvBuffer.getValue<char>());
int position = 0;
while( state && position < 8){
while(!(state & 0x1)){
state >>= 1;
++position;
}
fassert(!currentChild[position], "Already has a cell here", __LINE__, __FILE__);
recvBufferCells[position].deserializeUp(recvBuffer);
currentChild[position] = (CellClass*) &recvBufferCells[position];
state >>= 1;
++position;
}
}
// Finally compute
FLOG(computationCounter.tic());
(*kernels[0]).M2M( iterArray[numberOfCells - 1].getCurrentCell() , currentChild, idxLevel);
FLOG(computationCounter.tac());
firstProcThatSend = endProcThatSend - 1;
}
}
sendBuffer.reset();
recvBuffer.seek(0);
}
FLOG( FLog::Controller << "\tFinished (@Upward Pass (M2M) = " << counterTime.tacAndElapsed() << "s)\n" );
FLOG( FLog::Controller << "\t\t Computation : " << computationCounter.cumulated() << " s\n" );
FLOG( FLog::Controller << "\t\t Prepare : " << prepareCounter.cumulated() << " s\n" );
FLOG( FLog::Controller << "\t\t Wait : " << waitCounter.cumulated() << " s\n" );
}
/////////////////////////////////////////////////////////////////////////////
// Downward
/////////////////////////////////////////////////////////////////////////////
/** M2L */
void transferPass(){
FTRACE( FTrace::FFunction functionTrace(__FUNCTION__, "Fmm" , __FILE__ , __LINE__) );
FLOG( FLog::Controller.write("\tStart Downward Pass (M2L)\n").write(FLog::Flush); );
FLOG(FTic counterTime);
FLOG(FTic computationCounter);
FLOG(FTic sendCounter);
FLOG(FTic receiveCounter);
FLOG(FTic prepareCounter);
FLOG(FTic gatherCounter);
//////////////////////////////////////////////////////////////////
// First, know what to send to whom
//////////////////////////////////////////////////////////////////
// pointer to send
FVector<typename OctreeClass::Iterator> toSend[nbProcess * OctreeHeight];
// index
int*const indexToSend = new int[nbProcess * OctreeHeight];
memset(indexToSend, 0, sizeof(int) * nbProcess * OctreeHeight);
// To know which cells need data from another proc
FBoolArray** const leafsNeedOther = new FBoolArray*[OctreeHeight];
memset(leafsNeedOther, 0, sizeof(FBoolArray*) * OctreeHeight);
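// toSend / indexToSend hold, for each (proc, level) pair, the iterators on local
// cells whose data must be sent to that proc and how many there are; leafsNeedOther
// flags, per level, the local cells that will also need data from another proc.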
{
FTRACE( FTrace::FRegion regionTrace( "Preprocess" , __FUNCTION__ , __FILE__ , __LINE__) );
FLOG(prepareCounter.tic());
// To know if a leaf has been already sent to a proc
bool*const alreadySent = new bool[nbProcess];
memset(alreadySent, 0, sizeof(bool) * nbProcess);
typename OctreeClass::Iterator octreeIterator(tree);
octreeIterator.moveDown();