Commit 637e8b24 authored by Antoine Jego's avatar Antoine Jego
Browse files

Add handle pruning and check (though it is tricky to make the check work with handle pruning)

parent 44b11fcf
......@@ -31,23 +31,23 @@ Matrix* alloc_matrix(int mb, int nb, int b, int p, int q)
return X;
}
/*
 * Release a matrix descriptor.
 *
 * Frees the coefficient buffer (.c) of every block whose owner is the calling
 * MPI rank, then the block array and the descriptor itself.
 *
 * NOTE(review): this listing shows two variants of the signature, loop bounds
 * and indexing side by side: one takes the block counts as mb/nb arguments,
 * the other reads them from X->mb / X->nb.
 */
void free_matrix(Matrix* X, int mb, int nb)
void free_matrix(Matrix* X)
{
// refresh the global comm_rank used for the ownership test below
starpu_mpi_comm_rank(MPI_COMM_WORLD, &comm_rank);
int i,j;
for (i = 0; i<mb; i++)
for (i = 0; i<X->mb; i++)
{
for (j= 0; j<nb; j++)
for (j= 0; j<X->nb; j++)
{
// only blocks owned by this rank have their coefficient buffer freed here
if (X->blocks[i*nb+j].owner == comm_rank)
free(X->blocks[i*nb+j].c);
if (X->blocks[i*X->nb+j].owner == comm_rank)
free(X->blocks[i*X->nb+j].c);
}
}
free(X->blocks);
free(X);
}
void register_matrix(Matrix* X, starpu_data_handle_t* X_h, starpu_mpi_tag_t *tag, int mb, int nb, int datatype, int prune_handles, int p, int q, int row, int col)
void register_matrix(Matrix* X, starpu_data_handle_t* X_h, starpu_mpi_tag_t *tag, int mb, int nb, int datatype, int prune_handles, int p, int q, int row, int col, int check)
{
starpu_mpi_comm_rank(MPI_COMM_WORLD, &comm_rank);
int proc_row, proc_col;
......@@ -62,7 +62,7 @@ void register_matrix(Matrix* X, starpu_data_handle_t* X_h, starpu_mpi_tag_t *tag
for (b_col = 0; b_col < nb; b_col++)
{
Xij = & X->blocks[b_row*nb+b_col];
//printf("[%d] X_%d,%d | tag:%d\n",comm_rank,b_row,b_col,*tag + b_row*nb + b_col);
// printf("[%d] X_%d,%d | tag:%d\n",comm_rank,b_row,b_col,*tag + b_row*nb + b_col);
if (Xij->owner == comm_rank)
{
if (datatype) {
......@@ -74,7 +74,9 @@ void register_matrix(Matrix* X, starpu_data_handle_t* X_h, starpu_mpi_tag_t *tag
}
starpu_mpi_data_register(X_h[b_row*nb+b_col], (*tag + b_row*nb + b_col), Xij->owner);
} else if (!prune_handles || (row && proc_row == b_row % p) ||
(col && proc_col == b_col % q) ) {
(col && proc_col == b_col % q) ||
(check && Xij->owner == 0) ||
(check && comm_rank == 0) ) {
if (datatype) {
starpu_tile_register( &X_h[b_row*nb+b_col], -1, Xij );
} else {
......@@ -84,7 +86,7 @@ void register_matrix(Matrix* X, starpu_data_handle_t* X_h, starpu_mpi_tag_t *tag
}
starpu_mpi_data_register(X_h[b_row*nb+b_col], (*tag + b_row*nb + b_col), Xij->owner);
} else {
// printf("[%d] pruned X_%d,%d\n",comm_rank,b_row,b_col);
// printf("[%d] pruned X_%d,%d\n",comm_rank,b_row,b_col);
}
}
}
......@@ -94,16 +96,39 @@ void register_matrix(Matrix* X, starpu_data_handle_t* X_h, starpu_mpi_tag_t *tag
/*
 * Unregister the StarPU handles of a matrix and free the handle array.
 *
 * X    : matrix descriptor (block ownership is read from X->blocks)
 * X_h  : array of mb*nb handles, indexed row-major as b_row*nb + b_col;
 *        freed before returning, so the caller must not reuse it
 * mb,nb: number of block rows / block columns
 *
 * Only handles of blocks owned by the calling rank are unregistered.
 *
 * NOTE(review): this listing shows two variants of the ownership test side by
 * side (with and without braces).
 */
void unregister_matrix(Matrix* X, starpu_data_handle_t* X_h, int mb, int nb)
{
// refresh the global comm_rank used for the ownership test below
starpu_mpi_comm_rank(MPI_COMM_WORLD, &comm_rank);
// printf("[%d]unregistering %dx%d matrix\n", comm_rank, mb, nb);
int b_row,b_col;
for (b_row = 0; b_row < mb; b_row++)
{
for (b_col = 0; b_col < nb; b_col++)
{
// assuming we flush, we do not need to unregister everywhere
if (X->blocks[b_row*nb+b_col].owner == comm_rank)
if (X->blocks[b_row*nb+b_col].owner == comm_rank) {
// printf("[%d] unregistering X_%d,%d\n", comm_rank, b_row, b_col);
starpu_data_unregister(X_h[b_row*nb+b_col]);
}
}
}
// the handle array itself is released on every rank
free(X_h);
}
/*
 * Dump every coefficient of one block to stdout, one line per coefficient.
 *
 * X    : block to print (X->m rows, X->n columns, coefficients in X->c)
 * b    : unused
 * i, j : block coordinates, only used to label the output
 * name : label printed in front of the block coordinates
 *
 * Coefficients are read at X->c[row*X->n + col].
 */
void print_block(Block* X, int b, int i, int j, char* name) {
    (void)b;
    /* refresh the global rank shown as the "[rank]" prefix */
    starpu_mpi_comm_rank(MPI_COMM_WORLD, &comm_rank);

    int row = 0;
    while (row < X->m) {
        int col = 0;
        while (col < X->n) {
            double value = X->c[row*X->n + col];
            printf("[%d] %s_%d,%d (%d,%d) = %f\n", comm_rank, name, i, j, row, col, value);
            col++;
        }
        row++;
    }
}
/*
 * Print the blocks of a matrix that are owned by the calling rank.
 *
 * X    : matrix descriptor (X->mb x X->nb blocks, stored row-major in X->blocks)
 * name : label forwarded to print_block()
 */
void print_matrix(Matrix* X, char* name) {
    starpu_mpi_comm_rank(MPI_COMM_WORLD, &comm_rank);

    int blk_row, blk_col;
    for (blk_row = 0; blk_row < X->mb; blk_row++) {
        for (blk_col = 0; blk_col < X->nb; blk_col++) {
            Block *blk = &X->blocks[blk_row*X->nb + blk_col];
            /* blocks owned by another rank are skipped */
            if (comm_rank != blk->owner) {
                continue;
            }
            print_block(blk, X->b, blk_row, blk_col, name);
        }
    }
}
......@@ -15,9 +15,8 @@ typedef struct Matrices
} Matrix;
Matrix* alloc_matrix(int mb, int nb, int b, int p, int q);
void free_matrix(Matrix* X, int mb, int nb);
void register_matrix(Matrix* X, starpu_data_handle_t* X_h, starpu_mpi_tag_t *tag, int mb, int nb, int datatype, int prune_handles, int p, int q, int row, int col);
void free_matrix(Matrix* X);
void register_matrix(Matrix* X, starpu_data_handle_t* X_h, starpu_mpi_tag_t *tag, int mb, int nb, int datatype, int prune_handles, int p, int q, int row, int col,int check);
void unregister_matrix(Matrix* X, starpu_data_handle_t* X_h, int mb, int nb);
void print_matrix(Matrix* X, char* name);
#endif
......@@ -142,6 +142,7 @@ static int BS = 512; /* Block size */
static int P = 2; /* height of the grid */
static int Q = 2; /* width of the grid */
static int T = 1; /* number of runs */
static int check = 0;
static int trace = STARPU_EXAMPLE_DGEMM_TRACE; /* whether to trace */
static int datatype =
STARPU_EXAMPLE_DGEMM_OWNDATATYPE; /* whether to register our own datatype */
......@@ -182,9 +183,9 @@ static void alloc_matrices(void)
/*
 * Free the three global matrices A, B and C through free_matrix().
 *
 * NOTE(review): this listing shows two variants of each call side by side:
 * one passes the block counts explicitly, the other passes only the matrix.
 */
static void free_matrices(void)
{
if (verbose) printf( "[%d] Freeing matrices\n", comm_rank);
free_matrix(A,MB,KB);
free_matrix(B,KB,NB);
free_matrix(C,MB,NB);
free_matrix(A);
free_matrix(B);
free_matrix(C);
}
starpu_mpi_tag_t tag = 0;
......@@ -199,9 +200,15 @@ static void register_matrices(int prune_handles)
starpu_tile_interface_register();
}
register_matrix(A,A_h,&tag,MB,KB,datatype,prune_handles,P,Q,1,0);
register_matrix(B,B_h,&tag,KB,NB,datatype,prune_handles,P,Q,0,1);
register_matrix(C,C_h,&tag,MB,NB,datatype,prune_handles,P,Q,1,1);
register_matrix(A,A_h,&tag,MB,KB,datatype,prune_handles,P,Q,1,0,check);
register_matrix(B,B_h,&tag,KB,NB,datatype,prune_handles,P,Q,0,1,check);
//register_matrix(C,C_h,&tag,MB,NB,datatype,prune_handles,P,Q,0,0);
// FIXME:
// The call above (no row/column registration) seems logical, because we do not
// need to know about blocks of C we do not contribute to. However, StarPU seems to
// stay pending on task insertion if we do not know about the blocks on our row/column, even if we do not contribute to them.
// - This could happen because we are seeing a write on a NULL handle and StarPU is waiting (for what?)
register_matrix(C,C_h,&tag,MB,NB,datatype,prune_handles,P,Q,1,1,check);
}
/* Unregister matrices from the StarPU management. */
......@@ -265,32 +272,93 @@ static void cpu_gemm(void *handles[], void *args)
ld_B, clargs->beta, block_C, ld_C ); // 13
}
int iseed[4];
int iseed[4] = { 1,1,1,1 };
/*
 * CPU kernel that fills the block behind handles[0] with values produced by
 * LAPACKE_dlarnv, one call per row, using the global seed array iseed.
 *
 * When the global `datatype` flag is set the block is accessed through
 * ti_interface_get() (Block: c, n, m, ld); otherwise through the
 * STARPU_MATRIX_GET_* accessors.
 *
 * NOTE(review): this listing shows two variants of the block_A declaration
 * and of the LAPACKE_dlarnv call side by side.
 */
static void cpu_fill(void *handles[], void *arg)
{
(void)arg;
double *block_A = (double *)STARPU_MATRIX_GET_PTR(handles[0]);
double *block_A;
unsigned n_col_A, n_row_A, ld_A;
if (datatype) {
// tile interface: buffer and dimensions come from the Block structure
Block* tile = ti_interface_get(handles[0]);
block_A = tile->c;
n_col_A = tile->n;
n_row_A = tile->m;
ld_A = tile->ld;
} else {
// StarPU matrix interface accessors
block_A = (double *)STARPU_MATRIX_GET_PTR(handles[0]);
n_col_A = STARPU_MATRIX_GET_NX(handles[0]);
n_row_A = STARPU_MATRIX_GET_NY(handles[0]);
ld_A = STARPU_MATRIX_GET_LD(handles[0]);
}
int i,j;
//if (verbose) printf("fill_task\n");
// NOTE(review): rows are addressed with a stride of BS; ld_A is read above
// but not used here, and j is unused -- confirm BS always equals ld_A
for (i=0;i<n_row_A;i++)
{
LAPACKE_dlarnv(1,iseed,n_col_A,block_A + i*BS);
LAPACKE_dlarnv(1,iseed,n_col_A,&block_A[i*BS]);
}
}
static void cpu_copy(void *handles[], void *arg)
{
(void)arg;
// FIXME
double *block_A = (double *)STARPU_MATRIX_GET_PTR(handles[0]);
double *block_B = (double *)STARPU_MATRIX_GET_PTR(handles[1]);
unsigned n_col_A, n_row_A;
if (datatype) {
Block* tile = ti_interface_get(handles[0]);
n_col_A = tile->n;
n_row_A = tile->m;
} else {
n_col_A = STARPU_MATRIX_GET_NX(handles[0]);
n_row_A = STARPU_MATRIX_GET_NY(handles[0]);
}
int i,j;
//if (verbose) printf("fill_task\n");
for (i = 0; i < n_row_A; i++) {
for (j = 0; j < n_col_A; j++) {
block_A[i*n_col_A + j] = block_B[i*n_col_A + j];
}
}
}
static void cpu_nrm2_comp(void *handles[], void *arg)
{
// printf("nrm task\n");
(void)arg;
// FIXME
double *block_A = (double *)STARPU_MATRIX_GET_PTR(handles[0]);
double *block_B = (double *)STARPU_MATRIX_GET_PTR(handles[1]);
unsigned n_col_A, n_row_A;
if (datatype) {
Block* tile = ti_interface_get(handles[0]);
n_col_A = tile->n;
n_row_A = tile->m;
} else {
n_col_A = STARPU_MATRIX_GET_NX(handles[0]);
n_row_A = STARPU_MATRIX_GET_NY(handles[0]);
}
int i,j;
//if (verbose) printf("fill_task\n");
double local_ssq = 0.0;
double tmp;
for (i = 0; i < n_row_A; i++) {
for (j = 0; j < n_col_A; j++) {
tmp = (block_A[i*n_col_A + j] - block_B[i*n_col_A + j]);
local_ssq += tmp*tmp;
}
}
i = n_row_A - 1;
j = n_col_A - 1;
printf("Some block diff : %f (%d,%d: %f - %f)\n", local_ssq, i,j, block_A[i*n_col_A+j], block_B[i*n_col_A+j]);
//printf("Some block diff : %f \n", local_ssq);
}
/* Define a StarPU 'codelet' structure for the matrix multiply kernel above.
* This structure enable specifying multiple implementations for the kernel (such as CUDA or OpenCL versions)
*/
......@@ -310,6 +378,23 @@ static struct starpu_codelet fill_cl =
.name = "fill" /* to display task name in traces */
};
/*
 * Codelet for the block copy kernel cpu_copy.
 * Buffer 0 is accessed in write mode (destination), buffer 1 in read mode
 * (source), matching the order in which cpu_copy uses its handles.
 */
static struct starpu_codelet copy_cl =
{
    .name      = "copy",                  /* task name shown in traces */
    .nbuffers  = 2,                       /* destination + source */
    .modes     = { STARPU_W, STARPU_R },
    .cpu_funcs = { cpu_copy },            /* CPU implementation */
};
// TODO : redux ssq
/*
 * Codelet for the block comparison kernel cpu_nrm2_comp.
 * Both buffers are accessed in read mode.
 */
static struct starpu_codelet nrm_cl =
{
    .name      = "nrm2_comp",             /* task name shown in traces */
    .nbuffers  = 2,                       /* the two blocks being compared */
    .modes     = { STARPU_R, STARPU_R },
    .cpu_funcs = { cpu_nrm2_comp },       /* CPU implementation */
};
static void init_matrix(Matrix* X, starpu_data_handle_t* X_h, int mb, int nb)
{
int row, col;
......@@ -328,6 +413,25 @@ static void init_matrix(Matrix* X, starpu_data_handle_t* X_h, int mb, int nb)
}
}
/*
 * Copy matrix B into matrix A block by block (A <- B) with copy_cl tasks,
 * then wait for all submitted tasks.
 *
 * A, A_h : destination matrix and its handles (written)
 * B, B_h : source matrix and its handles (read)
 *
 * Both matrices are walked with A's block counts (A->mb x A->nb), and B is
 * indexed with A's row stride as well. A task is inserted on this rank only
 * for blocks where it owns either the destination or the source block.
 */
static void copy_matrix(Matrix* A, starpu_data_handle_t* A_h, Matrix* B, starpu_data_handle_t* B_h)
{
    int blk_row, blk_col;

    for (blk_row = 0; blk_row < A->mb; blk_row++) {
        for (blk_col = 0; blk_col < A->nb; blk_col++) {
            const int idx = blk_row*A->nb + blk_col;

            /* this rank takes part only if it owns one side of the copy */
            if (A->blocks[idx].owner != comm_rank
                && B->blocks[idx].owner != comm_rank) {
                continue;
            }

            starpu_mpi_task_insert(MPI_COMM_WORLD, &copy_cl,
                                   STARPU_W, A_h[idx],
                                   STARPU_R, B_h[idx], 0);
        }
    }

    starpu_mpi_wait_for_all(MPI_COMM_WORLD);
}
static void init_matrices(void)
{
if (verbose) printf( "[%d] Initializing matrices\n", comm_rank);
......@@ -368,7 +472,7 @@ int main(int argc, char *argv[])
BS = arguments.b;
P = arguments.p;
Q = arguments.q;
// check
check = arguments.check;
verbose = arguments.verbose;
trace = arguments.trace;
T = arguments.niter;
......@@ -434,6 +538,7 @@ int main(int argc, char *argv[])
printf("comm_size = %d\n", comm_size);
printf("PxQ = %dx%d\n", P, Q);
if (trace) printf("- Tracing enabled\n");
if (check) printf("- Checking enabled\n");
if (datatype) printf("- MPI datatype enabled\n");
if (mpi_thread > -1) printf("- MPI thread support level : %d\n", provided_mpi_thread);
if (!flush) printf("- Flushing disabled\n");
......@@ -450,6 +555,17 @@ int main(int argc, char *argv[])
register_matrices(prune_handles);
init_matrices();
Matrix* Cwork;
starpu_data_handle_t *Cwh;
if (check) {
Cwh = malloc(MB*NB*sizeof(starpu_data_handle_t));
Cwork = alloc_matrix(MB,NB,BS,1,1);
register_matrix(Cwork,Cwh,&tag,MB,NB,datatype,0,1,1,1,1,1);
copy_matrix(Cwork,Cwh,C,C_h);
starpu_mpi_wait_for_all(MPI_COMM_WORLD);
if (verbose) print_matrix(C,"Cinit");
if (verbose) print_matrix(Cwork,"Cwork");
}
// starpu_data_display_memory_stats();
barrier_ret = starpu_mpi_barrier(MPI_COMM_WORLD);
start = starpu_timing_now();
......@@ -462,7 +578,8 @@ int main(int argc, char *argv[])
for (b_aisle=0;b_aisle<KB;b_aisle++)
{
// this just needs to be clarified
if ((!prune_handles && !prune) || (A->blocks[b_row*KB+b_aisle].owner == comm_rank || B->blocks[b_aisle*NB+b_col].owner == comm_rank || C->blocks[b_row*NB+b_col].owner == comm_rank)) {
//if ((!prune && !prune_handles) || (A->blocks[b_row*KB+b_aisle].owner == comm_rank || B->blocks[b_aisle*NB+b_col].owner == comm_rank || C->blocks[b_row*NB+b_col].owner == comm_rank)) {
if (!prune || (A->blocks[b_row*KB+b_aisle].owner == comm_rank || B->blocks[b_aisle*NB+b_col].owner == comm_rank || C->blocks[b_row*NB+b_col].owner == comm_rank)) {
//printf("[%d] inserting C_%d,%d += A_%d,%d B_%d,%d\n",comm_rank, b_row,b_col, b_row,b_aisle, b_aisle,b_col);
struct cl_zgemm_args_s *clargs = NULL;
if (C->blocks[b_row*NB+b_col].owner == comm_rank) {
......@@ -493,6 +610,65 @@ int main(int argc, char *argv[])
double timing = stop - start;
if (comm_rank==0) printf("RANK %d -> took %f s | %f Gflop/s\n", comm_rank, timing/1000/1000, 2.0*M*N*K/(timing*1000));
if (check) {
Matrix* Acheck, *Bcheck, *Ccheck;
starpu_data_handle_t *Ach,*Bch,*Cch;
Acheck = alloc_matrix(MB,KB,BS,1,1);
Bcheck = alloc_matrix(KB,NB,BS,1,1);
Ccheck = alloc_matrix(MB,NB,BS,1,1);
Ach = malloc(MB*KB*sizeof(starpu_data_handle_t));
Bch = malloc(KB*NB*sizeof(starpu_data_handle_t));
Cch = malloc(MB*NB*sizeof(starpu_data_handle_t));
register_matrix(Acheck,Ach,&tag,MB,KB,datatype,0,1,1,1,1,1);
register_matrix(Bcheck,Bch,&tag,KB,NB,datatype,0,1,1,1,1,1);
register_matrix(Ccheck,Cch,&tag,MB,NB,datatype,0,1,1,1,1,1);
copy_matrix(Acheck,Ach,A,A_h);
if (verbose) print_matrix(A,"A");
if (verbose) print_matrix(Acheck,"Ac");
copy_matrix(Bcheck,Bch,B,B_h);
if (verbose) print_matrix(B,"B");
if (verbose) print_matrix(Bcheck,"Bc");
copy_matrix(Ccheck,Cch,C,C_h);
if (verbose) print_matrix(C,"C");
if (verbose) print_matrix(Ccheck,"Cc");
if (comm_rank == 0) {
int b_row,b_col,b_aisle;
for (b_row = 0; b_row < MB; b_row++) {
for (b_col = 0; b_col < NB; b_col++) {
for (b_aisle=0;b_aisle<KB;b_aisle++) {
struct cl_zgemm_args_s *clargs = NULL;
clargs = malloc(sizeof( struct cl_zgemm_args_s ));
clargs->alpha = alpha;
clargs->beta = b_aisle==0? beta : 1.0;
starpu_mpi_task_insert(MPI_COMM_WORLD, &gemm_cl,
STARPU_CL_ARGS, clargs, sizeof(struct cl_zgemm_args_s),
STARPU_R, Ach[b_row*KB+b_aisle],
STARPU_R, Bch[b_aisle*NB+b_col],
STARPU_RW, Cwh[b_row*NB+b_col], 0);
}
starpu_mpi_task_insert(MPI_COMM_WORLD, &nrm_cl,
STARPU_R, Cch[b_row*NB+b_col],
STARPU_R, Cwh[b_row*NB+b_col], 0);
}
}
starpu_mpi_wait_for_all(MPI_COMM_WORLD);
}
barrier_ret = starpu_mpi_barrier(MPI_COMM_WORLD);
unregister_matrix(Acheck,Ach,MB,KB);
unregister_matrix(Bcheck,Bch,KB,NB);
unregister_matrix(Ccheck,Cch,MB,NB);
unregister_matrix(Cwork,Cwh,MB,NB);
barrier_ret = starpu_mpi_barrier(MPI_COMM_WORLD);
free_matrix(Acheck);
free_matrix(Bcheck);
free_matrix(Ccheck);
free_matrix(Cwork);
barrier_ret = starpu_mpi_barrier(MPI_COMM_WORLD);
}
starpu_mpi_cache_flush_all_data(MPI_COMM_WORLD);
unregister_matrices();
free_matrices();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment