Commit 927ad867 authored by Millian Poquet's avatar Millian Poquet

[code] kill_jobs + refactoring

parent dfb2c03a
......@@ -70,6 +70,9 @@ std::string ip_message_type_to_string(IPMessageType type)
case IPMessageType::SCHED_REJECT_JOB:
s = "SCHED_REJECT_JOB";
break;
case IPMessageType::SCHED_KILL_JOB:
s = "SCHED_KILL_JOB";
break;
case IPMessageType::SCHED_CALL_ME_LATER:
s = "SCHED_CALL_ME_LATER";
break;
......@@ -149,7 +152,7 @@ IPMessage::~IPMessage()
} break;
case IPMessageType::SCHED_EXECUTE_JOB:
{
SchedulingAllocationMessage * msg = (SchedulingAllocationMessage *) data;
ExecuteJobMessage * msg = (ExecuteJobMessage *) data;
// The Allocations themselves are not memory-deallocated there but at the end of the job execution.
delete msg;
} break;
......@@ -158,6 +161,11 @@ IPMessage::~IPMessage()
JobRejectedMessage * msg = (JobRejectedMessage *) data;
delete msg;
} break;
case IPMessageType::SCHED_KILL_JOB:
{
KillJobMessage * msg = (KillJobMessage *) data;
delete msg;
} break;
case IPMessageType::SCHED_CALL_ME_LATER:
{
CallMeLaterMessage * msg = (CallMeLaterMessage*) data;
......
......@@ -57,6 +57,7 @@ enum class IPMessageType
,PSTATE_MODIFICATION //!< Scheduler -> Server. The scheduler tells the server a scheduling event occured (modify the state of some resources).
,SCHED_EXECUTE_JOB //!< Scheduler -> Server. The scheduler tells the server a scheduling event occured (execute a job).
,SCHED_REJECT_JOB //!< Scheduler -> Server. The scheduler tells the server a scheduling event occured (reject a job).
,SCHED_KILL_JOB //!< Scheduler -> Server. The scheduler tells the server a scheduling event occured (kill a job).
,SCHED_CALL_ME_LATER //!< Scheduler -> Server. The scheduler tells the server a scheduling event occured (the scheduler wants to be called in the future).
,SCHED_TELL_ME_ENERGY //!< Scheduler -> Server. The scheduler tells the server a scheduling event occured (the scheduler wants to know the platform consumed energy).
,SCHED_WAIT_ANSWER //!< Scheduler -> Server. The scheduler tells the server a scheduling event occured (a WAIT_ANSWER message).
......@@ -144,13 +145,21 @@ struct SchedulingAllocation
};
/**
* @brief The content of the JobSubmitted message
* @brief The content of the EXECUTE_JOB message
*/
struct SchedulingAllocationMessage
struct ExecuteJobMessage
{
SchedulingAllocation * allocation; //!< The allocation itself
};
/**
* @brief The content of the KILL_JOB message
*/
struct KillJobMessage
{
std::vector<JobIdentifier> jobs_ids; //!< The ids of the jobs to kill
};
/**
* @brief The content of the PstateModification message
*/
......
......@@ -490,7 +490,7 @@ void JsonProtocolReader::handle_reject_job(int event_number,
"For being rejected, a job must be submitted and not allocated yet.",
job->id.c_str());
send_message(timestamp, "server", IPMessageType::SCHED_REJECTION, (void*) message);
send_message(timestamp, "server", IPMessageType::SCHED_REJECT_JOB, (void*) message);
}
void JsonProtocolReader::handle_execute_job(int event_number,
......@@ -507,10 +507,11 @@ void JsonProtocolReader::handle_execute_job(int event_number,
}
} */
SchedulingAllocationMessage * message = new SchedulingAllocationMessage;
ExecuteJobMessage * message = new ExecuteJobMessage;
message->allocation = new SchedulingAllocation;
xbt_assert(data_object.IsObject(), "Invalid JSON message: the 'data' value of event %d (EXECUTE_JOB) should be an object", event_number);
xbt_assert(data_object.MemberCount() == 2 || data_object.MemberCount() == 3, "Invalid JSON message: the 'data' value of event %d (EXECUTE_JOB) should be of size in {2,3} (size=%d)", event_number, (int)data_object.MemberCount());
// *************************
// Job identifier management
......@@ -614,7 +615,7 @@ void JsonProtocolReader::handle_execute_job(int event_number,
}
// Everything has been parsed correctly, let's inject the message into the simulation.
send_message(timestamp, "server", IPMessageType::SCHED_ALLOCATION, (void*) message);
send_message(timestamp, "server", IPMessageType::SCHED_EXECUTE_JOB, (void*) message);
}
void JsonProtocolReader::handle_call_me_later(int event_number,
......@@ -658,6 +659,9 @@ void JsonProtocolReader::handle_set_resource_state(int event_number,
// Resources management
// ********************
// Let's read it from the JSON message
xbt_assert(data_object.IsObject(), "Invalid JSON message: the 'data' value of event %d (SET_RESOURCE_STATE) should be an object", event_number);
xbt_assert(data_object.MemberCount() == 2, "Invalid JSON message: the 'data' value of event %d (SET_RESOURCE_STATE) should be of size 2 (size=%d)", event_number, (int)data_object.MemberCount());
xbt_assert(data_object.HasMember("resources"), "Invalid JSON message: the 'data' value of event %d (SET_RESOURCE_STATE) should contain a 'resources' key.", event_number);
const Value & resources_value = data_object["resources"];
xbt_assert(resources_value.IsString(), "Invalid JSON message: the 'resources' value in the 'data' value of event %d (SET_RESOURCE_STATE) should be a string.", event_number);
......@@ -705,10 +709,38 @@ void JsonProtocolReader::handle_kill_job(int event_number,
double timestamp,
const Value &data_object)
{
xbt_assert(false, "Unimplemented! TODO");
(void) event_number;
(void) timestamp;
(void) data_object;
/* {
"timestamp": 10.0,
"type": "KILL_JOB",
"data": {"job_ids": ["w0!1", "w0!2"]}
} */
KillJobMessage * message = new KillJobMessage;
xbt_assert(data_object.IsObject(), "Invalid JSON message: the 'data' value of event %d (KILL_JOB) should be an object", event_number);
xbt_assert(data_object.MemberCount() == 1, "Invalid JSON message: the 'data' value of event %d (KILL_JOB) should be of size 1 (size=%d)", event_number, (int)data_object.MemberCount());
xbt_assert(data_object.HasMember("job_ids"), "Invalid JSON message: the 'data' value of event %d (KILL_JOB) should contain a 'job_ids' key.", event_number);
const Value & job_ids_array = data_object["job_ids"];
xbt_assert(job_ids_array.IsArray(), "Invalid JSON message: the 'job_ids' value in the 'data' value of event %d (KILL_JOB) should be an array.", event_number);
xbt_assert(job_ids_array.Size() > 0, "Invalid JSON message: the 'job_ids' array in the 'data' value of event %d (KILL_JOB) should be non-empty.", event_number);
message->jobs_ids.reserve(job_ids_array.Size());
for (unsigned int i = 0; i < job_ids_array.Size(); ++i)
{
const Value & job_id_value = job_ids_array[i];
if (!identify_job_from_string(context, job_id_value.GetString(), message->jobs_ids[i]))
xbt_assert(false, "Invalid JSON message: in event %d (KILL_JOB): job_id %d ('%s') is invalid.", event_number, i, message->jobs_ids[i].to_string().c_str());
Job * job = context->workloads.job_at(message->jobs_ids[i]);
xbt_assert(job->state == JobState::JOB_STATE_RUNNING ||
job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY ||
job->state == JobState::JOB_STATE_COMPLETED_KILLED,
"Invalid JSON message: in event %d (KILL_JOB): job_id %d ('%s') refers to a job not being executed nor completed.",
event_number, i, message->jobs_ids[i].to_string().c_str());
}
send_message(timestamp, "server", IPMessageType::SCHED_KILL_JOB, (void *) message);
}
void JsonProtocolReader::send_message(double when,
......
......@@ -246,7 +246,7 @@ int server_process(int argc, char *argv[])
} break; // end of case JOB_SUBMITTED_BY_DP
case IPMessageType::SCHED_REJECTION:
case IPMessageType::SCHED_REJECT_JOB:
{
xbt_assert(task_data->data != nullptr);
JobRejectedMessage * message = (JobRejectedMessage *) task_data->data;
......@@ -259,6 +259,18 @@ int server_process(int argc, char *argv[])
job->number, job->workload->name.c_str());
} break; // end of case SCHED_REJECTION
case IPMessageType::SCHED_KILL_JOB:
{
xbt_assert(task_data->data != nullptr);
KillJobMessage * message = (KillJobMessage *) task_data->data;
KillerProcessArguments * args = new KillerProcessArguments;
args->context = context;
args->jobs_ids = message->jobs_ids;
MSG_process_create("killer_process", killer_process, (void *) args, MSG_host_self());
} break; // end of case SCHED_KILL_JOB
case IPMessageType::SCHED_CALL_ME_LATER:
{
xbt_assert(task_data->data != nullptr);
......@@ -363,10 +375,10 @@ int server_process(int argc, char *argv[])
} break; // end of case PSTATE_MODIFICATION
case IPMessageType::SCHED_ALLOCATION:
case IPMessageType::SCHED_EXECUTE_JOB:
{
xbt_assert(task_data->data != nullptr);
SchedulingAllocationMessage * message = (SchedulingAllocationMessage *) task_data->data;
ExecuteJobMessage * message = (ExecuteJobMessage *) task_data->data;
SchedulingAllocation * allocation = message->allocation;
Job * job = context->workloads.job_at(allocation->job_id);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment