Commit b9b5e854 authored by Millian Poquet's avatar Millian Poquet

New modelisation of the inter-process communication and of the process...

New modelisation of the inter-process communication and of the process creation (and parameters). A context has been created to avoid any global variable. Network is now implemented in C++. Network parsing is done in C++ too. The server socker is now BatSim's. To be continued.
parent 35611845
*.o
\ No newline at end of file
/**
* @file context.hpp The Batsim context
*/
#include "network.hpp"
#include "machines.hpp"
#include "jobs.hpp"
#include "profiles.hpp"
#include "export.hpp"
struct BatsimContext
{
UnixDomainSocket socket;
Machines machines;
Jobs jobs;
Profiles profiles;
long long microseconds_used_by_scheduler;
};
#include "ipp.hpp"
#include <simgrid/msg.h>
#include <map>
using namespace std;
XBT_LOG_NEW_DEFAULT_CATEGORY(ipp, "ipp");
void send_message(const std::string & destination_mailbox, IPMessageType type, void * data)
{
IPMessage * message = new IPMessage;
message->type = type;
message->data = data;
msg_task_t task_to_send = MSG_task_create(NULL, 0, 1e-6, message);
XBT_INFO("message from '%s' to '%s' of type '%s' with data %p",
MSG_process_get_name(MSG_process_self()), destination_mailbox.c_str(), ipMessageTypeToString(type).c_str(), data);
MSG_task_send(task_to_send, destination_mailbox.c_str());
}
std::string ipMessageTypeToString(IPMessageType type)
{
static map<IPMessageType, string> type_to_string =
{
{IPMessageType::JOB_SUBMITTED, "JOB_SUBMITTED"},
{IPMessageType::JOB_COMPLETED, "JOB_COMPLETED"},
{IPMessageType::SCHED_ALLOCATION, "SCHED_ALLOCATION"},
{IPMessageType::SCHED_NOP, "SCHED_NOP"},
{IPMessageType::SCHED_READY, "SCHED_READY"},
{IPMessageType::SUBMITTER_HELLO, "SUBMITTER_HELLO"},
{IPMessageType::SUBMITTER_BYE, "SUBMITTER_BYE"},
};
return type_to_string[type];
}
#pragma once
/**
* @file ipp.hpp Inter-Process Protocol
*/
#include <vector>
#include <string>
struct BatsimContext;
enum class IPMessageType
{
JOB_SUBMITTED //!< Submitter -> Server. The submitter tells the server a new job has been submitted.
,JOB_COMPLETED //!< Launcher/killer -> Server. The launcher tells the server a job has been completed.
,SCHED_ALLOCATION //!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occured (a job allocation).
,SCHED_NOP //!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occured (a NOP message).
,SCHED_READY //!< SchedulerHandler -> Server. The scheduler handler tells the server that the scheduler is ready (messages can be sent to it).
,SUBMITTER_HELLO //!< Submitter -> Server. The submitter tells it starts submitting to the server.
,SUBMITTER_BYE //!< Submitter -> Server. The submitter tells it stops submitting to the server.
};
struct JobSubmittedMessage
{
int job_id; //! The job ID
};
struct JobCompletedMessage
{
int job_id; //! The job ID
};
struct SchedulingAllocation
{
int job_id;
std::vector<int> machine_ids; //! The IDs of the machines on which the job should be allocated
};
struct SchedulingAllocationMessage
{
std::vector<SchedulingAllocation> allocations; //! Possibly several allocations
};
struct IPMessage
{
IPMessageType type; //! The message type
void * data; //! The message data (can be NULL if type is in [SCHED_NOP, SUBMITTER_HELLO, SUBMITTER_BYE, SUBMITTER_READY]). Otherwise, it is either a JobSubmittedMessage*, a JobCompletedMessage* or a SchedulingAllocationMessage* according to type.
};
struct RequestReplyProcessArguments
{
BatsimContext * context;
std::string send_buffer;
};
struct ServerProcessArguments
{
BatsimContext * context;
};
/**
* @brief Sends a message from the given process to the given mailbox
* @param[in] dst The destination mailbox
* @param[in] type The type of message to send
* @param[in] job_id The job the message is about
* @param[in] data The data associated to the message
*/
void send_message(const std::string & destination_mailbox, IPMessageType type, void * data = nullptr);
std::string ipMessageTypeToString(IPMessageType type);
/**
* @file jobs.cpp
*/
/**
* @file jobs.hpp
*/
#pragma once
#include <map>
enum class JobState
{
JOB_STATE_NOT_SUBMITTED //!< The job exists but cannot be scheduled yet.
,JOB_STATE_SUBMITTED //!< The job has been submitted, it can now be scheduled.
,JOB_STATE_RUNNING //!< The job has been scheduled and is currently being processed.
,JOB_STATE_COMPLETED_SUCCESSFULLY //!< The job execution finished before its walltime.
,JOB_STATE_COMPLETED_KILLED //!< The job execution time was longer than its walltime so the job had been killed.
};
struct Job
{
int id;
std::string profile;
double submission_time;
double walltime;
int required_nb_res;
double starting_time;
double runtime;
std::vector<int> allocation;
JobState state;
};
class Jobs
{
public:
Jobs();
~Jobs();
void load_from_json(const std::string & filename);
Job * operator[](int job_id);
const Job * operator[](int job_id) const;
bool exists(int job_id) const;
const std::map<int, Job> & jobs() const;
private:
std::map<int, Job> _jobs;
};
......@@ -2,6 +2,7 @@
#include <algorithm>
#include <iterator>
#include <map>
using namespace std;
......@@ -32,7 +33,7 @@ void Machines::createMachines(xbt_dynar_t hosts)
machine.name = MSG_host_get_name(host);
machine.host = host;
machine.jobs_being_computed = {};
machine.state = Machine::IDLE;
machine.state = MachineState::IDLE;
}
}
......@@ -51,7 +52,7 @@ void Machines::updateMachinesOnJobRun(int jobID, const std::vector<int> & usedMa
for (int machineID : usedMachines)
{
Machine & machine = _machines[machineID];
machine.state = Machine::COMPUTING;
machine.state = MachineState::COMPUTING;
// cout << machine;
machine.jobs_being_computed.insert(jobID);
......@@ -72,7 +73,7 @@ void Machines::updateMachinesOnJobEnd(int jobID, const std::vector<int> & usedMa
if (machine.jobs_being_computed.empty())
{
machine.state = Machine::IDLE;
machine.state = MachineState::IDLE;
// todo: handle the Pajé trace in this file, not directly in batsim.c
}
......@@ -87,7 +88,7 @@ ostream & operator<<(ostream & out, const Machine & machine)
"COMPUTING"};
out << "Machine " << machine.id << ", ";
out << "state = " << machineStateToStr[machine.state] << ", ";
out << "state = " << machineStateToString(machine.state) << ", ";
out << "jobs = [";
std::copy(machine.jobs_being_computed.begin(), machine.jobs_being_computed.end(),
std::ostream_iterator<char>(out, " "));
......@@ -95,3 +96,15 @@ ostream & operator<<(ostream & out, const Machine & machine)
return out;
}
string machineStateToString(MachineState state)
{
static const std::map<MachineState,std::string> conv =
{
{MachineState::SLEEPING, "sleeping"},
{MachineState::IDLE, "idle"},
{MachineState::COMPUTING, "computing"}
};
return conv.at(state);
}
......@@ -7,19 +7,19 @@
#include <simgrid/msg.h>
enum class MachineState
{
SLEEPING,
IDLE,
COMPUTING,
};
struct Machine
{
enum State
{
SLEEPING,
IDLE,
COMPUTING,
};
int id;
std::string name;
int id;
std::string name;
msg_host_t host;
State state;
MachineState state;
std::set<int> jobs_being_computed;
};
......@@ -28,19 +28,23 @@ std::ostream & operator<<(std::ostream & out, const Machine & machine);
class Machines
{
public:
Machines();
~Machines();
void createMachines(xbt_dynar_t hosts);
void updateMachinesOnJobRun(int jobID, const std::vector<int> & usedMachines);
void updateMachinesOnJobEnd(int jobID, const std::vector<int> & usedMachines);
Machines();
~Machines();
void createMachines(xbt_dynar_t hosts);
void updateMachinesOnJobRun(int jobID, const std::vector<int> & usedMachines);
void updateMachinesOnJobEnd(int jobID, const std::vector<int> & usedMachines);
const Machine & operator[](int machineID) const;
Machine & operator[](int machineID);
const Machine & operator[](int machineID) const;
Machine & operator[](int machineID);
bool exists(int machineID) const;
private:
std::vector<Machine> _machines;
std::vector<Machine> _machines;
};
std::string machineStateToString(MachineState state);
// The array of machines
extern int nb_machines;
extern Machine* machines;
......
#include "network.hpp"
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <chrono>
#include <stdexcept>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/join.hpp>
#include <simgrid/msg.h>
#include "context.hpp"
#include "ipp.hpp"
XBT_LOG_NEW_DEFAULT_CATEGORY(network, "network");
using namespace std;
UnixDomainSocket::UnixDomainSocket()
{
_server_socket = -1;
_client_socket = -1;
}
UnixDomainSocket::UnixDomainSocket(const string & filename) : UnixDomainSocket()
{
create_socket(filename);
}
UnixDomainSocket::~UnixDomainSocket()
{
if (_client_socket != -1)
{
::close(_client_socket);
_client_socket = -1;
}
if (_server_socket != -1)
{
::close(_server_socket);
_server_socket = -1;
}
}
void UnixDomainSocket::create_socket(const string & filename)
{
_server_socket = socket(AF_UNIX, SOCK_STREAM, 0);
xbt_assert(_server_socket != -1, "Impossible to create socket");
sockaddr_un addr;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, filename.c_str(), sizeof(addr.sun_path)-1);
unlink(filename.c_str());
int ret = bind(_server_socket, (struct sockaddr*)&addr, sizeof(addr));
xbt_assert(ret != -1, "Impossible to bind socket");
ret = listen(_server_socket, 1);
xbt_assert(ret != -1, "Impossible to listen on socket");
}
void UnixDomainSocket::accept_pending_connection()
{
_client_socket = accept(_server_socket, NULL, NULL);
xbt_assert(_client_socket != -1, "Impossible to accept on socket");
}
string UnixDomainSocket::receive()
{
string msg;
int32_t message_size;
// Let's read the message size first
int ret = read(_client_socket, &message_size, 4);
if (ret != 4)
{
::close(_client_socket);
_client_socket = -1;
throw std::runtime_error("Cannot read on socket. Closed by remote?");
}
printf("ret = %d\n", ret);
xbt_assert(message_size > 0, "Invalid message received (size=%d)", message_size);
msg.resize(message_size);
// Then the message content
ret = read(_client_socket, (void*)msg.data(), message_size);
printf("ret = %d\n", ret);
XBT_CINFO(network, "Received '%s'", msg.c_str());
return msg;
}
void UnixDomainSocket::send(const string & message)
{
int32_t message_size = message.size();
XBT_CINFO(network, "Sending '%s'", message.c_str());
write(_client_socket, &message_size, 4);
write(_client_socket, (void*)message.c_str(), message_size);
}
int main()
{
UnixDomainSocket socket("/tmp/bouh");
socket.accept_pending_connection();
bool ok = true;
string s;
for (int i = 0; ok; ++i)
{
socket.send("Hello " + to_string(i));
s = socket.receive();
printf("Received : '%s'\n", s.c_str());
if (s.empty())
ok = false;
}
return 0;
}
int request_reply_scheduler_process(int argc, char *argv[])
{
(void) argc;
(void) argv;
RequestReplyProcessArguments * args = (RequestReplyProcessArguments *) MSG_process_get_data(MSG_process_self());
BatsimContext * context = args->context;
char sendDateAsString[16];
sprintf(sendDateAsString, "%f", MSG_get_clock());
char *sendBuf = (char*) MSG_process_get_data(MSG_process_self());
XBT_INFO("Buffer received in REQ-REP: '%s'", sendBuf);
context->socket.send(sendBuf);
free(sendBuf);
auto start = chrono::steady_clock::now();
string message_received = context->socket.receive();
auto end = chrono::steady_clock::now();
long long elapsed_microseconds = chrono::duration <double, micro> (end - start).count();
context->microseconds_used_by_scheduler += elapsed_microseconds;
/*
The kind of message that is expected to be received is the following:
0:25.836709|15.000015:J:1=1,2,0,3;2=3|20.836694:N|25.836709:J:2=0,2,1,3
The top-level separator is '|'.
The first part is always 0:MESSAGE_SENDING_DATE
There are then a variable number of parts (> 1). Those parts are called events.
They are composed of the form EVENT_DATE:STAMP[:STAMP_RELATED_CONTENT]. Stamps are 1 char long.
Reading the events from left to right, their event dates must be non-decreasing.
The following stamp exist at the moment :
N : do nothing. Content : none.
J : allocation of a static job (described in the JSON file). Content : jobID1=mID1_1,mID1_2,...,mID1_n[;jobID2=MID2_1,...MID2_n[;...]]
*/
// Let us split the message by '|'.
vector<string> events;
boost::split(events, message_received, boost::is_any_of("|"), boost::token_compress_on);
xbt_assert(events.size() >= 2, "Invalid message received ('%s'): it should be composed of at least 2 parts separated by a '|'", message_received.c_str());
double previousDate = atof(sendDateAsString);
for (const std::string & event_string : events)
{
// Let us split the event by ':'.
vector<string> parts2;
boost::split(parts2, event_string, boost::is_any_of(":"), boost::token_compress_on);
xbt_assert(parts2.size() >= 2, "Invalid event received ('%s'): it should be composed of at least 2 parts separated by a ':'", event_string.c_str());
xbt_assert(parts2[1].size() == 1, "Invalid event received ('%s'): network stamp ('%s') should be of length 1", event_string.c_str(), parts2[1].c_str());
double date = std::stof(parts2[0]);
NetworkStamp received_stamp = (NetworkStamp) parts2[1][0];
xbt_assert(date >= previousDate, "Invalid event received ('%s'): its date (%lf) cannot be before the previous event date (%lf)",
event_string.c_str(), date, previousDate);
// Let us wait until the event occurrence
MSG_process_sleep(std::max(0.0, date - previousDate));
previousDate = date;
switch(received_stamp)
{
case NOP:
{
send_message("server", IPMessageType::SCHED_NOP);
} break; // End of case received_stamp == NOP
case STATIC_JOB_ALLOCATION:
{
xbt_assert(parts2.size() == 3, "Invalid event received ('%s'): static job allocations must be composed of 3 parts separated by ':'",
event_string.c_str());
// Let us create the message which will be sent to the server.
SchedulingAllocationMessage * message = new SchedulingAllocationMessage;
// Since allocations can be done on several jobs at once, let us split the content again, by ';' this time.
vector<string> allocations;
boost::split(allocations, parts2[2], boost::is_any_of(";"), boost::token_compress_on);
for (const std::string & allocation_string : allocations)
{
// Each allocation is written in the form of jobID=machineID1,machineID2,...,machineIDn
vector<string> allocation_parts;
boost::split(allocation_parts, allocation_string, boost::is_any_of("="), boost::token_compress_on);
xbt_assert(allocation_parts.size() == 2, "Invalid static job allocation received ('%s'): it must be composed of two parts separated by a '='",
allocation_string.c_str());
SchedulingAllocation alloc;
alloc.job_id = std::stoi(allocation_parts[0]);
xbt_assert(context->jobs.exists(alloc.job_id), "Invalid static job allocation received ('%s'): the job %d does not exist",
allocation_string.c_str(), alloc.job_id);
Job * job = context->jobs[alloc.job_id];
xbt_assert(job->state == JobState::JOB_STATE_SUBMITTED,
"Invalid static job allocation received ('%s') : the job %d state indicates it cannot be executed now",
allocation_string.c_str(), job->id);
// In order to get the machines, let us do a final split by ','!
vector<string> allocation_machines;
boost::split(allocation_machines, allocation_parts[1], boost::is_any_of(","), boost::token_compress_on);
xbt_assert((int)allocation_machines.size() == job->required_nb_res,
"Invalid static job allocation received ('%s'): the job %d size is %d but %lu machines were allocated",
allocation_string.c_str(), job->id, job->required_nb_res, allocation_machines.size());
alloc.machine_ids.resize(allocation_machines.size());
for (unsigned int i = 0; i < allocation_machines.size(); ++i)
{
int machineID = std::stoi(allocation_machines[i]);
xbt_assert(context->machines.exists(machineID), "Invalid static job allocation received ('%s'): the machine %d does not exist",
allocation_string.c_str(), machineID);
alloc.machine_ids[i] = machineID;
}
// Let us sort the allocation, to detect easily whether all machines are different or not
std::sort(alloc.machine_ids.begin(), alloc.machine_ids.end());
bool all_different = true;
for (unsigned int i = 1; i < alloc.machine_ids.size(); ++i)
{
if (alloc.machine_ids[i-1] == alloc.machine_ids[i])
{
all_different = false;
break;
}
}
xbt_assert(all_different, "Invalid static job allocation received ('%s') : all machines are not different", allocation_string.c_str());
message->allocations.push_back(alloc);
}
send_message("server", IPMessageType::SCHED_ALLOCATION, (void*) message);
} break; // End of case received_stamp == STATIC_JOB_ALLOCATION
default:
{
xbt_die("Invalid event received ('%s') : unhandled network stamp received ('%c')", event_string.c_str(), received_stamp);
}
} // end of job type switch
} // end of events traversal
send_message("server", IPMessageType::SCHED_READY);
delete args;
return 0;
}
int uds_server_process(int argc, char *argv[])
{
(void) argc;
(void) argv;
ServerProcessArguments * args = (ServerProcessArguments *) MSG_process_get_data(MSG_process_self());
BatsimContext * context = args->context;
msg_task_t task_received = NULL;
IPMessage * task_data;
int nb_completed_jobs = 0;
int nb_submitted_jobs = 0;
int nb_scheduled_jobs = 0;
int nb_submitters = 0;
int nb_submitters_finished = 0;
int nb_running_jobs = 0;
bool sched_ready = true;
string send_buffer;
// it may avoid the SG deadlock...
while ((nb_submitters == 0) || (nb_submitters_finished < nb_submitters) ||
(nb_completed_jobs < nb_submitted_jobs) || !sched_ready)
{
// Let's wait a message from a node, a job submitter, the request-reply process...
MSG_task_receive(&(task_received), "server");
task_data = (IPMessage *) MSG_task_get_data(task_received);
XBT_INFO("Server receive Task/message type %s:", ipMessageTypeToString(task_data->type).c_str());
switch (task_data->type)
{
case IPMessageType::SUBMITTER_HELLO:
{
nb_submitters++;
XBT_INFO("New submitter said hello. Number of polite submitters: %d", nb_submitters);
break;
} // end of case SUBMITTER_HELLO
case IPMessageType::SUBMITTER_BYE:
{