Commit ac21b4ab authored by MERCIER Michael's avatar MERCIER Michael

[code] job id int-> string

parent 86348bc7
......@@ -344,6 +344,9 @@ If redis is enabled (``{"redis": {"enabled": true}}``),
``job_id`` is the only forwarded field.
Otherwise (if redis is disabled), a JSON description of the job is forwarded
in the ``job`` field.
**Note:** The workload name MUST NOT be present in the job description id field.
A JSON description of the job profile is sent if and only if
profiles forwarding is enabled
(``{"job_submission": {"forward_profiles": true}}``).
......@@ -709,6 +712,20 @@ Batsim acknowledges it by sending one
```
### SET_JOB_METADATA
This message is a convenient way to to attach metadata to a job during
simulation runtime that will appear in the final result file. A column
named ``metadata`` will be present in th output file ``PREFIX_job.csv``
with the string provided by the scheduler, or an empty string if not set.
**Note**: If you need to add **static** metadata to a job you can simply
add one or more fields in the job profile.
**Warning**: This not a way to delegate to batsim the storage of metadata.
That should be done though Redis, (when you have to share information
between different process for example), or using the scheduler's internal
data structures.
- **data**: a job id and its metadata
- **example**:
```json
......
Subproject commit 0a52e8200377d695ef5946f8bff5802d20fb62d2
Subproject commit 7d0e835cfcc88d5a808e49a8726f07b41d152937
......@@ -92,6 +92,28 @@ bool file_exists(const std::string & filename)
return boost::filesystem::exists(filename);
}
/**
* @brief Computes the absolute filename of a given file
* @param[in] filename The name of the file (not necessarily existing).
* @return The absolute filename corresponding to the given filename
*/
std::string absolute_filename(const std::string & filename)
{
xbt_assert(filename.length() > 0);
// Let's assume filenames starting by "/" are absolute.
if (filename[0] == '/')
{
return filename;
}
char cwd_buf[PATH_MAX];
char * getcwd_ret = getcwd(cwd_buf, PATH_MAX);
xbt_assert(getcwd_ret == cwd_buf, "getcwd failed");
return string(getcwd_ret) + '/' + filename;
}
/**
* @brief Converts a string to a VerbosityLevel
* @param[in] str The string
......
......@@ -511,7 +511,7 @@ void PajeTracer::register_new_job(const Job *job)
"%d %s%s %s \"%s\" %s\n",
DEFINE_ENTITY_VALUE, jobPrefix, job->id.to_string().c_str(),
machineState, job->id.to_string().c_str(),
_colors[job->number % (int)_colors.size()].c_str());
_colors[nb_total_jobs++ % (int)_colors.size()].c_str());
xbt_assert(nb_printed < buf_size - 1,
"Writing error: buffer has been completely filled, some information might "
"have been lost. Please increase Batsim's output temporary buffers' size");
......@@ -741,10 +741,10 @@ void export_jobs_to_csv(const std::string &filename, const BatsimContext *contex
int success = (job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY);
// Update all values
job_map["job_id"] = to_string(job->number);
job_map["job_id"] = job->id.job_name;
job_map["workload_name"] = string(workload_name);
job_map["submission_time"] = to_string((double)job->submission_time);
job_map["requested_number_of_processors"] = to_string(job->required_nb_res);
job_map["requested_number_of_processors"] = to_string(job->requested_nb_res);
job_map["requested_time"] = to_string((double)job->walltime);
job_map["success"] = to_string(success);
job_map["starting_time"] = to_string((double)job->starting_time);
......@@ -1056,13 +1056,13 @@ void EnergyConsumptionTracer::set_filename(const string &filename)
_wbuf->append_text("time,energy,event_type,wattmin,epower\n");
}
void EnergyConsumptionTracer::add_job_start(double date, int job_id)
void EnergyConsumptionTracer::add_job_start(double date, JobIdentifier job_id)
{
(void) job_id;
add_entry(date, 's');
}
void EnergyConsumptionTracer::add_job_end(double date, int job_id)
void EnergyConsumptionTracer::add_job_end(double date, JobIdentifier job_id)
{
(void) job_id;
_context->energy_last_job_completion = add_entry(date, 'e');
......
......@@ -16,6 +16,7 @@
#include <simgrid/msg.h>
#include "machines.hpp"
#include "jobs.hpp"
struct BatsimContext;
struct Job;
......@@ -261,6 +262,7 @@ private:
std::vector<std::string> _colors; //!< Strings associated with colors, used for the jobs
PajeTracerState state = UNINITIALIZED; //!< The state of the PajeTracer
int nb_total_jobs = 0; //!< The total number of jobs added to Paje
};
......@@ -356,14 +358,14 @@ public:
* @param[in] date The date at which the job has been started
* @param[in] job_id The job unique number
*/
void add_job_start(double date, int job_id);
void add_job_start(double date, JobIdentifier job_id);
/**
* @brief Adds a job end in the tracer
* @param[in] date The date at which the job has ended
* @param[in] job_id The job unique number
*/
void add_job_end(double date, int job_id);
void add_job_end(double date, JobIdentifier job_id);
/**
* @brief Adds a power state change in the tracer
......
......@@ -279,20 +279,3 @@ IPMessage::~IPMessage()
data = nullptr;
}
JobIdentifier::JobIdentifier(const string &workload_name, int job_number) :
workload_name(workload_name),
job_number(job_number)
{
}
string JobIdentifier::to_string() const
{
return workload_name + '!' + std::to_string(job_number);
}
bool operator<(const JobIdentifier &ji1, const JobIdentifier &ji2)
{
return ji1.to_string() < ji2.to_string();
}
......@@ -46,8 +46,6 @@ int static_job_submitter_process(int argc, char *argv[])
Workload * workload = context->workloads.at(args->workload_name);
XBT_INFO("Nom : %s", (args->workload_name).c_str() );
string submitter_name = args->workload_name + "_submitter";
/* ░░░░░░░░▄▄▄███░░░░░░░░░░░░░░░░░░░░
......@@ -89,8 +87,6 @@ int static_job_submitter_process(int argc, char *argv[])
sort(jobsVector.begin(), jobsVector.end(), job_comparator_subtime_number);
XBT_INFO("taille vecteur : %d", (int) jobsVector.size() );
if (jobsVector.size() > 0)
{
const Job * first_submitted_job = *jobsVector.begin();
......@@ -105,10 +101,8 @@ int static_job_submitter_process(int argc, char *argv[])
//job->completion_notification_mailbox = "SOME_MAILBOX";
// Let's put the metadata about the job into the data storage
JobIdentifier job_id(workload->name, job->number);
string job_key = RedisStorage::job_key(job_id);
string job_key = RedisStorage::job_key(job->id);
string profile_key = RedisStorage::profile_key(workload->name, job->profile);
XBT_INFO("IN STATIC JOB SUBMITTER: '%s'", job->json_description.c_str());
if (context->redis_enabled)
{
......@@ -122,8 +116,7 @@ int static_job_submitter_process(int argc, char *argv[])
// Let's now continue the simulation
JobSubmittedMessage * msg = new JobSubmittedMessage;
msg->submitter_name = submitter_name;
msg->job_id.workload_name = args->workload_name;
msg->job_id.job_number = job->number;
msg->job_id = JobIdentifier(job->id);
send_message("server", IPMessageType::JOB_SUBMITTED, (void*)msg);
previous_submission_date = MSG_get_clock();
......@@ -279,7 +272,7 @@ static string submit_workflow_task_as_job(BatsimContext *context, string workflo
const string workload_name = workflow_name;
int job_number = task_id_counters[workflow_name];
string job_number = to_string(task_id_counters[workflow_name]);
task_id_counters[workflow_name]++;
......@@ -299,7 +292,7 @@ static string submit_workflow_task_as_job(BatsimContext *context, string workflo
// Create JSON description of Job corresponding to Task
double walltime = task->execution_time + 10.0;
string job_json_description = std::string() + "{" +
"\"id\": \"" + workload_name + "!" + std::to_string(job_number) + "\", " +
"\"id\": \"" + workload_name + "!" + job_number + "\", " +
"\"subtime\":" + std::to_string(MSG_get_clock()) + ", " +
"\"walltime\":" + std::to_string(walltime) + ", " +
"\"res\":" + std::to_string(task->num_procs) + ", " +
......@@ -311,11 +304,10 @@ static string submit_workflow_task_as_job(BatsimContext *context, string workflo
"Invalid workflow-injected JSON job");
context->workloads.at(workload_name)->jobs->add_job(job);
// Put the metadata about the job into the data storage
JobIdentifier job_id(workload_name, job_number);
if (context->redis_enabled)
{
// Put the metadata about the job into the data storage
JobIdentifier job_id(workload_name, job_number);
string job_key = RedisStorage::job_key(job_id);
context->storage.set(job_key, job_json_description);
......@@ -329,8 +321,7 @@ static string submit_workflow_task_as_job(BatsimContext *context, string workflo
// Submit the job
JobSubmittedMessage * msg = new JobSubmittedMessage;
msg->submitter_name = submitter_name;
msg->job_id.workload_name = workload_name;
msg->job_id.job_number = job_number;
msg->job_id = job_id;
send_message("server", IPMessageType::JOB_SUBMITTED, (void*)msg);
// HOWTO Test Wait Query
......@@ -346,11 +337,8 @@ static string submit_workflow_task_as_job(BatsimContext *context, string workflo
// XBT_INFO("Got my answer : %f", std::get<2>(answer));
(void)wait_for_query_answer; // Horrible hack to silence "unused" warning.
// Create an ID to return
string id_to_return = workload_name + "!" + std::to_string(job_number);
// Return a key
return id_to_return;
return job_id.to_string();
}
/**
......@@ -367,8 +355,7 @@ static string wait_for_job_completion(string submitter_name)
SubmitterJobCompletionCallbackMessage *notification_data =
(SubmitterJobCompletionCallbackMessage *) task_notification_data->data;
return notification_data->job_id.workload_name + "!" +
std::to_string(notification_data->job_id.job_number);
return notification_data->job_id.to_string();
}
/**
......@@ -404,14 +391,12 @@ int batexec_job_launcher_process(int argc, char *argv[])
for (auto & mit : jobs)
{
Job * job = mit.second;
job->id = JobIdentifier(workload->name, job->number);
int nb_res = job->required_nb_res;
int nb_res = job->requested_nb_res;
SchedulingAllocation * alloc = new SchedulingAllocation;
alloc->job_id.workload_name = args->workload_name;
alloc->job_id.job_number = job->number;
alloc->job_id = job->id;
alloc->hosts.clear();
alloc->hosts.reserve(nb_res);
alloc->machine_ids.clear();
......
This diff is collapsed.
......@@ -28,20 +28,31 @@ struct Job;
*/
struct JobIdentifier
{
/**
* @brief Creates an empty JobIdentifier
*/
JobIdentifier() {};
/**
* @brief Creates a JobIdentifier
* @param[in] workload_name The workload name
* @param[in] job_number The job number
* @param[in] job_name The job name
*/
explicit JobIdentifier(const std::string & workload_name,
const std::string &job_name);
/**
* @brief Creates a JobIdentifier from a string to parse
* @param[in] job_id_str The string to parse
*/
explicit JobIdentifier(const std::string & workload_name = "",
int job_number = -1);
explicit JobIdentifier(const std::string &job_id_str);
std::string workload_name; //!< The name of the workload the job belongs to
int job_number; //!< The job unique number inside its workload
std::string job_name; //!< The job unique name inside its workload
/**
* @brief Returns a string representation of the JobIdentifier.
* @details Output format is WORKLOAD_NAME!JOB_NUMBER
* @details Output format is WORKLOAD_NAME!JOB_NAME
* @return A string representation of the JobIdentifier.
*/
std::string to_string() const;
......@@ -142,7 +153,6 @@ struct Job
// Batsim internals
Workload * workload = nullptr; //!< The workload the job belongs to
int number; //!< The job unique number within its workload
JobIdentifier id; //!< The job unique identifier
BatTask * task = nullptr; //!< The root task be executed by this job (profile instantiation).
std::string json_description; //!< The JSON description of the job
......@@ -166,7 +176,7 @@ struct Job
std::string profile; //!< The job profile name. The corresponding profile tells how the job should be computed
Rational submission_time; //!< The job submission time: The time at which the becomes available
Rational walltime = -1; //!< The job walltime: if the job is executed for more than this amount of time, it will be killed. Set at -1 to disable this behavior
int required_nb_res; //!< The number of resources the job is requested to be executed on
int requested_nb_res; //!< The number of resources the job is requested to be executed on
int return_code = -1; //!< The return code of the job
public:
......@@ -206,6 +216,13 @@ public:
bool is_complete() const;
};
/**
* @brief Compares two Job thanks to their string representations
* @param[in] j1 The first Job
* @param[in] j2 The second Job
* @return j1.id < j2.id
*/
bool operator<(const Job & j1, const Job & j2);
/**
* @brief Compares job thanks to their submission times
......@@ -258,46 +275,47 @@ public:
void load_from_json(const rapidjson::Document & doc, const std::string & filename);
/**
* @brief Accesses one job thanks to its unique number
* @param[in] job_number The job unique number
* @return A pointer to the job associated to the given job number
* @brief Accesses one job thanks to its identifier
* @param[in] job_id The job id
* @return A pointer to the job associated to the given job id
*/
Job * operator[](int job_number);
Job * operator[](JobIdentifier job_id);
/**
* @brief Accesses one job thanks to its unique number (const version)
* @param[in] job_number The job unique number
* @return A (const) pointer to the job associated to the given job number
* @brief Accesses one job thanks to its unique name (const version)
* @param[in] job_id The job id
* @return A (const) pointer to the job associated to the given job id
*/
const Job * operator[](int job_number) const;
const Job * operator[](JobIdentifier job_id) const;
/**
* @brief Accesses one job thanks to its unique number
* @param[in] job_number The job unique number
* @return A pointer to the job associated to the given job number
* @brief Accesses one job thanks to its unique id
* @param[in] job_id The job unique id
* @return A pointer to the job associated to the given job id
*/
Job * at(int job_number);
Job * at(JobIdentifier job_id);
/**
* @brief Accesses one job thanks to its unique number (const version)
* @param[in] job_number The job unique number
* @return A (const) pointer to the job associated to the given job number
* @brief Accesses one job thanks to its unique id (const version)
* @param[in] job_id The job unique name
* @return A (const) pointer to the job associated to the given job
* name
*/
const Job * at(int job_number) const;
const Job * at(JobIdentifier job_id) const;
/**
* @brief Adds a job into a Jobs instance
* @param[in] job The job to add
* @pre No job with the same number exist in the Jobs instance
* @pre No job with the same name exist in the Jobs instance
*/
void add_job(Job * job);
/**
* @brief Allows to know whether a job exists
* @param[in] job_number The unique job number
* @return True if and only if a job with the given job number exists
* @param[in] job_id The unique job name
* @return True if and only if a job with the given job name exists
*/
bool exists(int job_number) const;
bool exists(JobIdentifier job_id) const;
/**
* @brief Allows to know whether the Jobs contains any SMPI job
......@@ -314,13 +332,13 @@ public:
* @brief Returns a copy of the std::map which contains the jobs
* @return A copy of the std::map which contains the jobs
*/
const std::map<int, Job*> & jobs() const;
const std::map<JobIdentifier, Job*> & jobs() const;
/**
* @brief Returns a reference to the std::map which contains the jobs
* @return A reference to the std::map which contains the jobs
*/
std::map<int, Job*> & jobs();
std::map<JobIdentifier, Job*> & jobs();
/**
* @brief Returns the number of jobs of the Jobs instance
......@@ -329,7 +347,7 @@ public:
int nb_jobs() const;
private:
std::map<int, Job*> _jobs; //!< The std::map which contains the jobs
std::map<JobIdentifier, Job*> _jobs; //!< The std::map which contains the jobs
Profiles * _profiles = nullptr; //!< The profiles associated with the jobs
Workload * _workload = nullptr; //!< The Workload the jobs belong to
};
......
......@@ -81,7 +81,7 @@ int execute_task(BatTask * btask,
job->workload->profiles->at(data->sequence[profile_index_in_sequence]));
btask->sub_tasks.push_back(sub_btask);
string task_name = "seq" + to_string(job->number) + "'" + job->profile + "'";
string task_name = "seq" + job->id.to_string() + "'" + job->profile + "'";
XBT_INFO("Creating sequential tasks '%s'", task_name.c_str());
int ret_last_profile = execute_task(sub_btask, context, allocation,
......@@ -182,7 +182,7 @@ int execute_task(BatTask * btask,
job->workload->profiles->at(profile_to_execute));
btask->sub_tasks.push_back(sub_btask);
string task_name = "recv" + to_string(job->number) + "'" + job->profile + "'";
string task_name = "recv" + job->id.to_string() + "'" + job->profile + "'";
XBT_INFO("Creating receive task '%s'", task_name.c_str());
int ret_last_profile = execute_task(sub_btask, context, allocation,
......@@ -224,7 +224,7 @@ int execute_task(BatTask * btask,
for (int i = 0; i < nb_ranks; ++i)
{
job->smpi_ranks_to_hosts_mapping[i] = host_to_use;
++host_to_use %= job->required_nb_res; // ++ is done first
++host_to_use %= job->requested_nb_res; // ++ is done first
}
}
......@@ -236,7 +236,7 @@ int execute_task(BatTask * btask,
for (int i = 0; i < nb_ranks; ++i)
{
char *str_instance_id = NULL;
int ret = asprintf(&str_instance_id, "%s!%d", job->workload->name.c_str(), job->number);
int ret = asprintf(&str_instance_id, "%s", job->id.to_string().c_str());
(void) ret; // Avoids a warning if assertions are ignored
xbt_assert(ret != -1, "asprintf failed (not enough memory?)");
......@@ -245,7 +245,7 @@ int execute_task(BatTask * btask,
xbt_assert(ret != -1, "asprintf failed (not enough memory?)");
char *str_pname = NULL;
ret = asprintf(&str_pname, "%d_%d", job->number, i);
ret = asprintf(&str_pname, "%s_%d", job->id.to_string().c_str(), i);
xbt_assert(ret != -1, "asprintf failed (not enough memory?)");
char **argv = xbt_new(char*, 5);
......@@ -359,7 +359,7 @@ int execute_job_process(int argc, char *argv[])
ExecuteJobProcessArguments * args = (ExecuteJobProcessArguments *) MSG_process_get_data(MSG_process_self());
Workload * workload = args->context->workloads.at(args->allocation->job_id.workload_name);
Job * job = workload->jobs->at(args->allocation->job_id.job_number);
Job * job = workload->jobs->at(args->allocation->job_id);
job->starting_time = MSG_get_clock();
job->allocation = args->allocation->machine_ids;
double remaining_time = (double)job->walltime;
......@@ -369,7 +369,7 @@ int execute_job_process(int argc, char *argv[])
{
job->consumed_energy = consumed_energy_on_machines(args->context, job->allocation);
// Let's trace the consumed energy
args->context->energy_tracer.add_job_start(MSG_get_clock(), job->number);
args->context->energy_tracer.add_job_start(MSG_get_clock(), job->id);
}
// Job computation
......@@ -430,7 +430,7 @@ int execute_job_process(int argc, char *argv[])
job->consumed_energy -= job->consumed_energy - consumed_energy_before;
// Let's trace the consumed energy
args->context->energy_tracer.add_job_end(MSG_get_clock(), job->number);
args->context->energy_tracer.add_job_end(MSG_get_clock(), job->id);
}
if (args->notify_server_at_end)
......@@ -553,7 +553,7 @@ int killer_process(int argc, char *argv[])
job->consumed_energy = job->consumed_energy - consumed_energy_before;
// Let's trace the consumed energy
args->context->energy_tracer.add_job_end(MSG_get_clock(), job->number);
args->context->energy_tracer.add_job_end(MSG_get_clock(), job->id);
}
}
}
......
......@@ -76,46 +76,3 @@ int request_reply_scheduler_process(int argc, char *argv[])
delete args;
return 0;
}
std::string absolute_filename(const std::string & filename)
{
xbt_assert(filename.length() > 0);
// Let's assume filenames starting by "/" are absolute.
if (filename[0] == '/')
{
return filename;
}
char cwd_buf[PATH_MAX];
char * getcwd_ret = getcwd(cwd_buf, PATH_MAX);
xbt_assert(getcwd_ret == cwd_buf, "getcwd failed");
return string(getcwd_ret) + '/' + filename;
}
void parse_job_identifier(const std::string & job_identifier_string,
JobIdentifier & job_id)
{
// Let's split the job_identifier by '!'
vector<string> job_identifier_parts;
boost::split(job_identifier_parts, job_identifier_string, boost::is_any_of("!"), boost::token_compress_on);
XBT_DEBUG("Parsed job_identifier: '%d'", job_identifier_parts.size());
if (job_identifier_parts.size() == 1)
{
XBT_WARN("Job ID is not of format WORKLOAD!NUMBER... assuming static!");
job_id.workload_name = "static";
job_id.job_number = std::stoi(job_identifier_parts[0]);
}
else if (job_identifier_parts.size() == 2)
{
job_id.workload_name = job_identifier_parts[0];
job_id.job_number = std::stoi(job_identifier_parts[1]);
}
else
{
xbt_assert("Empty or invalid job identifier: %s", job_identifier_string.c_str());
}
}
......@@ -37,19 +37,3 @@ enum NetworkStamp : char
* @return 0
*/
int request_reply_scheduler_process(int argc, char *argv[]);
/**
* @brief Computes the absolute filename of a given file
* @param[in] filename The name of the file (not necessarily existing).
* @return The absolute filename corresponding to the given filename
*/
std::string absolute_filename(const std::string & filename);
/**
* @brief Retrieves the workload_name and the job_id from a job_identifier
* @details Job identifiers are in the form [WORKLOAD_NAME!]JOB_ID
* @param[in] job_identifier_string The input job identifier string
* @param[out] job_id The output JobIdentifier
*/
void parse_job_identifier(const std::string & job_identifier_string, JobIdentifier & job_id);
......@@ -588,9 +588,10 @@ void JsonProtocolReader::parse_and_apply_event(const Value & event_object,
const Value & data_object = event_object["data"];
auto handler_function = _type_to_handler_map[type];
XBT_INFO("Start Processing Event number: %d, Type: %s", event_number, type.c_str());
XBT_DEBUG("Starting event processing (number: %d, Type: %s)", event_number, type.c_str());
handler_function(this, event_number, timestamp, data_object);
XBT_DEBUG("Finished event processing (number: %d, Type: %s)", event_number, type.c_str());
}
void JsonProtocolReader::handle_query(int event_number, double timestamp, const Value &data_object)
......@@ -706,7 +707,7 @@ void JsonProtocolReader::handle_reject_job(int event_number,
string job_id = job_id_value.GetString();
JobRejectedMessage * message = new JobRejectedMessage;
parse_job_identifier(job_id, message->job_id);
message->job_id = JobIdentifier(job_id);
if (!(context->workloads.job_is_registered(message->job_id)))
{
xbt_assert(false, "The folowing job does not exist.", job_id.c_str());
......@@ -754,7 +755,7 @@ void JsonProtocolReader::handle_execute_job(int event_number,
string job_id = job_id_value.GetString();
// Let's retrieve the job identifier
parse_job_identifier(job_id, message->allocation->job_id);
message->allocation->job_id = JobIdentifier(job_id);
if (!(context->workloads.job_is_registered(message->allocation->job_id)))
{
xbt_assert(false, "Invalid message in event %d (EXECUTE_JOB): job %d with job_id:'%s' does not exists", event_number, job_id.c_str());
......@@ -964,8 +965,7 @@ void JsonProtocolReader::handle_set_job_metadata(int event_number,
boost::regex r("[^\"]*");
xbt_assert(boost::regex_match(metadata, r), "Invalid JSON message: the 'metadata' value in the 'data' value of event %d (SET_JOB_METADATA) should not contain double quotes (got ###%s###)", event_number, metadata.c_str());
JobIdentifier job_identifier;
parse_job_identifier(job_id, job_identifier);
JobIdentifier job_identifier = JobIdentifier(job_id);
if (!(context->workloads.job_is_registered(job_identifier)))
{
xbt_assert(false, "Invalid message in event %d (SET_JOB_METADATA): The folowing job does not exist: %s", event_number, job_id.c_str());
......@@ -1032,7 +1032,7 @@ void JsonProtocolReader::handle_change_job_state(int event_number,
ChangeJobStateMessage * message = new ChangeJobStateMessage;
parse_job_identifier(job_id, message->job_id);
message->job_id = JobIdentifier(job_id);
if (!(context->workloads.job_is_registered(message->job_id)))
{
xbt_assert(false, "The folowing job does not exist.", job_id.c_str());
......@@ -1106,7 +1106,7 @@ void JsonProtocolReader::handle_to_job_msg(int event_number,
ToJobMessage * message = new ToJobMessage;
parse_job_identifier(job_id, message->job_id);
message->job_id = JobIdentifier(job_id);
if (!(context->workloads.job_is_registered(message->job_id)))
{
xbt_assert(false, "The folowing job does not exist.", job_id.c_str());
......@@ -1157,7 +1157,7 @@ void JsonProtocolReader::handle_submit_job(int event_number,
xbt_assert(job_id_value.IsString(), "Invalid JSON message: in event %d (SUBMIT_JOB): ['data']['job_id'] should be a string", event_number);
string job_id = job_id_value.GetString();
parse_job_identifier(job_id, message->job_id);
message->job_id = JobIdentifier(job_id);
if (context->workloads.exists(message->job_id.workload_name))
{
xbt_assert(!context->workloads.job_is_registered(message->job_id), "Invalid message in event %d (SUBMIT_JOB): job_id '%s' already exists", event_number, job_id.c_str());
......@@ -1195,7 +1195,7 @@ void JsonProtocolReader::handle_submit_job(int event_number,
}
else
{
workload = new Workload(message->job_id.workload_name, "Dynamic");
workload = new Workload(message->job_id.workload_name);
context->workloads.insert_workload(workload->name, workload);
}
......@@ -1203,8 +1203,10 @@ void JsonProtocolReader::handle_submit_job(int event_number,
XBT_INFO("Parsing user-submitted job %s", message->job_id.to_string().c_str());
Job * job = Job::from_json(message->job_description, workload,
"Invalid JSON job submitted by the scheduler");
assert(job->id.job_name == message->job_id.job_name);
assert(job->id.workload_name == message->job_id.workload_name);
workload->jobs->add_job(job);
job->id = JobIdentifier(workload->name, job->number);
job->state = JobState::JOB_STATE_SUBMITTED;
// Read the profile description if possible
......@@ -1220,7 +1222,7 @@ void JsonProtocolReader::handle_submit_job(int event_number,
profile_object.Accept(writer);
message->job_profile_description = string(buffer.GetString(), buffer.GetSize());
XBT_INFO("A profile was submited with the job '%d' in workload '%s': %s", job->number, workload->name.c_str(), message->job_profile_description.c_str());
XBT_INFO("A profile was submited with the job '%s' : %s", job->id.to_string().c_str(), message->job_profile_description.c_str());
}
else if (context->redis_enabled)
{
......@@ -1317,7 +1319,7 @@ void JsonProtocolReader::handle_submit_profile(int event_number,
}
else
{
workload = new Workload(message->workload_name, "Dynamic");
workload = new Workload(message->workload_name);
context->workloads.insert_workload(workload->name, workload);
}
......@@ -1366,7 +1368,7 @@ void JsonProtocolReader::handle_kill_job(int event_number,
for (unsigned int i = 0; i < job_ids_array.Size(); ++i)
{
const Value & job_id_value = job_ids_array[i];
parse_job_identifier(job_id_value.GetString(), message->jobs_ids[i]);
message->jobs_ids[i] = JobIdentifier(job_id_value.GetString());
if (!(context->workloads.job_is_registered(message->jobs_ids[i])))
{
xbt_assert(false, "Invalid message in event %d (KILL_JOB): job %d with job_id:'%s' does not exists", event_number, i, message->jobs_ids[i].to_string().c_str());
......
......@@ -205,8 +205,8 @@ void server_on_job_completed(ServerData * data,
xbt_assert(data->nb_completed_jobs + data->nb_running_jobs <= data->nb_submitted_jobs);
Job * job = data->context->workloads.job_at(message->job_id);
XBT_INFO("Job %s!%d COMPLETED. %d jobs completed so far",
job->workload->name.c_str(), job->number, data->nb_completed_jobs);
XBT_INFO("Job COMPLETED. %d jobs completed so far",
job->id.to_string().c_str(), data->nb_completed_jobs);
string status = "UNKNOWN";
if (job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY)
......@@ -257,7 +257,10 @@ void server_on_job_submitted(ServerData * data,
}
// Let's retrieve the Job from memory (or add it into memory if it is dynamic)
XBT_INFO("GOT JOB: %s %d\n", message->job_id.workload_name.c_str(), message->job_id.job_number);
XBT_INFO("Job received: %s\n", message->job_id.to_string().c_str());
XBT_INFO("Workloads: %s", data->context->workloads.to_string().c_str());
xbt_assert(data->context->workloads.job_is_registered(message->job_id));
Job * job = data->context->workloads.job_at(message->job_id);
job->id = message->job_id;
......@@ -542,6 +545,9 @@ void server_on_submit_job(ServerData * data,
// Let's update global states
++data->nb_submitted_jobs;
xbt_assert(data->context->workloads.exists(message->job_id.workload_name),
"Internal error: Workload '%s' should exist.",
message->job_id.workload_name.c_str());
xbt_assert(data->context->workloads.job_is_registered(message->job_id),
"Internal error: job '%s' should exist.",
message->job_id.to_string().c_str());
......@@ -594,19 +600,7 @@ void server_on_submit_profile(ServerData * data,
"seems disabled... It can be activated by specifying a configuration file "
"to Batsim.");
// Let's create the workload if it doesn't exist, or retrieve it otherwise
Workload * workload = nullptr;
if (data->context->workloads.exists(message->workload_name))
{
workload = data->context->workloads.at(message->workload_name);
}