Commit 6f5d650a authored by S. Lackner

[code] Add message: change_job_state

The new message can be used to change the state of either submitted or running
jobs without actually running the job profiles.
parent e8358185
......@@ -77,6 +77,9 @@ std::string ip_message_type_to_string(IPMessageType type)
case IPMessageType::SCHED_EXECUTE_JOB:
s = "SCHED_EXECUTE_JOB";
break;
case IPMessageType::SCHED_CHANGE_JOB_STATE:
s = "SCHED_CHANGE_JOB_STATE";
break;
case IPMessageType::SCHED_REJECT_JOB:
s = "SCHED_REJECT_JOB";
break;
......@@ -173,6 +176,11 @@ IPMessage::~IPMessage()
// The Allocations themselves are not memory-deallocated there but at the end of the job execution.
delete msg;
} break;
case IPMessageType::SCHED_CHANGE_JOB_STATE:
{
ChangeJobStateMessage * msg = (ChangeJobStateMessage *) data;
delete msg;
} break;
case IPMessageType::SCHED_REJECT_JOB:
{
JobRejectedMessage * msg = (JobRejectedMessage *) data;
......
......@@ -56,6 +56,7 @@ enum class IPMessageType
,JOB_COMPLETED //!< Launcher -> Server. The job launcher tells the server a job has been completed.
,PSTATE_MODIFICATION //!< Scheduler -> Server. The scheduler tells the server a scheduling event occured (modify the state of some resources).
,SCHED_EXECUTE_JOB //!< Scheduler -> Server. The scheduler tells the server a scheduling event occured (execute a job).
,SCHED_CHANGE_JOB_STATE //!< Scheduler -> Server. The scheduler tells the server a scheduling event occured (change the state of a job).
,SCHED_REJECT_JOB //!< Scheduler -> Server. The scheduler tells the server a scheduling event occured (reject a job).
,SCHED_KILL_JOB //!< Scheduler -> Server. The scheduler tells the server a scheduling event occured (kill a job).
,SCHED_CALL_ME_LATER //!< Scheduler -> Server. The scheduler tells the server a scheduling event occured (the scheduler wants to be called in the future).
......@@ -127,6 +128,16 @@ struct JobCompletedMessage
JobIdentifier job_id; //!< The JobIdentifier
};
/**
 * @brief The content of the ChangeJobState message
 * @details Carried by an IPMessage of type SCHED_CHANGE_JOB_STATE and deleted
 *          by the IPMessage destructor.
 */
struct ChangeJobStateMessage
{
JobIdentifier job_id; //!< The JobIdentifier of the job whose state should be changed
std::string job_state; //!< The new job state, as its textual name (e.g. "RUNNING", "COMPLETED_KILLED")
std::string kill_reason; //!< The optional kill reason if the new job state is COMPLETED_KILLED (left empty when no reason was given)
};
/**
* @brief The content of the JobRejected message
*/
......
......@@ -296,6 +296,7 @@ JsonProtocolReader::JsonProtocolReader(BatsimContext *context) :
_type_to_handler_map["QUERY_REQUEST"] = &JsonProtocolReader::handle_query_request;
_type_to_handler_map["REJECT_JOB"] = &JsonProtocolReader::handle_reject_job;
_type_to_handler_map["EXECUTE_JOB"] = &JsonProtocolReader::handle_execute_job;
_type_to_handler_map["CHANGE_JOB_STATE"] = &JsonProtocolReader::handle_change_job_state;
_type_to_handler_map["CALL_ME_LATER"] = &JsonProtocolReader::handle_call_me_later;
_type_to_handler_map["KILL_JOB"] = &JsonProtocolReader::handle_kill_job;
_type_to_handler_map["SUBMIT_JOB"] = &JsonProtocolReader::handle_submit_job;
......@@ -644,6 +645,70 @@ void JsonProtocolReader::handle_set_resource_state(int event_number,
send_message(timestamp, "server", IPMessageType::PSTATE_MODIFICATION, (void*) message);
}
// Parses a CHANGE_JOB_STATE event coming from the scheduler, validates its
// fields and forwards the request to the server as a SCHED_CHANGE_JOB_STATE
// message.
//
// event_number: index of the event in the received message (used in error messages only).
// timestamp:    timestamp of the event, forwarded to send_message.
// data_object:  the 'data' JSON value of the event. It must be an object with
//               string members 'job_id' and 'job_state', and optionally a
//               string member 'kill_reason'.
void JsonProtocolReader::handle_change_job_state(int event_number,
                                                 double timestamp,
                                                 const Value &data_object)
{
    (void) event_number; // Avoids a warning if assertions are ignored
    /* {
      "timestamp": 42.0,
      "type": "CHANGE_JOB_STATE",
      "data": {
        "job_id": "w12!45",
        "job_state": "COMPLETED_KILLED",
        "kill_reason": "Sub-jobs were killed."
      }
    } */
    xbt_assert(data_object.IsObject(), "Invalid JSON message: the 'data' value of event %d (CHANGE_JOB_STATE) should be an object", event_number);

    // Mandatory field: job_id
    xbt_assert(data_object.HasMember("job_id"), "Invalid JSON message: the 'data' value of event %d (CHANGE_JOB_STATE) should have a 'job_id' key", event_number);
    const Value & job_id_value = data_object["job_id"];
    xbt_assert(job_id_value.IsString(), "Invalid JSON message: in event %d (CHANGE_JOB_STATE): ['data']['job_id'] should be a string", event_number);
    string job_id = job_id_value.GetString();

    // Mandatory field: job_state (must be one of the known state names)
    xbt_assert(data_object.HasMember("job_state"), "Invalid JSON message: the 'data' value of event %d (CHANGE_JOB_STATE) should have a 'job_state' key", event_number);
    const Value & job_state_value = data_object["job_state"];
    xbt_assert(job_state_value.IsString(), "Invalid JSON message: in event %d (CHANGE_JOB_STATE): ['data']['job_state'] should be a string", event_number);
    string job_state = job_state_value.GetString();
    xbt_assert(job_state == "NOT_SUBMITTED"
               || job_state == "SUBMITTED"
               || job_state == "RUNNING"
               || job_state == "COMPLETED_SUCCESSFULLY"
               || job_state == "COMPLETED_KILLED"
               || job_state == "REJECTED",
               "Invalid JSON message: in event %d (CHANGE_JOB_STATE): ['data']['job_state'] must be one of: NOT_SUBMITTED, SUBMITTED, RUNNING, COMPLETED_SUCCESSFULLY, COMPLETED_KILLED, REJECTED", event_number);

    // Optional field: kill_reason (stays empty when absent)
    string kill_reason;
    if (data_object.HasMember("kill_reason"))
    {
        const Value & kill_reason_value = data_object["kill_reason"];
        xbt_assert(kill_reason_value.IsString(), "Invalid JSON message: in event %d (CHANGE_JOB_STATE): ['data']['kill_reason'] should be a string", event_number);
        kill_reason = kill_reason_value.GetString();
        // A non-empty reason only makes sense for a killed job.
        xbt_assert(kill_reason.empty() || job_state == "COMPLETED_KILLED",
                   "Invalid JSON message: in event %d (CHANGE_JOB_STATE): ['data']['kill_reason'] is only allowed if the job_state is COMPLETED_KILLED", event_number);
    }

    ChangeJobStateMessage * message = new ChangeJobStateMessage;
    // identify_job_from_string fills message->job_id, so the call must stay
    // outside of xbt_assert: it has to run even if assertions are ignored.
    if (!identify_job_from_string(context, job_id, message->job_id))
    {
        xbt_assert(false, "Invalid JSON message: "
                   "Invalid job change job state received: The job identifier '%s' is not valid. "
                   "Job identifiers must be of the form [WORKLOAD_NAME!]JOB_ID. "
                   "If WORKLOAD_NAME! is omitted, WORKLOAD_NAME='static' is used. "
                   "Furthermore, the corresponding job must exist.", job_id.c_str());
    }
    message->job_state = job_state;
    message->kill_reason = kill_reason;

    // The message is handed over to the server; it is deleted by the IPMessage destructor.
    send_message(timestamp, "server", IPMessageType::SCHED_CHANGE_JOB_STATE, (void *) message);
}
void JsonProtocolReader::handle_notify(int event_number,
double timestamp,
const Value &data_object)
......
......@@ -368,6 +368,14 @@ public:
*/
void handle_execute_job(int event_number, double timestamp, const rapidjson::Value & data_object);
/**
 * @brief Handles a CHANGE_JOB_STATE event
 * @details Validates the 'job_id', 'job_state' and optional 'kill_reason' fields
 *          of the event, then sends a SCHED_CHANGE_JOB_STATE message to the server.
 * @param[in] event_number The event number in [0,nb_events[.
 * @param[in] timestamp The event timestamp
 * @param[in] data_object The data associated with the event (JSON object)
 */
void handle_change_job_state(int event_number, double timestamp, const rapidjson::Value & data_object);
/**
* @brief Handles a CALL_ME_LATER event
* @param[in] event_number The event number in [0,nb_events[.
......
......@@ -52,6 +52,7 @@ int server_process(int argc, char *argv[])
handler_map[IPMessageType::JOB_COMPLETED] = server_on_job_completed;
handler_map[IPMessageType::PSTATE_MODIFICATION] = server_on_pstate_modification;
handler_map[IPMessageType::SCHED_EXECUTE_JOB] = server_on_execute_job;
handler_map[IPMessageType::SCHED_CHANGE_JOB_STATE] = server_on_change_job_state;
handler_map[IPMessageType::SCHED_REJECT_JOB] = server_on_reject_job;
handler_map[IPMessageType::SCHED_KILL_JOB] = server_on_kill_jobs;
handler_map[IPMessageType::SCHED_CALL_ME_LATER] = server_on_call_me_later;
......@@ -607,6 +608,77 @@ void server_on_submit_job(ServerData * data,
}
}
// Server handler for SCHED_CHANGE_JOB_STATE: changes the state of a job without
// running its profile, and keeps the server's job counters consistent.
//
// Allowed transitions:
//   SUBMITTED -> RUNNING                 (sets the job starting time)
//   SUBMITTED -> REJECTED                (counts the job as completed)
//   RUNNING   -> COMPLETED_SUCCESSFULLY  (sets the job runtime)
//   RUNNING   -> COMPLETED_KILLED        (sets the job runtime)
// Any other transition triggers an assertion failure.
//
// data:      the data associated with the server process (counters are updated).
// task_data: the received message; its data must be a ChangeJobStateMessage.
void server_on_change_job_state(ServerData * data,
                                IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    ChangeJobStateMessage * message = (ChangeJobStateMessage *) task_data->data;

    Job * job = data->context->workloads.job_at(message->job_id);

    XBT_INFO("Change job state: Job %d (workload=%s) to state %s (kill_Reason=%s)",
             job->number, job->workload->name.c_str(),
             message->job_state.c_str(), message->kill_reason.c_str());

    // Start from the current state so that new_state is never read
    // uninitialized if the assertion of the final else branch is ignored.
    JobState new_state = job->state;
    if (message->job_state == "NOT_SUBMITTED") {
        new_state = JobState::JOB_STATE_NOT_SUBMITTED;
    } else if (message->job_state == "SUBMITTED") {
        new_state = JobState::JOB_STATE_SUBMITTED;
    } else if (message->job_state == "RUNNING") {
        new_state = JobState::JOB_STATE_RUNNING;
    } else if (message->job_state == "COMPLETED_SUCCESSFULLY") {
        new_state = JobState::JOB_STATE_COMPLETED_SUCCESSFULLY;
    } else if (message->job_state == "COMPLETED_KILLED") {
        new_state = JobState::JOB_STATE_COMPLETED_KILLED;
    } else if (message->job_state == "REJECTED") {
        new_state = JobState::JOB_STATE_REJECTED;
    } else {
        xbt_assert(false, "Invalid new job state");
    }

    // Check the transition and update the counters accordingly.
    switch (job->state) {
    case JobState::JOB_STATE_SUBMITTED:
        switch (new_state) {
        case JobState::JOB_STATE_RUNNING:
            job->starting_time = MSG_get_clock();
            data->nb_running_jobs++;
            xbt_assert(data->nb_running_jobs <= data->nb_submitted_jobs);
            break;
        case JobState::JOB_STATE_REJECTED:
            // A rejected job will never run: it is accounted as completed.
            data->nb_completed_jobs++;
            xbt_assert(data->nb_completed_jobs + data->nb_running_jobs <= data->nb_submitted_jobs);
            break;
        default:
            xbt_assert(false, "Can only change the state of a submitted job to running or rejected");
        }
        break;
    case JobState::JOB_STATE_RUNNING:
        switch (new_state) {
        case JobState::JOB_STATE_COMPLETED_SUCCESSFULLY:
        case JobState::JOB_STATE_COMPLETED_KILLED:
            job->runtime = MSG_get_clock() - job->starting_time;
            data->nb_running_jobs--;
            xbt_assert(data->nb_running_jobs >= 0);
            data->nb_completed_jobs++;
            xbt_assert(data->nb_completed_jobs + data->nb_running_jobs <= data->nb_submitted_jobs);
            break;
        default:
            xbt_assert(false, "Can only change the state of a running job to completed (successfully and killed)");
        }
        break;
    default:
        xbt_assert(false, "Can only change the state of a submitted or running job.");
    }

    job->state = new_state;
    job->kill_reason = message->kill_reason;

    XBT_INFO("Job state changed: Job %d (workload=%s)",
             job->number, job->workload->name.c_str());

    check_submitted_and_completed(data);
}
void server_on_reject_job(ServerData * data,
IPMessage * task_data)
{
......
......@@ -214,3 +214,11 @@ void server_on_call_me_later(ServerData * data,
*/
void server_on_execute_job(ServerData * data,
IPMessage * task_data);
/**
 * @brief Server SCHED_CHANGE_JOB_STATE handler
 * @details Changes the state of a submitted or running job as requested by the
 *          scheduler and updates the server's job counters accordingly.
 * @param[in,out] data The data associated with the server_process
 * @param[in,out] task_data The data associated with the message the server received
 */
void server_on_change_job_state(ServerData * data,
IPMessage * task_data);
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment