Commit 4567c6dd authored by Millian Poquet's avatar Millian Poquet

Machine: ranges are now used instead of sets.

This modification impacts all parts of Batsim in which machines were involved.
The protocol has also been updated to allow the scheduler to send machine ranges
in its allocations. The previous syntax is still supported. Now, each comma-separated
value can either be 1 machineID or a range of machineIDs (syntax: machineIDa-machineIDb
to represent the closed interval [machineIDa, machineIDb]).

Please note that Batsim has only been tested with old schedulers (that do not
support the new protocol syntax).
parent 64497dd2
......@@ -24,7 +24,7 @@ include_directories(${RAPIDJSON_INCLUDE_DIRS})
# Executables
add_executable(batsim batsim.cpp export.cpp ipp.cpp job_submitter.cpp jobs.cpp jobs_execution.cpp machines.cpp network.cpp profiles.cpp pstate.cpp server.cpp workload.cpp context.hpp export.hpp ipp.hpp job_submitter.hpp jobs.hpp jobs_execution.hpp machines.hpp network.hpp profiles.hpp pstate.hpp server.hpp workload.hpp)
add_executable(batsim batsim.cpp export.cpp ipp.cpp job_submitter.cpp jobs.cpp jobs_execution.cpp machine_range.cpp machines.cpp network.cpp profiles.cpp pstate.cpp server.cpp workload.cpp context.hpp export.hpp ipp.hpp job_submitter.hpp jobs.hpp jobs_execution.hpp machine_range.hpp machines.hpp network.hpp profiles.hpp pstate.hpp server.hpp workload.hpp)
#add_executable(batexec batexec.c job.c utils.c export.c)
# Libraries to link
......
......@@ -39,7 +39,7 @@ Each MSG_CONTENT follows this syntax:
|---------------- |-------|-------------- |------------------------- |-------------
| 0+ | S | Batsim->Sched | JOB_ID | Job submission: one (static) job is available and can now be allocated to resources.
| 0+ | C | Batsim->Sched | JOB_ID | Job completion: one (static) job finished its execution.
| 0+ | J | Sched->Batsim | JID1=MID1,MID2,MIDn[;...]| Job allocation: tells to put job JID1 on machines MID1, ..., MIDn. Many jobs might be allocated in the same event. | |
| 0+ | J | Sched->Batsim | JID1=MID1,MID2,MIDn[;...]| Job allocation: tells to put job JID1 on machines MID1, ..., MIDn. Many jobs might be allocated in the same event. Each MIDk part can be a single machine ID or a closed interval MIDa-MIDb where MIDa <= MIDb
| 0+ | N | Both | No content | NOP: tells to do nothing / nothing happened.
| 1+ | P | Sched->Batsim | MACHINE_ID=PSTATE | Ask to change the pstate of a machine.
| 1+ | p | Batsim->Sched | MACHINE_ID=PSTATE | Tells the scheduler the pstate of a machine has changed.
......
......@@ -394,7 +394,7 @@ void PajeTracer::addJobEnding(int jobID, const vector<int> & usedMachineIDs, dou
}
}
void PajeTracer::addJobKill(int jobID, const vector<int> & usedMachineIDs, double time, bool associateKillToMachines)
void PajeTracer::addJobKill(int jobID, const MachineRange & usedMachineIDs, double time, bool associateKillToMachines)
{
xbt_assert(state == INITIALIZED, "Bad addJobKill call: the PajeTracer object is not initialized or had been finalized");
......@@ -410,11 +410,12 @@ void PajeTracer::addJobKill(int jobID, const vector<int> & usedMachineIDs, doubl
if (associateKillToMachines)
{
// Let's add a kill event associated with each machine
for (const int & machineID : usedMachineIDs)
for (auto it = usedMachineIDs.elements_begin(); it != usedMachineIDs.elements_end(); ++it)
{
int machine_id = *it;
snprintf(buf, bufSize,
"%d %lf %s %s%d \"%d\"\n",
NEW_EVENT, time, killEventMachine, machinePrefix, machineID, jobID);
NEW_EVENT, time, killEventMachine, machinePrefix, machine_id, jobID);
_wbuf->appendText(buf);
}
}
......@@ -547,16 +548,7 @@ void exportJobsToCSV(const string &filename, BatsimContext *context)
xbt_assert((int)job->allocation.size() == job->required_nb_res);
vector<string> machine_id_strings;
for (const int & machine_id : job->allocation)
machine_id_strings.push_back(to_string(machine_id));
f << boost::algorithm::join(machine_id_strings, " ");
// todo: use union of intervals instead
f << "\n";
f << job->allocation.to_string_hyphen() << "\n";
}
}
......
......@@ -135,7 +135,7 @@ public:
* @param time The simulation time at which the kill is done
* @param associateKillToMachines By default (false), one event is added in the killer container. If set to true, one event is added for every machine on which the kill occurs.
*/
void addJobKill(int jobID, const std::vector<int> & usedMachineIDs, double time, bool associateKillToMachines = false);
void addJobKill(int jobID, const MachineRange & usedMachineIDs, double time, bool associateKillToMachines = false);
/**
* @brief Adds a global utilization value of the system.
......
......@@ -101,6 +101,7 @@ IPMessage::~IPMessage()
case IPMessageType::SCHED_ALLOCATION:
{
SchedulingAllocationMessage * msg = (SchedulingAllocationMessage *) data;
// The Allocations themselves are not memory-deallocated there but at the end of the job execution.
delete msg;
} break;
case IPMessageType::SCHED_REJECTION:
......
......@@ -9,6 +9,8 @@
#include <simgrid/msg.h>
#include "machine_range.hpp"
struct BatsimContext;
enum class IPMessageType
......@@ -47,13 +49,13 @@ struct JobRejectedMessage
struct SchedulingAllocation
{
int job_id;
std::vector<int> machine_ids; //! The IDs of the machines on which the job should be allocated
MachineRange machine_ids; //! The IDs of the machines on which the job should be allocated
std::vector<msg_host_t> hosts; //! The corresponding SimGrid hosts
};
struct SchedulingAllocationMessage
{
std::vector<SchedulingAllocation> allocations; //! Possibly several allocations
std::vector<SchedulingAllocation *> allocations; //! Possibly several allocations
};
struct PStateModificationMessage
......@@ -88,7 +90,7 @@ struct ServerProcessArguments
struct ExecuteJobProcessArguments
{
BatsimContext * context;
SchedulingAllocation allocation;
SchedulingAllocation * allocation;
};
struct KillerProcessArguments
......
......@@ -9,6 +9,8 @@
#include <rapidjson/document.h>
#include "machine_range.hpp"
class Profiles;
enum class JobState
......@@ -33,7 +35,7 @@ struct Job
double starting_time;
double runtime;
std::vector<int> allocation;
MachineRange allocation;
JobState state;
};
......
......@@ -232,9 +232,9 @@ int execute_job_process(int argc, char *argv[])
// Retrieving input parameters
ExecuteJobProcessArguments * args = (ExecuteJobProcessArguments *) MSG_process_get_data(MSG_process_self());
Job * job = args->context->jobs[args->allocation.job_id];
Job * job = args->context->jobs[args->allocation->job_id];
job->starting_time = MSG_get_clock();
job->allocation = args->allocation.machine_ids;
job->allocation = args->allocation->machine_ids;
double remaining_time = job->walltime;
// If energy is enabled, let us compute the energy used by the machines before running the job
......@@ -242,16 +242,17 @@ int execute_job_process(int argc, char *argv[])
{
job->consumed_energy = 0;
for(const int & machine_id : job->allocation)
for (auto it = job->allocation.elements_begin(); it != job->allocation.elements_end(); ++it)
{
int machine_id = *it;
Machine * machine = args->context->machines[machine_id];
job->consumed_energy += sg_host_get_consumed_energy(machine->host);
}
}
// Job computation
args->context->machines.updateMachinesOnJobRun(job->id, args->allocation.machine_ids);
if (execute_profile(args->context, job->profile, &args->allocation, &remaining_time) == 1)
args->context->machines.updateMachinesOnJobRun(job->id, args->allocation->machine_ids);
if (execute_profile(args->context, job->profile, args->allocation, &remaining_time) == 1)
{
XBT_INFO("Job %d finished in time", job->id);
job->state = JobState::JOB_STATE_COMPLETED_SUCCESSFULLY;
......@@ -261,10 +262,10 @@ int execute_job_process(int argc, char *argv[])
XBT_INFO("Job %d had been killed (walltime %lf reached", job->id, job->walltime);
job->state = JobState::JOB_STATE_COMPLETED_KILLED;
if (args->context->trace_schedule)
args->context->paje_tracer.addJobKill(job->id, args->allocation.machine_ids, MSG_get_clock(), true);
args->context->paje_tracer.addJobKill(job->id, args->allocation->machine_ids, MSG_get_clock(), true);
}
args->context->machines.updateMachinesOnJobEnd(job->id, args->allocation.machine_ids);
args->context->machines.updateMachinesOnJobEnd(job->id, args->allocation->machine_ids);
job->runtime = MSG_get_clock() - job->starting_time;
// If energy is enabled, let us compute the energy used by the machines after running the job
......@@ -273,8 +274,10 @@ int execute_job_process(int argc, char *argv[])
long double consumed_energy_before = job->consumed_energy;
job->consumed_energy = 0;
for(const int & machine_id : job->allocation)
for (auto it = job->allocation.elements_begin(); it != job->allocation.elements_end(); ++it)
{
int machine_id = *it;
Machine * machine = args->context->machines[machine_id];
job->consumed_energy += sg_host_get_consumed_energy(machine->host);
}
......@@ -288,6 +291,7 @@ int execute_job_process(int argc, char *argv[])
message->job_id = job->id;
send_message("server", IPMessageType::JOB_COMPLETED, (void*)message);
delete args->allocation;
delete args;
return 0;
......
#include "machine_range.hpp"
#include <vector>
#include <boost/algorithm/string/join.hpp>
#include <simgrid/msg.h>
XBT_LOG_NEW_DEFAULT_CATEGORY(machine_range, "machine_range");
using namespace std;
/**
 * @brief Gets the element-wise begin iterator of the range (mutable overload)
 * @return The result of boost::icl::elements_begin applied to the underlying interval set
 */
MachineRange::Set::element_iterator MachineRange::elements_begin()
{
    Set & intervals = set;
    return boost::icl::elements_begin(intervals);
}

/**
 * @brief Gets the element-wise begin iterator of the range (const overload)
 * @return The result of boost::icl::elements_begin applied to the underlying interval set
 */
MachineRange::Set::element_const_iterator MachineRange::elements_begin() const
{
    const Set & intervals = set;
    return boost::icl::elements_begin(intervals);
}
/**
 * @brief Gets the element-wise past-the-end iterator of the range (mutable overload)
 * @return The result of boost::icl::elements_end applied to the underlying interval set
 */
MachineRange::Set::element_iterator MachineRange::elements_end()
{
    Set & intervals = set;
    return boost::icl::elements_end(intervals);
}

/**
 * @brief Gets the element-wise past-the-end iterator of the range (const overload)
 * @return The result of boost::icl::elements_end applied to the underlying interval set
 */
MachineRange::Set::element_const_iterator MachineRange::elements_end() const
{
    const Set & intervals = set;
    return boost::icl::elements_end(intervals);
}
/**
 * @brief Gets the interval-wise begin iterator of the range (mutable overload)
 * @return begin() of the underlying interval set
 */
MachineRange::Set::iterator MachineRange::intervals_begin()
{
    return this->set.begin();
}

/**
 * @brief Gets the interval-wise begin iterator of the range (const overload)
 * @return begin() of the underlying interval set
 */
MachineRange::Set::const_iterator MachineRange::intervals_begin() const
{
    return this->set.begin();
}
/**
 * @brief Gets the interval-wise past-the-end iterator of the range (mutable overload)
 * @return end() of the underlying interval set
 */
MachineRange::Set::iterator MachineRange::intervals_end()
{
    return this->set.end();
}

/**
 * @brief Gets the interval-wise past-the-end iterator of the range (const overload)
 * @return end() of the underlying interval set
 */
MachineRange::Set::const_iterator MachineRange::intervals_end() const
{
    return this->set.end();
}
void MachineRange::clear()
{
set.clear();
}
void MachineRange::insert(MachineRange::Interval interval)
{
set.insert(interval);
}
void MachineRange::insert(int value)
{
set.insert(value);
}
int MachineRange::first_element() const
{
xbt_assert(size() > 0);
return *elements_begin();
}
/**
 * @brief Gets the size of the range, as reported by the underlying interval set
 * @return The value of size() on the underlying interval set, converted to unsigned int
 */
unsigned int MachineRange::size() const
{
    return static_cast<unsigned int>(set.size());
}
std::string MachineRange::to_string_brackets(const std::string & union_str,
const std::string & opening_bracket,
const std::string & closing_bracket,
const std::string & sep)
{
vector<string> machine_id_strings;
for (auto it = intervals_begin(); it != intervals_begin(); ++it)
machine_id_strings.push_back(opening_bracket + to_string(it->lower()) + sep + to_string(it->upper()) + closing_bracket);
return boost::algorithm::join(machine_id_strings, union_str);
}
std::string MachineRange::to_string_hyphen(const std::string &sep, const std::string &joiner)
{
vector<string> machine_id_strings;
for (auto it = intervals_begin(); it != intervals_begin(); ++it)
machine_id_strings.push_back(to_string(it->lower()) + joiner + to_string(it->upper()));
return boost::algorithm::join(machine_id_strings, sep);
}
#pragma once
#include <string>
#include <boost/icl/interval_set.hpp>
#include <boost/icl/closed_interval.hpp>
/**
 * @brief A set of machine IDs, stored in a boost::icl::interval_set of integers
 */
struct MachineRange
{
    using Set = boost::icl::interval_set<int>; //!< The underlying interval set type
    using Interval = Set::interval_type;       //!< The interval type stored in the set

    Set set; //!< The underlying interval set

    // Element-wise traversal (one machine ID at a time)
    Set::element_iterator elements_begin();
    Set::element_const_iterator elements_begin() const;
    Set::element_iterator elements_end();
    Set::element_const_iterator elements_end() const;

    // Interval-wise traversal (one stored interval at a time)
    Set::iterator intervals_begin();
    Set::const_iterator intervals_begin() const;
    Set::iterator intervals_end();
    Set::const_iterator intervals_end() const;

    void clear();                   //!< Clears the underlying set
    void insert(Interval interval); //!< Inserts an interval into the underlying set
    void insert(int value);         //!< Inserts a single value into the underlying set

    int first_element() const; //!< Returns the first element; asserts that the range is not empty
    unsigned int size() const; //!< Returns the size reported by the underlying set

    /**
     * @brief String representation where each interval is written as
     *        opening_bracket + lower + sep + upper + closing_bracket, parts being joined by union_str
     */
    std::string to_string_brackets(const std::string & union_str = "∪",
                                   const std::string & opening_bracket = "[",
                                   const std::string & closing_bracket = "]",
                                   const std::string & sep = ",");

    /**
     * @brief String representation where each interval is written as
     *        lower + joiner + upper, parts being joined by sep
     */
    std::string to_string_hyphen(const std::string & sep = ",",
                                 const std::string & joiner = "-");
};
......@@ -249,11 +249,12 @@ long double Machines::total_consumed_energy(BatsimContext *context) const
return total_consumed_energy;
}
void Machines::updateMachinesOnJobRun(int jobID, const std::vector<int> & usedMachines)
void Machines::updateMachinesOnJobRun(int jobID, const MachineRange & usedMachines)
{
for (int machineID : usedMachines)
for (auto it = usedMachines.elements_begin(); it != usedMachines.elements_end(); ++it)
{
Machine * machine = _machines[machineID];
int machine_id = *it;
Machine * machine = _machines[machine_id];
machine->state = MachineState::COMPUTING;
int previous_top_job = -1;
......@@ -270,11 +271,12 @@ void Machines::updateMachinesOnJobRun(int jobID, const std::vector<int> & usedMa
}
}
void Machines::updateMachinesOnJobEnd(int jobID, const std::vector<int> & usedMachines)
void Machines::updateMachinesOnJobEnd(int jobID, const MachineRange & usedMachines)
{
for (int machineID : usedMachines)
for (auto it = usedMachines.elements_begin(); it != usedMachines.elements_end(); ++it)
{
Machine * machine = _machines[machineID];
int machine_id = *it;
Machine * machine = _machines[machine_id];
xbt_assert(!machine->jobs_being_computed.empty());
int previous_top_job = *machine->jobs_being_computed.begin();
......
......@@ -8,6 +8,7 @@
#include <simgrid/msg.h>
#include "pstate.hpp"
#include "machine_range.hpp"
class PajeTracer;
struct BatsimContext;
......@@ -45,8 +46,8 @@ public:
Machines();
~Machines();
void createMachines(xbt_dynar_t hosts, BatsimContext * context, const std::string & masterHostName);
void updateMachinesOnJobRun(int jobID, const std::vector<int> & usedMachines);
void updateMachinesOnJobEnd(int jobID, const std::vector<int> & usedMachines);
void updateMachinesOnJobRun(int jobID, const MachineRange & usedMachines);
void updateMachinesOnJobEnd(int jobID, const MachineRange & usedMachines);
void setTracer(PajeTracer * tracer);
......
......@@ -228,53 +228,78 @@ int request_reply_scheduler_process(int argc, char *argv[])
for (const std::string & allocation_string : allocations)
{
// Each allocation is written in the form of jobID=machineID1,machineID2,...,machineIDn
// Each machineIDk can either be a single machine or a closed interval machineIDa-machineIDb
vector<string> allocation_parts;
boost::split(allocation_parts, allocation_string, boost::is_any_of("="), boost::token_compress_on);
xbt_assert(allocation_parts.size() == 2, "Invalid static job allocation received ('%s'): it must be composed of two parts separated by a '='",
allocation_string.c_str());
SchedulingAllocation alloc;
alloc.job_id = std::stoi(allocation_parts[0]);
xbt_assert(context->jobs.exists(alloc.job_id), "Invalid static job allocation received ('%s'): the job %d does not exist",
allocation_string.c_str(), alloc.job_id);
Job * job = context->jobs[alloc.job_id];
SchedulingAllocation * alloc = new SchedulingAllocation;
alloc->job_id = std::stoi(allocation_parts[0]);
xbt_assert(context->jobs.exists(alloc->job_id), "Invalid static job allocation received ('%s'): the job %d does not exist",
allocation_string.c_str(), alloc->job_id);
Job * job = context->jobs[alloc->job_id];
xbt_assert(job->state == JobState::JOB_STATE_SUBMITTED,
"Invalid static job allocation received ('%s') : the job %d state indicates it cannot be executed now",
allocation_string.c_str(), job->id);
// In order to get the machines, let us do a final split by ','!
// In order to get the machines, let us do a split by ','!
vector<string> allocation_machines;
boost::split(allocation_machines, allocation_parts[1], boost::is_any_of(","), boost::token_compress_on);
xbt_assert((int)allocation_machines.size() == job->required_nb_res,
"Invalid static job allocation received ('%s'): the job %d size is %d but %lu machines were allocated",
allocation_string.c_str(), job->id, job->required_nb_res, allocation_machines.size());
alloc.machine_ids.resize(allocation_machines.size());
alloc.hosts.resize(allocation_machines.size());
alloc->hosts.clear();
alloc->hosts.reserve(alloc->machine_ids.size());
alloc->machine_ids.clear();
for (unsigned int i = 0; i < allocation_machines.size(); ++i)
{
int machineID = std::stoi(allocation_machines[i]);
xbt_assert(context->machines.exists(machineID), "Invalid static job allocation received ('%s'): the machine %d does not exist",
allocation_string.c_str(), machineID);
alloc.machine_ids[i] = machineID;
alloc.hosts[i] = context->machines[machineID]->host;
}
// Since each machineIDk can either be a single machine or a closed interval, let's try to split on '-'
vector<string> interval_parts;
boost::split(interval_parts, allocation_machines[i], boost::is_any_of("-"), boost::token_compress_on);
xbt_assert(interval_parts.size() >= 1 && interval_parts.size() <= 2,
"Invalid static job allocation received ('%s'): the MIDk '%s' should either be a single machine ID"
" (syntax: MID to represent the machine ID MID) or a closed interval (syntax: MIDa-MIDb to represent"
" the machine interval [MIDA,MIDb])", allocation_string.c_str(), allocation_machines[i].c_str());
if (interval_parts.size() == 1)
{
int machine_id = std::stoi(interval_parts[0]);
xbt_assert(context->machines.exists(machine_id), "Invalid static job allocation received ('%s'): the machine %d does not exist",
allocation_string.c_str(), machine_id);
alloc->machine_ids.insert(machine_id);
alloc->hosts.push_back(context->machines[machine_id]->host);
xbt_assert((int)alloc->hosts.size() <= job->required_nb_res,
"Invalid static job allocation received ('%s'): the job %d size is %d but at least %lu machines were allocated",
allocation_string.c_str(), job->id, job->required_nb_res, alloc->hosts.size());
}
else
{
int machineIDa = std::stoi(interval_parts[0]);
int machineIDb = std::stoi(interval_parts[1]);
// Let us sort the allocation, to detect easily whether all machines are different or not
vector<int> sorted_machine_ids = alloc.machine_ids;
std::sort(sorted_machine_ids.begin(), sorted_machine_ids.end());
xbt_assert(machineIDa <= machineIDb, "Invalid static job allocation received ('%s'): the MIDk '%s' is composed of two"
" parts (1:%d and 2:%d) but the first value must be lesser than or equal to the second one.", allocation_string.c_str(),
allocation_machines[i].c_str(), machineIDa, machineIDb);
bool all_different = true;
for (unsigned int i = 1; i < sorted_machine_ids.size(); ++i)
{
if (sorted_machine_ids[i-1] == sorted_machine_ids[i])
{
all_different = false;
break;
for (int machine_id = machineIDa; machine_id <= machineIDb; ++machine_id)
{
xbt_assert(context->machines.exists(machine_id), "Invalid static job allocation received ('%s'): the machine %d does not exist",
allocation_string.c_str(), machine_id);
alloc->hosts.push_back(context->machines[machine_id]->host);
xbt_assert((int)alloc->hosts.size() <= job->required_nb_res,
"Invalid static job allocation received ('%s'): the job %d size is %d but at least %lu machines were allocated",
allocation_string.c_str(), job->id, job->required_nb_res, alloc->hosts.size());
}
alloc->machine_ids.insert(MachineRange::Interval::closed(machineIDa, machineIDb));
}
}
xbt_assert(all_different, "Invalid static job allocation received ('%s'): all machines are not different", allocation_string.c_str());
// Let the number of allocated machines be checked
xbt_assert((int)alloc->machine_ids.size() == job->required_nb_res,
"Invalid static job allocation received ('%s'): the job %d size is %d but %u machines were allocated (%s)",
allocation_string.c_str(), job->id, job->required_nb_res, alloc->machine_ids.size(),
alloc->machine_ids.to_string_brackets().c_str());
message->allocations.push_back(alloc);
}
......
......@@ -212,9 +212,9 @@ int uds_server_process(int argc, char *argv[])
xbt_assert(task_data->data != nullptr);
SchedulingAllocationMessage * message = (SchedulingAllocationMessage *) task_data->data;
for (const auto & allocation : message->allocations)
for (SchedulingAllocation * allocation : message->allocations)
{
Job * job = context->jobs[allocation.job_id];
Job * job = context->jobs[allocation->job_id];
job->state = JobState::JOB_STATE_RUNNING;
nb_running_jobs++;
......@@ -224,9 +224,10 @@ int uds_server_process(int argc, char *argv[])
if (!context->allow_space_sharing)
{
for (const int & machineID : allocation.machine_ids)
for (auto machine_id_it = allocation->machine_ids.elements_begin(); machine_id_it != allocation->machine_ids.elements_end(); ++machine_id_it)
{
const Machine * machine = context->machines[machineID];
int machine_id = *machine_id_it;
const Machine * machine = context->machines[machine_id];
xbt_assert(machine->jobs_being_computed.empty(),
"Invalid job allocation: machine %d ('%s') is currently computing jobs (these ones:"
" {%s}) whereas space sharing is forbidden. Space sharing can be enabled via an option,"
......@@ -238,9 +239,10 @@ int uds_server_process(int argc, char *argv[])
if (context->energy_used)
{
// Check that every machine is in a computation pstate
for (const int & machineID : allocation.machine_ids)
for (auto machine_id_it = allocation->machine_ids.elements_begin(); machine_id_it != allocation->machine_ids.elements_end(); ++machine_id_it)
{
Machine * machine = context->machines[machineID];
int machine_id = *machine_id_it;
Machine * machine = context->machines[machine_id];
int ps = MSG_host_get_pstate(machine->host);
xbt_assert(machine->has_pstate(ps));
xbt_assert(machine->pstates[ps] == PStateType::COMPUTATION_PSTATE,
......@@ -257,7 +259,7 @@ int uds_server_process(int argc, char *argv[])
exec_args->context = context;
exec_args->allocation = allocation;
string pname = "job" + to_string(job->id);
MSG_process_create(pname.c_str(), execute_job_process, (void*)exec_args, context->machines[allocation.machine_ids[0]]->host);
MSG_process_create(pname.c_str(), execute_job_process, (void*)exec_args, context->machines[allocation->machine_ids.first_element()]->host);
}
} break; // end of case SCHED_ALLOCATION
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment