Une MAJ de sécurité est nécessaire sur notre version actuelle. Elle sera effectuée lundi 02/08 entre 12h30 et 13h. L'interruption de service devrait durer quelques minutes (probablement moins de 5 minutes).

Commit b032a70d authored by Millian Poquet's avatar Millian Poquet
Browse files

Separation of the network and the server in different files (which simplified...

Separation of the network and the server in different files (which simplified the use of XBT_LOG). Minor fixes: missing enumerated cases in string conversion, use of a switch to generate a warning on future enumerated value misses. Minor cosmetic modifications.
parent 54153030
......@@ -24,7 +24,7 @@ include_directories(${RAPIDJSON_INCLUDE_DIRS})
# Executables
add_executable(batsim batsim.cpp export.cpp ipp.cpp job_submitter.cpp jobs.cpp jobs_execution.cpp machines.cpp network.cpp profiles.cpp pstate.cpp workload.cpp context.hpp export.hpp ipp.hpp job_submitter.hpp jobs.hpp jobs_execution.hpp machines.hpp network.hpp profiles.hpp pstate.hpp workload.hpp)
add_executable(batsim batsim.cpp export.cpp ipp.cpp job_submitter.cpp jobs.cpp jobs_execution.cpp machines.cpp network.cpp profiles.cpp pstate.cpp server.cpp workload.cpp context.hpp export.hpp ipp.hpp job_submitter.hpp jobs.hpp jobs_execution.hpp machines.hpp network.hpp profiles.hpp pstate.hpp server.hpp workload.hpp)
#add_executable(batexec batexec.c job.c utils.c export.c)
# Libraries to link
......
......@@ -18,6 +18,7 @@
#include "machines.hpp"
#include "network.hpp"
#include "profiles.hpp"
#include "server.hpp"
#include "workload.hpp"
using namespace std;
......@@ -59,7 +60,7 @@ struct MainArguments
* @param[in, out] state The current argp_state
* @return 0
*/
static int parse_opt (int key, char *arg, struct argp_state *state)
int parse_opt (int key, char *arg, struct argp_state *state)
{
MainArguments * mainArgs = (MainArguments *) state->input;
......@@ -170,11 +171,6 @@ int main(int argc, char * argv[])
if (mainArgs.energy_used)
sg_energy_plugin_init();
/*QUIET,
NETWORK_ONLY,
INFORMATION,
DEBUG */
if (mainArgs.verbosity == VerbosityLevel::QUIET || mainArgs.verbosity == VerbosityLevel::NETWORK_ONLY)
{
xbt_log_control_set("workload.thresh:error");
......
#include "ipp.hpp"
#include <simgrid/msg.h>
#include <map>
using namespace std;
......@@ -23,18 +22,52 @@ void send_message(const std::string & destination_mailbox, IPMessageType type, v
/**
 * @brief Converts an inter-process message type to its textual name
 * @param[in] type The message type to convert
 * @return The name of the enumerator, or an empty string if the value matches
 *         none of the cases below
 */
std::string ipMessageTypeToString(IPMessageType type)
{
    string s;

    // There is intentionally no default case: listing every enumerator lets
    // the compiler warn when a value added to IPMessageType is not handled.
    switch(type)
    {
        case IPMessageType::JOB_SUBMITTED:
            s = "JOB_SUBMITTED";
            break;
        case IPMessageType::JOB_COMPLETED:
            s = "JOB_COMPLETED";
            break;
        case IPMessageType::PSTATE_MODIFICATION:
            s = "PSTATE_MODIFICATION";
            break;
        case IPMessageType::SCHED_ALLOCATION:
            s = "SCHED_ALLOCATION";
            break;
        case IPMessageType::SCHED_REJECTION:
            s = "SCHED_REJECTION";
            break;
        case IPMessageType::SCHED_NOP:
            s = "SCHED_NOP";
            break;
        case IPMessageType::SCHED_NOP_ME_LATER:
            s = "SCHED_NOP_ME_LATER";
            break;
        case IPMessageType::SCHED_READY:
            s = "SCHED_READY";
            break;
        case IPMessageType::WAITING_DONE:
            s = "WAITING_DONE";
            break;
        case IPMessageType::SUBMITTER_HELLO:
            s = "SUBMITTER_HELLO";
            break;
        case IPMessageType::SUBMITTER_BYE:
            s = "SUBMITTER_BYE";
            break;
        case IPMessageType::SWITCHED_ON:
            s = "SWITCHED_ON";
            break;
        case IPMessageType::SWITCHED_OFF:
            s = "SWITCHED_OFF";
            break;
    }

    return s;
}
void send_message(const char *destination_mailbox, IPMessageType type, void *data)
......
#pragma once
#include "ipp.hpp"
#include "batsim.h"
#include "context.hpp"
int killer_process(int argc, char *argv[]);
......
#include "machines.hpp"
#include <algorithm>
#include <iterator>
#include <map>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/lexical_cast.hpp>
#include "export.hpp"
#include "context.hpp"
#include "export.hpp"
using namespace std;
......@@ -290,14 +288,28 @@ void Machines::setTracer(PajeTracer *tracer)
/**
 * @brief Converts a machine state to a human-readable string
 * @param[in] state The machine state to convert
 * @return The textual form of the state, or an empty string if the value
 *         matches none of the cases below
 */
string machineStateToString(MachineState state)
{
    string s;

    // There is intentionally no default case: listing every enumerator lets
    // the compiler warn when a value added to MachineState is not handled.
    switch (state)
    {
        case MachineState::SLEEPING:
            s = "sleeping";
            break;
        case MachineState::IDLE:
            s = "idle";
            break;
        case MachineState::COMPUTING:
            s = "computing";
            break;
        case MachineState::TRANSITING_FROM_SLEEPING_TO_COMPUTING:
            s = "sleeping->computing";
            break;
        case MachineState::TRANSITING_FROM_COMPUTING_TO_SLEEPING:
            s = "computing->sleeping";
            break;
    }

    return s;
}
Machine::~Machine()
......
......@@ -14,10 +14,8 @@
#include "context.hpp"
#include "ipp.hpp"
#include "jobs_execution.hpp"
XBT_LOG_NEW_DEFAULT_CATEGORY(network, "network");
XBT_LOG_NEW_CATEGORY(server, "server");
using namespace std;
......@@ -49,7 +47,7 @@ UnixDomainSocket::~UnixDomainSocket()
void UnixDomainSocket::create_socket(const string & filename)
{
XBT_CINFO(network, "Creating UDS socket on '%s'", filename.c_str());
XBT_INFO("Creating UDS socket on '%s'", filename.c_str());
_server_socket = socket(AF_UNIX, SOCK_STREAM, 0);
xbt_assert(_server_socket != -1, "Impossible to create socket");
......@@ -70,10 +68,10 @@ void UnixDomainSocket::create_socket(const string & filename)
/**
 * @brief Blocks until a client connects to the server socket
 * @details The accepted descriptor is stored in _client_socket. The call
 *          asserts that accept() did not return -1.
 */
void UnixDomainSocket::accept_pending_connection()
{
    // Each event is logged once, through the file's default log category.
    XBT_INFO("Waiting for an incoming connection...");

    _client_socket = accept(_server_socket, nullptr, nullptr);
    xbt_assert(_client_socket != -1, "Impossible to accept on socket");

    XBT_INFO("Connected!");
}
string UnixDomainSocket::receive()
......@@ -118,7 +116,7 @@ string UnixDomainSocket::receive()
}
nb_bytes_read += ret;
}
XBT_CINFO(network, "Received '%s'", msg.c_str());
XBT_INFO("Received '%s'", msg.c_str());
return msg;
}
......@@ -126,7 +124,7 @@ string UnixDomainSocket::receive()
void UnixDomainSocket::send(const string & message)
{
int32_t message_size = message.size();
XBT_CINFO(network, "Sending '%s'", message.c_str());
XBT_INFO("Sending '%s'", message.c_str());
write(_client_socket, &message_size, 4);
write(_client_socket, (void*)message.c_str(), message_size);
}
......@@ -143,7 +141,7 @@ int request_reply_scheduler_process(int argc, char *argv[])
sprintf(sendDateAsString, "%f", MSG_get_clock());
char *sendBuf = (char*) args->send_buffer.c_str();
XBT_CDEBUG(server, "Buffer received in REQ-REP: '%s'", sendBuf);
XBT_DEBUG("Buffer received in REQ-REP: '%s'", sendBuf);
context->socket.send(sendBuf);
......@@ -364,291 +362,3 @@ int request_reply_scheduler_process(int argc, char *argv[])
delete args;
return 0;
}
/**
 * @brief Server process: receives inter-process messages on the "server"
 *        mailbox and reacts to each of them
 * @details Events that must be forwarded to the scheduler are appended to a
 *          buffer as "|<clock>:<code>[:<payload>]" entries. Whenever the
 *          scheduler is ready and the buffer is not empty, a
 *          request_reply_scheduler_process is spawned with
 *          "<protocol_version>:<clock><buffer>" and the scheduler is marked
 *          as not ready until a SCHED_READY message is received.
 *          The loop runs while no submitter has said hello yet, while some
 *          submitters have not finished, while some submitted jobs are not
 *          completed (rejected jobs count as completed), or while the
 *          scheduler is not ready.
 * @param[in] argc Unused
 * @param[in] argv Unused
 * @return 0
 */
int uds_server_process(int argc, char *argv[])
{
    (void) argc;
    (void) argv;

    // The arguments are attached to the process as MSG data and freed at the end of this function
    ServerProcessArguments * args = (ServerProcessArguments *) MSG_process_get_data(MSG_process_self());
    BatsimContext * context = args->context;

    int nb_completed_jobs = 0;
    int nb_submitted_jobs = 0;
    int nb_scheduled_jobs = 0;
    int nb_submitters = 0;
    int nb_submitters_finished = 0;
    int nb_running_jobs = 0;
    const int protocol_version = 1;
    bool sched_ready = true;

    // Events waiting to be sent to the scheduler
    string send_buffer;

    // it may avoid the SG deadlock...
    while ((nb_submitters == 0) || (nb_submitters_finished < nb_submitters) ||
           (nb_completed_jobs < nb_submitted_jobs) || !sched_ready)
    {
        // Let's wait a message from a node or the request-reply process...
        msg_task_t task_received = nullptr;
        IPMessage * task_data;
        MSG_task_receive(&(task_received), "server");
        task_data = (IPMessage *) MSG_task_get_data(task_received);

        XBT_CINFO(server, "Server received a message of type %s:", ipMessageTypeToString(task_data->type).c_str());

        switch (task_data->type)
        {
        case IPMessageType::SUBMITTER_HELLO:
        {
            nb_submitters++;
            XBT_CINFO(server, "New submitter said hello. Number of polite submitters: %d", nb_submitters);
        } break; // end of case SUBMITTER_HELLO

        case IPMessageType::SUBMITTER_BYE:
        {
            nb_submitters_finished++;
            XBT_CINFO(server, "A submitter said goodbye. Number of finished submitters: %d", nb_submitters_finished);
        } break; // end of case SUBMITTER_BYE

        case IPMessageType::JOB_COMPLETED:
        {
            xbt_assert(task_data->data != nullptr);
            JobCompletedMessage * message = (JobCompletedMessage *) task_data->data;

            nb_running_jobs--;
            xbt_assert(nb_running_jobs >= 0);
            nb_completed_jobs++;
            Job * job = context->jobs[message->job_id];

            XBT_CINFO(server, "Job %d COMPLETED. %d jobs completed so far", job->id, nb_completed_jobs);

            send_buffer += "|" + std::to_string(MSG_get_clock()) + ":C:" + std::to_string(job->id);
            XBT_CDEBUG(server, "Message to send to scheduler: %s", send_buffer.c_str());
        } break; // end of case JOB_COMPLETED

        case IPMessageType::JOB_SUBMITTED:
        {
            xbt_assert(task_data->data != nullptr);
            JobSubmittedMessage * message = (JobSubmittedMessage *) task_data->data;

            nb_submitted_jobs++;
            Job * job = context->jobs[message->job_id];
            job->state = JobState::JOB_STATE_SUBMITTED;

            XBT_CINFO(server, "Job %d SUBMITTED. %d jobs submitted so far", job->id, nb_submitted_jobs);
            send_buffer += "|" + std::to_string(MSG_get_clock()) + ":S:" + std::to_string(job->id);
            XBT_CDEBUG(server, "Message to send to scheduler: '%s'", send_buffer.c_str());
        } break; // end of case JOB_SUBMITTED

        case IPMessageType::SCHED_REJECTION:
        {
            xbt_assert(task_data->data != nullptr);
            JobRejectedMessage * message = (JobRejectedMessage *) task_data->data;

            Job * job = context->jobs[message->job_id];
            job->state = JobState::JOB_STATE_REJECTED;
            // A rejected job will never run: counting it as completed lets the main loop terminate
            nb_completed_jobs++;

            XBT_CINFO(server, "Job %d has been rejected", job->id);
        } break; // end of case SCHED_REJECTION

        case IPMessageType::SCHED_NOP_ME_LATER:
        {
            xbt_assert(task_data->data != nullptr);
            NOPMeLaterMessage * message = (NOPMeLaterMessage *) task_data->data;

            WaiterProcessArguments * waiter_args = new WaiterProcessArguments;
            waiter_args->target_time = message->target_time;

            string pname = "waiter " + to_string(message->target_time);
            MSG_process_create(pname.c_str(), waiter_process, (void*) waiter_args, context->machines.masterMachine()->host);
        } break; // end of case SCHED_NOP_ME_LATER

        case IPMessageType::PSTATE_MODIFICATION:
        {
            xbt_assert(task_data->data != nullptr);
            PStateModificationMessage * message = (PStateModificationMessage *) task_data->data;

            Machine * machine = context->machines[message->machine];
            int curr_pstate = MSG_host_get_pstate(machine->host);

            if (machine->pstates[curr_pstate] == PStateType::COMPUTATION_PSTATE)
            {
                if (machine->pstates[message->new_pstate] == PStateType::COMPUTATION_PSTATE)
                {
                    // Computation -> computation: the change is applied immediately and acknowledged to the scheduler
                    MSG_host_set_pstate(machine->host, message->new_pstate);
                    xbt_assert(MSG_host_get_pstate(machine->host) == message->new_pstate);

                    send_buffer += "|" + std::to_string(MSG_get_clock()) + ":p:" +
                                   std::to_string(machine->id) + "=" + std::to_string(message->new_pstate);
                    XBT_CDEBUG(server, "Message to send to scheduler : '%s'", send_buffer.c_str());
                }
                else if (machine->pstates[message->new_pstate] == PStateType::SLEEP_PSTATE)
                {
                    // Computation -> sleep: the switch is delegated to a dedicated process
                    machine->state = MachineState::TRANSITING_FROM_COMPUTING_TO_SLEEPING;
                    SwitchPStateProcessArguments * switch_args = new SwitchPStateProcessArguments;
                    switch_args->context = context;
                    switch_args->message = new PStateModificationMessage;
                    switch_args->message->machine = message->machine;
                    switch_args->message->new_pstate = message->new_pstate;

                    string pname = "switch OFF " + to_string(message->machine);
                    MSG_process_create(pname.c_str(), switch_off_machine_process, (void*)switch_args, machine->host);
                }
                else
                    XBT_CERROR(server, "Switching from a computation pstate to an invalid pstate on machine %d ('%s') : %d -> %d",
                               machine->id, machine->name.c_str(), curr_pstate, message->new_pstate);
            }
            else if (machine->pstates[curr_pstate] == PStateType::SLEEP_PSTATE)
            {
                xbt_assert(machine->pstates[message->new_pstate] == PStateType::COMPUTATION_PSTATE,
                           "Switching from a sleep pstate to a non-computation pstate on machine %d ('%s') : %d -> %d, which is forbidden",
                           machine->id, machine->name.c_str(), curr_pstate, message->new_pstate);

                // Sleep -> computation: the switch is delegated to a dedicated process
                machine->state = MachineState::TRANSITING_FROM_SLEEPING_TO_COMPUTING;
                SwitchPStateProcessArguments * switch_args = new SwitchPStateProcessArguments;
                switch_args->context = context;
                switch_args->message = new PStateModificationMessage;
                switch_args->message->machine = message->machine;
                switch_args->message->new_pstate = message->new_pstate;

                string pname = "switch ON " + to_string(message->machine);
                MSG_process_create(pname.c_str(), switch_on_machine_process, (void*)switch_args, machine->host);
            }
            else
                XBT_CERROR(server, "Machine %d ('%s') has an invalid pstate : %d", machine->id, machine->name.c_str(), curr_pstate);
        } break; // end of case PSTATE_MODIFICATION

        case IPMessageType::SCHED_NOP:
        {
            XBT_CINFO(server, "Nothing to do received.");
            if (nb_running_jobs == 0 && nb_scheduled_jobs < nb_submitted_jobs)
            {
                XBT_CINFO(server, "Nothing to do whereas no job is running and that they are jobs waiting to be scheduled... This might cause a deadlock!");

                // Let us display the available jobs (to help the scheduler debugging)
                const std::map<int, Job *> & jobs = context->jobs.jobs();
                vector<string> submittedJobs;

                for (auto & mit : jobs)
                {
                    if (mit.second->state == JobState::JOB_STATE_SUBMITTED)
                        submittedJobs.push_back(std::to_string(mit.second->id));
                }

                string submittedJobsString = boost::algorithm::join(submittedJobs, ", ");
                XBT_CINFO(server, "The available jobs are [%s]", submittedJobsString.c_str());
            }
        } break; // end of case SCHED_NOP

        case IPMessageType::SCHED_ALLOCATION:
        {
            xbt_assert(task_data->data != nullptr);
            SchedulingAllocationMessage * message = (SchedulingAllocationMessage *) task_data->data;

            for (const auto & allocation : message->allocations)
            {
                Job * job = context->jobs[allocation.job_id];
                job->state = JobState::JOB_STATE_RUNNING;

                nb_running_jobs++;
                xbt_assert(nb_running_jobs <= nb_submitted_jobs);
                nb_scheduled_jobs++;
                xbt_assert(nb_scheduled_jobs <= nb_submitted_jobs);

                if (context->energy_used)
                {
                    // Check that every machine is in a computation pstate
                    for (const int & machineID : allocation.machine_ids)
                    {
                        Machine * machine = context->machines[machineID];
                        int ps = MSG_host_get_pstate(machine->host);
                        xbt_assert(machine->has_pstate(ps));
                        xbt_assert(machine->pstates[ps] == PStateType::COMPUTATION_PSTATE,
                                   "Invalid job allocation: machine %d ('%s') is not in a computation pstate (ps=%d)",
                                   machine->id, machine->name.c_str(), ps);
                        xbt_assert(machine->state == MachineState::COMPUTING || machine->state == MachineState::IDLE,
                                   "Invalid job allocation: machine %d ('%s') cannot compute jobs now (the machine is"
                                   " neither computing nor being idle)", machine->id, machine->name.c_str());
                    }
                }

                // The job is executed by a dedicated process, created on the first allocated machine
                ExecuteJobProcessArguments * exec_args = new ExecuteJobProcessArguments;
                exec_args->context = context;
                exec_args->allocation = allocation;
                string pname = "job" + to_string(job->id);
                MSG_process_create(pname.c_str(), execute_job_process, (void*)exec_args, context->machines[allocation.machine_ids[0]]->host);
            }
        } break; // end of case SCHED_ALLOCATION

        case IPMessageType::WAITING_DONE:
        {
            send_buffer += "|" + std::to_string(MSG_get_clock()) + ":N";
            XBT_CDEBUG(server, "Message to send to scheduler: '%s'", send_buffer.c_str());
        } break; // end of case WAITING_DONE

        case IPMessageType::SCHED_READY:
        {
            sched_ready = true;
        } break; // end of case SCHED_READY

        // Both notifications are handled the same way: the new pstate is checked then reported to the scheduler
        case IPMessageType::SWITCHED_ON:
        case IPMessageType::SWITCHED_OFF:
        {
            xbt_assert(task_data->data != nullptr);
            PStateModificationMessage * message = (PStateModificationMessage *) task_data->data;

            xbt_assert(context->machines.exists(message->machine));
            Machine * machine = context->machines[message->machine];
            xbt_assert(MSG_host_get_pstate(machine->host) == message->new_pstate);

            send_buffer += "|" + std::to_string(MSG_get_clock()) + ":p:" +
                           std::to_string(machine->id) + "=" + std::to_string(message->new_pstate);
            XBT_CDEBUG(server, "Message to send to scheduler : '%s'", send_buffer.c_str());
        } break; // end of cases SWITCHED_ON and SWITCHED_OFF
        } // end of switch

        delete task_data;
        MSG_task_destroy(task_received);

        // Pending events are flushed to the scheduler as soon as it is ready
        if (sched_ready && !send_buffer.empty())
        {
            RequestReplyProcessArguments * req_rep_args = new RequestReplyProcessArguments;
            req_rep_args->context = context;
            req_rep_args->send_buffer = to_string(protocol_version) + ":" + to_string(MSG_get_clock()) + send_buffer;
            send_buffer.clear();

            MSG_process_create("Scheduler REQ-REP", request_reply_scheduler_process, (void*)req_rep_args, MSG_host_self());
            sched_ready = false;
        }

    } // end of while

    XBT_CINFO(server, "All jobs completed!");

    delete args;
    return 0;
}
......@@ -35,5 +35,4 @@ private:
int _client_socket;
};
int uds_server_process(int argc, char *argv[]);
int request_reply_scheduler_process(int argc, char *argv[]);
#include "server.hpp"
#include <string>
#include <boost/algorithm/string.hpp>
#include <simgrid/msg.h>
#include "context.hpp"
#include "ipp.hpp"
#include "network.hpp"
#include "jobs_execution.hpp"
XBT_LOG_NEW_DEFAULT_CATEGORY(server, "server");
using namespace std;
int uds_server_process(int argc, char *argv[])
{
(void) argc;
(void) argv;
ServerProcessArguments * args = (ServerProcessArguments *) MSG_process_get_data(MSG_process_self());
BatsimContext * context = args->context;
int nb_completed_jobs = 0;
int nb_submitted_jobs = 0;
int nb_scheduled_jobs = 0;
int nb_submitters = 0;
int nb_submitters_finished = 0;
int nb_running_jobs = 0;
int nb_switching_machines = 0;
const int protocol_version = 1;
bool sched_ready = true;
string send_buffer;
// it may avoid the SG deadlock...
while ((nb_submitters == 0) || (nb_submitters_finished < nb_submitters) ||
(nb_completed_jobs < nb_submitted_jobs) || (!sched_ready) ||
(nb_switching_machines > 0))
{
// Let's wait a message from a node or the request-reply process...
msg_task_t task_received = NULL;
IPMessage * task_data;
MSG_task_receive(&(task_received), "server");
task_data = (IPMessage *) MSG_task_get_data(task_received);
XBT_INFO( "Server received a message of type %s:", ipMessageTypeToString(task_data->type).c_str());
switch (task_data->type)
{
case IPMessageType::SUBMITTER_HELLO:
{
nb_submitters++;
XBT_INFO( "New submitter said hello. Number of polite submitters: %d", nb_submitters);
} break; // end of case SUBMITTER_HELLO
case IPMessageType::SUBMITTER_BYE:
{
nb_submitters_finished++;
XBT_INFO( "A submitted said goodbye. Number of finished submitters: %d", nb_submitters_finished);
} break; // end of case SUBMITTER_BYE
case IPMessageType::JOB_COMPLETED:
{
xbt_assert(task_data->data != nullptr);
JobCompletedMessage * message = (JobCompletedMessage *) task_data->data;
nb_running_jobs--;
xbt_assert(nb_running_jobs >= 0);
nb_completed_jobs++;
Job * job = context->jobs[message->job_id];
XBT_INFO( "Job %d COMPLETED. %d jobs completed so far", job->id, nb_completed_jobs);
send_buffer += '|' + std::to_string(MSG_get_clock()) + ":C:" + std::to_string(job->id);
XBT_DEBUG( "Message to send to scheduler: %s", send_buffer.c_str());
} break; // end of case JOB_COMPLETED
case IPMessageType::JOB_SUBMITTED:
{
xbt_assert(task_data->data != nullptr);
JobSubmittedMessage * message = (JobSubmittedMessage *) task_data->data;
nb_submitted_jobs++;
Job * job = context->jobs[message->job_id];
job->state = JobState::JOB_STATE_SUBMITTED;
XBT_INFO( "Job %d SUBMITTED. %d jobs submitted so far", job->id, nb_submitted_jobs);
send_buffer += "|" + std::to_string(MSG_get_clock()) + ":S:" + std::to_string(job->id);
XBT_DEBUG( "Message to send to scheduler: '%s'", send_buffer.c_str());