Commit f083f79d authored by Millian Poquet's avatar Millian Poquet

Implementation of the energy mechanism.

If the energy is enabled (by setting the -p run option), the platform is assumed to contain energy information.
Such information is read by SimGrid itself but another ones, such as the 'sleep_pstates' property, are read by Batsim.
A lot of assertions has been put in the reading of the platform file which will help generating energy platforms.
An example of such a platform can be found in platforms/energy_platform.xml. This platform follows the same rules
that those of SimGrid energy platforms but assures pstate coherency via the 'sleep_pstates' property: it says
which pstates are sleep pstates and which pstates are its associated virtual pstates (for switching ON and OFF
each pstate). More information can be found in the file 'platforms/energy_platform.xml'

The scheduler can now send messages of the form TIMESTAMP:P:M=S where M is a machineID and S the asked new pstate
for machineID. Batsim does the stuff to simulate the time & energy consumptions of the switchON/OFF (if any, you
can also switch from one computation state to another). Once the pstate of a machine has changed, Batsim sends
it on the socket with messages of the form TIMESTAMP:p:M=S where M is a machineID and S the new pstate of the
corresponding machine (note the lowercase 'p' instead of an uppercase 'P').

To simulate the energy & time costs of the switch ON/OFF, the exact same mechanism as the one presented in
SimGrid energy 1O1 is used: computing 1 flop on virtual pstates whose computational power and power
consumption met our time and energy consumptions needs.
parent feb4031e
......@@ -24,7 +24,7 @@ include_directories(${RAPIDJSON_INCLUDE_DIRS})
# Executables
add_executable(batsim batsim.cpp export.cpp ipp.cpp job_submitter.cpp jobs.cpp jobs_execution.cpp machines.cpp network.cpp profiles.cpp workload.cpp)
add_executable(batsim batsim.cpp export.cpp ipp.cpp job_submitter.cpp jobs.cpp jobs_execution.cpp machines.cpp network.cpp profiles.cpp pstate.cpp workload.cpp context.hpp export.hpp ipp.hpp job_submitter.hpp jobs.hpp jobs_execution.hpp machines.hpp network.hpp profiles.hpp pstate.hpp workload.hpp)
#add_executable(batexec batexec.c job.c utils.c export.c)
# Libraries to link
......
......@@ -5,6 +5,7 @@
#include <unistd.h>
#include <simgrid/msg.h>
#include <simgrid/plugins.h>
#include "context.hpp"
#include "export.hpp"
......@@ -34,6 +35,8 @@ struct MainArguments
std::string masterHostName; //! The name of the SimGrid host which runs scheduler processes and not user tasks
std::string exportPrefix; //! The filename prefix used to export simulation information
bool energy_used; //! True if and only if the SimGrid energy plugin should be used.
bool abort; //! A boolean value. If set to yet, the launching should be aborted for reason abortReason
std::string abortReason; //! Human readable reasons which explains why the launch should be aborted
};
......@@ -57,6 +60,9 @@ static int parse_opt (int key, char *arg, struct argp_state *state)
case 'm':
mainArgs->masterHostName = arg;
break;
case 'p':
mainArgs->energy_used = true;
break;
case 's':
mainArgs->socketFilename = arg;
break;
......@@ -100,10 +106,12 @@ static int parse_opt (int key, char *arg, struct argp_state *state)
int main(int argc, char * argv[])
{
// TODO : boolean option to enable the energy plugin
MainArguments mainArgs;
mainArgs.socketFilename = "/tmp/bat_socket";
mainArgs.masterHostName = "master_host";
mainArgs.exportPrefix = "out";
mainArgs.energy_used = false;
mainArgs.abort = false;
struct argp_option options[] =
......@@ -111,6 +119,7 @@ int main(int argc, char * argv[])
{"socket", 's', "FILENAME", 0, "Unix Domain Socket filename", 0},
{"master-host", 'm', "NAME", 0, "The name of the host in PLATFORM_FILE which will run SimGrid scheduling processes and won't be used to compute tasks", 0},
{"export", 'e', "FILENAME_PREFIX", 0, "The export filename prefix used to generate simulation output", 0},
{"energy-plugin", 'p', 0, 0, "Enables energy-aware experiments", 0},
{0, '\0', 0, 0, 0, 0} // The options array must be NULL-terminated
};
struct argp argp = {options, parse_opt, "PLATFORM_FILE WORKLOAD_FILE", "A tool to simulate (via SimGrid) the behaviour of scheduling algorithms.", 0, 0, 0};
......@@ -122,10 +131,16 @@ int main(int argc, char * argv[])
return 1;
}
if (mainArgs.energy_used)
sg_energy_plugin_init();
// Initialization
MSG_init(&argc, argv);
BatsimContext context;
context.platform_filename = mainArgs.platformFilename;
context.workload_filename = mainArgs.workloadFilename;
context.energy_used = mainArgs.energy_used;
load_json_workload(&context, mainArgs.workloadFilename);
context.jobs.setProfiles(&context.profiles);
......@@ -146,7 +161,7 @@ int main(int argc, char * argv[])
MSG_create_environment(mainArgs.platformFilename.c_str());
xbt_dynar_t hosts = MSG_hosts_as_dynar();
context.machines.createMachines(hosts, mainArgs.masterHostName);
context.machines.createMachines(hosts, &context, mainArgs.masterHostName);
xbt_dynar_free(&hosts);
const Machine * masterMachine = context.machines.masterMachine();
context.machines.setTracer(&context.tracer);
......
......@@ -18,5 +18,8 @@ struct BatsimContext
Profiles profiles;
PajeTracer tracer;
long long microseconds_used_by_scheduler;
long long microseconds_used_by_scheduler = 0;
bool energy_used = false;
std::string platform_filename;
std::string workload_filename;
};
......@@ -7,6 +7,8 @@ SEP2 = ':'
'N' : NOP
'S' : Job Submission
'C' : Job Completion
'P' : Ask for a machine pstate change
'p' : Machine pstate changed
SCHEDULER -> SIMULATOR
0:15.000015|15.000015:J:1=1,2,0,3;2=3
......@@ -27,4 +29,14 @@ SEP2 = ':'
0:30.000030|30.000030:S:3
0:35.001305|30.005125:C:2
0:40.001320|38.002565:C:3|40.000030:S:4
0:145.003855|145.003855:C:4
\ No newline at end of file
0:145.003855|145.003855:C:4
Machine pstate change example.
Sched asks that machine 2 switches to pstate 3
Scheduler -> Simulator
0:20|20:P:2=3
Simulator -> Scheduler
0:21|21:p:2=3
......@@ -42,3 +42,54 @@ void send_message(const char *destination_mailbox, IPMessageType type, void *dat
const string str = destination_mailbox;
send_message(str, type, data);
}
IPMessage::~IPMessage()
{
switch (type)
{
case IPMessageType::JOB_SUBMITTED:
{
JobSubmittedMessage * msg = (JobSubmittedMessage *) data;
delete msg;
} break;
case IPMessageType::JOB_COMPLETED:
{
JobCompletedMessage * msg = (JobCompletedMessage *) data;
delete msg;
} break;
case IPMessageType::PSTATE_MODIFICATION:
{
PStateModificationMessage * msg = (PStateModificationMessage *) data;
delete msg;
} break;
case IPMessageType::SCHED_ALLOCATION:
{
SchedulingAllocationMessage * msg = (SchedulingAllocationMessage *) data;
delete msg;
} break;
case IPMessageType::SCHED_NOP:
{
} break;
case IPMessageType::SCHED_READY:
{
} break;
case IPMessageType::SUBMITTER_HELLO:
{
} break;
case IPMessageType::SUBMITTER_BYE:
{
} break;
case IPMessageType::SWITCHED_ON:
{
PStateModificationMessage * msg = (PStateModificationMessage *) data;
delete msg;
} break;
case IPMessageType::SWITCHED_OFF:
{
PStateModificationMessage * msg = (PStateModificationMessage *) data;
delete msg;
} break;
}
data = nullptr;
}
......@@ -15,11 +15,14 @@ enum class IPMessageType
{
JOB_SUBMITTED //!< Submitter -> Server. The submitter tells the server a new job has been submitted.
,JOB_COMPLETED //!< Launcher/killer -> Server. The launcher tells the server a job has been completed.
,PSTATE_MODIFICATION//!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occured (a pstate modification).
,SCHED_ALLOCATION //!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occured (a job allocation).
,SCHED_NOP //!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occured (a NOP message).
,SCHED_READY //!< SchedulerHandler -> Server. The scheduler handler tells the server that the scheduler is ready (messages can be sent to it).
,SUBMITTER_HELLO //!< Submitter -> Server. The submitter tells it starts submitting to the server.
,SUBMITTER_BYE //!< Submitter -> Server. The submitter tells it stops submitting to the server.
,SWITCHED_ON //!< SwitcherON -> Server. The switcherON process tells the server the machine pstate has been changed
,SWITCHED_OFF //!< SwitcherOFF -> Server. The switcherOFF process tells the server the machine pstate has been changed.
};
struct JobSubmittedMessage
......@@ -44,8 +47,15 @@ struct SchedulingAllocationMessage
std::vector<SchedulingAllocation> allocations; //! Possibly several allocations
};
struct PStateModificationMessage
{
int machine;
int new_pstate;
};
struct IPMessage
{
~IPMessage();
IPMessageType type; //! The message type
void * data; //! The message data (can be NULL if type is in [SCHED_NOP, SUBMITTER_HELLO, SUBMITTER_BYE, SUBMITTER_READY]). Otherwise, it is either a JobSubmittedMessage*, a JobCompletedMessage* or a SchedulingAllocationMessage* according to type.
};
......@@ -71,7 +81,13 @@ struct KillerProcessArguments
{
msg_task_t task; //! The task that will be cancelled if the walltime is reached
double walltime; //! The number of seconds to wait before cancelling the task
} ;
};
struct SwitchPStateProcessArguments
{
BatsimContext * context;
PStateModificationMessage * message;
};
struct JobSubmitterProcessArguments
{
......
......@@ -6,8 +6,10 @@
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/lexical_cast.hpp>
#include "export.hpp"
#include "context.hpp"
using namespace std;
......@@ -25,7 +27,7 @@ Machines::~Machines()
_machines.clear();
}
void Machines::createMachines(xbt_dynar_t hosts, const string &masterHostName)
void Machines::createMachines(xbt_dynar_t hosts, BatsimContext *context, const string &masterHostName)
{
xbt_assert(_machines.size() == 0, "Bad call to Machines::createMachines(): machines already created");
......@@ -44,6 +46,124 @@ void Machines::createMachines(xbt_dynar_t hosts, const string &masterHostName)
machine->jobs_being_computed = {};
machine->state = MachineState::IDLE;
if (context->energy_used)
{
int nb_pstates = MSG_host_get_nb_pstates(machine->host);
bool contains_sleep_pstates = false;
const char * sleep_states_cstr = MSG_host_get_property_value(machine->host, "sleep_pstates");
contains_sleep_pstates = (sleep_states_cstr != NULL);
// Let the sleep_pstates property be traversed in order to find the sleep and virtual transition pstates
if (contains_sleep_pstates)
{
string sleep_states_str = sleep_states_cstr;
vector<string> sleep_pstate_triplets;
boost::split(sleep_pstate_triplets, sleep_states_str, boost::is_any_of(","), boost::token_compress_on);
for (const auto & triplet : sleep_pstate_triplets)
{
vector<string> pstates;
boost::split(pstates, triplet, boost::is_any_of(":"), boost::token_compress_on);
xbt_assert(pstates.size() == 3, "Invalid platform file '%s': host '%s' has an invalid 'sleep_pstates' property:"
" each comma-separated part must be composed of three pstates of three colon-separated pstates, whereas"
" '%s' is not valid. Each comma-separated part represents one sleep pstate sleep_ps and its virtual pstates"
" on_ps and off_ps used to simulate the switch ON and switch OFF mechanisms."
" Example of a valid comma-separated part: 0:1:3, where sleep_ps=0, on_ps=1 and off_ps=3",
context->platform_filename.c_str(), machine->name.c_str(), triplet.c_str());
int sleep_ps, on_ps, off_ps;
bool conversion_succeeded = true;
try
{
sleep_ps = boost::lexical_cast<unsigned int>(pstates[0]);
on_ps = boost::lexical_cast<unsigned int>(pstates[1]);
off_ps = boost::lexical_cast<unsigned int>(pstates[2]);
}
catch(boost::bad_lexical_cast& e)
{
conversion_succeeded = false;
}
xbt_assert(conversion_succeeded, "Invalid platform file '%s': host '%s' has an invalid 'sleep_pstates' property:"
" the pstates of the comma-separated sleep pstate '%s' are invalid: impossible to convert the pstates to"
" unsigned integers", context->platform_filename.c_str(), machine->name.c_str(), triplet.c_str());
xbt_assert(sleep_ps >= 0 && sleep_ps < nb_pstates, "Invalid platform file '%s': host '%s' has an invalid 'sleep_pstates' property:"
" the pstates of the comma-separated sleep pstate '%s' are invalid: the pstate %d does not exist",
context->platform_filename.c_str(), machine->name.c_str(), triplet.c_str(), sleep_ps);
xbt_assert(on_ps >= 0 && on_ps < nb_pstates, "Invalid platform file '%s': host '%s' has an invalid 'sleep_pstates' property:"
" the pstates of the comma-separated sleep pstate '%s' are invalid: the pstate %d does not exist",
context->platform_filename.c_str(), machine->name.c_str(), triplet.c_str(), on_ps);
xbt_assert(off_ps >= 0 && off_ps < nb_pstates, "Invalid platform file '%s': host '%s' has an invalid 'sleep_pstates' property:"
" the pstates of the comma-separated sleep pstate '%s' are invalid: the sleep pstate %d does not exist",
context->platform_filename.c_str(), machine->name.c_str(), triplet.c_str(), off_ps);
if (machine->has_pstate(sleep_ps))
{
if (machine->pstates[sleep_ps] == PStateType::SLEEP_PSTATE)
{
XBT_ERROR("Invalid platform file '%s': host '%s' has an invalid 'sleep_pstates' property:"
" the pstate %d is defined several times, which is forbidden.",
context->platform_filename.c_str(), machine->name.c_str(), sleep_ps);
}
else if (machine->pstates[sleep_ps] == PStateType::TRANSITION_VIRTUAL_PSTATE)
{
XBT_ERROR("Invalid platform file '%s': host '%s' has an invalid 'sleep_pstates' property:"
" the pstate %d is defined as a sleep pstate and as a virtual transition pstate."
" A pstate can either be a computation one, a sleeping one or a virtual transition one, but combinations are forbidden.",
context->platform_filename.c_str(), machine->name.c_str(), sleep_ps);
}
else
{
XBT_ERROR("Invalid platform file '%s': host '%s' has an invalid 'sleep_pstates' property:"
" the pstate %d is defined as a sleep pstate and as another type of pstate."
" A pstate can either be a computation one, a sleeping one or a virtual transition one, but combinations are forbidden.",
context->platform_filename.c_str(), machine->name.c_str(), sleep_ps);
}
}
if (machine->has_pstate(on_ps))
{
xbt_assert(machine->pstates[on_ps] == PStateType::TRANSITION_VIRTUAL_PSTATE,
"Invalid platform file '%s': host '%s' has an invalid 'sleep_pstates' property:"
" a pstate can either be a computation one, a sleeping one or a virtual transition one, but combinations are forbidden."
" Pstate %d is defined as a virtual transition pstate but also as another type of pstate.",
context->platform_filename.c_str(), machine->name.c_str(), on_ps);
}
if (machine->has_pstate(off_ps))
{
xbt_assert(machine->pstates[off_ps] == PStateType::TRANSITION_VIRTUAL_PSTATE,
"Invalid platform file '%s': host '%s' has an invalid 'sleep_pstates' property:"
" a pstate can either be a computation one, a sleeping one or a virtual transition one, but combinations are forbidden."
" Pstate %d is defined as a virtual transition pstate but also as another type of pstate.",
context->platform_filename.c_str(), machine->name.c_str(), off_ps);
}
SleepPState * sleep_pstate = new SleepPState;
sleep_pstate->sleep_pstate = sleep_ps;
sleep_pstate->switch_on_virtual_pstate = on_ps;
sleep_pstate->switch_off_virtual_pstate = off_ps;
machine->sleep_pstates[sleep_ps] = sleep_pstate;
machine->pstates[sleep_ps] = PStateType::SLEEP_PSTATE;
machine->pstates[on_ps] = PStateType::TRANSITION_VIRTUAL_PSTATE;
machine->pstates[off_ps] = PStateType::TRANSITION_VIRTUAL_PSTATE;
}
}
// Let the computation pstates be defined by those who are not sleep pstates nor virtual transition pstates
for (int ps = 0; ps < nb_pstates; ++ps)
{
if (!machine->has_pstate(ps))
{
// TODO: check that the pstate computational power is not null
machine->pstates[ps] = PStateType::COMPUTATION_PSTATE;
}
}
}
if (machine->name != masterHostName)
_machines.push_back(machine);
else
......@@ -181,3 +301,67 @@ string machineStateToString(MachineState state)
return conv.at(state);
}
Machine::~Machine()
{
for (auto mit : sleep_pstates)
{
delete mit.second;
}
sleep_pstates.clear();
}
bool Machine::has_pstate(int pstate) const
{
return pstates.find(pstate) != pstates.end();
}
void Machine::display_machine(bool is_energy_used) const
{
// Let us traverse jobs_being_computed to display some information about them
vector<string> jobsVector;
for (auto & jobID : jobs_being_computed)
{
jobsVector.push_back(std::to_string(jobID));
}
string str = "Machine\n";
str += " id = " + to_string(id) + "\n";
str += " name = '" + name + "'\n";
str += " state = " + machineStateToString(state) + "\n";
str += " jobs_being_computed = [" + boost::algorithm::join(jobsVector, ", ") + "]\n";
if (is_energy_used)
{
vector<string> pstates_vector;
vector<string> comp_pstates_vector;
vector<string> sleep_pstates_vector;
vector<string> vt_pstates_vector;
for (auto & mit : pstates)
{
pstates_vector.push_back(to_string(mit.first));
if (mit.second == PStateType::COMPUTATION_PSTATE)
comp_pstates_vector.push_back(to_string(mit.first));
else if (mit.second == PStateType::SLEEP_PSTATE)
sleep_pstates_vector.push_back(to_string(mit.first));
else if (mit.second == PStateType::TRANSITION_VIRTUAL_PSTATE)
vt_pstates_vector.push_back(to_string(mit.first));
}
str += " pstates = [\n" + boost::algorithm::join(pstates_vector, ", ") + "]\n";
str += " computation pstates = [\n" + boost::algorithm::join(comp_pstates_vector, ", ") + "]\n";
str += " sleep pstates = [\n" + boost::algorithm::join(sleep_pstates_vector, ", ") + "]\n";
str += " virtual transition pstates = [\n" + boost::algorithm::join(vt_pstates_vector, ", ") + "]\n";
for (auto & mit : sleep_pstates)
{
str += " sleep_ps=" + to_string(mit.second->sleep_pstate) +
", on_ps=" + to_string(mit.second->switch_on_virtual_pstate) +
", off_ps=" + to_string(mit.second->switch_off_virtual_pstate) + "\n";
}
}
XBT_INFO("%s", str.c_str());
}
......@@ -3,26 +3,40 @@
#include <iostream>
#include <vector>
#include <set>
#include <map>
#include <string>
#include <simgrid/msg.h>
#include "pstate.hpp"
class PajeTracer;
struct BatsimContext;
enum class MachineState
{
SLEEPING,
IDLE,
COMPUTING,
TRANSITING_FROM_SLEEPING_TO_COMPUTING,
TRANSITING_FROM_COMPUTING_TO_SLEEPING,
};
struct Machine
{
~Machine();
int id;
std::string name;
msg_host_t host;
MachineState state;
std::set<int> jobs_being_computed;
std::map<int, PStateType> pstates;
std::map<int, SleepPState *> sleep_pstates;
bool has_pstate(int pstate) const;
void display_machine(bool is_energy_used) const;
};
std::ostream & operator<<(std::ostream & out, const Machine & machine);
......@@ -32,7 +46,7 @@ class Machines
public:
Machines();
~Machines();
void createMachines(xbt_dynar_t hosts, const std::string & masterHostName);
void createMachines(xbt_dynar_t hosts, BatsimContext * context, const std::string & masterHostName);
void updateMachinesOnJobRun(int jobID, const std::vector<int> & usedMachines);
void updateMachinesOnJobEnd(int jobID, const std::vector<int> & usedMachines);
......
......@@ -103,7 +103,6 @@ string UnixDomainSocket::receive()
xbt_assert(message_size > 0, "Invalid message received (size=%d)", message_size);
msg.resize(message_size);
XBT_INFO("message_size = %d", message_size);
// The message content is then read, following the same logic
nb_bytes_read = 0;
......@@ -166,6 +165,7 @@ int request_reply_scheduler_process(int argc, char *argv[])
The following stamp exist at the moment :
N : do nothing. Content : none.
J : allocation of a static job (described in the JSON file). Content : jobID1=mID1_1,mID1_2,...,mID1_n[;jobID2=MID2_1,...MID2_n[;...]]
P : changes the pstate of a machine. Content : mID=pstate
*/
// Let us split the message by '|'.
......@@ -198,6 +198,7 @@ int request_reply_scheduler_process(int argc, char *argv[])
{
case NOP:
{
xbt_assert(parts2.size() == 2, "Invalid event received ('%s'): NOP messages must be composed of 2 parts separated by ':'", event_string.c_str());
send_message("server", IPMessageType::SCHED_NOP);
} break; // End of case received_stamp == NOP
case STATIC_JOB_ALLOCATION:
......@@ -260,7 +261,7 @@ int request_reply_scheduler_process(int argc, char *argv[])
}
}
xbt_assert(all_different, "Invalid static job allocation received ('%s') : all machines are not different", allocation_string.c_str());
xbt_assert(all_different, "Invalid static job allocation received ('%s'): all machines are not different", allocation_string.c_str());
message->allocations.push_back(alloc);
}
......@@ -268,6 +269,52 @@ int request_reply_scheduler_process(int argc, char *argv[])
send_message("server", IPMessageType::SCHED_ALLOCATION, (void*) message);
} break; // End of case received_stamp == STATIC_JOB_ALLOCATION
case PSTATE_SET:
{
xbt_assert(parts2.size() == 3, "Invalid event received ('%s'): pstate modifications must be composed of 3 parts separated by ':'",
event_string.c_str());
xbt_assert(context->energy_used, "A pstate modification message has been received alors energy is not used by Batsim."
" You can switch energy ON by a Batsim command-line option.");
PStateModificationMessage * message = new PStateModificationMessage;
// Let the content part of the message splitted by "="
vector<string> parts3;
boost::split(parts3, parts2[2], boost::is_any_of("="), boost::token_compress_on);
xbt_assert(parts3.size() == 2, "Invalid event received ('%s'): invalid pstate modification message content ('%s'): it must be"
" of type M=P where M is a machine number and P a pstate number of machine M", event_string.c_str(), parts2[2].c_str());
int machineID = std::stoi(parts3[0]);
int pstate = std::stoi(parts3[1]);
xbt_assert(context->machines.exists(machineID), "Invalid event received ('%s'): machine %d does not exist", event_string.c_str(), machineID);
Machine * machine = context->machines[machineID];
xbt_assert(machine->state == MachineState::IDLE, "Invalid event received ('%s'): machine %d's pstate can only be changed while the machine is idle, which is not the case now.", event_string.c_str(), machineID);
xbt_assert(context->machines[machineID]->has_pstate(pstate), "Invalid event received ('%s'): machine %d has no pstate %d", event_string.c_str(), machineID, pstate);
int current_pstate = MSG_host_get_pstate(machine->host);
xbt_assert(machine->has_pstate(current_pstate));
if (machine->pstates[current_pstate] == PStateType::COMPUTATION_PSTATE)
{
xbt_assert(machine->pstates[pstate] == PStateType::COMPUTATION_PSTATE ||
machine->pstates[pstate] == PStateType::SLEEP_PSTATE, "Invalid event received ('%s'): asked to switch machine %d ('%s') from"
" pstate %d (a computation one) to pstate %d (not a computation pstate nor a sleep pstate), which is forbidden",
event_string.c_str(), machineID, machine->name.c_str(), current_pstate, pstate);
}
else if (machine->pstates[current_pstate] == PStateType::SLEEP_PSTATE)
{
xbt_assert(machine->pstates[pstate] == PStateType::COMPUTATION_PSTATE, "Invalid event received ('%s'): asked to switch machine %d ('%s') from"
" pstate %d (a sleep pstate) to pstate %d (not a computation pstate), which is forbidden",
event_string.c_str(), machineID, machine->name.c_str(), current_pstate, pstate);
}
message->machine = machineID;
message->new_pstate = pstate;
send_message("server", IPMessageType::PSTATE_MODIFICATION, (void*) message);
} break; // End of case received_stamp == PSTATE_SET
default:
{
xbt_die("Invalid event received ('%s') : unhandled network stamp received ('%c')", event_string.c_str(), received_stamp);
......@@ -316,14 +363,16 @@ int uds_server_process(int argc, char *argv[])
{
nb_submitters++;
XBT_INFO("New submitter said hello. Number of polite submitters: %d", nb_submitters);
break;
} // end of case SUBMITTER_HELLO
} break; // end of case SUBMITTER_HELLO
case IPMessageType::SUBMITTER_BYE:
{
nb_submitters_finished++;
XBT_INFO("A submitted said goodbye. Number of finished submitters: %d", nb_submitters_finished);
break;
} // end of case SUBMITTER_BYE
} break; // end of case SUBMITTER_BYE
case IPMessageType::JOB_COMPLETED:
{
xbt_assert(task_data->data != nullptr);
......@@ -339,8 +388,8 @@ int uds_server_process(int argc, char *argv[])
send_buffer += '|' + std::to_string(MSG_get_clock()) + ":C:" + std::to_string(job->id);
XBT_INFO("Message to send to scheduler: %s", send_buffer.c_str());
break;
} // end of case JOB_COMPLETED
} break; // end of case JOB_COMPLETED
case IPMessageType::JOB_SUBMITTED:
{
xbt_assert(task_data->data != nullptr);
......@@ -354,8 +403,62 @@ int uds_server_process(int argc, char *argv[])
send_buffer += "|" + std::to_string(MSG_get_clock()) + ":S:" + std::to_string(job->id);
XBT_INFO("Message to send to scheduler: %s", send_buffer.c_str());
break;
} // end of case JOB_SUBMITTED
} break; // end of case JOB_SUBMITTED
case IPMessageType::PSTATE_MODIFICATION:
{
xbt_assert(task_data->data != nullptr);
PStateModificationMessage * message = (PStateModificationMessage *) task_data->data;
Machine * machine = context->machines[message->machine];
int curr_pstate = MSG_host_get_pstate(machine->host);
if (machine->pstates[curr_pstate] == PStateType::COMPUTATION_PSTATE)
{
if (machine->pstates[message->new_pstate] == PStateType::COMPUTATION_PSTATE)
{
MSG_host_set_pstate(machine->host, message->new_pstate);
xbt_assert(MSG_host_get_pstate(machine->host) == message->new_pstate);
send_buffer += "|" + std::to_string(MSG_get_clock()) + ":p:" +
std::to_string(machine->id) + "=" + std::to_string(message->new_pstate);
XBT_INFO("Message to send to scheduler : '%s'", send_buffer.c_str());
}
else if (machine->pstates[message->new_pstate] == PStateType::SLEEP_PSTATE)
{
machine->state = MachineState::TRANSITING_FROM_COMPUTING_TO_SLEEPING;
SwitchPStateProcessArguments * args = new SwitchPStateProcessArguments;
args->context = context;
args->message = new PStateModificationMessage;
args->message->machine = message->machine;
args->message->new_pstate = message->new_pstate;
string pname = "switch ON " + to_string(message->machine);
MSG_process_create(pname.c_str(), switch_on_machine_process, (void*)args, machine->host);
}
else
XBT_ERROR("Switching from a communication pstate to an invalid pstate on machine %d ('%s') : %d -> %d",
machine->id, machine->name.c_str(), curr_pstate, message->new_pstate);
}
else if (machine->pstates[curr_pstate] == PStateType::SLEEP_PSTATE)
{
xbt_assert(machine->pstates[message->new_pstate] == PStateType::COMPUTATION_PSTATE);
machine->state = MachineState::TRANSITING_FROM_SLEEPING_TO_COMPUTING;
SwitchPStateProcessArguments * args = new SwitchPStateProcessArguments;
args->context = context;
args->message = new PStateModificationMessage;
args->message->machine = message->machine;
args->message->new_pstate = message->new_pstate;
string pname = "switch OFF " + to_string(message->machine);
MSG_process_create(pname.c_str(), switch_off_machine_process, (void*)args, machine->host);
}
else
XBT_ERROR("Machine %d ('%s') has an invalid pstate : %d", machine->id, machine->name.c_str(), curr_pstate);
} break; // end of case PSTATE_MODIFICATION
case IPMessageType::SCHED_NOP:
{
XBT_INFO("Nothing to do received.");
......@@ -379,8 +482,9 @@ int uds_server_process(int argc, char *argv[])
XBT_INFO("The available jobs are [%s]", submittedJobsString.c_str());
}
break;
}
} break; // end of case SCHED_NOP
case IPMessageType::SCHED_ALLOCATION:
{
xbt_assert(task_data->data != nullptr);
......@@ -403,13 +507,38 @@ int uds_server_process(int argc, char *argv[])
MSG_process_create(pname.c_str(), execute_job_process, (void*)exec_args, context->machines[allocation.machine_ids[0]]->host);
}
break;
}
} break; // end of case SCHED_ALLOCATION
case IPMessageType::SCHED_READY:
{
sched_ready = true;
break;
}
} break; // end of case SCHED_READY
case IPMessageType::SWITCHED_ON:
{
xbt_assert(task_data->data != nullptr);
PStateModificationMessage * message = (PStateModificationMessage *) task_data->data;
xbt_assert(context->machines.exists(message->machine));
Machine * machine =