Commit e1701a37 authored by S. Lackner

[code] Add job <-> scheduler communication profiles

parent 86daf6a2
...@@ -127,6 +127,9 @@ std::string ip_message_type_to_string(IPMessageType type) ...@@ -127,6 +127,9 @@ std::string ip_message_type_to_string(IPMessageType type)
break; break;
case IPMessageType::CONTINUE_DYNAMIC_SUBMIT: case IPMessageType::CONTINUE_DYNAMIC_SUBMIT:
s = "CONTINUE_DYNAMIC_SUBMIT"; s = "CONTINUE_DYNAMIC_SUBMIT";
break;
case IPMessageType::TO_JOB_MSG:
s = "TO_JOB_MSG";
} }
return s; return s;
...@@ -251,6 +254,11 @@ IPMessage::~IPMessage() ...@@ -251,6 +254,11 @@ IPMessage::~IPMessage()
case IPMessageType::CONTINUE_DYNAMIC_SUBMIT: case IPMessageType::CONTINUE_DYNAMIC_SUBMIT:
{ {
} break; } break;
case IPMessageType::TO_JOB_MSG:
{
ToJobMessage * msg = (ToJobMessage *) data;
delete msg;
} break;
} }
data = nullptr; data = nullptr;
......
...@@ -73,6 +73,7 @@ enum class IPMessageType ...@@ -73,6 +73,7 @@ enum class IPMessageType
,SWITCHED_OFF //!< SwitcherOFF -> Server. The switcherOFF process tells the server the machine pstate has been changed. ,SWITCHED_OFF //!< SwitcherOFF -> Server. The switcherOFF process tells the server the machine pstate has been changed.
,END_DYNAMIC_SUBMIT //!< Scheduler -> Server. The scheduler tells the server that dynamic job submissions are finished. ,END_DYNAMIC_SUBMIT //!< Scheduler -> Server. The scheduler tells the server that dynamic job submissions are finished.
,CONTINUE_DYNAMIC_SUBMIT //!< Scheduler -> Server. The scheduler tells the server that dynamic job submissions continue. ,CONTINUE_DYNAMIC_SUBMIT //!< Scheduler -> Server. The scheduler tells the server that dynamic job submissions continue.
,TO_JOB_MSG //!< Scheduler -> Server. The scheduler sends a message to a job.
}; };
/** /**
...@@ -228,6 +229,15 @@ struct KillingDoneMessage ...@@ -228,6 +229,15 @@ struct KillingDoneMessage
std::vector<JobIdentifier> jobs_ids; //!< The IDs of the jobs which have been killed std::vector<JobIdentifier> jobs_ids; //!< The IDs of the jobs which have been killed
}; };
/**
 * @brief The content of the ToJobMessage message
 *
 * Carried as the data of an IPMessageType::TO_JOB_MSG message
 * (Scheduler -> Server): the scheduler sends a message to a job.
 */
struct ToJobMessage
{
    JobIdentifier job_id; //!< The JobIdentifier of the job the message is addressed to
    std::string message; //!< The message to send to the job
};
/** /**
* @brief The base struct sent in inter-process messages * @brief The base struct sent in inter-process messages
*/ */
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include <map> #include <map>
#include <vector> #include <vector>
#include <deque>
#include <rapidjson/document.h> #include <rapidjson/document.h>
...@@ -56,6 +57,8 @@ struct Job ...@@ -56,6 +57,8 @@ struct Job
long double consumed_energy; //!< The sum, for each machine on which the job has been allocated, of the consumed energy (in Joules) during the job execution time (consumed_energy_after_job_completion - consumed_energy_before_job_start) long double consumed_energy; //!< The sum, for each machine on which the job has been allocated, of the consumed energy (in Joules) during the job execution time (consumed_energy_after_job_completion - consumed_energy_before_job_start)
std::deque<std::string> incoming_message_buffer; //!< The buffer for incoming messages from the scheduler.
Rational starting_time; //!< The time at which the job starts to be executed. Rational starting_time; //!< The time at which the job starts to be executed.
Rational runtime; //!< The amount of time during which the job has been executed Rational runtime; //!< The amount of time during which the job has been executed
MachineRange allocation; //!< The machines on which the job has been executed. MachineRange allocation; //!< The machines on which the job has been executed.
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
* @file jobs_execution.cpp * @file jobs_execution.cpp
* @brief Contains functions related to the execution of the jobs * @brief Contains functions related to the execution of the jobs
*/ */
#include <regex>
#include "jobs_execution.hpp" #include "jobs_execution.hpp"
#include "jobs.hpp" #include "jobs.hpp"
...@@ -293,6 +294,78 @@ int execute_profile(BatsimContext *context, ...@@ -293,6 +294,78 @@ int execute_profile(BatsimContext *context,
} }
return profile->return_code; return profile->return_code;
} }
else if (profile->type == ProfileType::SCHEDULER_SEND)
{
SchedulerSendProfileData * data = (SchedulerSendProfileData *) profile->data;
XBT_INFO("Sending message to the scheduler: %s", data->message.c_str());
context->proto_writer->append_from_job_message(job->id, data->message, MSG_get_clock());
return profile->return_code;
}
else if (profile->type == ProfileType::SCHEDULER_RECV)
{
SchedulerRecvProfileData * data = (SchedulerRecvProfileData *) profile->data;
string profile_to_execute = "";
bool has_messages = false;
XBT_INFO("Trying to receive message from scheduler");
if (job->incoming_message_buffer.empty()) {
if (data->on_timeout == "") {
XBT_INFO("Waiting for message from scheduler");
while (true) {
static double sleeptime = 0.00001;
if (sleeptime < *remaining_time)
{
MSG_process_sleep(sleeptime);
*remaining_time = *remaining_time - sleeptime;
}
else
{
XBT_INFO("Job has reached walltime without receiving message from scheduler");
MSG_process_sleep(*remaining_time);
*remaining_time = 0;
return -1;
}
if (!job->incoming_message_buffer.empty()) {
XBT_INFO("Finally got message from scheduler");
has_messages = true;
break;
}
}
} else {
XBT_INFO("Timeout on waiting for message from scheduler");
profile_to_execute = data->on_timeout;
}
} else {
has_messages = true;
}
if (has_messages) {
string first_message = job->incoming_message_buffer.front();
job->incoming_message_buffer.pop_front();
regex msg_regex(data->regex);
if (regex_match(first_message, msg_regex)) {
XBT_INFO("Message from scheduler matches");
profile_to_execute = data->on_success;
} else {
XBT_INFO("Message from scheduler does not match");
profile_to_execute = data->on_failure;
}
}
if (profile_to_execute != "") {
XBT_INFO("Execute profile: %s", profile_to_execute.c_str());
int ret_last_profile = execute_profile(context, profile_to_execute, allocation,
cleanup_data, remaining_time);
if (ret_last_profile != 0) {
return ret_last_profile;
}
}
return profile->return_code;
}
else if (profile->type == ProfileType::DELAY) else if (profile->type == ProfileType::DELAY)
{ {
DelayProfileData * data = (DelayProfileData *) profile->data; DelayProfileData * data = (DelayProfileData *) profile->data;
......
...@@ -195,6 +195,24 @@ Profile::~Profile() ...@@ -195,6 +195,24 @@ Profile::~Profile()
d = nullptr; d = nullptr;
} }
} }
else if (type == ProfileType::SCHEDULER_SEND)
{
SchedulerSendProfileData * d = (SchedulerSendProfileData *) data;
if (d != nullptr)
{
delete d;
d = nullptr;
}
}
else if (type == ProfileType::SCHEDULER_RECV)
{
SchedulerRecvProfileData * d = (SchedulerRecvProfileData *) data;
if (d != nullptr)
{
delete d;
d = nullptr;
}
}
else else
{ {
XBT_ERROR("Deletion of an unknown profile type (%d)", type); XBT_ERROR("Deletion of an unknown profile type (%d)", type);
...@@ -409,6 +427,44 @@ Profile *Profile::from_json(const std::string & profile_name, ...@@ -409,6 +427,44 @@ Profile *Profile::from_json(const std::string & profile_name,
error_prefix.c_str(), profile_name.c_str(), direction); error_prefix.c_str(), profile_name.c_str(), direction);
} }
profile->data = data;
}
else if (profile_type == "send")
{
profile->type = ProfileType::SCHEDULER_SEND;
SchedulerSendProfileData * data = new SchedulerSendProfileData;
xbt_assert(json_desc.HasMember("msg"), "%s: profile '%s' has no 'msg' field",
error_prefix.c_str(), profile_name.c_str());
data->message = json_desc["msg"].GetString();
profile->data = data;
}
else if (profile_type == "recv")
{
profile->type = ProfileType::SCHEDULER_RECV;
SchedulerRecvProfileData * data = new SchedulerRecvProfileData;
data->regex = string(".*");
if (json_desc.HasMember("regex")) {
data->regex = json_desc["regex"].GetString();
}
data->on_success = string("");
if (json_desc.HasMember("success")) {
data->on_success = json_desc["success"].GetString();
}
data->on_failure = string("");
if (json_desc.HasMember("failure")) {
data->on_failure = json_desc["failure"].GetString();
}
data->on_timeout = string("");
if (json_desc.HasMember("timeout")) {
data->on_timeout = json_desc["timeout"].GetString();
}
profile->data = data; profile->data = data;
} }
else if (profile_type == "smpi") else if (profile_type == "smpi")
......
...@@ -23,6 +23,8 @@ enum class ProfileType ...@@ -23,6 +23,8 @@ enum class ProfileType
,SEQUENCE //!< The profile is non-atomic: it is composed of a sequence of other profiles ,SEQUENCE //!< The profile is non-atomic: it is composed of a sequence of other profiles
,MSG_PARALLEL_HOMOGENEOUS_PFS_MULTIPLE_TIERS //!< The profile is a homogeneous MSG for complex parallel filesystem access. Its data is of type MsgParallelHomogeneousPFSMultipleTiersProfileData ,MSG_PARALLEL_HOMOGENEOUS_PFS_MULTIPLE_TIERS //!< The profile is a homogeneous MSG for complex parallel filesystem access. Its data is of type MsgParallelHomogeneousPFSMultipleTiersProfileData
,MSG_DATA_STAGING //!< The profile is a MSG for moving data between the pfs hosts. Its data is of type DataStagingProfileData ,MSG_DATA_STAGING //!< The profile is a MSG for moving data between the pfs hosts. Its data is of type DataStagingProfileData
,SCHEDULER_SEND //!< The profile is a profile simulating a message sent to the scheduler. Its data is of type SchedulerSendProfileData
,SCHEDULER_RECV //!< The profile receives a message from the scheduler and can execute a profile based on a value comparison of the message. Its data is of type SchedulerRecvProfileData
}; };
/** /**
...@@ -153,6 +155,26 @@ struct MsgDataStagingProfileData ...@@ -153,6 +155,26 @@ struct MsgDataStagingProfileData
Direction direction; //!< Whether data should be transfered to the HPST or from the HPST Direction direction; //!< Whether data should be transfered to the HPST or from the HPST
}; };
/**
 * @brief The data associated to SCHEDULER_SEND profiles
 *
 * Executing such a profile appends a FROM_JOB_MSG event carrying this message.
 */
struct SchedulerSendProfileData
{
    std::string message; //!< The message being sent to the scheduler (read from the 'msg' field of the JSON profile description)
};
/**
 * @brief The data associated to SCHEDULER_RECV profiles
 *
 * All profile names below are optional in the JSON description; an empty
 * name means that no profile is executed for the corresponding outcome.
 */
struct SchedulerRecvProfileData
{
    std::string regex; //!< The regex the received message is matched against (defaults to ".*" when the JSON 'regex' field is absent)
    std::string on_success; //!< The profile to execute if the message matches the regex
    std::string on_failure; //!< The profile to execute if the message does not match the regex
    std::string on_timeout; //!< The profile to execute if no message is in the buffer when the profile is run (i.e. the scheduler has not answered in time). If it is omitted, the job instead waits for a message until its walltime is reached.
};
/** /**
* @brief Used to handles all the profiles of one workload * @brief Used to handles all the profiles of one workload
*/ */
......
...@@ -254,6 +254,36 @@ void JsonProtocolWriter::append_job_killed(const vector<string> & job_ids, ...@@ -254,6 +254,36 @@ void JsonProtocolWriter::append_job_killed(const vector<string> & job_ids,
_events.PushBack(event, _alloc); _events.PushBack(event, _alloc);
} }
void JsonProtocolWriter::append_from_job_message(const std::string & job_id,
const std::string & message,
double date)
{
/* {
"timestamp": 10.0,
"type": "FROM_JOB_MSG",
"data": {
"job_id": "w0!1",
"msg": "some_message"
}
} */
xbt_assert(date >= _last_date, "Date inconsistency");
_last_date = date;
_is_empty = false;
Value data(rapidjson::kObjectType);
data.AddMember("job_id",
Value().SetString(job_id.c_str(), _alloc), _alloc);
data.AddMember("msg", Value().SetString(message.c_str(), _alloc), _alloc);
Value event(rapidjson::kObjectType);
event.AddMember("timestamp", Value().SetDouble(date), _alloc);
event.AddMember("type", Value().SetString("FROM_JOB_MSG"), _alloc);
event.AddMember("data", data, _alloc);
_events.PushBack(event, _alloc);
}
void JsonProtocolWriter::append_resource_state_changed(const MachineRange & resources, void JsonProtocolWriter::append_resource_state_changed(const MachineRange & resources,
const string & new_state, const string & new_state,
double date) double date)
...@@ -344,6 +374,7 @@ JsonProtocolReader::JsonProtocolReader(BatsimContext *context) : ...@@ -344,6 +374,7 @@ JsonProtocolReader::JsonProtocolReader(BatsimContext *context) :
_type_to_handler_map["SUBMIT_JOB"] = &JsonProtocolReader::handle_submit_job; _type_to_handler_map["SUBMIT_JOB"] = &JsonProtocolReader::handle_submit_job;
_type_to_handler_map["SET_RESOURCE_STATE"] = &JsonProtocolReader::handle_set_resource_state; _type_to_handler_map["SET_RESOURCE_STATE"] = &JsonProtocolReader::handle_set_resource_state;
_type_to_handler_map["NOTIFY"] = &JsonProtocolReader::handle_notify; _type_to_handler_map["NOTIFY"] = &JsonProtocolReader::handle_notify;
_type_to_handler_map["TO_JOB_MSG"] = &JsonProtocolReader::handle_to_job_msg;
} }
JsonProtocolReader::~JsonProtocolReader() JsonProtocolReader::~JsonProtocolReader()
...@@ -785,6 +816,47 @@ void JsonProtocolReader::handle_notify(int event_number, ...@@ -785,6 +816,47 @@ void JsonProtocolReader::handle_notify(int event_number,
(void) timestamp; (void) timestamp;
} }
void JsonProtocolReader::handle_to_job_msg(int event_number,
double timestamp,
const Value &data_object)
{
(void) event_number; // Avoids a warning if assertions are ignored
/* {
"timestamp": 42.0,
"type": "TO_JOB_MSG",
"data": {
"job_id": "w!0",
"msg": "Some answer"
}
} */
xbt_assert(data_object.IsObject(), "Invalid JSON message: the 'data' value of event %d (TO_JOB_MSG) should be an object", event_number);
xbt_assert(data_object.HasMember("job_id"), "Invalid JSON message: the 'data' value of event %d (TO_JOB_MSG) should have a 'job_id' key", event_number);
const Value & job_id_value = data_object["job_id"];
xbt_assert(job_id_value.IsString(), "Invalid JSON message: in event %d (TO_JOB_MSG): ['data']['job_id'] should be a string", event_number);
string job_id = job_id_value.GetString();
xbt_assert(data_object.HasMember("msg"), "Invalid JSON msg: the 'data' value of event %d (TO_JOB_MSG) should have a 'msg' key", event_number);
const Value & msg_value = data_object["msg"];
xbt_assert(msg_value.IsString(), "Invalid JSON msg: in event %d (TO_JOB_MSG): ['data']['msg'] should be a string", event_number);
string msg = msg_value.GetString();
ToJobMessage * message = new ToJobMessage;
if (!identify_job_from_string(context, job_id, message->job_id))
{
xbt_assert(false, "Invalid JSON message: "
"Invalid job change job state received: The job identifier '%s' is not valid. "
"Job identifiers must be of the form [WORKLOAD_NAME!]JOB_ID. "
"If WORKLOAD_NAME! is omitted, WORKLOAD_NAME='static' is used. "
"Furthermore, the corresponding job must exist.", job_id.c_str());
}
message->message = msg;
send_message(timestamp, "server", IPMessageType::TO_JOB_MSG, (void *) message);
}
void JsonProtocolReader::handle_submit_job(int event_number, void JsonProtocolReader::handle_submit_job(int event_number,
double timestamp, double timestamp,
const Value &data_object) const Value &data_object)
......
...@@ -121,6 +121,16 @@ public: ...@@ -121,6 +121,16 @@ public:
virtual void append_job_killed(const std::vector<std::string> & job_ids, virtual void append_job_killed(const std::vector<std::string> & job_ids,
double date) = 0; double date) = 0;
/**
 * @brief Appends a FROM_JOB_MSG event.
 * @param[in] job_id The identifier of the job which sends the message.
 * @param[in] message The message to be sent to the scheduler.
 * @param[in] date The event date. Must be greater than or equal to the date of the previous event.
 */
virtual void append_from_job_message(const std::string & job_id,
                                     const std::string & message,
                                     double date) = 0;
/** /**
* @brief Appends a RESOURCE_STATE_CHANGED event. * @brief Appends a RESOURCE_STATE_CHANGED event.
* @param[in] resources The resources whose state has changed. * @param[in] resources The resources whose state has changed.
...@@ -241,6 +251,16 @@ public: ...@@ -241,6 +251,16 @@ public:
void append_job_killed(const std::vector<std::string> & job_ids, void append_job_killed(const std::vector<std::string> & job_ids,
double date); double date);
/**
 * @brief Appends a FROM_JOB_MSG event.
 * @param[in] job_id The identifier of the job which sends the message.
 * @param[in] message The message to be sent to the scheduler.
 * @param[in] date The event date. Must be greater than or equal to the date of the previous event.
 */
void append_from_job_message(const std::string & job_id,
                             const std::string & message,
                             double date);
/** /**
* @brief Appends a RESOURCE_STATE_CHANGED event. * @brief Appends a RESOURCE_STATE_CHANGED event.
* @param[in] resources The resources whose state has changed. * @param[in] resources The resources whose state has changed.
...@@ -417,6 +437,14 @@ public: ...@@ -417,6 +437,14 @@ public:
*/ */
void handle_notify(int event_number, double timestamp, const rapidjson::Value & data_object); void handle_notify(int event_number, double timestamp, const rapidjson::Value & data_object);
/**
 * @brief Handles a TO_JOB_MSG event
 * @param[in] event_number The event number in [0,nb_events[.
 * @param[in] timestamp The event timestamp
 * @param[in] data_object The data associated with the event (JSON object). It should have a 'job_id' string and a 'msg' string.
 */
void handle_to_job_msg(int event_number, double timestamp, const rapidjson::Value & data_object);
/** /**
* @brief Handles a SUBMIT_JOB event * @brief Handles a SUBMIT_JOB event
* @param[in] event_number The event number in [0,nb_events[. * @param[in] event_number The event number in [0,nb_events[.
......
...@@ -55,6 +55,7 @@ int server_process(int argc, char *argv[]) ...@@ -55,6 +55,7 @@ int server_process(int argc, char *argv[])
handler_map[IPMessageType::PSTATE_MODIFICATION] = server_on_pstate_modification; handler_map[IPMessageType::PSTATE_MODIFICATION] = server_on_pstate_modification;
handler_map[IPMessageType::SCHED_EXECUTE_JOB] = server_on_execute_job; handler_map[IPMessageType::SCHED_EXECUTE_JOB] = server_on_execute_job;
handler_map[IPMessageType::SCHED_CHANGE_JOB_STATE] = server_on_change_job_state; handler_map[IPMessageType::SCHED_CHANGE_JOB_STATE] = server_on_change_job_state;
handler_map[IPMessageType::TO_JOB_MSG] = server_on_to_job_msg;
handler_map[IPMessageType::SCHED_REJECT_JOB] = server_on_reject_job; handler_map[IPMessageType::SCHED_REJECT_JOB] = server_on_reject_job;
handler_map[IPMessageType::SCHED_KILL_JOB] = server_on_kill_jobs; handler_map[IPMessageType::SCHED_KILL_JOB] = server_on_kill_jobs;
handler_map[IPMessageType::SCHED_CALL_ME_LATER] = server_on_call_me_later; handler_map[IPMessageType::SCHED_CALL_ME_LATER] = server_on_call_me_later;
...@@ -674,6 +675,23 @@ void server_on_change_job_state(ServerData * data, ...@@ -674,6 +675,23 @@ void server_on_change_job_state(ServerData * data,
check_submitted_and_completed(data); check_submitted_and_completed(data);
} }
void server_on_to_job_msg(ServerData * data,
                          IPMessage * task_data)
{
    // The payload of a TO_JOB_MSG message is a ToJobMessage.
    xbt_assert(task_data->data != nullptr);
    ToJobMessage * to_job = static_cast<ToJobMessage *>(task_data->data);
    const auto & text = to_job->message;

    // Look up the addressed job and queue the text in its incoming buffer.
    Job * target = data->context->workloads.job_at(to_job->job_id);

    XBT_INFO("Send message to job: Job %d (workload=%s) message=%s",
             target->number, target->workload->name.c_str(),
             text.c_str());

    target->incoming_message_buffer.push_back(text);

    check_submitted_and_completed(data);
}
void server_on_reject_job(ServerData * data, void server_on_reject_job(ServerData * data,
IPMessage * task_data) IPMessage * task_data)
{ {
......
...@@ -222,3 +222,11 @@ void server_on_execute_job(ServerData * data, ...@@ -222,3 +222,11 @@ void server_on_execute_job(ServerData * data,
*/ */
void server_on_change_job_state(ServerData * data, void server_on_change_job_state(ServerData * data,
IPMessage * task_data); IPMessage * task_data);
/**
 * @brief Server TO_JOB_MSG handler
 * @details Appends the received message to the incoming message buffer of the job it is addressed to.
 * @param[in,out] data The data associated with the server_process
 * @param[in,out] task_data The data associated with the message the server received
 */
void server_on_to_job_msg(ServerData * data,
                          IPMessage * task_data);
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment