Commit e56ca13c authored by Millian Poquet's avatar Millian Poquet

One fully runnable C++ version! Unfortunately, this version does not work :(....

One fully runnable C++ version! Unfortunately, this version does not work :(. Firstly because of a bug of the latest SG version. Secondly there is a problem about message boxes (SG commit b85de8f1e83118 can be used to investigate this issue).
parent 1d657a0c
#include <string>
#include <stdio.h>
#include <argp.h>
#include <unistd.h>
#include <simgrid/msg.h>
#include "context.hpp"
#include "export.hpp"
#include "ipp.hpp"
#include "job_submitter.hpp"
#include "jobs.hpp"
#include "jobs_execution.hpp"
#include "machines.hpp"
#include "network.hpp"
#include "profiles.hpp"
using namespace std;
/**
* @brief The main function arguments (a.k.a. program arguments)
*/
struct MainArguments
{
std::string platformFilename; //! The SimGrid platform filename
std::string workloadFilename; //! The JSON workload filename
std::string socketFilename; //! The Unix Domain Socket filename
std::string masterHostName; //! The name of the SimGrid host which runs scheduler processes and not user tasks
std::string exportPrefix; //! The filename prefix used to export simulation information
bool abort; //! A boolean value. If set to yet, the launching should be aborted for reason abortReason
std::string abortReason; //! Human readable reasons which explains why the launch should be aborted
};
/**
* @brief Used to parse the main function parameters
* @param[in] key The current key
* @param[in] arg The current argument
* @param[in, out] state The current argp_state
* @return 0
*/
static int parse_opt (int key, char *arg, struct argp_state *state)
{
MainArguments * mainArgs = (MainArguments *) state->input;
switch (key)
{
case 'e':
mainArgs->exportPrefix = arg;
break;
case 'm':
mainArgs->masterHostName = arg;
break;
case 's':
mainArgs->socketFilename = arg;
break;
case ARGP_KEY_ARG:
switch(state->arg_num)
{
case 0:
mainArgs->platformFilename = arg;
if (access(mainArgs->platformFilename.c_str(), R_OK) == -1)
{
mainArgs->abort = true;
mainArgs->abortReason += "\n invalid PLATFORM_FILE argument: file '" + string(mainArgs->platformFilename) + "' cannot be read";
}
break;
case 1:
mainArgs->workloadFilename = arg;
if (access(mainArgs->workloadFilename.c_str(), R_OK) == -1)
{
mainArgs->abort = true;
mainArgs->abortReason += "\n invalid WORKLOAD_FILE argument: file '" + string(mainArgs->workloadFilename) + "' cannot be read";
}
break;
}
break;
case ARGP_KEY_END:
if (state->arg_num < 2)
{
mainArgs->abort = 1;
mainArgs->abortReason += "\n\tToo few arguments. Try the --help option to display usage information.";
}
/*else if (state->arg_num > 2)
{
mainArgs->abort = 1;
strcat(mainArgs->abortReason, "\n\tToo many arguments.");
}*/
break;
}
return 0;
}
int main(int argc, char * argv[])
{
MainArguments mainArgs;
mainArgs.socketFilename = "/tmp/bat_socket";
mainArgs.masterHostName = "master_host";
mainArgs.exportPrefix = "out";
mainArgs.abort = false;
struct argp_option options[] =
{
{"socket", 's', "FILENAME", 0, "Unix Domain Socket filename", 0},
{"master-host", 'm', "NAME", 0, "The name of the host in PLATFORM_FILE which will run SimGrid scheduling processes and won't be used to compute tasks", 0},
{"export", 'e', "FILENAME_PREFIX", 0, "The export filename prefix used to generate simulation output", 0},
{0, '\0', 0, 0, 0, 0} // The options array must be NULL-terminated
};
struct argp argp = {options, parse_opt, "PLATFORM_FILE WORKLOAD_FILE", "A tool to simulate (via SimGrid) the behaviour of scheduling algorithms.", 0, 0, 0};
argp_parse(&argp, argc, argv, 0, 0, &mainArgs);
if (mainArgs.abort)
{
fprintf(stderr, "Impossible to run batsim:%s\n", mainArgs.abortReason.c_str());
return 1;
}
// Initialization
MSG_init(&argc, argv);
BatsimContext context;
context.jobs.load_from_json(mainArgs.workloadFilename);
context.profiles.load_from_json(mainArgs.workloadFilename);
context.jobs.setProfiles(&context.profiles);
context.tracer.setFilename(mainArgs.exportPrefix + "_schedule.trace");
// TODO: check jobs & profile validity
bool smpi_used = context.jobs.containsSMPIJob();
if (!smpi_used)
MSG_config("host/model", "ptask_L07");
MSG_create_environment(mainArgs.platformFilename.c_str());
xbt_dynar_t hosts = MSG_hosts_as_dynar();
context.machines.createMachines(hosts, mainArgs.masterHostName);
xbt_dynar_free(&hosts);
const Machine * masterMachine = context.machines.masterMachine();
// Socket
context.socket.create_socket(mainArgs.socketFilename);
context.socket.accept_pending_connection();
// Main processes running
JobSubmitterProcessArguments * submitterArgs = new JobSubmitterProcessArguments;
submitterArgs->context = &context;
MSG_process_create("jobs_submitter", job_submitter_process, (void*)submitterArgs, masterMachine->host);
ServerProcessArguments * serverArgs = new ServerProcessArguments;
serverArgs->context = &context;
MSG_process_create("server", uds_server_process, (void*)serverArgs, masterMachine->host);
msg_error_t res = MSG_main();
// Finalization
exportScheduleToCSV(mainArgs.exportPrefix + "_schedule.csv", MSG_get_clock(), &context);
if (res == MSG_OK)
return 0;
else
return 1;
return 0;
}
This diff is collapsed.
......@@ -11,6 +11,8 @@
#include "machines.hpp"
struct BatsimContext;
class WriteBuffer
{
public:
......@@ -56,7 +58,7 @@ void exportJobsToCSV(const char * filename);
* @param filename The is the name of the output file used to write the CSV data.
* @param microseconds_used_by_scheduler The number of seconds the scheduler had hand on execution flow
*/
void exportScheduleToCSV(const char * filename, double scheduling_time);
void exportScheduleToCSV(const std::string &filename, double scheduling_time, BatsimContext * context);
/**
......@@ -70,7 +72,9 @@ public:
* @param filename
* @param logLaunchings If set to true, job launching time will be written in the trace. This option leads to larger trace files.
*/
PajeTracer(const std::string & filename, bool _logLaunchings = false);
PajeTracer(bool _logLaunchings = false);
void setFilename(const std::string & filename);
/**
* @brief PajeTracer destructor.
......
......@@ -17,7 +17,7 @@ def create_uds(uds_name):
raise
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# Bind the socket to the port
print('Starting up on', uds_name)
sock.bind(uds_name)
......@@ -29,11 +29,11 @@ def create_uds(uds_name):
def read_bat_msg(connection):
lg_str = connection.recv(4)
if not lg_str:
print("connection is closed by batsim core")
exit(1)
lg = struct.unpack("i",lg_str)[0]
msg = connection.recv(lg).decode()
print('from batsim : %r' % msg)
......@@ -41,7 +41,7 @@ def read_bat_msg(connection):
data = sub_msgs[0].split(":")
version = int(data[0])
now = float(data[1])
print("version: ", version, " now: ", now)
jobs_submitted = []
......@@ -53,14 +53,14 @@ def read_bat_msg(connection):
elif data[1] == 'C':
new_jobs_completed.append(int(data[2]))
else:
raise Exception("Unknow submessage type" + data[1] )
raise Exception("Unknow submessage type" + data[1] )
return (now, jobs_submitted, new_jobs_completed)
def send_bat_msg(connection, now, jids_toLaunch, jobs):
msg = "0:" + str(now) + "|"
if jids_toLaunch:
msg += str(now) + ":J:"
msg += str(now) + ":J:"
for jid in jids_toLaunch:
msg += str(jid) + "="
for r in jobs[jid]:
......@@ -68,7 +68,7 @@ def send_bat_msg(connection, now, jids_toLaunch, jobs):
msg = msg[:-1] + ";" # replace last comma by semicolon separtor between jobs
msg = msg[:-1] # remove last semicolon
else: #Do nothing
else: #Do nothing
msg += str(now) +":N"
print(msg)
......@@ -79,7 +79,7 @@ def send_bat_msg(connection, now, jids_toLaunch, jobs):
def load_json_workload_profile(filename):
wkp_file = open(filename)
wkp = json.load(wkp_file)
return wkp["jobs"], wkp["nb_res"]
return wkp["jobs"], wkp["nb_res"]
......@@ -110,15 +110,20 @@ openJobs = set()
availableResources = SortedSet(range(nb_res))
previousAllocations = dict()
##
##
# uds creation and waiting for connection
#
sock = create_uds(server_address)
print("waiting for a connection")
connection, client_address = sock.accept()
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
print('connecting to', server_address)
try:
sock.connect(server_address)
print('connected')
except socket.error:
print("socket error")
sys.exit(1)
while True:
now_str, jobs_submitted, jobs_completed = read_bat_msg(connection)
now_str, jobs_submitted, jobs_completed = read_bat_msg(sock)
for job in jobs_submitted:
openJobs.add(job)
......@@ -133,7 +138,7 @@ while True:
print('openJobs = ', openJobs)
print('available = ', availableResources)
print('previous = ', previousAllocations)
# Iterating over a copy to be able to remove jobs from openJobs at traversal
for job in set(openJobs):
nb_res_req = jobs_res_req[job]
......@@ -149,11 +154,11 @@ while True:
openJobs.remove(job)
# update time
# update time
now = float(now_str) + sched_delay
# send to uds
send_bat_msg(connection, now, scheduledJobs, jobs_res)
send_bat_msg(sock, now, scheduledJobs, jobs_res)
print('openJobs = ', openJobs)
print('available = ', availableResources)
......
......@@ -50,7 +50,6 @@ struct IPMessage
void * data; //! The message data (can be NULL if type is in [SCHED_NOP, SUBMITTER_HELLO, SUBMITTER_BYE, SUBMITTER_READY]). Otherwise, it is either a JobSubmittedMessage*, a JobCompletedMessage* or a SchedulingAllocationMessage* according to type.
};
struct RequestReplyProcessArguments
{
BatsimContext * context;
......@@ -74,6 +73,11 @@ struct KillerProcessArguments
double walltime; //! The number of seconds to wait before cancelling the task
} ;
struct JobSubmitterProcessArguments
{
BatsimContext * context;
};
/**
* @brief Sends a message from the given process to the given mailbox
* @param[in] dst The destination mailbox
......
......@@ -7,11 +7,14 @@
#include <string>
#include <fstream>
#include <streambuf>
#include <algorithm>
#include <simgrid/msg.h>
#include <rapidjson/document.h>
#include "profiles.hpp"
using namespace std;
using namespace rapidjson;
......@@ -30,6 +33,11 @@ Jobs::~Jobs()
}
}
void Jobs::setProfiles(Profiles *profiles)
{
_profiles = profiles;
}
void Jobs::load_from_json(const std::string &filename)
{
// Let the file content be placed in a string
......@@ -109,7 +117,23 @@ bool Jobs::exists(int job_id) const
return it != _jobs.end();
}
bool Jobs::containsSMPIJob() const
{
for (auto & mit : _jobs)
{
Job * job = mit.second;
if ((*_profiles)[job->profile]->type == ProfileType::SMPI)
return true;
}
return false;
}
const std::map<int, Job* > &Jobs::jobs() const
{
return _jobs;
}
bool job_comparator_subtime(const Job *a, const Job *b)
{
return a->submission_time < b->submission_time;
}
......@@ -7,6 +7,8 @@
#include <map>
#include <vector>
class Profiles;
enum class JobState
{
JOB_STATE_NOT_SUBMITTED //!< The job exists but cannot be scheduled yet.
......@@ -30,20 +32,26 @@ struct Job
JobState state;
};
bool job_comparator_subtime(const Job * a, const Job * b);
class Jobs
{
public:
Jobs();
~Jobs();
void setProfiles(Profiles * profiles);
void load_from_json(const std::string & filename);
Job * operator[](int job_id);
const Job * operator[](int job_id) const;
bool exists(int job_id) const;
bool containsSMPIJob() const;
const std::map<int, Job*> & jobs() const;
private:
std::map<int, Job*> _jobs;
Profiles * _profiles;
};
......@@ -20,12 +20,12 @@ Machines::~Machines()
_machines.clear();
}
void Machines::createMachines(xbt_dynar_t hosts)
void Machines::createMachines(xbt_dynar_t hosts, const string &masterHostName)
{
xbt_assert(_machines.size() == 0, "Bad call to Machines::createMachines(): machines already created");
int nb_machines = xbt_dynar_length(hosts);
_machines.resize(nb_machines);
_machines.reserve(nb_machines);
msg_host_t host;
unsigned int i;
......@@ -39,8 +39,16 @@ void Machines::createMachines(xbt_dynar_t hosts)
machine->jobs_being_computed = {};
machine->state = MachineState::IDLE;
_machines[i] = machine;
if (machine->name != masterHostName)
_machines.push_back(machine);
else
{
xbt_assert(_masterMachine == nullptr);
_masterMachine = machine;
}
}
xbt_assert(_masterMachine != nullptr, "Cannot find the MasterHost '%s' in the platform file", masterHostName.c_str());
}
const Machine * Machines::operator[](int machineID) const
......@@ -58,6 +66,16 @@ bool Machines::exists(int machineID) const
return machineID >= 0 && machineID < (int)_machines.size();
}
const std::vector<Machine *> &Machines::machines() const
{
return _machines;
}
const Machine *Machines::masterMachine() const
{
return _masterMachine;
}
void Machines::updateMachinesOnJobRun(int jobID, const std::vector<int> & usedMachines)
{
for (int machineID : usedMachines)
......@@ -96,7 +114,6 @@ void Machines::updateMachinesOnJobEnd(int jobID, const std::vector<int> & usedMa
{
machine->state = MachineState::IDLE;
_tracer->set_machine_idle(machine->id, MSG_get_clock());
// todo: handle the Pajé trace in this file, not directly in batsim.c
}
else if (*machine->jobs_being_computed.begin() != previous_top_job)
{
......
......@@ -32,7 +32,7 @@ class Machines
public:
Machines();
~Machines();
void createMachines(xbt_dynar_t hosts);
void createMachines(xbt_dynar_t hosts, const std::string & masterHostName);
void updateMachinesOnJobRun(int jobID, const std::vector<int> & usedMachines);
void updateMachinesOnJobEnd(int jobID, const std::vector<int> & usedMachines);
......@@ -43,8 +43,12 @@ public:
bool exists(int machineID) const;
const std::vector<Machine *> & machines() const;
const Machine * masterMachine() const;
private:
std::vector<Machine *> _machines;
Machine * _masterMachine = nullptr;
PajeTracer * _tracer = nullptr;
};
......
......@@ -66,8 +66,10 @@ void UnixDomainSocket::create_socket(const string & filename)
void UnixDomainSocket::accept_pending_connection()
{
XBT_INFO("Waiting for an incoming connection...");
_client_socket = accept(_server_socket, NULL, NULL);
xbt_assert(_client_socket != -1, "Impossible to accept on socket");
XBT_INFO("Connected!");
}
string UnixDomainSocket::receive()
......@@ -75,6 +77,8 @@ string UnixDomainSocket::receive()
string msg;
int32_t message_size;
// TODO: safer socket reads
// Let's read the message size first
int ret = read(_client_socket, &message_size, 4);
......@@ -137,7 +141,7 @@ int request_reply_scheduler_process(int argc, char *argv[])
char sendDateAsString[16];
sprintf(sendDateAsString, "%f", MSG_get_clock());
char *sendBuf = (char*) MSG_process_get_data(MSG_process_self());
char *sendBuf = (char*) args->send_buffer.c_str();
XBT_INFO("Buffer received in REQ-REP: '%s'", sendBuf);
context->socket.send(sendBuf);
......@@ -171,8 +175,9 @@ int request_reply_scheduler_process(int argc, char *argv[])
double previousDate = atof(sendDateAsString);
for (const std::string & event_string : events)
for (unsigned int eventI = 1; eventI < events.size(); ++eventI)
{
const std::string & event_string = events[eventI];
// Let us split the event by ':'.
vector<string> parts2;
boost::split(parts2, event_string, boost::is_any_of(":"), boost::token_compress_on);
......@@ -232,6 +237,7 @@ int request_reply_scheduler_process(int argc, char *argv[])
allocation_string.c_str(), job->id, job->required_nb_res, allocation_machines.size());
alloc.machine_ids.resize(allocation_machines.size());
alloc.hosts.resize(allocation_machines.size());
for (unsigned int i = 0; i < allocation_machines.size(); ++i)
{
int machineID = std::stoi(allocation_machines[i]);
......@@ -299,11 +305,11 @@ int uds_server_process(int argc, char *argv[])
while ((nb_submitters == 0) || (nb_submitters_finished < nb_submitters) ||
(nb_completed_jobs < nb_submitted_jobs) || !sched_ready)
{
// Let's wait a message from a node, a job submitter, the request-reply process...
// Let's wait a message from a node or the request-reply process...
MSG_task_receive(&(task_received), "server");
task_data = (IPMessage *) MSG_task_get_data(task_received);
XBT_INFO("Server receive Task/message type %s:", ipMessageTypeToString(task_data->type).c_str());
XBT_INFO("Server received a message of type %s:", ipMessageTypeToString(task_data->type).c_str());
switch (task_data->type)
{
......@@ -417,8 +423,9 @@ int uds_server_process(int argc, char *argv[])
RequestReplyProcessArguments * req_rep_args = new RequestReplyProcessArguments;
req_rep_args->context = context;
req_rep_args->send_buffer = "0:" + to_string(MSG_get_clock()) + send_buffer;
send_buffer.clear();
MSG_process_create("Scheduler REQ-REP", request_reply_scheduler_process, &req_rep_args, MSG_host_self());
MSG_process_create("Scheduler REQ-REP", request_reply_scheduler_process, (void*)req_rep_args, MSG_host_self());
sched_ready = false;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment