Commit 71d3cc19 authored by BLANCHARD Pierre's avatar BLANCHARD Pierre
Browse files
parents 0259b16a f0494086
......@@ -29,8 +29,8 @@ if(SCALFMM_ADDON_CKERNELAPI)
# Adding the entire project dir as an include dir
INCLUDE_DIRECTORIES(
${CMAKE_BINARY_DIR}/Src
${CMAKE_SOURCE_DIR}/Src
${SCALFMM_BINARY_DIR}/Src
${SCALFMM_SOURCE_DIR}/Src
${SCALFMM_INCLUDES}
)
......@@ -44,7 +44,7 @@ if(SCALFMM_ADDON_CKERNELAPI)
INSTALL( FILES ${hpp_in_dir} DESTINATION include/ScalFmm/CKernelApi )
file( GLOB_RECURSE source_tests_files Tests/*.c )
INCLUDE_DIRECTORIES( ${CMAKE_BINARY_DIR}/Src )
INCLUDE_DIRECTORIES( ${SCALFMM_BINARY_DIR}/Src )
# Then build test files
foreach(exec ${source_tests_files})
......
......@@ -31,8 +31,8 @@ if(SCALFMM_ADDON_FMMAPI)
# Adding the entire project dir as an include dir
INCLUDE_DIRECTORIES(
${CMAKE_BINARY_DIR}/Src
${CMAKE_SOURCE_DIR}/Src
${SCALFMM_BINARY_DIR}/Src
${SCALFMM_SOURCE_DIR}/Src
${SCALFMM_INCLUDES}
)
......@@ -46,7 +46,7 @@ if(SCALFMM_ADDON_FMMAPI)
INSTALL( FILES ${hpp_in_dir} DESTINATION include/ScalFmm/FmmApi )
file( GLOB_RECURSE source_tests_files Tests/*.cpp )
INCLUDE_DIRECTORIES( ${CMAKE_BINARY_DIR}/Src )
INCLUDE_DIRECTORIES( ${SCALFMM_BINARY_DIR}/Src )
# Then build test files
foreach(exec ${source_tests_files})
......
......@@ -43,8 +43,8 @@ if(SCALFMM_ADDON_HMAT)
# Adding the entire project dir as an include dir
INCLUDE_DIRECTORIES(
${CMAKE_BINARY_DIR}/Src
${CMAKE_SOURCE_DIR}/Src
${SCALFMM_BINARY_DIR}/Src
${SCALFMM_SOURCE_DIR}/Src
${SCALFMM_INCLUDES}
)
......@@ -65,7 +65,7 @@ if(SCALFMM_ADDON_HMAT)
install( TARGETS cclusteringlib ARCHIVE DESTINATION lib )
file( GLOB_RECURSE source_tests_files Tests/*.cpp )
INCLUDE_DIRECTORIES( ${CMAKE_BINARY_DIR}/Src )
INCLUDE_DIRECTORIES( ${SCALFMM_BINARY_DIR}/Src )
# Then build test files
SET(hmat_list_execs "")
......
......@@ -93,6 +93,7 @@ if (MORSE_DISTRIB_DIR OR EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/CMakeModules/morse/
# OPENMP 4/5 support
option( OPENMP_SUPPORT_COMMUTE "Set to ON to let tasks commute (KSTAR/StarPU compiler only)" OFF )
option( OPENMP_SUPPORT_PRIORITY "Set to ON to enable tasks priority (KSTAR/StarPU compiler only)" OFF )
option( OPENMP_SUPPORT_TASK_NAME "Set to ON to enable a taskname clause for tasks (KSTAR/StarPU compiler only)" OFF )
option( SCALFMM_DISABLE_NATIVE_OMP4 "Set to ON to disable the gcc/intel omp4" OFF )
option( SCALFMM_TIME_OMPTASKS "Set to ON to time omp4 tasks and generate output file" OFF )
# STARPU options
......
This diff is collapsed.
......@@ -17,8 +17,8 @@ file(
# Adding the project sources dir as an include dir
INCLUDE_DIRECTORIES(
${CMAKE_BINARY_DIR}/Src
${CMAKE_SOURCE_DIR}/Src
${SCALFMM_BINARY_DIR}/Src
${SCALFMM_SOURCE_DIR}/Src
${SCALFMM_INCLUDES}
)
......
......@@ -109,5 +109,5 @@ FOREACH(my_dir ${my_include_dirs})
INSTALL( FILES ${hpp_in_dir} DESTINATION include/${my_dir} )
ENDFOREACH()
INSTALL( FILES "${CMAKE_BINARY_DIR}/Src/ScalFmmConfig.h" DESTINATION include/Utils/${my_dir} )
INSTALL( FILES "${SCALFMM_BINARY_DIR}/Src/ScalFmmConfig.h" DESTINATION include/Utils/${my_dir} )
......@@ -21,34 +21,27 @@
#include "FAbstractBuffer.hpp"
#include "../Utils/FAssert.hpp"
/** @author Cyrille Piacibello
* This class provide the same features as FBufferWriter using MPI_Pack system
/** @author Cyrille Piacibello, Berenger Bramas
* This class provides the same features as FBufferWriter
*
* Put some data
* then insert back if needed
* finally use data pointer as you like
*/
class FMpiBufferReader : public FAbstractBufferReader {
MPI_Comm comm; //< Communicator needed by MPI_Pack functions
FSize arrayCapacity; //< Allocated space
std::unique_ptr<char[]> array; //< Allocated Array
FSize currentIndex;
public :
/*Constructor with a default arrayCapacity of 512 bytes */
explicit FMpiBufferReader(const MPI_Comm inComm = MPI_COMM_WORLD, const FSize inDefaultCapacity = 512):
comm(inComm),
explicit FMpiBufferReader(const FSize inDefaultCapacity = 512):
arrayCapacity(inDefaultCapacity),
array(new char[inDefaultCapacity]),
currentIndex(0){
FAssertLF(array, "Cannot allocate array");
}
/** Change the comm (or to set it later) */
void setComm(const MPI_Comm inComm){
comm = inComm;
}
/** To change the capacity (but reset the head to 0) */
void cleanAndResize(const FSize newCapacity){
if(newCapacity != arrayCapacity){
......@@ -97,50 +90,34 @@ public :
/** Get a value with memory cast */
template <class ClassType>
ClassType getValue(){
FAssertLF(arrayCapacity < std::numeric_limits<int>::max());
FAssertLF(currentIndex < std::numeric_limits<int>::max());
int previousIndex = int(currentIndex);
FAssertLF(currentIndex + FSize(sizeof(ClassType)) <= arrayCapacity );
ClassType value;
FMpi::Assert(MPI_Unpack(array.get(),int(arrayCapacity),&previousIndex,&value,FMpi::GetTypeCount(value),FMpi::GetType(value),comm), __LINE__);
seek(FSize(sizeof(value) + currentIndex));
FAssertLF(previousIndex == currentIndex);
memcpy(&value, &array[currentIndex], sizeof(ClassType));
currentIndex += sizeof(ClassType);
return value;
}
/** Get a value with memory cast at a specified index */
template <class ClassType>
ClassType getValue(const FSize ind){
ClassType value;
FAssertLF(arrayCapacity < std::numeric_limits<int>::max());
FAssertLF(ind < std::numeric_limits<int>::max());
int previousIndex = int(ind);
FMpi::Assert(MPI_Unpack(array.get(),int(arrayCapacity),&previousIndex,&value,FMpi::GetTypeCount(value),FMpi::GetType(value),comm), __LINE__);
seek(FSize(sizeof(value)+ind));
FAssertLF(previousIndex == currentIndex);
return value;
currentIndex = ind;
return getValue<ClassType>();
}
/** Fill a value with memory cast */
template <class ClassType>
void fillValue(ClassType* const inValue){
FAssertLF(arrayCapacity < std::numeric_limits<int>::max());
FAssertLF(currentIndex < std::numeric_limits<int>::max());
int previousIndex = int(currentIndex);
FMpi::Assert(MPI_Unpack(array.get(),int(arrayCapacity),&previousIndex,inValue,FMpi::GetTypeCount(*inValue),FMpi::GetType(*inValue),comm), __LINE__);
seek(FSize(sizeof(ClassType) + currentIndex));
FAssertLF(previousIndex == currentIndex);
FAssertLF(currentIndex + FSize(sizeof(ClassType)) <= arrayCapacity );
memcpy(inValue, &array[currentIndex], sizeof(ClassType));
currentIndex += sizeof(ClassType);
}
/** Fill one/many value(s) with memcpy */
template <class ClassType>
void fillArray(ClassType* const inArray, const FSize inSize){
FAssertLF(arrayCapacity < std::numeric_limits<int>::max());
FAssertLF(currentIndex < std::numeric_limits<int>::max());
FAssertLF(inSize < std::numeric_limits<int>::max());
int previousIndex = int(currentIndex);
FMpi::Assert(MPI_Unpack(array.get(),int(arrayCapacity),&previousIndex,inArray,int(inSize)*FMpi::GetTypeCount(*inArray),FMpi::GetType(*inArray),comm), __LINE__);
seek(FSize(sizeof(ClassType) * inSize + currentIndex));
FAssertLF(previousIndex == currentIndex);
FAssertLF(currentIndex + FSize(sizeof(ClassType))*inSize <= arrayCapacity );
memcpy(inArray, &array[currentIndex], sizeof(ClassType)*inSize);
currentIndex += sizeof(ClassType)*inSize;
}
/** Same as fillValue */
......
......@@ -21,22 +21,21 @@
#include "FAbstractBuffer.hpp"
#include "../Utils/FAssert.hpp"
/** @author Cyrille Piacibello
* This class provide the same features as FBufferWriter using MPI_Pack system
/** @author Cyrille Piacibello, Berenger Bramas
* This class provides the same features as FBufferWriter
*
* Put some data
* then insert back if needed
* finally use data pointer as you like
*/
class FMpiBufferWriter : public FAbstractBufferWriter {
MPI_Comm mpiComm; //< Communicator needed by MPI_Pack functions
FSize arrayCapacity; //< Allocated Space
std::unique_ptr<char[]> array; //< Allocated Array
FSize currentIndex; //< Currently filled space
/** Expand the internal buffer if there is not enough space */
void expandIfNeeded(const size_t requestedSpace) {
if( arrayCapacity < FSize(currentIndex + requestedSpace) ){
void expandIfNeeded(const FSize requestedSpace) {
if( arrayCapacity < currentIndex + requestedSpace){
arrayCapacity = FSize(double(currentIndex + requestedSpace + 1) * 1.5);
char* arrayTmp = new char[arrayCapacity];
memcpy(arrayTmp, array.get(), sizeof(char)*currentIndex);
......@@ -46,19 +45,13 @@ class FMpiBufferWriter : public FAbstractBufferWriter {
public:
/** Constructor with a default arrayCapacity of 1024 bytes */
explicit FMpiBufferWriter(const MPI_Comm inComm, const FSize inDefaultCapacity = 1024):
mpiComm(inComm),
explicit FMpiBufferWriter(const FSize inDefaultCapacity = 1024):
arrayCapacity(inDefaultCapacity),
array(new char[inDefaultCapacity]),
currentIndex(0)
{}
/** Change the comm (or to set it later) */
void setComm(const MPI_Comm inComm){
mpiComm = inComm;
}
/** To change the capacity (but reset the head to 0 if the new size is lower) */
void resize(const FSize newCapacity){
if(newCapacity != arrayCapacity){
......@@ -98,10 +91,8 @@ public:
template <class ClassType>
void write(const ClassType& object){
expandIfNeeded(sizeof(ClassType));
FAssertLF(currentIndex < std::numeric_limits<int>::max());
int intCurrentIndex = int(currentIndex);
FMpi::Assert(MPI_Pack(const_cast<ClassType*>(&object), FMpi::GetTypeCount(object), FMpi::GetType(object), array.get(), int(arrayCapacity), &intCurrentIndex, mpiComm), __LINE__);
currentIndex = intCurrentIndex;
memcpy(&array[currentIndex], &object, sizeof(ClassType));
currentIndex += sizeof(ClassType);
}
/**
......@@ -110,20 +101,15 @@ public:
template <class ClassType>
void write(const ClassType&& object){
expandIfNeeded(sizeof(ClassType));
FAssertLF(arrayCapacity < std::numeric_limits<int>::max());
int intCurrentIndex = int(currentIndex);
FMpi::Assert(MPI_Pack(const_cast<ClassType*>(&object), FMpi::GetTypeCount(object), FMpi::GetType(object), array.get(), int(arrayCapacity), &intCurrentIndex, mpiComm), __LINE__);
currentIndex = intCurrentIndex;
memcpy(&array[currentIndex], &object, sizeof(ClassType));
currentIndex += sizeof(ClassType);
}
/** Write back, position + sizeof(object) has to be < size */
template <class ClassType>
void writeAt(const FSize position, const ClassType& object){
FAssertLF(FSize(position + sizeof(ClassType)) <= currentIndex);
FAssertLF(arrayCapacity < std::numeric_limits<int>::max());
FAssertLF(position < std::numeric_limits<int>::max());
int noConstPosition = int(position);
FMpi::Assert(MPI_Pack(const_cast<ClassType*>(&object), FMpi::GetTypeCount(object), FMpi::GetType(object), array.get(), int(arrayCapacity), &noConstPosition, mpiComm), __LINE__);
FAssertLF(position+FSize(sizeof(ClassType)) <= currentIndex);
memcpy(&array[position], &object, sizeof(ClassType));
}
/** Write an array
......@@ -132,11 +118,8 @@ public:
template <class ClassType>
void write(const ClassType* const objects, const FSize inSize){
expandIfNeeded(sizeof(ClassType) * inSize);
FAssertLF(arrayCapacity < std::numeric_limits<int>::max());
FAssertLF(inSize < std::numeric_limits<int>::max());
int intCurrentIndex = int(currentIndex);
FMpi::Assert(MPI_Pack( const_cast<ClassType*>(objects), int(inSize)*FMpi::GetTypeCount(*objects), FMpi::GetType(*objects), array.get(), int(arrayCapacity), &intCurrentIndex, mpiComm), __LINE__);
currentIndex = intCurrentIndex;
memcpy(&array[currentIndex], objects, sizeof(ClassType)*inSize);
currentIndex += sizeof(ClassType)*inSize;
}
/** Equivalent to write */
......
......@@ -363,6 +363,7 @@ protected:
FLOG(computationCounter.tac());
FLOG( FLog::Controller << "\tFinished (@Bottom Pass (P2M) = " << counterTime.tacAndElapsed() << " s)\n" );
FLOG( FLog::Controller << "\t\t Computation : " << computationCounter.elapsed() << " s\n" );
FLOG( FLog::Controller.flush());
}
/////////////////////////////////////////////////////////////////////////////
......@@ -400,7 +401,7 @@ protected:
MPI_Status statusSize[8];
FSize bufferSize;
FMpiBufferWriter sendBuffer(comm.getComm(), 1);// Max = 1 + sizeof(cell)*7
FMpiBufferWriter sendBuffer(1);// Max = 1 + sizeof(cell)*7
std::unique_ptr<FMpiBufferReader[]> recvBuffer(new FMpiBufferReader[7]);
FSize recvBufferSize[7];
CellClass recvBufferCells[7];
......@@ -491,7 +492,7 @@ protected:
MPI_Isend(&bufferSize, 1, FMpi::GetType(bufferSize), currentProcIdToSendTo,
FMpi::TagFmmM2MSize + idxLevel, comm.getComm(), &requestsSize[iterMpiRequestsSize++]);
FAssertLF(sendBuffer.getSize() < std::numeric_limits<int>::max());
MPI_Isend(sendBuffer.data(), int(sendBuffer.getSize()), MPI_PACKED, currentProcIdToSendTo,
MPI_Isend(sendBuffer.data(), int(sendBuffer.getSize()), MPI_BYTE, currentProcIdToSendTo,
FMpi::TagFmmM2M + idxLevel, comm.getComm(), &requests[iterMpiRequests++]);
}
}
......@@ -532,7 +533,7 @@ protected:
if(procHasWorkAtLevel(idxLevel+1, idProcSource) && procCoversMyRightBorderCell(idxLevel, idProcSource)){
recvBuffer[nbProcThatSendToMe].cleanAndResize(recvBufferSize[nbProcThatSendToMe]);
FAssertLF(recvBufferSize[nbProcThatSendToMe] < std::numeric_limits<int>::max());
MPI_Irecv(recvBuffer[nbProcThatSendToMe].data(), int(recvBufferSize[nbProcThatSendToMe]), MPI_PACKED,
MPI_Irecv(recvBuffer[nbProcThatSendToMe].data(), int(recvBufferSize[nbProcThatSendToMe]), MPI_BYTE,
idProcSource, FMpi::TagFmmM2M + idxLevel, comm.getComm(), &requests[iterMpiRequests++]);
nbProcThatSendToMe += 1;
FAssertLF(nbProcThatSendToMe <= 7);
......@@ -556,7 +557,7 @@ protected:
// Retrieve data and merge my children with the children from other processes
for(int idxProc = 0 ; idxProc < nbProcThatSendToMe ; ++idxProc){
int packageFlags = int(recvBuffer[idxProc].getValue<char>());
unsigned packageFlags = unsigned(recvBuffer[idxProc].getValue<unsigned char>());
int position = 0;
int positionToInsert = 0;
......@@ -602,6 +603,7 @@ protected:
FLOG( FLog::Controller << "\t\t Computation : " << computationCounter.elapsed() << " s\n" );
FLOG( FLog::Controller << "\t\t Single : " << singleCounter.cumulated() << " s\n" );
FLOG( FLog::Controller << "\t\t Parallel : " << parallelCounter.cumulated() << " s\n" );
FLOG( FLog::Controller.flush());
}
/////////////////////////////////////////////////////////////////////////////
......@@ -754,15 +756,14 @@ protected:
FLOG(sendCounter.tic());
// Then they can send and receive (because they know what they will receive)
// To send in asynchrone way
MPI_Request*const requests = new MPI_Request[2 * nbProcess * OctreeHeight];
MPI_Status*const status = new MPI_Status[2 * nbProcess * OctreeHeight];
int iterRequest = 0;
std::vector<MPI_Request> requests;
requests.reserve(2 * nbProcess * OctreeHeight);
for(int idxLevel = 2 ; idxLevel < OctreeHeight ; ++idxLevel ){
for(int idxProc = 0 ; idxProc < nbProcess ; ++idxProc){
const long long int toSendAtProcAtLevel = indexToSend[idxLevel * nbProcess + idxProc];
if(toSendAtProcAtLevel != 0){
sendBuffer[idxLevel * nbProcess + idxProc] = new FMpiBufferWriter(comm.getComm(),int(toSendAtProcAtLevel));
sendBuffer[idxLevel * nbProcess + idxProc] = new FMpiBufferWriter(toSendAtProcAtLevel);
sendBuffer[idxLevel * nbProcess + idxProc]->write(int(toSend[idxLevel * nbProcess + idxProc].getSize()));
......@@ -776,20 +777,18 @@ protected:
FAssertLF(sendBuffer[idxLevel * nbProcess + idxProc]->getSize() == toSendAtProcAtLevel);
FAssertLF(sendBuffer[idxLevel * nbProcess + idxProc]->getSize() < std::numeric_limits<int>::max());
FMpi::MpiAssert( MPI_Isend( sendBuffer[idxLevel * nbProcess + idxProc]->data(),
int(sendBuffer[idxLevel * nbProcess + idxProc]->getSize()),MPI_PACKED, idxProc,
FMpi::TagLast + idxLevel, comm.getComm(), &requests[iterRequest++]) , __LINE__ );
FMpi::ISendSplit(sendBuffer[idxLevel * nbProcess + idxProc]->data(),
sendBuffer[idxLevel * nbProcess + idxProc]->getSize(), idxProc,
FMpi::TagLast + idxLevel*100, comm, &requests);
}
const long long int toReceiveFromProcAtLevel = globalReceiveMap[(idxProc * nbProcess * OctreeHeight) + idxLevel * nbProcess + idProcess];
if(toReceiveFromProcAtLevel){
recvBuffer[idxLevel * nbProcess + idxProc] = new FMpiBufferReader(comm.getComm(),int(toReceiveFromProcAtLevel));
recvBuffer[idxLevel * nbProcess + idxProc] = new FMpiBufferReader(toReceiveFromProcAtLevel);
FAssertLF(recvBuffer[idxLevel * nbProcess + idxProc]->getCapacity() < std::numeric_limits<int>::max());
FMpi::MpiAssert( MPI_Irecv(recvBuffer[idxLevel * nbProcess + idxProc]->data(),
int(recvBuffer[idxLevel * nbProcess + idxProc]->getCapacity()), MPI_PACKED,idxProc,
FMpi::TagLast + idxLevel, comm.getComm(), &requests[iterRequest++]) , __LINE__ );
FMpi::IRecvSplit(recvBuffer[idxLevel * nbProcess + idxProc]->data(),
recvBuffer[idxLevel * nbProcess + idxProc]->getCapacity(), idxProc,
FMpi::TagLast + idxLevel*100, comm, &requests);
}
}
}
......@@ -799,10 +798,7 @@ protected:
//////////////////////////////////////////////////////////////////
// Wait to receive everything (and to send everything)
FMpi::MpiAssert(MPI_Waitall(iterRequest, requests, status), __LINE__);
delete[] requests;
delete[] status;
FMpi::MpiAssert(MPI_Waitall(int(requests.size()), requests.data(), MPI_STATUS_IGNORE), __LINE__);
FLOG(sendCounter.tac());
}//End of Master region
......@@ -1009,6 +1005,7 @@ protected:
FLOG( FLog::Controller << "\t\t Receive : " << receiveCounter.cumulated() << " s\n" );
FLOG( FLog::Controller << "\t\t Gather : " << gatherCounter.cumulated() << " s\n" );
FLOG( FLog::Controller << "\t\t Prepare : " << prepareCounter.cumulated() << " s\n" );
FLOG( FLog::Controller.flush());
}
......@@ -1039,8 +1036,8 @@ protected:
const int heightMinusOne = FAbstractAlgorithm::lowerWorkingLevel - 1;
FMpiBufferWriter sendBuffer(comm.getComm());
FMpiBufferReader recvBuffer(comm.getComm());
FMpiBufferWriter sendBuffer;
FMpiBufferReader recvBuffer;
int righestProcToSendTo = nbProcess - 1;
......@@ -1116,7 +1113,7 @@ protected:
FMpi::MpiAssert( MPI_Isend(&sendBufferSize, 1, FMpi::GetType(sendBufferSize), idxProcSend,
FMpi::TagFmmL2LSize + idxLevel, comm.getComm(), &requestsSize[iterRequestsSize++]), __LINE__);
FAssertLF(sendBuffer.getSize() < std::numeric_limits<int>::max());
FMpi::MpiAssert( MPI_Isend(sendBuffer.data(), int(sendBuffer.getSize()), MPI_PACKED, idxProcSend,
FMpi::MpiAssert( MPI_Isend(sendBuffer.data(), int(sendBuffer.getSize()), MPI_BYTE, idxProcSend,
FMpi::TagFmmL2L + idxLevel, comm.getComm(), &requests[iterRequests++]), __LINE__);
// Inc and check the counter
nbMessageSent += 1;
......@@ -1139,7 +1136,7 @@ protected:
if(hasToReceive){
recvBuffer.cleanAndResize(recvBufferSize);
FAssertLF(recvBuffer.getCapacity() < std::numeric_limits<int>::max());
FMpi::MpiAssert( MPI_Irecv( recvBuffer.data(), int(recvBuffer.getCapacity()), MPI_PACKED, idxProcToReceive,
FMpi::MpiAssert( MPI_Irecv( recvBuffer.data(), int(recvBuffer.getCapacity()), MPI_BYTE, idxProcToReceive,
FMpi::TagFmmL2L + idxLevel, comm.getComm(), &requests[iterRequests++]), __LINE__ );
}
......@@ -1184,6 +1181,7 @@ protected:
FLOG( FLog::Controller << "\t\t Computation : " << computationCounter.cumulated() << " s\n" );
FLOG( FLog::Controller << "\t\t Prepare : " << prepareCounter.cumulated() << " s\n" );
FLOG( FLog::Controller << "\t\t Wait : " << waitCounter.cumulated() << " s\n" );
FLOG( FLog::Controller.flush());
}
......@@ -1213,25 +1211,6 @@ protected:
///////////////////////////////////////////////////
FLOG(prepareCounter.tic());
// To send in asynchrone way
MPI_Request requests[2 * nbProcess];
MPI_Status status[2 * nbProcess];
int iterRequest = 0;
int nbMessagesToRecv = 0;
FMpiBufferWriter**const sendBuffer = new FMpiBufferWriter*[nbProcess];
memset(sendBuffer, 0, sizeof(FMpiBufferWriter*) * nbProcess);
FMpiBufferReader**const recvBuffer = new FMpiBufferReader*[nbProcess];
memset(recvBuffer, 0, sizeof(FMpiBufferReader*) * nbProcess);
/* This a nbProcess x nbProcess matrix of integer
* let U and V be id of processes :
* globalReceiveMap[U*nbProcess + V] == size of information needed by V and own by U
*/
FSize*const globalReceiveMap = new FSize[nbProcess * nbProcess];
memset(globalReceiveMap, 0, sizeof(FSize) * nbProcess * nbProcess);
FBoolArray leafsNeedOther(this->numberOfLeafs);
int countNeedOther = 0;
......@@ -1320,28 +1299,43 @@ protected:
#pragma omp master // nowait
if(p2pEnabled){
/* This a nbProcess x nbProcess matrix of integer
* let U and V be id of processes :
* globalReceiveMap[U*nbProcess + V] == size of information needed by V and own by U
*/
FSize*const globalReceiveMap = new FSize[nbProcess * nbProcess];
memset(globalReceiveMap, 0, sizeof(FSize) * nbProcess * nbProcess);
//Share to all processus globalReceiveMap
FLOG(gatherCounter.tic());
FMpi::MpiAssert( MPI_Allgather( partsToSend, nbProcess, FMpi::GetType(*partsToSend),
globalReceiveMap, nbProcess, FMpi::GetType(*partsToSend), comm.getComm()), __LINE__ );
FLOG(gatherCounter.tac());
FMpiBufferReader**const recvBuffer = new FMpiBufferReader*[nbProcess];
memset(recvBuffer, 0, sizeof(FMpiBufferReader*) * nbProcess);
FMpiBufferWriter**const sendBuffer = new FMpiBufferWriter*[nbProcess];
memset(sendBuffer, 0, sizeof(FMpiBufferWriter*) * nbProcess);
// To send in asynchrone way
std::vector<MPI_Request> requests;
requests.reserve(2 * nbProcess);
//Prepare receive
for(int idxProc = 0 ; idxProc < nbProcess ; ++idxProc){
if(globalReceiveMap[idxProc * nbProcess + idProcess]){ //if idxProc has sth for me.
//allocate buffer of right size
recvBuffer[idxProc] = new FMpiBufferReader(comm.getComm(),globalReceiveMap[idxProc * nbProcess + idProcess]);
FAssertLF(recvBuffer[idxProc]->getCapacity() < std::numeric_limits<int>::max());
FMpi::MpiAssert( MPI_Irecv(recvBuffer[idxProc]->data(), int(recvBuffer[idxProc]->getCapacity()), MPI_PACKED,
idxProc, FMpi::TagFmmP2P, comm.getComm(), &requests[iterRequest++]) , __LINE__ );
recvBuffer[idxProc] = new FMpiBufferReader(globalReceiveMap[idxProc * nbProcess + idProcess]);
FMpi::IRecvSplit(recvBuffer[idxProc]->data(), recvBuffer[idxProc]->getCapacity(),
idxProc, FMpi::TagFmmP2P, comm, &requests);
}
}
nbMessagesToRecv = iterRequest;
// Prepare send
for(int idxProc = 0 ; idxProc < nbProcess ; ++idxProc){
if(toSend[idxProc].getSize() != 0){
sendBuffer[idxProc] = new FMpiBufferWriter(comm.getComm(),globalReceiveMap[idProcess*nbProcess+idxProc]);
sendBuffer[idxProc] = new FMpiBufferWriter(globalReceiveMap[idProcess*nbProcess+idxProc]);
// << is equivalent to write().
(*sendBuffer[idxProc]) << toSend[idxProc].getSize();
for(int idxLeaf = 0 ; idxLeaf < toSend[idxProc].getSize() ; ++idxLeaf){
......@@ -1350,9 +1344,9 @@ protected:
}
FAssertLF(sendBuffer[idxProc]->getSize() == globalReceiveMap[idProcess*nbProcess+idxProc]);
FAssertLF(sendBuffer[idxProc]->getSize() < std::numeric_limits<int>::max());
FMpi::MpiAssert( MPI_Isend( sendBuffer[idxProc]->data(), int(sendBuffer[idxProc]->getSize()) , MPI_PACKED ,
idxProc, FMpi::TagFmmP2P, comm.getComm(), &requests[iterRequest++]) , __LINE__ );
FMpi::ISendSplit(sendBuffer[idxProc]->data(), sendBuffer[idxProc]->getSize(),
idxProc, FMpi::TagFmmP2P, comm, &requests);
}
}
......@@ -1364,23 +1358,34 @@ protected:
// Wait for the sends and receives to complete
//////////////////////////////////////////////////////////
std::unique_ptr<MPI_Status[]> status(new MPI_Status[requests.size()]);
// Wait data
FLOG(waitCounter.tic());
MPI_Waitall(iterRequest, requests, status);
MPI_Waitall(int(requests.size()), requests.data(), status.get());
FLOG(waitCounter.tac());
for(int idxRcv = 0 ; idxRcv < nbMessagesToRecv ; ++idxRcv){
const int idxProc = status[idxRcv].MPI_SOURCE;
FSize nbLeaves;
(*recvBuffer[idxProc]) >> nbLeaves;
for(FSize idxLeaf = 0 ; idxLeaf < nbLeaves ; ++idxLeaf){
MortonIndex leafIndex;
(*recvBuffer[idxProc]) >> leafIndex;
otherP2Ptree.createLeaf(leafIndex)->getSrc()->restore((*recvBuffer[idxProc]));
for(int idxProc = 0 ; idxProc < nbProcess ; ++idxProc){
if(globalReceiveMap[idxProc * nbProcess + idProcess]){ //if idxProc has sth for me.
FAssertLF(recvBuffer[idxProc]);
FMpiBufferReader& currentBuffer = (*recvBuffer[idxProc]);
FSize nbLeaves;
currentBuffer >> nbLeaves;
for(FSize idxLeaf = 0 ; idxLeaf < nbLeaves ; ++idxLeaf){
MortonIndex leafIndex;
currentBuffer >> leafIndex;
otherP2Ptree.createLeaf(leafIndex)->getSrc()->restore(currentBuffer);
}
// Release memory early
delete recvBuffer[idxProc];
recvBuffer[idxProc] = nullptr;
}
}
for(int idxProc = 0 ; idxProc < nbProcess ; ++idxProc){
delete sendBuffer[idxProc];
delete recvBuffer[idxProc];
recvBuffer[idxProc] = nullptr;
}
delete[] globalReceiveMap;
}
///////////////////////////////////////////////////
......@@ -1530,11 +1535,6 @@ protected:
}
}
for(int idxProc = 0 ; idxProc < nbProcess ; ++idxProc){
delete sendBuffer[idxProc];
delete recvBuffer[idxProc];
}