Commit 66376a5c authored by BRAMAS Berenger's avatar BRAMAS Berenger

prefetch data on node in the scheduler

parent fa0a4436
...@@ -129,17 +129,20 @@ struct _starpu_heteroprio_worker{ ...@@ -129,17 +129,20 @@ struct _starpu_heteroprio_worker{
struct starpu_task* tasks_queue[HETEROPRIO_MAX_PREFETCH]; struct starpu_task* tasks_queue[HETEROPRIO_MAX_PREFETCH];
unsigned tasks_queue_size; unsigned tasks_queue_size;
unsigned tasks_queue_index; unsigned tasks_queue_index;
starpu_pthread_mutex_t ws_prefetch_mutex;
}; };
/* Init a worker by setting every thing to zero */ /* Init a worker by setting every thing to zero */
static void _starpu_heteroprio_worker_init(struct _starpu_heteroprio_worker* worker){ static void _starpu_heteroprio_worker_init(struct _starpu_heteroprio_worker* worker){
memset(worker, 0, sizeof(*worker)); memset(worker, 0, sizeof(*worker));
worker->tasks_queue_index = 0; worker->tasks_queue_index = 0;
STARPU_PTHREAD_MUTEX_INIT(&worker->ws_prefetch_mutex, NULL);
} }
/* Release a worker */ /* Release a worker */
static void _starpu_heteroprio_worker_release(struct _starpu_heteroprio_worker* worker){ static void _starpu_heteroprio_worker_release(struct _starpu_heteroprio_worker* worker){
assert(worker->tasks_queue_size == 0); assert(worker->tasks_queue_size == 0);
STARPU_PTHREAD_MUTEX_DESTROY(&worker->ws_prefetch_mutex);
} }
/* HETEROPRIO_MAX_PRIO is the maximum prio/buckets available */ /* HETEROPRIO_MAX_PRIO is the maximum prio/buckets available */
...@@ -385,7 +388,7 @@ static int push_task_heteroprio_policy(struct starpu_task *task) ...@@ -385,7 +388,7 @@ static int push_task_heteroprio_policy(struct starpu_task *task)
static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id) static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
{ {
unsigned workerid = starpu_worker_get_id(); const unsigned workerid = starpu_worker_get_id();
struct _starpu_heteroprio_center_policy_heteroprio *heteroprio = (struct _starpu_heteroprio_center_policy_heteroprio*)starpu_sched_ctx_get_policy_data(sched_ctx_id); struct _starpu_heteroprio_center_policy_heteroprio *heteroprio = (struct _starpu_heteroprio_center_policy_heteroprio*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
struct _starpu_heteroprio_worker* worker = &heteroprio->workers_heteroprio[workerid]; struct _starpu_heteroprio_worker* worker = &heteroprio->workers_heteroprio[workerid];
...@@ -404,6 +407,9 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id) ...@@ -404,6 +407,9 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
STARPU_PTHREAD_MUTEX_LOCK(&heteroprio->policy_mutex); STARPU_PTHREAD_MUTEX_LOCK(&heteroprio->policy_mutex);
/* keep track of the newly added tasks to perform a real prefetch on the node */
unsigned nb_added_tasks = 0;
/* Check that some tasks are available for the current worker arch */ /* Check that some tasks are available for the current worker arch */
if( heteroprio->nb_remaining_tasks_per_arch_index[worker->arch_index] != 0 ){ if( heteroprio->nb_remaining_tasks_per_arch_index[worker->arch_index] != 0 ){
/* Ideally we would like to fill the prefetch array */ /* Ideally we would like to fill the prefetch array */
...@@ -418,6 +424,8 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id) ...@@ -418,6 +424,8 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
else nb_tasks_to_prefetch = 0; else nb_tasks_to_prefetch = 0;
} }
nb_added_tasks = nb_tasks_to_prefetch;
/* We iterate until we found all the tasks we need */ /* We iterate until we found all the tasks we need */
for(unsigned idx_prio = 0 ; nb_tasks_to_prefetch && idx_prio < heteroprio->nb_prio_per_arch_index[worker->arch_index] ; ++idx_prio){ for(unsigned idx_prio = 0 ; nb_tasks_to_prefetch && idx_prio < heteroprio->nb_prio_per_arch_index[worker->arch_index] ; ++idx_prio){
/* Retrieve the bucket using the mapping */ /* Retrieve the bucket using the mapping */
...@@ -467,14 +475,25 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id) ...@@ -467,14 +475,25 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
/* Each worker starts from its own index and do a turn */ /* Each worker starts from its own index and do a turn */
for(unsigned idx_worker_it = 1 ; idx_worker_it < heteroprio->nb_workers ; ++idx_worker_it){ for(unsigned idx_worker_it = 1 ; idx_worker_it < heteroprio->nb_workers ; ++idx_worker_it){
const unsigned idx_worker = ((workerid+idx_worker_it)%heteroprio->nb_workers); const unsigned idx_worker = ((workerid+idx_worker_it)%heteroprio->nb_workers);
/* we must never test against ourselves */
assert(idx_worker != workerid);
/* If it is the same arch and there is a task to steal */ /* If it is the same arch and there is a task to steal */
if(heteroprio->workers_heteroprio[idx_worker].arch_index == worker->arch_index if(heteroprio->workers_heteroprio[idx_worker].arch_index == worker->arch_index
&& heteroprio->workers_heteroprio[idx_worker].tasks_queue_size){ && heteroprio->workers_heteroprio[idx_worker].tasks_queue_size){
task = heteroprio->workers_heteroprio[idx_worker].tasks_queue[heteroprio->workers_heteroprio[idx_worker].tasks_queue_index]; /* ensure the worker is not currently prefetching its data */
heteroprio->workers_heteroprio[idx_worker].tasks_queue_index = (heteroprio->workers_heteroprio[idx_worker].tasks_queue_index+1)%HETEROPRIO_MAX_PREFETCH; STARPU_PTHREAD_MUTEX_LOCK(&heteroprio->workers_heteroprio[idx_worker].ws_prefetch_mutex);
heteroprio->workers_heteroprio[idx_worker].tasks_queue_size -= 1; if(heteroprio->workers_heteroprio[idx_worker].arch_index == worker->arch_index
heteroprio->nb_prefetched_tasks_per_arch_index[heteroprio->workers_heteroprio[idx_worker].arch_index] -= 1; && heteroprio->workers_heteroprio[idx_worker].tasks_queue_size){
break; /* steal the last added task */
task = heteroprio->workers_heteroprio[idx_worker].tasks_queue[(heteroprio->workers_heteroprio[idx_worker].tasks_queue_index+heteroprio->workers_heteroprio[idx_worker].tasks_queue_size-1)
% HETEROPRIO_MAX_PREFETCH];
/* update the worker by saying we steal a task */
heteroprio->workers_heteroprio[idx_worker].tasks_queue_size -= 1;
/* we steal a task update global counter */
heteroprio->nb_prefetched_tasks_per_arch_index[heteroprio->workers_heteroprio[idx_worker].arch_index] -= 1;
break;
}
STARPU_PTHREAD_MUTEX_UNLOCK(&heteroprio->workers_heteroprio[idx_worker].ws_prefetch_mutex);
} }
} }
} }
...@@ -496,6 +515,23 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id) ...@@ -496,6 +515,23 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
} }
/* End copy of eager */ /* End copy of eager */
/* if we have a task (task) we may have some in the queue (worker->tasks_queue_size) that were freshly added (nb_added_tasks) */
if(task && worker->tasks_queue_size && nb_added_tasks && starpu_get_prefetch_flag()){
const unsigned memory_node = starpu_worker_get_memory_node(workerid);
/* prefetch the new tasks that I own, but protect my node from work stealing during the prefetch */
STARPU_PTHREAD_MUTEX_LOCK(&worker->ws_prefetch_mutex);
/* prefetch tasks, but stop in case we know someone may steal a task from us */
while(nb_added_tasks && heteroprio->nb_remaining_tasks_per_arch_index[worker->arch_index] != 0){
/* prefetch starting from the task closest to the end of the queue */
starpu_prefetch_task_input_on_node(worker->tasks_queue[(worker->tasks_queue_index+worker->tasks_queue_size-nb_added_tasks)%HETEROPRIO_MAX_PREFETCH], memory_node);
nb_added_tasks -= 1;
}
STARPU_PTHREAD_MUTEX_UNLOCK(&worker->ws_prefetch_mutex);
}
return task; return task;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment