
Commit 06ce2d76 authored by BRAMAS Berenger

Add the send/recv for the P2P and M2L passes for StarPU-MPI

parent 6cec93cc
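For context, the mechanism this commit builds on is StarPU-MPI's detached point-to-point transfers: a registered data handle is posted for sending on the owner rank and for receiving on the consumer rank, and the two requests are matched by (rank, tag), just like the tags derived from level*startingIndex in the diff below. The following is only a minimal sketch, assuming two MPI ranks and a plain vector handle; the names (block, tag 42) are illustrative and not part of ScalFMM.

#include <starpu.h>
#include <starpu_mpi.h>
#include <cstdint>
#include <vector>

int main(int argc, char** argv){
    if(starpu_init(NULL) != 0) return 1;
    if(starpu_mpi_init(&argc, &argv, 1/*let StarPU initialize MPI*/) != 0) return 1;

    int rank = 0, nbProcs = 1;
    starpu_mpi_comm_rank(MPI_COMM_WORLD, &rank);
    starpu_mpi_comm_size(MPI_COMM_WORLD, &nbProcs);

    // Register a local buffer so that StarPU-MPI can transfer it through a handle.
    std::vector<float> block(128, float(rank));
    starpu_data_handle_t handle;
    starpu_vector_data_register(&handle, STARPU_MAIN_RAM,
                                (uintptr_t)block.data(), (uint32_t)block.size(), sizeof(float));

    const int tag = 42; // both sides must use the same tag, as with level*startingIndex in the diff
    if(nbProcs >= 2){
        if(rank == 0){
            // Detached: the call returns immediately, the handle is released once the send completes.
            starpu_mpi_isend_detached(handle, 1/*dest*/, tag, MPI_COMM_WORLD, NULL, NULL);
        }
        else if(rank == 1){
            starpu_mpi_irecv_detached(handle, 0/*src*/, tag, MPI_COMM_WORLD, NULL, NULL);
        }
    }

    // Unregistering blocks until the pending detached communication on the handle has finished.
    starpu_data_unregister(handle);
    starpu_mpi_shutdown();
    starpu_shutdown();
    return 0;
}

In the commit, the same pairing is done per block: postRecvAllocatedBlocks posts the irecv side for every remote group, and insertParticlesSend/insertCellsSend post the matching isend side on the owning processes.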
@@ -128,11 +128,15 @@ public:
        starpu_resume();
        if( operationsToProceed & FFmmP2P ) directPass();
        //if( operationsToProceed & FFmmP2P ) directPassMpi();
        if( operationsToProceed & FFmmP2P ) {
            insertParticlesSend();
            //directPassMpi();
        }
        if(operationsToProceed & FFmmP2M) bottomPass();
        if(operationsToProceed & FFmmM2M) upwardPass();
        if(operationsToProceed & FFmmM2L) insertCellsSend();
        if(operationsToProceed & FFmmM2L) transferPass();
        //if(operationsToProceed & FFmmM2L) transferPassMpi();
@@ -547,6 +551,125 @@ protected:
        }
    }
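    // One block-level MPI dependency: the owner rank (src), the rank that needs the
    // block (dest), the tree level (the tree height is used for particle blocks)
    // and the index of the block at that level in the global numbering.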
    struct MpiDependency{
        int src;
        int dest;
        int level;
        int globalBlockId;
    };
    std::vector<MpiDependency> toSend;
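    // Post a detached receive for every remote cell/particle block allocated locally,
    // then let each owner learn, through one gather per rank, which of its blocks it
    // has to send and to whom (stored in toSend).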
    void postRecvAllocatedBlocks(){
        std::vector<MpiDependency> toRecv;
        for(int idxLevel = 0 ; idxLevel < tree->getHeight() ; ++idxLevel){
            for(int idxHandle = 0 ; idxHandle < int(remoteCellGroups[idxLevel].size()) ; ++idxHandle){
                if(remoteCellGroups[idxLevel][idxHandle].ptr){
                    starpu_mpi_irecv_detached( remoteCellGroups[idxLevel][idxHandle].handle,
                                               processesBlockInfos[idxLevel][idxHandle].owner,
                                               idxLevel*processesBlockInfos[idxLevel][idxHandle].firstIndex,
                                               comm.getComm(), 0, 0 );
                    toRecv.push_back({processesBlockInfos[idxLevel][idxHandle].owner,
                                      comm.processId(), idxLevel, idxHandle});
                }
            }
        }
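        // Do the same for the remote particle groups; the tree height is used as
        // their "level" both in the communication tag and in the dependency record.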
        {
            for(int idxHandle = 0 ; idxHandle < int(remoteParticleGroupss.size()) ; ++idxHandle){
                if(remoteParticleGroupss[idxHandle].ptr){
                    starpu_mpi_irecv_detached( remoteParticleGroupss[idxHandle].handle,
                                               processesBlockInfos[tree->getHeight()-1][idxHandle].owner,
                                               (tree->getHeight())*processesBlockInfos[tree->getHeight()-1][idxHandle].firstIndex,
                                               comm.getComm(), 0, 0 );
                    toRecv.push_back({processesBlockInfos[tree->getHeight()-1][idxHandle].owner,
                                      comm.processId(), tree->getHeight(), idxHandle});
                }
            }
        }
        FQuickSort<MpiDependency, int>::QsSequential(toRecv.data(),int(toRecv.size()),[](const MpiDependency& d1, const MpiDependency& d2){
            return d1.src <= d2.src;
        });
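        // Count how many blocks this process expects to receive from each other rank.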
        std::unique_ptr<int[]> nbBlocksToRecvFromEach(new int[comm.processCount()]);
        memset(nbBlocksToRecvFromEach.get(), 0, sizeof(int)*comm.processCount());
        for(int idxDep = 0 ; idxDep < int(toRecv.size()) ; ++idxDep){
            nbBlocksToRecvFromEach[toRecv[idxDep].src] += 1;
        }
        FAssertLF(nbBlocksToRecvFromEach[comm.processId()] == 0);
        int offset = 0;
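        // Each rank becomes the root in turn: it gathers from all the others the
        // number of blocks it must send them, then the corresponding MpiDependency
        // records, which fill its toSend list.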
        for(int idxProc = 0 ; idxProc < comm.processCount() ; ++idxProc){
            if(idxProc == comm.processId()){
                // How much to send to each
                std::unique_ptr<int[]> nbBlocksToSendToEach(new int[comm.processCount()]);
                FMpi::Assert(MPI_Gather(&nbBlocksToRecvFromEach[idxProc], 1,
                                        MPI_INT, nbBlocksToSendToEach.get(), 1,
                                        MPI_INT, idxProc, comm.getComm() ), __LINE__);
                // Receive counts and displacements are in bytes (MPI_BYTE) and are based
                // on what the other processes expect from us (nbBlocksToSendToEach),
                // not on what we expect from them.
                std::unique_ptr<int[]> recvCountsInByte(new int[comm.processCount()]);
                std::unique_ptr<int[]> displs(new int[comm.processCount()]);
                recvCountsInByte[0] = nbBlocksToSendToEach[0]*int(sizeof(MpiDependency));
                displs[0] = 0;
                for(int idxOtherProc = 1 ; idxOtherProc < comm.processCount() ; ++idxOtherProc){
                    recvCountsInByte[idxOtherProc] = nbBlocksToSendToEach[idxOtherProc]*int(sizeof(MpiDependency));
                    displs[idxOtherProc] = displs[idxOtherProc-1] + recvCountsInByte[idxOtherProc-1];
                }
                toSend.resize((displs[comm.processCount()-1] + recvCountsInByte[comm.processCount()-1])/int(sizeof(MpiDependency)));
                FMpi::Assert(MPI_Gatherv( nullptr, 0, MPI_BYTE,
                                          toSend.data(), recvCountsInByte.get(), displs.get(),
                                          MPI_BYTE, idxProc, comm.getComm()), __LINE__);
            }
            else{
                FMpi::Assert(MPI_Gather(&nbBlocksToRecvFromEach[idxProc], 1,
                                        MPI_INT, 0, 0, MPI_INT, idxProc, comm.getComm() ), __LINE__);
                FMpi::Assert(MPI_Gatherv(
                                toRecv.data()+offset, nbBlocksToRecvFromEach[idxProc]*int(sizeof(MpiDependency)), MPI_BYTE,
                                0, 0, 0, MPI_BYTE, idxProc, comm.getComm() ), __LINE__);
                offset += nbBlocksToRecvFromEach[idxProc];
            }
        }
    }
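    // Post a detached send for every particle block (level == tree height) that a
    // remote process is waiting for, using the same level*startingIndex tag
    // convention as the receives posted in postRecvAllocatedBlocks.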
    void insertParticlesSend(){
        for(int idxSd = 0 ; idxSd < int(toSend.size()) ; ++idxSd){
            const MpiDependency sd = toSend[idxSd];
            if(sd.level == tree->getHeight()){
                const int localId = sd.globalBlockId - nbBlocksPerLevelAll[tree->getHeight()];
                FAssertLF(0 <= localId);
                FAssertLF(localId < tree->getNbParticleGroup());
                FLOG(FLog::Controller << "[SMpi] Post a send during P2P for Idx " << tree->getParticleGroup(localId)->getStartingIndex() <<
                     " and dest is " << sd.dest << "\n");
                starpu_mpi_isend_detached( handles[tree->getHeight()][localId], sd.dest,
                                           tree->getHeight()*tree->getParticleGroup(localId)->getStartingIndex(),
                                           comm.getComm(), 0/*callback*/, 0/*arg*/ );
            }
        }
    }
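    // Post a detached send for every cell block (level < tree height) that a remote
    // process needs for its M2L pass.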
    void insertCellsSend(){
        for(int idxSd = 0 ; idxSd < int(toSend.size()) ; ++idxSd){
            const MpiDependency sd = toSend[idxSd];
            if(sd.level != tree->getHeight()){
                const int localId = sd.globalBlockId - nbBlocksPerLevelAll[sd.level];
                FAssertLF(0 <= localId);
                FAssertLF(localId < tree->getNbCellGroupAtLevel(sd.level));
                FLOG(FLog::Controller << "[SMpi] Post a send during M2L for Idx " << tree->getCellGroup(sd.level, localId)->getStartingIndex() <<
                     " and dest is " << sd.dest << "\n");
                starpu_mpi_isend_detached( handles[sd.level][localId], sd.dest,
                                           sd.level*tree->getCellGroup(sd.level, localId)->getStartingIndex(),
                                           comm.getComm(), 0/*callback*/, 0/*arg*/ );
            }
        }
    }
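    // Clean up the handles created for the remote groups.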
    void cleanHandleMpi(){
        for(int idxLevel = 0 ; idxLevel < tree->getHeight() ; ++idxLevel){
            for(int idxHandle = 0 ; idxHandle < int(remoteCellGroups[idxLevel].size()) ; ++idxHandle){
......