Commit 70fc4abe authored by BRAMAS Berenger's avatar BRAMAS Berenger

update buffer reader writter for mpi to be able to store more than 2Go

parent f70173a3
......@@ -21,34 +21,27 @@
#include "FAbstractBuffer.hpp"
#include "../Utils/FAssert.hpp"
/** @author Cyrille Piacibello
* This class provide the same features as FBufferWriter using MPI_Pack system
/** @author Cyrille Piacibello, Berenger Bramas
* This class provide the same features as FBufferWriter
*
* Put some data
* then insert back if needed
* finally use data pointer as you like
*/
class FMpiBufferReader : public FAbstractBufferReader {
MPI_Comm comm; //< Communicator needed by MPI_Pack functions
FSize arrayCapacity; //< Allocated space
std::unique_ptr<char[]> array; //< Allocated Array
FSize currentIndex;
public :
/*Constructor with a default arrayCapacity of 512 bytes */
explicit FMpiBufferReader(const MPI_Comm inComm = MPI_COMM_WORLD, const FSize inDefaultCapacity = 512):
comm(inComm),
explicit FMpiBufferReader(const FSize inDefaultCapacity = 512):
arrayCapacity(inDefaultCapacity),
array(new char[inDefaultCapacity]),
currentIndex(0){
FAssertLF(array, "Cannot allocate array");
}
/** Change the comm (or to set it later) */
void setComm(const MPI_Comm inComm){
comm = inComm;
}
/** To change the capacity (but reset the head to 0) */
void cleanAndResize(const FSize newCapacity){
if(newCapacity != arrayCapacity){
......@@ -97,50 +90,34 @@ public :
/** Get a value with memory cast */
template <class ClassType>
ClassType getValue(){
FAssertLF(arrayCapacity < std::numeric_limits<int>::max());
FAssertLF(currentIndex < std::numeric_limits<int>::max());
int previousIndex = int(currentIndex);
FAssertLF(currentIndex + FSize(sizeof(ClassType)) <= arrayCapacity );
ClassType value;
FMpi::Assert(MPI_Unpack(array.get(),int(arrayCapacity),&previousIndex,&value,FMpi::GetTypeCount(value),FMpi::GetType(value),comm), __LINE__);
seek(FSize(sizeof(value) + currentIndex));
FAssertLF(previousIndex == currentIndex);
memcpy(&value, &array[currentIndex], sizeof(ClassType));
currentIndex += sizeof(ClassType);
return value;
}
/** Get a value with memory cast at a specified index */
template <class ClassType>
ClassType getValue(const FSize ind){
ClassType value;
FAssertLF(arrayCapacity < std::numeric_limits<int>::max());
FAssertLF(ind < std::numeric_limits<int>::max());
int previousIndex = int(ind);
FMpi::Assert(MPI_Unpack(array.get(),int(arrayCapacity),&previousIndex,&value,FMpi::GetTypeCount(value),FMpi::GetType(value),comm), __LINE__);
seek(FSize(sizeof(value)+ind));
FAssertLF(previousIndex == currentIndex);
return value;
currentIndex = ind;
return getValue<ClassType>();
}
/** Fill a value with memory cast */
template <class ClassType>
void fillValue(ClassType* const inValue){
FAssertLF(arrayCapacity < std::numeric_limits<int>::max());
FAssertLF(currentIndex < std::numeric_limits<int>::max());
int previousIndex = int(currentIndex);
FMpi::Assert(MPI_Unpack(array.get(),int(arrayCapacity),&previousIndex,inValue,FMpi::GetTypeCount(*inValue),FMpi::GetType(*inValue),comm), __LINE__);
seek(FSize(sizeof(ClassType) + currentIndex));
FAssertLF(previousIndex == currentIndex);
FAssertLF(currentIndex + FSize(sizeof(ClassType)) <= arrayCapacity );
memcpy(inValue, &array[currentIndex], sizeof(ClassType));
currentIndex += sizeof(ClassType);
}
/** Fill one/many value(s) with memcpy */
template <class ClassType>
void fillArray(ClassType* const inArray, const FSize inSize){
FAssertLF(arrayCapacity < std::numeric_limits<int>::max());
FAssertLF(currentIndex < std::numeric_limits<int>::max());
FAssertLF(inSize < std::numeric_limits<int>::max());
int previousIndex = int(currentIndex);
FMpi::Assert(MPI_Unpack(array.get(),int(arrayCapacity),&previousIndex,inArray,int(inSize)*FMpi::GetTypeCount(*inArray),FMpi::GetType(*inArray),comm), __LINE__);
seek(FSize(sizeof(ClassType) * inSize + currentIndex));
FAssertLF(previousIndex == currentIndex);
FAssertLF(currentIndex + FSize(sizeof(ClassType))*inSize <= arrayCapacity );
memcpy(inArray, &array[currentIndex], sizeof(ClassType)*inSize);
currentIndex += sizeof(ClassType)*inSize;
}
/** Same as fillValue */
......
......@@ -21,22 +21,21 @@
#include "FAbstractBuffer.hpp"
#include "../Utils/FAssert.hpp"
/** @author Cyrille Piacibello
* This class provide the same features as FBufferWriter using MPI_Pack system
/** @author Cyrille Piacibello, Berenger Bramas
* This class provide the same features as FBufferWriter
*
* Put some data
* then insert back if needed
* finally use data pointer as you like
*/
class FMpiBufferWriter : public FAbstractBufferWriter {
MPI_Comm mpiComm; //< Communicator needed by MPI_Pack functions
FSize arrayCapacity; //< Allocated Space
std::unique_ptr<char[]> array; //< Allocated Array
FSize currentIndex; //< Currently filled space
/** Test and exit if not enought space */
void expandIfNeeded(const size_t requestedSpace) {
if( arrayCapacity < FSize(currentIndex + requestedSpace) ){
void expandIfNeeded(const FSize requestedSpace) {
if( arrayCapacity < currentIndex + requestedSpace){
arrayCapacity = FSize(double(currentIndex + requestedSpace + 1) * 1.5);
char* arrayTmp = new char[arrayCapacity];
memcpy(arrayTmp, array.get(), sizeof(char)*currentIndex);
......@@ -46,19 +45,13 @@ class FMpiBufferWriter : public FAbstractBufferWriter {
public:
/** Constructor with a default arrayCapacity of 512 bytes */
explicit FMpiBufferWriter(const MPI_Comm inComm, const FSize inDefaultCapacity = 1024):
mpiComm(inComm),
explicit FMpiBufferWriter(const FSize inDefaultCapacity = 1024):
arrayCapacity(inDefaultCapacity),
array(new char[inDefaultCapacity]),
currentIndex(0)
{}
/** Change the comm (or to set it later) */
void setComm(const MPI_Comm inComm){
mpiComm = inComm;
}
/** To change the capacity (but reset the head to 0 if size if lower) */
void resize(const FSize newCapacity){
if(newCapacity != arrayCapacity){
......@@ -98,10 +91,8 @@ public:
template <class ClassType>
void write(const ClassType& object){
expandIfNeeded(sizeof(ClassType));
FAssertLF(currentIndex < std::numeric_limits<int>::max());
int intCurrentIndex = int(currentIndex);
FMpi::Assert(MPI_Pack(const_cast<ClassType*>(&object), FMpi::GetTypeCount(object), FMpi::GetType(object), array.get(), int(arrayCapacity), &intCurrentIndex, mpiComm), __LINE__);
currentIndex = intCurrentIndex;
memcpy(&array[currentIndex], &object, sizeof(ClassType));
currentIndex += sizeof(ClassType);
}
/**
......@@ -110,20 +101,16 @@ public:
template <class ClassType>
void write(const ClassType&& object){
expandIfNeeded(sizeof(ClassType));
FAssertLF(arrayCapacity < std::numeric_limits<int>::max());
int intCurrentIndex = int(currentIndex);
FMpi::Assert(MPI_Pack(const_cast<ClassType*>(&object), FMpi::GetTypeCount(object), FMpi::GetType(object), array.get(), int(arrayCapacity), &intCurrentIndex, mpiComm), __LINE__);
currentIndex = intCurrentIndex;
memcpy(&array[currentIndex], &object, sizeof(ClassType));
currentIndex += sizeof(ClassType);
}
/** Write back, position + sizeof(object) has to be < size */
template <class ClassType>
void writeAt(const FSize position, const ClassType& object){
FAssertLF(FSize(position + sizeof(ClassType)) <= currentIndex);
FAssertLF(arrayCapacity < std::numeric_limits<int>::max());
FAssertLF(position < std::numeric_limits<int>::max());
int noConstPosition = int(position);
FMpi::Assert(MPI_Pack(const_cast<ClassType*>(&object), FMpi::GetTypeCount(object), FMpi::GetType(object), array.get(), int(arrayCapacity), &noConstPosition, mpiComm), __LINE__);
FAssertLF(position <= currentIndex);
currentIndex = position;
write(object);
}
/** Write an array
......@@ -132,11 +119,8 @@ public:
template <class ClassType>
void write(const ClassType* const objects, const FSize inSize){
expandIfNeeded(sizeof(ClassType) * inSize);
FAssertLF(arrayCapacity < std::numeric_limits<int>::max());
FAssertLF(inSize < std::numeric_limits<int>::max());
int intCurrentIndex = int(currentIndex);
FMpi::Assert(MPI_Pack( const_cast<ClassType*>(objects), int(inSize)*FMpi::GetTypeCount(*objects), FMpi::GetType(*objects), array.get(), int(arrayCapacity), &intCurrentIndex, mpiComm), __LINE__);
currentIndex = intCurrentIndex;
memcpy(&array[currentIndex], objects, sizeof(ClassType)*inSize);
currentIndex += sizeof(ClassType)*inSize;
}
/** Equivalent to write */
......
......@@ -401,7 +401,7 @@ protected:
MPI_Status statusSize[8];
FSize bufferSize;
FMpiBufferWriter sendBuffer(comm.getComm(), 1);// Max = 1 + sizeof(cell)*7
FMpiBufferWriter sendBuffer(1);// Max = 1 + sizeof(cell)*7
std::unique_ptr<FMpiBufferReader[]> recvBuffer(new FMpiBufferReader[7]);
FSize recvBufferSize[7];
CellClass recvBufferCells[7];
......@@ -763,7 +763,7 @@ protected:
for(int idxProc = 0 ; idxProc < nbProcess ; ++idxProc){
const long long int toSendAtProcAtLevel = indexToSend[idxLevel * nbProcess + idxProc];
if(toSendAtProcAtLevel != 0){
sendBuffer[idxLevel * nbProcess + idxProc] = new FMpiBufferWriter(comm.getComm(),int(toSendAtProcAtLevel));
sendBuffer[idxLevel * nbProcess + idxProc] = new FMpiBufferWriter(toSendAtProcAtLevel);
sendBuffer[idxLevel * nbProcess + idxProc]->write(int(toSend[idxLevel * nbProcess + idxProc].getSize()));
......@@ -784,7 +784,7 @@ protected:
const long long int toReceiveFromProcAtLevel = globalReceiveMap[(idxProc * nbProcess * OctreeHeight) + idxLevel * nbProcess + idProcess];
if(toReceiveFromProcAtLevel){
recvBuffer[idxLevel * nbProcess + idxProc] = new FMpiBufferReader(comm.getComm(),int(toReceiveFromProcAtLevel));
recvBuffer[idxLevel * nbProcess + idxProc] = new FMpiBufferReader(toReceiveFromProcAtLevel);
FMpi::IRecvSplit(recvBuffer[idxLevel * nbProcess + idxProc]->data(),
recvBuffer[idxLevel * nbProcess + idxProc]->getCapacity(), idxProc,
......@@ -1036,8 +1036,8 @@ protected:
const int heightMinusOne = FAbstractAlgorithm::lowerWorkingLevel - 1;
FMpiBufferWriter sendBuffer(comm.getComm());
FMpiBufferReader recvBuffer(comm.getComm());
FMpiBufferWriter sendBuffer;
FMpiBufferReader recvBuffer;
int righestProcToSendTo = nbProcess - 1;
......@@ -1211,20 +1211,6 @@ protected:
///////////////////////////////////////////////////
FLOG(prepareCounter.tic());
FMpiBufferWriter**const sendBuffer = new FMpiBufferWriter*[nbProcess];
memset(sendBuffer, 0, sizeof(FMpiBufferWriter*) * nbProcess);
FMpiBufferReader**const recvBuffer = new FMpiBufferReader*[nbProcess];
memset(recvBuffer, 0, sizeof(FMpiBufferReader*) * nbProcess);
/* This a nbProcess x nbProcess matrix of integer
* let U and V be id of processes :
* globalReceiveMap[U*nbProcess + V] == size of information needed by V and own by U
*/
FSize*const globalReceiveMap = new FSize[nbProcess * nbProcess];
memset(globalReceiveMap, 0, sizeof(FSize) * nbProcess * nbProcess);
FBoolArray leafsNeedOther(this->numberOfLeafs);
int countNeedOther = 0;
......@@ -1313,12 +1299,25 @@ protected:
#pragma omp master // nowait
if(p2pEnabled){
/* This a nbProcess x nbProcess matrix of integer
* let U and V be id of processes :
* globalReceiveMap[U*nbProcess + V] == size of information needed by V and own by U
*/
FSize*const globalReceiveMap = new FSize[nbProcess * nbProcess];
memset(globalReceiveMap, 0, sizeof(FSize) * nbProcess * nbProcess);
//Share to all processus globalReceiveMap
FLOG(gatherCounter.tic());
FMpi::MpiAssert( MPI_Allgather( partsToSend, nbProcess, FMpi::GetType(*partsToSend),
globalReceiveMap, nbProcess, FMpi::GetType(*partsToSend), comm.getComm()), __LINE__ );
FLOG(gatherCounter.tac());
FMpiBufferReader**const recvBuffer = new FMpiBufferReader*[nbProcess];
memset(recvBuffer, 0, sizeof(FMpiBufferReader*) * nbProcess);
FMpiBufferWriter**const sendBuffer = new FMpiBufferWriter*[nbProcess];
memset(sendBuffer, 0, sizeof(FMpiBufferWriter*) * nbProcess);
// To send in asynchrone way
std::vector<MPI_Request> requests;
requests.reserve(2 * nbProcess);
......@@ -1326,18 +1325,17 @@ protected:
for(int idxProc = 0 ; idxProc < nbProcess ; ++idxProc){
if(globalReceiveMap[idxProc * nbProcess + idProcess]){ //if idxProc has sth for me.
//allocate buffer of right size
recvBuffer[idxProc] = new FMpiBufferReader(comm.getComm(),globalReceiveMap[idxProc * nbProcess + idProcess]);
recvBuffer[idxProc] = new FMpiBufferReader(globalReceiveMap[idxProc * nbProcess + idProcess]);
FMpi::IRecvSplit(recvBuffer[idxProc]->data(), recvBuffer[idxProc]->getCapacity(),
idxProc, FMpi::TagFmmP2P, comm, &requests);
}
}
const int nbMessagesToRecv = int(requests.size());
// Prepare send
for(int idxProc = 0 ; idxProc < nbProcess ; ++idxProc){
if(toSend[idxProc].getSize() != 0){
sendBuffer[idxProc] = new FMpiBufferWriter(comm.getComm(),globalReceiveMap[idProcess*nbProcess+idxProc]);
sendBuffer[idxProc] = new FMpiBufferWriter(globalReceiveMap[idProcess*nbProcess+idxProc]);
// << is equivalent to write().
(*sendBuffer[idxProc]) << toSend[idxProc].getSize();
for(int idxLeaf = 0 ; idxLeaf < toSend[idxProc].getSize() ; ++idxLeaf){
......@@ -1366,18 +1364,28 @@ protected:
MPI_Waitall(int(requests.size()), requests.data(), status.get());
FLOG(waitCounter.tac());
for(int idxRcv = 0 ; idxRcv < nbMessagesToRecv ; ++idxRcv){
const int idxProc = status[idxRcv].MPI_SOURCE;
FSize nbLeaves;
(*recvBuffer[idxProc]) >> nbLeaves;
for(FSize idxLeaf = 0 ; idxLeaf < nbLeaves ; ++idxLeaf){
MortonIndex leafIndex;
(*recvBuffer[idxProc]) >> leafIndex;
otherP2Ptree.createLeaf(leafIndex)->getSrc()->restore((*recvBuffer[idxProc]));
for(int idxProc = 0 ; idxProc < nbProcess ; ++idxProc){
if(globalReceiveMap[idxProc * nbProcess + idProcess]){ //if idxProc has sth for me.
FAssertLF(recvBuffer[idxProc]);
FMpiBufferReader& currentBuffer = (*recvBuffer[idxProc]);
FSize nbLeaves;
currentBuffer >> nbLeaves;
for(FSize idxLeaf = 0 ; idxLeaf < nbLeaves ; ++idxLeaf){
MortonIndex leafIndex;
currentBuffer >> leafIndex;
otherP2Ptree.createLeaf(leafIndex)->getSrc()->restore(currentBuffer);
}
// Realease memory early
delete recvBuffer[idxProc];
recvBuffer[idxProc] = nullptr;
}
}
for(int idxProc = 0 ; idxProc < nbProcess ; ++idxProc){
delete sendBuffer[idxProc];
delete recvBuffer[idxProc];
recvBuffer[idxProc] = nullptr;
}
delete[] globalReceiveMap;
}
///////////////////////////////////////////////////
......@@ -1527,11 +1535,6 @@ protected:
}
}
for(int idxProc = 0 ; idxProc < nbProcess ; ++idxProc){
delete sendBuffer[idxProc];
delete recvBuffer[idxProc];
}
delete[] globalReceiveMap;
delete[] leafsDataArray;
FLOG(computation2Counter.tac());
......
......@@ -404,7 +404,7 @@ protected:
MPI_Status statusSize[8];
FSize bufferSize;
FMpiBufferWriter sendBuffer(comm.getComm(), 1);// Max = 1 + sizeof(cell)*7
FMpiBufferWriter sendBuffer(1);// Max = 1 + sizeof(cell)*7
std::unique_ptr<FMpiBufferReader[]> recvBuffer(new FMpiBufferReader[7]);
FSize recvBufferSize[7];
CellClass recvBufferCells[7];
......@@ -856,7 +856,7 @@ protected:
for(int idxProc = 0 ; idxProc < nbProcess ; ++idxProc){
const long long int toSendAtProcAtLevel = indexToSend[idxLevel * nbProcess + idxProc];
if(toSendAtProcAtLevel != 0){
sendBuffer[idxLevel * nbProcess + idxProc] = new FMpiBufferWriter(comm.getComm(),int(toSendAtProcAtLevel));
sendBuffer[idxLevel * nbProcess + idxProc] = new FMpiBufferWriter(toSendAtProcAtLevel);
sendBuffer[idxLevel * nbProcess + idxProc]->write(int(toSend[idxLevel * nbProcess + idxProc].getSize()));
......@@ -878,7 +878,7 @@ protected:
const long long int toReceiveFromProcAtLevel = globalReceiveMap[(idxProc * nbProcess * OctreeHeight) + idxLevel * nbProcess + idProcess];
if(toReceiveFromProcAtLevel){
recvBuffer[idxLevel * nbProcess + idxProc] = new FMpiBufferReader(comm.getComm(),int(toReceiveFromProcAtLevel));
recvBuffer[idxLevel * nbProcess + idxProc] = new FMpiBufferReader(toReceiveFromProcAtLevel);
FAssertLF(recvBuffer[idxLevel * nbProcess + idxProc]->getCapacity() < std::numeric_limits<int>::max());
FMpi::MpiAssert( MPI_Irecv(recvBuffer[idxLevel * nbProcess + idxProc]->data(),
......@@ -1126,7 +1126,7 @@ protected:
MPI_Status*const statusSize = new MPI_Status[8];
FMpiBufferWriter sendBuffer(comm.getComm());
FMpiBufferReader recvBuffer(comm.getComm());
FMpiBufferReader recvBuffer;
int righestProcToSendTo = nbProcess - 1;
......@@ -1441,7 +1441,7 @@ protected:
for(int idxProc = 0 ; idxProc < nbProcess ; ++idxProc){
if(globalReceiveMap[idxProc * nbProcess + idProcess]){ //if idxProc has sth for me.
//allocate buffer of right size
recvBuffer[idxProc] = new FMpiBufferReader(comm.getComm(),globalReceiveMap[idxProc * nbProcess + idProcess]);
recvBuffer[idxProc] = new FMpiBufferReader(globalReceiveMap[idxProc * nbProcess + idProcess]);
FAssertLF(recvBuffer[idxProc]->getCapacity() < std::numeric_limits<int>::max());
FMpi::MpiAssert( MPI_Irecv(recvBuffer[idxProc]->data(), int(recvBuffer[idxProc]->getCapacity()), MPI_PACKED,
idxProc, FMpi::TagFmmP2P, comm.getComm(), &requests[iterRequest++]) , __LINE__ );
......@@ -1452,7 +1452,7 @@ protected:
// Prepare send
for(int idxProc = 0 ; idxProc < nbProcess ; ++idxProc){
if(toSend[idxProc].getSize() != 0){
sendBuffer[idxProc] = new FMpiBufferWriter(comm.getComm(),globalReceiveMap[idProcess*nbProcess+idxProc]);
sendBuffer[idxProc] = new FMpiBufferWriter(globalReceiveMap[idProcess*nbProcess+idxProc]);
// << is equivalent to write().
(*sendBuffer[idxProc]) << toSend[idxProc].getSize();
for(int idxLeaf = 0 ; idxLeaf < toSend[idxProc].getSize() ; ++idxLeaf){
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment