Commit d996f38e authored by BRAMAS Berenger

update the tree builder

parent 2be79693
@@ -188,6 +188,8 @@ public:
if( (*workingSize) != 0 ){
borderLeavesState[0] = leavesInfo[0];
borderLeavesState[1] = leavesInfo[leavesInfo.getSize()-1];
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] First " << borderLeavesState[0].mindex << "\n"; FLog::Controller.flush(); );
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] Last " << borderLeavesState[1].mindex << "\n"; FLog::Controller.flush(); );
}
std::unique_ptr<LeafInfo[]> allProcFirstLeafStates(new LeafInfo[nbProcs*2]);
@@ -210,9 +212,10 @@ public:
if(idProcToSendTo != myRank && allProcFirstLeafStates[(idProcToSendTo)*2 + 1].mindex == borderLeavesState[0].mindex){
// Post and send message for the first leaf
requests.push((MPI_Request)0);
FAssertLF(borderLeavesState[0].nbParts < std::numeric_limits<int>::max());
FMpi::MpiAssert(MPI_Isend(&workingArray[0], int(borderLeavesState[0].nbParts), MPI_BYTE, idProcToSendTo,
FAssertLF(borderLeavesState[0].nbParts*sizeof(IndexedParticle) < std::numeric_limits<int>::max());
FMpi::MpiAssert(MPI_Isend(&workingArray[0], int(borderLeavesState[0].nbParts*sizeof(IndexedParticle)), MPI_BYTE, idProcToSendTo,
FMpi::TagExchangeIndexs, communicator.getComm(), &requests[0]),__LINE__);
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] send " << borderLeavesState[0].nbParts << " to " << idProcToSendTo << "\n"; FLog::Controller.flush(); );
hasSentFirstLeaf = true;
}
}
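The change above (mirrored on the receive side in the next hunk) switches the MPI count from a particle count to a byte count, since the transfer uses MPI_BYTE; the assertion is widened accordingly, so the byte total rather than the element total is checked against the int limit. A minimal standalone sketch of that pattern, with an illustrative POD payload standing in for IndexedParticle:

#include <mpi.h>
#include <cassert>
#include <cstdint>
#include <limits>

// Illustrative POD payload; not ScalFMM's actual IndexedParticle layout.
struct Particle { double position[3]; std::int64_t mindex; };

// MPI counts are plain ints, so when sending raw bytes the *byte* total
// (count * sizeof(Particle)) must fit in an int, not just the element count.
void isendParticles(const Particle* buf, std::int64_t nbParts, int dest, int tag,
                    MPI_Comm comm, MPI_Request* request){
    const std::int64_t nbBytes = nbParts * std::int64_t(sizeof(Particle));
    assert(nbBytes < std::int64_t(std::numeric_limits<int>::max()));
    MPI_Isend(buf, int(nbBytes), MPI_BYTE, dest, tag, comm, request);
}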
@@ -240,9 +243,10 @@ public:
// If there are some on this proc
if(allProcFirstLeafStates[(postRecvIdx)*2].mindex != noDataFlag){
requests.push((MPI_Request)0);
FAssertLF(allProcFirstLeafStates[(postRecvIdx)*2].nbParts < std::numeric_limits<int>::max());
FMpi::MpiAssert(MPI_Irecv(&receivedParticles[postPositionRecv], int(allProcFirstLeafStates[(postRecvIdx)*2].nbParts), MPI_BYTE, postRecvIdx,
FAssertLF(allProcFirstLeafStates[(postRecvIdx)*2].nbParts*sizeof(IndexedParticle) < std::numeric_limits<int>::max());
FMpi::MpiAssert(MPI_Irecv(&receivedParticles[postPositionRecv], int(allProcFirstLeafStates[(postRecvIdx)*2].nbParts*sizeof(IndexedParticle)), MPI_BYTE, postRecvIdx,
FMpi::TagExchangeIndexs, communicator.getComm(), &requests[0]),__LINE__);
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] recv " << allProcFirstLeafStates[(postRecvIdx)*2].nbParts << " from " << postRecvIdx << "\n"; FLog::Controller.flush(); );
// Inc the write position
postPositionRecv += allProcFirstLeafStates[(postRecvIdx)*2].nbParts;
}
@@ -282,6 +286,7 @@ public:
delete[] workingArray;
workingArray = particlesWithExtension;
(*workingSize) = finalParticlesNumber;
leavesInfo[leavesInfo.getSize()-1].nbParts += receivedParticles.size();
}
}
{// Filling the array with leaves and parts //// COULD BE MOVED INTO ANOTHER FUNCTION
@@ -336,7 +341,10 @@ public:
FMpi::MpiAssert(MPI_Allgather(const_cast<FSize*>(&currentNbLeaves), 1, MPI_LONG_LONG_INT, numberOfLeavesPerProc.get(),
1, MPI_LONG_LONG_INT, communicator.getComm()), __LINE__);
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] Exchange number of leaves\n"; FLog::Controller.flush(); );
// prefix sum
std::unique_ptr<FSize[]> diffNumberOfLeavesPerProc(new FSize[nbProcs+1]);
diffNumberOfLeavesPerProc[0] = 0;
for(int idxProc = 0 ; idxProc < nbProcs ; ++idxProc ){
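The loop above is cut off by the hunk boundary; it completes an exclusive prefix sum over the gathered per-process leaf counts, from which each process reads off its global leaf interval. A minimal sketch of the idea, using std::int64_t in place of FSize:

#include <cstdint>
#include <memory>

// Exclusive prefix sum: afterwards process p owns the half-open global
// leaf interval [diff[p], diff[p+1]).
std::unique_ptr<std::int64_t[]> exclusivePrefixSum(const std::int64_t* counts, const int nbProcs){
    std::unique_ptr<std::int64_t[]> diff(new std::int64_t[nbProcs + 1]);
    diff[0] = 0;
    for(int idxProc = 0 ; idxProc < nbProcs ; ++idxProc){
        diff[idxProc + 1] = diff[idxProc] + counts[idxProc];
    }
    return diff;
}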
@@ -355,34 +363,35 @@ public:
// Ask for the pack to send
std::pair<size_t, size_t> myCurrentInter = {diffNumberOfLeavesPerProc[myRank], diffNumberOfLeavesPerProc[myRank+1]};
const std::vector<FEqualize::Package> packsToSend = FEqualize::GetPackToSend(myCurrentInter, allObjectives);
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] Get my interval (" << packsToSend.size() << ")\n"; FLog::Controller.flush(); );
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] Send data\n"; FLog::Controller.flush(); );
std::unique_ptr<FSize[]> nbPartsPerPackToSend(new FSize[packsToSend.size()]);
// Store the requests
std::vector<MPI_Request> requestsParts;
std::vector<MPI_Request> requestsNbParts;
requestsNbParts.reserve(packsToSend.size());
// Send everything except to myself or when size == 0
for(unsigned int idxPack = 0; idxPack< packsToSend.size() ; ++idxPack){
const FEqualize::Package& pack = packsToSend[idxPack];
if(pack.idProc != myRank && 0 < (pack.elementTo-pack.elementFrom)){
// If not to me and if there is something to send
nbPartsPerPackToSend[idxPack] = leavesOffsetInParticles[pack.elementTo]-leavesOffsetInParticles[pack.elementFrom];
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] pre-send to " << pack.idProc << " nb " << nbPartsPerPackToSend[idxPack] << " \n"; FLog::Controller.flush(); );
// Send the size of the data
requestsNbParts.emplace_back();
FMpi::MpiAssert(MPI_Isend(&nbPartsPerPackToSend[idxPack],1,MPI_LONG_LONG_INT,pack.idProc,
FMpi::TagExchangeIndexs+1, communicator.getComm(), &requestsNbParts.back()),__LINE__);
// Send the data
for(FSize idxMess = 0 ; idxMess < nbPartsPerPackToSend[idxPack]; idxMess += MAX_PARTICLES_PER_MPI_MESS){
const int nbElementsInMessage = int(FMath::Min(nbPartsPerPackToSend[idxPack]-idxMess, MAX_PARTICLES_PER_MPI_MESS));
requestsParts.emplace_back();
FMpi::MpiAssert(MPI_Isend(const_cast<ParticleClass*>(&particlesArrayInLeafOrder[leavesOffsetInParticles[pack.elementFrom]+idxMess]),
int(sizeof(ParticleClass)*nbElementsInMessage),
MPI_BYTE, pack.idProc, int(FMpi::TagExchangeIndexs + 2 + idxMess), communicator.getComm(), &requestsParts.back()), __LINE__);
}
}
else {
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] skip " << idxPack << " \n"; FLog::Controller.flush(); );
// Nothing to send
nbPartsPerPackToSend[idxPack] = 0;
}
}
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] Send done \n"; FLog::Controller.flush(); );
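The data sends above are split into messages of at most MAX_PARTICLES_PER_MPI_MESS particles, each posted on its own tag, so that every individual message's byte count stays below the int limit. A sketch of that chunking, assuming the chunk size is chosen so a full chunk's byte count fits in an int:

#include <mpi.h>
#include <algorithm>
#include <cstdint>
#include <vector>

// One request and one distinct tag per chunk, mirroring the tag scheme
// (base tag + element offset) used above.
template <class T>
void chunkedIsend(const T* buf, std::int64_t nbElements, std::int64_t maxPerMess,
                  int dest, int baseTag, MPI_Comm comm, std::vector<MPI_Request>& requests){
    for(std::int64_t idx = 0 ; idx < nbElements ; idx += maxPerMess){
        const std::int64_t chunk = std::min(nbElements - idx, maxPerMess);
        requests.emplace_back();
        MPI_Isend(buf + idx, int(chunk * std::int64_t(sizeof(T))), MPI_BYTE,
                  dest, baseTag + int(idx), comm, &requests.back());
    }
}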
// Compute the current intervals
std::vector< std::pair<size_t,size_t> > allCurrentIntervals;
allCurrentIntervals.resize(nbProcs);
@@ -391,14 +400,17 @@ public:
allCurrentIntervals[idxProc].second = diffNumberOfLeavesPerProc[idxProc+1];
}
// Ask the packs to receive to fill my objective
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] Get my receive interval \n"; FLog::Controller.flush(); );
std::pair<size_t, size_t> myObjective = allObjectives[myRank];
const std::vector<FEqualize::Package> packsToRecv = FEqualize::GetPackToRecv(myObjective, allCurrentIntervals);
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] recv nb particles \n"; FLog::Controller.flush(); );
// Count the number of parts to receive
std::unique_ptr<FSize[]> nbPartsPerPackToRecv(new FSize[packsToRecv.size()]);
for(unsigned int idxPack = 0; idxPack < packsToRecv.size(); ++idxPack){
const FEqualize::Package& pack = packsToRecv[idxPack];
if(pack.idProc != myRank && 0 < (pack.elementTo-pack.elementFrom)){
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] pre-recv from " << pack.idProc << " \n"; FLog::Controller.flush(); );
// We need to know how many particles to receive
requestsNbParts.emplace_back();
FMpi::MpiAssert(MPI_Irecv(&nbPartsPerPackToRecv[idxPack], 1, MPI_LONG_LONG_INT, pack.idProc,
@@ -406,6 +418,7 @@ public:
}
else{
if(pack.idProc == myRank){
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] skip recv " << idxPack << " \n"; FLog::Controller.flush(); );
// Take my own data
const FSize sourcePosition = FMath::Max(myObjective.first, myCurrentInter.first) - myCurrentInter.first;
const FSize nbLeavesToCopy = pack.elementTo-pack.elementFrom;
@@ -418,8 +431,31 @@ public:
}
}
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] Wait \n"; FLog::Controller.flush(); );
FMpi::MpiAssert(MPI_Waitall(int(requestsNbParts.size()), requestsNbParts.data(), MPI_STATUSES_IGNORE), __LINE__);
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] Wait Done \n"; FLog::Controller.flush(); );
//std::vector<MPI_Request> requestsParts;
std::unique_ptr<int[]> sendcounts(new int[communicator.processCount()]);
memset(sendcounts.get(), 0, sizeof(int)*communicator.processCount());
std::unique_ptr<int[]> sdispls(new int[communicator.processCount()]);
memset(sdispls.get(), 0, sizeof(int)*communicator.processCount());
for(unsigned int idxPack = 0; idxPack< packsToSend.size() ; ++idxPack){
const FEqualize::Package& pack = packsToSend[idxPack];
if(pack.idProc != myRank && 0 < (pack.elementTo-pack.elementFrom)){
sendcounts[pack.idProc] = int(sizeof(ParticleClass)*nbPartsPerPackToSend[idxPack]);
if(pack.idProc!=0) sdispls[pack.idProc] = sdispls[pack.idProc-1] + sendcounts[pack.idProc-1];
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] send to "
<< pack.idProc << " nb " << nbPartsPerPackToSend[idxPack] << " sdispls " << sdispls[pack.idProc] << " \n"; FLog::Controller.flush(); );
}
}
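Here the counts and displacements for the upcoming MPI_Alltoallv are expressed in bytes, because the collective uses MPI_BYTE on both sides. For reference, the textbook way to derive displacements is a running sum over every rank, so that ranks with a zero count still carry the correct offset forward; a sketch of that general pattern (not the commit's exact code, which fills sdispls only for the ranks it actually sends to):

// Displacements as an exclusive running sum of the byte counts of all ranks.
void buildByteDispls(const int* counts, const int nbProcs, int* displs){
    displs[0] = 0;
    for(int idxProc = 1 ; idxProc < nbProcs ; ++idxProc){
        displs[idxProc] = displs[idxProc - 1] + counts[idxProc - 1];
    }
}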
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] barrier after all send \n"; FLog::Controller.flush(); );
// Count the total number of parts to receive
FSize totalPartsToReceive = 0;
for(unsigned int idxPack = 0; idxPack < packsToRecv.size(); ++idxPack){
@@ -427,6 +463,11 @@ public:
}
std::vector<ParticleClass> particlesRecvBuffer;
std::unique_ptr<int[]> recvcounts(new int[communicator.processCount()]);
memset(recvcounts.get(), 0, sizeof(int)*communicator.processCount());
std::unique_ptr<int[]> rdispls(new int[communicator.processCount()]);
memset(rdispls.get(), 0, sizeof(int)*communicator.processCount());
// Post all the receive and copy mine
if(totalPartsToReceive){
particlesRecvBuffer.resize(totalPartsToReceive);
@@ -434,15 +475,13 @@ public:
for(unsigned int idxPack = 0; idxPack < packsToRecv.size(); ++idxPack){
const FEqualize::Package& pack = packsToRecv[idxPack];
if(pack.idProc != myRank && 0 < (pack.elementTo-pack.elementFrom)){
for(FSize idxMess = 0 ; idxMess < nbPartsPerPackToRecv[idxPack]; idxMess += MAX_PARTICLES_PER_MPI_MESS){
const int nbElementsInMessage = int(FMath::Min(nbPartsPerPackToRecv[idxPack]-idxMess, MAX_PARTICLES_PER_MPI_MESS));
requestsParts.emplace_back();
FMpi::MpiAssert( MPI_Irecv(&particlesRecvBuffer[offsetToRecv+idxMess],
int(sizeof(ParticleClass)*nbElementsInMessage), MPI_BYTE, pack.idProc,
int(FMpi::TagExchangeIndexs + 2 + idxMess), communicator.getComm(), &requestsParts.back()), __LINE__);
}
recvcounts[pack.idProc] = int(sizeof(ParticleClass)*nbPartsPerPackToRecv[idxPack]);
rdispls[pack.idProc] = int(sizeof(ParticleClass)*offsetToRecv);
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] recv from "
<< pack.idProc << " nb " << nbPartsPerPackToRecv[idxPack] << " (offset " << rdispls[pack.idProc] << ") \n"; FLog::Controller.flush(); );
}
else if(pack.idProc == myRank){
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] skip " << idxPack << " \n"; FLog::Controller.flush(); );
// Copy my particles
const FSize sourcePosition = FMath::Max(myObjective.first, myCurrentInter.first) - myCurrentInter.first;
memcpy(&particlesRecvBuffer[offsetToRecv], &particlesArrayInLeafOrder[leavesOffsetInParticles[sourcePosition]],
@@ -452,8 +491,27 @@ public:
}
}
// Finalize communication
FMpi::MpiAssert(MPI_Waitall(int(requestsParts.size()), requestsParts.data(), MPI_STATUSES_IGNORE), __LINE__);
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] pre Wait \n"; FLog::Controller.flush(); );
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] MPI_Alltoallv \n"; FLog::Controller.flush(); );
for(int idxProc = 0 ; idxProc < communicator.processCount() ; ++idxProc){
if(sendcounts[idxProc]){
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] for "
<< idxProc << " send " << sendcounts[idxProc] << " \n"; FLog::Controller.flush(); );
}
if(recvcounts[idxProc]){
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] for "
<< idxProc << " recv " << recvcounts[idxProc] << " rdispls " << rdispls[idxProc] << " \n"; FLog::Controller.flush(); );
}
}
FMpi::MpiAssert( MPI_Alltoallv((const void *)particlesArrayInLeafOrder, sendcounts.get(),
sdispls.get(), MPI_BYTE,
(void *)particlesRecvBuffer.data(), recvcounts.get(),
rdispls.get(), MPI_BYTE, communicator.getComm()), __LINE__);
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] MPI_Alltoallv Done \n"; FLog::Controller.flush(); );
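Replacing the earlier per-pack Isend/Irecv/Waitall exchange with a single MPI_Alltoallv hands the scheduling of the whole exchange to the MPI runtime and removes the manual request bookkeeping, at the cost that each per-pair count is again a plain int of bytes, so one pair still cannot exchange 2 GiB or more in a single call. A minimal sketch of the collective with byte-based arguments (buffer and count names are illustrative):

#include <mpi.h>
#include <vector>

// All-to-all exchange of variable-sized raw byte blocks in one collective.
void exchangeBytes(const std::vector<char>& sendBuf,
                   const std::vector<int>& sendcounts, const std::vector<int>& sdispls,
                   std::vector<char>& recvBuf,
                   const std::vector<int>& recvcounts, const std::vector<int>& rdispls,
                   MPI_Comm comm){
    MPI_Alltoallv(sendBuf.data(), sendcounts.data(), sdispls.data(), MPI_BYTE,
                  recvBuf.data(), recvcounts.data(), rdispls.data(), MPI_BYTE, comm);
}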
// Insert in the particle saver
for(FSize idPartsToStore = 0 ; idPartsToStore < int(particlesRecvBuffer.size()) ; ++idPartsToStore){
@@ -471,7 +529,7 @@ public:
const FPoint<FReal>& boxCenter, const FReal boxWidth, const int treeHeight,
ContainerClass* particleSaver, FAbstractBalanceAlgorithm* balancer, const SortingType sortingType = QuickSort){
FLOG( FLog::Controller << "Particles Distribution: " << "Enter DistributeArrayToContainer\n" ; FLog::Controller.flush(); );
FLOG( FLog::Controller << "[" << communicator.processId() << "] Particles Distribution: " << "Enter DistributeArrayToContainer\n" ; FLog::Controller.flush(); );
FLOG( FTic timer );
IndexedParticle* sortedParticlesArray = nullptr;
@@ -479,7 +537,7 @@ public:
// From ParticleClass get array of IndexedParticle sorted
GetSortedParticlesFromArray(communicator, originalParticlesArray, originalNbParticles, sortingType, boxCenter, boxWidth, treeHeight,
&sortedParticlesArray, &nbParticlesInArray);
FLOG( FLog::Controller << "Particles Distribution: " << "\t GetSortedParticlesFromArray is over (" << timer.tacAndElapsed() << "s)\n"; FLog::Controller.flush(); );
FLOG( FLog::Controller << "[" << communicator.processId() << "] Particles Distribution: " << "\t GetSortedParticlesFromArray is over (" << timer.tacAndElapsed() << "s)\n"; FLog::Controller.flush(); );
FLOG( timer.tic() );
ParticleClass* particlesArrayInLeafOrder = nullptr;
@@ -489,7 +547,7 @@ public:
MergeSplitedLeaves(communicator, sortedParticlesArray, &nbParticlesInArray, &leavesOffsetInParticles, &particlesArrayInLeafOrder, &nbLeaves);
delete[] sortedParticlesArray;
FLOG( FLog::Controller << "Particles Distribution: " << "\t MergeSplitedLeaves is over (" << timer.tacAndElapsed() << "s)\n"; FLog::Controller.flush(); );
FLOG( FLog::Controller << "[" << communicator.processId() << "] Particles Distribution: " << "\t MergeSplitedLeaves is over (" << timer.tacAndElapsed() << "s)\n"; FLog::Controller.flush(); );
FLOG( timer.tic() );
// Equalize and balance
@@ -498,9 +556,9 @@ public:
delete[] particlesArrayInLeafOrder;
delete[] leavesOffsetInParticles;
FLOG( FLog::Controller << "Particles Distribution: " << "\t EqualizeAndFillContainer is over (" << timer.tacAndElapsed() << "s)\n"; FLog::Controller.flush(); );
FLOG( FLog::Controller << "[" << communicator.processId() << "] Particles Distribution: " << "\t EqualizeAndFillContainer is over (" << timer.tacAndElapsed() << "s)\n"; FLog::Controller.flush(); );
FLOG( FLog::Controller << "Particles Distribution: " << "\t DistributeArrayToContainer is over (" << timer.cumulated() << "s)\n"; FLog::Controller.flush(); );
FLOG( FLog::Controller << "[" << communicator.processId() << "] Particles Distribution: " << "\t DistributeArrayToContainer is over (" << timer.cumulated() << "s)\n"; FLog::Controller.flush(); );
#ifdef SCALFMM_USE_LOG
/** To produce stats after the Equalize phase */
......