Commit c2179e8b authored by HE Kun

starpupy: add numpy handle partition

parent f075edfa
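Summary of the change: this commit adds three partition-related methods to the starpupy Handle class (partition, get_partition_size, unpartition), a NumPy filter on the C side (starpupy_numpy_filters.c) that backs them, and a new test starpu_py_partition.py. A minimal usage sketch of the new API, adapted from that test (array shape, number of children and the use of ret_handle=True are taken from the test, not from a documented requirement):

import numpy as np
import starpu
from starpu import Handle, starpupy

arr = np.arange(100).reshape(10, 10)
arr_h = Handle(arr)                 # register the NumPy array with StarPU

sub_h = arr_h.partition(2, 1)       # split along dimension 0 into 2 sub-handles

@starpu.access(a="RW")
def add(a, b):
    np.add(a, b, out=a)

for h in sub_h:
    starpu.task_submit(ret_handle=True)(add, h, h)

arr_h.unpartition(sub_h, 2)         # gather the children back into the parent handle
arr_h.unregister()
starpupy.shutdown()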
......@@ -37,6 +37,7 @@ if STARPU_STARPUPY_NUMPY
TESTS += starpu_py_parallel.sh
TESTS += starpu_py_np.sh
TESTS += starpu_py_handle.sh
TESTS += starpu_py_partition.sh
endif
endif
......@@ -50,12 +51,15 @@ EXTRA_DIST = \
starpu_py_np.py \
starpu_py_np.sh \
tasks_size_overhead.py \
tasks_size_overhead.sh
tasks_size_overhead.sh \
starpu_py_partition.py \
starpu_py_partition.sh
python_sourcesdir = $(libdir)/starpu/python
dist_python_sources_DATA = \
starpu_py_parallel.py \
starpu_py.py \
starpu_py_handle.py \
tasks_size_overhead.py
tasks_size_overhead.py \
starpu_py_partition.py
# StarPU --- Runtime system for heterogeneous multicore architectures.
#
# Copyright (C) 2020-2021 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
#
# StarPU is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or (at
# your option) any later version.
#
# StarPU is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See the GNU Lesser General Public License in COPYING.LGPL for more details.
#
import starpu
from starpu import starpupy
from starpu import Handle
from starpu import HandleNumpy
try:
import numpy as np
except ModuleNotFoundError as e:
print("Can't find \"Python3 NumPy\" module (consider running \"pip3 install numpy\" or refer to https://numpy.org/install/)")
starpupy.shutdown()
exit(77)
import asyncio
import time
import array
import struct
# 1-dimension
# arr = np.arange(20)
# 2-dimension
# n, m = 20, 10
# arr = np.arange(n*m).reshape(n, m)
# 3-dimension
# x, y, z = 10, 15, 20
# arr = np.arange(x*y*z).reshape(x, y, z)
# 4-dimension
x, y, z, t = 10, 5, 10, 20
arr = np.arange(x*y*z*t).reshape(x, y, z, t)
print("input array is", arr)
arr_h = Handle(arr)
# split the handle into split_num sub-handles
split_num = 3
#arr_h_list = arr_h.partition(split_num, 1, [6,6,8])
arr_h_list = arr_h.partition(split_num, 1, [3,2,5])
n_arr = arr_h.get_partition_size(arr_h_list)
print("partition size is", n_arr)
def show(x):
print("Function printing:", x)
@starpu.access(a="RW")
def add(a,b):
np.add(a,b,out=a)
for i in range(split_num):
res=starpu.task_submit(ret_handle=True)(add, arr_h_list[i], arr_h_list[i])
arr_r = arr_h.acquire(mode='RW')
print("output array is:", arr_r)
arr_h.release()
arr_h.unpartition(arr_h_list, split_num)
arr_h.unregister()
starpupy.shutdown()
\ No newline at end of file
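For reference, with the (10, 5, 10, 20) array above and the explicit chunk list [3, 2, 5], filter value 1 selects dimension 0 (the new filter code below uses dim = filter_arg - 1), and get_partition_size reports the element count of each child (narr / nn * child_nn in that code). A rough NumPy cross-check of those sizes, under that reading:

import numpy as np

x, y, z, t = 10, 5, 10, 20
arr = np.arange(x*y*z*t).reshape(x, y, z, t)
chunks = [3, 2, 5]                   # explicit chunk sizes along dimension 0

cuts = np.cumsum(chunks)[:-1]        # split points for np.split: [3, 5]
children = np.split(arr, cuts, axis=0)

print([c.size for c in children])    # expected partition sizes: [3000, 2000, 5000]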
#!/bin/bash
# StarPU --- Runtime system for heterogeneous multicore architectures.
#
# Copyright (C) 2020-2021 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
#
# StarPU is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or (at
# your option) any later version.
#
# StarPU is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See the GNU Lesser General Public License in COPYING.LGPL for more details.
#
exec $(dirname $0)/execute.sh starpu_py_partition.py $*
......@@ -65,4 +65,6 @@ EXTRA_DIST = \
starpupy_interface.c \
starpupy_interface.h \
starpupy_buffer_interface.c \
starpupy_buffer_interface.h
starpupy_buffer_interface.h \
starpupy_numpy_filters.c \
starpupy_numpy_filters.h
......@@ -60,6 +60,17 @@ class Handle(object):
def unregister_submit(self):
return starpupy.starpupy_data_unregister_submit(self.handle_obj)
# partition
def partition(self, nchildren, filter_func, chunks_list=[]):
return starpupy.starpupy_data_partition(self.handle_obj, nchildren, filter_func, chunks_list)
# get partition size
def get_partition_size(self, handle_list):
return starpupy.starpupy_get_partition_size(self.handle_obj, handle_list)
# unpartition
def unpartition(self, handle_list, nchildren):
return starpupy.starpupy_data_unpartition(self.handle_obj, handle_list, nchildren)
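A short sketch of how the three new methods chain together; when chunks_list is left at its default (empty list), the C wrapper passes a NULL chunk array and the filter falls back to an even split (assumed here):

import numpy as np
from starpu import Handle

h = Handle(np.arange(12).reshape(4, 3))

subs = h.partition(2, 1, [1, 3])     # explicit chunk sizes along dimension 0
print(h.get_partition_size(subs))    # expected: [3, 9] elements per child
# h.partition(2, 1) with no chunk list would instead split dimension 0 evenly

h.unpartition(subs, 2)
h.unregister()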
#class handle
class HandleNumpy(Handle):
......
......@@ -20,6 +20,7 @@ import joblib as jl
from joblib import logger
from joblib._parallel_backends import ParallelBackendBase
from starpu import starpupy
from starpu import Handle
import starpu
import asyncio
import math
......@@ -125,26 +126,32 @@ def future_generator(iterable, n_jobs, dict_task):
L_fut=[]
# split the vector
args_split=[]
# handle list
arg_h=[]
for i in range(len(args)):
args_split.append([])
# if the argument is a numpy array
if has_numpy and type(args[i]) is np.ndarray:
# one-dimension matrix
if args[i].ndim==1:
# split numpy array
args_split[i]=np.array_split(args[i],n_block)
# get the length of numpy array
l_arr.append(args[i].size)
# two-dimension matrix
elif args[i].ndim==2:
# split numpy 2D array
args_split[i]=array2d_split(args[i],n_block)
# check whether the arg is already registered
handle_dict = starpu.handle_dict
if handle_dict.get(id(args[i]))==None:
arr_h = Handle(args[i])
starpu.handle_dict_set_item(args[i], arr_h)
arg_h.append(arr_h)
args_split[i] = arr_h.partition(n_block, 1)
else:
arr_h = handle_dict.get(id(args[i]))
arg_h.append(arr_h)
args_split[i] = arr_h.partition(n_block, 1)
# if the argument is a generator
elif isinstance(args[i],types.GeneratorType):
# split generator
args_split[i]=partition(list(args[i]),n_block)
arg_h.append(None)
# get the length of generator
l_arr.append(sum(len(args_split[i][j]) for j in range(len(args_split[i]))))
else:
arg_h.append(None)
if len(set(l_arr))>1:
raise SystemExit('Error: all arrays should have the same size')
#print("args list is", args_split)
......@@ -153,7 +160,16 @@ def future_generator(iterable, n_jobs, dict_task):
L_args=[]
sizebase=0
for j in range(len(args)):
if (has_numpy and type(args[j]) is np.ndarray) or isinstance(args[j],types.GeneratorType):
if (has_numpy and type(args[j]) is np.ndarray):
L_args.append(args_split[j][i])
n_arr = arg_h[j].get_partition_size(args_split[j])
if sizebase==0:
sizebase=n_arr[i]
elif sizebase==n_arr[i]:
continue
else:
raise SystemExit('Error: all arrays should be split into chunks of equal size')
elif isinstance(args[j],types.GeneratorType):
L_args.append(args_split[j][i])
if sizebase==0:
sizebase=len(args_split[j][i])
......@@ -168,10 +184,12 @@ def future_generator(iterable, n_jobs, dict_task):
color=dict_task['color'], flops=dict_task['flops'], perfmodel=dict_task['perfmodel'], sizebase=sizebase,\
ret_handle=dict_task['ret_handle'], ret_fut=dict_task['ret_fut'], arg_handle=dict_task['arg_handle'], modes=dict_task['modes'])\
(f, *L_args)
for j in range(len(args)):
if (has_numpy and type(args[j]) is np.ndarray):
starpu.unregister(args_split[j][i])
L_fut.append(fut)
# unpartition the numpy array handles (unregister is left commented out)
for i in range(len(args)):
if (has_numpy and type(args[i]) is np.ndarray):
arg_h[i].unpartition(args_split[i], n_block)
#arg_h[i].unregister()
return L_fut
# if iterable is a generator or a list of function
......
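The joblib backend change above caches one Handle per NumPy argument in starpu.handle_dict (keyed by id()), partitions it into n_block sub-handles on each call, and unpartitions once the futures have been created; unregistering stays commented out so the parent handle can be reused by the next call. A condensed sketch of that caching pattern (handle_dict and handle_dict_set_item are assumed to behave exactly as they are used in the diff):

import numpy as np
import starpu
from starpu import Handle

def get_or_register(arr):
    # reuse the handle if this exact array object was registered before
    h = starpu.handle_dict.get(id(arr))
    if h is None:
        h = Handle(arr)
        starpu.handle_dict_set_item(arr, h)
    return h

a = np.arange(8)
h = get_or_register(a)
subs = h.partition(4, 1)             # one sub-handle per block of tasks
# ... submit one task per sub-handle ...
h.unpartition(subs, 4)               # keep the parent registered for later calls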
......@@ -24,7 +24,7 @@ starpupy = Extension('starpu.starpupy',
include_dirs = ['@STARPU_SRC_DIR@/include', '@STARPU_BUILD_DIR@/include', '@STARPU_SRC_DIR@/starpupy/src'] + numpy_include_dir,
libraries = ['starpu-@STARPU_EFFECTIVE_VERSION@'],
library_dirs = ['@STARPU_BUILD_DIR@/src/.libs'],
sources = ['starpu/starpu_task_wrapper.c', 'starpu/starpupy_interface.c', 'starpu/starpupy_buffer_interface.c'])
sources = ['starpu/starpu_task_wrapper.c', 'starpu/starpupy_interface.c', 'starpu/starpupy_buffer_interface.c', 'starpu/starpupy_numpy_filters.c'])
setup(
name = 'starpupy',
......
......@@ -22,6 +22,7 @@
#include <starpu.h>
#include "starpupy_interface.h"
#include "starpupy_buffer_interface.h"
#include "starpupy_numpy_filters.h"
#define PY_SSIZE_T_CLEAN
#include <Python.h>
......@@ -1549,6 +1550,63 @@ static PyObject* starpu_task_submit_wrapper(PyObject *self, PyObject *args)
Py_DECREF(arg_id);
}
/* check if the arg is a sub-handle */
else if(strcmp(tp_arg, "PyCapsule")==0)
{
//printf("it's the sub handles\n");
/*get the modes option, which stores the access mode*/
PyObject *PyModes = PyDict_GetItemString(dict_option, "modes");
/*get the access mode of the argument*/
PyObject *tmp_mode_py = PyDict_GetItem(PyModes,PyLong_FromVoidPtr(tmp));
char* tmp_mode;
if(tmp_mode_py != NULL)
{
const char* mode_str = PyUnicode_AsUTF8(tmp_mode_py);
tmp_mode = strdup(mode_str);
}
/*create the Handle_token object to replace the Handle Capsule*/
PyObject *token_obj = PyObject_CallObject(pInstanceToken, NULL);
PyTuple_SetItem(argList, i, token_obj);
/*get Handle*/
starpu_data_handle_t tmp_handle = (starpu_data_handle_t) PyCapsule_GetPointer(tmp, "Handle");
task->handles[h_index] = tmp_handle;
/*set access mode*/
/*mode is STARPU_R*/
if(tmp_mode_py != NULL && strcmp(tmp_mode, "R") == 0)
{
func_cl->modes[h_index] = STARPU_R;
}
/*mode is STARPU_W*/
if(tmp_mode_py != NULL && strcmp(tmp_mode, "W") == 0)
{
func_cl->modes[h_index] = STARPU_W;
}
/*mode is STARPU_RW*/
if(tmp_mode_py != NULL && strcmp(tmp_mode, "RW") == 0)
{
func_cl->modes[h_index] = STARPU_RW;
}
/*access mode is not defined*/
if(tmp_mode_py == NULL)
{
func_cl->modes[h_index] = STARPU_R;
}
h_index++;
nbuffer = h_index;
if(tmp_mode_py != NULL)
{
free(tmp_mode);
}
}
else
{
PyTuple_SetItem(argList, i, tmp);
......@@ -1878,6 +1936,9 @@ static PyMethodDef starpupyMethods[] =
{"starpupy_release_object", starpupy_release_object_wrapper, METH_VARARGS, "release PyObject handle"}, /*release handle*/
{"starpupy_data_unregister_object", starpupy_data_unregister_object_wrapper, METH_VARARGS, "unregister PyObject handle"}, /*unregister handle*/
{"starpupy_data_unregister_submit_object", starpupy_data_unregister_submit_object_wrapper, METH_VARARGS, "unregister PyObject handle and object"}, /*unregister handle and object*/
{"starpupy_data_partition", starpu_data_partition_wrapper, METH_VARARGS, "handle partition into sub handles"},
{"starpupy_data_unpartition", starpu_data_unpartition_wrapper, METH_VARARGS, "handle unpartition sub handles"},
{"starpupy_get_partition_size", starpupy_get_partition_size_wrapper, METH_VARARGS, "get the array size from each sub handle"},
{"set_ncpu", starpu_set_ncpu, METH_VARARGS,"reinitialize starpu with given number of CPU"},
{NULL, NULL}
};
......
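In the task_submit wrapper above, a sub-handle capsule is treated like a top-level handle: its access mode comes from the modes dictionary filled in by the @starpu.access decorator, with "R"/"W"/"RW" mapped to STARPU_R/STARPU_W/STARPU_RW and STARPU_R used when no mode is declared. Seen from Python, a sketch (assuming access takes one keyword per argument, as in the new test):

import numpy as np
import starpu
from starpu import Handle

@starpu.access(a="RW", b="R")
def axpy(a, b):
    # 'a' is declared read-write, 'b' read-only; an argument with no
    # declared mode would default to read-only (STARPU_R)
    np.add(a, b, out=a)

h1 = Handle(np.ones(6))
h2 = Handle(np.ones(6))
s1 = h1.partition(2, 1)
s2 = h2.partition(2, 1)

for i in range(2):
    starpu.task_submit(ret_handle=True)(axpy, s1[i], s2[i])

h1.unpartition(s1, 2)
h2.unpartition(s2, 2)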
/* StarPU --- Runtime system for heterogeneous multicore architectures.
*
* Copyright (C) 2020-2021 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
*
* StarPU is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 2.1 of the License, or (at
* your option) any later version.
*
* StarPU is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
*
* See the GNU Lesser General Public License in COPYING.LGPL for more details.
*/
#include <starpu.h>
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#ifdef STARPU_PYTHON_HAVE_NUMPY
#include <numpy/arrayobject.h>
#endif
#include "starpupy_buffer_interface.h"
#include "starpupy_numpy_filters.h"
void starpupy_numpy_filter(void *father_interface, void *child_interface, STARPU_ATTRIBUTE_UNUSED struct starpu_data_filter *f, unsigned id, unsigned nchunks)
{
struct starpupy_buffer_interface *buffer_father = (struct starpupy_buffer_interface *) father_interface;
struct starpupy_buffer_interface *buffer_child = (struct starpupy_buffer_interface *) child_interface;
Py_ssize_t nbuf = buffer_father->buffer_size;
size_t elemsize = buffer_father->item_size;
int narr = nbuf/elemsize;
int child_narr;
size_t offset;
unsigned dim = f->filter_arg -1;
/*get the ndim*/
int ndim = buffer_father->dim_size;
int ni[ndim];
int i;
for (i=0; i<ndim; i++)
{
ni[i] = buffer_father->array_dim[i];
}
int nn = ni[dim];
unsigned ld;
if (dim == 0 && ndim != 1)
{
ld = ni[1];
}
else if (dim == 1 || ndim == 1)
{
ld = 1;
}
else
{
ld = 1;
unsigned i;
for (i=0; i<dim; i++)
{
ld = ld * ni[i];
}
}
/*partition ni along dimension dim*/
int child_nn;
int* chunks_list = (int*) f->filter_arg_ptr;
if (chunks_list != NULL)
{
child_nn = chunks_list[id];
int chunk_nn = 0;
unsigned i = 0;
while(i < id)
{
chunk_nn = chunk_nn + chunks_list[i];
i++;
}
offset = chunk_nn * ld * elemsize;
}
else
{
starpu_filter_nparts_compute_chunk_size_and_offset(nn, nchunks, elemsize, id, ld, &child_nn, &offset);
}
child_narr = narr/nn*child_nn;
buffer_child->buffer_type = buffer_father->buffer_type;
buffer_child->py_buffer = buffer_father->py_buffer + offset;
buffer_child->buffer_size = child_narr * elemsize;
buffer_child->dim_size = ndim;
npy_intp *child_dim;
child_dim = (npy_intp*)malloc(ndim*sizeof(npy_intp));
for (i=0; i<ndim; i++)
{
if (i!=dim)
{
child_dim[i] = ni[i];
}
else
{
child_dim[i] = child_nn;
}
}
buffer_child->array_dim = child_dim;
buffer_child->array_type = buffer_father->array_type;
buffer_child->item_size = elemsize;
}
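In the 1-D case (ld == 1) the arithmetic above reduces to: with an explicit chunk list, child id gets chunks_list[id] elements at an element offset equal to the sum of the previous chunks; without one, starpu_filter_nparts_compute_chunk_size_and_offset spreads nn elements over nchunks (assumed here to give the first nn % nchunks children one extra element, as StarPU's filters usually do). A small Python analogue of that computation:

def chunk_1d(n, nchunks, chunks_list=None):
    """Return (size, element_offset) for each child of a 1-D buffer of n elements."""
    out = []
    if chunks_list:
        off = 0
        for c in chunks_list:
            out.append((c, off))
            off += c                  # the filter's byte offset is off * elemsize
    else:
        base, rem = divmod(n, nchunks)
        for i in range(nchunks):
            size = base + (1 if i < rem else 0)
            out.append((size, i * base + min(i, rem)))
    return out

print(chunk_1d(10, 3))                # even split:      [(4, 0), (3, 4), (3, 7)]
print(chunk_1d(10, 3, [3, 2, 5]))     # explicit chunks: [(3, 0), (2, 3), (5, 5)]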
/*wrapper data partition*/
PyObject* starpu_data_partition_wrapper(PyObject *self, PyObject *args)
{
PyObject *handle_obj;
int nparts;
int filter;
PyObject *chunks_list;
if (!PyArg_ParseTuple(args, "OIIO", &handle_obj, &nparts, &filter, &chunks_list))
return NULL;
int nlist = PyList_Size(chunks_list);
int *nchunks;
if (nlist == 0)
{
nchunks = NULL;
}
else
{
if (nlist != nparts)
{
PyErr_Format(PyObject_GetAttrString(self, "error"), "chunk list size does not correspond to the required split size");
return NULL;
}
nchunks = (int*)malloc(nparts*sizeof(int));
int i;
for (i=0; i<nparts; i++)
{
nchunks[i] = PyLong_AsLong(PyList_GetItem(chunks_list, i));
}
}
/*filter func*/
struct starpu_data_filter f;
/*PyObject *->handle*/
starpu_data_handle_t handle = (starpu_data_handle_t) PyCapsule_GetPointer(handle_obj, "Handle");
if (handle == (starpu_data_handle_t)-1)
{
PyErr_Format(PyObject_GetAttrString(self, "error"), "Handle has already been unregistered");
return NULL;
}
starpu_data_handle_t handles[nparts];
f.filter_func = starpupy_numpy_filter;
f.nchildren = nparts;
f.get_nchildren = 0;
f.get_child_ops = 0;
f.filter_arg_ptr = nchunks;
/*filter argument: 1 horizontal (dim 0), 2 vertical (dim 1), 3 depth (dim 2)*/
f.filter_arg = filter;
Py_BEGIN_ALLOW_THREADS
starpu_data_partition_plan(handle, &f, handles);
Py_END_ALLOW_THREADS
PyObject *handle_list = PyList_New(nparts);
int i;
for(i=0; i<nparts; i++)
{
PyList_SetItem(handle_list, i, PyCapsule_New(handles[i], "Handle", NULL));
}
return handle_list;
}
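The filter value handed to partition selects the dimension to cut: it is stored in f.filter_arg and the filter uses dim = filter_arg - 1, so 1 means dimension 0, 2 dimension 1, 3 dimension 2 (the "1 horizontal, 2 vertical, 3 depth" comment above). A small sketch for a 2-D array, cutting along rows:

import numpy as np
from starpu import Handle

m = np.arange(12).reshape(4, 3)
h = Handle(m)

rows = h.partition(2, 1)             # filter 1 -> dimension 0: two blocks of 2 x 3 rows
print(h.get_partition_size(rows))    # expected: [6, 6] elements per child
# h.partition(3, 2) would cut dimension 1 (columns) instead

h.unpartition(rows, 2)
h.unregister()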
/*get the partition size list*/
PyObject* starpupy_get_partition_size_wrapper(PyObject *self, PyObject *args)
{
PyObject *handle_obj;
PyObject *handle_list;
int nparts;
if (!PyArg_ParseTuple(args, "OO", &handle_obj, &handle_list))
return NULL;
nparts = PyList_Size(handle_list);
/*PyObject *->handle*/
starpu_data_handle_t handle = (starpu_data_handle_t) PyCapsule_GetPointer(handle_obj, "Handle");
if (handle == (starpu_data_handle_t)-1)
{
PyErr_Format(PyObject_GetAttrString(self, "error"), "Handle has already been unregistered");
return NULL;
}
PyObject *arr_size = PyList_New(nparts);
int i;
for(i=0; i<nparts; i++)
{
PyObject *handles_cap = PyList_GetItem(handle_list, i);
starpu_data_handle_t handle_tmp = (starpu_data_handle_t) PyCapsule_GetPointer(handles_cap, "Handle");
int node = starpu_data_get_home_node(handle_tmp);
struct starpupy_buffer_interface *local_interface = (struct starpupy_buffer_interface *) starpu_data_get_interface_on_node(handle_tmp, node);
int narr = local_interface->buffer_size/local_interface->item_size;
PyList_SetItem(arr_size, i, Py_BuildValue("I", narr));
}
return arr_size;
}
/*wrapper data unpartition*/
PyObject* starpu_data_unpartition_wrapper(PyObject *self, PyObject *args)
{
PyObject *handle_obj;
PyObject *handle_list;
int nparts;
if (!PyArg_ParseTuple(args, "OOI", &handle_obj, &handle_list, &nparts))
return NULL;
/*PyObject *->handle*/
starpu_data_handle_t handle = (starpu_data_handle_t) PyCapsule_GetPointer(handle_obj, "Handle");
if (handle == (starpu_data_handle_t)-1)
{
PyErr_Format(PyObject_GetAttrString(self, "error"), "Handle has already been unregistered");
return NULL;
}
starpu_data_handle_t handles[nparts];
int i;
for(i=0; i<nparts; i++)
{
PyObject *handles_cap = PyList_GetItem(handle_list, i);
handles[i] = (starpu_data_handle_t) PyCapsule_GetPointer(handles_cap, "Handle");
}
Py_BEGIN_ALLOW_THREADS
starpu_data_partition_clean(handle, nparts, handles);
Py_END_ALLOW_THREADS
/*return type is void*/
Py_INCREF(Py_None);
return Py_None;
}
\ No newline at end of file
/* StarPU --- Runtime system for heterogeneous multicore architectures.
*
* Copyright (C) 2020-2021 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
*
* StarPU is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 2.1 of the License, or (at
* your option) any later version.
*
* StarPU is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of