Commit 1d60fabc authored by Millian Poquet

Merge branch 'profile-usage-trace'

parents a6fff164 24770076
......@@ -124,6 +124,9 @@ let
"^workloads/smpi"
"^workloads/smpi/.*"
"^workloads/smpi/.*/.*\.txt"
"^workloads/usage-trace"
"^workloads/usage-trace/.*"
"^workloads/usage-trace/.*/.*\.txt"
"^events"
"^events/.*\.txt"
];
......
......@@ -251,6 +251,8 @@ both the ``from`` and the ``to`` fields.
"to": "nfs"
}
.. _smpi_trace_replay_profile:
SMPI trace replay
^^^^^^^^^^^^^^^^^
Profiles of this type correspond to the replay of an SMPI time-independent trace. Such traces make it possible to observe the fine-grained behavior of MPI applications.
......@@ -283,5 +285,30 @@ Profiles of this type correspond to the replay of a SMPI time-independent trace.
"trace": "smpi/compute_only/traces.txt"
}
Usage trace replay
^^^^^^^^^^^^^^^^^^
Profiles of this type correspond to the replay of a usage-over-time trace.
.. warning::
**This profile requires the SimGrid hosts to have many cores for the replay to be precise.**
This profile shares the same warnings as :ref:`smpi_trace_replay_profile`.
Each trace file contains a sequence of :math:`(usage, flops)` tuples.
The :math:`flops` of each tuple are executed in sequence on the target host,
using a fraction :math:`usage \in [0,1]` of the host's computing capacity.
The usage replay relies on the SimGrid cores of the target host.
For example, executing 10 flops with usage=0.1 on a 100-core host is simulated
as a single computation-only parallel task of size 10, in which each executor
computes 10 flops.
.. code:: json
{
"type": "usage_trace",
"trace": "usage-trace/from-real-trace/3858728.txt"
}
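The behaviour described above can be sketched as follows.
This is only an illustration of the expected simulation behaviour, not Batsim's actual code;
the ``core_count`` and ``per_core_speed`` parameters are assumptions of the example
(SimGrid host speeds are given per core).
.. code:: python

    def replay_tuple(usage, flops, core_count, per_core_speed):
        """Sketch: how one (usage, flops) tuple is expected to be simulated."""
        # At least one core is used, otherwise the flops could not be executed.
        nb_cores_to_use = max(round(usage * core_count), 1)
        # A computation-only parallel task of this size is run on the host,
        # each executor computing `flops` flops at the per-core speed.
        duration = flops / per_core_speed
        return nb_cores_to_use, duration

    # Example from the text: 10 flops at usage=0.1 on a 100-core, 100 flops/s host.
    print(replay_tuple(0.1, 10, 100, 100))  # -> (10, 0.1)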
.. _OAR: https://oar.imag.fr/start
.. _Batsim's initial article: https://hal.archives-ouvertes.fr/hal-01333471
<?xml version='1.0'?>
<!DOCTYPE platform SYSTEM "http://simgrid.gforge.inria.fr/simgrid/simgrid.dtd">
<platform version="4.1">
<zone id="AS0" routing="Full">
<!-- One master host and four 100-core hosts: a and b at 100f per core, c and d at 50f per core -->
<host id="master_host" speed="100.0Mf"/>
<host id="a" core="100" pstate="0" speed="100.0f">
<prop id="wattage_per_state" value="50.0:100:200.0"/>
<prop id="wattage_off" value="5.0"/>
</host>
<host id="b" core="100" pstate="0" speed="100.0f">
<prop id="wattage_per_state" value="50.0:100:200.0"/>
<prop id="wattage_off" value="5.0"/>
</host>
<host id="c" core="100" pstate="0" speed="50.0f">
<prop id="wattage_per_state" value="50.0:90.0:150.0"/>
<prop id="wattage_off" value="5.0"/>
</host>
<host id="d" core="100" pstate="0" speed="50.0f">
<prop id="wattage_per_state" value="50.0:90.0:150.0"/>
<prop id="wattage_off" value="5.0"/>
</host>
<link id="6" bandwidth="41.279125MBps" latency="59.904us"/>
<link id="3" bandwidth="34.285625MBps" latency="514.433us"/>
<link id="7" bandwidth="11.618875MBps" latency="189.98us"/>
<link id="9" bandwidth="7.20975MBps" latency="1.461517ms"/>
<link id="2" bandwidth="118.6825MBps" latency="136.931us"/>
<link id="8" bandwidth="8.158MBps" latency="270.544us"/>
<link id="1" bandwidth="34.285625MBps" latency="514.433us"/>
<link id="4" bandwidth="10.099625MBps" latency="479.78us"/>
<link id="0" bandwidth="41.279125MBps" latency="59.904us"/>
<link id="5" bandwidth="27.94625MBps" latency="278.066us"/>
<link id="loopback" bandwidth="498MBps" latency="15us" sharing_policy="FATPIPE"/>
<route src="master_host" dst="master_host"><link_ctn id="loopback"/></route>
<route src="a" dst="a"><link_ctn id="loopback"/></route>
<route src="b" dst="b"><link_ctn id="loopback"/></route>
<route src="c" dst="c"><link_ctn id="loopback"/></route>
<route src="d" dst="d"><link_ctn id="loopback"/></route>
<route src="master_host" dst="a">
<link_ctn id="9"/>
</route>
<route src="master_host" dst="b">
<link_ctn id="4"/><link_ctn id="3"/><link_ctn id="2"/><link_ctn id="0"/><link_ctn id="1"/><link_ctn id="8"/>
</route>
<route src="master_host" dst="c">
<link_ctn id="4"/><link_ctn id="3"/><link_ctn id="5"/>
</route>
<route src="master_host" dst="d">
<link_ctn id="4"/><link_ctn id="3"/><link_ctn id="2"/><link_ctn id="0"/><link_ctn id="1"/><link_ctn id="6"/><link_ctn id="7"/>
</route>
<route src="a" dst="b">
<link_ctn id="9"/><link_ctn id="4"/><link_ctn id="3"/><link_ctn id="2"/><link_ctn id="0"/><link_ctn id="1"/><link_ctn id="8"/>
</route>
<route src="a" dst="c">
<link_ctn id="9"/><link_ctn id="4"/><link_ctn id="3"/><link_ctn id="5"/>
</route>
<route src="a" dst="d">
<link_ctn id="9"/><link_ctn id="4"/><link_ctn id="3"/><link_ctn id="2"/><link_ctn id="0"/><link_ctn id="1"/><link_ctn id="6"/><link_ctn id="7"/>
</route>
<route src="b" dst="c">
<link_ctn id="8"/><link_ctn id="1"/><link_ctn id="0"/><link_ctn id="2"/><link_ctn id="5"/>
</route>
<route src="b" dst="d">
<link_ctn id="8"/><link_ctn id="6"/><link_ctn id="7"/>
</route>
<route src="c" dst="d">
<link_ctn id="5"/><link_ctn id="2"/><link_ctn id="0"/><link_ctn id="1"/><link_ctn id="6"/><link_ctn id="7"/>
</route>
</zone>
</platform>
......@@ -847,6 +847,9 @@ int main(int argc, char * argv[])
SMPI_init();
}
// Register Batsim replay functions
xbt_replay_action_register("m_usage", usage_trace_replayer);
// Let's create the machines
create_machines(main_args, &context, max_nb_machines_to_use);
......
......@@ -2,6 +2,8 @@
* @file jobs_execution.cpp
* @brief Contains functions related to the execution of the jobs
*/
#include <algorithm>
#include <cmath>
#include <regex>
#include "jobs_execution.hpp"
......@@ -44,6 +46,61 @@ void smpi_replay_process(JobPtr job, SmpiProfileData * profile_data, const std::
}
}
void usage_trace_replayer(simgrid::xbt::ReplayAction & action)
{
double usage = std::stod(action[2]);
double flops = std::stod(action[3]);
xbt_assert(std::isfinite(usage) && usage >= 0.0 && usage <= 1.0, "invalid usage read: %g is not in [0,1]", usage);
xbt_assert(std::isfinite(flops) && flops >= 0.0, "invalid flops read: %g is not finite or is negative", flops);
// Compute how many cores should be used, depending on the usage and on the target host
const double nb_cores = simgrid::s4u::this_actor::get_host()->get_core_count();
const int nb_cores_to_use = static_cast<int>(std::max(std::round(usage * nb_cores), 1.0)); // use at least 1 core, otherwise executing the flops is impossible
// generate ptask
std::vector<simgrid::s4u::Host*> hosts_to_use(nb_cores_to_use, simgrid::s4u::this_actor::get_host());
std::vector<double> computation_vector(nb_cores_to_use, flops);
std::vector<double> communication_matrix;
// execute ptask
simgrid::s4u::ExecPtr ptask = simgrid::s4u::this_actor::exec_init(hosts_to_use, computation_vector, communication_matrix);
ptask->start();
ptask->wait();
}
/**
* @brief The actor that replays a usage trace
* @param[in] job The job whose trace is replayed
* @param[in] data The profile data of the job
* @param[in] termination_mbox_name The mailbox to use to synchronize the job termination
* @param[in] rank The rank of the actor of the job
*/
void usage_trace_replayer_process(JobPtr job, UsageTraceProfileData * data, const std::string & termination_mbox_name, int rank)
{
try
{
// Prepare data for replay_runner
char * str_rank = nullptr;
int ret = asprintf(&str_rank, "%d", rank);
(void) ret; // Avoids a warning if assertions are ignored
xbt_assert(ret != -1, "asprintf failed (not enough memory?)");
XBT_INFO("Replaying rank %d of job %s (usage trace)", rank, job->id.to_cstring());
simgrid::xbt::replay_runner(str_rank, data->trace_filenames[static_cast<size_t>(rank)].c_str());
XBT_INFO("Replaying rank %d of job %s (usage trace) done", rank, job->id.to_cstring());
// Tell parent process that replay has finished for this rank.
auto mbox = simgrid::s4u::Mailbox::by_name(termination_mbox_name);
auto rank_copy = new unsigned int;
*rank_copy = static_cast<unsigned int>(rank);
mbox->put(static_cast<void*>(rank_copy), 4); // 4 is the simulated message size in bytes
}
catch (const simgrid::NetworkFailureException & e)
{
XBT_INFO("Caught a NetworkFailureException caught: %s", e.what());
}
}
int execute_task(BatTask * btask,
BatsimContext *context,
const SchedulingAllocation * allocation,
......@@ -208,11 +265,22 @@ int execute_task(BatTask * btask,
}
return profile->return_code;
}
else if (profile->type == ProfileType::SMPI)
else if (profile->type == ProfileType::SMPI || profile->type == ProfileType::USAGE_TRACE)
{
auto * data = static_cast<SmpiProfileData *>(profile->data);
std::vector<std::string> trace_filenames;
unsigned int nb_ranks = static_cast<unsigned int>(data->trace_filenames.size());
if (profile->type == ProfileType::SMPI)
{
auto * data = static_cast<SmpiProfileData *>(profile->data);
trace_filenames = data->trace_filenames;
}
else
{
auto * data = static_cast<UsageTraceProfileData *>(profile->data);
trace_filenames = data->trace_filenames;
}
unsigned int nb_ranks = static_cast<unsigned int>(trace_filenames.size());
// Let's use the default mapping if none is provided (round-robin on hosts, as we do not
// know the number of cores on each host)
......@@ -241,7 +309,17 @@ int execute_task(BatTask * btask,
{
std::string actor_name = job->id.to_string() + "_" + std::to_string(rank);
simgrid::s4u::Host* host_to_use = allocation->hosts[static_cast<size_t>(job->smpi_ranks_to_hosts_mapping[rank])];
simgrid::s4u::ActorPtr actor = simgrid::s4u::Actor::create(actor_name, host_to_use, smpi_replay_process, job, data, termination_mbox_name, rank);
simgrid::s4u::ActorPtr actor = nullptr;
if (profile->type == ProfileType::SMPI)
{
auto * data = static_cast<SmpiProfileData *>(profile->data);
actor = simgrid::s4u::Actor::create(actor_name, host_to_use, smpi_replay_process, job, data, termination_mbox_name, rank);
}
else
{
auto * data = static_cast<UsageTraceProfileData *>(profile->data);
actor = simgrid::s4u::Actor::create(actor_name, host_to_use, usage_trace_replayer_process, job, data, termination_mbox_name, rank);
}
child_actors[rank] = actor;
job->execution_actors.insert(actor);
}
......
......@@ -8,6 +8,13 @@
#include "ipp.hpp"
#include "context.hpp"
#include <xbt/replay.hpp>
/**
* @brief Replays a usage over time trace action (one line)
* @param[in] action The action to replay
*/
void usage_trace_replayer(simgrid::xbt::ReplayAction & action);
/**
* @brief The process in charge of killing a job if it reaches its walltime
......
......@@ -206,6 +206,15 @@ Profile::~Profile()
d = nullptr;
}
}
else if (type == ProfileType::USAGE_TRACE)
{
auto * d = static_cast<UsageTraceProfileData *>(data);
if (d != nullptr)
{
delete d;
d = nullptr;
}
}
else if (type == ProfileType::SEQUENCE)
{
auto * d = static_cast<SequenceProfileData *>(data);
......@@ -628,11 +637,8 @@ ProfilePtr Profile::from_json(const std::string & profile_name,
}
profile->data = data;
}
else if (profile_type == "smpi")
else if (profile_type == "smpi" || "usage_trace")
{
profile->type = ProfileType::SMPI;
SmpiProfileData * data = new SmpiProfileData;
xbt_assert(json_desc.HasMember("trace"), "%s: profile '%s' has no 'trace' field",
error_prefix.c_str(), profile_name.c_str());
xbt_assert(json_desc["trace"].IsString(), "%s: profile '%s' has a non-string 'trace' field",
......@@ -659,18 +665,32 @@ ProfilePtr Profile::from_json(const std::string & profile_name,
ifstream trace_file(trace_path.string());
xbt_assert(trace_file.is_open(), "Cannot open file '%s'", trace_path.string().c_str());
std::vector<std::string> trace_filenames;
string line;
while (std::getline(trace_file, line))
{
boost::trim_right(line);
fs::path rank_trace_path = trace_path.parent_path().string() + "/" + line;
data->trace_filenames.push_back(rank_trace_path.string());
trace_filenames.push_back(rank_trace_path.string());
}
string filenames = boost::algorithm::join(data->trace_filenames, ", ");
string filenames = boost::algorithm::join(trace_filenames, ", ");
XBT_INFO("Filenames of profile '%s': [%s]", profile_name.c_str(), filenames.c_str());
profile->data = data;
if (profile_type == "smpi")
{
profile->type = ProfileType::SMPI;
auto * data = new SmpiProfileData;
data->trace_filenames = trace_filenames;
profile->data = data;
}
else
{
profile->type = ProfileType::USAGE_TRACE;
auto * data = new UsageTraceProfileData;
data->trace_filenames = trace_filenames;
profile->data = data;
}
}
else
{
......@@ -732,6 +752,9 @@ std::string profile_type_to_string(const ProfileType & type)
case ProfileType::SMPI:
str = "SMPI";
break;
case ProfileType::USAGE_TRACE:
str = "USAGE_TRACE";
break;
case ProfileType::SEQUENCE:
str = "SEQUENCE";
break;
......
......@@ -25,6 +25,7 @@ enum class ProfileType
,PARALLEL_HOMOGENEOUS //!< a homogeneous parallel task that executes the given amounts of computation and communication on every node. Its data is of type ParallelHomogeneousProfileData
,PARALLEL_HOMOGENEOUS_TOTAL_AMOUNT //!< a homogeneous parallel task that spreads the given amounts of computation and communication among all the nodes. Its data is of type ParallelHomogeneousTotalAmountProfileData
,SMPI //!< a SimGrid MPI time-independent trace. Its data is of type SmpiProfileData
,USAGE_TRACE //!< a usage over time trace. Its data is of type UsageTraceProfileData
,SEQUENCE //!< non-atomic: it is composed of a sequence of other profiles
,PARALLEL_HOMOGENEOUS_PFS //!< Read and writes data to a PFS storage nodes. data type ParallelHomogeneousPFSProfileData
,DATA_STAGING //!< for moving data between the pfs hosts. Its data is of type DataStagingProfileData
......@@ -136,6 +137,14 @@ struct SmpiProfileData
std::vector<std::string> trace_filenames; //!< all defined tracefiles
};
/**
* @brief The data associated to USAGE_TRACE profiles
*/
struct UsageTraceProfileData
{
std::vector<std::string> trace_filenames; //!< all defined tracefiles
};
/**
* @brief The data associated to SEQUENCE profiles
*/
......
......@@ -5,6 +5,7 @@ import subprocess
from collections import namedtuple
from os.path import abspath, basename, dirname, realpath
import helper
import pandas as pd
Workload = namedtuple('Workload', ['name', 'filename'])
Platform = namedtuple('Platform', ['name', 'filename'])
......@@ -56,6 +57,7 @@ def pytest_generate_tests(metafunc):
# Platforms
platforms_def = {
"small": "small_platform.xml",
"smallusage": "small_platform_usage_replay.xml",
"cluster512": "cluster512.xml",
"cluster512_pfs": "cluster512_pfs.xml",
"energy128notopo": "energy_platform_homogeneous_no_net_128.xml",
......@@ -86,6 +88,7 @@ def pytest_generate_tests(metafunc):
"smpicollectives": "test_smpi_collectives.json",
"tuto1": "test_case_study1.json",
"tutostencil": "test_tuto_stencil.json",
"usagetrace": "test_usage_trace.json",
"walltime": "test_walltime.json",
"walltimesmpi": "test_walltime_smpi.json",
}
......@@ -96,6 +99,7 @@ def pytest_generate_tests(metafunc):
moldable_perf_degradation_workloads = ['compute1', 'computetot1']
energymini_workloads = ['energymini0', 'energymini50', 'energymini100']
tuto_stencil_workloads = ['tutostencil']
usage_trace_workloads = ['usagetrace']
workflows = ['genome']
# Algorithms
......@@ -142,6 +146,8 @@ def pytest_generate_tests(metafunc):
metafunc.parametrize('cluster_pfs_platform', generate_platforms(platform_dir, platforms_def, ['cluster512_pfs']))
if 'properties_platform' in metafunc.fixturenames:
metafunc.parametrize('properties_platform', generate_platforms(platform_dir, platforms_def, ['properties_platform']))
if 'usage_trace_platform' in metafunc.fixturenames:
metafunc.parametrize('usage_trace_platform', generate_platforms(platform_dir, platforms_def, ['smallusage']))
# Workloads
if 'workload' in metafunc.fixturenames:
......@@ -162,6 +168,8 @@ def pytest_generate_tests(metafunc):
metafunc.parametrize('moldable_perf_degradation_workload', generate_workloads(workload_dir, workloads_def, moldable_perf_degradation_workloads))
if 'energymini_workload' in metafunc.fixturenames:
metafunc.parametrize('energymini_workload', generate_workloads(workload_dir, workloads_def, energymini_workloads))
if 'usage_trace_workload' in metafunc.fixturenames:
metafunc.parametrize('usage_trace_workload', generate_workloads(workload_dir, workloads_def, usage_trace_workloads))
if 'samesubmittime_workload' in metafunc.fixturenames:
metafunc.parametrize('samesubmittime_workload', generate_workloads(workload_dir, workloads_def, ['samesubmittime']))
if 'walltime_workload' in metafunc.fixturenames:
......@@ -233,3 +241,10 @@ def manage_redis_server(request):
print('Killing the spawned redis-server (if any)...')
proc.kill()
request.addfinalizer(on_finalize)
@pytest.fixture(scope="session", autouse=True)
def configure_pandas():
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
pd.set_option('display.expand_frame_repr', False)
pd.set_option('max_colwidth', None)
#!/usr/bin/env python3
'''Usage trace tests.
These tests check that the execution times and energy consumption of usage trace profiles match the expected values.
'''
import pandas as pd
import pytest
from helper import *
fast_speed = 100
fast_widle = 50
fast_wmin = 100
fast_wmax = 200
slow_speed = 50
slow_widle = 50
slow_wmin = 90
slow_wmax = 150
def joule_prediction(time, wattmin, wattmax, usage):
return time*(wattmin + (wattmax-wattmin)*usage)
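# Illustrative example, using the machine parameters defined above: replaying
# 1186 flops at usage 0.86 on a 'fast' machine (speed=100, wmin=100, wmax=200)
# takes 1186/100 = 11.86 seconds and is expected to consume
# joule_prediction(11.86, 100, 200, 0.86) = 11.86 * 186 = 2205.96 joules.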
def check_ok_bool(row):
if int(row['execution_time']) != int(row['expected_execution_time']):
return False
if int(row['consumed_energy']) != int(row['expected_consumed_energy']):
return False
return True
def check_ok(row):
return int(check_ok_bool(row))
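# Sketch of how these helpers are meant to be applied (hypothetical dataframe;
# the actual test joins Batsim's output with the predictions computed below):
#   df['valid'] = df.apply(check_ok, axis=1)
#   assert df['valid'].all()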
def estimate_job_from_real_trace():
traces = [
(0, 0.86, 1186),
(0, 0.64, 469),
(0, 0.79, 456),
(0, 0.84, 4643),
(0, 0.85, 4659),
(0, 0.9, 1000),
(0, 0.83, 3614),
(0, 0.84, 4643),
(0, 0.9, 933),
(0, 0.83, 3759),
(0, 0.89, 1011),
(0, 0.83, 3614),
(0, 0.84, 4571),
(0, 0.91, 923),
(0, 0.84, 3643),
(0, 0.89, 1079),
(0, 0.83, 3614),
(0, 0.85, 4588),
(0, 0.85, 4588),
(0, 0.91, 989),
(0, 0.84, 3643),
(0, 0.85, 4659),
(0, 0.84, 4643),
(0, 0.88, 3068),
(0, 0.79, 1519),
(0, 0.89, 674),
(1, 0.98, 1041),
(1, 0.73, 411),
(1, 0.88, 409),
(1, 0.98, 2755),
(1, 0.88, 1364),
(1, 1.0, 1020),
(1, 0.94, 3383),
(1, 0.99, 2424),
(1, 0.88, 1432),
(1, 0.98, 2694),
(1, 0.89, 1416),
(1, 1, 840),
(1, 0.97, 2041),
(1, 0.89, 1281),
(1, 0.99, 2667),
(1, 0.89, 1416),
(1, 0.99, 2667),
(1, 0.89, 1348),
(1, 1, 840),
(1, 0.98, 1898),
(1, 0.89, 1348),
(1, 0.98, 2816),
(1, 0.88, 1364),
(1, 0.97, 18990),
(1, 0.89, 1348),
(1, 0.99, 2727),
(1, 0.89, 1348),
(1, 1.0, 600),
(2, 0.84, 1429),
(2, 0.64, 469),
(2, 0.86, 3837),
(2, 0.75, 960),
(2, 0.9, 667),
(2, 0.86, 3070),
(2, 0.76, 947),
(2, 0.86, 3698),
(2, 0.76, 947),
(2, 0.86, 3767),
(2, 0.76, 868),
(2, 0.86, 3837),
(2, 0.75, 880),
(2, 0.86, 3698),
(2, 0.75, 960),
(2, 0.86, 3628),
(2, 0.75, 960),
(2, 0.86, 3698),
(2, 0.75, 960),
(2, 0.86, 3767),
(2, 0.75, 960),
(2, 0.85, 3741),
(2, 0.75, 960),
(2, 0.86, 3698),
(2, 0.76, 947),
(2, 0.87, 3724),
(2, 0.75, 960),
(2, 0.87, 3655),
(2, 0.75, 960),
(2, 0.91, 593),
(2, 0.85, 3176),
(2, 0.75, 960),
(2, 0.85, 3741),
(2, 0.76, 947),
(2, 0.88, 682),
(3, 0.87, 1379),
(3, 0.66, 455),
(3, 0.89, 3708),
(3, 0.77, 935),
(3, 0.9, 3600),
(3, 0.78, 923),
(3, 0.89, 3573),
(3, 0.78, 923),
(3, 0.88, 3750),
(3, 0.77, 779),
(3, 0.89, 3708),
(3, 0.78, 846),
(3, 0.89, 3573),
(3, 0.77, 935),
(3, 0.89, 3506),
(3, 0.77, 935),
(3, 0.9, 3533),
(3, 0.77, 935),
(3, 0.89, 3640),
(3, 0.77, 935),
(3, 0.89, 3573),
(3, 0.77, 935),
(3, 0.91, 3495),
(3, 0.78, 923),
(3, 0.9, 3600),
(3, 0.78, 923),
(3, 0.9, 3533),
(3, 0.78, 923),
(3, 0.9, 3600),
(3, 0.78, 923),
(3, 0.89, 3573),
(3, 0.77, 935),
(3, 0.91, 659),
]
traces_df = pd.DataFrame(traces, columns = ['machine_id', 'usage', 'flops'])
# job allocation (rank->machine_type)
machines = [
(0, 'fast'),
(1, 'fast'),
(2, 'slow'),
(3, 'slow'),
]
machines_df = pd.DataFrame(machines, columns = ['machine_id', 'machine_type'])
# parameters of each type of machine
machine_types = [
('fast', fast_speed, fast_widle, fast_wmin, fast_wmax),
('slow', slow_speed, slow_widle, slow_wmin, slow_wmax),
]
machine_types_df = pd.DataFrame(machine_types, columns = ['machine_type', 'speed', 'widle', 'wmin', 'wmax'])
df = pd.merge(traces_df, pd.merge(machines_df, machine_types_df))
df['duration'] = df['flops'] / df['speed']
df['w'] = df['wmin'] + (df['wmax'] - df['wmin']) * df['usage']
df['joules'] = df['w'] * df['duration']
idles = df.groupby([