Commit 3beec90a authored by Millian Poquet's avatar Millian Poquet
Browse files

code: usage trace profile initial implementation

parent a6fff164
......@@ -2,6 +2,8 @@
* @file jobs_execution.cpp
* @brief Contains functions related to the execution of the jobs
*/
#include <algorithm>
#include <cmath>
#include <regex>
#include "jobs_execution.hpp"
......@@ -11,6 +13,7 @@
#include <simgrid/s4u.hpp>
#include <simgrid/plugins/energy.h>
#include <xbt/replay.hpp>
#include <smpi/smpi.h>
......@@ -44,6 +47,54 @@ void smpi_replay_process(JobPtr job, SmpiProfileData * profile_data, const std::
}
}
/**
 * @brief Replays one action of a usage trace on the host of the calling actor
 * @details Field 2 of the action is read as a usage ratio in [0,1] and field 3 as an amount of flops.
 * The usage is converted into a number of cores of the current host (at least one).
 * A parallel task with one entry per used core, each computing the given flops and all
 * placed on the current host, is then executed and waited for.
 * @param[in] action The fields of the trace line to replay (at least 4 fields are required)
 */
void usage_trace_replayer(simgrid::xbt::ReplayAction & action)
{
    // Reading fields 2 and 3 of a shorter action would be an out-of-bounds access.
    xbt_assert(action.size() >= 4, "invalid usage trace action: expected at least 4 fields, got %zu", action.size());

    const double usage = std::stod(action.at(2));
    const double flops = std::stod(action.at(3));
    xbt_assert(std::isfinite(usage) && usage >= 0.0 && usage <= 1.0, "invalid usage read: %g not in [0,1]", usage);
    xbt_assert(std::isfinite(flops) && flops >= 0.0, "invalid flops read: %g not positive and finite", flops);

    // compute how many cores should be used depending on usage and on which host is used
    simgrid::s4u::Host * const host = simgrid::s4u::this_actor::get_host();
    const double nb_cores = host->get_core_count();
    // use at least 1 core, otherwise using flops is impossible
    const size_t nb_cores_to_use = static_cast<size_t>(std::max(std::round(usage * nb_cores), 1.0));

    // generate ptask: every used core is an entry on the same host, without any communication
    std::vector<simgrid::s4u::Host*> hosts_to_use(nb_cores_to_use, host);
    std::vector<double> computation_vector(nb_cores_to_use, flops);
    std::vector<double> communication_matrix;

    // execute ptask
    simgrid::s4u::ExecPtr ptask = simgrid::s4u::this_actor::exec_init(hosts_to_use, computation_vector, communication_matrix);
    ptask->start();
    ptask->wait();
}
/**
 * @brief Actor body that replays the usage trace of one rank of a job
 * @details Runs the SimGrid replay runner on the trace file of the given rank, then sends the rank
 * on the termination mailbox. A NetworkFailureException raised meanwhile is caught and logged.
 * @param[in] job The job whose rank is replayed (its id is used for logging)
 * @param[in] data The usage trace profile data; trace_filenames[rank] is the replayed trace file
 * @param[in] termination_mbox_name The name of the mailbox on which the rank is sent once the replay is done
 * @param[in] rank The rank to replay
 */
void usage_trace_replayer_process(JobPtr job, UsageTraceProfileData * data, const std::string & termination_mbox_name, int rank)
{
    try
    {
        // Prepare data for replay_runner.
        // The rank is formatted into a std::string (same text as "%d") instead of an asprintf
        // buffer, so the memory is released on every exit path, including exceptions.
        const std::string str_rank = std::to_string(rank);
        const std::string & trace_filename = data->trace_filenames[static_cast<size_t>(rank)];

        XBT_INFO("Replaying rank %d of job %s (usage trace)", rank, job->id.to_cstring());
        simgrid::xbt::replay_runner(str_rank.c_str(), trace_filename.c_str());
        XBT_INFO("Replaying rank %d of job %s (usage trace) done", rank, job->id.to_cstring());

        // Tell parent process that replay has finished for this rank.
        auto mbox = simgrid::s4u::Mailbox::by_name(termination_mbox_name);
        // The payload travels by pointer, hence the heap allocation.
        // NOTE(review): presumably the receiver deletes it -- confirm against the parent actor.
        auto * rank_copy = new unsigned int(static_cast<unsigned int>(rank));
        mbox->put(static_cast<void*>(rank_copy), 4);
    }
    catch (const simgrid::NetworkFailureException & e)
    {
        XBT_INFO("Caught a NetworkFailureException caught: %s", e.what());
    }
}
int execute_task(BatTask * btask,
BatsimContext *context,
const SchedulingAllocation * allocation,
......@@ -208,11 +259,22 @@ int execute_task(BatTask * btask,
}
return profile->return_code;
}
else if (profile->type == ProfileType::SMPI)
else if (profile->type == ProfileType::SMPI || profile->type == ProfileType::USAGE_TRACE)
{
auto * data = static_cast<SmpiProfileData *>(profile->data);
std::vector<std::string> trace_filenames;
if (profile->type == ProfileType::SMPI)
{
auto * data = static_cast<SmpiProfileData *>(profile->data);
trace_filenames = data->trace_filenames;
}
else
{
auto * data = static_cast<UsageTraceProfileData *>(profile->data);
trace_filenames = data->trace_filenames;
}
unsigned int nb_ranks = static_cast<unsigned int>(data->trace_filenames.size());
unsigned int nb_ranks = static_cast<unsigned int>(trace_filenames.size());
// Let's use the default mapping if none is provided (round-robin on hosts, as we do not
// know the number of cores on each host)
......@@ -241,7 +303,17 @@ int execute_task(BatTask * btask,
{
std::string actor_name = job->id.to_string() + "_" + std::to_string(rank);
simgrid::s4u::Host* host_to_use = allocation->hosts[static_cast<size_t>(job->smpi_ranks_to_hosts_mapping[rank])];
simgrid::s4u::ActorPtr actor = simgrid::s4u::Actor::create(actor_name, host_to_use, smpi_replay_process, job, data, termination_mbox_name, rank);
simgrid::s4u::ActorPtr actor = nullptr;
if (profile->type == ProfileType::SMPI)
{
auto * data = static_cast<SmpiProfileData *>(profile->data);
actor = simgrid::s4u::Actor::create(actor_name, host_to_use, smpi_replay_process, job, data, termination_mbox_name, rank);
}
else
{
auto * data = static_cast<UsageTraceProfileData *>(profile->data);
actor = simgrid::s4u::Actor::create(actor_name, host_to_use, usage_trace_replayer_process, job, data, termination_mbox_name, rank);
}
child_actors[rank] = actor;
job->execution_actors.insert(actor);
}
......
......@@ -206,6 +206,15 @@ Profile::~Profile()
d = nullptr;
}
}
else if (type == ProfileType::USAGE_TRACE)
{
auto * d = static_cast<UsageTraceProfileData *>(data);
if (d != nullptr)
{
delete d;
d = nullptr;
}
}
else if (type == ProfileType::SEQUENCE)
{
auto * d = static_cast<SequenceProfileData *>(data);
......@@ -628,11 +637,8 @@ ProfilePtr Profile::from_json(const std::string & profile_name,
}
profile->data = data;
}
else if (profile_type == "smpi")
// SMPI and usage trace profiles share the parsing of their 'trace' field
else if (profile_type == "smpi" || profile_type == "usage_trace")
{
profile->type = ProfileType::SMPI;
SmpiProfileData * data = new SmpiProfileData;
xbt_assert(json_desc.HasMember("trace"), "%s: profile '%s' has no 'trace' field",
error_prefix.c_str(), profile_name.c_str());
xbt_assert(json_desc["trace"].IsString(), "%s: profile '%s' has a non-string 'trace' field",
......@@ -659,18 +665,32 @@ ProfilePtr Profile::from_json(const std::string & profile_name,
ifstream trace_file(trace_path.string());
xbt_assert(trace_file.is_open(), "Cannot open file '%s'", trace_path.string().c_str());
std::vector<std::string> trace_filenames;
string line;
while (std::getline(trace_file, line))
{
boost::trim_right(line);
fs::path rank_trace_path = trace_path.parent_path().string() + "/" + line;
data->trace_filenames.push_back(rank_trace_path.string());
trace_filenames.push_back(rank_trace_path.string());
}
string filenames = boost::algorithm::join(data->trace_filenames, ", ");
string filenames = boost::algorithm::join(trace_filenames, ", ");
XBT_INFO("Filenames of profile '%s': [%s]", profile_name.c_str(), filenames.c_str());
profile->data = data;
if (profile_type == "smpi")
{
profile->type = ProfileType::SMPI;
auto * data = new SmpiProfileData;
data->trace_filenames = trace_filenames;
profile->data = data;
}
else
{
profile->type = ProfileType::USAGE_TRACE;
auto * data = new UsageTraceProfileData;
data->trace_filenames = trace_filenames;
profile->data = data;
}
}
else
{
......@@ -732,6 +752,9 @@ std::string profile_type_to_string(const ProfileType & type)
case ProfileType::SMPI:
str = "SMPI";
break;
case ProfileType::USAGE_TRACE:
str = "USAGE_TRACE";
break;
case ProfileType::SEQUENCE:
str = "SEQUENCE";
break;
......
......@@ -25,6 +25,7 @@ enum class ProfileType
,PARALLEL_HOMOGENEOUS //!< a homogeneous parallel task that executes the given amounts of computation and communication on every node. Its data is of type ParallelHomogeneousProfileData
,PARALLEL_HOMOGENEOUS_TOTAL_AMOUNT //!< a homogeneous parallel task that spreads the given amounts of computation and communication among all the nodes. Its data is of type ParallelHomogeneousTotalAmountProfileData
,SMPI //!< a SimGrid MPI time-independent trace. Its data is of type SmpiProfileData
,USAGE_TRACE //!< a usage over time trace. Its data is of type UsageTraceProfileData
,SEQUENCE //!< non-atomic: it is composed of a sequence of other profiles
,PARALLEL_HOMOGENEOUS_PFS //!< Read and writes data to a PFS storage nodes. data type ParallelHomogeneousPFSProfileData
,DATA_STAGING //!< for moving data between the pfs hosts. Its data is of type DataStagingProfileData
......@@ -136,6 +137,11 @@ struct SmpiProfileData
std::vector<std::string> trace_filenames; //!< all defined tracefiles
};
/**
 * @brief The data associated to USAGE_TRACE profiles
 */
struct UsageTraceProfileData
{
std::vector<std::string> trace_filenames; //!< Paths of the trace files to replay, indexed by rank
};
/**
* @brief The data associated to SEQUENCE profiles
*/
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment