Mentions légales du service

Skip to content
Snippets Groups Projects
Commit 8a8f6d64 authored by hhakim's avatar hhakim
Browse files

Fix faust_transform_omp_mul test (OMP/C++ threads Faust-matrix mul. parallel reduction).

The functions fixed are Faust::multiply_omp/multiply_par.
parent 21f77fd8
Branches
Tags
No related merge requests found
......@@ -41,36 +41,32 @@
#include <iostream>
#include <iomanip>
/** \brief unitary test for testing multiplication by faust
/** \brief unit test for testing multiplication by faust with parallel reduction on matrix products
*/
/** It was a kind of experiment to verify that parallelizing on columns/rows dot product as Eigen does is faster than doing a parallel reduction on factors in addition to parallel dot product.
* About comp. times: OMP/C++ threads works better than Eigen only if multithreading/OpenMP is disabled on eigen (see arg 1 of the program in main to disable dynamically).
*/
typedef @TEST_FPP@ FPP;
using namespace Faust;
int main(int argc, char* argv[])
{
if (typeid(FPP) == typeid(double))
{
cout<<"floating point precision == double"<<endl;
}
if (typeid(FPP) == typeid(float))
{
cout<<"floating point precision == float"<<endl;
}
if(argc > 1)
{
std::cout << "WARNING: Eigen is monothreaded." << std::endl;
Eigen::setNbThreads(1);
}
void test_prod(RandFaustType mat_type)
{
//RandFaustType t, unsigned int min_num_factors, unsigned int max_num_factors, unsigned int min_dim_size, unsigned int max_dim_size, float density=.1f, bool per_row=true
TransformHelper<FPP,Cpu>* th = TransformHelper<FPP,Cpu>::randFaust(DENSE, 12, 12, 2048, 2048, .5);
TransformHelper<FPP,Cpu>* th = TransformHelper<FPP,Cpu>::randFaust(mat_type, 12, 12, 1024, 1024, .05);
// th->save_mat_file("test_faust.mat");
// TransformHelper<FPP,Cpu>* th = new TransformHelper<FPP, Cpu>();
// th->read_from_mat_file("test_faust.mat");
th->display();
MatDense<FPP,Cpu>* M = MatDense<FPP,Cpu>::randMat(2048,2048);
cout << "F norm:" << th->normFro() << endl;
MatDense<FPP,Cpu>* M = MatDense<FPP,Cpu>::randMat(1024,1024);
// MatDense<FPP, Cpu>* M = new MatDense<FPP, Cpu>(6, 6);
// M->setOnes();
// *M *= 2;
auto start = std::chrono::system_clock::now();
auto FM = th->multiply(*M);
auto end = std::chrono::system_clock::now();
......@@ -78,14 +74,6 @@ int main(int argc, char* argv[])
std::cout << "Eigen Time "<< diff.count() << " s\n";
FM.Display();
cout << "FM norm: " << FM.norm() << endl;
th->set_FM_mul_mode(OMP_PROD_PAR_REDUC);
start = std::chrono::system_clock::now();
auto FM2 = th->multiply(*M);
end = std::chrono::system_clock::now();
diff = end-start;
std::cout << "OMP Time: "<< diff.count() << " s\n";
FM2.Display();
cout << "FM2 norm: " << FM2.norm() << endl;
th->set_FM_mul_mode(CPP_PROD_PAR_REDUC);
start = std::chrono::system_clock::now();
auto FM3 = th->multiply(*M);
......@@ -94,6 +82,48 @@ int main(int argc, char* argv[])
std::cout << "C++ threads Time : "<< diff.count() << " s\n";
FM3.Display();
cout << "FM3 norm: " << FM3.norm() << endl;
MatDense<FPP, Cpu> FM3_err = FM3;
FM3_err -= FM;
cout << "FM3 rel err:" << FM3_err.norm() / FM.norm() << endl;
assert(FM3_err.norm() / FM.norm() < 1e-6);
th->set_FM_mul_mode(OMP_PROD_PAR_REDUC);
start = std::chrono::system_clock::now();
auto FM2 = th->multiply(*M);
end = std::chrono::system_clock::now();
diff = end-start;
std::cout << "OMP Time: "<< diff.count() << " s\n";
FM2.Display();
cout << "FM2 norm: " << FM2.norm() << endl;
MatDense<FPP, Cpu> FM2_err = FM2;
FM2_err -= FM;
cout << "FM2 rel err:" << FM2_err.norm() / FM.norm() << endl;
assert(FM2_err.norm() / FM.norm() < 1e-6);
delete M;
delete th;
}
int main(int argc, char* argv[])
{
if (typeid(FPP) == typeid(double))
{
cout<<"floating point precision == double"<<endl;
}
if (typeid(FPP) == typeid(float))
{
cout<<"floating point precision == float"<<endl;
}
if(argc > 1)
{
std::cout << "WARNING: Eigen is monothreaded." << std::endl;
Eigen::setNbThreads(1);
}
test_prod(DENSE);
//test_prod(SPARSE); //TODO: debug
return 0;
}
......@@ -418,99 +418,130 @@ void Faust::multiply_order_opt(const int mode, std::vector<Faust::MatGeneric<FPP
template<typename FPP, FDevice DEVICE>
Faust::MatDense<FPP,DEVICE> Faust::multiply_omp(const std::vector<Faust::MatGeneric<FPP,DEVICE>*> data, const Faust::MatDense<FPP,DEVICE> A, const char opThis)
{
// TODO: refactoring with macros and with multiply_par(_run)
Faust::MatDense<FPP, DEVICE> *M = nullptr;
#ifdef OMP_ENABLED
// until this this method is disabled at compilation unless we manually define the constant in CFLAGS (for example).
int nth, start_nth, thid, num_per_th, data_size;
Faust::MatDense<FPP,DEVICE>* mats[8];
int thid, num_per_th, data_size;
std::vector<Faust::MatDense<FPP,DEVICE>*> mats;
std::vector<Faust::MatGeneric<FPP, DEVICE> *> _data(data.size()+1);
Faust::MatSparse<FPP, DEVICE> * sM;
Faust::MatDense<FPP,DEVICE>* tmp; // (_data[end_id-1]);
Faust::MatSparse<FPP, DEVICE> * sM = nullptr;
Faust::MatDense<FPP,DEVICE>* tmp = nullptr; // (_data[end_id-1]);
// std::mutex m;
int i = 0;
for(auto ptr: data)
{
// std::cout << "i=" << i << " _data[i]=" << ptr << endl;
_data[i++] = ptr;
}
_data[i] = const_cast<Faust::MatDense<FPP,DEVICE>*>(&A);
#pragma omp parallel private(nth, thid, tmp, num_per_th, data_size, start_nth)
_data[i] = const_cast<Faust::MatDense<FPP,DEVICE>*>(&A); // harmless, A won't be modified
#pragma omp parallel private(thid, tmp, num_per_th, data_size, i)
{
data_size = data.size()+1; // _data + 1
start_nth = omp_get_num_threads()<=data_size>>1?omp_get_num_threads():data_size>>1;
// assert start_nth is even ?
nth = start_nth;
data_size = data.size()+1; // _data.size()
// outside of the OMP block omp_get_num_threads strangely returns 1
int nth = omp_get_num_threads();
int start_nth = nth;
thid = omp_get_thread_num();
if (thid == 0)
{
mats.resize(nth);
for(int i=0; i < mats.size(); i++)
mats[i] = nullptr;
}
#pragma omp barrier
num_per_th = data_size;
num_per_th = num_per_th%2?num_per_th-1:num_per_th;
num_per_th /= nth;
if (num_per_th * nth < data_size)
num_per_th += 1;
num_per_th = num_per_th<2?2:num_per_th;
// std::cout << "omp th: " << nth << " num_per_th: " << num_per_th << std::endl;
thid = omp_get_thread_num();
// std::cout << "num_per_th: " << num_per_th << std::endl;
while(num_per_th > 1)
while(nth > 0)
{
int first_id, end_id, id;
first_id = num_per_th*thid;
if(nth == start_nth && thid == nth - 1)
end_id = data_size;
else
end_id = num_per_th*(thid+1);
if(first_id < data_size)
if(thid < nth)
{
// {
// std::lock_guard<std::mutex> guard(m);
// std::cout << "thread id: " << thid << " first_id: " << first_id << " end_id: " << end_id << std::endl;
// }
id = 'N' == opThis? end_id-1: first_id;
if(sM = dynamic_cast<Faust::MatSparse<FPP,DEVICE>*>(_data[id]))
tmp = new Faust::MatDense<FPP,DEVICE>(*sM);
else
tmp = new Faust::MatDense<FPP,DEVICE>(*(Faust::MatDense<FPP,DEVICE>*)(_data[id]));
mats[thid] = tmp;
// std::cout << "thid=" << thid << "mats[thid]=" << mats[thid] << "tmp=" << tmp << endl;
if(opThis == 'N')
for(int i=end_id-2;i>=first_id; i--)
int first_id = 0, end_id = 0, id;
first_id = num_per_th * thid;
end_id = num_per_th * (thid + 1);
end_id = end_id <= data_size? end_id : data_size;
if(first_id < data_size)
{
if (first_id < end_id)
{
// std::cout << "mul:" << _data[i] << endl;
if(sM = dynamic_cast<Faust::MatSparse<FPP,DEVICE>*>(_data[i]))
sM->multiply(*mats[thid], opThis);
// {
// std::lock_guard<std::mutex> guard(m);
// std::cout << "thread id: " << thid << " first_id: " << first_id << " end_id: " << end_id << std::endl;
// }
id = 'N' == opThis? end_id-1: first_id;
if(sM = dynamic_cast<Faust::MatSparse<FPP,DEVICE>*>(_data[id]))
tmp = new Faust::MatDense<FPP,DEVICE>(*sM);
else
tmp = new Faust::MatDense<FPP,DEVICE>(*(Faust::MatDense<FPP,DEVICE>*)(_data[id]));
if(opThis == 'N')
for(int i=end_id-2;i>=first_id; i--)
{
if(sM = dynamic_cast<Faust::MatSparse<FPP,DEVICE>*>(_data[i]))
sM->multiply(*tmp, opThis);
else
dynamic_cast<Faust::MatDense<FPP,DEVICE>*>(_data[i])->multiply(*tmp, opThis);
}
else
dynamic_cast<Faust::MatDense<FPP,DEVICE>*>(_data[i])->multiply(*mats[thid], opThis);
for(int i=first_id+1;i < end_id; i++)
{
if(sM = dynamic_cast<Faust::MatSparse<FPP,DEVICE>*>(_data[i]))
sM->multiply(*tmp,opThis);
else
dynamic_cast<Faust::MatDense<FPP,DEVICE>*>(_data[i])->multiply(*tmp, opThis);
}
mats[thid] = tmp;
}
else
for(int i=first_id+1;i < end_id; i++)
if(sM = dynamic_cast<Faust::MatSparse<FPP,DEVICE>*>(_data[i]))
sM->multiply(*mats[thid],opThis);
else
{
if(sM = dynamic_cast<Faust::MatSparse<FPP,DEVICE>*>(_data[first_id]))
mats[thid] = new Faust::MatDense<FPP,DEVICE>(*sM);
else
dynamic_cast<Faust::MatDense<FPP,DEVICE>*>(_data[i])->multiply(*mats[thid], opThis);
_data[thid] = mats[thid];
// mats[thid]->Display();
// std::cout << mats[thid]->norm() << endl;
// std::cout << "thid=" << thid << "mats[thid]=" << mats[thid] << "_data[thid]=" << data[thid] << endl;
data_size = nth;
mats[thid] = new Faust::MatDense<FPP,DEVICE>(*(Faust::MatDense<FPP,DEVICE>*)(_data[first_id]));
}
// std::cout << "nth: " << nth << " thid :" << thid << " first_id:" << first_id << " end_id:" << end_id << std::endl << mats[thid]->to_string(false, true) << std::endl;
}
}
if(nth > 1)
num_per_th = nth/(nth>>1);
num_per_th = 2; //nth/(nth>>1); // num_per_th can be > 2 only on first level of reduction
else
num_per_th = 1;
nth >>= 1;
#pragma omp barrier
if(thid == 0)
{
for(int i=0;i<data_size;i++)
{
if(start_nth != nth)
delete _data[i];
_data[i] = mats[i];
}
}
#pragma omp barrier
nth >>= 1;
data_size = (int) ceil(((double)data_size)/num_per_th);
}
// if(thid == 0) M = dynamic_cast<Faust::MatDense<FPP,DEVICE>*>(_data[0]);
}
M = dynamic_cast<Faust::MatDense<FPP,DEVICE>*>(mats[0]);
// std::cout << M << endl;
MatDense<FPP,DEVICE> M_;
if(! M)
MatDense<FPP, Cpu> M_;
if(M)
{
M_ = *M;
delete M;
}
else
{
M_ = Faust::MatDense<FPP,DEVICE>(*dynamic_cast<Faust::MatSparse<FPP,DEVICE>*>(mats[0]));
M = &M_;
delete mats[0];
}
//TODO: delete other thread mats
//TODO: return a ptr instead of a copy
return M_;
#else
throw std::runtime_error("It's not possible to call Faust::multiply_omp because the library hasn't been compiled with this function enabled.");
throw std::runtime_error("It's not possible to call Faust::multiply_omp because the FAµST library hasn't been compiled with this function enabled.");
#endif
return *M;
}
......@@ -519,24 +550,21 @@ template<typename FPP, FDevice DEVICE>
Faust::MatDense<FPP,DEVICE> Faust::multiply_par(const std::vector<Faust::MatGeneric<FPP,DEVICE>*>& data, const Faust::MatDense<FPP,DEVICE> A, const char opThis)
{
#ifdef OMP_ENABLED // this function doesn't use OpenMP but C++ threads are implemented using POSIX threads on Linux gcc as OpenMP, so for FAµST we assume it's the same
int nth = std::thread::hardware_concurrency(); // https://en.cppreference.com/w/cpp/thread/thread/hardware_concurrency
int barrier_count = nth;
Faust::MatDense<FPP, DEVICE> *M;
std::vector<std::thread*> threads;
std::vector<Faust::MatDense<FPP,DEVICE>*> mats(nth);
std::vector<Faust::MatDense<FPP,DEVICE>*> mats(nth, nullptr);
std::vector<Faust::MatGeneric<FPP, DEVICE>*> _data(data.size()+1);
std::mutex barrier_mutex;
std::condition_variable barrier_cv;
int data_size = data.size()+1; // counting A in addition to Transform factors
int num_per_th = data_size;
num_per_th /= nth;
if (num_per_th * nth < data_size)
num_per_th += 1;
num_per_th = num_per_th<2?2:num_per_th;
// recompute effective needed nth
nth = data_size / num_per_th;
nth = nth%2?nth-1:nth; // nth must be even
nth = nth == 0?1:nth;
// std::cout << "nth=" << nth << endl;
barrier_count = nth;
int i = 0;
if(opThis != 'N')
......@@ -548,131 +576,146 @@ Faust::MatDense<FPP,DEVICE> Faust::multiply_par(const std::vector<Faust::MatGene
_data[data_size-1] = const_cast<Faust::MatDense<FPP,DEVICE>*>(&A);
for(auto ptr: data)
{
// std::cout << "fac i=" << i << ": " << ptr << endl;
if(opThis == 'N')
_data[i++] = ptr;
else
_data[i--] = ptr;
}
// std::cout << "A ptr: " << &A << endl;
// std::cout << A.norm() << endl;
std::thread *th;
for(i=0; i < nth; i++)
{
// th = new std::thread(&Faust::Transform<FPP,DEVICE>::multiply_par_run, nth, i, num_per_th, data_size, opThis, _data, mats, threads);
th = new std::thread(Faust::multiply_par_run<FPP, Cpu>, nth, i, num_per_th, data_size, opThis, std::ref(_data), std::ref(mats), std::ref(threads), std::ref(barrier_mutex), std::ref(barrier_cv), std::ref(barrier_count));
threads.push_back(th);
}
for(auto t: threads)
if(t->get_id() != std::this_thread::get_id() )
if(t->get_id() != std::this_thread::get_id())
{
t->join();
delete t;
}
M = dynamic_cast<Faust::MatDense<FPP,DEVICE>*>(mats[0]);
// std::cout << "M:" << M << endl;
MatDense<FPP,DEVICE> M_;
if(! M)
MatDense<FPP, Cpu> M_;
if(M)
{
M_ = *M;
delete M;
}
else
{
M_ = Faust::MatDense<FPP,DEVICE>(*dynamic_cast<Faust::MatSparse<FPP,DEVICE>*>(mats[0]));
M = &M_;
delete mats[0];
}
//TODO: delete other thread mats
//TODO: return a ptr instead of a copy
//TODO: delete threads
return *M;
#else
throw std::runtime_error("It's not possible to call Faust::multiply_par because the library hasn't been compiled with this function enabled.");
#endif
return M_;
}
template<typename FPP, FDevice DEVICE>
void Faust::multiply_par_run(int nth, int thid, int num_per_th, int data_size, char opThis, std::vector<Faust::MatGeneric<FPP, DEVICE>*>& data, std::vector<Faust::MatDense<FPP,DEVICE>*>& mats, std::vector<std::thread*>& threads, std::mutex & barrier_mutex, std::condition_variable & barrier_cv, int & barrier_count)
{
Faust::MatSparse<FPP, DEVICE> * sM;
Faust::MatDense<FPP, DEVICE> *M;
Faust::MatDense<FPP,DEVICE>* tmp = nullptr, *tmp2 = nullptr; // (data[end_id-1]);
// std::cout << "num_per_th: " << num_per_th << std::endl;
Faust::MatSparse<FPP, DEVICE> * sM = nullptr;
Faust::MatDense<FPP, DEVICE> *M = nullptr;
Faust::MatDense<FPP,DEVICE>* tmp = nullptr;
int start_nth = nth;
while(num_per_th > 1)
num_per_th = data_size;
num_per_th /= nth;
if (num_per_th * nth < data_size)
num_per_th += 1;
num_per_th = num_per_th<2?2:num_per_th;
while(nth > 0)
{
if(thid < nth)
{
int first_id, end_id, id;
first_id = num_per_th*thid;
end_id = num_per_th*(thid+1);
if(thid == start_nth-1 && end_id < data_size)
end_id = data_size;
else
end_id = end_id>data_size?data_size:end_id;
id = 'N' == opThis? end_id-1: first_id;
// std::cout << "thid=" << thid << " end orig adr.:" << data[id] << endl;
if(tmp != nullptr) tmp2 = tmp; //keep track of previous tmp to delete it afterward
if((sM = dynamic_cast<Faust::MatSparse<FPP,DEVICE>*>(data[id])))
tmp = new Faust::MatDense<FPP,DEVICE>(*sM);
else
tmp = new Faust::MatDense<FPP,DEVICE>(*(Faust::MatDense<FPP,DEVICE>*)(data[id]));
mats[thid] = tmp;
// std::cout << "thid=" << thid << "mats[thid]=" << mats[thid] << "tmp=" << tmp << endl;
// std::cout << "tmp.norm()" << tmp->norm() << endl;
if(opThis == 'N')
for(int i=end_id-2;i>=first_id; i--)
int first_id = 0, end_id = 0, id;
first_id = num_per_th * thid;
end_id = num_per_th * (thid + 1);
end_id = end_id <= data_size? end_id : data_size;
if(first_id < data_size)
{
if (first_id < end_id)
{
// std::cout << "mul:" << data[i] << " thid: " << thid << endl;
if((sM = dynamic_cast<Faust::MatSparse<FPP,DEVICE>*>(data[i])))
sM->multiply(*mats[thid], opThis);
id = 'N' == opThis? end_id-1: first_id;
if((sM = dynamic_cast<Faust::MatSparse<FPP,DEVICE>*>(data[id])))
tmp = new Faust::MatDense<FPP,DEVICE>(*sM);
else
tmp = new Faust::MatDense<FPP,DEVICE>(*(Faust::MatDense<FPP,DEVICE>*)(data[id]));
if(opThis == 'N')
for(int i=end_id-2;i>=first_id; i--)
{
// std::cout << "mul:" << data[i] << " thid: " << thid << endl;
if((sM = dynamic_cast<Faust::MatSparse<FPP,DEVICE>*>(data[i])))
sM->multiply(*tmp, opThis);
else
dynamic_cast<Faust::MatDense<FPP,DEVICE>*>(data[i])->multiply(*tmp, opThis);
}
else
dynamic_cast<Faust::MatDense<FPP,DEVICE>*>(data[i])->multiply(*mats[thid], opThis);
for(int i=first_id+1;i < end_id; i++)
if((sM = dynamic_cast<Faust::MatSparse<FPP,DEVICE>*>(data[i])))
sM->multiply(*tmp,opThis);
else
dynamic_cast<Faust::MatDense<FPP,DEVICE>*>(data[i])->multiply(*tmp, opThis);
if(mats[thid] != nullptr)
{
mats[thid] = nullptr;
}
mats[thid] = tmp;
}
else
for(int i=first_id+1;i < end_id; i++)
if((sM = dynamic_cast<Faust::MatSparse<FPP,DEVICE>*>(data[i])))
sM->multiply(*mats[thid],opThis);
else
{
if(mats[thid] != nullptr)
{
mats[thid] = nullptr;
}
if(sM = dynamic_cast<Faust::MatSparse<FPP,DEVICE>*>(data[first_id]))
mats[thid] = new Faust::MatDense<FPP,DEVICE>(*sM);
else
dynamic_cast<Faust::MatDense<FPP,DEVICE>*>(data[i])->multiply(*mats[thid], opThis);
// mats[thid]->Display();
// std::cout << mats[thid]->norm() << endl;
// std::cout << "thid=" << thid << "mats[thid]=" << mats[thid] << "data[thid]=" << data[thid] << endl;
if(tmp2 != nullptr) delete tmp2;
}
mats[thid] = new Faust::MatDense<FPP,DEVICE>(*(Faust::MatDense<FPP,DEVICE>*)(data[first_id]));
}
#define barrier() \
{ \
std::unique_lock<std::mutex> l1(barrier_mutex); /* mutex block */ \
barrier_count--;\
/* string ite_str = i>=0?string(" (ite = ") + to_string(i) + ") ": "";*/ \
if(barrier_count > 0)\
{\
/* the threads wait here for the last one to finish its iteration i*/ \
/* std::cout << "thread " << thid << ite_str << " is waiting on barrier (" << barrier_count << ")." << std::endl;*/ \
barrier_cv.wait(l1/*, [&barrier_count, &num_threads]{return barrier_count == num_threads;}*/);\
/* std::cout << "thread " << thid << ite_str <<" continues." << std::endl;*/\
}\
else {\
/* the last thread to finish the iteration i wakes the other to continue the execution */ \
/* std::cout << "thread " << thid << ite_str << " reinits barrier." << std::endl;*/\
barrier_count = start_nth;\
barrier_cv.notify_all();\
}\
}
barrier();
if(thid < nth)
{
data[thid] = mats[thid];
{ \
std::unique_lock<std::mutex> l1(barrier_mutex); /* mutex block */ \
barrier_count--;\
/* string ite_str = i>=0?string(" (ite = ") + to_string(i) + ") ": "";*/ \
if(barrier_count > 0)\
{\
/* the threads wait here for the last one to finish its iteration i*/ \
/* std::cout << "thread " << thid << ite_str << " is waiting on barrier (" << barrier_count << ")." << std::endl;*/ \
barrier_cv.wait(l1/*, [&barrier_count, &num_threads]{return barrier_count == num_threads;}*/);\
/* std::cout << "thread " << thid << ite_str <<" continues." << std::endl;*/\
}\
else {\
/* the last thread to finish the iteration i wakes the other to continue the execution */ \
/* std::cout << "thread " << thid << ite_str << " reinits barrier." << std::endl;*/\
barrier_count = start_nth;\
barrier_cv.notify_all();\
}\
}
// std::cout << "nth: " << nth << " thid :" << thid << " first_id:" << first_id << " end_id:" << end_id << std::endl << mats[thid]->to_string(false, true) << std::endl;
}
}
barrier();
if(nth > 1)
{
num_per_th = 2;
}
else
num_per_th = 1;
barrier();
if(thid == 0)
{
for(int i=0;i<data_size;i++)
{
if(start_nth != nth)
delete data[i];
data[i] = mats[i];
}
}
barrier();
nth >>= 1;
// for(auto t: threads)
// if(t->get_id() != std::this_thread::get_id() )
// t->join();
// std::cout <<"num_per_th: " << num_per_th << " thid:" << thid << endl;
data_size = (int) ceil(((double) data_size)/num_per_th);
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment