Mise à jour terminée. Pour consulter les notes de version associées, voici le lien :
https://about.gitlab.com/releases/2021/07/07/critical-security-release-gitlab-14-0-4-released/

Commit d250bb13 authored by Millian Poquet
Browse files

Merge branch 'move-allocations-from-protocol-to-server'

parents 9f6585ea 7122e7cc
......@@ -102,6 +102,9 @@ std::string ip_message_type_to_string(IPMessageType type)
case IPMessageType::SCHED_WAIT_ANSWER:
s = "SCHED_WAIT_ANSWER";
break;
case IPMessageType::SCHED_SET_JOB_METADATA:
s = "SCHED_SET_JOB_METADATA";
break;
case IPMessageType::WAIT_QUERY:
s = "WAIT_QUERY";
break;
......@@ -217,6 +220,11 @@ IPMessage::~IPMessage()
case IPMessageType::SCHED_TELL_ME_ENERGY:
{
} break;
case IPMessageType::SCHED_SET_JOB_METADATA:
{
SetJobMetadataMessage * msg = (SetJobMetadataMessage *) data;
delete msg;
} break;
case IPMessageType::WAIT_QUERY:
{
WaitQueryMessage * msg = (WaitQueryMessage *) data;
......
......@@ -37,6 +37,7 @@ enum class IPMessageType
,SCHED_KILL_JOB //!< Scheduler -> Server. The scheduler tells the server a scheduling event occured (kill a job).
,SCHED_CALL_ME_LATER //!< Scheduler -> Server. The scheduler tells the server a scheduling event occured (the scheduler wants to be called in the future).
,SCHED_TELL_ME_ENERGY //!< Scheduler -> Server. The scheduler tells the server a scheduling event occured (the scheduler wants to know the platform consumed energy).
,SCHED_SET_JOB_METADATA //!< Scheduler -> Server. The scheduler tells the server a scheduling event occured (a SET_JOB_METADATA message).
,SCHED_WAIT_ANSWER //!< Scheduler -> Server. The scheduler tells the server a scheduling event occured (a WAIT_ANSWER message).
,WAIT_QUERY //!< Server -> Scheduler. The scheduler tells the server a scheduling event occured (a WAIT_ANSWER message).
,SCHED_READY //!< Scheduler -> Server. The scheduler tells the server that the scheduler is ready (the scheduler is ready, messages can be sent to it).
......@@ -107,6 +108,15 @@ struct ProfileRegisteredByDPMessage
std::string profile; //!< The registered profile data
};
/**
 * @brief The content of a SCHED_SET_JOB_METADATA message
 * @details Carries the identifier of an existing job together with the
 *          metadata string that should be attached to it.
 */
struct SetJobMetadataMessage
{
JobIdentifier job_id; //!< The JobIdentifier of the job whose metadata is set
std::string metadata; //!< The job metadata string (documented as empty if redis is enabled -- TODO confirm against the sender)
};
/**
* @brief The content of the JobCompleted message
*/
......
......@@ -286,7 +286,7 @@ void Jobs::displayDebug() const
s += "Jobs : [" + boost::algorithm::join(jobsVector, ", ") + "]";
// Let us display the string which has been built
XBT_INFO("%s", s.c_str());
XBT_DEBUG("%s", s.c_str());
}
const std::map<JobIdentifier, Job* > &Jobs::jobs() const
......
......@@ -784,18 +784,6 @@ void JsonProtocolReader::handle_reject_job(int event_number,
JobRejectedMessage * message = new JobRejectedMessage;
message->job_id = JobIdentifier(job_id);
if (!(context->workloads.job_is_registered(message->job_id)))
{
xbt_assert(false, "Job '%s' does not exist.", job_id.c_str());
}
Job * job = context->workloads.job_at(message->job_id);
(void) job; // Avoids a warning if assertions are ignored
xbt_assert(job->state == JobState::JOB_STATE_SUBMITTED,
"Invalid JSON message: "
"Invalid rejection received: job %s cannot be rejected at the present time."
"For being rejected, a job must be submitted and not allocated yet.",
job->id.to_string().c_str());
send_message_at_time(timestamp, "server", IPMessageType::SCHED_REJECT_JOB, (void*) message);
}
......@@ -847,10 +835,6 @@ void JsonProtocolReader::handle_execute_job(int event_number,
// Let's retrieve the job identifier
JobIdentifier job_id = JobIdentifier(job_id_str);
if (!(context->workloads.job_is_registered(job_id)))
{
xbt_assert(false, "Invalid message in event %d (EXECUTE_JOB): job with job_id '%s' does not exists", event_number, job_id_str.c_str());
}
message->allocation->job_id = job_id;
// *********************
......@@ -1166,14 +1150,11 @@ void JsonProtocolReader::handle_set_job_metadata(int event_number,
boost::regex r("[^\"]*");
xbt_assert(boost::regex_match(metadata, r), "Invalid JSON message: the 'metadata' value in the 'data' value of event %d (SET_JOB_METADATA) should not contain double quotes (got ###%s###)", event_number, metadata.c_str());
JobIdentifier job_identifier = JobIdentifier(job_id);
if (!(context->workloads.job_is_registered(job_identifier)))
{
xbt_assert(false, "Invalid message in event %d (SET_JOB_METADATA): The following job does not exist: %s", event_number, job_id.c_str());
}
SetJobMetadataMessage * message = new SetJobMetadataMessage;
message->job_id = JobIdentifier(job_id);
message->metadata = metadata;
Job * job = context->workloads.job_at(job_identifier);
job->metadata = metadata;
send_message_at_time(timestamp, "server", IPMessageType::SCHED_SET_JOB_METADATA, (void *) message);
}
void JsonProtocolReader::handle_change_job_state(int event_number,
......@@ -1202,6 +1183,7 @@ void JsonProtocolReader::handle_change_job_state(int event_number,
xbt_assert(job_state_value.IsString(), "Invalid JSON message: in event %d (CHANGE_JOB_STATE): ['data']['job_state'] should be a string", event_number);
string job_state = job_state_value.GetString();
// Put this as a 'global' set?
set<string> allowed_states = {"NOT_SUBMITTED",
"RUNNING",
"COMPLETED_SUCCESSFULLY",
......@@ -1209,7 +1191,6 @@ void JsonProtocolReader::handle_change_job_state(int event_number,
"COMPLETED_KILLED",
"REJECTED"};
if (allowed_states.count(job_state) != 1)
{
xbt_assert(false, "Invalid JSON message: in event %d (CHANGE_JOB_STATE): "
......@@ -1218,13 +1199,7 @@ void JsonProtocolReader::handle_change_job_state(int event_number,
}
ChangeJobStateMessage * message = new ChangeJobStateMessage;
message->job_id = JobIdentifier(job_id);
if (!(context->workloads.job_is_registered(message->job_id)))
{
xbt_assert(false, "The job '%s' does not exist.", job_id.c_str());
}
message->job_state = job_state;
send_message_at_time(timestamp, "server", IPMessageType::SCHED_CHANGE_JOB_STATE, (void *) message);
......@@ -1291,12 +1266,7 @@ void JsonProtocolReader::handle_to_job_msg(int event_number,
string msg = msg_value.GetString();
ToJobMessage * message = new ToJobMessage;
message->job_id = JobIdentifier(job_id);
if (!(context->workloads.job_is_registered(message->job_id)))
{
xbt_assert(false, "The job '%s' does not exist.", job_id.c_str());
}
message->message = msg;
send_message_at_time(timestamp, "server", IPMessageType::TO_JOB_MSG, (void *) message);
......@@ -1341,12 +1311,7 @@ void JsonProtocolReader::handle_register_job(int event_number,
const Value & job_id_value = data_object["job_id"];
xbt_assert(job_id_value.IsString(), "Invalid JSON message: in event %d (REGISTER_JOB): ['data']['job_id'] should be a string", event_number);
string job_id = job_id_value.GetString();
message->job_id = JobIdentifier(job_id);
if (context->workloads.exists(message->job_id.workload_name))
{
xbt_assert(!context->workloads.job_is_registered(message->job_id), "Invalid message in event %d (REGISTER_JOB): job_id '%s' already exists", event_number, job_id.c_str());
}
// Read the job description, either directly or from Redis
if (data_object.HasMember("job"))
......@@ -1370,35 +1335,6 @@ void JsonProtocolReader::handle_register_job(int event_number,
message->job_description = context->storage.get(job_key);
}
// FIXME: All this initialization should be done in the server?
// Create the job into memory now (so that following events at the same timestamp can refer to this job).
// But first, create the workload if needed.
Workload * workload = nullptr;
if (context->workloads.exists(message->job_id.workload_name))
{
workload = context->workloads.at(message->job_id.workload_name);
}
else
{
workload = Workload::new_dynamic_workload(message->job_id.workload_name);
context->workloads.insert_workload(workload->name, workload);
}
// Create the job.
XBT_DEBUG("Parsing user-submitted job %s", message->job_id.to_string().c_str());
Job * job = Job::from_json(message->job_description, workload,
"Invalid JSON job submitted by the scheduler");
xbt_assert(job->id.job_name == message->job_id.job_name, "Internal error");
xbt_assert(job->id.workload_name == message->job_id.workload_name, "Internal error");
xbt_assert(job->submission_time - (Rational)timestamp <= 1, "Invalid job registered by the scheduler '%s': The desired submission time (%f) is in the future but this is not directly possible. If you really want to submit a job later on, first send a CALL_ME_LATER then register the job at the right time.", job->id.to_string().c_str(), (double)job->submission_time);
workload->jobs->add_job(job);
job->state = JobState::JOB_STATE_SUBMITTED;
xbt_assert(!data_object.HasMember("profile"),
"Profile description found in REGISTER_JOB event: this is not allowed anymore. "
"Please submit the profile in a separate REGISTER_PROFILE event.");
send_message_at_time(timestamp, "server", IPMessageType::JOB_REGISTERED_BY_DP, (void *) message);
}
......@@ -1483,10 +1419,6 @@ void JsonProtocolReader::handle_kill_job(int event_number,
{
const Value & job_id_value = job_ids_array[i];
message->jobs_ids[i] = JobIdentifier(job_id_value.GetString());
if (!(context->workloads.job_is_registered(message->jobs_ids[i])))
{
xbt_assert(false, "Invalid message in event %d (KILL_JOB): job %d with job_id:'%s' does not exists", event_number, i, message->jobs_ids[i].to_string().c_str());
}
}
send_message_at_time(timestamp, "server", IPMessageType::SCHED_KILL_JOB, (void *) message);
......
......@@ -58,6 +58,7 @@ void server_process(BatsimContext * context)
handler_map[IPMessageType::SCHED_KILL_JOB] = server_on_kill_jobs;
handler_map[IPMessageType::SCHED_CALL_ME_LATER] = server_on_call_me_later;
handler_map[IPMessageType::SCHED_TELL_ME_ENERGY] = server_on_sched_tell_me_energy;
handler_map[IPMessageType::SCHED_SET_JOB_METADATA] = server_on_set_job_metadata;
handler_map[IPMessageType::SCHED_WAIT_ANSWER] = server_on_sched_wait_answer;
handler_map[IPMessageType::WAIT_QUERY] = server_on_wait_query;
handler_map[IPMessageType::SCHED_READY] = server_on_sched_ready;
......@@ -559,14 +560,18 @@ void server_on_register_job(ServerData * data,
xbt_assert(data->context->workloads.exists(message->job_id.workload_name),
"Internal error: Workload '%s' should exist.",
message->job_id.workload_name.c_str());
xbt_assert(data->context->workloads.job_is_registered(message->job_id),
"Internal error: job '%s' should exist.",
message->job_id.to_string().c_str());
const Job * job = data->context->workloads.job_at(message->job_id);
xbt_assert(!data->context->workloads.job_is_registered(message->job_id),
"Cannot register new job '%s', it already exists in the workload.", message->job_id.to_string().c_str());
Workload * workload = data->context->workloads.at(message->job_id.workload_name);
// Create the job.
XBT_DEBUG("Parsing user-submitted job %s", message->job_id.to_string().c_str());
Job * job = Job::from_json(message->job_description, workload,
"Invalid JSON job submitted by the scheduler");
xbt_assert(job->id.job_name == message->job_id.job_name, "Internal error");
xbt_assert(job->id.workload_name == message->job_id.workload_name, "Internal error");
if (!workload->profiles->exists(job->profile))
{
xbt_die(
......@@ -581,9 +586,12 @@ void server_on_register_job(ServerData * data,
}
workload->check_single_job_validity(job);
workload->jobs->add_job(job);
job->state = JobState::JOB_STATE_SUBMITTED;
if (data->context->registration_sched_ack)
{
// TODO Sleep until submit time is reached before sending the ack (JOB_SUBMITTED)
string job_json_description, profile_json_description;
if (!data->context->redis_enabled)
......@@ -644,12 +652,33 @@ void server_on_register_profile(ServerData * data,
}
}
/**
 * @brief Server SCHED_SET_JOB_METADATA handler
 * @details Looks up the job named in the message and overwrites its metadata
 *          with the string carried by the message. Aborts (xbt_die) when the
 *          job is not registered.
 * @param[in,out] data The data associated with the server_process
 * @param[in,out] task_data The data associated with the message the server received
 */
void server_on_set_job_metadata(ServerData * data,
                                IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    SetJobMetadataMessage * message = static_cast<SetJobMetadataMessage *>(task_data->data);

    // Borrow the identifier and the workloads container instead of copying them.
    const JobIdentifier & target_id = message->job_id;
    auto & workloads = data->context->workloads;

    // The job must already be known: metadata can only be attached to an existing job.
    if (!workloads.job_is_registered(target_id))
    {
        xbt_die("The job '%s' does not exist, cannot set its metadata", message->job_id.to_string().c_str());
    }

    workloads.job_at(target_id)->metadata = message->metadata;

    XBT_DEBUG("Metadata of job '%s' has been set", message->job_id.to_string().c_str());
}
void server_on_change_job_state(ServerData * data,
IPMessage * task_data)
{
xbt_assert(task_data->data != nullptr);
ChangeJobStateMessage * message = (ChangeJobStateMessage *) task_data->data;
if (!(data->context->workloads.job_is_registered(message->job_id)))
{
xbt_die("The job '%s' does not exist.", message->job_id.to_string().c_str());
}
Job * job = data->context->workloads.job_at(message->job_id);
XBT_INFO("Change job state: Job %s to state %s",
......@@ -715,9 +744,14 @@ void server_on_to_job_msg(ServerData * data,
xbt_assert(task_data->data != nullptr);
ToJobMessage * message = (ToJobMessage *) task_data->data;
if (!(data->context->workloads.job_is_registered(message->job_id)))
{
xbt_die("The job '%s' does not exist, cannot send a message to that job.",
message->job_id.to_string().c_str());
}
Job * job = data->context->workloads.job_at(message->job_id);
XBT_INFO("Send message to job: Job %s message=%s",
XBT_INFO("Send message to job: Job '%s' message='%s'",
job->id.to_string().c_str(),
message->message.c_str());
......@@ -750,11 +784,22 @@ void server_on_reject_job(ServerData * data,
xbt_assert(task_data->data != nullptr);
JobRejectedMessage * message = (JobRejectedMessage *) task_data->data;
if (!(data->context->workloads.job_is_registered(message->job_id)))
{
xbt_die("Job '%s' does not exist.", message->job_id.to_string().c_str());
}
Job * job = data->context->workloads.job_at(message->job_id);
(void) job; // Avoids a warning if assertions are ignored
xbt_assert(job->state == JobState::JOB_STATE_SUBMITTED,
"Invalid rejection received: job '%s' cannot be rejected at the present time. "
"To be rejected, a job must be submitted and not allocated yet.",
job->id.to_string().c_str());
job->state = JobState::JOB_STATE_REJECTED;
data->nb_completed_jobs++;
XBT_INFO("Job %s has been rejected",
XBT_INFO("Job '%s' has been rejected",
job->id.to_string().c_str());
check_simulation_finished(data);
......@@ -770,6 +815,9 @@ void server_on_kill_jobs(ServerData * data,
for (const JobIdentifier & job_id : message->jobs_ids)
{
xbt_assert(data->context->workloads.job_is_registered(job_id),
"Trying to kill job '%s' but it does not exist.", job_id.to_string().c_str());
Job * job = data->context->workloads.job_at(job_id);
// Let's discard jobs whose kill has already been requested
......
......@@ -199,6 +199,14 @@ void server_on_register_job(ServerData * data,
void server_on_register_profile(ServerData * data,
IPMessage * task_data);
/**
 * @brief Server SCHED_SET_JOB_METADATA handler
 * @details Sets the metadata of the job referenced by the received message.
 *          The job must already be registered, otherwise the handler aborts.
 * @param[in,out] data The data associated with the server_process
 * @param[in,out] task_data The data associated with the message the server received
 */
void server_on_set_job_metadata(ServerData * data,
IPMessage * task_data);
/**
* @brief Server SCHED_REJECT_JOB handler
* @param[in,out] data The data associated with the server_process
......
......@@ -277,6 +277,7 @@ void Workloads::register_smpi_applications()
bool Workloads::job_is_registered(const JobIdentifier &job_id)
{
at(job_id.workload_name)->jobs->displayDebug();
return at(job_id.workload_name)->jobs->exists(job_id);
}
......@@ -303,7 +304,7 @@ string Workloads::to_string()
{
string key = mit.first;
Workload * workload = mit.second;
str += key + ": " + workload->to_string() + " ";
str += workload->to_string() + " ";
}
return str;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment