Commit 7e5c6a18 authored by MERCIER Michael's avatar MERCIER Michael

[doc] job id int-> string (to finish)

parent 86348bc7
...@@ -709,6 +709,20 @@ Batsim acknowledges it by sending one ...@@ -709,6 +709,20 @@ Batsim acknowledges it by sending one
``` ```
### SET_JOB_METADATA ### SET_JOB_METADATA
This message is a convenient way to to attach metadata to a job during
simulation runtime that will appear in the final result file. A column
named ``metadata`` will be present in th output file ``PREFIX_job.csv``
with the string provided by the scheduler, or an empty string if not set.
**Note**: If you need to add **static** metadata to a job you can simply
add one or more fields in the job profile.
**Warning**: This not a way to delegate to batsim the storage of metadata.
That should be done though Redis, (when you have to share information
between different process for example), or using the scheduler's internal
data structures.
- **data**: a job id and its metadata - **data**: a job id and its metadata
- **example**: - **example**:
```json ```json
......
...@@ -92,6 +92,28 @@ bool file_exists(const std::string & filename) ...@@ -92,6 +92,28 @@ bool file_exists(const std::string & filename)
return boost::filesystem::exists(filename); return boost::filesystem::exists(filename);
} }
/**
* @brief Computes the absolute filename of a given file
* @param[in] filename The name of the file (not necessarily existing).
* @return The absolute filename corresponding to the given filename
*/
std::string absolute_filename(const std::string & filename)
{
xbt_assert(filename.length() > 0);
// Let's assume filenames starting by "/" are absolute.
if (filename[0] == '/')
{
return filename;
}
char cwd_buf[PATH_MAX];
char * getcwd_ret = getcwd(cwd_buf, PATH_MAX);
xbt_assert(getcwd_ret == cwd_buf, "getcwd failed");
return string(getcwd_ret) + '/' + filename;
}
/** /**
* @brief Converts a string to a VerbosityLevel * @brief Converts a string to a VerbosityLevel
* @param[in] str The string * @param[in] str The string
......
...@@ -744,7 +744,7 @@ void export_jobs_to_csv(const std::string &filename, const BatsimContext *contex ...@@ -744,7 +744,7 @@ void export_jobs_to_csv(const std::string &filename, const BatsimContext *contex
job_map["job_id"] = to_string(job->number); job_map["job_id"] = to_string(job->number);
job_map["workload_name"] = string(workload_name); job_map["workload_name"] = string(workload_name);
job_map["submission_time"] = to_string((double)job->submission_time); job_map["submission_time"] = to_string((double)job->submission_time);
job_map["requested_number_of_processors"] = to_string(job->required_nb_res); job_map["requested_number_of_processors"] = to_string(job->requested_nb_res);
job_map["requested_time"] = to_string((double)job->walltime); job_map["requested_time"] = to_string((double)job->walltime);
job_map["success"] = to_string(success); job_map["success"] = to_string(success);
job_map["starting_time"] = to_string((double)job->starting_time); job_map["starting_time"] = to_string((double)job->starting_time);
...@@ -1056,13 +1056,13 @@ void EnergyConsumptionTracer::set_filename(const string &filename) ...@@ -1056,13 +1056,13 @@ void EnergyConsumptionTracer::set_filename(const string &filename)
_wbuf->append_text("time,energy,event_type,wattmin,epower\n"); _wbuf->append_text("time,energy,event_type,wattmin,epower\n");
} }
void EnergyConsumptionTracer::add_job_start(double date, int job_id) void EnergyConsumptionTracer::add_job_start(double date, string job_id)
{ {
(void) job_id; (void) job_id;
add_entry(date, 's'); add_entry(date, 's');
} }
void EnergyConsumptionTracer::add_job_end(double date, int job_id) void EnergyConsumptionTracer::add_job_end(double date, string job_id)
{ {
(void) job_id; (void) job_id;
_context->energy_last_job_completion = add_entry(date, 'e'); _context->energy_last_job_completion = add_entry(date, 'e');
......
...@@ -280,16 +280,16 @@ IPMessage::~IPMessage() ...@@ -280,16 +280,16 @@ IPMessage::~IPMessage()
data = nullptr; data = nullptr;
} }
JobIdentifier::JobIdentifier(const string &workload_name, int job_number) : JobIdentifier::JobIdentifier(const string &workload_name, const string &job_number) :
workload_name(workload_name), workload_name(workload_name),
job_number(job_number) job_name(job_number)
{ {
} }
string JobIdentifier::to_string() const string JobIdentifier::to_string() const
{ {
return workload_name + '!' + std::to_string(job_number); return workload_name + '!' + job_name;
} }
bool operator<(const JobIdentifier &ji1, const JobIdentifier &ji2) bool operator<(const JobIdentifier &ji1, const JobIdentifier &ji2)
......
...@@ -89,8 +89,6 @@ int static_job_submitter_process(int argc, char *argv[]) ...@@ -89,8 +89,6 @@ int static_job_submitter_process(int argc, char *argv[])
sort(jobsVector.begin(), jobsVector.end(), job_comparator_subtime_number); sort(jobsVector.begin(), jobsVector.end(), job_comparator_subtime_number);
XBT_INFO("taille vecteur : %d", (int) jobsVector.size() );
if (jobsVector.size() > 0) if (jobsVector.size() > 0)
{ {
const Job * first_submitted_job = *jobsVector.begin(); const Job * first_submitted_job = *jobsVector.begin();
...@@ -105,10 +103,8 @@ int static_job_submitter_process(int argc, char *argv[]) ...@@ -105,10 +103,8 @@ int static_job_submitter_process(int argc, char *argv[])
//job->completion_notification_mailbox = "SOME_MAILBOX"; //job->completion_notification_mailbox = "SOME_MAILBOX";
// Let's put the metadata about the job into the data storage // Let's put the metadata about the job into the data storage
JobIdentifier job_id(workload->name, job->number); string job_key = RedisStorage::job_key(job->id);
string job_key = RedisStorage::job_key(job_id);
string profile_key = RedisStorage::profile_key(workload->name, job->profile); string profile_key = RedisStorage::profile_key(workload->name, job->profile);
XBT_INFO("IN STATIC JOB SUBMITTER: '%s'", job->json_description.c_str());
if (context->redis_enabled) if (context->redis_enabled)
{ {
...@@ -123,7 +119,7 @@ int static_job_submitter_process(int argc, char *argv[]) ...@@ -123,7 +119,7 @@ int static_job_submitter_process(int argc, char *argv[])
JobSubmittedMessage * msg = new JobSubmittedMessage; JobSubmittedMessage * msg = new JobSubmittedMessage;
msg->submitter_name = submitter_name; msg->submitter_name = submitter_name;
msg->job_id.workload_name = args->workload_name; msg->job_id.workload_name = args->workload_name;
msg->job_id.job_number = job->number; msg->job_id.job_name = job->id.to_string();
send_message("server", IPMessageType::JOB_SUBMITTED, (void*)msg); send_message("server", IPMessageType::JOB_SUBMITTED, (void*)msg);
previous_submission_date = MSG_get_clock(); previous_submission_date = MSG_get_clock();
...@@ -330,7 +326,7 @@ static string submit_workflow_task_as_job(BatsimContext *context, string workflo ...@@ -330,7 +326,7 @@ static string submit_workflow_task_as_job(BatsimContext *context, string workflo
JobSubmittedMessage * msg = new JobSubmittedMessage; JobSubmittedMessage * msg = new JobSubmittedMessage;
msg->submitter_name = submitter_name; msg->submitter_name = submitter_name;
msg->job_id.workload_name = workload_name; msg->job_id.workload_name = workload_name;
msg->job_id.job_number = job_number; msg->job_id.job_name = job_number;
send_message("server", IPMessageType::JOB_SUBMITTED, (void*)msg); send_message("server", IPMessageType::JOB_SUBMITTED, (void*)msg);
// HOWTO Test Wait Query // HOWTO Test Wait Query
...@@ -368,7 +364,7 @@ static string wait_for_job_completion(string submitter_name) ...@@ -368,7 +364,7 @@ static string wait_for_job_completion(string submitter_name)
(SubmitterJobCompletionCallbackMessage *) task_notification_data->data; (SubmitterJobCompletionCallbackMessage *) task_notification_data->data;
return notification_data->job_id.workload_name + "!" + return notification_data->job_id.workload_name + "!" +
std::to_string(notification_data->job_id.job_number); std::to_string(notification_data->job_id.job_name);
} }
/** /**
...@@ -404,14 +400,12 @@ int batexec_job_launcher_process(int argc, char *argv[]) ...@@ -404,14 +400,12 @@ int batexec_job_launcher_process(int argc, char *argv[])
for (auto & mit : jobs) for (auto & mit : jobs)
{ {
Job * job = mit.second; Job * job = mit.second;
job->id = JobIdentifier(workload->name, job->number);
int nb_res = job->required_nb_res; int nb_res = job->requested_nb_res;
SchedulingAllocation * alloc = new SchedulingAllocation; SchedulingAllocation * alloc = new SchedulingAllocation;
alloc->job_id.workload_name = args->workload_name; alloc->job_id = job->id;
alloc->job_id.job_number = job->number;
alloc->hosts.clear(); alloc->hosts.clear();
alloc->hosts.reserve(nb_res); alloc->hosts.reserve(nb_res);
alloc->machine_ids.clear(); alloc->machine_ids.clear();
......
This diff is collapsed.
...@@ -34,14 +34,14 @@ struct JobIdentifier ...@@ -34,14 +34,14 @@ struct JobIdentifier
* @param[in] job_number The job number * @param[in] job_number The job number
*/ */
explicit JobIdentifier(const std::string & workload_name = "", explicit JobIdentifier(const std::string & workload_name = "",
int job_number = -1); const std::string &job_name = "");
std::string workload_name; //!< The name of the workload the job belongs to std::string workload_name; //!< The name of the workload the job belongs to
int job_number; //!< The job unique number inside its workload std::string job_name; //!< The job unique name inside its workload
/** /**
* @brief Returns a string representation of the JobIdentifier. * @brief Returns a string representation of the JobIdentifier.
* @details Output format is WORKLOAD_NAME!JOB_NUMBER * @details Output format is WORKLOAD_NAME!JOB_NAME
* @return A string representation of the JobIdentifier. * @return A string representation of the JobIdentifier.
*/ */
std::string to_string() const; std::string to_string() const;
...@@ -142,7 +142,6 @@ struct Job ...@@ -142,7 +142,6 @@ struct Job
// Batsim internals // Batsim internals
Workload * workload = nullptr; //!< The workload the job belongs to Workload * workload = nullptr; //!< The workload the job belongs to
int number; //!< The job unique number within its workload
JobIdentifier id; //!< The job unique identifier JobIdentifier id; //!< The job unique identifier
BatTask * task = nullptr; //!< The root task be executed by this job (profile instantiation). BatTask * task = nullptr; //!< The root task be executed by this job (profile instantiation).
std::string json_description; //!< The JSON description of the job std::string json_description; //!< The JSON description of the job
...@@ -166,7 +165,7 @@ struct Job ...@@ -166,7 +165,7 @@ struct Job
std::string profile; //!< The job profile name. The corresponding profile tells how the job should be computed std::string profile; //!< The job profile name. The corresponding profile tells how the job should be computed
Rational submission_time; //!< The job submission time: The time at which the becomes available Rational submission_time; //!< The job submission time: The time at which the becomes available
Rational walltime = -1; //!< The job walltime: if the job is executed for more than this amount of time, it will be killed. Set at -1 to disable this behavior Rational walltime = -1; //!< The job walltime: if the job is executed for more than this amount of time, it will be killed. Set at -1 to disable this behavior
int required_nb_res; //!< The number of resources the job is requested to be executed on int requested_nb_res; //!< The number of resources the job is requested to be executed on
int return_code = -1; //!< The return code of the job int return_code = -1; //!< The return code of the job
public: public:
...@@ -329,7 +328,7 @@ public: ...@@ -329,7 +328,7 @@ public:
int nb_jobs() const; int nb_jobs() const;
private: private:
std::map<int, Job*> _jobs; //!< The std::map which contains the jobs std::map<JobIdentifier, Job*> _jobs; //!< The std::map which contains the jobs
Profiles * _profiles = nullptr; //!< The profiles associated with the jobs Profiles * _profiles = nullptr; //!< The profiles associated with the jobs
Workload * _workload = nullptr; //!< The Workload the jobs belong to Workload * _workload = nullptr; //!< The Workload the jobs belong to
}; };
......
...@@ -81,7 +81,7 @@ int execute_task(BatTask * btask, ...@@ -81,7 +81,7 @@ int execute_task(BatTask * btask,
job->workload->profiles->at(data->sequence[profile_index_in_sequence])); job->workload->profiles->at(data->sequence[profile_index_in_sequence]));
btask->sub_tasks.push_back(sub_btask); btask->sub_tasks.push_back(sub_btask);
string task_name = "seq" + to_string(job->number) + "'" + job->profile + "'"; string task_name = "seq" + job->id.to_string() + "'" + job->profile + "'";
XBT_INFO("Creating sequential tasks '%s'", task_name.c_str()); XBT_INFO("Creating sequential tasks '%s'", task_name.c_str());
int ret_last_profile = execute_task(sub_btask, context, allocation, int ret_last_profile = execute_task(sub_btask, context, allocation,
...@@ -182,7 +182,7 @@ int execute_task(BatTask * btask, ...@@ -182,7 +182,7 @@ int execute_task(BatTask * btask,
job->workload->profiles->at(profile_to_execute)); job->workload->profiles->at(profile_to_execute));
btask->sub_tasks.push_back(sub_btask); btask->sub_tasks.push_back(sub_btask);
string task_name = "recv" + to_string(job->number) + "'" + job->profile + "'"; string task_name = "recv" + job->id.to_string() + "'" + job->profile + "'";
XBT_INFO("Creating receive task '%s'", task_name.c_str()); XBT_INFO("Creating receive task '%s'", task_name.c_str());
int ret_last_profile = execute_task(sub_btask, context, allocation, int ret_last_profile = execute_task(sub_btask, context, allocation,
...@@ -224,7 +224,7 @@ int execute_task(BatTask * btask, ...@@ -224,7 +224,7 @@ int execute_task(BatTask * btask,
for (int i = 0; i < nb_ranks; ++i) for (int i = 0; i < nb_ranks; ++i)
{ {
job->smpi_ranks_to_hosts_mapping[i] = host_to_use; job->smpi_ranks_to_hosts_mapping[i] = host_to_use;
++host_to_use %= job->required_nb_res; // ++ is done first ++host_to_use %= job->requested_nb_res; // ++ is done first
} }
} }
...@@ -236,7 +236,7 @@ int execute_task(BatTask * btask, ...@@ -236,7 +236,7 @@ int execute_task(BatTask * btask,
for (int i = 0; i < nb_ranks; ++i) for (int i = 0; i < nb_ranks; ++i)
{ {
char *str_instance_id = NULL; char *str_instance_id = NULL;
int ret = asprintf(&str_instance_id, "%s!%d", job->workload->name.c_str(), job->number); int ret = asprintf(&str_instance_id, "%s", job->id.to_string().c_str());
(void) ret; // Avoids a warning if assertions are ignored (void) ret; // Avoids a warning if assertions are ignored
xbt_assert(ret != -1, "asprintf failed (not enough memory?)"); xbt_assert(ret != -1, "asprintf failed (not enough memory?)");
...@@ -245,7 +245,7 @@ int execute_task(BatTask * btask, ...@@ -245,7 +245,7 @@ int execute_task(BatTask * btask,
xbt_assert(ret != -1, "asprintf failed (not enough memory?)"); xbt_assert(ret != -1, "asprintf failed (not enough memory?)");
char *str_pname = NULL; char *str_pname = NULL;
ret = asprintf(&str_pname, "%d_%d", job->number, i); ret = asprintf(&str_pname, "%s_%d", job->id.to_string().c_str(), i);
xbt_assert(ret != -1, "asprintf failed (not enough memory?)"); xbt_assert(ret != -1, "asprintf failed (not enough memory?)");
char **argv = xbt_new(char*, 5); char **argv = xbt_new(char*, 5);
...@@ -359,7 +359,7 @@ int execute_job_process(int argc, char *argv[]) ...@@ -359,7 +359,7 @@ int execute_job_process(int argc, char *argv[])
ExecuteJobProcessArguments * args = (ExecuteJobProcessArguments *) MSG_process_get_data(MSG_process_self()); ExecuteJobProcessArguments * args = (ExecuteJobProcessArguments *) MSG_process_get_data(MSG_process_self());
Workload * workload = args->context->workloads.at(args->allocation->job_id.workload_name); Workload * workload = args->context->workloads.at(args->allocation->job_id.workload_name);
Job * job = workload->jobs->at(args->allocation->job_id.job_number); Job * job = workload->jobs->at(args->allocation->job_id.job_name);
job->starting_time = MSG_get_clock(); job->starting_time = MSG_get_clock();
job->allocation = args->allocation->machine_ids; job->allocation = args->allocation->machine_ids;
double remaining_time = (double)job->walltime; double remaining_time = (double)job->walltime;
...@@ -369,7 +369,7 @@ int execute_job_process(int argc, char *argv[]) ...@@ -369,7 +369,7 @@ int execute_job_process(int argc, char *argv[])
{ {
job->consumed_energy = consumed_energy_on_machines(args->context, job->allocation); job->consumed_energy = consumed_energy_on_machines(args->context, job->allocation);
// Let's trace the consumed energy // Let's trace the consumed energy
args->context->energy_tracer.add_job_start(MSG_get_clock(), job->number); args->context->energy_tracer.add_job_start(MSG_get_clock(), job->id);
} }
// Job computation // Job computation
......
...@@ -76,46 +76,3 @@ int request_reply_scheduler_process(int argc, char *argv[]) ...@@ -76,46 +76,3 @@ int request_reply_scheduler_process(int argc, char *argv[])
delete args; delete args;
return 0; return 0;
} }
std::string absolute_filename(const std::string & filename)
{
xbt_assert(filename.length() > 0);
// Let's assume filenames starting by "/" are absolute.
if (filename[0] == '/')
{
return filename;
}
char cwd_buf[PATH_MAX];
char * getcwd_ret = getcwd(cwd_buf, PATH_MAX);
xbt_assert(getcwd_ret == cwd_buf, "getcwd failed");
return string(getcwd_ret) + '/' + filename;
}
void parse_job_identifier(const std::string & job_identifier_string,
JobIdentifier & job_id)
{
// Let's split the job_identifier by '!'
vector<string> job_identifier_parts;
boost::split(job_identifier_parts, job_identifier_string, boost::is_any_of("!"), boost::token_compress_on);
XBT_DEBUG("Parsed job_identifier: '%d'", job_identifier_parts.size());
if (job_identifier_parts.size() == 1)
{
XBT_WARN("Job ID is not of format WORKLOAD!NUMBER... assuming static!");
job_id.workload_name = "static";
job_id.job_number = std::stoi(job_identifier_parts[0]);
}
else if (job_identifier_parts.size() == 2)
{
job_id.workload_name = job_identifier_parts[0];
job_id.job_number = std::stoi(job_identifier_parts[1]);
}
else
{
xbt_assert("Empty or invalid job identifier: %s", job_identifier_string.c_str());
}
}
...@@ -37,19 +37,3 @@ enum NetworkStamp : char ...@@ -37,19 +37,3 @@ enum NetworkStamp : char
* @return 0 * @return 0
*/ */
int request_reply_scheduler_process(int argc, char *argv[]); int request_reply_scheduler_process(int argc, char *argv[]);
/**
* @brief Computes the absolute filename of a given file
* @param[in] filename The name of the file (not necessarily existing).
* @return The absolute filename corresponding to the given filename
*/
std::string absolute_filename(const std::string & filename);
/**
* @brief Retrieves the workload_name and the job_id from a job_identifier
* @details Job identifiers are in the form [WORKLOAD_NAME!]JOB_ID
* @param[in] job_identifier_string The input job identifier string
* @param[out] job_id The output JobIdentifier
*/
void parse_job_identifier(const std::string & job_identifier_string, JobIdentifier & job_id);
...@@ -706,7 +706,7 @@ void JsonProtocolReader::handle_reject_job(int event_number, ...@@ -706,7 +706,7 @@ void JsonProtocolReader::handle_reject_job(int event_number,
string job_id = job_id_value.GetString(); string job_id = job_id_value.GetString();
JobRejectedMessage * message = new JobRejectedMessage; JobRejectedMessage * message = new JobRejectedMessage;
parse_job_identifier(job_id, message->job_id); message->job_id = JobIdentifier(job_id);
if (!(context->workloads.job_is_registered(message->job_id))) if (!(context->workloads.job_is_registered(message->job_id)))
{ {
xbt_assert(false, "The folowing job does not exist.", job_id.c_str()); xbt_assert(false, "The folowing job does not exist.", job_id.c_str());
...@@ -754,7 +754,7 @@ void JsonProtocolReader::handle_execute_job(int event_number, ...@@ -754,7 +754,7 @@ void JsonProtocolReader::handle_execute_job(int event_number,
string job_id = job_id_value.GetString(); string job_id = job_id_value.GetString();
// Let's retrieve the job identifier // Let's retrieve the job identifier
parse_job_identifier(job_id, message->allocation->job_id); message->allocation->job_id = JobIdentifier(job_id);
if (!(context->workloads.job_is_registered(message->allocation->job_id))) if (!(context->workloads.job_is_registered(message->allocation->job_id)))
{ {
xbt_assert(false, "Invalid message in event %d (EXECUTE_JOB): job %d with job_id:'%s' does not exists", event_number, job_id.c_str()); xbt_assert(false, "Invalid message in event %d (EXECUTE_JOB): job %d with job_id:'%s' does not exists", event_number, job_id.c_str());
...@@ -964,8 +964,7 @@ void JsonProtocolReader::handle_set_job_metadata(int event_number, ...@@ -964,8 +964,7 @@ void JsonProtocolReader::handle_set_job_metadata(int event_number,
boost::regex r("[^\"]*"); boost::regex r("[^\"]*");
xbt_assert(boost::regex_match(metadata, r), "Invalid JSON message: the 'metadata' value in the 'data' value of event %d (SET_JOB_METADATA) should not contain double quotes (got ###%s###)", event_number, metadata.c_str()); xbt_assert(boost::regex_match(metadata, r), "Invalid JSON message: the 'metadata' value in the 'data' value of event %d (SET_JOB_METADATA) should not contain double quotes (got ###%s###)", event_number, metadata.c_str());
JobIdentifier job_identifier; JobIdentifier job_identifier = JobIdentifier(job_id);
parse_job_identifier(job_id, job_identifier);
if (!(context->workloads.job_is_registered(job_identifier))) if (!(context->workloads.job_is_registered(job_identifier)))
{ {
xbt_assert(false, "Invalid message in event %d (SET_JOB_METADATA): The folowing job does not exist: %s", event_number, job_id.c_str()); xbt_assert(false, "Invalid message in event %d (SET_JOB_METADATA): The folowing job does not exist: %s", event_number, job_id.c_str());
...@@ -1032,7 +1031,7 @@ void JsonProtocolReader::handle_change_job_state(int event_number, ...@@ -1032,7 +1031,7 @@ void JsonProtocolReader::handle_change_job_state(int event_number,
ChangeJobStateMessage * message = new ChangeJobStateMessage; ChangeJobStateMessage * message = new ChangeJobStateMessage;
parse_job_identifier(job_id, message->job_id); message->job_id = JobIdentifier(job_id);
if (!(context->workloads.job_is_registered(message->job_id))) if (!(context->workloads.job_is_registered(message->job_id)))
{ {
xbt_assert(false, "The folowing job does not exist.", job_id.c_str()); xbt_assert(false, "The folowing job does not exist.", job_id.c_str());
...@@ -1106,7 +1105,7 @@ void JsonProtocolReader::handle_to_job_msg(int event_number, ...@@ -1106,7 +1105,7 @@ void JsonProtocolReader::handle_to_job_msg(int event_number,
ToJobMessage * message = new ToJobMessage; ToJobMessage * message = new ToJobMessage;
parse_job_identifier(job_id, message->job_id); message->job_id = JobIdentifier(job_id);
if (!(context->workloads.job_is_registered(message->job_id))) if (!(context->workloads.job_is_registered(message->job_id)))
{ {
xbt_assert(false, "The folowing job does not exist.", job_id.c_str()); xbt_assert(false, "The folowing job does not exist.", job_id.c_str());
...@@ -1157,7 +1156,7 @@ void JsonProtocolReader::handle_submit_job(int event_number, ...@@ -1157,7 +1156,7 @@ void JsonProtocolReader::handle_submit_job(int event_number,
xbt_assert(job_id_value.IsString(), "Invalid JSON message: in event %d (SUBMIT_JOB): ['data']['job_id'] should be a string", event_number); xbt_assert(job_id_value.IsString(), "Invalid JSON message: in event %d (SUBMIT_JOB): ['data']['job_id'] should be a string", event_number);
string job_id = job_id_value.GetString(); string job_id = job_id_value.GetString();
parse_job_identifier(job_id, message->job_id); message->job_id = JobIdentifier(job_id);
if (context->workloads.exists(message->job_id.workload_name)) if (context->workloads.exists(message->job_id.workload_name))
{ {
xbt_assert(!context->workloads.job_is_registered(message->job_id), "Invalid message in event %d (SUBMIT_JOB): job_id '%s' already exists", event_number, job_id.c_str()); xbt_assert(!context->workloads.job_is_registered(message->job_id), "Invalid message in event %d (SUBMIT_JOB): job_id '%s' already exists", event_number, job_id.c_str());
...@@ -1204,7 +1203,6 @@ void JsonProtocolReader::handle_submit_job(int event_number, ...@@ -1204,7 +1203,6 @@ void JsonProtocolReader::handle_submit_job(int event_number,
Job * job = Job::from_json(message->job_description, workload, Job * job = Job::from_json(message->job_description, workload,
"Invalid JSON job submitted by the scheduler"); "Invalid JSON job submitted by the scheduler");
workload->jobs->add_job(job); workload->jobs->add_job(job);
job->id = JobIdentifier(workload->name, job->number);
job->state = JobState::JOB_STATE_SUBMITTED; job->state = JobState::JOB_STATE_SUBMITTED;
// Read the profile description if possible // Read the profile description if possible
...@@ -1220,7 +1218,7 @@ void JsonProtocolReader::handle_submit_job(int event_number, ...@@ -1220,7 +1218,7 @@ void JsonProtocolReader::handle_submit_job(int event_number,
profile_object.Accept(writer); profile_object.Accept(writer);
message->job_profile_description = string(buffer.GetString(), buffer.GetSize()); message->job_profile_description = string(buffer.GetString(), buffer.GetSize());
XBT_INFO("A profile was submited with the job '%d' in workload '%s': %s", job->number, workload->name.c_str(), message->job_profile_description.c_str()); XBT_INFO("A profile was submited with the job '%s' : %s", job->id.to_string().c_str(), message->job_profile_description.c_str());
} }
else if (context->redis_enabled) else if (context->redis_enabled)
{ {
...@@ -1366,7 +1364,7 @@ void JsonProtocolReader::handle_kill_job(int event_number, ...@@ -1366,7 +1364,7 @@ void JsonProtocolReader::handle_kill_job(int event_number,
for (unsigned int i = 0; i < job_ids_array.Size(); ++i) for (unsigned int i = 0; i < job_ids_array.Size(); ++i)
{ {
const Value & job_id_value = job_ids_array[i]; const Value & job_id_value = job_ids_array[i];
parse_job_identifier(job_id_value.GetString(), message->jobs_ids[i]); message->jobs_ids[i] = JobIdentifier(job_id_value.GetString());
if (!(context->workloads.job_is_registered(message->jobs_ids[i]))) if (!(context->workloads.job_is_registered(message->jobs_ids[i])))
{ {
xbt_assert(false, "Invalid message in event %d (KILL_JOB): job %d with job_id:'%s' does not exists", event_number, i, message->jobs_ids[i].to_string().c_str()); xbt_assert(false, "Invalid message in event %d (KILL_JOB): job %d with job_id:'%s' does not exists", event_number, i, message->jobs_ids[i].to_string().c_str());
......
...@@ -205,8 +205,8 @@ void server_on_job_completed(ServerData * data, ...@@ -205,8 +205,8 @@ void server_on_job_completed(ServerData * data,
xbt_assert(data->nb_completed_jobs + data->nb_running_jobs <= data->nb_submitted_jobs); xbt_assert(data->nb_completed_jobs + data->nb_running_jobs <= data->nb_submitted_jobs);
Job * job = data->context->workloads.job_at(message->job_id); Job * job = data->context->workloads.job_at(message->job_id);
XBT_INFO("Job %s!%d COMPLETED. %d jobs completed so far", XBT_INFO("Job COMPLETED. %d jobs completed so far",
job->workload->name.c_str(), job->number, data->nb_completed_jobs); job->id.to_string().c_str(), data->nb_completed_jobs);
string status = "UNKNOWN"; string status = "UNKNOWN";
if (job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY) if (job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY)
...@@ -257,7 +257,7 @@ void server_on_job_submitted(ServerData * data, ...@@ -257,7 +257,7 @@ void server_on_job_submitted(ServerData * data,
} }
// Let's retrieve the Job from memory (or add it into memory if it is dynamic) // Let's retrieve the Job from memory (or add it into memory if it is dynamic)
XBT_INFO("GOT JOB: %s %d\n", message->job_id.workload_name.c_str(), message->job_id.job_number); XBT_INFO("GOT JOB: %s %d\n", message->job_id.workload_name.c_str(), message->job_id.job_name);
xbt_assert(data->context->workloads.job_is_registered(message->job_id)); xbt_assert(data->context->workloads.job_is_registered(message->job_id));
Job * job = data->context->workloads.job_at(message->job_id); Job * job = data->context->workloads.job_at(message->job_id);
job->id = message->job_id; job->id = message->job_id;
...@@ -633,9 +633,9 @@ void server_on_change_job_state(ServerData * data, ...@@ -633,9 +633,9 @@ void server_on_change_job_state(ServerData * data,
Job * job = data->context->workloads.job_at(message->job_id); Job * job = data->context->workloads.job_at(message->job_id);
XBT_INFO("Change job state: Job %d (workload=%s) to state %s (kill_Reason=%s)", XBT_INFO("Change job state: Job %s to state %s",
job->number, job->workload->name.c_str(), job->id.to_string().c_str(),
message->job_state.c_str(), message->kill_reason.c_str()); message->job_state.c_str());
JobState new_state = job_state_from_string(message->job_state); JobState new_state = job_state_from_string(message->job_state);
...@@ -688,9 +688,6 @@ void server_on_change_job_state(ServerData * data, ...@@ -688,9 +688,6 @@ void server_on_change_job_state(ServerData * data,
job->state = new_state; job->state = new_state;
job->kill_reason = message->kill_reason; job->kill_reason = message->kill_reason;
XBT_INFO("Job state changed: Job %d (workload=%s)",
job->number, job->workload->name.c_str());
check_submitted_and_completed(data); check_submitted_and_completed(data);
} }
...@@ -702,8 +699,8 @@ void server_on_to_job_msg(ServerData * data, ...@@ -702,8 +699,8 @@ void server_on_to_job_msg(ServerData * data,
Job * job = data->context->workloads.job_at(message->job_id); Job * job = data->context->workloads.job_at(message->job_id);
XBT_INFO("Send message to job: Job %d (workload=%s) message=%s", XBT_INFO("Send message to job: Job %s message=%s",
job->number, job->workload->name.c_str(), job->id.to_string().c_str(),
message->message.c_str()); message->message.c_str());
job->incoming_message_buffer.push_back(message->message); job->incoming_message_buffer.push_back(message->message);
...@@ -719,8 +716,8 @@ void server_on_from_job_msg(ServerData * data, ...@@ -719,8 +716,8 @@ void server_on_from_job_msg(ServerData * data,
Job * job = data->context->workloads.job_at(message->job_id); Job * job = data->context->workloads.job_at(message->job_id);
XBT_INFO("Send message to scheduler: Job %d (workload=%s)", XBT_INFO("Send message to scheduler: Job %s",
job->number, job->workload->name.c_str()); job->id.to_string().c_str());
data->context->proto_writer->append_from_job_message(message->job_id.to_string(), data->context->proto_writer->append_from_job_message(message->job_id.to_string(),
message->message, message->message,
...@@ -739,8 +736,8 @@ void server_on_reject_job(ServerData * data, ...@@ -739,8 +736,8 @@ void server_on_reject_job(ServerData * data,
job->state = JobState::JOB_STATE_REJECTED; job->state = JobState::JOB_STATE_REJECTED;
data->nb_completed_jobs++; data->nb_completed_jobs++;
XBT_INFO("Job %d (workload=%s) has been rejected", XBT_INFO("Job %s has been rejected",
job->number, job->workload->name.c_str()); job->id.to_string().c_str());
check_submitted_and_completed(data); check_submitted_and_completed(data);
} }
...@@ -860,11 +857,11 @@ void server_on_execute_job(ServerData * data, ...@@ -860,11 +857,11 @@ void server_on_execute_job(ServerData * data,
// manage correctly a different number of resources than the requested // manage correctly a different number of resources than the requested
// number // number
if (job->workload->profiles->at(job->profile)->type != ProfileType::MSG_PARALLEL_HOMOGENEOUS_TOTAL_AMOUNT) { if (job->workload->profiles->at(job->profile)->type != ProfileType::MSG_PARALLEL_HOMOGENEOUS_TOTAL_AMOUNT) {
xbt_assert((int)allocation->mapping.size() == job->required_nb_res, xbt_assert((int)allocation->mapping.size() == job->requested_nb_res,
"Invalid job %s allocation. The job requires %d machines but only %d were given (%s). " "Invalid job %s allocation. The job requires %d machines but only %d were given (%s). "
"Using a different number of machines is only allowed if a custom mapping is specified. " "Using a different number of machines is only allowed if a custom mapping is specified. "
"This mapping must specify which allocated machine each executor should use.", "This mapping must specify which allocated machine each executor should use.",
job->id.to_string().c_str(), job->required_nb_res, (int)allocation->mapping.size(), job->id.to_string().c_str(), job->requested_nb_res, (int)allocation->mapping.size(),
allocation->machine_ids.to_string_hyphen().c_str()); allocation->machine_ids.to_string_hyphen().c_str());
} }
......
...@@ -108,7 +108,7 @@ std::string RedisStorage::key_subparts_separator() const ...@@ -108,7 +108,7 @@ std::string RedisStorage::key_subparts_separator() const
std::string RedisStorage::job_key(const JobIdentifier &job_id) std::string RedisStorage::job_key(const JobIdentifier &job_id)
{ {
std::string key = "job_" + job_id.workload_name + '!' + to_string(job_id.job_number); std::string key = "job_" + job_id.workload_name + '!' + to_string(job_id.job_name);
return key; return key;
} }
......