Commit aeea131a authored by PIACIBELLO Cyrille's avatar PIACIBELLO Cyrille

MpiBuffer are fine, need to be abstracted

parent 6a6c2a9e
......@@ -16,6 +16,7 @@
#ifndef FMPIBUFFERREADER_HPP
#define FMPIBUFFERREADER_HPP
#include <memory>
#include "../Utils/FMpi.hpp"
......@@ -27,98 +28,26 @@
* finally use data pointer as you like
*/
class FMpiBufferReader {
private:
//Classe FMpiVector, used only by FMpiBuffer
template<class ObjectType>
class FMpiVector {
protected :
static const int DefaultSize = 10;
ObjectType* array; //memory area
int capacity; //Capacity of the array
long int index; //current position in Byte !!
public:
FMpiVector():
array(0),
capacity(DefaultSize),
index(0){
array = reinterpret_cast<ObjectType*>(new char[sizeof(ObjectType)* DefaultSize]);
}
FMpiVector(const int inCapa):
array(0),
capacity(inCapa),
index(0){
array = reinterpret_cast<ObjectType*>(new char[sizeof(ObjectType)* inCapa]);
}
virtual ~FMpiVector(){
delete[] reinterpret_cast< char* >(array);
}
//To get the capacity
const int getCapacity() const{
return this->capacity;
}
//To get the array
ObjectType * data(){
return array;
}
const ObjectType * data() const{
return array;
}
//To delete all the element stored of the array
void clear(){
while(0 < index){
(&array[--index])->~ObjectType();
}
}
//To get how much space is used
long int getSize() const{
return this->index;
}
//To get how many objects are stored
int getObjectsSize(){
return (this->index / sizeof(ObjectType));
}
//To inc the index
//Usually used with array.incIndex(sizeof(my_object_stored));
void incIndex(const int inInc){
if(index + inInc > capacity){
fprintf(stderr,"Aborting : index array out of range\n");
const MPI_Comm comm; //< Communicator needed by MPI_Pack functions
const int arrayCapacity; //< Allocated space
std::unique_ptr<char[]> array; //< Allocated Array
int currentIndex;
/** Test and exit if not enought space */
void assertRemainingSpace(const size_t requestedSpace) const {
if(int(currentIndex + requestedSpace) > arrayCapacity){
printf("Error FMpiBufferWriter has not enough space\n");
exit(0);
}
else{
this->index+=inInc;
}
}
//To set the index
void setIndex(const int inInd){
if(inInd>capacity){
fprintf(stderr,"Aborting : index array out of range\n");
exit(0);
}
else{
this->index = inInd;
}
}
};
MPI_Comm comm;
FMpiVector<char> array;
public :
FMpiBufferReader(MPI_Comm inComm, const int inCapacity = 0):
/*Constructor with a default arrayCapacity of 512 bytes */
FMpiBufferReader(const MPI_Comm inComm, const int inCapacity = 512):
comm(inComm),
array(inCapacity)
arrayCapacity(inCapacity),
array(new char[inCapacity]),
currentIndex(0)
{}
/** Destructor
......@@ -126,55 +55,76 @@ public :
virtual ~FMpiBufferReader(){
}
/** Get the memory area */
/** Get allocated memory pointer */
char* data(){
return array.data();
return array.get();
}
/** Get the memory area */
/** Get allocated memory pointer */
const char* data() const {
return array.data();
return array.get();
}
/** Size of the memory initialized */
/** get the filled space */
int getSize() const{
return array.getSize();
return currentIndex;
}
/** Size of the memory initialized */
int getCapacity() const{
return arrayCapacity;
}
/** Move the read index to a position */
void seek(const int inIndex){
array.setIndex(inIndex);
if(inIndex > arrayCapacity){
printf("FMpiBufferReader :: Aborting :: Can't move index because buffer isn't long enough");
exit(0);
}
else{
currentIndex = inIndex;
}
}
/** Get the read position */
int tell() const {
return array.getSize();
return currentIndex;
}
/** Get a value with memory cast */
template <class ClassType>
ClassType getValue(){
ClassType value;
int currentIndex = array.getSize();
array.incIndex(sizeof(value));
MPI_Unpack(array.data(),sizeof(ClassType),&currentIndex,&value,1,FMpi::GetType(value),comm);
int previousIndex = currentIndex;
seek(sizeof(value) + previousIndex);
MPI_Unpack(array.get(),arrayCapacity,&previousIndex,&value,1,FMpi::GetType(value),comm);
return value;
}
/** Get a value with memory cast at a specified index */
template <class ClassType>
ClassType getValue(const int ind){
ClassType value;
int previousIndex = ind;
seek(sizeof(value)+ind);
MPI_Unpack(array.get(),arrayCapacity,&previousIndex,&value,1,FMpi::GetType(value),comm);
return value;
}
/** Fill a value with memory cast */
template <class ClassType>
void fillValue(ClassType* const inValue){
int currentIndex = array.getSize();
array.incIndex(sizeof(ClassType));
MPI_Pack(inValue,1,FMpi::GetType(*inValue),array.data(),array.getCapacity(),&currentIndex,comm);
int previousIndex = currentIndex;
seek(sizeof(ClassType) + previousIndex);
MPI_Unpack(array.get(),arrayCapacity,&previousIndex,inValue,1,FMpi::GetType(*inValue),comm);
}
/** Fill one/many value(s) with memcpy */
template <class ClassType>
void fillArray(ClassType* const inArray, const int inSize){
int currentIndex = array.getSize();
array.incIndex(sizeof(ClassType) * inSize);
MPI_Pack(inArray,inSize,FMpi::GetType(*inArray),array.data(),array.getCapacity(),&currentIndex,comm);
int previousIndex = currentIndex;
seek(sizeof(ClassType) * inSize + previousIndex);
MPI_Unpack(array.get(),arrayCapacity,&previousIndex,inArray,inSize,FMpi::GetType(*inArray),comm);
}
/** Same as fillValue */
......
......@@ -17,7 +17,6 @@
#define FMPIBUFFERWRITER_HPP
#include <memory>
#include "../Utils/FMpi.hpp"
/** @author Cyrille Piacibello
......@@ -44,8 +43,11 @@ class FMpiBufferWriter {
public:
/** Constructor with a default arrayCapacity of 512 bytes */
FMpiBufferWriter(const MPI_Comm inComm, const int inCapacity = 512):
mpiComm(inComm), arrayCapacity(inCapacity), array(new char[inCapacity]), currentIndex(0){
}
mpiComm(inComm),
arrayCapacity(inCapacity),
array(new char[inCapacity]),
currentIndex(0)
{}
/** Destructor */
virtual ~FMpiBufferWriter(){
......@@ -90,13 +92,12 @@ public:
/** Write back, position + sizeof(object) has to be < size */
template <class ClassType>
void writeAt(const int position, const ClassType& object){
if(position + sizeof(ClassType) > arrayCapacity){
if(position + sizeof(ClassType) > currentIndex){
printf("Not enought space\n");
exit(0);
}
int noConstPosition = position;
MPI_Pack(const_cast<ClassType*>(&object), 1, FMpi::GetType(object), array.get(), arrayCapacity, &noConstPosition, mpiComm);
currentIndex = FMath::Max(currentIndex, noConstPosition);
}
/** Write an array
......
......@@ -202,7 +202,6 @@ public:
if(operationsToProceed & FFmmP2M) bottomPass();
if(operationsToProceed & FFmmM2M) upwardPass();
printf("So far so good \n");
if(operationsToProceed & FFmmM2L) transferPass();
......@@ -282,9 +281,9 @@ private:
MPI_Status status[14];
// Maximum data per message is:
FMpiBufferWriter sendBuffer(comm.getComm());
const int recvBufferOffset = 8 * MaxSizePerCell + 1;
FMpiBufferReader recvBuffer(comm.getComm(), nbProcess * recvBufferOffset);
FBufferWriter sendBuffer/*(comm.getComm())*/;
const int recvBufferOffset = (8 * MaxSizePerCell + 1);
FBufferReader recvBuffer(/*comm.getComm(),*/ nbProcess*recvBufferOffset);
CellClass recvBufferCells[8];
int firstProcThatSend = idProcess + 1;
......@@ -321,7 +320,8 @@ private:
&& (getWorkingInterval((idxLevel+1), idProcess).min >>3) <= (getWorkingInterval((idxLevel+1), idProcess - 1).max >>3)){
char state = 0;
sendBuffer.write(char(0));
sendBuffer.write(state);
printf("Index position after writing first state(%d) :: %d\n",state,sendBuffer.getSize());
const CellClass* const* const child = iterArray[cellsToSend].getCurrentChild();
for(int idxChild = 0 ; idxChild < 8 ; ++idxChild){
......@@ -331,23 +331,27 @@ private:
state = char(state | (0x1 << idxChild));
}
}
printf("state just before Send :: %d \n",state);
sendBuffer.writeAt(0,state);
printf("what is it in sendBuffer :: %d\n", sendBuffer.data()[0]);
while( sendToProc && iterArray[cellsToSend].getCurrentGlobalIndex() == getWorkingInterval(idxLevel , sendToProc - 1).max){
--sendToProc;
}
MPI_Isend(sendBuffer.data(), sendBuffer.getSize(), MPI_PACKED, sendToProc, FMpi::TagFmmM2M, comm.getComm(), &requests[iterRequests++]);
MPI_Isend(sendBuffer.data(), sendBuffer.getSize(), MPI_BYTE, sendToProc, FMpi::TagFmmM2M, comm.getComm(), &requests[iterRequests++]);
}
// We may need to receive something
bool hasToReceive = false;
int endProcThatSend = firstProcThatSend;
if(idProcess != nbProcess - 1){
if(idProcess != nbProcess - 1){ // if I'm the last one (idProcess == nbProcess-1), I shall not receive anything in a M2M
while(firstProcThatSend < nbProcess
&& (getWorkingInterval((idxLevel+1), firstProcThatSend).max) < (getWorkingInterval((idxLevel+1), idProcess).max)){
// Second condition :: while firstProcThatSend max morton index is < to myself max interval
++firstProcThatSend;
printf("\n \n PLOP \n \n");
}
if(firstProcThatSend < nbProcess &&
......@@ -365,7 +369,7 @@ private:
hasToReceive = true;
for(int idxProc = firstProcThatSend ; idxProc < endProcThatSend ; ++idxProc ){
MPI_Irecv(&recvBuffer.data()[idxProc * recvBufferOffset], recvBufferOffset, MPI_PACKED,
MPI_Irecv(&recvBuffer.data()[idxProc * recvBufferOffset], recvBufferOffset, MPI_BYTE,
idxProc, FMpi::TagFmmM2M, comm.getComm(), &requests[iterRequests++]);
}
}
......@@ -402,7 +406,7 @@ private:
for(int idxProc = firstProcThatSend ; idxProc < endProcThatSend ; ++idxProc){
recvBuffer.seek(idxProc * recvBufferOffset);
int state = int(recvBuffer.getValue<char>());
printf("state read in recv :: %d \n",state);
int position = 0;
while( state && position < 8){
while(!(state & 0x1)){
......@@ -438,7 +442,6 @@ private:
FLOG( FLog::Controller << "\t\t Computation : " << computationCounter.cumulated() << " s\n" );
FLOG( FLog::Controller << "\t\t Prepare : " << prepareCounter.cumulated() << " s\n" );
FLOG( FLog::Controller << "\t\t Wait : " << waitCounter.cumulated() << " s\n" );
printf("Almost finished\n");
}
/////////////////////////////////////////////////////////////////////////////
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment