Commit ea2ac999 authored by berenger-bramas

Improve parallel P2P with MPI_Waitsome => insert into tree => compute when finished.

git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/scalfmm/scalfmm/trunk@215 2616d619-271b-44dc-8df4-d4a8f33a7222
parent 44196490
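The diff below applies this change to the P2P pass: all the MPI_Irecv are now posted before the MPI_Isend, and an MPI_Waitsome loop inserts each received buffer into a temporary octree as soon as it arrives, before the final computation starts. As a standalone illustration of that pattern, here is a minimal sketch; the names and the integer payload are invented for the example and it is not the ScalFMM code itself.

#include <mpi.h>
#include <cstdio>
#include <vector>

int main(int argc, char** argv){
    MPI_Init(&argc, &argv);
    int rank = 0, nbProcess = 1;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nbProcess);

    const int tag = 99;
    std::vector<MPI_Request> requests;
    std::vector<int> recvBuffer(nbProcess, -1), sendBuffer(nbProcess, rank);

    // Post every receive first: their indices in 'requests' are all
    // smaller than nbMessagesToRecv, which identifies them later.
    for(int idxProc = 0 ; idxProc < nbProcess ; ++idxProc){
        if(idxProc != rank){
            requests.emplace_back();
            MPI_Irecv(&recvBuffer[idxProc], 1, MPI_INT, idxProc,
                      tag, MPI_COMM_WORLD, &requests.back());
        }
    }
    const int nbMessagesToRecv = static_cast<int>(requests.size());

    // Then post the sends.
    for(int idxProc = 0 ; idxProc < nbProcess ; ++idxProc){
        if(idxProc != rank){
            requests.emplace_back();
            MPI_Isend(&sendBuffer[idxProc], 1, MPI_INT, idxProc,
                      tag, MPI_COMM_WORLD, &requests.back());
        }
    }

    // Consume completions as they happen instead of a single MPI_Waitall:
    // every completed receive is processed immediately (here it is only
    // printed, in the real code it is inserted into the octree).
    const int totalRequests = static_cast<int>(requests.size());
    std::vector<int> indexMessage(totalRequests);
    std::vector<MPI_Status> status(totalRequests);
    int complete = 0;
    while(complete != totalRequests){
        int countMessages = 0;
        MPI_Waitsome(totalRequests, requests.data(), &countMessages,
                     indexMessage.data(), status.data());
        complete += countMessages;
        for(int idx = 0 ; idx < countMessages ; ++idx){
            if(indexMessage[idx] < nbMessagesToRecv){
                const int src = status[idx].MPI_SOURCE;
                std::printf("[%d] received %d from %d\n", rank,
                            recvBuffer[src], src);
            }
        }
    }

    MPI_Finalize();
    return 0;
}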
@@ -121,20 +121,14 @@ That is the reason why we need to balance the data among nodes.
After sorting, each process potentially holds several leaves.
If we have two processes $P_{i}$ and $P_{j}$ with $i < j$, the sort guarantees that all leaves held by $P_{i}$ have smaller Morton indices than those held by $P_{j}$.
However, the leaves are unevenly distributed among the nodes and we need to balance them.
Our idea is to use a two-pass algorithm that can be described as sand settling:
it is a simple reordering of the data, with the constraint that the data has to stay sorted.
\begin{enumerate}
\item We can see each node as a heap of sand.
This heap represents the leaves of the octree.
Some nodes hold a lot of leaves and therefore form a big heap; others form a small heap because they contain only a few leaves.
Starting from the extremities, each node can learn how much sand lies on its left and on its right.
\item Because each node knows the total amount of sand in the system, which is the sum of the sand on each of its sides plus its own sand, it can compute how to balance it.
What would happen if we put a heavy plank on top of all our sand heaps?
The system would settle until every heap has the same size.
The same happens here: each node knows what it has to do to balance the system.
Each node communicates only with its two neighbors, by sending or receiving entire leaves.
\item Each process informs the others of how many leaves it holds.
\item Each process computes how many leaves it has to send to, or receive from, its left and right neighbors (see the sketch below).
\end{enumerate}
At the end of the algorithm, the system is completely balanced, with the same number of leaves on each process.
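For illustration, a minimal sketch of the balancing computation of the last two steps could look as follows (the names and types are hypothetical, this is not the ScalFMM implementation):
\begin{verbatim}
#include <mpi.h>

// Compute how many leaves must move to the left/right neighbor;
// a negative value means leaves have to be received from that side.
void computeBalance(long long myCount, long long* toSendLeft,
                    long long* toSendRight){
    int myRank = 0, nbProcess = 1;
    MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
    MPI_Comm_size(MPI_COMM_WORLD, &nbProcess);

    // Every process announces how many leaves it holds.
    long long allCounts[nbProcess];
    MPI_Allgather(&myCount, 1, MPI_LONG_LONG, allCounts, 1,
                  MPI_LONG_LONG, MPI_COMM_WORLD);

    long long leftCount = 0, total = 0;
    for(int idx = 0 ; idx < nbProcess ; ++idx){
        if(idx < myRank) leftCount += allCounts[idx];
        total += allCounts[idx];
    }

    // The "plank" gives every process an equal slice of the sorted leaves.
    const long long targetStart = (total * myRank) / nbProcess;
    const long long targetEnd   = (total * (myRank + 1)) / nbProcess;

    // Compare the current interval [leftCount, leftCount + myCount)
    // with the target interval [targetStart, targetEnd).
    *toSendLeft  = targetStart - leftCount;
    *toSendRight = (leftCount + myCount) - targetEnd;
}
\end{verbatim}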
\begin{figure}[h!]
\begin{center}
\includegraphics[width=14cm, height=17cm, keepaspectratio=true]{SandSettling.png}
......
@@ -169,7 +169,7 @@ public:
delete [] iterArray;
iterArray = 0;
}
@@ -234,7 +234,7 @@ public:
FDEBUG( FDebug::Controller << "\tFinished (@Bottom Pass (P2M) = " << counterTime.tacAndElapsed() << "s)\n" );
FDEBUG( FDebug::Controller << "\t\t Computation : " << computationCounter.elapsed() << " s\n" );
}
/////////////////////////////////////////////////////////////////////////////
@@ -398,7 +398,7 @@ public:
FDEBUG( FDebug::Controller << "\t\t Computation : " << computationCounter.cumulated() << " s\n" );
FDEBUG( FDebug::Controller << "\t\t Prepare : " << prepareCounter.cumulated() << " s\n" );
FDEBUG( FDebug::Controller << "\t\t Wait : " << waitCounter.cumulated() << " s\n" );
}
/////////////////////////////////////////////////////////////////////////////
@@ -821,7 +821,7 @@ public:
FDEBUG( FDebug::Controller << "\t\t Wait : " << waitCounter.cumulated() << " s\n" );
}
}
/////////////////////////////////////////////////////////////////////////////
@@ -846,6 +846,7 @@ public:
MPI_Request requests[2 * nbProcess];
MPI_Status status[2 * nbProcess];
int iterRequest = 0;
int nbMessagesToRecv = 0;
ParticleClass* sendBuffer[nbProcess];
memset(sendBuffer, 0, sizeof(ParticleClass*) * nbProcess);
@@ -940,6 +941,17 @@ public:
FDEBUG(gatherCounter.tac());
// Prepare receive
for(int idxProc = 0 ; idxProc < nbProcess ; ++idxProc){
if(globalReceiveMap[idxProc * nbProcess + idProcess]){
recvBuffer[idxProc] = reinterpret_cast<ParticleClass*>(new char[sizeof(ParticleClass) * globalReceiveMap[idxProc * nbProcess + idProcess]]);
mpiassert( MPI_Irecv(recvBuffer[idxProc], globalReceiveMap[idxProc * nbProcess + idProcess]*sizeof(ParticleClass), MPI_BYTE,
idxProc, FMpi::TagFmmP2P, MPI_COMM_WORLD, &requests[iterRequest++]) , __LINE__ );
}
}
nbMessagesToRecv = iterRequest;
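// All the receive requests were posted first: requests[0 .. nbMessagesToRecv-1]
// are the MPI_Irecv, everything after them is an MPI_Isend.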
// Prepare send
for(int idxProc = 0 ; idxProc < nbProcess ; ++idxProc){
if(indexToSend[idxProc] != 0){
sendBuffer[idxProc] = reinterpret_cast<ParticleClass*>(new char[sizeof(ParticleClass) * partsToSend[idxProc]]);
@@ -955,12 +967,6 @@ public:
idxProc, FMpi::TagFmmP2P, MPI_COMM_WORLD, &requests[iterRequest++]) , __LINE__ );
}
if(globalReceiveMap[idxProc * nbProcess + idProcess]){
recvBuffer[idxProc] = reinterpret_cast<ParticleClass*>(new char[sizeof(ParticleClass) * globalReceiveMap[idxProc * nbProcess + idProcess]]);
mpiassert( MPI_Irecv(recvBuffer[idxProc], globalReceiveMap[idxProc * nbProcess + idProcess]*sizeof(ParticleClass), MPI_BYTE,
idxProc, FMpi::TagFmmP2P, MPI_COMM_WORLD, &requests[iterRequest++]) , __LINE__ );
}
}
@@ -1078,32 +1084,40 @@ public:
// Wait send receive
//////////////////////////////////////////////////////////
// Wait data
FDEBUG(waitCounter.tic());
MPI_Waitall(iterRequest, requests, status);
FDEBUG(waitCounter.tac());
FDEBUG(FTic computation2Counter);
// Create an octree with leaves from others
OctreeClass otherP2Ptree( tree->getHeight(), tree->getSubHeight(), tree->getBoxWidth(), tree->getBoxCenter() );
for(int idxProc = 0 ; idxProc < nbProcess ; ++idxProc){
for(int idxPart = 0 ; idxPart < globalReceiveMap[idxProc * nbProcess + idProcess] ; ++idxPart){
otherP2Ptree.insert(recvBuffer[idxProc][idxPart]);
int complete = 0;
while( complete != iterRequest){
int indexMessage[nbProcess * 2];
memset(indexMessage, 0, sizeof(int) * nbProcess * 2);
int countMessages = 0;
// Wait data
FDEBUG(waitCounter.tic());
MPI_Waitsome(iterRequest, requests, &countMessages, indexMessage, status);
FDEBUG(waitCounter.tac());
complete += countMessages;
for(int idxRcv = 0 ; idxRcv < countMessages ; ++idxRcv){
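// A completed request with an index below nbMessagesToRecv is a reception:
// its particles can be inserted into otherP2Ptree and its buffer released.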
if( indexMessage[idxRcv] < nbMessagesToRecv ){
const int idxProc = status[idxRcv].MPI_SOURCE;
for(int idxPart = 0 ; idxPart < globalReceiveMap[idxProc * nbProcess + idProcess] ; ++idxPart){
otherP2Ptree.insert(recvBuffer[idxProc][idxPart]);
}
delete [] reinterpret_cast<char*>(recvBuffer[idxProc]);
}
}
}
for(int idxProc = 0 ; idxProc < nbProcess ; ++idxProc){
delete [] reinterpret_cast<char*>(sendBuffer[idxProc]);
delete [] reinterpret_cast<char*>(recvBuffer[idxProc]);
}
//////////////////////////////////////////////////////////
// Computation P2P that need others data
//////////////////////////////////////////////////////////
FTRACE( FTrace::FRegion regionOtherTrace("Compute P2P Other", __FUNCTION__ , __FILE__ , __LINE__) );
FDEBUG(FTic computation2Counter);
FDEBUG( computation2Counter.tic() );
#pragma omp parallel
{
@@ -1151,6 +1165,11 @@ public:
}
}
for(int idxProc = 0 ; idxProc < nbProcess ; ++idxProc){
delete [] reinterpret_cast<char*>(sendBuffer[idxProc]);
//delete [] reinterpret_cast<char*>(recvBuffer[idxProc]);
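// recvBuffer[idxProc] was already released in the MPI_Waitsome loop,
// right after its particles were inserted into otherP2Ptree.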
}
delete[] leafsDataArray;
FDEBUG(computation2Counter.tac());
@@ -1162,7 +1181,7 @@ public:
FDEBUG( FDebug::Controller << "\t\t Prepare P2P : " << prepareCounter.elapsed() << " s\n" );
FDEBUG( FDebug::Controller << "\t\t Gather P2P : " << gatherCounter.elapsed() << " s\n" );
FDEBUG( FDebug::Controller << "\t\t Wait : " << waitCounter.elapsed() << " s\n" );
}
......