Commit b0a3b6a9 authored by Millian Poquet's avatar Millian Poquet

Submitter/Server protocol improved.

Submitters can now identify themselves, which allows
a callback mechanism when jobs are completed.
The submitters which want this callback mechanism
should enable it when saying HELLO to the server,
then, all jobs submiitted by this submitter will
generate a callback once they complete.
parent 3ecccd22
...@@ -64,6 +64,9 @@ std::string ipMessageTypeToString(IPMessageType type) ...@@ -64,6 +64,9 @@ std::string ipMessageTypeToString(IPMessageType type)
case IPMessageType::SUBMITTER_HELLO: case IPMessageType::SUBMITTER_HELLO:
s = "SUBMITTER_HELLO"; s = "SUBMITTER_HELLO";
break; break;
case IPMessageType::SUBMITTER_CALLBACK:
s = "SUBMITTER_CALLBACK";
break;
case IPMessageType::SUBMITTER_BYE: case IPMessageType::SUBMITTER_BYE:
s = "SUBMITTER_BYE"; s = "SUBMITTER_BYE";
break; break;
...@@ -130,9 +133,18 @@ IPMessage::~IPMessage() ...@@ -130,9 +133,18 @@ IPMessage::~IPMessage()
} break; } break;
case IPMessageType::SUBMITTER_HELLO: case IPMessageType::SUBMITTER_HELLO:
{ {
SubmitterHelloMessage * msg = (SubmitterHelloMessage *) data;
delete msg;
} break;
case IPMessageType::SUBMITTER_CALLBACK:
{
SubmitterJobCompletionCallbackMessage * msg = (SubmitterJobCompletionCallbackMessage *) data;
delete msg;
} break; } break;
case IPMessageType::SUBMITTER_BYE: case IPMessageType::SUBMITTER_BYE:
{ {
SubmitterByeMessage * msg = (SubmitterByeMessage *) data;
delete msg;
} break; } break;
case IPMessageType::SWITCHED_ON: case IPMessageType::SWITCHED_ON:
{ {
...@@ -156,3 +168,8 @@ string JobIdentifier::to_string() const ...@@ -156,3 +168,8 @@ string JobIdentifier::to_string() const
{ {
return workload_name + '!' + std::to_string(job_number); return workload_name + '!' + std::to_string(job_number);
} }
bool operator<(const JobIdentifier &ji1, const JobIdentifier &ji2)
{
return ji1.to_string() < ji2.to_string();
}
...@@ -30,6 +30,14 @@ struct JobIdentifier ...@@ -30,6 +30,14 @@ struct JobIdentifier
std::string to_string() const; std::string to_string() const;
}; };
/**
* @brief Compares two JobIdentifier thanks to their string representations
* @param[in] ji1 The first JobIdentifier
* @param[in] ji2 The second JobIdentifier
* @return ji1.to_string() < ji2.to_string()
*/
bool operator<(const JobIdentifier & ji1, const JobIdentifier & ji2);
/** /**
* @brief Stores the different types of inter-process messages * @brief Stores the different types of inter-process messages
*/ */
...@@ -46,16 +54,43 @@ enum class IPMessageType ...@@ -46,16 +54,43 @@ enum class IPMessageType
,SCHED_READY //!< SchedulerHandler -> Server. The scheduler handler tells the server that the scheduler is ready (messages can be sent to it). ,SCHED_READY //!< SchedulerHandler -> Server. The scheduler handler tells the server that the scheduler is ready (messages can be sent to it).
,WAITING_DONE //!< Waiter -> server. The waiter tells the server that the target time has been reached. ,WAITING_DONE //!< Waiter -> server. The waiter tells the server that the target time has been reached.
,SUBMITTER_HELLO //!< Submitter -> Server. The submitter tells it starts submitting to the server. ,SUBMITTER_HELLO //!< Submitter -> Server. The submitter tells it starts submitting to the server.
,SUBMITTER_CALLBACK //!< Server -> Submitter. The server sends a message to the Submitter. This message is initiated when a Job which has been submitted by the submitter has completed. The submitter must have said that it wanted to be called back when he said hello.
,SUBMITTER_BYE //!< Submitter -> Server. The submitter tells it stops submitting to the server. ,SUBMITTER_BYE //!< Submitter -> Server. The submitter tells it stops submitting to the server.
,SWITCHED_ON //!< SwitcherON -> Server. The switcherON process tells the server the machine pstate has been changed ,SWITCHED_ON //!< SwitcherON -> Server. The switcherON process tells the server the machine pstate has been changed
,SWITCHED_OFF //!< SwitcherOFF -> Server. The switcherOFF process tells the server the machine pstate has been changed. ,SWITCHED_OFF //!< SwitcherOFF -> Server. The switcherOFF process tells the server the machine pstate has been changed.
}; };
/**
* @brief The content of the SUBMITTER_HELLO message
*/
struct SubmitterHelloMessage
{
std::string submitter_name; //!< The name of the submitter. Must be unique. Is also used as a mailbox.
bool enable_callback_on_job_completion; //!< If set to true, the submitter should be called back when its jobs complete.
};
/**
* @brief The content of the SUBMITTER_BYE message
*/
struct SubmitterByeMessage
{
std::string submitter_name; //!< The name of the submitter.
};
/**
* @brief The content of the SUBMITTER_CALLBACK message
*/
struct SubmitterJobCompletionCallbackMessage
{
JobIdentifier job_id; //!< The JobIdentifier
};
/** /**
* @brief The content of the JobSubmitted message * @brief The content of the JobSubmitted message
*/ */
struct JobSubmittedMessage struct JobSubmittedMessage
{ {
std::string submitter_name; //!< The name of the submitter which submitted the job.
JobIdentifier job_id; //!< The JobIdentifier JobIdentifier job_id; //!< The JobIdentifier
}; };
......
...@@ -27,8 +27,33 @@ int static_job_submitter_process(int argc, char *argv[]) ...@@ -27,8 +27,33 @@ int static_job_submitter_process(int argc, char *argv[])
"which does not exist", args->workload_name.c_str()); "which does not exist", args->workload_name.c_str());
Workload * workload = context->workloads.at(args->workload_name); Workload * workload = context->workloads.at(args->workload_name);
const string submitter_name = "static_submitter";
send_message("server", IPMessageType::SUBMITTER_HELLO);
/* ░░░░░░░░▄▄▄███░░░░░░░░░░░░░░░░░░░░
░░░▄▄██████████░░░░░░░░░░░░░░░░░░░
░███████████████░░░░░░░░░░░░░░░░░░
░▀███████████████░░░░░▄▄▄░░░░░░░░░
░░░███████████████▄███▀▀▀░░░░░░░░░
░░░░███████████████▄▄░░░░░░░░░░░░░
░░░░▄████████▀▀▄▄▄▄▄░▀░░░░░░░░░░░░
▄███████▀█▄▀█▄░░█░▀▀▀░█░░▄▄░░░░░░░
▀▀░░░██▄█▄░░▀█░░▄███████▄█▀░░░▄░░░
░░░░░█░█▀▄▄▀▄▀░█▀▀▀█▀▄▄▀░░░░░░▄░▄█
░░░░░█░█░░▀▀▄▄█▀░█▀▀░░█░░░░░░░▀██░
░░░░░▀█▄░░░░░░░░░░░░░▄▀░░░░░░▄██░░
░░░░░░▀█▄▄░░░░░░░░▄▄█░░░░░░▄▀░░█░░
░░░░░░░░░▀███▀▀████▄██▄▄░░▄▀░░░░░░
░░░░░░░░░░░█▄▀██▀██▀▄█▄░▀▀░░░░░░░░
░░░░░░░░░░░██░▀█▄█░█▀░▀▄░░░░░░░░░░
░░░░░░░░░░█░█▄░░▀█▄▄▄░░█░░░░░░░░░░
░░░░░░░░░░█▀██▀▀▀▀░█▄░░░░░░░░░░░░░
░░░░░░░░░░░░▀░░░░░░░░░░░▀░░░░░░░░░ */
SubmitterHelloMessage * hello_msg = new SubmitterHelloMessage;
hello_msg->submitter_name = submitter_name;
hello_msg->enable_callback_on_job_completion = false;
send_message("server", IPMessageType::SUBMITTER_HELLO, (void*) hello_msg);
double previousSubmissionDate = MSG_get_clock(); double previousSubmissionDate = MSG_get_clock();
...@@ -61,6 +86,7 @@ int static_job_submitter_process(int argc, char *argv[]) ...@@ -61,6 +86,7 @@ int static_job_submitter_process(int argc, char *argv[])
// Let's now continue the simulation // Let's now continue the simulation
JobSubmittedMessage * msg = new JobSubmittedMessage; JobSubmittedMessage * msg = new JobSubmittedMessage;
msg->submitter_name = submitter_name;
msg->job_id.workload_name = args->workload_name; msg->job_id.workload_name = args->workload_name;
msg->job_id.job_number = job->number; msg->job_id.job_number = job->number;
...@@ -72,7 +98,9 @@ int static_job_submitter_process(int argc, char *argv[]) ...@@ -72,7 +98,9 @@ int static_job_submitter_process(int argc, char *argv[])
} }
} }
send_message("server", IPMessageType::SUBMITTER_BYE); SubmitterByeMessage * bye_msg = new SubmitterByeMessage;
bye_msg->submitter_name = submitter_name;
send_message("server", IPMessageType::SUBMITTER_BYE, (void *) bye_msg);
delete args; delete args;
return 0; return 0;
} }
...@@ -39,6 +39,18 @@ int uds_server_process(int argc, char *argv[]) ...@@ -39,6 +39,18 @@ int uds_server_process(int argc, char *argv[])
const int protocol_version = 2; const int protocol_version = 2;
bool sched_ready = true; bool sched_ready = true;
// Let's store some information about the submitters
struct Submitter
{
string mailbox;
bool should_be_called_back;
};
map<string, Submitter*> submitters;
// Let's store the origin or some jobs
map<JobIdentifier, Submitter*> origin_of_jobs;
string send_buffer; string send_buffer;
while ((nb_submitters == 0) || (nb_submitters_finished < nb_submitters) || while ((nb_submitters == 0) || (nb_submitters_finished < nb_submitters) ||
...@@ -57,13 +69,33 @@ int uds_server_process(int argc, char *argv[]) ...@@ -57,13 +69,33 @@ int uds_server_process(int argc, char *argv[])
{ {
case IPMessageType::SUBMITTER_HELLO: case IPMessageType::SUBMITTER_HELLO:
{ {
xbt_assert(task_data->data != nullptr);
SubmitterHelloMessage * message = (SubmitterHelloMessage *) task_data->data;
xbt_assert(submitters.count(message->submitter_name) == 0,
"Invalid new submitter '%s': a submitter with the same name already exists!",
message->submitter_name.c_str());
nb_submitters++; nb_submitters++;
XBT_INFO( "New submitter said hello. Number of polite submitters: %d", nb_submitters);
Submitter * submitter = new Submitter;
submitter->mailbox = message->submitter_name;
submitter->should_be_called_back = message->enable_callback_on_job_completion;
submitters[message->submitter_name] = submitter;
XBT_INFO("New submitter said hello. Number of polite submitters: %d", nb_submitters);
} break; // end of case SUBMITTER_HELLO } break; // end of case SUBMITTER_HELLO
case IPMessageType::SUBMITTER_BYE: case IPMessageType::SUBMITTER_BYE:
{ {
xbt_assert(task_data->data != nullptr);
SubmitterByeMessage * message = (SubmitterByeMessage *) task_data->data;
xbt_assert(submitters.count(message->submitter_name) == 1);
submitters.erase(message->submitter_name);
nb_submitters_finished++; nb_submitters_finished++;
XBT_INFO( "A submitted said goodbye. Number of finished submitters: %d", nb_submitters_finished); XBT_INFO( "A submitted said goodbye. Number of finished submitters: %d", nb_submitters_finished);
...@@ -74,6 +106,18 @@ int uds_server_process(int argc, char *argv[]) ...@@ -74,6 +106,18 @@ int uds_server_process(int argc, char *argv[])
xbt_assert(task_data->data != nullptr); xbt_assert(task_data->data != nullptr);
JobCompletedMessage * message = (JobCompletedMessage *) task_data->data; JobCompletedMessage * message = (JobCompletedMessage *) task_data->data;
if (origin_of_jobs.count(message->job_id) == 1)
{
// Let's call the submitter which submitted the job back
SubmitterJobCompletionCallbackMessage * msg = new SubmitterJobCompletionCallbackMessage;
msg->job_id = message->job_id;
Submitter * submitter = origin_of_jobs.at(message->job_id);
send_message(submitter->mailbox, IPMessageType::SUBMITTER_CALLBACK, (void*) msg);
origin_of_jobs.erase(message->job_id);
}
nb_running_jobs--; nb_running_jobs--;
xbt_assert(nb_running_jobs >= 0); xbt_assert(nb_running_jobs >= 0);
nb_completed_jobs++; nb_completed_jobs++;
...@@ -91,10 +135,20 @@ int uds_server_process(int argc, char *argv[]) ...@@ -91,10 +135,20 @@ int uds_server_process(int argc, char *argv[])
xbt_assert(task_data->data != nullptr); xbt_assert(task_data->data != nullptr);
JobSubmittedMessage * message = (JobSubmittedMessage *) task_data->data; JobSubmittedMessage * message = (JobSubmittedMessage *) task_data->data;
xbt_assert(submitters.count(message->submitter_name) == 1);
Submitter * submitter = submitters.at(message->submitter_name);
if (submitter->should_be_called_back)
{
xbt_assert(origin_of_jobs.count(message->job_id) == 0);
origin_of_jobs[message->job_id] = submitter;
}
nb_submitted_jobs++; nb_submitted_jobs++;
Job * job = context->workloads.job_at(message->job_id); Job * job = context->workloads.job_at(message->job_id);
job->state = JobState::JOB_STATE_SUBMITTED; job->state = JobState::JOB_STATE_SUBMITTED;
XBT_INFO( "Job %d SUBMITTED. %d jobs submitted so far", job->number, nb_submitted_jobs); XBT_INFO( "Job %d SUBMITTED. %d jobs submitted so far", job->number, nb_submitted_jobs);
send_buffer += "|" + std::to_string(MSG_get_clock()) + ":S:" + message->job_id.to_string(); send_buffer += "|" + std::to_string(MSG_get_clock()) + ":S:" + message->job_id.to_string();
XBT_DEBUG( "Message to send to scheduler: '%s'", send_buffer.c_str()); XBT_DEBUG( "Message to send to scheduler: '%s'", send_buffer.c_str());
...@@ -348,7 +402,12 @@ int uds_server_process(int argc, char *argv[]) ...@@ -348,7 +402,12 @@ int uds_server_process(int argc, char *argv[])
send_buffer += "|" + std::to_string(MSG_get_clock()) + ":e:" + send_buffer += "|" + std::to_string(MSG_get_clock()) + ":e:" +
std::to_string(total_consumed_energy); std::to_string(total_consumed_energy);
XBT_DEBUG("Message to send to scheduler : '%s'", send_buffer.c_str()); XBT_DEBUG("Message to send to scheduler : '%s'", send_buffer.c_str());
} } break; // end of case SCHED_TELL_ME_ENERGY
case IPMessageType::SUBMITTER_CALLBACK:
{
xbt_assert(false, "The server received a SUBMITTER_CALLBACK message, which should not happen");
} break; // end of case SUBMITTER_CALLBACK
} // end of switch } // end of switch
delete task_data; delete task_data;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment