Commit 52fab931 authored by PIACIBELLO Cyrille's avatar PIACIBELLO Cyrille

MpiBuffer added

parent e9bf706d
// ===================================================================================
// Copyright ScalFmm 2011 INRIA, Olivier Coulaud, Bérenger Bramas, Matthias Messner
// olivier.coulaud@inria.fr, berenger.bramas@inria.fr
// This software is a computer program whose purpose is to compute the FMM.
//
// This software is governed by the CeCILL-C and LGPL licenses and
// abiding by the rules of distribution of free software.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public and CeCILL-C Licenses for more details.
// "http://www.cecill.info".
// "http://www.gnu.org/licenses".
// ===================================================================================
#ifndef FMPIBUFFERREADER_HPP
#define FMPIBUFFERREADER_HPP
#include "../Utils/FMpi.hpp"
/** @author Cyrille Piacibello
* This class provide the same features as FBufferWriter using MPI_Pack system
*
* Put some data
* then insert back if needed
* finally use data pointer as you like
*/
class FMpiBufferReader {
private:
//Classe FMpiVector, used only by FMpiBuffer
template<class ObjectType>
class FMpiVector {
protected :
static const int DefaultSize = 10;
ObjectType* array; //memory area
int capacity; //Capacity of the array
long int index; //current position in Byte !!
public:
FMpiVector():
array(0),
capacity(DefaultSize),
index(0){
array = reinterpret_cast<ObjectType*>(new char[sizeof(ObjectType)* DefaultSize]);
}
FMpiVector(const int inCapa):
array(0),
capacity(inCapa),
index(0){
array = reinterpret_cast<ObjectType*>(new char[sizeof(ObjectType)* inCapa]);
}
virtual ~FMpiVector(){
delete[] reinterpret_cast< char* >(array);
}
//To get the capacity
const int getCapacity() const{
return this->capacity;
}
//To get the array
ObjectType * data(){
return array;
}
const ObjectType * data() const{
return array;
}
//To delete all the element stored of the array
void clear(){
while(0 < index){
(&array[--index])->~ObjectType();
}
}
//To get how much space is used
long int getSize() const{
return this->index;
}
//To get how many objects are stored
int getObjectsSize(){
return (this->index / sizeof(ObjectType));
}
//To inc the index
//Usually used with array.incIndex(sizeof(my_object_stored));
void incIndex(const int inInc){
if(index + inInc > capacity){
fprintf(stderr,"Aborting : index array out of range\n");
exit(0);
}
else{
this->index+=inInc;
}
}
//To set the index
void setIndex(const int inInd){
if(inInd>capacity){
fprintf(stderr,"Aborting : index array out of range\n");
exit(0);
}
else{
this->index = inInd;
}
}
};
MPI_Comm comm;
FMpiVector<char> array;
public :
FMpiBufferReader(MPI_Comm inComm, const int inCapacity = 0):
comm(inComm),
array(inCapacity)
{}
/** Destructor
*/
virtual ~FMpiBufferReader(){
delete &array;
}
/** Get the memory area */
char* data(){
return array.data();
}
/** Get the memory area */
const char* data() const {
return array.data();
}
/** Size of the memory initialized */
int getSize() const{
return array.getSize();
}
/** Move the read index to a position */
void seek(const int inIndex){
array.setIndex(inIndex);
}
/** Get the read position */
int tell() const {
return array.getSize();
}
/** Get a value with memory cast */
template <class ClassType>
ClassType getValue(){
ClassType value;
long int currentIndex = array.getSize();
array.incIndex(sizeof(value));
MPI_Unpack(array.data(),sizeof(ClassType),&currentIndex,&value,1,FMpi::GetType(value),*comm);
return value;
}
/** Fill a value with memory cast */
template <class ClassType>
void fillValue(ClassType* const inValue){
int currentIndex = array.getSize();
array.incIndex(sizeof(ClassType));
MPI_Pack(inValue,1,FMpi::GetType(inValue),array.data(),array.getCapacity(),&currentIndex,*comm);
}
/** Fill one/many value(s) with memcpy */
template <class ClassType>
void fillArray(ClassType* const inArray, const int inSize){
int currentIndex = array.getSize();
array.incIndex(sizeof(ClassType) * inSize);
MPI_Pack(inArray,inSize,FMpi::GetType(inArray),array.data(),array.getCapacity(),&currentIndex,*comm);
}
/** Same as fillValue */
template <class ClassType>
FBufferReader& operator>>(ClassType& object){
fillValue(&object);
return *this;
}
};
#endif
// ===================================================================================
// Copyright ScalFmm 2011 INRIA, Olivier Coulaud, Bérenger Bramas, Matthias Messner
// olivier.coulaud@inria.fr, berenger.bramas@inria.fr
// This software is a computer program whose purpose is to compute the FMM.
//
// This software is governed by the CeCILL-C and LGPL licenses and
// abiding by the rules of distribution of free software.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public and CeCILL-C Licenses for more details.
// "http://www.cecill.info".
// "http://www.gnu.org/licenses".
// ===================================================================================
#ifndef FMPIBUFFERWRITER_HPP
#define FMPIBUFFERWRITER_HPP
#include "../Utils/FMpi.hpp"
/** @author Cyrille Piacibello
* This class provide the same features as FBufferWriter using MPI_Pack system
*
* Put some data
* then insert back if needed
* finally use data pointer as you like
*/
class FMpiBufferWriter {
private :
//Classe FMpiVector, used only by FMpiBuffer.
template<class ObjectType>
class FMpiVector {
protected :
static const int DefaultSize = 10;
ObjectType* array; //memory area
int capacity; //Capacity of the array
long int index; //current position in Byte !!
public:
FMpiVector():
array(0),
capacity(DefaultSize),
index(0){
array = reinterpret_cast<ObjectType*>(new char[sizeof(ObjectType)* DefaultSize]);
}
FMpiVector(const int inCapa):
array(0),
capacity(inCapa),
index(0){
array = reinterpret_cast<ObjectType*>(new char[sizeof(ObjectType)* inCapa]);
}
virtual ~FMpiVector(){
delete[] reinterpret_cast< char* >(array);
}
//To get the capacity
const int getCapacity() const{
return this->capacity;
}
//To get the array
ObjectType * data(){
return array;
}
const ObjectType * data() const{
return array;
}
//To delete all the element stored of the array
void clear(){
while(0 < index){
(&array[--index])->~ObjectType();
}
}
//To get how much space is used
long int getSize() const{
return this->index;
}
//To get how many objects are stored
int getObjectsSize(){
return (this->index / sizeof(ObjectType));
}
//To inc the index
//Usually used with array.incIndex(sizeof(my_object_stored));
void incIndex(const int inInc){
if(index + inInc > capacity){
fprintf(stderr,"Aborting : index array out of range\n");
exit(0);
}
else{
this->index+=inInc;
}
}
//To set the index
void setIndex(const int inInd){
if(inInd>capacity){
fprintf(stderr,"Aborting : index array out of range\n");
exit(0);
}
else{
this->index = inInd;
}
}
};
const MPI_Comm comm; // Communicator needed by MPI_Pack functions
FMpiVector<char> array;
public:
/** Constructor with a default capacity of 512 bytes */
FMpiBufferWriter(const MPI_Comm inComm, const int inCapacity = 512):
comm(inComm),
array(inCapacity)
{}
/** Destructor */
virtual ~FMpiBufferWriter(){
delete &array;
}
/** Get allocated memory pointer */
char* data(){
return array.data();
}
/** Get allocated memory pointer */
const char* data() const {
return array.data();
}
/** Get the filled space */
int getSize() const {
return array.getSize();
}
/** Write data by packing cpy */
template <class ClassType>
void write(const ClassType& object){
// buffer.memocopy(reinterpret_cast<const char*>(&object), int(sizeof(ClassType)));
int currentIndex = array.getSize();
array.incIndex(sizeof(ClassType));
MPI_Pack(const_cast<ClassType*>(&object),1,FMpi::GetType(object),array.data(),array.getCapacity(),&currentIndex,comm);
}
/**
* Allow to pass rvalue to write
*/
template <class ClassType>
void write(const ClassType&& object){
// buffer.memocopy(reinterpret_cast<const char*>(&object), int(sizeof(ClassType)));
int currentIndex = array.getSize();
array.incIndex(sizeof(ClassType));
MPI_Pack(const_cast<ClassType*>(&object),1,FMpi::GetType(object),array.data(),array.getCapacity(),&currentIndex,comm);
}
/** Write back, position + sizeof(object) has to be < size */
template <class ClassType>
void writeAt(const int position, const ClassType& object){
//(*reinterpret_cast<ClassType*>(&buffer[position])) = object;
if(position < array.getSize()){
fprintf(stderr,"Aborting : writeAt is overwritting data\n");
}
else{
int temp = position;
if(position + FMpi::GetType(object) < array.getCapacity()){
MPI_Pack(&object,1,FMpi::GetType(object),array.data(),array.getCapacity(),&temp,comm);
}
array.setIndex(temp);
}
}
/** Write an array
* Warning : inSize is a number of ClassType object to write, not a size in bytes
*/
template <class ClassType>
void write(const ClassType* const objects, const int inSize){
int currentIndex = array.getSize();
array.incIndex(sizeof(ClassType) * inSize);
MPI_Pack(objects,inSize,FMpi::GetType(*objects),array.data(),array.getCapacity(),&currentIndex,comm);
}
/** Equivalent to write */
template <class ClassType>
FMpiBufferWriter& operator<<(const ClassType& object){
write(object);
return *this;
}
/** Reset the writing index, but do not change the capacity */
void reset(){
array.clear();
}
};
#endif // FBUFFERWRITER_HPP
......@@ -43,291 +43,295 @@
/**
* @author Berenger Bramas (berenger.bramas@inria.fr)
* @class FMpi
* Please read the license
*
*/
* @author Berenger Bramas (berenger.bramas@inria.fr)
* @class FMpi
* Please read the license
*
*/
class FMpi {
public:
////////////////////////////////////////////////////////
// MPI Flag
////////////////////////////////////////////////////////
enum FMpiTag {
// FMpiTreeBuilder
TagExchangeIndexs,
TagSplittedLeaf,
TagExchangeNbLeafs,
TagSandSettling,
// FQuickSort
TagQuickSort,
// FMM
TagFmmM2M,
TagFmmL2L,
TagFmmP2P,
// Bitonic,
TagBitonicMin,
TagBitonicMax,
TagBitonicMinMess,
TagBitonicMaxMess,
// Last defined tag
TagLast,
};
////////////////////////////////////////////////////////
// FComm to factorize MPI_Comm work
////////////////////////////////////////////////////////
/** This class is used to put all the usual method
* related mpi comm
*/
class FComm : public FNoCopyable {
int rank; //< rank related to the comm
int nbProc; //< nb proc in this group
MPI_Comm communicator; //< current mpi communicator
MPI_Group group; //< current mpi group
// reset : get rank and nb proc from mpi
void reset(){
FMpi::Assert( MPI_Comm_rank(communicator,&rank), __LINE__ );
FMpi::Assert( MPI_Comm_size(communicator,&nbProc), __LINE__ );
}
public:
/** Constructor : dup the comm given in parameter */
explicit FComm(MPI_Comm inCommunicator ) {
FMpi::Assert( MPI_Comm_dup(inCommunicator, &communicator), __LINE__ , "comm dup");
FMpi::Assert( MPI_Comm_group(communicator, &group), __LINE__ , "comm group");
reset();
}
/** Free communicator and group */
virtual ~FComm(){
FMpi::Assert( MPI_Comm_free(&communicator), __LINE__ );
FMpi::Assert( MPI_Group_free(&group), __LINE__ );
}
/** To get the mpi comm needed for communication */
MPI_Comm getComm() const {
return communicator;
}
/** The current rank */
int processId() const {
return rank;
}
/** The current number of procs in the group */
int processCount() const {
return nbProc;
}
////////////////////////////////////////////////////////////
// Split/Chunk functions
////////////////////////////////////////////////////////////
/** Get a left index related to a size */
template< class T >
T getLeft(const T inSize) const {
const double step = (double(inSize) / double(processCount()));
return T(FMath::Ceil(step * double(processId())));
}
/** Get a right index related to a size */
template< class T >
T getRight(const T inSize) const {
const double step = (double(inSize) / double(processCount()));
const T res = T(FMath::Ceil(step * double(processId()+1)));
if(res > inSize) return inSize;
else return res;
}
/** Get a right index related to a size and another id */
template< class T >
T getOtherRight(const T inSize, const int other) const {
const double step = (double(inSize) / double(processCount()));
const T res = T(FMath::Ceil(step * double(other+1)));
if(res > inSize) return inSize;
else return res;
}
/** Get a left index related to a size and another id */
template< class T >
T getOtherLeft(const T inSize, const int other) const {
const double step = (double(inSize) / double(processCount()));
return T(FMath::Ceil(step * double(other)));
}
/** Get a proc id from and index */
template< class T >
int getProc(const int position, const T inSize) const {
const double step = (double(inSize) / processCount());
return int(position/step);
}
////////////////////////////////////////////////////////////
// Mpi interface functions
////////////////////////////////////////////////////////////
/** Reduce a value for proc == 0 */
template< class T >
T reduceSum(T data) const {
T result(0);
FMpi::Assert( MPI_Reduce( &data, &result, 1, FMpi::GetType(data), MPI_SUM, 0, communicator ), __LINE__);
return result;
}
/** Reduce an average */
template< class T >
T reduceAverageAll(T data) const {
T result[processCount()];
FMpi::Assert( MPI_Allgather( &data, 1, FMpi::GetType(data), result, 1, FMpi::GetType(data), getComm()), __LINE__ );
T average = 0;
for(int idxProc = 0 ; idxProc < processCount() ;++idxProc){
average += result[idxProc] / processCount();
}
return average;
}
/** Change the group size */
void groupReduce(const int from , const int to){
int * procsIdArray = new int [to - from + 1];
for(int idxProc = from ;idxProc <= to ; ++idxProc){
procsIdArray[idxProc - from] = idxProc;
}
MPI_Group previousGroup = group;
FMpi::Assert( MPI_Group_incl(previousGroup, to - from + 1 , procsIdArray, &group), __LINE__ );
MPI_Comm previousComm = communicator;
FMpi::Assert( MPI_Comm_create(previousComm, group, &communicator), __LINE__ );
MPI_Comm_free(&previousComm);
MPI_Group_free(&previousGroup);
reset();
delete procsIdArray ;
}
};
////////////////////////////////////////////////////////
// FMpi methods
////////////////////////////////////////////////////////
/*
We use init with thread because of an openmpi error:
////////////////////////////////////////////////////////
// MPI Flag
////////////////////////////////////////////////////////
enum FMpiTag {
// FMpiTreeBuilder
TagExchangeIndexs,
TagSplittedLeaf,
TagExchangeNbLeafs,
TagSandSettling,
// FQuickSort
TagQuickSort,
// FMM
TagFmmM2M,
TagFmmL2L,
TagFmmP2P,
// Bitonic,
TagBitonicMin,
TagBitonicMax,
TagBitonicMinMess,
TagBitonicMaxMess,
// Last defined tag
TagLast,
};
////////////////////////////////////////////////////////
// FComm to factorize MPI_Comm work
////////////////////////////////////////////////////////
/** This class is used to put all the usual method
* related mpi comm
*/
class FComm : public FNoCopyable {
int rank; //< rank related to the comm
int nbProc; //< nb proc in this group
MPI_Comm communicator; //< current mpi communicator
MPI_Group group; //< current mpi group
// reset : get rank and nb proc from mpi
void reset(){
FMpi::Assert( MPI_Comm_rank(communicator,&rank), __LINE__ );
FMpi::Assert( MPI_Comm_size(communicator,&nbProc), __LINE__ );
}
[fourmi062:15896] [[13237,0],1]-[[13237,1],1] mca_oob_tcp_msg_recv: readv failed: Connection reset by peer (104)
[fourmi056:04597] [[13237,0],3]-[[13237,1],3] mca_oob_tcp_msg_recv: readv failed: Connection reset by peer (104)
[fourmi053:08571] [[13237,0],5]-[[13237,1],5] mca_oob_tcp_msg_recv: readv failed: Connection reset by peer (104)
public:
/** Constructor : dup the comm given in parameter */
explicit FComm(MPI_Comm inCommunicator ) {
FMpi::Assert( MPI_Comm_dup(inCommunicator, &communicator), __LINE__ , "comm dup");
FMpi::Assert( MPI_Comm_group(communicator, &group), __LINE__ , "comm group");
Erreur pour le proc1