Commit 0a68c314 authored by berenger-bramas's avatar berenger-bramas
Browse files

The MPI communications are now buffered.

Buffering is enabled through an abstract class, Sendable.
We are not using the MPI buffering system but our own model,
in which an abstraction describes the cell data that needs to be sent.

git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/scalfmm/scalfmm/trunk@69 2616d619-271b-44dc-8df4-d4a8f33a7222
parent a4aefb42
#ifndef FBUFFERVECTOR_HPP
#define FBUFFERVECTOR_HPP
// /!\ Please, you must read the license at the bottom of this page
#include "../Utils/FAssertable.hpp"
// To get memcpy
#include <cstring>
/**
* @author Berenger Bramas (berenger.bramas@inria.fr)
* @class FBufferVector
* Please read the license
*
* Byte buffer of fixed size (Capacity) in which several objects are packed
* one after the other, each one preceded by an int tag.
* The storage is allocated the first time something is added.
*
* Typical use: call addEnoughSpace() before each addDataUp()/addDataDown();
* when it returns false, consume getData()/getSize() and call clear().
*
* NOTE: instances own a raw pointer and define no copy operations, so they
* must not be copied (the copy would share, then free twice, the storage).
*/
template <int Capacity>
class FBufferVector : protected FAssertable {
    char* buffer;   //< The storage (0 until the first insertion)
    int occuped;    //< Number of bytes currently used in the storage

    /** Allocate the storage if it does not exist yet */
    void checkBuffer(){
        if(!buffer){
            buffer = new char[Capacity];
        }
    }

public:
    /** Build an empty buffer, no memory is allocated here */
    FBufferVector() : buffer(0), occuped(0) {
        assert(Capacity > 0, "Capacity has to be positive", __LINE__, __FILE__);
    }

    /** Release the storage */
    ~FBufferVector(){
        // The storage comes from new[], so it has to be released with delete[]
        // (deleting a null pointer is a no-op, no test is needed)
        delete [] buffer;
    }

    /**
    * To know if an object of neededSpace bytes, plus its int tag, still fits
    * @param neededSpace the number of bytes the object will write
    * @return true if the tag and the object can be added without overflow
    */
    bool addEnoughSpace(const int neededSpace) const{
        // Stay in signed arithmetic: sizeof() is unsigned and would silently
        // convert the whole comparison to unsigned
        return occuped + neededSpace + static_cast<int>(sizeof(int)) <= Capacity;
    }

    /** @return the total size of the buffer in bytes */
    int getCapacity() const{
        return Capacity;
    }

    /** @return the number of bytes currently stored */
    int getSize() const{
        return occuped;
    }

    /** @return the beginning of the storage (0 if nothing was ever added) */
    void* getData() {
        return buffer;
    }

    /**
    * Append the tag then the object using its writeUp method.
    * No bound checking is done here, addEnoughSpace has to be called before.
    * @param tag the int written in front of the object
    * @param object must provide int writeUp(buffer, limit) returning the bytes written
    */
    template < class T >
    void addDataUp(const int tag, const T& object){
        checkBuffer();
        memcpy(&buffer[occuped],&tag,sizeof(int));
        occuped += sizeof(int);
        occuped += object.writeUp(&buffer[occuped], Capacity - occuped);
    }

    /**
    * Append the tag then the object using its writeDown method.
    * No bound checking is done here, addEnoughSpace has to be called before.
    * @param tag the int written in front of the object
    * @param object must provide int writeDown(buffer, limit) returning the bytes written
    */
    template < class T >
    void addDataDown(const int tag, const T& object){
        checkBuffer();
        memcpy(&buffer[occuped],&tag,sizeof(int));
        occuped += sizeof(int);
        occuped += object.writeDown(&buffer[occuped], Capacity - occuped);
    }

    /** Forget the content, the storage is kept to be reused */
    void clear(){
        occuped = 0;
    }
};
#endif //FBUFFERVECTOR_HPP
// [--LICENSE--]
......@@ -10,6 +10,7 @@
#include "../Containers/FBoolArray.hpp"
#include "../Containers/FOctree.hpp"
#include "../Containers/FBufferVector.hpp"
#include "../Utils/FMpi.hpp"
......@@ -43,8 +44,8 @@ int OctreeHeight, int SubtreeHeight>
FMpi& app; //< The app to communicate
Octree* const tree; //< The octree to work on
Kernel** kernels; //< The kernels
Octree* const tree; //< The octree to work on
Kernel** kernels; //< The kernels
OctreeIterator* iterArray; //< To store the iterator
OctreeIterator* previousIterArray; //< To store the previous iterator
......@@ -61,7 +62,8 @@ int OctreeHeight, int SubtreeHeight>
const int nbProcess; //< Number of process
const int idPorcess; //< Id of current process
void run(){}
const static int BufferSize = 2000; //< To know max of the buffer we receive
FBufferVector<BufferSize> * sendBuffer; //< To put data to send into a buffer
/** To swap between two arrays
* the current and the previous
......@@ -81,7 +83,8 @@ public:
FFmmAlgorithmThreadProc(FMpi& inApp, Octree* const inTree, Kernel* const inKernels)
: app(inApp), tree(inTree) , kernels(0), iterArray(0),
previousIterArray(0), previousLeft(0),previousRight(0), previousSize(0),
MaxThreads(omp_get_max_threads()), nbProcess(inApp.processCount()), idPorcess(inApp.processId()) {
MaxThreads(omp_get_max_threads()), nbProcess(inApp.processCount()), idPorcess(inApp.processId()),
sendBuffer(0) {
assert(tree, "tree cannot be null", __LINE__, __FILE__);
......@@ -90,6 +93,8 @@ public:
this->kernels[idxThread] = new KernelClass<ParticleClass, CellClass, OctreeHeight>(*inKernels);
}
this->sendBuffer = new FBufferVector<2000>[nbProcess];
FDEBUG(FDebug::Controller << "FFmmAlgorithmThreadProc\n");
FDEBUG(FDebug::Controller << "Max threads = " << MaxThreads << " .\n");
}
......@@ -100,6 +105,8 @@ public:
delete this->kernels[idxThread];
}
delete [] this->kernels;
delete [] this->sendBuffer;
}
/**
......@@ -273,10 +280,20 @@ public:
}
const int idxReceiver = getProc(parentOffset,leafs);
app.sendData(idxReceiver,sizeof(CellClass),previousIterArray[this->previousLeft+leftOffset].getCurrentCell(),previousLeft+leftOffset);
if(!this->sendBuffer[idxReceiver].addEnoughSpace(previousIterArray[this->previousLeft+leftOffset].getCurrentCell()->bytesToSendUp())){
app.sendData(idxReceiver,this->sendBuffer[idxReceiver].getSize(),this->sendBuffer[idxReceiver].getData(),idxLevel);
this->sendBuffer[idxReceiver].clear();
}
this->sendBuffer[idxReceiver].addDataUp(previousLeft+leftOffset,*previousIterArray[this->previousLeft+leftOffset].getCurrentCell());
++leftOffset;
}
for(int idxProc = 0 ; idxProc < this->nbProcess ; ++idxProc){
if(this->sendBuffer[idxProc].getSize()){
app.sendData(idxProc,this->sendBuffer[idxProc].getSize(),this->sendBuffer[idxProc].getData(),idxLevel);
this->sendBuffer[idxProc].clear();
}
}
}
else if(this->previousLeft > 0 && leftChildIter > MostLeftChild){
while( previousIterArray[previousLeft+leftOffset - 1].getCurrentGlobalIndex() >= MostLeftChild){
......@@ -308,10 +325,20 @@ public:
parentIndex = iterArray[parentOffset].getCurrentGlobalIndex();
}
const int idxReceiver = getProc(parentOffset,leafs);
app.sendData(idxReceiver,sizeof(CellClass),previousIterArray[this->previousRight-rightOffset].getCurrentCell(),previousRight-rightOffset);
if(!this->sendBuffer[idxReceiver].addEnoughSpace(previousIterArray[this->previousRight-rightOffset].getCurrentCell()->bytesToSendUp())){
app.sendData(idxReceiver,this->sendBuffer[idxReceiver].getSize(),this->sendBuffer[idxReceiver].getData(),idxLevel);
this->sendBuffer[idxReceiver].clear();
}
this->sendBuffer[idxReceiver].addDataUp(previousRight-rightOffset,*previousIterArray[this->previousRight-rightOffset].getCurrentCell());
++rightOffset;
}
for(int idxProc = 0 ; idxProc < this->nbProcess ; ++idxProc){
if(this->sendBuffer[idxProc].getSize()){
app.sendData(idxProc,this->sendBuffer[idxProc].getSize(),this->sendBuffer[idxProc].getData(),idxLevel);
this->sendBuffer[idxProc].clear();
}
}
}
rightOffsets[idxLevel+1] = rightOffset;
}
......@@ -324,15 +351,18 @@ public:
{
FDEBUG(receiveCounter.tic());
int needToReceive = FMath::Max(0,-rightOffset) + FMath::Max(0,-leftOffset);
CellClass tempCell;
int source = 0, tag = 0, filled = 0;
int source = 0, filled = 0;
int position;
char buffer[BufferSize];
while(needToReceive){
app.receiveData(sizeof(CellClass),&tempCell,&source,&tag,&filled);
if(filled){
*previousIterArray[tag].getCurrentCell() = tempCell;
app.receiveData( BufferSize, idxLevel, buffer, &source, &filled);
for(int idxBuff = 0 ; idxBuff < filled;){
memcpy(&position,&buffer[idxBuff],sizeof(int));
idxBuff += sizeof(int);
idxBuff += previousIterArray[position].getCurrentCell()->readUp(&buffer[idxBuff],filled-idxBuff);
--needToReceive;
}
--needToReceive;
}
FDEBUG(receiveCounter.tac());
}
......@@ -366,7 +396,18 @@ public:
parentIndex = iterArray[parentOffset].getCurrentGlobalIndex();
}
const int idxReceiver = getProc(parentOffset,leafs);
app.sendData(idxReceiver,sizeof(CellClass),previousIterArray[idxLeafs].getCurrentCell(),idxLeafs);
if(!this->sendBuffer[idxReceiver].addEnoughSpace(previousIterArray[idxLeafs].getCurrentCell()->bytesToSendUp())){
app.sendData(idxReceiver,this->sendBuffer[idxReceiver].getSize(),this->sendBuffer[idxReceiver].getData(),idxLevel);
this->sendBuffer[idxReceiver].clear();
}
this->sendBuffer[idxReceiver].addDataUp(idxLeafs,*previousIterArray[idxLeafs].getCurrentCell());
}
for(int idxProc = 0 ; idxProc < this->nbProcess ; ++idxProc){
if(this->sendBuffer[idxProc].getSize()){
app.sendData(idxProc,this->sendBuffer[idxProc].getSize(),this->sendBuffer[idxProc].getData(),idxLevel);
this->sendBuffer[idxProc].clear();
}
}
leftOffsets[idxLevel+1] = (previousRight-previousLeft) + 1;
......@@ -377,9 +418,10 @@ public:
this->previousRight = endIdx - 1;
this->previousSize = leafs;
app.processBarrier();
}
app.processBarrier();
FDEBUG( FDebug::Controller << "\tFinished (" << counterTime.tacAndElapsed() << "s)\n" );
FDEBUG( FDebug::Controller << "\t\t Computation : " << computationCounter.cumulated() << " s\n" );
FDEBUG( FDebug::Controller << "\t\t Send : " << sendCounter.cumulated() << " s\n" );
......@@ -484,9 +526,14 @@ public:
#pragma omp critical(CheckToSend)
{
if(!alreadySent[idxReceiver]->get(idxLeafs)){
app.sendData(idxReceiver,sizeof(CellClass),iterArray[idxLeafs].getCurrentCell(),idxLeafs);
alreadySent[idxReceiver]->set(idxLeafs,true);
needData = true;
if(!this->sendBuffer[idxReceiver].addEnoughSpace(iterArray[idxLeafs].getCurrentCell()->bytesToSendUp())){
app.sendData(idxReceiver,this->sendBuffer[idxReceiver].getSize(),this->sendBuffer[idxReceiver].getData(),idxLevel);
this->sendBuffer[idxReceiver].clear();
}
this->sendBuffer[idxReceiver].addDataUp(idxLeafs,*iterArray[idxLeafs].getCurrentCell());
}
}
#pragma omp critical(CheckToReceive)
......@@ -515,6 +562,13 @@ public:
#pragma omp single nowait
{
for(int idxProc = 0 ; idxProc < this->nbProcess ; ++idxProc){
if(this->sendBuffer[idxProc].getSize()){
app.sendData(idxProc,this->sendBuffer[idxProc].getSize(),this->sendBuffer[idxProc].getData(),idxLevel);
this->sendBuffer[idxProc].clear();
}
}
FDEBUG(sendCounter.tac());
}
......@@ -524,16 +578,20 @@ public:
FDEBUG( FDebug::Controller << "\t\tNeed to receive " << needToReceive << " cells.\n" );
FDEBUG(receiveCounter.tic());
CellClass tempCell;
int source = 0, tag = 0, filled = 0;
int source = 0, filled = 0;
int position;
char buffer[BufferSize];
while(needToReceive){
app.receiveData(sizeof(CellClass),&tempCell,&source,&tag,&filled);
if(filled){
*iterArray[tag].getCurrentCell() = tempCell;
app.receiveData( BufferSize, idxLevel, buffer, &source, &filled);
for(int idxBuff = 0 ; idxBuff < filled;){
memcpy(&position,&buffer[idxBuff],sizeof(int));
idxBuff += sizeof(int);
idxBuff += iterArray[position].getCurrentCell()->readUp(&buffer[idxBuff],filled-idxBuff);
--needToReceive;
}
--needToReceive;
}
FDEBUG(receiveCounter.tac());
}
......@@ -562,9 +620,9 @@ public:
delete alreadySent[idxProc];
}
app.processBarrier();
}
app.processBarrier();
FDEBUG( FDebug::Controller << "\tFinished (" << counterTime.tacAndElapsed() << "s)\n" );
FDEBUG( FDebug::Controller << "\t\t Computation : " << computationCounter.cumulated() << " s\n" );
FDEBUG( FDebug::Controller << "\t\t Send : " << sendCounter.cumulated() << " s\n" );
......@@ -613,13 +671,29 @@ public:
const int leftOffset = -leftOffsets[idxLevel];
for(int idxLeafs = 1 ; idxLeafs <= leftOffset ; ++idxLeafs){
const int idxReceiver = getProc((currentLeft-idxLeafs),leafs);
app.sendData(idxReceiver,sizeof(CellClass),iterArray[currentLeft-idxLeafs].getCurrentCell(),currentLeft-idxLeafs);
if(!this->sendBuffer[idxReceiver].addEnoughSpace(iterArray[currentLeft-idxLeafs].getCurrentCell()->bytesToSendDown())){
app.sendData(idxReceiver,this->sendBuffer[idxReceiver].getSize(),this->sendBuffer[idxReceiver].getData(),idxLevel);
this->sendBuffer[idxReceiver].clear();
}
this->sendBuffer[idxReceiver].addDataDown(currentLeft-idxLeafs,*iterArray[currentLeft-idxLeafs].getCurrentCell());
}
const int rightOffset = -rightOffsets[idxLevel];
for(int idxLeafs = 1 ; idxLeafs <= rightOffset ; ++idxLeafs){
const int idxReceiver = getProc((currentRight+idxLeafs),leafs);
app.sendData(idxReceiver,sizeof(CellClass),iterArray[currentRight+idxLeafs].getCurrentCell(),currentRight+idxLeafs);
if(!this->sendBuffer[idxReceiver].addEnoughSpace(iterArray[currentRight+idxLeafs].getCurrentCell()->bytesToSendDown())){
app.sendData(idxReceiver,this->sendBuffer[idxReceiver].getSize(),this->sendBuffer[idxReceiver].getData(),idxLevel);
this->sendBuffer[idxReceiver].clear();
}
this->sendBuffer[idxReceiver].addDataDown(currentRight+idxLeafs,*iterArray[currentRight+idxLeafs].getCurrentCell());
}
for(int idxProc = 0 ; idxProc < this->nbProcess ; ++idxProc){
if(this->sendBuffer[idxProc].getSize()){
app.sendData(idxProc,this->sendBuffer[idxProc].getSize(),this->sendBuffer[idxProc].getData(),idxLevel);
this->sendBuffer[idxProc].clear();
}
}
FDEBUG(sendCounter.tac());
}
......@@ -628,17 +702,20 @@ public:
{
FDEBUG(receiveCounter.tic());
int needToReceive = FMath::Max(0,rightOffsets[idxLevel]) + FMath::Max(0,leftOffsets[idxLevel]);
CellClass tempCell;
int source = 0, tag = 0, filled = 0;
int source = 0, filled = 0;
int position;
char buffer[BufferSize];
while(needToReceive){
app.receiveData(sizeof(CellClass),&tempCell,&source,&tag,&filled);
if(filled){
iterArray[tag].getCurrentCell()->addCell(tempCell);
app.receiveData( BufferSize, idxLevel, buffer, &source, &filled);
for(int idxBuff = 0 ; idxBuff < filled;){
memcpy(&position,&buffer[idxBuff],sizeof(int));
idxBuff += sizeof(int);
idxBuff += iterArray[position].getCurrentCell()->readDown(&buffer[idxBuff],filled-idxBuff);
--needToReceive;
}
--needToReceive;
}
FDEBUG(receiveCounter.tac());
}
}
......@@ -654,10 +731,11 @@ public:
}
}
FDEBUG(computationCounter.tac());
app.processBarrier();
}
}
app.processBarrier();
FDEBUG( FDebug::Controller << "\tFinished (" << counterTime.tacAndElapsed() << "s)\n" );
FDEBUG( FDebug::Controller << "\t\t Computation : " << computationCounter.cumulated() << " s\n" );
FDEBUG( FDebug::Controller << "\t\t Send : " << sendCounter.cumulated() << " s\n" );
......
......@@ -14,14 +14,22 @@ protected:
virtual ~FAbstractSendable(){}
/** To know the number of bytes needed */
virtual int bytesToSend() const = 0;
virtual int bytesToSendUp() const = 0;
/** To put the object into the buffer */
virtual int write(void* const buffer, const int limit) const = 0;
virtual int writeUp(void* const buffer, const int limit) const = 0;
/** To know the number of bytes needed */
virtual int bytesToReceiveUp() const = 0;
/** To retrieve data from a buffer */
virtual int read(void* const buffer, const int limit) = 0;
virtual int readUp(void* const buffer, const int limit) = 0;
/** To know the number of bytes needed */
virtual int bytesToSendDown() const = 0;
/** To put the object into the buffer */
virtual int writeDown(void* const buffer, const int limit) const = 0;
/** To know the number of bytes needed */
virtual int bytesToReceiveDown() const = 0;
/** To retrieve data from a buffer */
virtual int readDown(void* const buffer, const int limit) = 0;
};
#endif //FABSTRACTSENDABLE_HPP
......
......@@ -46,12 +46,27 @@ public:
MPI_Get_count(&status,MPI_CHAR,inFilledSize);
}
void receiveData(const int inSize, const int inTag, void* const inData, int* const inSource, int* const inFilledSize){
MPI_Status status;
MPI_Recv(inData, inSize, MPI_CHAR, MPI_ANY_SOURCE, inTag, MPI_COMM_WORLD, &status);
*inSource = status.MPI_SOURCE;
MPI_Get_count(&status,MPI_CHAR,inFilledSize);
}
/**
* To know if a message is waiting, whatever its source and its tag
* @return true if MPI_Iprobe reports a pending message
*/
bool receivedData(){
    int messagePending;
    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &messagePending, MPI_STATUS_IGNORE);
    return messagePending != 0;
}
/**
* To know if a message is waiting, whatever its source and its tag
* @param tag filled with the tag of the pending message; left untouched
*            when no message is waiting
* @return true if MPI_Iprobe reports a pending message
*/
bool receivedData(int* const tag){
    MPI_Status status;
    int flag = 0;
    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, &status);
    // The status is only defined when a message has been found,
    // reading it otherwise would give an indeterminate tag
    if(flag){
        *tag = status.MPI_TAG;
    }
    return flag != 0;
}
int processId() {
int id;
MPI_Comm_rank(MPI_COMM_WORLD,&id);
......@@ -72,6 +87,13 @@ public:
MPI_Abort(MPI_COMM_WORLD, inErrorCode);
}
/**
* Sum a value over all the processes of MPI_COMM_WORLD
* @param data the contribution of the current process
* @return the sum on the root process (rank 0); 0.0 on every other
*         process, because MPI_Reduce delivers the result to the root only
*/
double reduceSum(double data){
    // Initialized so that non-root processes do not return an indeterminate value
    double result = 0.0;
    MPI_Reduce( &data, &result, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD );
    return result;
}
#else
////////////////////////////////////////////////////////
// Without MPI
......@@ -87,10 +109,19 @@ public:
*inFilledSize = 0;
}
/**
* Version without MPI: nothing is received
* The size, tag and data parameters are ignored.
* @param inSource set to 0
* @param inFilledSize set to 0 (no byte received)
*/
void receiveData(const int , const int , void* const , int* const inSource, int* const inFilledSize){
    // The two output parameters must be named to be written
    *inSource = 0;
    *inFilledSize = 0;
}
/** Version without MPI: there is never a pending message */
bool receivedData()
{
    const bool messagePending = false;
    return messagePending;
}
/** Version without MPI: there is never a pending message, the tag is not written */
bool receivedData(int* const)
{
    const bool messagePending = false;
    return messagePending;
}
/** Version without MPI: the process id is always 0 */
int processId()
{
    const int onlyProcessId = 0;
    return onlyProcessId;
}
......@@ -105,6 +136,10 @@ public:
exit(inErrorCode);
}
/** Version without MPI: the sum is the value itself */
double reduceSum(double data)
{
    const double total = data;
    return total;
}
#endif
////////////////////////////////////////////////////////
......
......@@ -14,6 +14,7 @@
#include "../Src/Components/FSimpleLeaf.hpp"
#include "../Src/Utils/F3DPosition.hpp"
#include "../Src/Utils/FAbstractSendable.hpp"
#include "../Src/Components/FFmaParticle.hpp"
#include "../Src/Components/FTestParticle.hpp"
......@@ -45,9 +46,40 @@ public:
class FTestCellPar : public FTestCell{
public :
void addCell(const FTestCellPar& other){
//setDataUp(this->getDataUp() + other.getDataUp());
setDataDown(this->getDataDown() + other.getDataDown());
/** Number of bytes needed to send the up data (one long) */
int bytesToSendUp() const
{
    return static_cast<int>(sizeof(long));
}
/**
* Copy the up data at the beginning of buffer
* The limit parameter is not used.
* @return the number of bytes written
*/
int writeUp(void* const buffer, const int) const {
    const int nbBytes = bytesToSendUp();
    const long upValue = getDataUp();
    memcpy(buffer, &upValue, nbBytes);
    return nbBytes;
}
/** Number of bytes read when receiving the up data (one long) */
int bytesToReceiveUp() const
{
    return static_cast<int>(sizeof(long));
}
/**
* Set the up data from the beginning of buffer
* The limit parameter is not used.
* @return the number of bytes consumed from the buffer
*/
int readUp(void* const buffer, const int) {
    // Copy exactly the number of bytes reported as consumed
    // (the receive size, not the send size)
    const int nbBytes = bytesToReceiveUp();
    long tmpUp;
    memcpy(&tmpUp,buffer,nbBytes);
    setDataUp(tmpUp);
    return nbBytes;
}
/** Number of bytes needed to send the down data (one long) */
int bytesToSendDown() const
{
    return static_cast<int>(sizeof(long));
}
/**
* Copy the down data at the beginning of buffer
* The limit parameter is not used.
* @return the number of bytes written
*/
int writeDown(void* const buffer, const int) const {
    const int nbBytes = bytesToSendDown();
    const long downValue = getDataDown();
    memcpy(buffer, &downValue, nbBytes);
    return nbBytes;
}
/** Number of bytes read when receiving the down data (one long) */
int bytesToReceiveDown() const
{
    return static_cast<int>(sizeof(long));
}
/**
* Add the value found at the beginning of buffer to the current down data
* The limit parameter is not used.
* @return the number of bytes consumed from the buffer
*/
int readDown(void* const buffer, const int) {
    // Copy exactly the number of bytes reported as consumed
    // (the receive size, not the send size)
    const int nbBytes = bytesToReceiveDown();
    long tmpDown;
    memcpy(&tmpDown,buffer,nbBytes);
    // The received value is accumulated, it does not replace the current one
    setDataDown(tmpDown + getDataDown());
    return nbBytes;
}
};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment