Commit cc7d2a03 authored by Millian Poquet

The server should now be able to handle several job submitters. The process...

The server should now be able to handle several job submitters. The process completion could also be better, which may fix the deadlock issue once all jobs are processed
parent e7b4989c
......@@ -39,17 +39,16 @@ const int schedMessageMaxLength = 1024*1024 - 1; // 1 Mio should be enough...
/**
 * Printable names of the task types, used when logging messages.
 *
 * The array is indexed with e_task_type_t values (the server logs
 * task_type2str[task_data->type] for every message it receives), so there
 * must be exactly one string per enumerator, in the enumerators' order.
 */
char *task_type2str[] =
{
    "ECHO",
    "FINALIZE",
    "LAUNCH_JOB",
    "JOB_SUBMITTED",
    "JOB_COMPLETED",
    "KILL_JOB",
    "SUSPEND_JOB",
    "SCHED_EVENT",
    "SCHED_READY",
    "LAUNCHER_INFORMATION",
    "KILLER_INFORMATION", // the comma matters: adjacent literals would be concatenated
    "SUBMITTER_HELLO",
    "SUBMITTER_BYE"
};
// Number of nodes; deploy_all logs it as "Nb nodes".
// NOTE(review): where this counter is updated is not visible here - confirm.
int nb_nodes = 0;
......@@ -190,7 +189,6 @@ void send_message(const char *dst, e_task_type_t type, int job_id, void *data)
req_data->type = type;
req_data->job_id = job_id;
req_data->data = data;
req_data->src = MSG_host_get_name(MSG_host_self());
task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
XBT_INFO("message from '%s' to '%s' of type '%s' with data %p",
......@@ -409,6 +407,8 @@ static int jobs_submitter(int argc, char *argv[])
s_job_t * job;
unsigned int job_index;
send_message("server", SUBMITTER_HELLO, 0, NULL);
// todo: read jobs here and sort them by ascending submission date
double previousSubmissionDate = MSG_get_clock();
......@@ -424,6 +424,8 @@ static int jobs_submitter(int argc, char *argv[])
send_message("server", JOB_SUBMITTED, job->id, NULL);
}
send_message("server", SUBMITTER_BYE, 0, NULL);
return 0;
}
......@@ -440,6 +442,8 @@ int server(int argc, char *argv[])
int nb_completed_jobs = 0;
int nb_submitted_jobs = 0;
int nb_submitters = 0;
int nb_submitters_finished = 0;
int sched_ready = true;
int size_m;
char *tmp;
......@@ -457,17 +461,29 @@ int server(int argc, char *argv[])
// todo: add a better termination condition, for example the submitters could say "hello" and "goodbye" to the scheduler
// it may avoid the SG deadlock...
while ((nb_completed_jobs < nb_jobs) || !sched_ready)
while ((nb_submitters == 0) || (nb_submitters_finished < nb_submitters) ||
(nb_completed_jobs < nb_submitted_jobs) || !sched_ready)
{
// wait message node, submitter, scheduler...
MSG_task_receive(&(task_received), "server");
task_data = (s_task_data_t *) MSG_task_get_data(task_received);
XBT_INFO("Server receive Task/message type %s:", task_type2str[task_data->type]);
switch (task_data->type)
{
case SUBMITTER_HELLO:
{
nb_submitters++;
XBT_INFO("New submitter said hello. Number of polite submitters: %d", nb_submitters);
break;
}
case SUBMITTER_BYE:
{
nb_submitters_finished++;
XBT_INFO("A submitted said goodbye. Number of finished submitters: %d", nb_submitters_finished);
break;
}
case JOB_COMPLETED:
{
nb_completed_jobs++;
......@@ -662,8 +678,8 @@ msg_error_t deploy_all(const char *platform_file)
XBT_INFO("Nb nodes: %d", nb_nodes);
// Let's create processes on the master host
MSG_process_create("server", server, NULL, master_host);
MSG_process_create("jobs_submitter", jobs_submitter, NULL, master_host);
MSG_process_create("server", server, NULL, master_host);
// We can now initialize the tracing and run the processes
tracer = pajeTracer_create("schedule.trace", 0, 32);
......
......@@ -3,32 +3,30 @@
#include <msg/msg.h>
/**
 * @brief Types of tasks exchanged between nodes.
 *
 * The enumerator values are used as indices into task_type2str when
 * messages are logged, so the name table must list one string per
 * enumerator, in this exact order.
 *
 * Where known, the trailing comment gives the direction of the message
 * (sender -> receiver).
 */
typedef enum
{
    ECHO,
    FINALIZE,             //!< Server -> Node
    LAUNCH_JOB,           //!< Server -> Node
    JOB_SUBMITTED,        //!< Submitter -> Server
    JOB_COMPLETED,        //!< Launcher/killer -> Server
    KILL_JOB,
    SUSPEND_JOB,
    SCHED_EVENT,          //!< SchedulerHandler -> Server
    SCHED_READY,          //!< SchedulerHandler -> Server
    LAUNCHER_INFORMATION, //!< Node -> Launcher
    KILLER_INFORMATION,   //!< Node -> Killer
    SUBMITTER_HELLO,      //!< Submitter -> Server
    SUBMITTER_BYE         //!< Submitter -> Server
} e_task_type_t;
/**
 * @brief Data attached with the tasks used to communicate between MSG processes
 */
typedef struct s_task_data
{
    e_task_type_t type; //!< Type of task
    int job_id;         //!< The job ID
    void *data;         //!< Either NULL or points to something else based on type
    const char *src;    //!< Name of the sending host (filled in by send_message); used for logging
} s_task_data_t;
/**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment