Commit 58cb51e7 authored by Millian Poquet's avatar Millian Poquet

[code] s4u: remove MSG_process_create + cleanup

parent 5ab0e2f1
......@@ -30,6 +30,7 @@
#include <string>
#include <fstream>
#include <functional>
#include <simgrid/s4u.hpp>
#include <simgrid/msg.h>
......@@ -659,22 +660,16 @@ void start_initial_simulation_processes(const MainArguments & main_args,
string submitter_instance_name = "workload_submitter_" + desc.name;
XBT_DEBUG("Creating a workload_submitter process...");
if (!is_batexec)
auto actor_function = static_job_submitter_process;
if (is_batexec)
{
JobSubmitterProcessArguments * submitter_args = new JobSubmitterProcessArguments;
submitter_args->context = context;
submitter_args->workload_name = desc.name;
MSG_process_create(submitter_instance_name.c_str(), static_job_submitter_process,
(void*) submitter_args, master_machine->host);
}
else
{
simgrid::s4u::Actor::create(submitter_instance_name.c_str(),
master_machine->host,
batexec_job_launcher_process,
context, desc.name);
actor_function = batexec_job_launcher_process;
}
simgrid::s4u::Actor::create(submitter_instance_name.c_str(),
master_machine->host,
actor_function,
context, desc.name);
XBT_INFO("The process '%s' has been created.", submitter_instance_name.c_str());
}
......@@ -682,22 +677,19 @@ void start_initial_simulation_processes(const MainArguments & main_args,
for (const MainArguments::WorkflowDescription & desc : main_args.workflow_descriptions)
{
XBT_DEBUG("Creating a workflow_submitter process...");
WorkflowSubmitterProcessArguments * submitter_args = new WorkflowSubmitterProcessArguments;
submitter_args->context = context;
submitter_args->workflow_name = desc.name;
string submitter_instance_name = "workflow_submitter_" + desc.name;
MSG_process_create(submitter_instance_name.c_str(), workflow_submitter_process, (void*)submitter_args, master_machine->host);
simgrid::s4u::Actor::create(submitter_instance_name.c_str(),
master_machine->host,
workflow_submitter_process,
context, desc.name);
XBT_INFO("The process '%s' has been created.", submitter_instance_name.c_str());
}
if (!is_batexec)
{
XBT_DEBUG("Creating the 'server' process...");
ServerProcessArguments * server_args = new ServerProcessArguments;
server_args->context = context;
MSG_process_create("server", server_process, (void*)server_args, master_machine->host);
simgrid::s4u::Actor::create("server", master_machine->host,
server_process, context);
XBT_INFO("The process 'server' has been created.");
}
}
......
......@@ -252,81 +252,6 @@ struct IPMessage
void * data; //!< The message data (can be NULL if type is in [SCHED_NOP, SUBMITTER_HELLO, SUBMITTER_BYE, SUBMITTER_READY]). Otherwise, it is either a JobSubmittedMessage*, a JobCompletedMessage* or a SchedulingAllocationMessage* according to type.
};
/**
* @brief The arguments of the request_reply_scheduler_process process
*/
struct RequestReplyProcessArguments
{
BatsimContext * context; //!< The BatsimContext
std::string send_buffer; //!< The message to send to the Decision real process
};
/**
* @brief The arguments of the server_process process
*/
struct ServerProcessArguments
{
BatsimContext * context; //!< The BatsimContext
};
/**
* @brief The arguments of the execute_job_process process
*/
struct ExecuteJobProcessArguments
{
BatsimContext * context; //!< The BatsimContext
SchedulingAllocation * allocation; //!< The SchedulingAllocation
bool notify_server_at_end; //!< Whether a message to the server must be sent after job completion
Profile * io_profile; //!< The optional IO profile
};
/**
* @brief The arguments of the switch_on_machine_process and switch_off_machine_process processes
*/
struct SwitchPStateProcessArguments
{
BatsimContext * context; //!< The BatsimContext
int machine_id; //!< The unique number of the machine whose power state should be switched
int new_pstate; //!< The power state into which the machine should be put
};
/**
* @brief The arguments of the static_job_submitter_process process
*/
struct JobSubmitterProcessArguments
{
BatsimContext * context; //!< The BatsimContext
std::string workload_name; //!< The name of the workload the submitter should use
};
/**
* @brief The arguments of the workflow_submitter_process process
*/
struct WorkflowSubmitterProcessArguments
{
BatsimContext * context; //!< The BatsimContext
std::string workflow_name; //!< The name of the workflow the submitter should use
};
/**
* @brief The arguments of the waiter_process process
*/
struct WaiterProcessArguments
{
double target_time; //!< The time at which the waiter should stop waiting
ServerData * server_data; //!< The ServerData. Used to check whether the simulation is finished or not
};
/**
* @brief The arguments of the killer_process process
*/
struct KillerProcessArguments
{
BatsimContext * context; //!< The BatsimContext
std::vector<JobIdentifier> jobs_ids; //!< The ids of the jobs to kill
};
/**
* @brief The arguments of the smpi_replay_process process
*/
......
......@@ -34,21 +34,16 @@ Task* bottom_level_f (Task *child, Task *parent)
using namespace std;
int static_job_submitter_process(int argc, char *argv[])
void static_job_submitter_process(BatsimContext * context,
std::string workload_name)
{
(void) argc;
(void) argv;
JobSubmitterProcessArguments * args = (JobSubmitterProcessArguments *) MSG_process_get_data(MSG_process_self());
BatsimContext * context = args->context;
xbt_assert(context->workloads.exists(args->workload_name),
xbt_assert(context->workloads.exists(workload_name),
"Error: a static_job_submitter_process is in charge of workload '%s', "
"which does not exist", args->workload_name.c_str());
"which does not exist", workload_name.c_str());
Workload * workload = context->workloads.at(args->workload_name);
Workload * workload = context->workloads.at(workload_name);
string submitter_name = args->workload_name + "_submitter";
string submitter_name = workload_name + "_submitter";
/* ░░░░░░░░▄▄▄███░░░░░░░░░░░░░░░░░░░░
░░░▄▄██████████░░░░░░░░░░░░░░░░░░░
......@@ -134,8 +129,6 @@ int static_job_submitter_process(int argc, char *argv[])
bye_msg->is_workflow_submitter = false;
bye_msg->submitter_name = submitter_name;
send_message("server", IPMessageType::SUBMITTER_BYE, (void *) bye_msg);
delete args;
return 0;
}
......@@ -146,27 +139,23 @@ static std::tuple<int,double,double> wait_for_query_answer(string submitter_name
/* Ugly Global */
std::map<std::string, int> task_id_counters;
int workflow_submitter_process(int argc, char *argv[])
void workflow_submitter_process(BatsimContext * context,
std::string workflow_name)
{
(void) argc;
(void) argv;
// Get the workflow
WorkflowSubmitterProcessArguments * args = (WorkflowSubmitterProcessArguments *) MSG_process_get_data(MSG_process_self());
BatsimContext * context = args->context;
xbt_assert(context->workflows.exists(args->workflow_name),
xbt_assert(context->workflows.exists(workflow_name),
"Error: a workflow_job_submitter_process is in charge of workload '%s', "
"which does not exist", args->workflow_name.c_str());
Workflow * workflow = context->workflows.at(args->workflow_name);
"which does not exist", workflow_name.c_str());
Workflow * workflow = context->workflows.at(workflow_name);
int limit = context->workflow_nb_concurrent_jobs_limit;
bool not_limiting = (limit == 0);
int current_nb = 0;
const string submitter_name = args->workflow_name + "_submitter";
const string submitter_name = workflow_name + "_submitter";
XBT_INFO("New Workflow submitter for workflow %s (start time = %lf)!",
args->workflow_name.c_str(),workflow->start_time);
workflow_name.c_str(),workflow->start_time);
/* Initializing my task_id counter */
task_id_counters[workflow->name] = 0;
......@@ -201,7 +190,7 @@ int workflow_submitter_process(int argc, char *argv[])
ready_tasks.erase(ready_tasks.begin() + 0);
/* Send a Job corresponding to the Task Job */
string job_key = submit_workflow_task_as_job(context, args->workflow_name, submitter_name, task);
string job_key = submit_workflow_task_as_job(context, workflow_name, submitter_name, task);
XBT_INFO("Inserting task %s", job_key.c_str());
......@@ -256,10 +245,6 @@ int workflow_submitter_process(int argc, char *argv[])
bye_msg->is_workflow_submitter = true;
bye_msg->submitter_name = submitter_name;
send_message("server", IPMessageType::SUBMITTER_BYE, (void *) bye_msg);
delete args;
return 0;
}
/**
......@@ -405,10 +390,6 @@ void batexec_job_launcher_process(BatsimContext * context,
alloc->hosts.push_back(context->machines[i]->host);
}
ExecuteJobProcessArguments * exec_args = new ExecuteJobProcessArguments;
exec_args->context = context;
exec_args->allocation = alloc;
exec_args->notify_server_at_end = false;
string pname = "job" + job->id.to_string();
simgrid::s4u::Actor::create(pname.c_str(),
context->machines[alloc->machine_ids.first_element()]->host,
......
......@@ -11,19 +11,19 @@ struct BatsimContext;
/**
* @brief The process in charge of submitting static jobs (those described before running the simulations)
* @param argc The number of arguments
* @param argv The argument values
* @return 0
* @param[in] context The BatsimContext
* @param[in] workload_name The name of the workload attached to the submitter
*/
int static_job_submitter_process(int argc, char *argv[]);
void static_job_submitter_process(BatsimContext * context,
std::string workload_name);
/**
* @brief The process in charge of submitting dynamic jobs that are part of a workflow
* @param argc The number of arguments
* @param argv The argument values
* @return 0
* @param[in] context The BatsimContext
* @param[in] workflow_name The name of the workflow attached to the submitter
*/
int workflow_submitter_process(int argc, char *argv[]);
void workflow_submitter_process(BatsimContext * context,
std::string workflow_name);
/**
* @brief Execute jobs sequentially without server and scheduler
......
......@@ -327,6 +327,7 @@ int execute_task_cleanup(void * unknown, void * data)
XBT_DEBUG("before freeing communication amount %p", cleanup_data->communication_amount);
xbt_free(cleanup_data->communication_amount);
/* TODO S4U
if (cleanup_data->exec_process_args != nullptr)
{
XBT_DEBUG("before deleting exec_process_args->allocation %p",
......@@ -335,6 +336,7 @@ int execute_task_cleanup(void * unknown, void * data)
XBT_DEBUG("before deleting exec_process_args %p", cleanup_data->exec_process_args);
delete cleanup_data->exec_process_args;
}
*/
if (cleanup_data->task != nullptr)
{
......@@ -523,35 +525,30 @@ void execute_job_process(BatsimContext * context,
//job->execution_processes.erase(MSG_process_self()); TODO S4U
}
int waiter_process(int argc, char *argv[])
void waiter_process(double target_time, const ServerData * server_data)
{
(void) argc;
(void) argv;
WaiterProcessArguments * args = (WaiterProcessArguments *) MSG_process_get_data(MSG_process_self());
double curr_time = MSG_get_clock();
if (curr_time < args->target_time)
if (curr_time < target_time)
{
double time_to_wait = args->target_time - curr_time;
double time_to_wait = target_time - curr_time;
// Sometimes time_to_wait is so small that it does not affect MSG_process_sleep. The value of 1e-5 have been found on trial-error.
if(time_to_wait < 1e-5)
{
time_to_wait = 1e-5;
}
XBT_INFO("Sleeping %g seconds to reach time %g", time_to_wait, args->target_time);
XBT_INFO("Sleeping %g seconds to reach time %g", time_to_wait, target_time);
MSG_process_sleep(time_to_wait);
XBT_INFO("Sleeping done");
}
else
{
XBT_INFO("Time %g is already reached, skipping sleep", args->target_time);
XBT_INFO("Time %g is already reached, skipping sleep", target_time);
}
if (args->server_data->end_of_simulation_in_send_buffer ||
args->server_data->end_of_simulation_sent ||
args->server_data->end_of_simulation_ack_received)
if (server_data->end_of_simulation_in_send_buffer ||
server_data->end_of_simulation_sent ||
server_data->end_of_simulation_ack_received)
{
XBT_INFO("Simulation have finished. Thus, NOT sending WAITING_DONE to the server.");
}
......@@ -559,27 +556,19 @@ int waiter_process(int argc, char *argv[])
{
send_message("server", IPMessageType::WAITING_DONE);
}
delete args;
return 0;
}
int killer_process(int argc, char *argv[])
void killer_process(BatsimContext * context, std::vector<JobIdentifier> jobs_ids)
{
(void) argc;
(void) argv;
KillerProcessArguments * args = (KillerProcessArguments *) MSG_process_get_data(MSG_process_self());
KillingDoneMessage * message = new KillingDoneMessage;
message->jobs_ids = args->jobs_ids;
message->jobs_ids = jobs_ids;
for (const JobIdentifier & job_id : args->jobs_ids)
for (const JobIdentifier & job_id : jobs_ids)
{
Job * job = args->context->workloads.job_at(job_id);
Profile * profile = args->context->workloads.at(job_id.workload_name)->profiles->at(job->profile);
Job * job = context->workloads.job_at(job_id);
Profile * profile = context->workloads.at(job_id.workload_name)->profiles->at(job->profile);
(void) profile;
xbt_assert(! (job->state == JobState::JOB_STATE_REJECTED ||
......@@ -614,9 +603,7 @@ int killer_process(int argc, char *argv[])
// Let's update the job information
job->state = JobState::JOB_STATE_COMPLETED_KILLED;
args->context->machines.update_machines_on_job_end(job,
job->allocation,
args->context);
context->machines.update_machines_on_job_end(job, job->allocation, context);
job->runtime = (Rational)MSG_get_clock() - job->starting_time;
xbt_assert(job->runtime >= 0, "Negative runtime of killed job '%s' (%g)!",
......@@ -629,22 +616,19 @@ int killer_process(int argc, char *argv[])
}
// If energy is enabled, let us compute the energy used by the machines after running the job
if (args->context->energy_used)
if (context->energy_used)
{
long double consumed_energy_before = job->consumed_energy;
job->consumed_energy = consumed_energy_on_machines(args->context, job->allocation);
job->consumed_energy = consumed_energy_on_machines(context, job->allocation);
// The consumed energy is the difference (consumed_energy_after_job - consumed_energy_before_job)
job->consumed_energy = job->consumed_energy - consumed_energy_before;
// Let's trace the consumed energy
args->context->energy_tracer.add_job_end(MSG_get_clock(), job->id);
context->energy_tracer.add_job_end(MSG_get_clock(), job->id);
}
}
}
send_message("server", IPMessageType::KILLING_DONE, (void*)message);
delete args;
return 0;
}
......@@ -16,17 +16,15 @@ struct CleanExecuteTaskData
{
double * computation_amount = nullptr; //!< The computation amount (may be null)
double * communication_amount = nullptr; //!< The communication amount (may be null)
ExecuteJobProcessArguments * exec_process_args = nullptr; //!< The ExecuteJobProcessArguments
msg_task_t task = nullptr; //!< The task
};
/**
* @brief The process in charge of killing a job if it reaches its walltime
* @param argc The number of arguments
* @param argv The arguments values
* @return 0
* @param[in] context The BatsimContext
* @param[in] jobs_ids The ids of the jobs to kill
*/
int killer_process(int argc, char *argv[]);
void killer_process(BatsimContext *context, std::vector<JobIdentifier> jobs_ids);
/**
* @brief The process in charge of executing a SMPI job
......@@ -78,16 +76,7 @@ void execute_job_process(BatsimContext *context, SchedulingAllocation *allocatio
/**
* @brief The process in charge of waiting for a given amount of time (related to the NOPMeLater message)
* @param[in] argc The number of arguments
* @param[in] argv The arguments values
* @return 0
*/
int waiter_process(int argc, char *argv[]);
/**
* @brief The process in charge of killing a given job
* @param[in] argc The number of arguments
* @param[in] argv The arguments values
* @return 0
* @param[in] target_time The time at which the waiter should stop waiting
* @param[in] server_data The ServerData. Used to check whether the simulation is finished or not
*/
int killer_process(int argc, char *argv[]);
void waiter_process(double target_time, const ServerData *server_data);
......@@ -26,18 +26,12 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(network, "network"); //!< Logging
using namespace std;
int request_reply_scheduler_process(int argc, char *argv[])
void request_reply_scheduler_process(BatsimContext * context, std::string send_buffer)
{
(void) argc;
(void) argv;
RequestReplyProcessArguments * args = (RequestReplyProcessArguments *) MSG_process_get_data(MSG_process_self());
BatsimContext * context = args->context;
XBT_DEBUG("Buffer received in REQ-REP: '%s'", args->send_buffer.c_str());
XBT_DEBUG("Buffer received in REQ-REP: '%s'", send_buffer.c_str());
// Let's make sure the message is sent as UTF-8
string message_to_send = args->send_buffer;
string message_to_send = send_buffer;
// Send the message
XBT_INFO("Sending '%s'", message_to_send.c_str());
......@@ -72,7 +66,4 @@ int request_reply_scheduler_process(int argc, char *argv[])
context->microseconds_used_by_scheduler += elapsed_microseconds;
context->proto_reader->parse_and_apply_message(message_received);
delete args;
return 0;
}
......@@ -32,8 +32,7 @@ enum NetworkStamp : char
/**
* @brief The process in charge of doing a Request-Reply iteration with the Decision real process
* @details This process sends a message to the Decision real process (Request) then waits for the answered message (Reply)
* @param[in] argc The number of arguments
* @param[in] argv The arguments' values
* @return 0
* @param[in] context The BatsimContext
* @param[in] send_buffer The message to send to the Decision real process
*/
int request_reply_scheduler_process(int argc, char *argv[]);
void request_reply_scheduler_process(BatsimContext *context, std::string send_buffer);
......@@ -14,24 +14,16 @@ using namespace std;
XBT_LOG_NEW_DEFAULT_CATEGORY(pstate, "pstate"); //!< Logging
int switch_on_machine_process(int argc, char *argv[])
void switch_on_machine_process(BatsimContext *context, int machine_id, int new_pstate)
{
(void) argc;
(void) argv;
SwitchPStateProcessArguments * args = (SwitchPStateProcessArguments *) MSG_process_get_data(MSG_process_self());
int machineID = args->machine_id;
int pstate = args->new_pstate;
xbt_assert(args->context->machines.exists(machineID));
Machine * machine = args->context->machines[machineID];
xbt_assert(context->machines.exists(machine_id));
Machine * machine = context->machines[machine_id];
xbt_assert(machine->host == MSG_process_get_host(MSG_process_self()));
xbt_assert(machine->state == MachineState::TRANSITING_FROM_SLEEPING_TO_COMPUTING);
xbt_assert(machine->jobs_being_computed.empty());
xbt_assert(machine->has_pstate(pstate));
xbt_assert(machine->pstates[pstate] == PStateType::COMPUTATION_PSTATE);
xbt_assert(machine->has_pstate(new_pstate));
xbt_assert(machine->pstates[new_pstate] == PStateType::COMPUTATION_PSTATE);
int current_pstate = MSG_host_get_pstate(machine->host);
int on_ps = machine->sleep_pstates[current_pstate]->switch_on_virtual_pstate;
......@@ -55,41 +47,30 @@ int switch_on_machine_process(int argc, char *argv[])
MSG_task_destroy(bootup);
XBT_INFO("1 flop has been computed. Switching machine %d ('%s') to computing pstate %d",
machine->id, machine->name.c_str(), pstate);
MSG_host_set_pstate(machine->host, pstate);
machine->id, machine->name.c_str(), new_pstate);
MSG_host_set_pstate(machine->host, new_pstate);
//args->context->pstate_tracer.add_pstate_change(MSG_get_clock(), machine->id, pstate);
machine->update_machine_state(MachineState::IDLE);
SwitchMessage * msg = new SwitchMessage;
msg->machine_id = args->machine_id;
msg->new_pstate = args->new_pstate;
msg->machine_id = machine_id;
msg->new_pstate = new_pstate;
send_message("server", IPMessageType::SWITCHED_ON, (void *) msg);
delete args;
return 0;
}
int switch_off_machine_process(int argc, char *argv[])
void switch_off_machine_process(BatsimContext * context, int machine_id, int new_pstate)
{
(void) argc;
(void) argv;
SwitchPStateProcessArguments * args = (SwitchPStateProcessArguments *) MSG_process_get_data(MSG_process_self());
int machineID = args->machine_id;
int pstate = args->new_pstate;
xbt_assert(args->context->machines.exists(machineID));
Machine * machine = args->context->machines[machineID];
xbt_assert(context->machines.exists(machine_id));
Machine * machine = context->machines[machine_id];
xbt_assert(machine->host == MSG_process_get_host(MSG_process_self()));
xbt_assert(machine->state == MachineState::TRANSITING_FROM_COMPUTING_TO_SLEEPING);
xbt_assert(machine->jobs_being_computed.empty());
xbt_assert(machine->has_pstate(pstate));
xbt_assert(machine->pstates[pstate] == PStateType::SLEEP_PSTATE);
xbt_assert(machine->has_pstate(new_pstate));
xbt_assert(machine->pstates[new_pstate] == PStateType::SLEEP_PSTATE);
int off_ps = machine->sleep_pstates[pstate]->switch_off_virtual_pstate;
int off_ps = machine->sleep_pstates[new_pstate]->switch_off_virtual_pstate;
XBT_INFO("Switching machine %d ('%s') OFF. Passing in virtual pstate %d to do so", machine->id,
machine->name.c_str(), off_ps);
......@@ -110,19 +91,16 @@ int switch_off_machine_process(int argc, char *argv[])
MSG_task_destroy(shutdown);
XBT_INFO("1 flop has been computed. Switching machine %d ('%s') to sleeping pstate %d",
machine->id, machine->name.c_str(), pstate);
MSG_host_set_pstate(machine->host, pstate);
machine->id, machine->name.c_str(), new_pstate);
MSG_host_set_pstate(machine->host, new_pstate);
//args->context->pstate_tracer.add_pstate_change(MSG_get_clock(), machine->id, pstate);
machine->update_machine_state(MachineState::SLEEPING);
SwitchMessage * msg = new SwitchMessage;
msg->machine_id = args->machine_id;
msg->new_pstate = args->new_pstate;
msg->machine_id = machine_id;
msg->new_pstate = new_pstate;
send_message("server", IPMessageType::SWITCHED_OFF, (void *) msg);
delete args;
return 0;
}
void CurrentSwitches::add_switch(const MachineRange &machines, int target_pstate)
......
......@@ -90,17 +90,17 @@ private:
/**
* @brief Process used to switch ON a machine (transition from a sleep power state to a computation one)
* @param[in] argc The number of arguments
* @param[in] argv The arguments' values
* @return 0
* @param[in] context The BatsimContext
* @param[in] machine_id The unique number of the machine whose power state should be switched
* @param[in] new_pstate The power state into which the machine should be put
*/
int switch_on_machine_process(int argc, char * argv[]);
void switch_on_machine_process(BatsimContext *context, int machine_id, int new_pstate);
/**
* @brief Process used to switch OFF a machine (transition from a computation power state to a sleep one)
* @param[in] argc The number of arguments
* @param[in] argv The arguments' values
* @return 0
* @param[in] context The BatsimContext
* @param[in] machine_id The unique number of the machine whose power state should be switched
* @param[in] new_pstate The power state into which the machine should be put
*/
int switch_off_machine_process(int argc, char * argv[]);
void switch_off_machine_process(BatsimContext *context, int machine_id, int new_pstate);
......@@ -21,14 +21,8 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(server, "server"); //!< Logging
using namespace std;
int server_process(int argc, char *argv[])
void server_process(BatsimContext * context)
{
(void) argc;
(void) argv;
ServerProcessArguments * args = (ServerProcessArguments *) MSG_process_get_data(MSG_process_self());
BatsimContext * context = args->context;
ServerData * data = new ServerData;
data->context = context;
......@@ -40,13 +34,12 @@ int server_process(int argc, char *argv[])
context->allow_time_sharing,
MSG_get_clock());
RequestReplyProcessArguments * req_rep_args = new RequestReplyProcessArguments;
req_rep_args->context = context;
req_rep_args->send_buffer = context->proto_writer->generate_current_message(MSG_get_clock());
string send_buffer = context->proto_writer->generate_current_message(MSG_get_clock());
context->proto_writer->clear();
MSG_process_create("Scheduler REQ-REP", request_reply_scheduler_process,
(void*)req_rep_args, MSG_host_self());
simgrid::s4u::Actor::create("Scheduler REQ-REP", simgrid::s4u::this_actor::get_host(),
request_reply_scheduler_process,
context, send_buffer);
data->sched_ready = false;
// Let's prepare a handler map to react on events
......@@ -114,12 +107,12 @@ int server_process(int argc, char *argv[])
!data->end_of_simulation_ack_received // The simulation must NOT be finished
)
{
RequestReplyProcessArguments * req_rep_args = new RequestReplyProcessArguments;
req_rep_args->context = context;
req_rep_args->send_buffer = context->proto_writer->generate_current_message(MSG_get_clock());
string send_buffer = context->proto_writer->generate_current_message(MSG_get_clock());
context->proto_writer->clear();
MSG_process_create("Scheduler REQ-REP", request_reply_scheduler_process, (void*)req_rep_args, MSG_host_self());
simgrid::s4u::Actor::create("Scheduler REQ-REP", simgrid::s4u::this_actor::get_host(),
request_reply_scheduler_process,
context, send_buffer);
data->sched_ready = false;
if (data->end_of_simulation_in_send_buffer)
......@@ -151,8 +144,6 @@ int server_process(int argc, char *argv[])
xbt_assert(data->nb_completed_jobs == data->nb_submitted_jobs, "All submitted jobs have not been completed (either executed and finished, or rejected).");
delete data;
delete args;
return 0;
}
void server_on_submitter_hello(ServerData * data,
......@@ -358,13 +349,10 @@ void server_on_pstate_modification(ServerData * data,
else if (machine->pstates[message->new_pstate] == PStateType::SLEEP_PSTATE)
{
machine->update_machine_state(MachineState::TRANSITING_FROM_COMPUTING_TO_SLEEPING);
SwitchPStateProcessArguments * args = new SwitchPStateProcessArguments;
args->context = data->context;
args->machine_id = machine_id;
args->new_pstate = message->new_pstate;
string pname = "switch ON " + to_string(machine_id);
MSG_process_create(pname.c_str(), switch_off_machine_process, (void*)args, machine->host);
simgrid::s4u::Actor::create(pname.c_str(), machine->host, switch_off_machine_process,
data->context, machine_id, message->new_pstate);
++data->nb_switching_machines;
}
......@@ -381,13 +369,10 @@ void server_on_pstate_modification(ServerData * data,
machine->id, machine->name.c_str(), curr_pstate, message->new_pstate);
machine->update_machine_state(MachineState::TRANSITING_FROM_SLEEPING_TO_COMPUTING);
SwitchPStateProcessArguments * args = new SwitchPStateProcessArguments;
args->context = data->context;
args->machine_id = machine_id;
args->new_pstate = message->new_pstate;
string pname = "switch OFF " + to_string(machine_id);
MSG_process_create(pname.c_str(), switch_on_machine_process, (void*)args, machine->host);
simgrid::s4u::Actor::create(pname.c_str(), machine->host, switch_on_machine_process,
data->context, machine_id, message->new_pstate);
++data->nb_switching_machines;
}
......@@ -767,8 +752,7 @@ void server_on_kill_jobs(ServerData * data,
xbt_assert(task_data->data != nullptr);
KillJobMessage * message = (KillJobMessage *) task_data->data;
KillerProcessArguments * args = new KillerProcessArguments;