Commit e8d4f5c2 authored by Millian Poquet's avatar Millian Poquet

Towards dynamic jobs: memory management.

Dynamic jobs are now added into Batsim data structures, if needed, when the
server receives a JOB_SUBMITTED message.

To do so, methods have been added in the data structures (Jobs, Profiles,
Workloads...).

Furthermore, the JSON parsing is a bit cleaner now to avoid code redundancy.
parent b0a3b6a9
......@@ -164,6 +164,13 @@ IPMessage::~IPMessage()
data = nullptr;
}
JobIdentifier::JobIdentifier(const string &workload_name, int job_number) :
workload_name(workload_name),
job_number(job_number)
{
}
string JobIdentifier::to_string() const
{
return workload_name + '!' + std::to_string(job_number);
......
......@@ -19,6 +19,13 @@ struct BatsimContext;
*/
struct JobIdentifier
{
/**
* @brief Creates a JobIdentifier
* @param[in] workload_name The workload name
* @param[in] job_number The job number
*/
JobIdentifier(const std::string & workload_name = "", int job_number = -1);
std::string workload_name; //!< The name of the workload the job belongs to
int job_number; //!< The job unique number inside its workload
......
......@@ -78,9 +78,9 @@ int static_job_submitter_process(int argc, char *argv[])
MSG_process_sleep(job->submission_time - previousSubmissionDate);
// Let's put the metadata about the job into the data storage
string job_id_string = args->workload_name + "!" + to_string(job->number);
string job_key = "job_" + job_id_string;
string profile_key = "profile_" + job_id_string;
JobIdentifier job_id(workload->name, job->number);
string job_key = RedisStorage::job_key(job_id);
string profile_key = RedisStorage::profile_key(workload->name, job->profile);
context->storage.set(job_key, job->json_description);
context->storage.set(profile_key, workload->profiles->at(job->profile)->json_description);
......
......@@ -59,74 +59,51 @@ void Jobs::load_from_json(const Document &doc, const string &filename)
for (SizeType i = 0; i < jobs.Size(); i++) // Uses SizeType instead of size_t
{
Job * j = new Job;
j->workload = _workload;
j->starting_time = -1;
j->runtime = -1;
j->state = JobState::JOB_STATE_NOT_SUBMITTED;
j->consumed_energy = -1;
const Value & job = jobs[i];
xbt_assert(job.IsObject(), "Invalid JSON file '%s': one job is not an object", filename.c_str());
xbt_assert(job.HasMember("id"), "Invalid JSON file '%s': one job has no 'id' field", filename.c_str());
xbt_assert(job["id"].IsInt(), "Invalid JSON file '%s': one job has a non-integral 'id' field ('%s')", filename.c_str(), job["id"].GetString());
j->number = job["id"].GetInt();
xbt_assert(job.HasMember("subtime"), "Invalid JSON file '%s': job %d has no 'subtime' field", filename.c_str(), j->number);
xbt_assert(job["subtime"].IsNumber(), "Invalid JSON file '%s': job %d has a non-number 'subtime' field", filename.c_str(), j->number);
j->submission_time = job["subtime"].GetDouble();
xbt_assert(job.HasMember("walltime"), "Invalid JSON file '%s': job %d has no 'walltime' field", filename.c_str(), j->number);
xbt_assert(job["walltime"].IsNumber(), "Invalid JSON file '%s': job %d has a non-number 'walltime' field", filename.c_str(), j->number);
j->walltime = job["walltime"].GetDouble();
xbt_assert(job.HasMember("res"), "Invalid JSON file '%s': job %d has no 'res' field", filename.c_str(), j->number);
xbt_assert(job["res"].IsInt(), "Invalid JSON file '%s': job %d has a non-number 'res' field", filename.c_str(), j->number);
j->required_nb_res = job["res"].GetInt();
xbt_assert(job.HasMember("profile"), "Invalid JSON file '%s': job %d has no 'profile' field", filename.c_str(), j->number);
xbt_assert(job["profile"].IsString(), "Invalid JSON file '%s': job %d has a non-string 'profile' field", filename.c_str(), j->number);
j->profile = job["profile"].GetString();
// Let's get the JSON string which describes the job
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
job.Accept(writer);
j->json_description = buffer.GetString();
const Value & job_json_description = jobs[i];
Job * j = Job::from_json(job_json_description, _workload);
xbt_assert(!exists(j->number), "Invalid JSON file '%s': duplication of job id %d", filename.c_str(), j->number);
_jobs[j->number] = j;
}
}
// Returns the job registered under job_number.
// The lookup is guarded by xbt_assert: asking for an unknown job number is
// treated as a programming error rather than returning a null pointer.
Job *Jobs::operator[](int job_number)
{
    auto it = _jobs.find(job_number);
    xbt_assert(it != _jobs.end(), "Cannot get job %d: it does not exist", job_number);
    return it->second;
}
// Const counterpart of operator[]: returns a read-only pointer to the job
// registered under job_number. The job must exist (checked by xbt_assert).
const Job *Jobs::operator[](int job_number) const
{
    auto it = _jobs.find(job_number);
    xbt_assert(it != _jobs.end(), "Cannot get job %d: it does not exist", job_number);
    return it->second;
}
Job *Jobs::at(int job_id)
Job *Jobs::at(int job_number)
{
return operator[](job_number);
}
const Job *Jobs::at(int job_number) const
{
return operator[](job_id);
return operator[](job_number);
}
const Job *Jobs::at(int job_id) const
void Jobs::add_job(Job *job)
{
return operator[](job_id);
xbt_assert(!exists(job->number),
"Bad Jobs::add_job call: A job with number=%d already exists.",
job->number);
_jobs[job->number] = job;
}
bool Jobs::exists(int job_id) const
bool Jobs::exists(int job_number) const
{
auto it = _jobs.find(job_id);
auto it = _jobs.find(job_number);
return it != _jobs.end();
}
......@@ -170,3 +147,51 @@ bool job_comparator_subtime(const Job *a, const Job *b)
{
return a->submission_time < b->submission_time;
}
/*
 * Builds a newly allocated Job from its JSON description.
 *
 * json_desc must be a JSON object with the fields 'id' (int), 'subtime'
 * (number), 'walltime' (number), 'res' (int) and 'profile' (string); each
 * requirement is checked with xbt_assert. The runtime-related fields
 * (starting_time, runtime, consumed_energy) are initialised to -1 and the
 * state to JOB_STATE_NOT_SUBMITTED.
 *
 * The returned pointer is allocated with new; the caller takes it over.
 */
Job * Job::from_json(const rapidjson::Value & json_desc, Workload * workload)
{
    Job * j = new Job;
    j->workload = workload;
    j->starting_time = -1;
    j->runtime = -1;
    j->state = JobState::JOB_STATE_NOT_SUBMITTED;
    j->consumed_energy = -1;

    xbt_assert(json_desc.IsObject(), "Invalid JSON: one job is not an object");

    xbt_assert(json_desc.HasMember("id"), "Invalid JSON: one job has no 'id' field");
    if (!json_desc["id"].IsInt())
    {
        // The offending value may be of any JSON type, so it cannot be read
        // with GetString() (only valid on string values). Serialise it
        // instead so the diagnostic can always be printed.
        rapidjson::StringBuffer id_buffer;
        rapidjson::Writer<rapidjson::StringBuffer> id_writer(id_buffer);
        json_desc["id"].Accept(id_writer);
        xbt_assert(false, "Invalid JSON: one job has a non-integral 'id' field ('%s')", id_buffer.GetString());
    }
    j->number = json_desc["id"].GetInt();

    xbt_assert(json_desc.HasMember("subtime"), "Invalid JSON: job %d has no 'subtime' field", j->number);
    xbt_assert(json_desc["subtime"].IsNumber(), "Invalid JSON: job %d has a non-number 'subtime' field", j->number);
    j->submission_time = json_desc["subtime"].GetDouble();

    xbt_assert(json_desc.HasMember("walltime"), "Invalid JSON: job %d has no 'walltime' field", j->number);
    xbt_assert(json_desc["walltime"].IsNumber(), "Invalid JSON: job %d has a non-number 'walltime' field", j->number);
    j->walltime = json_desc["walltime"].GetDouble();

    xbt_assert(json_desc.HasMember("res"), "Invalid JSON: job %d has no 'res' field", j->number);
    // The check is IsInt(), so a non-integer number is rejected too: say so.
    xbt_assert(json_desc["res"].IsInt(), "Invalid JSON: job %d has a non-integral 'res' field", j->number);
    j->required_nb_res = json_desc["res"].GetInt();

    xbt_assert(json_desc.HasMember("profile"), "Invalid JSON: job %d has no 'profile' field", j->number);
    xbt_assert(json_desc["profile"].IsString(), "Invalid JSON: job %d has a non-string 'profile' field", j->number);
    j->profile = json_desc["profile"].GetString();

    // Let's get the JSON string which describes the job (to conserve potential fields unused by Batsim)
    rapidjson::StringBuffer buffer;
    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
    json_desc.Accept(writer);
    j->json_description = buffer.GetString();

    return j;
}
/*
 * Builds a newly allocated Job from a JSON description given as text.
 *
 * The text is parsed into a document, which is then handed to the
 * rapidjson::Value overload for field validation. A parse failure is
 * reported here, together with the offending text, instead of surfacing
 * later as a misleading "one job is not an object" message.
 */
Job * Job::from_json(const std::string & json_str, Workload *workload)
{
    Document doc;
    doc.Parse(json_str.c_str());
    xbt_assert(!doc.HasParseError(),
               "Invalid JSON job: the description could not be parsed. Content: '%s'",
               json_str.c_str());

    return Job::from_json(doc, workload);
}
......@@ -48,6 +48,26 @@ struct Job
double runtime; //!< The amount of time during which the job has been executed
MachineRange allocation; //!< The machines on which the job has been executed.
JobState state; //!< The current state of the job
/**
* @brief Creates a newly allocated Job from a JSON description
* @param[in] json_desc The JSON description of the job
* @param[in] workload The Workload the job is in
* @return The newly allocated Job
* @pre The JSON description of the job is valid
*/
static Job * from_json(const rapidjson::Value & json_desc,
Workload * workload);
/**
* @brief Creates a newly allocated Job from a JSON description
* @param[in] json_str The JSON description of the job (as a string)
* @param[in] workload The Workload the job is in
* @return The newly allocated Job
* @pre The JSON description of the job is valid
*/
static Job * from_json(const std::string & json_str,
Workload * workload);
};
/**
......@@ -70,6 +90,7 @@ public:
Jobs();
/**
* @brief Destroys a Jobs
* @details All Job instances will be deleted
*/
~Jobs();
......@@ -94,38 +115,45 @@ public:
/**
* @brief Accesses one job thanks to its unique number
* @param[in] job_id The job unique number
* @param[in] job_number The job unique number
* @return A pointer to the job associated to the given job number
*/
Job * operator[](int job_id);
Job * operator[](int job_number);
/**
* @brief Accesses one job thanks to its unique number (const version)
* @param[in] job_id The job unique number
* @param[in] job_number The job unique number
* @return A (const) pointer to the job associated to the given job number
*/
const Job * operator[](int job_id) const;
const Job * operator[](int job_number) const;
/**
* @brief Accesses one job thanks to its unique number
* @param[in] job_id The job unique number
* @param[in] job_number The job unique number
* @return A pointer to the job associated to the given job number
*/
Job * at(int job_id);
Job * at(int job_number);
/**
* @brief Accesses one job thanks to its unique number (const version)
* @param[in] job_id The job unique number
* @param[in] job_number The job unique number
* @return A (const) pointer to the job associated to the given job number
*/
const Job * at(int job_id) const;
const Job * at(int job_number) const;
/**
* @brief Adds a job into a Jobs instance
* @param[in] job The job to add
* @pre No job with the same number exists in the Jobs instance
*/
void add_job(Job * job);
/**
* @brief Allows to know whether a job exists
* @param[in] job_id The unique job number
* @param[in] job_number The unique job number
* @return True if and only if a job with the given job number exists
*/
bool exists(int job_id) const;
bool exists(int job_number) const;
/**
* @brief Allows to know whether the Jobs contains any SMPI job
......
This diff is collapsed.
......@@ -36,6 +36,26 @@ struct Profile
ProfileType type; //!< The type of the profile
void * data; //!< The associated data
std::string json_description; //!< The JSON description of the profile
/**
* @brief Creates a newly allocated Profile from a JSON description
* @param[in] profile_name The name of the profile
* @param[in] json_desc The JSON description
* @return The newly allocated Profile
* @pre The JSON description is valid
*/
static Profile * from_json(const std::string & profile_name,
const rapidjson::Value & json_desc);
/**
* @brief Creates a newly allocated Profile from a JSON description
* @param[in] profile_name The name of the profile
* @param[in] json_str The JSON description (as a string)
* @return The newly allocated Profile
* @pre The JSON description is valid
*/
static Profile * from_json(const std::string & profile_name,
const std::string & json_str);
};
/**
......@@ -148,6 +168,14 @@ public:
*/
bool exists(const std::string & profile_name) const;
/**
* @brief Adds a Profile into a Profiles instance
* @param[in] profile_name The name of the profile to add
* @param[in] profile The profile to add
* @pre No profile with the same name exists in the Profiles instance
*/
void add_profile(const std::string & profile_name, Profile * profile);
/**
* @brief Returns a copy of the internal std::map used in the Profiles
* @return A copy of the internal std::map used in the Profiles
......
......@@ -144,10 +144,12 @@ int uds_server_process(int argc, char *argv[])
origin_of_jobs[message->job_id] = submitter;
}
nb_submitted_jobs++;
Job * job = context->workloads.job_at(message->job_id);
job->state = JobState::JOB_STATE_SUBMITTED;
// Let's retrieve the Job from memory (or add it into memory if it is dynamic)
Job * job = context->workloads.add_job_if_not_exists(message->job_id, context);
// Update control information
job->state = JobState::JOB_STATE_SUBMITTED;
nb_submitted_jobs++;
XBT_INFO( "Job %d SUBMITTED. %d jobs submitted so far", job->number, nb_submitted_jobs);
send_buffer += "|" + std::to_string(MSG_get_clock()) + ":S:" + message->job_id.to_string();
......
......@@ -7,6 +7,8 @@
#include <simgrid.h>
using namespace std;
RedisStorage::RedisStorage()
{
//TODO: wrap redox logging into simgrid?
......@@ -74,6 +76,21 @@ std::string RedisStorage::key_subparts_separator() const
return _key_subparts_separator;
}
// Builds the data-storage key of a job: the literal prefix "job_", then the
// workload name, a '!' separator and the job number in decimal.
std::string RedisStorage::job_key(const JobIdentifier &job_id)
{
    std::string result("job_");
    result += job_id.workload_name;
    result += '!';
    result += std::to_string(job_id.job_number);
    return result;
}
// Builds the data-storage key of a profile: the literal prefix "profile_",
// then the workload name, a '!' separator and the profile name.
std::string RedisStorage::profile_key(const std::string &workload_name,
                                      const std::string &profile_name)
{
    std::string result("profile_");
    result += workload_name;
    result += '!';
    result += profile_name;
    return result;
}
std::string RedisStorage::build_key(const std::string & user_given_key) const
{
return _instance_key_prefix + _key_subparts_separator + user_given_key;
......
......@@ -9,6 +9,8 @@
#include <redox.hpp>
#include "ipp.hpp"
/**
* @brief Wrapper class around Redox, a Redis client library.
* @details This class provides blocking methods that communicate
......@@ -83,6 +85,22 @@ public:
*/
std::string key_subparts_separator() const;
/**
* @brief Returns the key in the data storage corresponding to a JobIdentifier
* @param[in] job_id The JobIdentifier
* @return The key in the data storage corresponding to a JobIdentifier
*/
static std::string job_key(const JobIdentifier & job_id);
/**
* @brief Returns the key in the data storage corresponding to a profile
* @param[in] workload_name The workload name of the profile
* @param[in] profile_name The profile name
* @return The key in the data storage corresponding to a profile
*/
static std::string profile_key(const std::string & workload_name,
const std::string & profile_name);
private:
/**
* @brief Build a final key from a user-given key.
......
......@@ -215,15 +215,80 @@ bool Workloads::job_exists(const std::string &workload_name, const int job_numbe
if (!exists(workload_name))
return false;
return at(workload_name)->jobs->exists(job_number);
if (!at(workload_name)->jobs->exists(job_number))
return false;
const Job * job = at(workload_name)->jobs->at(job_number);
if (!at(workload_name)->profiles->exists(job->profile))
return false;
return true;
}
bool Workloads::job_exists(const JobIdentifier &job_id)
{
return job_exists(job_id.workload_name, job_id.job_number);
}
/*
 * Returns the Job identified by job_id, loading it into memory first if
 * it is not there yet.
 *
 * When job_exists(job_id) is false, the missing pieces are created in
 * order: the Workload (if unknown), the Job (built from the JSON stored
 * under RedisStorage::job_key) and the job's Profile (built from the JSON
 * stored under RedisStorage::profile_key). Each description is fetched
 * from the data storage only when the corresponding object actually has
 * to be built.
 *
 * context must be the BatsimContext that owns this Workloads instance;
 * this is checked with xbt_assert.
 */
Job *Workloads::add_job_if_not_exists(const JobIdentifier &job_id, BatsimContext *context)
{
    // %p expects void pointers, hence the explicit casts
    xbt_assert(this == &context->workloads,
               "Bad Workloads::add_job_if_not_exists call: The given context "
               "does not match the Workloads instance (this=%p, &context->workloads=%p)",
               static_cast<const void *>(this),
               static_cast<const void *>(&context->workloads));

    // If the job already exists, let's just return it
    if (job_exists(job_id))
        return job_at(job_id);

    // Let's create a Workload if needed
    Workload * workload = nullptr;
    if (!exists(job_id.workload_name))
    {
        workload = new Workload;
        workload->name = job_id.workload_name;
        insert_workload(job_id.workload_name, workload);
    }
    else
        workload = at(job_id.workload_name);

    // Let's create a Job if needed
    Job * job = nullptr;
    if (!workload->jobs->exists(job_id.job_number))
    {
        // Let's retrieve the job information from the data storage
        const std::string job_key = RedisStorage::job_key(job_id);
        const std::string job_json_description = context->storage.get(job_key);

        job = Job::from_json(job_json_description, workload);
        xbt_assert(job_id.job_number == job->number,
                   "Cannot add dynamic job %s!%d: JSON job number mismatch (%d)",
                   job_id.workload_name.c_str(), job_id.job_number, job->number);

        // Let's insert the Job in the data structure
        workload->jobs->add_job(job);
    }
    else
        job = workload->jobs->at(job_id.job_number);

    // Let's create a Profile if needed
    if (!workload->profiles->exists(job->profile))
    {
        // Let's retrieve the profile information from the data storage
        const std::string profile_key = RedisStorage::profile_key(workload->name, job->profile);
        const std::string profile_json_description = context->storage.get(profile_key);

        Profile * profile = Profile::from_json(job->profile, profile_json_description);

        // Let's insert the Profile in the data structure
        workload->profiles->add_profile(job->profile, profile);
    }

    // TODO: check job & profile consistency (nb_res, etc.)
    return job;
}
std::map<std::string, Workload *> &Workloads::workloads()
......
......@@ -12,6 +12,7 @@ class Jobs;
struct Job;
class Profiles;
struct JobIdentifier;
struct BatsimContext;
/**
* @brief A workload is simply some Jobs with their associated Profiles
......@@ -152,6 +153,16 @@ public:
*/
bool job_exists(const JobIdentifier & job_id);
/**
* @brief Adds a job into memory if needed
* @details If the job already exists, this method does nothing. Otherwise, the job information is loaded from the remote data storage, one Job and one Profile are created and one Workload is created if needed.
* @param[in] job_id The job identifier
* @param[in,out] context The Batsim Context
* @return The Job corresponding to job_id.
*/
Job * add_job_if_not_exists(const JobIdentifier & job_id,
BatsimContext * context);
/**
* @brief Inserts a new Workload into a Workloads
* @param[in] workload_name The name of the new Workload to insert
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment