Commit 5ab0e2f1 authored by Millian Poquet's avatar Millian Poquet

[code] s4u base (nosched tests pass)

parent 74251f20
......@@ -31,6 +31,7 @@
#include <string>
#include <fstream>
#include <simgrid/s4u.hpp>
#include <simgrid/msg.h>
#include <smpi/smpi.h>
#include <simgrid/plugins/energy.h>
......@@ -595,29 +596,6 @@ void configure_batsim_logging_output(const MainArguments & main_args)
xbt_log_control_set("surf_energy.thresh:critical");
}
void initialize_msg(const MainArguments & main_args, int argc, char * argv[])
{
// Must be initialized before MSG_init
if (main_args.energy_used)
{
sg_host_energy_plugin_init();
}
MSG_init(&argc, argv);
// Setting SimGrid configuration if the SimGrid process tracing is enabled
if (main_args.enable_simgrid_process_tracing)
{
string sg_trace_filename = main_args.export_prefix + "_sg_processes.trace";
MSG_config("tracing", "yes");
MSG_config("tracing/msg/process", "yes");
MSG_config("tracing/uncategorized", "yes");
MSG_config("tracing/filename", sg_trace_filename.c_str());
MSG_config("smpi/privatization", "yes"); // TODO: why here?
}
}
void load_workloads_and_workflows(const MainArguments & main_args, BatsimContext * context, int & max_nb_machines_to_use)
{
int max_nb_machines_in_workloads = -1;
......@@ -678,21 +656,24 @@ void start_initial_simulation_processes(const MainArguments & main_args,
// Let's run a static_job_submitter process for each workload
for (const MainArguments::WorkloadDescription & desc : main_args.workload_descriptions)
{
XBT_DEBUG("Creating a workload_submitter process...");
JobSubmitterProcessArguments * submitter_args = new JobSubmitterProcessArguments;
submitter_args->context = context;
submitter_args->workload_name = desc.name;
string submitter_instance_name = "workload_submitter_" + desc.name;
XBT_DEBUG("Creating a workload_submitter process...");
if (!is_batexec)
{
JobSubmitterProcessArguments * submitter_args = new JobSubmitterProcessArguments;
submitter_args->context = context;
submitter_args->workload_name = desc.name;
MSG_process_create(submitter_instance_name.c_str(), static_job_submitter_process,
(void*) submitter_args, master_machine->host);
}
else
{
MSG_process_create(submitter_instance_name.c_str(), batexec_job_launcher_process,
(void*) submitter_args, master_machine->host);
simgrid::s4u::Actor::create(submitter_instance_name.c_str(),
master_machine->host,
batexec_job_launcher_process,
context, desc.name);
}
XBT_INFO("The process '%s' has been created.", submitter_instance_name.c_str());
}
......@@ -751,8 +732,27 @@ int main(int argc, char * argv[])
// Let's configure how Batsim should be logged
configure_batsim_logging_output(main_args);
// Let's initialize SimGrid
initialize_msg(main_args, argc, argv);
// Initialize the energy plugin before creating the engine
if (main_args.energy_used)
{
sg_host_energy_plugin_init();
}
// Instantiate SimGrid
MSG_init(&argc, argv); // Required for SMPI as I write these lines
simgrid::s4u::Engine engine(&argc, argv);
// Setting SimGrid configuration if the SimGrid process tracing is enabled
if (main_args.enable_simgrid_process_tracing)
{
string sg_trace_filename = main_args.export_prefix + "_sg_processes.trace";
engine.set_config("tracing:1");
engine.set_config("tracing/msg/process:1");
engine.set_config("tracing/uncategorized:1");
engine.set_config("tracing/filename:" + sg_trace_filename);
engine.set_config("smpi/privatization:1"); // TODO: why here?
}
// Let's create the BatsimContext, which stores information about the current instance
BatsimContext context;
......@@ -771,7 +771,7 @@ int main(int argc, char * argv[])
if (!context.smpi_used)
{
XBT_INFO("SMPI will NOT be used.");
MSG_config("host/model", "ptask_L07");
engine.set_config("host/model:ptask_L07");
}
else
{
......@@ -817,7 +817,7 @@ int main(int argc, char * argv[])
}
// Simulation main loop, handled by MSG
msg_error_t res = MSG_main();
engine.run();
delete context.zmq_socket;
context.zmq_socket = nullptr;
......@@ -837,14 +837,7 @@ int main(int argc, char * argv[])
// Let's finalize Batsim's outputs
finalize_batsim_outputs(&context);
if (res == MSG_OK)
{
return 0;
}
else
{
return 1;
}
return 0;
}
void set_configuration(BatsimContext *context,
......
......@@ -119,14 +119,6 @@ void parse_main_args(int argc, char * argv[], MainArguments & main_args,
*/
void configure_batsim_logging_output(const MainArguments & main_args);
/**
* @brief Initializes SimGrid
* @param[in] main_args Batsim arguments
* @param[in] argc The number of arguments given to the main function
* @param[in] argv The values of arguments given to the main function
*/
void initialize_msg(const MainArguments & main_args, int argc, char * argv[]);
/**
* @brief Loads the workloads defined in Batsim arguments
* @param[in] main_args Batsim arguments
......
......@@ -9,6 +9,8 @@
#include <algorithm>
#include <boost/bind.hpp>
#include <simgrid/s4u.hpp>
#include "jobs.hpp"
#include "jobs_execution.hpp"
#include "ipp.hpp"
......@@ -378,14 +380,10 @@ static std::tuple<int,double,double> wait_for_query_answer(string submitter_name
}
int batexec_job_launcher_process(int argc, char *argv[])
void batexec_job_launcher_process(BatsimContext * context,
std::string workload_name)
{
(void) argc;
(void) argv;
JobSubmitterProcessArguments * args = (JobSubmitterProcessArguments *) MSG_process_get_data(MSG_process_self());
BatsimContext * context = args->context;
Workload * workload = context->workloads.at(args->workload_name);
Workload * workload = context->workloads.at(workload_name);
auto & jobs = workload->jobs->jobs();
for (auto & mit : jobs)
......@@ -412,11 +410,9 @@ int batexec_job_launcher_process(int argc, char *argv[])
exec_args->allocation = alloc;
exec_args->notify_server_at_end = false;
string pname = "job" + job->id.to_string();
msg_process_t process = MSG_process_create(pname.c_str(), execute_job_process,
(void*) exec_args,
context->machines[alloc->machine_ids.first_element()]->host);
job->execution_processes.insert(process);
simgrid::s4u::Actor::create(pname.c_str(),
context->machines[alloc->machine_ids.first_element()]->host,
execute_job_process, context, alloc, false, nullptr);
//job->execution_processes.insert(process); TODO S4U
}
return 0;
}
......@@ -5,6 +5,10 @@
#pragma once
#include <string>
struct BatsimContext;
/**
* @brief The process in charge of submitting static jobs (those described before running the simulations)
* @param argc The number of arguments
......@@ -23,8 +27,8 @@ int workflow_submitter_process(int argc, char *argv[]);
/**
* @brief Execute jobs sequentially without server and scheduler
* @param[in] argc The number of arguments
* @param[in] argv The argument values
* @return 0
* @param[in] context The BatsimContext
* @param[in] workload_name The name of the workload attached to the submitter
*/
int batexec_job_launcher_process(int argc, char *argv[]);
void batexec_job_launcher_process(BatsimContext * context,
std::string workload_name);
......@@ -274,7 +274,7 @@ int execute_task(BatTask * btask,
}
MSG_sem_acquire(sem);
free(sem);
MSG_sem_destroy(sem);
return profile->return_code;
}
else
......@@ -400,22 +400,17 @@ BatTask * initialize_sequential_tasks(Job * job, Profile * profile, Profile * io
return task;
}
int execute_job_process(int argc, char *argv[])
void execute_job_process(BatsimContext * context,
SchedulingAllocation * allocation,
bool notify_server_at_end,
Profile * io_profile)
{
(void) argc;
(void) argv;
// Retrieving input parameters
ExecuteJobProcessArguments * args = (ExecuteJobProcessArguments *) MSG_process_get_data(MSG_process_self());
Workload * workload = args->context->workloads.at(args->allocation->job_id.workload_name);
Job * job = workload->jobs->at(args->allocation->job_id);
Workload * workload = context->workloads.at(allocation->job_id.workload_name);
Job * job = workload->jobs->at(allocation->job_id);
Profile * profile = workload->profiles->at(job->profile);
Profile * io_profile = args->io_profile;
SchedulingAllocation * allocation = args->allocation;
job->starting_time = MSG_get_clock();
job->allocation = args->allocation->machine_ids;
job->allocation = allocation->machine_ids;
double remaining_time = (double)job->walltime;
// Create the root task
......@@ -442,7 +437,7 @@ int execute_job_process(int argc, char *argv[])
int machine_id_within_allocated_resources = allocation->mapping[executor_id];
int machine_id = allocation->machine_ids[machine_id_within_allocated_resources];
msg_host_t to_add = args->context->machines[machine_id]->host;
msg_host_t to_add = context->machines[machine_id]->host;
allocation->hosts.push_back(to_add);
}
......@@ -450,28 +445,27 @@ int execute_job_process(int argc, char *argv[])
allocation->io_hosts.reserve(allocation->io_allocation.size());
for (unsigned int id = 0; id < allocation->io_allocation.size(); ++id)
{
allocation->io_hosts.push_back(args->context->machines[id]->host);
allocation->io_hosts.push_back(context->machines[id]->host);
}
// If energy is enabled, let us compute the energy used by the machines before running the job
if (args->context->energy_used)
if (context->energy_used)
{
job->consumed_energy = consumed_energy_on_machines(args->context, job->allocation);
job->consumed_energy = consumed_energy_on_machines(context, job->allocation);
// Let's trace the consumed energy
args->context->energy_tracer.add_job_start(MSG_get_clock(), job->id);
context->energy_tracer.add_job_start(MSG_get_clock(), job->id);
}
// Job computation
args->context->machines.update_machines_on_job_run(job,
args->allocation->machine_ids,
args->context);
context->machines.update_machines_on_job_run(job, allocation->machine_ids,
context);
// Add a cleanup hook on the process
CleanExecuteTaskData * cleanup_data = new CleanExecuteTaskData;
cleanup_data->exec_process_args = args;
MSG_process_on_exit(execute_task_cleanup, cleanup_data);
//cleanup_data->exec_process_args = args; TODO S4U
//MSG_process_on_exit(execute_task_cleanup, cleanup_data); TODO S4U
// Execute the process
job->return_code = execute_task(job->task, args->context, args->allocation,
job->return_code = execute_task(job->task, context, allocation,
cleanup_data, &remaining_time);
if (job->return_code == 0)
{
......@@ -489,15 +483,14 @@ int execute_job_process(int argc, char *argv[])
XBT_INFO("Job %s had been killed (walltime %g reached)",
job->id.to_string().c_str(), (double) job->walltime);
job->state = JobState::JOB_STATE_COMPLETED_WALLTIME_REACHED;
if (args->context->trace_schedule)
if (context->trace_schedule)
{
args->context->paje_tracer.add_job_kill(job, args->allocation->machine_ids,
MSG_get_clock(), true);
context->paje_tracer.add_job_kill(job, allocation->machine_ids,
MSG_get_clock(), true);
}
}
args->context->machines.update_machines_on_job_end(job, args->allocation->machine_ids,
args->context);
context->machines.update_machines_on_job_end(job, allocation->machine_ids, context);
job->runtime = MSG_get_clock() - job->starting_time;
if (job->runtime == 0)
{
......@@ -506,29 +499,28 @@ int execute_job_process(int argc, char *argv[])
}
// If energy is enabled, let us compute the energy used by the machines after running the job
if (args->context->energy_used)
if (context->energy_used)
{
long double consumed_energy_before = job->consumed_energy;
job->consumed_energy = consumed_energy_on_machines(args->context, job->allocation);
job->consumed_energy = consumed_energy_on_machines(context, job->allocation);
// The consumed energy is the difference (consumed_energy_after_job - consumed_energy_before_job)
job->consumed_energy -= job->consumed_energy - consumed_energy_before;
// Let's trace the consumed energy
args->context->energy_tracer.add_job_end(MSG_get_clock(), job->id);
context->energy_tracer.add_job_end(MSG_get_clock(), job->id);
}
if (args->notify_server_at_end)
if (notify_server_at_end)
{
// Let us tell the server that the job completed
JobCompletedMessage * message = new JobCompletedMessage;
message->job_id = args->allocation->job_id;
message->job_id = allocation->job_id;
send_message("server", IPMessageType::JOB_COMPLETED, (void*)message);
}
job->execution_processes.erase(MSG_process_self());
return 0;
//job->execution_processes.erase(MSG_process_self()); TODO S4U
}
int waiter_process(int argc, char *argv[])
......
......@@ -69,11 +69,12 @@ int execute_task_cleanup(void * unknown, void * data);
/**
* @brief The process in charge of executing a job
* @param[in] argc The number of arguments
* @param[in] argv The arguments values
* @return 0
* @param context The BatsimContext
* @param allocation The job allocation
* @param notify_server_at_end Whether a message to the server must be sent after job completion
* @param io_profile The optional IO profile
*/
int execute_job_process(int argc, char *argv[]);
void execute_job_process(BatsimContext *context, SchedulingAllocation *allocation, bool notify_server_at_end, Profile *io_profile);
/**
* @brief The process in charge of waiting for a given amount of time (related to the NOPMeLater message)
......
......@@ -10,6 +10,7 @@
#include <boost/algorithm/string.hpp>
#include <simgrid/msg.h>
#include <simgrid/s4u.hpp>
#include "context.hpp"
#include "ipp.hpp"
......@@ -886,17 +887,11 @@ void server_on_execute_job(ServerData * data,
allocation->machine_ids.to_string_hyphen().c_str());
}
ExecuteJobProcessArguments * exec_args = new ExecuteJobProcessArguments;
exec_args->context = data->context;
exec_args->allocation = allocation;
exec_args->notify_server_at_end = true;
exec_args->io_profile = message->io_profile;
string pname = "job_" + job->id.to_string();
msg_process_t process = MSG_process_create(pname.c_str(), execute_job_process,
(void*)exec_args,
data->context->machines[allocation->machine_ids.first_element()]->host);
job->execution_processes.insert(process);
simgrid::s4u::Actor::create(pname.c_str(),
data->context->machines[allocation->machine_ids.first_element()]->host,
execute_job_process, data->context, allocation, true, message->io_profile);
// job->execution_processes.insert(process); TODO S4U
}
bool is_simumation_finished(const ServerData * data)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment