Commit f2deb09a authored by MERCIER Michael's avatar MERCIER Michael

[code] Make additional_io_job works

parent 4c246c94
......@@ -556,7 +556,7 @@ void configure_batsim_logging_output(const MainArguments & main_args)
{
vector<string> log_categories_to_set = {"workload", "job_submitter", "redis", "jobs", "machines", "pstate",
"workflow", "jobs_execution", "server", "export", "profiles", "machine_range",
"network", "ipp"};
"network", "ipp", "task_execution"};
string log_threshold_to_set = "critical";
if (main_args.verbosity == VerbosityLevel::QUIET || main_args.verbosity == VerbosityLevel::NETWORK_ONLY)
......
......@@ -252,6 +252,13 @@ MachineRange & difference(const MachineRange & one, const MachineRange &other)
return *result;
}
MachineRange & union_itvs(const MachineRange & one, const MachineRange &other)
{
MachineRange * result = new MachineRange();
result->set = one.set + other.set;
return *result;
}
MachineRange & MachineRange::operator&=(const MachineRange & other)
{
set &= other.set;
......
......@@ -237,3 +237,5 @@ public:
MachineRange & difference(const MachineRange & one, const MachineRange & other);
MachineRange & intersection(const MachineRange & one, const MachineRange & other);
MachineRange & union_itvs(const MachineRange & one, const MachineRange & other);
......@@ -951,31 +951,36 @@ void JsonProtocolReader::handle_execute_job(int event_number,
xbt_assert(io_job_value["profile_name"].IsString(), "Invalid JSON message: Invalid 'profile_name' of event %d (EXECUTE_JOB): should be a string", event_number);
string profile_name = io_job_value["profile_name"].GetString();
Profile * io_profile;
if (io_job_value.HasMember("profile"))
{
const Value & profile_object = io_job_value["profile"];
xbt_assert(profile_object.IsObject(), "Invalid JSON message: in event %d (EXECUTE_JOB): ['data']['profile'] should be an object", event_number);
StringBuffer buffer;
::Writer<rapidjson::StringBuffer> writer(buffer);
profile_object.Accept(writer);
string additional_io_job_profile_description = string(buffer.GetString(), buffer.GetSize());
// create the io_profile
io_profile = Profile::from_json(profile_name,
additional_io_job_profile_description,
"Invalid JSON profile received from the scheduler for the 'additional_io_job'");
// Add it to the wokload
workload->profiles->add_profile(profile_name, io_profile);
if (workload->profiles->exists(profile_name))
{
XBT_WARN("The given profile name '%s' already exists: ignore the new one", profile_name.c_str());
}
else
{
const Value & profile_object = io_job_value["profile"];
xbt_assert(profile_object.IsObject(), "Invalid JSON message: in event %d (EXECUTE_JOB): ['data']['profile'] should be an object", event_number);
StringBuffer buffer;
::Writer<rapidjson::StringBuffer> writer(buffer);
profile_object.Accept(writer);
string additional_io_job_profile_description = string(buffer.GetString(), buffer.GetSize());
// create the io_profile
Profile * new_io_profile = Profile::from_json(profile_name,
additional_io_job_profile_description,
"Invalid JSON profile received from the scheduler for the 'additional_io_job'");
// Add it to the wokload
workload->profiles->add_profile(profile_name, new_io_profile);
}
}
else // profile should already be submitted
{
xbt_assert(workload->profiles->exists(profile_name),
// get the profile
xbt_assert(workload->profiles->exists(profile_name),
"The given profile name '%d' does not exists");
io_profile = workload->profiles->at(profile_name);
}
Profile * io_profile = workload->profiles->at(profile_name);
// manage sequence profile special case
if (io_profile->type == ProfileType::SEQUENCE)
......
......@@ -892,7 +892,7 @@ void server_on_execute_job(ServerData * data,
// Also generate io hosts list if any
allocation->io_hosts.reserve(allocation->io_allocation.size());
for (unsigned int id = 0; id < allocation->io_hosts.size(); ++id)
for (unsigned int id = 0; id < allocation->io_allocation.size(); ++id)
{
allocation->io_hosts.push_back(data->context->machines[id]->host);
}
......
......@@ -321,17 +321,24 @@ void print_matrices(double * computation_vector, double * communication_matrix,
{
string comp = "";
string comm = "";
int k = 0;
for (unsigned int i=0; i < nb_res; i++)
{
comp += to_string(computation_vector[i]) + ", ";
for (unsigned int j=0; j < nb_res; j++)
if (computation_vector != nullptr)
{
comp += to_string(computation_vector[i]) + ", ";
}
if (communication_matrix != nullptr)
{
comm += to_string(communication_matrix[i+j]) + ", ";
for (unsigned int j = 0; j < nb_res; j++)
{
comm += to_string(communication_matrix[k++]) + ", ";
}
comm += "\n";
}
comm += "\n";
}
XBT_INFO("Generated matrices: \nCompute: \n%s\nComm:\n%s", comp.c_str(), comm.c_str());
XBT_DEBUG("Generated matrices: \nCompute: \n%s\nComm:\n%s", comp.c_str(), comm.c_str());
}
/**
* }
......@@ -407,7 +414,11 @@ int execute_msg_task(BatTask * btask,
double* computation_vector = nullptr;
double* communication_matrix = nullptr;
XBT_INFO("Generating comm/compute matrix for job");
string task_name = profile_type_to_string(profile->type) + '_' + btask->parent_job->id.to_string() +
"_" + btask->profile->name;
XBT_DEBUG("Generating comm/compute matrix for task '%s' with allocation %s",
task_name.c_str(), allocation->machine_ids.to_string_hyphen().c_str());
generate_matices_from_profile(computation_vector,
communication_matrix,
hosts_to_use,
......@@ -422,7 +433,9 @@ int execute_msg_task(BatTask * btask,
double* io_computation_vector = nullptr;
double* io_communication_matrix = nullptr;
XBT_INFO("Generating comm/compute matrix for IO");
XBT_DEBUG("Generating comm/compute matrix for IO with alloaction: %s",
allocation->io_allocation.to_string_hyphen().c_str());
generate_matices_from_profile(io_computation_vector,
io_communication_matrix,
allocation->io_hosts,
......@@ -434,13 +447,11 @@ int execute_msg_task(BatTask * btask,
MachineRange immut_job_alloc = difference(allocation->machine_ids, allocation->io_allocation);
MachineRange immut_io_alloc = difference(allocation->io_allocation, allocation->machine_ids);
MachineRange to_merge_alloc = intersection(allocation->machine_ids, allocation->io_allocation);
MachineRange new_alloc;
new_alloc &= immut_job_alloc;
new_alloc &= immut_io_alloc;
new_alloc &= to_merge_alloc;
MachineRange new_alloc = union_itvs(allocation->machine_ids, allocation->io_allocation);
// FIXME this does not work for profiles that changes the number of hosts: where the allocation and the host to use
// are different
XBT_DEBUG("Job+IO allocation: %s", new_alloc.to_string_hyphen().c_str());
//Generate the new list of hosts
vector<msg_host_t> new_hosts_to_use;
......@@ -463,20 +474,25 @@ int execute_msg_task(BatTask * btask,
int row_job_host_index = 0;
int col_io_host_index = 0;
int row_io_host_index = 0;
bool col_only_in_job, col_only_in_io = false;
bool col_only_in_job;
bool col_only_in_io;
for (unsigned int col = 0; col < nb_res; ++col)
{
col_only_in_job = false;
col_only_in_io = false;
int curr_machine = new_alloc[col];
XBT_DEBUG("Current machine in generation: %d");
// Fill computation vector
if (immut_job_alloc.contains(new_alloc[col]) and immut_io_alloc.contains(new_alloc[col]))
if (to_merge_alloc.contains(curr_machine))
{
new_computation_vector[col] = computation_vector[col_job_host_index++] + io_computation_vector[col_io_host_index++];
}
else if (immut_job_alloc.contains(new_alloc[col]))
else if (immut_job_alloc.contains(curr_machine))
{
new_computation_vector[col] = computation_vector[col_job_host_index++];
col_only_in_job = true;
}
else if (immut_io_alloc.contains(new_alloc[col]))
else if (immut_io_alloc.contains(curr_machine))
{
new_computation_vector[col] = io_computation_vector[col_io_host_index++];
col_only_in_io = true;
......@@ -489,7 +505,7 @@ int execute_msg_task(BatTask * btask,
// Fill communication matrix with merged values
for (unsigned int row = 0; row < nb_res; ++row)
{
if (immut_job_alloc.contains(new_alloc[row]) and immut_io_alloc.contains(new_alloc[row]))
if (to_merge_alloc.contains(new_alloc[row]))
{
if (col_only_in_job){
new_communication_matrix[k] = communication_matrix[row_job_host_index++];
......@@ -533,16 +549,12 @@ int execute_msg_task(BatTask * btask,
computation_vector = new_computation_vector;
hosts_to_use = new_hosts_to_use;
// TODO Free old job and io structures
}
XBT_INFO("Merged Job+IO matrices");
print_matrices(computation_vector, communication_matrix, hosts_to_use.size());
XBT_DEBUG("Merged Job+IO matrices");
print_matrices(computation_vector, communication_matrix, hosts_to_use.size());
}
// Create the MSG task
string task_name = profile_type_to_string(profile->type) + '_' + btask->parent_job->id.to_string() +
"_" + btask->profile->name;
XBT_DEBUG("Creating MSG task '%s' on %d resources", task_name.c_str(), hosts_to_use.size());
msg_task_t ptask = MSG_parallel_task_create(task_name.c_str(), hosts_to_use.size(),
hosts_to_use.data(), computation_vector,
communication_matrix, NULL);
......@@ -557,13 +569,13 @@ int execute_msg_task(BatTask * btask,
msg_error_t err;
if (*remaining_time < 0)
{
XBT_INFO("Executing task '%s' without walltime", MSG_task_get_name(ptask));
XBT_DEBUG("Executing task '%s' without walltime", MSG_task_get_name(ptask));
err = MSG_parallel_task_execute(ptask);
}
else
{
double time_before_execute = MSG_get_clock();
XBT_INFO("Executing task '%s' with walltime of %g", MSG_task_get_name(ptask), *remaining_time);
XBT_DEBUG("Executing task '%s' with walltime of %g", MSG_task_get_name(ptask), *remaining_time);
err = MSG_parallel_task_execute_with_timeout(ptask, *remaining_time);
*remaining_time = *remaining_time - (MSG_get_clock() - time_before_execute);
}
......@@ -582,7 +594,7 @@ int execute_msg_task(BatTask * btask,
xbt_die("A task execution had been stopped by an unhandled way (err = %d)", err);
}
XBT_INFO("Task '%s' finished", MSG_task_get_name(ptask));
XBT_DEBUG("Task '%s' finished", MSG_task_get_name(ptask));
MSG_task_destroy(ptask);
// The task has been executed, the data does need to be freed in the cleanup function anymore
......@@ -591,4 +603,3 @@ int execute_msg_task(BatTask * btask,
return ret;
}
......@@ -6,51 +6,51 @@
"nb_res": 4,
"jobs": [
{"id":1, "subtime":10, "walltime": 100, "res": 4, "profile": "2"},
{"id":"toto2", "subtime":20, "walltime": 100, "res": 4, "profile": "1"},
{"id":21, "subtime":20, "walltime": 100, "res": 4, "profile": "21"},
{"id":22, "subtime":20, "walltime": 100, "res": 4, "profile": "22"},
{"id":3, "subtime":30, "walltime": 3, "res": 4, "profile": "1"},
{"id":4, "subtime":32, "walltime": 100, "res": 4, "profile": "3"},
{"id":5, "subtime":15, "walltime": 30, "res": 1, "profile": "4"},
{"id":6, "subtime":15, "walltime": 30, "res": 2, "profile": "5"},
{"id":7, "subtime":15, "walltime": 30, "res": 4, "profile": "5"}
{"id":"delay", "subtime":0, "walltime": 30, "res": 1, "profile": "delay"},
{"id":"simple", "subtime":1, "walltime": 100, "res": 4, "profile": "simple"},
{"id":"reach_walltime", "subtime":30, "walltime": 1, "res": 4, "profile": "simple"},
{"id":"homo", "subtime":10, "walltime": 100, "res": 4, "profile": "homogeneous"},
{"id":"homo_no_cpu", "subtime":20, "walltime": 100, "res": 4, "profile": "homogeneous_no_cpu"},
{"id":"homo_no_com", "subtime":20, "walltime": 100, "res": 4, "profile": "homogeneous_no_com"},
{"id":"seq", "subtime":32, "walltime": 100, "res": 4, "profile": "sequence"},
{"id":"2_resources", "subtime":15, "walltime": 30, "res": 2, "profile": "homogeneous_total"},
{"id":"4_resources", "subtime":15, "walltime": 30, "res": 4, "profile": "homogeneous_total"}
],
"profiles": {
"1": {
"simple": {
"type": "msg_par",
"cpu": [5e6,5e6,5e6,5e6],
"com": [5e6,5e6,5e6,5e6,
5e6,5e6,5e6,5e6,
5e6,5e6,5e6,5e6,
5e6,5e6,5e6,5e6]
"cpu": [5e6, 0, 0, 0],
"com": [5e6, 0, 0, 0,
5e6,5e6, 0, 0,
5e6,5e6, 0, 0,
5e6,5e6,5e6, 0]
},
"2": {
"homogeneous": {
"type": "msg_par_hg",
"cpu": 10e6,
"com": 1e6
},
"21": {
"homogeneous_no_cpu": {
"type": "msg_par_hg",
"cpu": 0,
"com": 1e6
},
"22": {
"homogeneous_no_com": {
"type": "msg_par_hg",
"cpu": 2e5,
"com": 0
},
"3": {
"sequence": {
"type": "composed",
"nb" : 4,
"seq": ["1","2","1"]
"seq": ["simple","homogeneous","simple"]
},
"4": {
"delay": {
"type": "delay",
"delay": 20.20
},
"5": {
"homogeneous_total": {
"type": "msg_par_hg_tot",
"cpu": 10e6,
"com": 1e6
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment