Commit a4d7fa31 authored by BRAMAS Berenger's avatar BRAMAS Berenger
Browse files

make the distribution of the work working for 1 billion and 10 procs (but need...

make the distribution of the work working for 1 billion and 10 procs (but need to divide the message for the P2P)
parent d996f38e
......@@ -324,8 +324,6 @@ public:
const FSize leavesOffsetInParticles[], const ParticleClass particlesArrayInLeafOrder[],
const FSize currentNbLeaves,
const FSize currentNbParts, FAbstractBalanceAlgorithm * balancer){
const FSize MAX_BYTE_PER_MPI_MESS = 2000000000;
const FSize MAX_PARTICLES_PER_MPI_MESS = FMath::Max(FSize(1), FSize(MAX_BYTE_PER_MPI_MESS/sizeof(ParticleClass)));
const int myRank = communicator.processId();
const int nbProcs = communicator.processCount();
......@@ -367,7 +365,6 @@ public:
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] Get my interval (" << packsToSend.size() << ")\n"; FLog::Controller.flush(); );
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] Send data\n"; FLog::Controller.flush(); );
std::unique_ptr<FSize[]> nbPartsPerPackToSend(new FSize[packsToSend.size()]);
// Store the requests
std::vector<MPI_Request> requestsNbParts;
requestsNbParts.reserve(packsToSend.size());
......@@ -377,17 +374,15 @@ public:
const FEqualize::Package& pack = packsToSend[idxPack];
if(pack.idProc != myRank && 0 < (pack.elementTo-pack.elementFrom)){
// If not to me and if there is something to send
nbPartsPerPackToSend[idxPack] = leavesOffsetInParticles[pack.elementTo]-leavesOffsetInParticles[pack.elementFrom];
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] pre-send to " << pack.idProc << " nb " << nbPartsPerPackToSend[idxPack] << " \n"; FLog::Controller.flush(); );
const long long int nbPartsPerPackToSend = leavesOffsetInParticles[pack.elementTo]-leavesOffsetInParticles[pack.elementFrom];
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] pre-send to " << pack.idProc << " nb " << nbPartsPerPackToSend << " \n"; FLog::Controller.flush(); );
// Send the size of the data
requestsNbParts.emplace_back();
FMpi::MpiAssert(MPI_Isend(&nbPartsPerPackToSend[idxPack],1,MPI_LONG_LONG_INT,pack.idProc,
FMpi::TagExchangeIndexs+1, communicator.getComm(), &requestsNbParts.back()),__LINE__);
FMpi::MpiAssert(MPI_Isend(&nbPartsPerPackToSend,1,MPI_LONG_LONG_INT,pack.idProc,
FMpi::TagExchangeIndexs, communicator.getComm(), &requestsNbParts.back()),__LINE__);
}
else {
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] skip " << idxPack << " \n"; FLog::Controller.flush(); );
// Nothing to send
nbPartsPerPackToSend[idxPack] = 0;
}
}
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] Send done \n"; FLog::Controller.flush(); );
......@@ -414,7 +409,7 @@ public:
// We need to know how much particles to receive
requestsNbParts.emplace_back();
FMpi::MpiAssert(MPI_Irecv(&nbPartsPerPackToRecv[idxPack], 1, MPI_LONG_LONG_INT, pack.idProc,
FMpi::TagExchangeIndexs+1, communicator.getComm(), &requestsNbParts.back()), __LINE__);
FMpi::TagExchangeIndexs, communicator.getComm(), &requestsNbParts.back()), __LINE__);
}
else{
if(pack.idProc == myRank){
......@@ -437,19 +432,20 @@ public:
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] Wait Done \n"; FLog::Controller.flush(); );
//std::vector<MPI_Request> requestsParts;
std::unique_ptr<int[]> sendcounts(new int[communicator.processCount()]);
memset(sendcounts.get(), 0, sizeof(int)*communicator.processCount());
std::unique_ptr<int[]> sdispls(new int[communicator.processCount()]);
memset(sdispls.get(), 0, sizeof(int)*communicator.processCount());
std::vector<MPI_Request> requestsParts;
for(unsigned int idxPack = 0; idxPack< packsToSend.size() ; ++idxPack){
const FEqualize::Package& pack = packsToSend[idxPack];
if(pack.idProc != myRank && 0 < (pack.elementTo-pack.elementFrom)){
sendcounts[pack.idProc] = int(sizeof(ParticleClass)*nbPartsPerPackToSend[idxPack]);
if(pack.idProc!=0) sdispls[pack.idProc] = sdispls[pack.idProc-1] + sendcounts[pack.idProc-1];
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] send to "
<< pack.idProc << " nb " << nbPartsPerPackToSend[idxPack] << " sdispls " << sdispls[pack.idProc] << " \n"; FLog::Controller.flush(); );
<< pack.idProc << " nb " << (pack.elementTo-pack.elementFrom) << " \n"; FLog::Controller.flush(); );
FMpi::ISendSplit(&particlesArrayInLeafOrder[pack.elementFrom],
(pack.elementTo - pack.elementFrom),
pack.idProc,
FMpi::TagExchangeIndexs + 1,
communicator,
&requestsParts);
}
}
......@@ -462,23 +458,25 @@ public:
totalPartsToReceive += nbPartsPerPackToRecv[idxPack];
}
std::vector<ParticleClass> particlesRecvBuffer;
std::unique_ptr<int[]> recvcounts(new int[communicator.processCount()]);
memset(recvcounts.get(), 0, sizeof(int)*communicator.processCount());
std::unique_ptr<int[]> rdispls(new int[communicator.processCount()]);
memset(rdispls.get(), 0, sizeof(int)*communicator.processCount());
std::unique_ptr<ParticleClass[]> particlesRecvBuffer(new ParticleClass[totalPartsToReceive]);
// Post all the receive and copy mine
if(totalPartsToReceive){
particlesRecvBuffer.resize(totalPartsToReceive);
FSize offsetToRecv = 0;
for(unsigned int idxPack = 0; idxPack < packsToRecv.size(); ++idxPack){
const FEqualize::Package& pack = packsToRecv[idxPack];
if(pack.idProc != myRank && 0 < (pack.elementTo-pack.elementFrom)){
recvcounts[pack.idProc] = int(sizeof(ParticleClass)*nbPartsPerPackToRecv[idxPack]);
rdispls[pack.idProc] = int(sizeof(ParticleClass)*offsetToRecv);
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] recv from "
<< pack.idProc << " nb " << nbPartsPerPackToRecv[idxPack] << " (offset " << rdispls[pack.idProc] << ") \n"; FLog::Controller.flush(); );
<< pack.idProc << " nb " << nbPartsPerPackToRecv[idxPack] << "\n"; FLog::Controller.flush(); );
FAssertLF(pack.elementTo <= size_t(totalPartsToReceive));
FMpi::IRecvSplit(&particlesRecvBuffer[pack.elementFrom],
(pack.elementTo - pack.elementFrom),
pack.idProc,
FMpi::TagExchangeIndexs + 1,
communicator,
&requestsParts);
}
else if(pack.idProc == myRank){
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] skip " << idxPack << " \n"; FLog::Controller.flush(); );
......@@ -493,28 +491,13 @@ public:
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] pre Wait \n"; FLog::Controller.flush(); );
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] MPI_Alltoallv \n"; FLog::Controller.flush(); );
for(int idxProc = 0 ; idxProc < communicator.processCount() ; ++idxProc){
if(sendcounts[idxProc]){
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] for "
<< idxProc << " send " << sendcounts[idxProc] << " \n"; FLog::Controller.flush(); );
}
if(recvcounts[idxProc]){
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] for "
<< idxProc << " recv " << recvcounts[idxProc] << " rdispls " << rdispls[idxProc] << " \n"; FLog::Controller.flush(); );
}
}
FMpi::MpiAssert( MPI_Alltoallv((const void *)particlesArrayInLeafOrder, sendcounts.get(),
sdispls.get(), MPI_BYTE,
(void *)particlesRecvBuffer.data(), recvcounts.get(),
rdispls.get(), MPI_BYTE, communicator.getComm()), __LINE__);
FMpi::Assert( MPI_Waitall(int(requestsParts.size()), requestsParts.data(), MPI_STATUSES_IGNORE), __LINE__ );
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] MPI_Alltoallv Done \n"; FLog::Controller.flush(); );
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << communicator.processId() << "] Wait Done \n"; FLog::Controller.flush(); );
// Insert in the particle saver
for(FSize idPartsToStore = 0 ; idPartsToStore < int(particlesRecvBuffer.size()) ; ++idPartsToStore){
for(FSize idPartsToStore = 0 ; idPartsToStore < totalPartsToReceive ; ++idPartsToStore){
particlesSaver->push(particlesRecvBuffer[idPartsToStore]);
}
}
......
......@@ -28,6 +28,7 @@
#include "FNoCopyable.hpp"
#include "FMath.hpp"
#include "FAssert.hpp"
//Need that for converting datas
#include "FComplex.hpp"
......@@ -434,6 +435,44 @@ public:
}
}
static const size_t MaxBytesPerDivMess = 20000000;
template <class ObjectType, class VectorType>
static int ISendSplit(const ObjectType toSend[], const size_t nbItems,
const int dest, const int tagBase, const FMpi::FComm& communicator,
VectorType* requestVector){
const size_t totalByteToSend = (nbItems*sizeof(ObjectType));
unsigned char*const ptrDataToSend = (unsigned char*)const_cast<ObjectType*>(toSend);
for(size_t idxSize = 0 ; idxSize < totalByteToSend ; idxSize += MaxBytesPerDivMess){
MPI_Request currentRequest;
const size_t nbBytesInMessage = FMath::Min(MaxBytesPerDivMess, totalByteToSend-idxSize);
FAssertLF(nbBytesInMessage < std::numeric_limits<int>::max());
FMpi::Assert( MPI_Isend(&ptrDataToSend[idxSize], int(nbBytesInMessage), MPI_BYTE , dest,
tagBase + int(idxSize/MaxBytesPerDivMess), communicator.getComm(), &currentRequest) , __LINE__);
requestVector->push_back(currentRequest);
}
return int((totalByteToSend+MaxBytesPerDivMess-1)/MaxBytesPerDivMess);
}
template <class ObjectType, class VectorType>
static int IRecvSplit(ObjectType toRecv[], const size_t nbItems,
const int source, const int tagBase, const FMpi::FComm& communicator,
VectorType* requestVector){
const size_t totalByteToRecv = (nbItems*sizeof(ObjectType));
unsigned char*const ptrDataToRecv = (unsigned char*)(toRecv);
for(size_t idxSize = 0 ; idxSize < totalByteToRecv ; idxSize += MaxBytesPerDivMess){
MPI_Request currentRequest;
const size_t nbBytesInMessage = FMath::Min(MaxBytesPerDivMess, totalByteToRecv-idxSize);
FAssertLF(nbBytesInMessage < std::numeric_limits<int>::max());
FMpi::Assert( MPI_Irecv(&ptrDataToRecv[idxSize], int(nbBytesInMessage), MPI_BYTE , source,
tagBase + int(idxSize/MaxBytesPerDivMess), communicator.getComm(), &currentRequest) , __LINE__);
requestVector->push_back(currentRequest);
}
return int((totalByteToRecv+MaxBytesPerDivMess-1)/MaxBytesPerDivMess);
}
private:
/// The original communicator
FComm* communicator;
......
......@@ -26,9 +26,6 @@
template <class SortType, class CompareType, class IndexType = size_t>
class FQuickSortMpi : public FQuickSort< SortType, IndexType> {
/** We are limited by the size of int in MPI coms */
static const int FQS_MAX_MPI_BYTES = 2000000000;
// We need a structure see the algorithm detail to know more
struct Partition{
IndexType lowerPart;
......@@ -181,24 +178,14 @@ class FQuickSortMpi : public FQuickSort< SortType, IndexType> {
for(int idxPack = 0 ; idxPack < int(whatToRecvFromWho.size()) ; ++idxPack){
const PackData& pack = whatToRecvFromWho[idxPack];
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << currentComm.processId() << "] Recv from " << pack.idProc << " from " << pack.fromElement << " to " << pack.toElement << "\n"; );
FAssertLF((pack.toElement - pack.fromElement) * sizeof(SortType) < std::numeric_limits<int>::max());
// FMpi::Assert( MPI_Irecv((SortType*)&recvBuffer[pack.fromElement], int((pack.toElement - pack.fromElement) * sizeof(SortType)), MPI_BYTE, pack.idProc,
// FMpi::TagQuickSort, currentComm.getComm(), &requests[idxPack]) , __LINE__);
// Work per max size
const IndexType nbElementsInPack = (pack.toElement - pack.fromElement);
const IndexType totalByteToRecv = IndexType(nbElementsInPack*sizeof(SortType));
unsigned char*const ptrDataToRecv = (unsigned char*)&recvBuffer[pack.fromElement];
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << currentComm.processId() << "] Recv in " << (totalByteToRecv+FQS_MAX_MPI_BYTES-1)/FQS_MAX_MPI_BYTES << " messages \n"; );
FLOG( FLog::Controller.flush(); );
for(IndexType idxSize = 0 ; idxSize < totalByteToRecv ; idxSize += FQS_MAX_MPI_BYTES){
MPI_Request currentRequest;
const FSize nbBytesInMessage = int(FMath::Min(IndexType(FQS_MAX_MPI_BYTES), totalByteToRecv-idxSize));
FAssertLF(nbBytesInMessage < std::numeric_limits<int>::max());
FMpi::Assert( MPI_Irecv(&ptrDataToRecv[idxSize], int(nbBytesInMessage), MPI_BYTE, pack.idProc,
int(FMpi::TagQuickSort + idxSize/FQS_MAX_MPI_BYTES), currentComm.getComm(), &currentRequest) , __LINE__);
requests.push_back(currentRequest);
}
FAssertLF(pack.toElement <= totalToRecv);
FMpi::IRecvSplit(&recvBuffer[pack.fromElement],
(pack.toElement - pack.fromElement),
pack.idProc,
FMpi::TagQuickSort,
currentComm,
&requests);
}
FAssertLF(whatToRecvFromWho.size() <= requests.size());
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << "Wait for " << requests.size() << " request \n" );
......@@ -226,24 +213,13 @@ class FQuickSortMpi : public FQuickSort< SortType, IndexType> {
for(int idxPack = 0 ; idxPack < int(whatToSendToWho.size()) ; ++idxPack){
const PackData& pack = whatToSendToWho[idxPack];
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << currentComm.processId() << "] Send to " << pack.idProc << " from " << pack.fromElement << " to " << pack.toElement << "\n"; );
// FAssertLF((pack.toElement - pack.fromElement)* sizeof(SortType) < std::numeric_limits<int>::max());
// FMpi::Assert( MPI_Isend(const_cast<SortType*>(&inPartToSend[pack.fromElement]), int((pack.toElement - pack.fromElement) * sizeof(SortType)), MPI_BYTE , pack.idProc,
// FMpi::TagQuickSort, currentComm.getComm(), &requests[idxPack]) , __LINE__);
// Work per max size
const IndexType nbElementsInPack = (pack.toElement - pack.fromElement);
const IndexType totalByteToSend = IndexType(nbElementsInPack*sizeof(SortType));
unsigned char*const ptrDataToSend = (unsigned char*)const_cast<SortType*>(&inPartToSend[pack.fromElement]);
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << currentComm.processId() << "] Send in " << (totalByteToSend+FQS_MAX_MPI_BYTES-1)/FQS_MAX_MPI_BYTES << " messages \n"; );
FLOG( FLog::Controller.flush(); );
for(IndexType idxSize = 0 ; idxSize < totalByteToSend ; idxSize += FQS_MAX_MPI_BYTES){
MPI_Request currentRequest;
const IndexType nbBytesInMessage = int(FMath::Min(IndexType(FQS_MAX_MPI_BYTES), totalByteToSend-idxSize));
FAssertLF(nbBytesInMessage < std::numeric_limits<int>::max());
FMpi::Assert( MPI_Isend((SortType*)&ptrDataToSend[idxSize], int(nbBytesInMessage), MPI_BYTE , pack.idProc,
int(FMpi::TagQuickSort + idxSize/FQS_MAX_MPI_BYTES), currentComm.getComm(), &currentRequest) , __LINE__);
requests.push_back(currentRequest);
}
FMpi::ISendSplit(&inPartToSend[pack.fromElement],
(pack.toElement - pack.fromElement),
pack.idProc,
FMpi::TagQuickSort,
currentComm,
&requests);
}
FAssertLF(whatToSendToWho.size() <= requests.size());
FLOG( FLog::Controller << "SCALFMM-DEBUG [" << currentComm.processId() << "] Wait for " << requests.size() << " request \n" );
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment