Commit c10e3e26 authored by BRAMAS Berenger

Add an equalize algorithm and update the tree builder, should be tested

parent baf3b0c4
#ifndef FEQUALIZE_HPP
#define FEQUALIZE_HPP
#include <iostream>
#include <vector>
/**
* This class proposes a method to distribute an interval owned by a worker
* to some others.
* It returns a vector of Package which tells what interval to send to which process.
* The algorithm works only if the objectives (the intervals that the workers should obtain)
* are exclusive and given in ascending order.
* Also, each element of the current interval must belong to someone!
* Finally, the current worker is included in the packages if its objective interval
* overlaps its starting interval.
*/
class FEqualize {
// Just return the min
static size_t Min(const size_t v1, const size_t v2){
return v1 < v2 ? v1 : v2;
}
public:
/** To represent an interval to process */
struct Package{
int idProc;
size_t elementFrom;
size_t elementTo;
};
/**
* To know what to send to whom.
* @param myCurrentInterval the current process interval
* @param allObjectives the intervals that each process should have (in ascending order, exclusive)
* @return the packages that the current worker should send to the others
*/
static std::vector<Package> GetPackToSend(const std::pair<size_t, size_t> myCurrentInterval,
const std::vector< std::pair<size_t,size_t> >& allObjectives){
std::vector<Package> packToSend;
int idxProc = 0;
// Find the first proc to send to
while( idxProc != allObjectives.size()
&& allObjectives[idxProc].second < myCurrentInterval.first){
idxProc += 1;
}
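// Here idxProc is the first objective whose end reaches my interval (or allObjectives.size() if none does)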
// We will start from the first element for sure
size_t currentElement = 0;
// Check each proc to send to
while( idxProc != allObjectives.size()
&& allObjectives[idxProc].first < myCurrentInterval.second){
Package pack;
pack.idProc = idxProc;
// The current element must start where the previous one ends
// We can assert currentElement == allObjectives[idxProc].first - myCurrentInterval.first
pack.elementFrom = currentElement;
pack.elementTo = Min(allObjectives[idxProc].second , myCurrentInterval.second) - myCurrentInterval.first;
// Next time, start from the previous end
currentElement = pack.elementTo;
packToSend.push_back(pack);
// Progress
idxProc += 1;
}
// We can assert that currentElement == myCurrentInterval.second - myCurrentInterval.first
return packToSend;
}
};
#endif // FEQUALIZE_HPP
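Below is a minimal usage sketch of the new class (the driver and the values are illustrative, not part of the commit): worker 1 currently holds the global elements [100, 200), while the objectives ask processes 0, 1 and 2 to end up with [0, 150), [150, 180) and [180, 300).

// Hypothetical driver for FEqualize::GetPackToSend (not part of the commit)
#include "FEqualize.hpp"
#include <utility>
#include <cstdio>

int main(){
    // Worker 1 currently owns the global elements [100, 200)
    const std::pair<size_t, size_t> myCurrentInterval(100, 200);
    // Target intervals per process: exclusive and given in ascending order
    std::vector< std::pair<size_t,size_t> > allObjectives;
    allObjectives.push_back(std::make_pair(size_t(0),   size_t(150)));
    allObjectives.push_back(std::make_pair(size_t(150), size_t(180)));
    allObjectives.push_back(std::make_pair(size_t(180), size_t(300)));
    // Expected packages: {0, 0, 50}, {1, 50, 80}, {2, 80, 100}
    // i.e. local elements [0,50) go to proc 0, [50,80) stay on proc 1, [80,100) go to proc 2
    const std::vector<FEqualize::Package> packs = FEqualize::GetPackToSend(myCurrentInterval, allObjectives);
    for(const FEqualize::Package& pack : packs){
        printf("Send local [%lu, %lu) to proc %d\n",
               (unsigned long)pack.elementFrom, (unsigned long)pack.elementTo, pack.idProc);
    }
    return 0;
}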
@@ -24,6 +24,7 @@
#include "../Utils/FTrace.hpp"
#include "../BalanceTree/FLeafBalance.hpp"
#include "../BalanceTree/FEqualize.hpp"
/** This class manages the loading of particles for the MPI version.
* It uses a BinLoader and then sorts the data with a parallel quick sort
@@ -305,196 +306,121 @@ private:
}
}
else{
// We have to know the number of leaves each proc holds
FSize*const numberOfLeavesPerProc = new FSize[nbProcs];
memset(numberOfLeavesPerProc, 0, sizeof(FSize) * nbProcs);
FSize intNbLeavesInIntervals = nbLeavesInIntervals;
MPI_Allgather(&intNbLeavesInIntervals, 1, MPI_LONG_LONG_INT, numberOfLeavesPerProc, 1, MPI_LONG_LONG_INT, communicator.getComm());
//For debug, print the input data for each proc
//printf("Proc : %d : Currently have %lld parts in %lld leaves \n", myRank, myCurrentsParts, myNumberOfLeaves);
//We need the leaf offsets and the total number of leaves over the procs
FSize*const leavesOffsetPerProc = new FSize[nbProcs + 1];
FSize totalNumberOfLeaves = 0;
leavesOffsetPerProc[0] = 0;
totalNumberOfLeaves += numberOfLeavesPerProc[0];
for(int idxProc = 1 ; idxProc < nbProcs ; ++idxProc ){
leavesOffsetPerProc[idxProc] = leavesOffsetPerProc[idxProc-1] + numberOfLeavesPerProc[idxProc-1];
totalNumberOfLeaves += numberOfLeavesPerProc[idxProc];
}
leavesOffsetPerProc[nbProcs] = totalNumberOfLeaves;
const FSize currentLeafOnL = leavesOffsetPerProc[myRank]; //current leaf on my left
const FSize currentLeafOnR = leavesOffsetPerProc[myRank+1]; //current leaf on my left + my number of leaves
//Build the buffer of send counts (particles to send to each proc)
FSize * toSend = new FSize[nbProcs];
memset(toSend,0,sizeof(FSize)*nbProcs);
//Build the array of leaf indices to start sending from
FSize * idxToSend = new FSize[nbProcs];
memset(idxToSend,0,sizeof(FSize)*nbProcs);
for(int idxProc = 0 ; idxProc < nbProcs ; ++idxProc){
if(idxProc != myRank){
const FSize correctLeftLeafNumber = balancer->getLeft(totalNumberOfLeaves,nullptr,0,nullptr,nbProcs,idxProc);
const FSize correctRightLeafNumber = balancer->getRight(totalNumberOfLeaves,nullptr,0,nullptr,nbProcs,idxProc);
//5 cases : refer to ParalleleDetails.pdf for more details
//First and Last : there are no particles in my current interval that belong to this proc
if((correctRightLeafNumber < currentLeafOnL) || (correctLeftLeafNumber > currentLeafOnR)){
toSend[idxProc] = 0;
}
else{
//Second : the first part of my current interval belongs to idxProc
if((correctLeftLeafNumber <= currentLeafOnL) && (correctRightLeafNumber < currentLeafOnR)){
idxToSend[idxProc] = 0;
FSize maxToSendLeft = leavesIndices[correctRightLeafNumber-currentLeafOnL+1];
toSend[idxProc] = maxToSendLeft - leavesIndices[idxToSend[idxProc]];
}
else{//Third : all I have belongs to idxProc
if((correctLeftLeafNumber <= currentLeafOnL) && (correctRightLeafNumber >= currentLeafOnR)){
toSend[idxProc] = myCurrentsParts;
idxToSend[idxProc] = 0;
}
else{
//Fourth : my interval currently contains all the data belonging to idxProc
if((correctLeftLeafNumber >= currentLeafOnL) && (correctRightLeafNumber <= currentLeafOnR)){
idxToSend[idxProc] = (correctLeftLeafNumber - currentLeafOnL+1);
FSize maxToSend = (correctRightLeafNumber == currentLeafOnR)?
(myCurrentsParts) : leavesIndices[correctRightLeafNumber-currentLeafOnL +1];
toSend[idxProc] = maxToSend - leavesIndices[idxToSend[idxProc]];
}
else{
//Fifth : the right part of my current interval belongs to idxProc
if((correctLeftLeafNumber >= currentLeafOnL) && (correctRightLeafNumber > currentLeafOnR)){
idxToSend[idxProc] = correctLeftLeafNumber-currentLeafOnL+1;
FSize sizeToSend = (correctLeftLeafNumber == currentLeafOnR)?
myCurrentsParts : myCurrentsParts-leavesIndices[correctLeftLeafNumber-currentLeafOnL+1];
toSend[idxProc] = sizeToSend;
}
}
}
}
}
}
}
//Then, we exchange the data to send
FSize * globalSendRecvMap = new FSize[nbProcs*nbProcs];
memset(globalSendRecvMap,0,sizeof(FSize)*nbProcs*nbProcs);
//This could be replaced by an array toRecv built in the same way as toSend
MPI_Allgather(toSend,nbProcs,MPI_LONG_LONG,globalSendRecvMap,nbProcs,MPI_LONG_LONG,communicator.getComm());
// { // ----------------For debug---------------------------------
// FSize totRemaining = 0;
// { //This is a sum of all parts to know if we forgot some of them
// FSize totToSend = 0;
// for(int t=0;t<nbProcs;++t){
// if(t==myRank){
// for(int k=0;k<nbProcs ; ++k){
// totToSend += toSend[k];
// printf("Proc : %d will send %lld parts to %d starting (leavesIndices[%lld] = %lld)\n",
// myRank,toSend[k],k,idxToSend[k],leavesIndices[idxToSend[k]]);
// }
// }
// MPI_Barrier(MPI_COMM_WORLD);
// }
// totRemaining = myCurrentsParts-totToSend;
// }
// if(myRank == 0){
// for(int k=0;k<nbProcs ; ++k){
// for(int t=0 ; t<nbProcs ; ++t){
// printf("%lld\t",globalSendRecvMap[k*nbProcs+t]);
// }
// printf("\n");
// }
// }
// MPI_Barrier(MPI_COMM_WORLD);
// for(int k=0;k<nbProcs ; ++k){
// totRemaining += globalSendRecvMap[k*nbProcs+myRank];
// }
// printf("Proc : %d, will have %lld parts \n",myRank,totRemaining);
// FSize totfor0 = communicator.reduceSum(totRemaining);
// if(myRank==0){
// printf("================ %lld ================\n",totfor0);
// }
// }
//Then, we have our global recv map.
//We just need to send and recv for real.
//Finally, store the remaining parts, recv the parts, send my parts
ParticleClass * finalPartBuffer;
FSize finalTotParts;
{
finalTotParts = myCurrentsParts; //We need to know how many particles we will have
FSize finalCurrentParts = myCurrentsParts; //Parts that I had and that belong to me
for(int idxProc=0 ; idxProc<nbProcs ; ++idxProc){
finalCurrentParts -= toSend[idxProc]; //subtract the parts sent
finalTotParts -= toSend[idxProc]; //subtract the parts sent
finalTotParts += globalSendRecvMap[idxProc*nbProcs+myRank]; //add the parts received
}
finalPartBuffer = new ParticleClass[finalTotParts];
memset(finalPartBuffer,0,sizeof(ParticleClass)*finalTotParts);
//Copy of the parts we already hold
FSize finalIdxToStart = 0; //idx of the start of my parts inside leavesArray
FSize idxToWrite = 0; //idx to write my parts
{//loop over idxProc < myRank to compute where my own parts start in both buffers
for(int idxProc=0 ; idxProc<myRank ; ++idxProc){
idxToWrite += globalSendRecvMap[idxProc*nbProcs+myRank];
finalIdxToStart += toSend[idxProc];
}
memcpy(&finalPartBuffer[idxToWrite],&leavesArray[finalIdxToStart],sizeof(ParticleClass)*finalCurrentParts);
}
//Second, receive in place:
MPI_Request* requests = new MPI_Request[nbProcs * 2];
int counterRequest = 0;
int tag = 99;
FSize idxToWriteRecvedDatas = 0;
//While I receive from the left, I write the data at the start of the buffer
for(int idxProc=0 ; idxProc<nbProcs ; ++idxProc){
if(idxProc == myRank){ //When myRank==idxProc, I advance the write index past what I kept, to avoid overwriting my own parts with received parts
idxToWriteRecvedDatas += finalCurrentParts;
}
else{ //I receive and increment the write index by what I get
if(globalSendRecvMap[idxProc*nbProcs+myRank]){//If I expect something from idxProc
MPI_Irecv(&finalPartBuffer[idxToWriteRecvedDatas],int(sizeof(ParticleClass))*int(globalSendRecvMap[idxProc*nbProcs+myRank]),MPI_BYTE,
idxProc,tag,communicator.getComm(),&requests[counterRequest++]);
idxToWriteRecvedDatas += globalSendRecvMap[idxProc*nbProcs+myRank];
}
}
}
//Third, send
for(int idxProc=0 ; idxProc<nbProcs ; ++idxProc){
if(toSend[idxProc]){ //If I have something for idxProc
MPI_Isend(&leavesArray[leavesIndices[idxToSend[idxProc]]],int(sizeof(ParticleClass))*int(globalSendRecvMap[myRank*nbProcs+idxProc]),MPI_BYTE,
idxProc,tag,communicator.getComm(),&requests[counterRequest++]);
}
}
//Wait for the comm :
MPI_Waitall(counterRequest,requests,MPI_STATUSES_IGNORE);
delete[] requests;
}
for(FSize idPartsToStore=0; idPartsToStore<finalTotParts ; ++idPartsToStore){
particlesSaver->push(finalPartBuffer[idPartsToStore]);
}
delete[] finalPartBuffer;
delete[] globalSendRecvMap;
delete[] idxToSend;
delete[] toSend;
delete[] numberOfLeavesPerProc;
delete[] leavesOffsetPerProc;
}
// We have to know the number of leaves each proc holds
FSize*const numberOfLeavesPerProc = new FSize[nbProcs];
memset(numberOfLeavesPerProc, 0, sizeof(FSize) * nbProcs);
FSize intNbLeavesInIntervals = nbLeavesInIntervals;
MPI_Allgather(&intNbLeavesInIntervals, 1, MPI_LONG_LONG_INT, numberOfLeavesPerProc, 1, MPI_LONG_LONG_INT, communicator.getComm());
//For debug, print the input data for each proc
//printf("Proc : %d : Currently have %lld parts in %lld leaves \n", myRank, myCurrentsParts, myNumberOfLeaves);
//We need the total number of leaves over the procs
FSize totalNumberOfLeaves = numberOfLeavesPerProc[0];
for(int idxProc = 1 ; idxProc < nbProcs ; ++idxProc ){
totalNumberOfLeaves += numberOfLeavesPerProc[idxProc];
}
//Build the buffer of send counts (particles to send to each proc)
FSize * toSend = new FSize[nbProcs];
memset(toSend,0,sizeof(FSize)*nbProcs);
//Build the array of leaf indices to start sending from
FSize * idxToSend = new FSize[nbProcs];
memset(idxToSend,0,sizeof(FSize)*nbProcs);
std::vector< std::pair<size_t,size_t> > allObjectives;
allObjectives.resize(nbProcs);
for(int idxProc = 0 ; idxProc < nbProcs ; ++idxProc){
allObjectives[idxProc].first = balancer->getLeft(totalNumberOfLeaves,NULL,0,0,nbProcs,idxProc);
allObjectives[idxProc].second = balancer->getRight(totalNumberOfLeaves,NULL,0,0,nbProcs,idxProc);
}
std::pair<size_t, size_t> myCurrentInter = allObjectives[myRank];
const std::vector<FEqualize::Package> packsToSend = FEqualize::GetPackToSend(myCurrentInter, allObjectives);
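// A pack holds leaf offsets relative to my current interval; leavesIndices converts them into particle offsets below, so toSend counts particles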
for(const FEqualize::Package& pack : packsToSend){
idxToSend[pack.idProc] = pack.elementFrom;
toSend[pack.idProc] = leavesIndices[pack.elementTo] - leavesIndices[pack.elementFrom];
}
//Then, we exchange the data to send
FSize * globalSendRecvMap = new FSize[nbProcs*nbProcs];
memset(globalSendRecvMap,0,sizeof(FSize)*nbProcs*nbProcs);
//This could be replaced by an array toRecv built in the same way as toSend
MPI_Allgather(toSend,nbProcs,MPI_LONG_LONG,globalSendRecvMap,nbProcs,MPI_LONG_LONG,communicator.getComm());
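// After the allgather, globalSendRecvMap[idxSender*nbProcs + idxReceiver] holds how many particles idxSender sends to idxReceiver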
//Then, we have our global recv map.
//We just need to send and recv for real.
//Finally, store the remaining parts, recv the parts, send my parts
ParticleClass * finalPartBuffer;
FSize finalTotParts;
{
finalTotParts = myCurrentsParts; //We need to know how many particles we will have
FSize finalCurrentParts = myCurrentsParts; //Parts that I had and that belong to me
for(int idxProc=0 ; idxProc<nbProcs ; ++idxProc){
finalCurrentParts -= toSend[idxProc]; //subtract the parts sent
finalTotParts -= toSend[idxProc]; //subtract the parts sent
finalTotParts += globalSendRecvMap[idxProc*nbProcs+myRank]; //add the parts received
}
finalPartBuffer = new ParticleClass[finalTotParts];
memset(finalPartBuffer,0,sizeof(ParticleClass)*finalTotParts);
//Copy of the parts we already hold
FSize finalIdxToStart = 0; //idx of the start of my parts inside leavesArray
FSize idxToWrite = 0; //idx to write my parts
{//loop over idxProc < myRank to compute where my own parts start in both buffers
for(int idxProc=0 ; idxProc<myRank ; ++idxProc){
idxToWrite += globalSendRecvMap[idxProc*nbProcs+myRank];
finalIdxToStart += toSend[idxProc];
}
memcpy(&finalPartBuffer[idxToWrite],&leavesArray[finalIdxToStart],sizeof(ParticleClass)*finalCurrentParts);
}
//Second, receive in place:
MPI_Request* requests = new MPI_Request[nbProcs * 2];
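// At most one receive and one send per peer, hence nbProcs * 2 requests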
int counterRequest = 0;
int tag = 99;
FSize idxToWriteRecvedDatas = 0;
//While I receive from the left, I write the data at the start of the buffer
for(int idxProc=0 ; idxProc<nbProcs ; ++idxProc){
if(idxProc == myRank){ //When myRank==idxProc, I advance the write index past what I kept, to avoid overwriting my own parts with received parts
idxToWriteRecvedDatas += finalCurrentParts;
}
else{ //I receive and increment the write index by what I get
if(globalSendRecvMap[idxProc*nbProcs+myRank]){//If I expect something from idxProc
MPI_Irecv(&finalPartBuffer[idxToWriteRecvedDatas],int(sizeof(ParticleClass))*int(globalSendRecvMap[idxProc*nbProcs+myRank]),MPI_BYTE,
idxProc,tag,communicator.getComm(),&requests[counterRequest++]);
idxToWriteRecvedDatas += globalSendRecvMap[idxProc*nbProcs+myRank];
}
}
}
//Third, send
for(int idxProc=0 ; idxProc<nbProcs ; ++idxProc){
if(toSend[idxProc]){ //If I have something for idxProc
MPI_Isend(&leavesArray[leavesIndices[idxToSend[idxProc]]],int(sizeof(ParticleClass))*int(globalSendRecvMap[myRank*nbProcs+idxProc]),MPI_BYTE,
idxProc,tag,communicator.getComm(),&requests[counterRequest++]);
}
}
//Wait for the comm :
MPI_Waitall(counterRequest,requests,MPI_STATUSES_IGNORE);
delete[] requests;
}
for(FSize idPartsToStore=0; idPartsToStore<finalTotParts ; ++idPartsToStore){
particlesSaver->push(finalPartBuffer[idPartsToStore]);
}
delete[] finalPartBuffer;
delete[] globalSendRecvMap;
delete[] idxToSend;
delete[] toSend;
delete[] numberOfLeavesPerProc;
}
delete[] leavesArray;
delete[] leavesIndices;
}