Commit 807fbecd authored by Millian Poquet's avatar Millian Poquet

CMakeLists has been updated. Added some files which were missing in previous...

CMakeLists has been updated. Added some files which were missing in previous commits. Checked workload validity. Lots of TODO have been done: updating the export.hpp documentation, better displays...
parent 0e6412a4
......@@ -19,16 +19,16 @@ set(CMAKE_BUILD_TYPE Debug)
# message(STATUS "The compiler ${CMAKE_C_COMPILER} has no C11 nor C99 support. Please update your C compiler.")
#endif()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Wextra")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -std=c++14")
add_executable(batsim batsim.c job.c utils.c export.c machines.c)
add_executable(batexec batexec.c job.c utils.c export.c)
add_executable(batsim batsim.cpp export.cpp ipp.cpp job_submitter.cpp jobs.cpp jobs_execution.cpp machines.cpp network.cpp profiles.cpp workload.cpp)
#add_executable(batexec batexec.c job.c utils.c export.c)
### Add definitions for compile
target_link_libraries(batsim simgrid jansson m)
target_link_libraries(batexec simgrid jansson m)
target_link_libraries(batsim simgrid boost_system boost_filesystem)
#target_link_libraries(batexec simgrid jansson m)
## Add intall target
INSTALL(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/batsim
${CMAKE_CURRENT_BINARY_DIR}/batexec
#${CMAKE_CURRENT_BINARY_DIR}/batexec
DESTINATION bin)
This diff is collapsed.
......@@ -15,9 +15,12 @@
#include "machines.hpp"
#include "network.hpp"
#include "profiles.hpp"
#include "workload.hpp"
using namespace std;
XBT_LOG_NEW_DEFAULT_CATEGORY(batsim, "batsim");
/**
* @brief The main function arguments (a.k.a. program arguments)
*/
......@@ -123,17 +126,23 @@ int main(int argc, char * argv[])
MSG_init(&argc, argv);
BatsimContext context;
context.jobs.load_from_json(mainArgs.workloadFilename);
context.profiles.load_from_json(mainArgs.workloadFilename);
load_json_workload(&context, mainArgs.workloadFilename);
context.jobs.setProfiles(&context.profiles);
context.tracer.setFilename(mainArgs.exportPrefix + "_schedule.trace");
// TODO: check jobs & profile validity
//context.jobs.displayDebug();
XBT_INFO("Checking whether SMPI is used or not...");
bool smpi_used = context.jobs.containsSMPIJob();
if (!smpi_used)
{
XBT_INFO("SMPI will NOT be used.");
MSG_config("host/model", "ptask_L07");
}
else
XBT_INFO("SMPI will be used.");
XBT_INFO("Creating the machines...");
MSG_create_environment(mainArgs.platformFilename.c_str());
xbt_dynar_t hosts = MSG_hosts_as_dynar();
......@@ -142,20 +151,24 @@ int main(int argc, char * argv[])
const Machine * masterMachine = context.machines.masterMachine();
context.machines.setTracer(&context.tracer);
context.tracer.initialize(&context, MSG_get_clock());
//context.machines.displayDebug();
XBT_INFO("Machines created successfully. There are %d computing machines.", context.machines.machines().size());
// Socket
context.socket.create_socket(mainArgs.socketFilename);
context.socket.accept_pending_connection();
// Main processes running
XBT_INFO("Creating jobs_submitter process...");
JobSubmitterProcessArguments * submitterArgs = new JobSubmitterProcessArguments;
submitterArgs->context = &context;
MSG_process_create("jobs_submitter", job_submitter_process, (void*)submitterArgs, masterMachine->host);
XBT_INFO("The jobs_submitter process has been created.");
XBT_INFO("Creating the uds_server process...");
ServerProcessArguments * serverArgs = new ServerProcessArguments;
serverArgs->context = &context;
MSG_process_create("server", uds_server_process, (void*)serverArgs, masterMachine->host);
XBT_INFO("The uds_server process has been created.");
msg_error_t res = MSG_main();
......
......@@ -296,6 +296,7 @@ void PajeTracer::finalize(BatsimContext * context, double time)
void PajeTracer::addJobLaunching(int jobID, const std::vector<int> & usedMachineIDs, double time)
{
(void) jobID;
xbt_assert(state == INITIALIZED, "Bad addJobLaunching call: the PajeTracer object is not initialized or had been finalized");
const int bufSize = 64;
......@@ -507,7 +508,7 @@ void PajeTracer::hsvToRgb(double h, double s, double v, double & r, double & g,
void exportJobsToCSV(const char *filename)
{
(void) filename;
}
/* TODO
void exportJobsToCSV(const char *filename)
......
......@@ -84,7 +84,7 @@ public:
/**
* @brief Initializes a PajeTracer.
* @details This function must be called once before adding job launchings, runnings or endings.
* TODO UPDATE
* @param context The Batsim context
* @param machines The machines
*/
void initialize(BatsimContext * context, double time);
......@@ -92,8 +92,7 @@ public:
/**
* @brief Finalizes a PajeTracer.
* @details This function must be called before the PajeTracer's object destruction.
* @param machines The machines
* TODO UPDATE
* @param context The Batsim context
* @param time The simulation time at which the finalization is done
*/
void finalize(BatsimContext * context, double time);
......@@ -101,8 +100,8 @@ public:
/**
* @brief Adds a job launch in the file trace.
* @details Please note that this method can only be called when the PajeTracer object has been initialized and had not been finalized yet.
* @param job The job
* TODO UPDATE
* @param jobID The job unique number
* @param usedMachineIDs The machines which compute the job
* @param time The simulation time at which the addition is done
*/
void addJobLaunching(int jobID, const std::vector<int> & usedMachineIDs, double time);
......@@ -114,8 +113,7 @@ public:
/**
* @brief Adds a job run in the file trace.
* @details Please note that this method can only be called when the PajeTracer object has been initialized and had not been finalized yet.
* @param job The job
* TODO UPDATE
* @param jobID The job unique number
* @param time The simulation time at which the addition is done
*/
void addJobRunning(int jobID, const std::vector<int> & usedMachineIDs, double time);
......@@ -123,18 +121,19 @@ public:
/**
* @brief Adds a job end in the file trace.
* @details Please note that this method can only be called when the PajeTracer object has been initialized and had not been finalized yet.
* @param job The job
* TODO UPDATE
* @param time The simulation time at which the addition is done
* @param jobID The job unique number
* @param usedMachineIDs The machines which compute the job
* @param time The simulation time at which the kill is done
*/
void addJobEnding(int jobID, const std::vector<int> & usedMachineIDs, double time);
/**
* @brief Adds a job kill in the file trace.
* @details Please note that this method can only be called when the PajeTracer object has been initialized and had not been finalized yet.
* @param job The job that have been killed
* TODO UPDATE
* @param jobID The job unique number
* @param usedMachineIDs The machines which compute the job
* @param time The simulation time at which the kill is done
* @param associateKillToMachines By default (false), one event is added in the killer container. If set to true, one event is added for every machine on which the kill occurs.
*/
void addJobKill(int jobID, const std::vector<int> & usedMachineIDs, double time, bool associateKillToMachines = false);
......@@ -185,7 +184,7 @@ private:
const char * mstateWaiting = "w";
const char * mstateLaunching = "l";
const char * varGlobalUtilization = "vgu";
//const char * varGlobalUtilization = "vgu";
const char * root = "root";
const char * scheduler = "sc";
......
/* Copyright (c) 2015. The OAR Team.
* All rights reserved. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <smpi/smpi.h>
#include "job.h"
#include "utils.h"
XBT_LOG_NEW_DEFAULT_CATEGORY(task, "task");
//! The input data of a killerDelay
typedef struct s_killer_delay_data
{
msg_task_t task; //! The task that will be cancelled if the walltime is reached
double walltime; //! The number of seconds to wait before cancelling the task
} KillerDelayData;
/**
* @brief The function used to kill jobs which exceed their walltime
* @param[in] argc The number of input arguments
* @param[in] argv The input arguments
* @return 0
*/
int killerDelay(int argc, char *argv[])
{
(void) argc;
(void) argv;
KillerDelayData * data = MSG_process_get_data(MSG_process_self());
// The sleep can either stop normally (res=MSG_OK) or be cancelled when the task execution completed (res=MSG_TASK_CANCELED)
msg_error_t res = MSG_process_sleep(data->walltime);
if (res == MSG_OK)
{
// If we had time to sleep until walltime (res=MSG_OK), the task execution is not over and must be cancelled
XBT_INFO("Cancelling task '%s'", MSG_task_get_name(data->task));
MSG_task_cancel(data->task);
}
free(data);
return 0;
}
int smpi_replay(int argc, char *argv[])
{
//just to verify given argv
int index;
for(index = 0; index < argc; index++) {
printf("The %d is %s\n",index,argv[index]);
}
smpi_replay_run(&argc, &argv);
//printf("smpi_rank %d\n", smpi_comm_rank(MPI_));
return 0;
}
/**
* @brief Executes the profile of a given job
* @param[in] profile_str The name of the profile to execute
* @param[in] job_id The job number
* @param[in] nb_res The number of resources on which the job will be executed
* @param[in] job_res The resources on which the job will be executed
* @param[in,out] remainingTime The time remaining to execute the full profile
* @return 1 if the profile had been executed in time, 0 if the walltime had been reached
*/
int profile_exec(const char *profile_str, int job_id, int nb_res, msg_host_t *job_res, double * remainingTime)
{
double *computation_amount;
double *communication_amount;
msg_task_t ptask;
profile_t profile = xbt_dict_get(profiles, profile_str);
if (strcmp(profile->type, "msg_par") == 0)
{
// These amounts are deallocated by SG
computation_amount = xbt_new(double, nb_res);
communication_amount = xbt_new(double, nb_res * nb_res);
double *cpu = ((s_msg_par_t *)(profile->data))->cpu;
double *com = ((s_msg_par_t *)(profile->data))->com;
memcpy(computation_amount , cpu, nb_res * sizeof(double));
memcpy(communication_amount, com, nb_res * nb_res * sizeof(double));
char * tname = NULL;
int ret = asprintf(&tname, "p %d", job_id);
xbt_assert(ret != -1, "asprintf failed (not enough memory?)");
XBT_INFO("Creating task '%s'", tname);
ptask = MSG_parallel_task_create(tname,
nb_res, job_res,
computation_amount,
communication_amount, NULL);
free(tname);
// Let's spawn a process which will wait until walltime and cancel the task if needed
KillerDelayData * killData = xbt_new(KillerDelayData, 1);
killData->task = ptask;
killData->walltime = *remainingTime;
msg_process_t killProcess = MSG_process_create("killer", killerDelay, killData, MSG_host_self());
double timeBeforeExecute = MSG_get_clock();
XBT_INFO("Executing task '%s'", MSG_task_get_name(ptask));
msg_error_t err = MSG_parallel_task_execute(ptask);
*remainingTime = *remainingTime - (MSG_get_clock() - timeBeforeExecute);
ret = 1;
if (err == MSG_OK)
SIMIX_process_throw(killProcess, cancel_error, 0, "wake up");
else if (err == MSG_TASK_CANCELED)
ret = 0;
else
xbt_die("A task execution had been stopped by an unhandled way (err = %d)", err);
XBT_INFO("Task '%s' finished", MSG_task_get_name(ptask));
MSG_task_destroy(ptask);
return ret;
}
else if (strcmp(profile->type, "msg_par_hg") == 0)
{
double cpu = ((s_msg_par_hg_t *)(profile->data))->cpu;
double com = ((s_msg_par_hg_t *)(profile->data))->com;
// These amounts are deallocated by SG
computation_amount = xbt_new(double, nb_res);
communication_amount = xbt_new(double, nb_res * nb_res);
//XBT_DEBUG("msg_par_hg: nb_res: %d , cpu: %f , com: %f", nb_res, cpu, com);
for (int i = 0; i < nb_res; i++)
computation_amount[i] = cpu;
for (int j = 0; j < nb_res; j++)
for (int i = 0; i < nb_res; i++)
communication_amount[nb_res * j + i] = com;
char * tname = NULL;
int ret = asprintf(&tname, "hg %d", job_id);
xbt_assert(ret != -1, "asprintf failed (not enough memory?)");
XBT_INFO("Creating task '%s'", tname);
ptask = MSG_parallel_task_create(tname,
nb_res, job_res,
computation_amount,
communication_amount, NULL);
free(tname);
// Let's spawn a process which will wait until walltime and cancel the task if needed
KillerDelayData * killData = xbt_new(KillerDelayData, 1);
killData->task = ptask;
killData->walltime = *remainingTime;
msg_process_t killProcess = MSG_process_create("killer", killerDelay, killData, MSG_host_self());
double timeBeforeExecute = MSG_get_clock();
XBT_INFO("Executing task '%s'", MSG_task_get_name(ptask));
msg_error_t err = MSG_parallel_task_execute(ptask);
*remainingTime = *remainingTime - (MSG_get_clock() - timeBeforeExecute);
ret = 1;
if (err == MSG_OK)
SIMIX_process_throw(killProcess, cancel_error, 0, "wake up");
else if (err == MSG_TASK_CANCELED)
ret = 0;
else
xbt_die("A task execution had been stopped by an unhandled way (err = %d)", err);
XBT_INFO("Task '%s' finished", MSG_task_get_name(ptask));
MSG_task_destroy(ptask);
return ret;
}
else if (strcmp(profile->type, "composed") == 0)
{
s_composed_prof_t * data = (s_composed_prof_t *) profile->data;
int nb = data->nb;
int lg_seq = data->lg_seq;
char **seq = data->seq;
for (int i = 0; i < nb; i++)
for (int j = 0; j < lg_seq; j++)
if (profile_exec(seq[j], job_id, nb_res, job_res, remainingTime) == 0)
return 0;
}
else if (strcmp(profile->type, "delay") == 0)
{
double delay = ((s_delay_t *)(profile->data))->delay;
if (delay < *remainingTime)
{
XBT_INFO("Sleeping the whole task length");
MSG_process_sleep(delay);
XBT_INFO("Sleeping done");
return 1;
}
else
{
XBT_INFO("Sleeping until walltime");
MSG_process_sleep(*remainingTime);
XBT_INFO("Walltime reached");
*remainingTime = 0;
return 0;
}
}
else if (strcmp(profile->type, "smpi") == 0)
{
xbt_dynar_t traceFilenamesDynar = ((s_smpi_t *)(profile->data))->trace_filenames_dynar;
for (int i = 0; i < nb_res; i++)
{
char *str_instance_id = NULL;
int ret = asprintf(&str_instance_id, "%d", job_id);
xbt_assert(ret != -1, "asprintf failed (not enough memory?)");
char *str_rank_id = NULL;
ret = asprintf(&str_rank_id, "%d", i);
xbt_assert(ret != -1, "asprintf failed (not enough memory?)");
char *str_pname = NULL;
ret = asprintf(&str_pname, "%d_%d", job_id, i);
xbt_assert(ret != -1, "asprintf failed (not enough memory?)");
char **argv = xbt_new(char*, 5);
argv[0] = xbt_strdup("1"); // Fonction_replay_label (can be ignored, for log only),
argv[1] = str_instance_id; // Instance Id (application) job_id is used
argv[2] = str_rank_id; // Rank Id
argv[3] = xbt_dynar_get_as(traceFilenamesDynar, i, char *);
argv[4] = xbt_strdup("0"); //
MSG_process_create_with_arguments(str_pname, smpi_replay, NULL, job_res[i], 5, argv );
free(str_pname);
}
}
else
xbt_die("Cannot execute job %d: the profile type '%s' is unknown", job_id, profile->type);
return 0;
}
int job_exec(int job_id, int nb_res, int *res_idxs, msg_host_t *nodes, double walltime)
{
s_job_t * job = jobFromJobID(job_id);
//XBT_INFO("job_exec: jobID %d, job=%p", job_id, job);
msg_host_t * job_res = (msg_host_t *) xbt_new(msg_host_t, nb_res);
for(int i = 0; i < nb_res; i++)
job_res[i] = nodes[res_idxs[i]];
int ret = profile_exec(job->profile, job_id, nb_res, job_res, &walltime);
free(job_res);
return ret;
}
bool register_smpi_app_instances()
{
bool smpi_used = false;
s_job_t *job;
profile_t profile;
unsigned int job_index;
int nb_traces;
xbt_dynar_foreach(jobs_dynar, job_index, job)
{
profile = xbt_dict_get(profiles,job->profile);
if (strcmp(profile->type, "smpi") == 0)
{
smpi_used = true;
nb_traces = ((s_smpi_t *)(profile->data))->nb_traces;
if ( nb_traces != job->nb_res)
xbt_die("SMPI nb_traces (%d) and nb_res (%d) for job (%d) are not equal\n", nb_traces, job->nb_res, job->id);
SMPI_app_instance_register(job->id_str, smpi_replay, nb_traces);
XBT_INFO("register SMPI app instance %s with nb_traces %d", job->id_str, nb_traces);
}
}
if (smpi_used)
SMPI_init();
return(smpi_used);
}
#include "job_submitter.hpp"
#include <vector>
#include <algorithm>
#include "jobs.hpp"
#include "ipp.hpp"
#include "context.hpp"
using namespace std;
int job_submitter_process(int argc, char *argv[])
{
(void) argc;
(void) argv;
JobSubmitterProcessArguments * args = (JobSubmitterProcessArguments *) MSG_process_get_data(MSG_process_self());
BatsimContext * context = args->context;
send_message("server", IPMessageType::SUBMITTER_HELLO);
double previousSubmissionDate = MSG_get_clock();
vector<const Job *> jobsVector;
const auto & jobs = context->jobs.jobs();
for (const auto & mit : jobs)
{
const Job * job = mit.second;
jobsVector.push_back(job);
}
sort(jobsVector.begin(), jobsVector.end(), job_comparator_subtime);
for (const Job * job : jobsVector)
{
if (job->submission_time > previousSubmissionDate)
MSG_process_sleep(job->submission_time - previousSubmissionDate);
JobSubmittedMessage * msg = new JobSubmittedMessage;
msg->job_id = job->id;
send_message("server", IPMessageType::JOB_SUBMITTED, (void*)msg);
previousSubmissionDate = MSG_get_clock();
}
send_message("server", IPMessageType::SUBMITTER_BYE);
delete args;
return 0;
}
#pragma once
int job_submitter_process(int argc, char *argv[]);
......@@ -41,23 +41,8 @@ void Jobs::setProfiles(Profiles *profiles)
_profiles = profiles;
}
void Jobs::load_from_json(const std::string &filename)
void Jobs::load_from_json(const Document &doc, const string &filename)
{
// Let the file content be placed in a string
ifstream ifile(filename);
string content;
ifile.seekg(0, ios::end);
content.reserve(ifile.tellg());
ifile.seekg(0, ios::beg);
content.assign((std::istreambuf_iterator<char>(ifile)),
std::istreambuf_iterator<char>());
// JSON document creation
Document doc;
doc.Parse(content.c_str());
xbt_assert(doc.IsObject());
xbt_assert(doc.HasMember("jobs"), "Invalid JSON file '%s': the 'jobs' array is missing", filename.c_str());
const Value & jobs = doc["jobs"];
......
......@@ -7,6 +7,8 @@
#include <map>
#include <vector>
#include <rapidjson/document.h>
class Profiles;
enum class JobState
......@@ -42,7 +44,7 @@ public:
void setProfiles(Profiles * profiles);
void load_from_json(const std::string & filename);
void load_from_json(const rapidjson::Document & doc, const std::string & filename);
Job * operator[](int job_id);
const Job * operator[](int job_id) const;
......
......@@ -157,7 +157,7 @@ int execute_profile(BatsimContext *context,
for (int i = 0; i < data->repeat; i++)
{
for (int j = 0; j < data->sequence.size(); j++)
for (unsigned int j = 0; j < data->sequence.size(); j++)
{
if (execute_profile(context, data->sequence[j], allocation, remaining_time) == 0)
return 0;
......
......@@ -48,6 +48,8 @@ UnixDomainSocket::~UnixDomainSocket()
void UnixDomainSocket::create_socket(const string & filename)
{
XBT_INFO("Creating UDS socket on '%s'", filename.c_str());
_server_socket = socket(AF_UNIX, SOCK_STREAM, 0);
xbt_assert(_server_socket != -1, "Impossible to create socket");
......
......@@ -27,23 +27,8 @@ Profiles::~Profiles()
}
}
void Profiles::load_from_json(const string & filename)
void Profiles::load_from_json(const Document &doc, const string & filename)
{
// Let the file content be placed in a string
ifstream ifile(filename);
string content;
ifile.seekg(0, ios::end);
content.reserve(ifile.tellg());
ifile.seekg(0, ios::beg);
content.assign((std::istreambuf_iterator<char>(ifile)),
std::istreambuf_iterator<char>());
// JSON document creation
Document doc;
doc.Parse(content.c_str());
xbt_assert(doc.IsObject());
xbt_assert(doc.HasMember("profiles"), "Invalid JSON file '%s': the 'profiles' object is missing", filename.c_str());
const Value & profiles = doc["profiles"];
......@@ -194,6 +179,11 @@ bool Profiles::exists(const std::string &profile_name) const
return mit != _profiles.end();
}
const std::map<string, Profile *> Profiles::profiles() const
{
return _profiles;
}