Commit 4da345f2 authored by Nathalie Furmento's avatar Nathalie Furmento
Browse files

Merge branch 'master' into update/auto-heteroprio

parents 86ca6599 878326d2
Pipeline #262667 passed with stages
in 19 minutes and 55 seconds
......@@ -312,17 +312,22 @@ endif
########################
examplebin_PROGRAMS += \
mpi_redux/mpi_redux
mpi_redux/mpi_redux \
mpi_redux/mpi_redux_tree
mpi_redux_mpi_redux_SOURCES = \
mpi_redux/mpi_redux.c
mpi_redux/mpi_redux.c
mpi_redux_mpi_redux_tree_SOURCES = \
mpi_redux/mpi_redux_tree.c
mpi_redux_mpi_redux_LDADD = \
-lm
mpi_redux_mpi_redux_tree_LDADD = \
-lm
if !STARPU_SIMGRID
starpu_mpi_EXAMPLES += \
mpi_redux/mpi_redux
mpi_redux/mpi_redux \
mpi_redux/mpi_redux_tree
endif
##########################################
......@@ -377,20 +382,30 @@ endif
if STARPU_HAVE_MPIFORT
if !STARPU_SANITIZE
examplebin_PROGRAMS += \
native_fortran/nf_mpi_redux
examplebin_PROGRAMS += \
native_fortran/nf_mpi_redux \
native_fortran/nf_mpi_redux_tree
native_fortran_nf_mpi_redux_SOURCES = \
native_fortran_nf_mpi_redux_SOURCES = \
native_fortran/fstarpu_mpi_mod.f90 \
native_fortran/fstarpu_mod.f90 \
native_fortran/nf_mpi_redux.f90
native_fortran_nf_mpi_redux_LDADD = \
native_fortran_nf_mpi_redux_LDADD = \
-lm
native_fortran_nf_mpi_redux_tree_SOURCES = \
native_fortran/fstarpu_mpi_mod.f90 \
native_fortran/fstarpu_mod.f90 \
native_fortran/nf_mpi_redux_tree.f90
native_fortran_nf_mpi_redux_tree_LDADD = \
-lm
if !STARPU_SIMGRID
starpu_mpi_EXAMPLES += \
native_fortran/nf_mpi_redux
native_fortran/nf_mpi_redux \
native_fortran/nf_mpi_redux_tree
endif
endif
endif
......@@ -513,6 +528,7 @@ native_fortran/nf_mm_task_build.o: nf_mm_cl.mod fstarpu_mpi_mod.mod fstarpu_mod.
native_fortran/nf_basic_ring.o: fstarpu_mpi_mod.mod fstarpu_mod.mod
native_fortran/nf_redux_test.o: fstarpu_mpi_mod.mod fstarpu_mod.mod
native_fortran/nf_mpi_redux.o: fstarpu_mpi_mod.mod fstarpu_mod.mod
native_fortran/nf_mpi_redux_tree.o: fstarpu_mpi_mod.mod fstarpu_mod.mod
endif
endif
......
......@@ -92,7 +92,7 @@ static struct starpu_codelet task_red_cl =
{
.cpu_funcs = { cl_cpu_task_red },
.nbuffers = 2,
.modes = { STARPU_RW, STARPU_R },
.modes = { STARPU_RW|STARPU_COMMUTE, STARPU_R },
.name = "task_red"
};
......
/* StarPU --- Runtime system for heterogeneous multicore architectures.
*
* Copyright (C) 2016-2021 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
*
* StarPU is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 2.1 of the License, or (at
* your option) any later version.
*
* StarPU is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
*
* See the GNU Lesser General Public License in COPYING.LGPL for more details.
*/
/*
* This example illustrates how to use the STARPU_MPI_REDUX mode
* and compare it with the standard STARPU_REDUX.
*
* In order to make this comparison salient, the init codelet is not
* a task that sets the handle to a neutral element but rather one that
* depends on the working node.
* This is not a proper way to use a reduction pattern; however, it
* can be analogous to the cost/weight of each contribution.
*/
#include <stdlib.h>
#include <stdio.h>
#include <assert.h>
#include <math.h>
#include <starpu.h>
#include <starpu_mpi.h>
#include "helper.h"
#include <unistd.h>
/*
 * CPU implementation of the work codelets.
 *
 * handles[0]: double variable that accumulates the contributions.
 * handles[1]: double variable holding the contribution to add (read only).
 * arg:        unused.
 *
 * Sleeps for two seconds, then adds 3.0 plus the contribution to the
 * accumulator, printing the accumulator value before and after the update.
 */
static void cl_cpu_work(void *handles[], void *arg)
{
	double *acc = (double *)STARPU_VARIABLE_GET_PTR(handles[0]);
	double *contrib = (double *)STARPU_VARIABLE_GET_PTR(handles[1]);

	(void)arg;

	sleep(2);
	printf("work_cl (rank:%d,worker:%d) %f =>",starpu_mpi_world_rank(), starpu_worker_get_id(), *acc);

	const double updated = 3.0 + *acc + *contrib;

	*acc = updated;
	printf("%f\n",updated);
}
/*
 * Work codelet whose first buffer is accessed in STARPU_REDUX mode and whose
 * second buffer is read only. Both run cl_cpu_work on CPU workers.
 *
 * NOTE(review): main() in this file only submits mpi_work_cl, so this codelet
 * looks unused here -- confirm whether it is kept on purpose.
 * NOTE(review): the name string is the same as task_init_cl's -- confirm this
 * is intended.
 */
static struct starpu_codelet work_cl =
{
	.name = "task_init",
	.cpu_funcs = { cl_cpu_work },
	.modes = { STARPU_REDUX, STARPU_R },
	.nbuffers = 2
};
/*
 * Work codelet submitted by main(): it runs cl_cpu_work on CPU workers with
 * the first buffer declared as a commutative read-write access and the
 * second buffer read only.
 */
static struct starpu_codelet mpi_work_cl =
{
	.name = "task_init-mpi",
	.cpu_funcs = { cl_cpu_work },
	.modes = { STARPU_RW | STARPU_COMMUTE, STARPU_R },
	.nbuffers = 2
};
/*
 * CPU implementation of the initialization codelet.
 *
 * handles[0]: double variable to initialize.
 * arg:        unused.
 *
 * Sleeps for one second, prints the previous content of the variable and
 * then overwrites it with the MPI world rank of the executing node.
 */
static void cl_cpu_task_init(void *handles[], void *arg)
{
	double *value = (double *)STARPU_VARIABLE_GET_PTR(handles[0]);

	(void) arg;

	sleep(1);

	const int rank = starpu_mpi_world_rank();

	printf("init_cl (rank:%d,worker:%d) %d (was %f)\n", rank, starpu_worker_get_id(), rank, *value);
	*value = rank;
}
/*
 * Initialization codelet: a single write-only buffer handled by
 * cl_cpu_task_init on CPU workers. main() registers it as the init method
 * of the reduced handle.
 */
static struct starpu_codelet task_init_cl =
{
	.name = "task_init",
	.cpu_funcs = { cl_cpu_task_init },
	.modes = { STARPU_W },
	.nbuffers = 1
};
/*
 * CPU implementation of the reduction codelet.
 *
 * handles[0]: destination double variable, updated in place.
 * handles[1]: source double variable, read only.
 * arg:        unused.
 *
 * Sleeps for two seconds, prints both operands and their sum, then stores
 * the sum into the destination.
 */
static void cl_cpu_task_red(void *handles[], void *arg)
{
	double *dst = (double *)STARPU_VARIABLE_GET_PTR(handles[0]);
	double *src = (double *)STARPU_VARIABLE_GET_PTR(handles[1]);

	(void) arg;

	sleep(2);

	const double sum = *src + *dst;

	printf("red_cl (rank:%d,worker:%d) %f ; %f --> %f\n", starpu_mpi_world_rank(), starpu_worker_get_id(), *src, *dst, sum);
	*dst = sum;
}
/*
 * Reduction codelet: cl_cpu_task_red on CPU workers, with the destination
 * buffer declared as a commutative read-write access and the source buffer
 * read only. main() registers it as the reduction method of the reduced
 * handle.
 */
static struct starpu_codelet task_red_cl =
{
	.name = "task_red",
	.cpu_funcs = { cl_cpu_task_red },
	.modes = { STARPU_RW|STARPU_COMMUTE, STARPU_R },
	.nbuffers = 2
};
/*
 * Entry point of the example.
 *
 * Initializes StarPU-MPI and, for every arity from 2 up to (but excluding)
 * the number of MPI nodes:
 *   - registers a variable `a` owned by rank 0 and one variable `b[j]` per
 *     rank j,
 *   - submits work_coef * nworkers work tasks on every rank other than 0,
 *     each accessing `a` in STARPU_MPI_REDUX mode and reading that rank's
 *     `b`,
 *   - reduces `a` with starpu_mpi_redux_data_tree() using the current arity,
 *   - prints on rank 0 the computed value next to the expected one,
 *   - unregisters all handles.
 *
 * Returns STARPU_TEST_SKIPPED when fewer than 2 MPI nodes are available,
 * 0 otherwise.
 */
int main(int argc, char *argv[])
{
	int comm_rank, comm_size;

	/* Initializes StarPU and the StarPU-MPI layer */
	starpu_fxt_autostart_profiling(0);
	int ret = starpu_mpi_init_conf(&argc, &argv, 1, MPI_COMM_WORLD, NULL);
	STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_init_conf");

	int nworkers = starpu_cpu_worker_get_count();
	starpu_mpi_comm_size(MPI_COMM_WORLD, &comm_size);
	if (comm_size < 2)
	{
		FPRINTF(stderr, "We need at least 2 nodes.\n");
		starpu_mpi_shutdown();
		return STARPU_TEST_SKIPPED;
	}
	starpu_mpi_comm_rank(MPI_COMM_WORLD, &comm_rank);

	double a, b[comm_size];
	starpu_data_handle_t a_h, b_h[comm_size];
	double work_coef = 2;
	enum starpu_data_access_mode task_mode;
	int arity,j,work_node;
	starpu_mpi_tag_t tag = 0;

	/*
	 * NOTE(review): with exactly 2 nodes the condition `arity < comm_size`
	 * is false from the start, so the loop body never runs -- confirm that
	 * this is intended.
	 */
	for (arity = 2 ; arity < comm_size ; arity++)
	{
		starpu_mpi_barrier(MPI_COMM_WORLD);
		task_mode = STARPU_MPI_REDUX;

		/* Each rank provides a buffer only for the data it holds locally;
		 * the other handles are registered without a home buffer. */
		if (comm_rank == 0)
		{
			a = 1.0;
			printf("init a = %f\n", a);
			starpu_variable_data_register(&a_h, STARPU_MAIN_RAM, (uintptr_t)&a, sizeof(double));
			for (j=0;j<comm_size;j++)
				starpu_variable_data_register(&b_h[j], -1, 0, sizeof(double));
		}
		else
		{
			b[comm_rank] = 1.0 / (comm_rank + 1.0);
			printf("init b_%d = %f\n", comm_rank, b[comm_rank]);
			starpu_variable_data_register(&a_h, -1, 0, sizeof(double));
			for (j=0;j<comm_size;j++)
			{
				if (j == comm_rank)
					starpu_variable_data_register(&b_h[j], STARPU_MAIN_RAM, (uintptr_t)&b[j], sizeof(double));
				else
					starpu_variable_data_register(&b_h[j], -1, 0, sizeof(double));
			}
		}

		/* `a` is owned by rank 0, b[j] by rank j. The tag counter keeps
		 * growing across iterations, so every registration gets a
		 * distinct tag. */
		starpu_mpi_data_register(a_h, tag++, 0);
		for (j=0;j<comm_size;j++)
			starpu_mpi_data_register(b_h[j], tag++, j);

		starpu_data_set_reduction_methods(a_h, &task_red_cl, &task_init_cl);
		starpu_fxt_start_profiling();

		/* Submit the contributions: every rank but 0 executes
		 * work_coef * nworkers tasks. */
		for (work_node=1; work_node < comm_size;work_node++)
		{
			for (j=1;j<=work_coef*nworkers;j++)
			{
				starpu_mpi_task_insert(MPI_COMM_WORLD,
						       &mpi_work_cl,
						       task_mode, a_h,
						       STARPU_R, b_h[work_node],
						       STARPU_EXECUTE_ON_NODE, work_node,
						       0);
			}
		}

		/* Perform the reduction of `a` with the current arity. */
		starpu_mpi_redux_data_tree(MPI_COMM_WORLD, a_h, arity);
		starpu_mpi_wait_for_all(MPI_COMM_WORLD);
		starpu_mpi_barrier(MPI_COMM_WORLD);

		if (comm_rank == 0)
		{
			/* Sum of the b values contributed by ranks 1..comm_size-1. */
			double tmp = 0.0;
			for (work_node = 1; work_node < comm_size ; work_node++)
				tmp += 1.0 / (work_node + 1.0);
			printf("computed result ---> %f expected %f\n", a, 1.0 + (comm_size - 1.0)*(comm_size)/2.0 + work_coef*nworkers*((comm_size-1)*3.0 + tmp));
		}

		starpu_data_unregister(a_h);
		for (work_node=0; work_node < comm_size;work_node++)
			starpu_data_unregister(b_h[work_node]);
		starpu_mpi_barrier(MPI_COMM_WORLD);
	}

	starpu_mpi_shutdown();
	return 0;
}
......@@ -43,7 +43,7 @@ program nf_mpi_redux
comm_size = fstarpu_mpi_world_size()
if (comm_size.lt.2) then
write(*,'(" ")')
write(*,'("This application is meant to run with at least two nodes.")')
write(*,'("This application is meant to run with at least two nodes (found ",i4," ; i am ",i4,").")') comm_size, comm_w_rank
stop 2
end if
allocate(b(comm_size-1), bhdl(comm_size-1))
......@@ -58,7 +58,7 @@ program nf_mpi_redux
task_red_cl = fstarpu_codelet_allocate()
call fstarpu_codelet_set_name(task_red_cl, namered)
call fstarpu_codelet_add_cpu_func(task_red_cl,C_FUNLOC(cl_cpu_task_red))
call fstarpu_codelet_add_buffer(task_red_cl, FSTARPU_RW)
call fstarpu_codelet_add_buffer(task_red_cl, FSTARPU_RW.ior.FSTARPU_COMMUTE)
call fstarpu_codelet_add_buffer(task_red_cl, FSTARPU_R)
task_ini_cl = fstarpu_codelet_allocate()
......@@ -70,91 +70,90 @@ program nf_mpi_redux
do trial=1,2
if (trial.eq.1) then
write(*,*) "Using STARPU_MPI_REDUX"
codelet_mode = FSTARPU_RW.ior.FSTARPU_COMMUTE
task_mode = FSTARPU_MPI_REDUX
else if (trial.eq.2) then
write(*,*) "Using STARPU_REDUX"
codelet_mode = FSTARPU_REDUX
task_mode = FSTARPU_REDUX
end if
! allocate and fill codelet structs
work_cl = fstarpu_codelet_allocate()
call fstarpu_codelet_set_name(work_cl, name)
call fstarpu_codelet_add_cpu_func(work_cl, C_FUNLOC(cl_cpu_task))
call fstarpu_codelet_add_buffer(work_cl, codelet_mode)
call fstarpu_codelet_add_buffer(work_cl, FSTARPU_R)
err = fstarpu_mpi_barrier(comm_world)
if(comm_w_rank.eq.0) then
write(*,'(" ")')
a = 1.0
write(*,*) "init a = ", a
else
b(comm_w_rank) = 1.0 / (comm_w_rank + 1.0)
write(*,*) "init b_",comm_w_rank,"=", b(comm_w_rank), " AT ", &
c_loc(bhdl(comm_w_rank)) ! This is not really meaningful
end if
err = fstarpu_mpi_barrier(comm_world)
tag = 0
if(comm_w_rank.eq.0) then
call fstarpu_variable_data_register(ahdl, 0, c_loc(a),c_sizeof(a))
do i=1,comm_size-1
call fstarpu_variable_data_register(bhdl(i), -1, c_null_ptr,c_sizeof(b(i)))
end do
else
call fstarpu_variable_data_register(ahdl, -1, c_null_ptr,c_sizeof(a))
if (trial.eq.2) then
write(*,*) "Using STARPU_MPI_REDUX"
codelet_mode = FSTARPU_RW.ior.FSTARPU_COMMUTE
task_mode = FSTARPU_MPI_REDUX
else if (trial.eq.1) then
write(*,*) "Using STARPU_REDUX"
codelet_mode = FSTARPU_REDUX
task_mode = FSTARPU_REDUX
end if
! allocate and fill codelet structs
work_cl = fstarpu_codelet_allocate()
call fstarpu_codelet_set_name(work_cl, name)
call fstarpu_codelet_add_cpu_func(work_cl, C_FUNLOC(cl_cpu_task))
call fstarpu_codelet_add_buffer(work_cl, codelet_mode)
call fstarpu_codelet_add_buffer(work_cl, FSTARPU_R)
err = fstarpu_mpi_barrier(comm_world)
if(comm_w_rank.eq.0) then
write(*,'(" ")')
a = 1.0
write(*,*) "init a = ", a
else
b(comm_w_rank) = 1.0 / (comm_w_rank + 1.0)
write(*,*) "init b_",comm_w_rank,"=", b(comm_w_rank)
end if
err = fstarpu_mpi_barrier(comm_world)
tag = 0
if(comm_w_rank.eq.0) then
call fstarpu_variable_data_register(ahdl, 0, c_loc(a),c_sizeof(a))
do i=1,comm_size-1
call fstarpu_variable_data_register(bhdl(i), -1, c_null_ptr,c_sizeof(b(i)))
end do
else
call fstarpu_variable_data_register(ahdl, -1, c_null_ptr,c_sizeof(a))
do i=1,comm_size-1
if (i.eq.comm_w_rank) then
call fstarpu_variable_data_register(bhdl(i), 0, c_loc(b(i)),c_sizeof(b(i)))
else
call fstarpu_variable_data_register(bhdl(i), -1, c_null_ptr,c_sizeof(b(i)))
end if
end do
end if
call fstarpu_mpi_data_register(ahdl, tag, 0)
do i=1,comm_size-1
if (i.eq.comm_w_rank) then
call fstarpu_variable_data_register(bhdl(i), 0, c_loc(b(i)),c_sizeof(b(i)))
else
call fstarpu_variable_data_register(bhdl(i), -1, c_null_ptr,c_sizeof(b(i)))
end if
call fstarpu_mpi_data_register(bhdl(i), tag+i,i)
end do
end if
call fstarpu_mpi_data_register(ahdl, tag, 0)
do i=1,comm_size-1
call fstarpu_mpi_data_register(bhdl(i), tag+i,i)
end do
tag = tag + comm_size
tag = tag + comm_size
call fstarpu_data_set_reduction_methods(ahdl,task_red_cl,task_ini_cl)
call fstarpu_data_set_reduction_methods(ahdl,task_red_cl,task_ini_cl)
err = fstarpu_mpi_barrier(comm_world)
err = fstarpu_mpi_barrier(comm_world)
call fstarpu_fxt_start_profiling()
do w_node=1,comm_size-1
do i=1,work_coef*nworkers
call fstarpu_mpi_task_insert( (/ c_loc(comm_world), &
work_cl, &
task_mode, ahdl, &
FSTARPU_R, bhdl(w_node), &
FSTARPU_EXECUTE_ON_NODE, c_loc(w_node), &
C_NULL_PTR /))
call fstarpu_fxt_start_profiling()
do w_node=1,comm_size-1
do i=1,work_coef*nworkers
call fstarpu_mpi_task_insert( (/ c_loc(comm_world), &
work_cl, &
task_mode, ahdl, &
FSTARPU_R, bhdl(w_node), &
FSTARPU_EXECUTE_ON_NODE, c_loc(w_node), &
C_NULL_PTR /))
end do
end do
end do
call fstarpu_mpi_redux_data(comm_world, ahdl)
err = fstarpu_mpi_wait_for_all(comm_world)
if(comm_w_rank.eq.0) then
tmp = 0
call fstarpu_mpi_redux_data(comm_world, ahdl)
err = fstarpu_mpi_wait_for_all(comm_world)
if(comm_w_rank.eq.0) then
tmp = 0
do w_node=1,comm_size-1
tmp = tmp + 1.0 / (w_node+1.0)
end do
write(*,*) 'computed result ---> ',a, "expected =",&
1.0 + (comm_size-1.0)*(comm_size)/2.0 + work_coef*nworkers*((comm_size-1.0)*3.0 + tmp)
end if
err = fstarpu_mpi_barrier(comm_world)
call fstarpu_data_unregister(ahdl)
do w_node=1,comm_size-1
tmp = tmp + 1.0 / (w_node+1.0)
call fstarpu_data_unregister(bhdl(w_node))
end do
write(*,*) 'computed result ---> ',a, "expected =",&
1.0 + (comm_size-1.0)*(comm_size)/2.0 + work_coef*nworkers*((comm_size-1.0)*3.0 + tmp)
end if
err = fstarpu_mpi_barrier(comm_world)
call fstarpu_data_unregister(ahdl)
do w_node=1,comm_size-1
call fstarpu_data_unregister(bhdl(w_node))
end do
call fstarpu_codelet_free(work_cl)
call fstarpu_codelet_free(work_cl)
end do
......@@ -166,7 +165,7 @@ c_loc(bhdl(comm_w_rank)) ! This is not really meaningful
err = fstarpu_mpi_shutdown()
call fstarpu_shutdown()
deallocate(b, bhdl)
stop
stop 0
contains
......
! StarPU --- Runtime system for heterogeneous multicore architectures.
!
! Copyright (C) 2016-2021 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
!
! StarPU is free software; you can redistribute it and/or modify
! it under the terms of the GNU Lesser General Public License as published by
! the Free Software Foundation; either version 2.1 of the License, or (at
! your option) any later version.
!
! StarPU is distributed in the hope that it will be useful, but
! WITHOUT ANY WARRANTY; without even the implied warranty of
! MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
!
! See the GNU Lesser General Public License in COPYING.LGPL for more details.
!
program nf_mpi_redux
use iso_c_binding
use fstarpu_mod
use fstarpu_mpi_mod
implicit none
integer, target :: ret, np, i, j, arity
type(c_ptr) :: work_cl, task_rw_cl,task_red_cl, task_ini_cl
character(kind=c_char,len=*), parameter :: name=C_CHAR_"task"//C_NULL_CHAR
character(kind=c_char,len=*), parameter :: namered=C_CHAR_"task_red"//C_NULL_CHAR
character(kind=c_char,len=*), parameter :: nameini=C_CHAR_"task_ini"//C_NULL_CHAR
real(kind(1.d0)), target :: a,tmp
real(kind(1.d0)), target, allocatable :: b(:)
integer(kind=8) :: tag, err
type(c_ptr), target :: ahdl
type(c_ptr), target, allocatable :: bhdl(:)
type(c_ptr) :: task_mode, codelet_mode
integer, target :: comm_world,comm_w_rank, comm_size
integer(c_int), target :: w_node, nworkers, work_coef
!call fstarpu_fxt_autostart_profiling(0)
ret = fstarpu_init(c_null_ptr)
ret = fstarpu_mpi_init(1)
comm_world = fstarpu_mpi_world_comm()
comm_w_rank = fstarpu_mpi_world_rank()
comm_size = fstarpu_mpi_world_size()
allocate(b(comm_size-1), bhdl(comm_size-1))
nworkers = fstarpu_worker_get_count()
if (nworkers.lt.1) then
write(*,'(" ")')
write(*,'("This application is meant to run with at least one worker per node.")')
stop 2
end if
! allocate and reduction codelets
task_red_cl = fstarpu_codelet_allocate()
call fstarpu_codelet_set_name(task_red_cl, namered)
call fstarpu_codelet_add_cpu_func(task_red_cl,C_FUNLOC(cl_cpu_task_red))
call fstarpu_codelet_add_buffer(task_red_cl, FSTARPU_RW.ior.FSTARPU_COMMUTE)
call fstarpu_codelet_add_buffer(task_red_cl, FSTARPU_R)
task_ini_cl = fstarpu_codelet_allocate()
call fstarpu_codelet_set_name(task_ini_cl, nameini)
call fstarpu_codelet_add_cpu_func(task_ini_cl,C_FUNLOC(cl_cpu_task_ini))
call fstarpu_codelet_add_buffer(task_ini_cl, FSTARPU_W)
work_coef=2
codelet_mode = FSTARPU_RW.ior.FSTARPU_COMMUTE
task_mode = FSTARPU_MPI_REDUX
! allocate and fill codelet structs
work_cl = fstarpu_codelet_allocate()
call fstarpu_codelet_set_name(work_cl, name)
call fstarpu_codelet_add_cpu_func(work_cl, C_FUNLOC(cl_cpu_task))
call fstarpu_codelet_add_buffer(work_cl, codelet_mode)
call fstarpu_codelet_add_buffer(work_cl, FSTARPU_R)
err = fstarpu_mpi_barrier(comm_world)
do arity=2,comm_size
if(comm_w_rank.eq.0) then
write(*,'(" ")')
a = 1.0
write(*,*) "init a = ", a
else
b(comm_w_rank) = 1.0 / (comm_w_rank + 1.0)
write(*,*) "init b_",comm_w_rank,"=", b(comm_w_rank)
end if
err = fstarpu_mpi_barrier(comm_world)
tag = 0
if(comm_w_rank.eq.0) then
call fstarpu_variable_data_register(ahdl, 0, c_loc(a),c_sizeof(a))
do i=1,comm_size-1
call fstarpu_variable_data_register(bhdl(i), -1, c_null_ptr,c_sizeof(b(i)))
end do
else
call fstarpu_variable_data_register(ahdl, -1, c_null_ptr,c_sizeof(a))
do i=1,comm_size-1
if (i.eq.comm_w_rank) then
call fstarpu_variable_data_register(bhdl(i), 0, c_loc(b(i)),c_sizeof(b(i)))
else
call fstarpu_variable_data_register(bhdl(i), -1, c_null_ptr,c_sizeof(b(i)))
end if
end do
end if
call fstarpu_mpi_data_register(ahdl, tag, 0)
do i=1,comm_size-1
call fstarpu_mpi_data_register(bhdl(i), tag+i,i)
end do
tag = tag + comm_size
call fstarpu_data_set_reduction_methods(ahdl,task_red_cl,task_ini_cl)
err = fstarpu_mpi_barrier(comm_world)
call fstarpu_fxt_start_profiling()
do w_node=1,comm_size-1
do i=1,work_coef*nworkers
call fstarpu_mpi_task_insert( (/ c_loc(comm_world), &
work_cl, &
task_mode, ahdl, &
FSTARPU_R, bhdl(w_node), &
FSTARPU_EXECUTE_ON_NODE, c_loc(w_node), &
C_NULL_PTR /))
end do
end do