Commit 3ecccd22 authored by Millian Poquet

Major commit: multiple workloads.

This commit is a big step toward handling multiple workloads at the same time.

Protocol update. Jobs are no longer identified by a unique number alone, but
by a workload_name and a number that is unique within that workload. The
separator between these two fields is '!'. The default workload, the one read
from the input JSON file, is called "static". If the scheduler refers to a job
by its unique number alone, the "static" workload is assumed, for backward
compatibility.
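
The identifier rule above could be sketched as follows on the scheduler side. This is only an illustration: `JobId` and `parse_job_id` are hypothetical names, not part of Batsim or pybatsim, and splitting on the last '!' is an assumption about workload names that the commit does not state.

```python
from typing import NamedTuple

DEFAULT_WORKLOAD = "static"  # name the commit message gives to the JSON-file workload
SEPARATOR = "!"


class JobId(NamedTuple):
    """Hypothetical helper type: a workload name plus a job number."""
    workload_name: str
    job_number: int


def parse_job_id(text: str) -> JobId:
    """Parse 'WLOAD!JOB_ID'; a bare 'JOB_ID' falls back to the default workload."""
    # Assumption: split on the last separator, so the job number is always
    # the final field.
    workload_name, sep, number = text.rpartition(SEPARATOR)
    if not sep:
        # No separator at all: old-style identifier, assume the static workload.
        workload_name = DEFAULT_WORKLOAD
    return JobId(workload_name, int(number))
```

With this sketch, `parse_job_id("static!4")` and `parse_job_id("4")` yield the same identifier, which is the compatibility behaviour described above.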

Batsim code update. Jobs and Profiles are no longer standalone members of the
BatsimContext. They are now grouped together inside one Workload instance.
Different Workload instances can be stored in a Workloads (note the 's'!),
which is just a map<string, Workload*> with wrapper methods to simplify job
handling. A Workloads is instantiated in the BatsimContext.
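
To make the container layout concrete, here is a minimal Python sketch of the same idea. The method names `insert_workload`, `exists` and `at` mirror calls that appear in the diff; everything else is assumed for illustration, including `job_at`, the dict-based job records, and the choice to reject duplicate names. The real implementation is C++.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class Workload:
    """Groups the jobs and the profiles of one workload."""
    jobs: Dict[int, dict] = field(default_factory=dict)      # job number -> job description
    profiles: Dict[str, dict] = field(default_factory=dict)  # profile name -> profile description


class Workloads:
    """Thin wrapper around a 'workload name -> Workload' mapping."""

    def __init__(self) -> None:
        self._workloads: Dict[str, Workload] = {}

    def insert_workload(self, name: str, workload: Workload) -> None:
        # Assumption: inserting the same name twice is treated as an error.
        if name in self._workloads:
            raise KeyError(f"workload '{name}' already exists")
        self._workloads[name] = workload

    def exists(self, name: str) -> bool:
        return name in self._workloads

    def at(self, name: str) -> Workload:
        return self._workloads[name]

    def job_at(self, workload_name: str, job_number: int) -> dict:
        """Hypothetical convenience method: look a job up by its two-part identifier."""
        return self.at(workload_name).jobs[job_number]


# Usage: register the default workload, then look up one of its jobs.
workloads = Workloads()
workloads.insert_workload("static", Workload(jobs={1: {"id": 1, "res": 4}}))
```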

Batsim code update. Jobs are now identified by a JobIdentifier structure.
This struct is just a string (the workload name) and an integer (the unique
job number within its workload). JobIdentifiers are now used within most
inter-process (IP) messages. Furthermore, each Job knows which Workload it
belongs to, and so does each Jobs container.
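
As a rough Python analogue of the struct, the sketch below pairs the two fields and formats them as WORKLOAD_NAME!JOB_NUMBER, then uses that format to build the content of a job allocation ('J') message. `allocation_content` is a hypothetical helper written for this example; it only covers plain machine IDs, not the MIDa-MIDb interval form.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)  # frozen makes instances hashable, so they can be dict keys
class JobIdentifier:
    workload_name: str
    job_number: int

    def to_string(self) -> str:
        # Same output format as the C++ struct: WORKLOAD_NAME!JOB_NUMBER
        return f"{self.workload_name}!{self.job_number}"


def allocation_content(allocations: Dict[JobIdentifier, List[int]]) -> str:
    """Hypothetical helper: build 'WLOAD!JID1=MID1,MID2;WLOAD!JID2=MID3'."""
    parts = []
    for job_id, machines in allocations.items():
        machine_list = ",".join(str(m) for m in machines)
        parts.append(f"{job_id.to_string()}={machine_list}")
    return ";".join(parts)
```

Keying the mapping by the whole identifier rather than by the job number keeps jobs from different workloads apart even when they share a number.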

Batsim tests pass with this commit, but a Redis server should be launched
beforehand to avoid assertion failures. Redis is not yet used on the scheduler
side, so that part has not been tested yet. Travis has not been updated yet to
install and run Redis, so the tests won't work on Travis at the moment.
parent 6139096c
...@@ -38,35 +38,35 @@ the job ID of the job which just completed. This part is not mandatory, it depen ...@@ -38,35 +38,35 @@ the job ID of the job which just completed. This part is not mandatory, it depen
# Message Stamps # # Message Stamps #
| Proto. version | Stamp | Direction | Content syntax | Meaning | Proto. version | Stamp | Direction | Content syntax | Meaning
|---------------- |-------|-------------- |------------------------- |------------- |---------------- |-------|-------------- |-------------------------------- |-------------
| 0+ | S | Bastim->Sched | JOB_ID | Job submission: one (static) job is available and can now be allocated to resources. | 2+ | S | Bastim->Sched | WLOAD!JOB_ID | Job submission: job JOB_ID of workload WLOAD is available and can now be allocated to resources.
| 0+ | C | Batsim->Sched | JOB_ID | Job completion: one (static) job finished its execution. | 2+ | C | Batsim->Sched | WLOAD!JOB_ID | Job completion: job JOB_ID of workload WLOAD finished its execution.
| 0+ | J | Sched->Batsim | JID1=MID1,MID2,MIDn[;...]| Job allocation: tells to put job JID1 on machines MID1, ..., MIDn. Many jobs might be allocated in the same event. Each MIDk part can be a single machine ID or a closed interval MIDa-MIDb where MIDa <= MIDb | 2+ | J | Sched->Batsim | WLOAD!JID1=MID1,MID2,MIDn[;...] | Job allocation: tells to put job JID1 of workload WLOAD on machines MID1, ..., MIDn. Many jobs might be allocated in the same event. Each MIDk part can be a single machine ID or a closed interval MIDa-MIDb where MIDa <= MIDb
| 0+ | N | Both | No content | NOP: tells to do nothing / nothing happened. | 0+ | N | Both | No content | NOP: tells to do nothing / nothing happened.
| 1+ | P | Sched->Batsim | MID1,MID2,MIDn=PSTATE | Asks to change the power state of some machines. Each MIDk part can be a single machine ID or a closed interval MIDa-MIDb where MIDa <= MIDb | 1+ | P | Sched->Batsim | MID1,MID2,MIDn=PSTATE | Asks to change the power state of some machines. Each MIDk part can be a single machine ID or a closed interval MIDa-MIDb where MIDa <= MIDb
| 1+ | p | Batsim->Sched | MID1,MID2,MIDn=PSTATE | Tells the scheduler that the power state of one or several machines has changed. Each MIDk part can be a single machine ID or a closed interval MIDa-MIDb where MIDa <= MIDb. There is one and only one 'p' message for each 'P' message. | 1+ | p | Batsim->Sched | MID1,MID2,MIDn=PSTATE | Tells the scheduler that the power state of one or several machines has changed. Each MIDk part can be a single machine ID or a closed interval MIDa-MIDb where MIDa <= MIDb. There is one and only one 'p' message for each 'P' message.
| 1+ | R | Sched->Batsim | JOB_ID | Job rejection: the scheduler tells that one (static) job will not be computed. | 2+ | R | Sched->Batsim | WLOAD!JOB_ID | Job rejection: the scheduler tells that one (static) job will not be computed.
| 1+ | n | Sched->Batsim | TIME | NOP me later: the scheduler asks to be awaken at the given simulation time TIME. | 1+ | n | Sched->Batsim | TIME | NOP me later: the scheduler asks to be awaken at the given simulation time TIME.
| 1+ | E | Sched->Batsim | No content | Asks Batsim about the total consumed energy (from time 0 to now) in Joules. Works only in energy mode. | 1+ | E | Sched->Batsim | No content | Asks Batsim about the total consumed energy (from time 0 to now) in Joules. Works only in energy mode.
| 1+ | e | Batsim->Sched | CONSUMED_ENERGY | Batsim tells the total consumed energy (from time 0 to now) in Joules. Works only in energy mode. There is one and only one 'e' message for each 'E' message. | 1+ | e | Batsim->Sched | CONSUMED_ENERGY | Batsim tells the total consumed energy (from time 0 to now) in Joules. Works only in energy mode. There is one and only one 'e' message for each 'E' message.
# Message Examples # # Message Examples #
## Static Job Submission ## ## Static Job Submission ##
Batsim -> Scheduler Batsim -> Scheduler
0:10.000015|10.000015:S:1 0:10.000015|10.000015:S:static!1
0:13|12:S:2|12.5:S:3|13:S:4 0:13|12:S:2|12.5:S:3|13:S:static!4
## Static Job Completion ## ## Static Job Completion ##
Batsim -> Scheduler Batsim -> Scheduler
0:15.836694|15.836694:C:1 0:15.836694|15.836694:C:static!1
0:40.001320|25:C:2|38.002565:C:3 0:40.001320|25:C:2|38.002565:C:static!3
## Static Job Allocation ## ## Static Job Allocation ##
Scheduler -> Batsim Scheduler -> Batsim
0:15.000015|15.000015:J:1=1,2,0,3;2=3 0:15.000015|15.000015:J:static!1=1,2,0,3;static!2=3
0:45.00132|45.00132:J:4=3,1,2,0 0:45.00132|45.00132:J:static!4=3,1,2,0
## NOP ## ## NOP ##
Scheduler -> Batsim or Batsim -> Scheduler Scheduler -> Batsim or Batsim -> Scheduler
...@@ -85,7 +85,7 @@ the job ID of the job which just completed. This part is not mandatory, it depen ...@@ -85,7 +85,7 @@ the job ID of the job which just completed. This part is not mandatory, it depen
## Static Job Rejection ## ## Static Job Rejection ##
Scheduler -> Batsim Scheduler -> Batsim
0:50|50:R:5 0:50|50:R:static!5
## NOP Me Later ## ## NOP Me Later ##
Scheduler -> Batsim Scheduler -> Batsim
......
...@@ -149,6 +149,11 @@ class Batsim(object): ...@@ -149,6 +149,11 @@ class Batsim(object):
# [ (timestamp, txtDATA), ...] # [ (timestamp, txtDATA), ...]
self._msgs_to_send = [] self._msgs_to_send = []
# TODO: job identifiers are now WORKLOAD!JOB_NUMBER.
# The hack in the next loop allows pybatsim to still work with static
# jobs, but the new syntax should be handled so pybatsim also handles
# dynamic jobs.
for i in range(1, len(sub_msgs)): for i in range(1, len(sub_msgs)):
data = sub_msgs[i].split(':') data = sub_msgs[i].split(':')
if data[1] == 'R': if data[1] == 'R':
...@@ -156,10 +161,15 @@ class Batsim(object): ...@@ -156,10 +161,15 @@ class Batsim(object):
elif data[1] == 'N': elif data[1] == 'N':
self.scheduler.onNOP() self.scheduler.onNOP()
elif data[1] == 'S': elif data[1] == 'S':
self.scheduler.onJobSubmission(self.jobs[int(data[2])]) # Received WORKLOAD_NAME!JOB_ID
workload_name, job_id = data[2].split('!')
job_id = int(job_id)
self.scheduler.onJobSubmission(self.jobs[job_id])
self.nb_jobs_recieved += 1 self.nb_jobs_recieved += 1
elif data[1] == 'C': elif data[1] == 'C':
j = self.jobs[int(data[2])] workload_name, job_id = data[2].split('!')
job_id = int(job_id)
j = self.jobs[job_id]
j.finish_time = float(data[0]) j.finish_time = float(data[0])
self.scheduler.onJobCompletion(j) self.scheduler.onJobCompletion(j)
elif data[1] == 'p': elif data[1] == 'p':
......
...@@ -289,9 +289,12 @@ int main(int argc, char * argv[]) ...@@ -289,9 +289,12 @@ int main(int argc, char * argv[])
context.allow_space_sharing = mainArgs.allow_space_sharing; context.allow_space_sharing = mainArgs.allow_space_sharing;
context.trace_schedule = mainArgs.enable_schedule_tracing; context.trace_schedule = mainArgs.enable_schedule_tracing;
// Loading the static workload
int nb_machines_by_workload; int nb_machines_by_workload;
load_json_workload(&context, mainArgs.workloadFilename, nb_machines_by_workload); const string static_workload_name = "static";
context.jobs.setProfiles(&context.profiles); Workload * static_workload = new Workload;
static_workload->load_from_json(mainArgs.workloadFilename, nb_machines_by_workload);
context.workloads.insert_workload(static_workload_name, static_workload);
int limit_machines_count = -1; int limit_machines_count = -1;
if ((mainArgs.limit_machines_count_by_workload) && (mainArgs.limit_machines_count > 0)) if ((mainArgs.limit_machines_count_by_workload) && (mainArgs.limit_machines_count > 0))
...@@ -305,7 +308,7 @@ int main(int argc, char * argv[]) ...@@ -305,7 +308,7 @@ int main(int argc, char * argv[])
XBT_INFO("The number of machines will be limited to %d", limit_machines_count); XBT_INFO("The number of machines will be limited to %d", limit_machines_count);
XBT_INFO("Checking whether SMPI is used or not..."); XBT_INFO("Checking whether SMPI is used or not...");
context.smpi_used = context.jobs.containsSMPIJob(); context.smpi_used = static_workload->jobs->containsSMPIJob();
if (!context.smpi_used) if (!context.smpi_used)
{ {
XBT_INFO("SMPI will NOT be used."); XBT_INFO("SMPI will NOT be used.");
...@@ -314,7 +317,7 @@ int main(int argc, char * argv[]) ...@@ -314,7 +317,7 @@ int main(int argc, char * argv[])
else else
{ {
XBT_INFO("SMPI will be used."); XBT_INFO("SMPI will be used.");
register_smpi_applications(&context); static_workload->register_smpi_applications();
SMPI_init(); SMPI_init();
} }
...@@ -380,7 +383,8 @@ int main(int argc, char * argv[]) ...@@ -380,7 +383,8 @@ int main(int argc, char * argv[])
XBT_INFO("Creating jobs_submitter process..."); XBT_INFO("Creating jobs_submitter process...");
JobSubmitterProcessArguments * submitterArgs = new JobSubmitterProcessArguments; JobSubmitterProcessArguments * submitterArgs = new JobSubmitterProcessArguments;
submitterArgs->context = &context; submitterArgs->context = &context;
MSG_process_create("jobs_submitter", job_submitter_process, (void*)submitterArgs, masterMachine->host); submitterArgs->workload_name = static_workload_name;
MSG_process_create("jobs_submitter", static_job_submitter_process, (void*)submitterArgs, masterMachine->host);
XBT_INFO("The jobs_submitter process has been created."); XBT_INFO("The jobs_submitter process has been created.");
XBT_INFO("Creating the uds_server process..."); XBT_INFO("Creating the uds_server process...");
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
#include "export.hpp" #include "export.hpp"
#include "pstate.hpp" #include "pstate.hpp"
#include "storage.hpp" #include "storage.hpp"
#include "workload.hpp"
/** /**
* @brief The Batsim context * @brief The Batsim context
...@@ -20,8 +21,7 @@ struct BatsimContext ...@@ -20,8 +21,7 @@ struct BatsimContext
{ {
UnixDomainSocket socket; //!< The UnixDomainSocket UnixDomainSocket socket; //!< The UnixDomainSocket
Machines machines; //!< The machines Machines machines; //!< The machines
Jobs jobs; //!< The jobs Workloads workloads; //!< The workloads
Profiles profiles; //!< The profiles
PajeTracer paje_tracer; //!< The PajeTracer PajeTracer paje_tracer; //!< The PajeTracer
PStateChangeTracer pstate_tracer; //!< The PStateChangeTracer PStateChangeTracer pstate_tracer; //!< The PStateChangeTracer
EnergyConsumptionTracer energy_tracer; //!< The EnergyConsumptionTracer EnergyConsumptionTracer energy_tracer; //!< The EnergyConsumptionTracer
......
...@@ -492,41 +492,48 @@ void exportJobsToCSV(const std::string &filename, const BatsimContext *context) ...@@ -492,41 +492,48 @@ void exportJobsToCSV(const std::string &filename, const BatsimContext *context)
xbt_assert(f.is_open(), "Cannot write file '%s'", filename.c_str()); xbt_assert(f.is_open(), "Cannot write file '%s'", filename.c_str());
// write headers // write headers
f << "jobID,submission_time,requested_number_of_processors,requested_time,success,starting_time,execution_time,finish_time,waiting_time,turnaround_time,stretch,consumed_energy,allocated_processors\n"; f << "job_number,workload_name,submission_time,requested_number_of_processors,requested_time,success,starting_time,execution_time,finish_time,waiting_time,turnaround_time,stretch,consumed_energy,allocated_processors\n";
const auto & jobs = context->jobs.jobs(); for (const auto mit : context->workloads.workloads())
for (const auto & mit : jobs)
{ {
Job * job = mit.second; string workload_name = mit.first;
const Workload * workload = mit.second;
if (job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY || job->state == JobState::JOB_STATE_COMPLETED_KILLED) const auto & jobs = workload->jobs->jobs();
for (const auto & mit : jobs)
{ {
char * buf = nullptr; Job * job = mit.second;
int success = (job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY);
xbt_assert(job->runtime >= 0); if (job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY || job->state == JobState::JOB_STATE_COMPLETED_KILLED)
{
int ret = asprintf(&buf, "%d,%lf,%d,%lf,%d,%lf,%lf,%lf,%lf,%lf,%lf,%Lf,", // finished by a ',' because the next part is written after asprintf char * buf = nullptr;
job->id, int success = (job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY);
job->submission_time, xbt_assert(job->runtime >= 0);
job->required_nb_res,
job->walltime, int ret = asprintf(&buf, "%d,%s,%lf,%d,%lf,%d,%lf,%lf,%lf,%lf,%lf,%lf,%Lf,", // finished by a ',' because the next part is written after asprintf
success, job->number, // job_id
job->starting_time, workload_name.c_str(), // workload_name
job->runtime, job->submission_time, // submission_time
job->starting_time + job->runtime, // finish_time job->required_nb_res, // requested_number_of_processors
job->starting_time - job->submission_time, // waiting_time job->walltime, // requested_time
job->starting_time + job->runtime - job->submission_time, // turnaround_time success, // success
(job->starting_time + job->runtime - job->submission_time) / job->runtime, // stretch job->starting_time, // starting_time
job->consumed_energy job->runtime, // execution_time
); job->starting_time + job->runtime, // finish_time
(void) ret; // Avoids a warning if assertions are ignored job->starting_time - job->submission_time, // waiting_time
xbt_assert(ret != -1, "asprintf failed (not enough memory?)"); job->starting_time + job->runtime - job->submission_time, // turnaround_time
f << buf; (job->starting_time + job->runtime - job->submission_time) / job->runtime, // stretch
free(buf); job->consumed_energy // consumed energy
);
xbt_assert((int)job->allocation.size() == job->required_nb_res); (void) ret; // Avoids a warning if assertions are ignored
xbt_assert(ret != -1, "asprintf failed (not enough memory?)");
f << job->allocation.to_string_hyphen(" ") << "\n"; f << buf;
free(buf);
xbt_assert((int)job->allocation.size() == job->required_nb_res);
f << job->allocation.to_string_hyphen(" ") << "\n";
}
} }
} }
...@@ -552,33 +559,38 @@ void exportScheduleToCSV(const std::string &filename, const BatsimContext *conte ...@@ -552,33 +559,38 @@ void exportScheduleToCSV(const std::string &filename, const BatsimContext *conte
long double seconds_used_by_scheduler = context->microseconds_used_by_scheduler / (long double)1e6; long double seconds_used_by_scheduler = context->microseconds_used_by_scheduler / (long double)1e6;
const auto & jobs = context->jobs.jobs(); for (const auto mit : context->workloads.workloads())
for (const auto & mit : jobs)
{ {
Job * job = mit.second; const Workload * workload = mit.second;
if (job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY || job->state == JobState::JOB_STATE_COMPLETED_KILLED) const auto & jobs = workload->jobs->jobs();
for (const auto & mit : jobs)
{ {
nb_jobs_finished++; Job * job = mit.second;
if (job->runtime < min_job_execution_time) if (job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY || job->state == JobState::JOB_STATE_COMPLETED_KILLED)
min_job_execution_time = job->runtime; {
if (job->runtime > max_job_execution_time) nb_jobs_finished++;
max_job_execution_time = job->runtime;
if (job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY) if (job->runtime < min_job_execution_time)
nb_jobs_success++; min_job_execution_time = job->runtime;
else if (job->runtime > max_job_execution_time)
nb_jobs_killed++; max_job_execution_time = job->runtime;
double completion_time = job->starting_time + job->runtime; if (job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY)
double turnaround_time = job->starting_time + job->runtime - job->submission_time; nb_jobs_success++;
else
nb_jobs_killed++;
if (completion_time > makespan) double completion_time = job->starting_time + job->runtime;
makespan = completion_time; double turnaround_time = job->starting_time + job->runtime - job->submission_time;
if (turnaround_time > max_turnaround_time) if (completion_time > makespan)
max_turnaround_time = turnaround_time; makespan = completion_time;
if (turnaround_time > max_turnaround_time)
max_turnaround_time = turnaround_time;
}
} }
} }
......
...@@ -151,3 +151,8 @@ IPMessage::~IPMessage() ...@@ -151,3 +151,8 @@ IPMessage::~IPMessage()
data = nullptr; data = nullptr;
} }
string JobIdentifier::to_string() const
{
return workload_name + '!' + std::to_string(job_number);
}
...@@ -14,6 +14,22 @@ ...@@ -14,6 +14,22 @@
struct BatsimContext; struct BatsimContext;
/**
* @brief A simple structure used to identify one job
*/
struct JobIdentifier
{
std::string workload_name; //!< The name of the workload the job belongs to
int job_number; //!< The job unique number inside its workload
/**
* @brief Returns a string representation of the JobIdentifier.
* @details Output format is WORKLOAD_NAME!JOB_NUMBER
* @return A string representation of the JobIdentifier.
*/
std::string to_string() const;
};
/** /**
* @brief Stores the different types of inter-process messages * @brief Stores the different types of inter-process messages
*/ */
...@@ -40,7 +56,7 @@ enum class IPMessageType ...@@ -40,7 +56,7 @@ enum class IPMessageType
*/ */
struct JobSubmittedMessage struct JobSubmittedMessage
{ {
int job_id; //!< The job ID JobIdentifier job_id; //!< The JobIdentifier
}; };
/** /**
...@@ -48,7 +64,7 @@ struct JobSubmittedMessage ...@@ -48,7 +64,7 @@ struct JobSubmittedMessage
*/ */
struct JobCompletedMessage struct JobCompletedMessage
{ {
int job_id; //!< The job ID JobIdentifier job_id; //!< The JobIdentifier
}; };
/** /**
...@@ -56,7 +72,7 @@ struct JobCompletedMessage ...@@ -56,7 +72,7 @@ struct JobCompletedMessage
*/ */
struct JobRejectedMessage struct JobRejectedMessage
{ {
int job_id; //!< The job ID JobIdentifier job_id; //!< The JobIdentifier
}; };
/** /**
...@@ -64,7 +80,7 @@ struct JobRejectedMessage ...@@ -64,7 +80,7 @@ struct JobRejectedMessage
*/ */
struct SchedulingAllocation struct SchedulingAllocation
{ {
int job_id; //!< The job unique number JobIdentifier job_id; //!< The JobIdentifier
MachineRange machine_ids; //!< The IDs of the machines on which the job should be allocated MachineRange machine_ids; //!< The IDs of the machines on which the job should be allocated
std::vector<msg_host_t> hosts; //!< The corresponding SimGrid hosts std::vector<msg_host_t> hosts; //!< The corresponding SimGrid hosts
}; };
...@@ -177,6 +193,7 @@ struct SwitchPStateProcessArguments ...@@ -177,6 +193,7 @@ struct SwitchPStateProcessArguments
struct JobSubmitterProcessArguments struct JobSubmitterProcessArguments
{ {
BatsimContext * context; //!< The BatsimContext BatsimContext * context; //!< The BatsimContext
std::string workload_name; //!< The name of the workload the submitter should use
}; };
/** /**
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
using namespace std; using namespace std;
int job_submitter_process(int argc, char *argv[]) int static_job_submitter_process(int argc, char *argv[])
{ {
(void) argc; (void) argc;
(void) argv; (void) argv;
...@@ -22,13 +22,19 @@ int job_submitter_process(int argc, char *argv[]) ...@@ -22,13 +22,19 @@ int job_submitter_process(int argc, char *argv[])
JobSubmitterProcessArguments * args = (JobSubmitterProcessArguments *) MSG_process_get_data(MSG_process_self()); JobSubmitterProcessArguments * args = (JobSubmitterProcessArguments *) MSG_process_get_data(MSG_process_self());
BatsimContext * context = args->context; BatsimContext * context = args->context;
xbt_assert(context->workloads.exists(args->workload_name),
"Error: a static_job_submitter_process is in charge of workload '%s', "
"which does not exist", args->workload_name.c_str());
Workload * workload = context->workloads.at(args->workload_name);
send_message("server", IPMessageType::SUBMITTER_HELLO); send_message("server", IPMessageType::SUBMITTER_HELLO);
double previousSubmissionDate = MSG_get_clock(); double previousSubmissionDate = MSG_get_clock();
vector<const Job *> jobsVector; vector<const Job *> jobsVector;
const auto & jobs = context->jobs.jobs(); const auto & jobs = workload->jobs->jobs();
for (const auto & mit : jobs) for (const auto & mit : jobs)
{ {
const Job * job = mit.second; const Job * job = mit.second;
...@@ -46,8 +52,17 @@ int job_submitter_process(int argc, char *argv[]) ...@@ -46,8 +52,17 @@ int job_submitter_process(int argc, char *argv[])
if (job->submission_time > previousSubmissionDate) if (job->submission_time > previousSubmissionDate)
MSG_process_sleep(job->submission_time - previousSubmissionDate); MSG_process_sleep(job->submission_time - previousSubmissionDate);
// Let's put the metadata about the job into the data storage
string job_id_string = args->workload_name + "!" + to_string(job->number);
string job_key = "job_" + job_id_string;
string profile_key = "profile_" + job_id_string;
context->storage.set(job_key, job->json_description);
context->storage.set(profile_key, workload->profiles->at(job->profile)->json_description);
// Let's now continue the simulation
JobSubmittedMessage * msg = new JobSubmittedMessage; JobSubmittedMessage * msg = new JobSubmittedMessage;
msg->job_id = job->id; msg->job_id.workload_name = args->workload_name;
msg->job_id.job_number = job->number;
send_message("server", IPMessageType::JOB_SUBMITTED, (void*)msg); send_message("server", IPMessageType::JOB_SUBMITTED, (void*)msg);
previousSubmissionDate = MSG_get_clock(); previousSubmissionDate = MSG_get_clock();
......
...@@ -6,9 +6,9 @@ ...@@ -6,9 +6,9 @@
#pragma once #pragma once
/** /**
* @brief The process in charge of submitting jobs * @brief The process in charge of submitting static jobs (those described before running the simulations)
* @param argc The number of arguments * @param argc The number of arguments
* @param argv The argument values * @param argv The argument values
* @return 0 * @return 0
*/ */
int job_submitter_process(int argc, char *argv[]); int static_job_submitter_process(int argc, char *argv[]);
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#include <simgrid/msg.h> #include <simgrid/msg.h>
#include <rapidjson/document.h> #include <rapidjson/document.h>
#include <rapidjson/writer.h>
#include <rapidjson/stringbuffer.h>
#include "profiles.hpp" #include "profiles.hpp"
...@@ -42,6 +44,11 @@ void Jobs::setProfiles(Profiles *profiles) ...@@ -42,6 +44,11 @@ void Jobs::setProfiles(Profiles *profiles)
_profiles = profiles; _profiles = profiles;
} }
void Jobs::setWorkload(Workload *workload)
{
_workload = workload;
}
void Jobs::load_from_json(const Document &doc, const string &filename) void Jobs::load_from_json(const Document &doc, const string &filename)
{ {
(void) filename; // Avoids a warning if assertions are ignored (void) filename; // Avoids a warning if assertions are ignored
...@@ -53,6 +60,7 @@ void Jobs::load_from_json(const Document &doc, const string &filename) ...@@ -53,6 +60,7 @@ void Jobs::load_from_json(const Document &doc, const string &filename)
for (SizeType i = 0; i < jobs.Size(); i++) // Uses SizeType instead of size_t for (SizeType i = 0; i < jobs.Size(); i++) // Uses SizeType instead of size_t
{ {
Job * j = new Job; Job * j = new Job;
j->workload = _workload;
j->starting_time = -1; j->starting_time = -1;
j->runtime = -1; j->runtime = -1;
j->state = JobState::JOB_STATE_NOT_SUBMITTED; j->state = JobState::JOB_STATE_NOT_SUBMITTED;
...@@ -63,26 +71,32 @@ void Jobs::load_from_json(const Document &doc, const string &filename) ...@@ -63,26 +71,32 @@ void Jobs::load_from_json(const Document &doc, const string &filename)
xbt_assert(job.HasMember("id"), "Invalid JSON file '%s': one job has no 'id' field", filename.c_str()); xbt_assert(job.HasMember("id"), "Invalid JSON file '%s': one job has no 'id' field", filename.c_str());
xbt_assert(job["id"].IsInt(), "Invalid JSON file '%s': one job has a non-integral 'id' field ('%s')", filename.c_str(), job["id"].GetString()); xbt_assert(job["id"].IsInt(), "Invalid JSON file '%s': one job has a non-integral 'id' field ('%s')", filename.c_str(), job["id"].GetString());
j->id = job["id"].GetInt(); j->number = job["id"].GetInt();
xbt_assert(job.HasMember("subtime"), "Invalid JSON file '%s': job %d has no 'subtime' field", filename.c_str(), j->id); xbt_assert(job.HasMember("subtime"), "Invalid JSON file '%s': job %d has no 'subtime' field", filename.c_str(), j->number);
xbt_assert(job["subtime"].IsNumber(), "Invalid JSON file '%s': job %d has a non-number 'subtime' field", filename.c_str(), j->id); xbt_assert(job["subtime"].IsNumber(), "Invalid JSON file '%s': job %d has a non-number 'subtime' field", filename.c_str(), j->number);
j->submission_time = job["subtime"].GetDouble(); j->submission_time = job["subtime"].GetDouble();
xbt_assert(job.HasMember("walltime"), "Invalid JSON file '%s': job %d has no 'walltime' field", filename.c_str(), j->id); xbt_assert(job.HasMember("walltime"), "Invalid JSON file '%s': job %d has no 'walltime' field", filename.c_str(), j->number);
xbt_assert(job["walltime"].IsNumber(), "Invalid JSON file '%s': job %d has a non-number 'walltime' field", filename.c_str(), j->id); xbt_assert(job["walltime"].IsNumber(), "Invalid JSON file '%s': job %d has a non-number 'walltime' field", filename.c_str(), j->number);
j->walltime = job["walltime"].GetDouble(); j->walltime = job["walltime"].GetDouble();
xbt_assert(job.HasMember("res"), "Invalid JSON file '%s': job %d has no 'res' field", filename.c_str(), j->id); xbt_assert(job.HasMember("res"), "Invalid JSON file '%s': job %d has no 'res' field", filename.c_str(), j->number);
xbt_assert(job["res"].IsInt(), "Invalid JSON file '%s': job %d has a non-number 'res' field", filename.c_str(), j->id); xbt_assert(job["res"].IsInt(), "Invalid JSON file '%s': job %d has a non-number 'res' field", filename.c_str(), j->number);
j->required_nb_res = job["res"].GetInt(); j->required_nb_res = job["res"].GetInt();
xbt_assert(job.HasMember("profile"), "Invalid JSON file '%s': job %d has no 'profile' field", filename.c_str(), j->id); xbt_assert(job.HasMember("profile"), "Invalid JSON file '%s': job %d has no 'profile' field", filename.c_str(), j->number);
xbt_assert(job["profile"].IsString(), "Invalid JSON file '%s': job %d has a non-string 'profile' field", filename.c_str(), j->id); xbt_assert(job["profile"].IsString(), "Invalid JSON file '%s': job %d has a non-string 'profile' field", filename.c_str(), j->number);
j->profile = job["profile"].GetString(); j->profile = job["profile"].GetString();
xbt_assert(!exists(j->id), "Invalid JSON file '%s': duplication of job id %d", filename.c_str(), j->id); // Let's get the JSON string which describes the job
_jobs[j->id] = j; rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
job.Accept(writer);