Commit 74e512e6 authored by Olivier Richard's avatar Olivier Richard

Add execution of SMPI profile

parent ad1f48d8
......@@ -20,7 +20,6 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(batexec, "Batexec");
int nb_nodes = 0;
msg_host_t *nodes;
/**
* \brief Execute jobs alone
*
......@@ -83,9 +82,6 @@ msg_error_t deploy_all(const char *platform_file, bool smpi_used)
int main(int argc, char *argv[])
{
msg_error_t res = MSG_OK;
bool smpi_used;
json_t *json_workload_profile;
//Comment to remove debug message
xbt_log_control_set("batexec.threshold:debug");
......@@ -99,18 +95,25 @@ int main(int argc, char *argv[])
printf("example: %s ../platforms/small_platform.xml ../workload_profiles/test_workload_profile.json\n", argv[0]);
exit(1);
}
json_t *json_workload_profile;
json_workload_profile = load_json_workload_profile(argv[2]);
retrieve_jobs(json_workload_profile);
retrieve_profiles(json_workload_profile);
checkJobsAndProfilesValidity();
MSG_init(&argc, argv);
//register all smpi jobs app and init SMPI
smpi_used = register_smpi_app_instances();
XBT_INFO("finished initialize for smpi");
// Register all smpi jobs app and init SMPI
bool smpi_used = register_smpi_app_instances();
res = deploy_all(argv[1], smpi_used);
msg_error_t res = deploy_all(argv[1], smpi_used);
json_decref(json_workload_profile);
// Let's clear global allocated data
freeJobStructures();
free(nodes);
if (res == MSG_OK)
return 0;
......
......@@ -15,6 +15,7 @@
#include <unistd.h>
#include <string.h>
#include <argp.h>
#include <stdbool.h>
#include <simgrid/msg.h>
......@@ -661,11 +662,14 @@ int server(int argc, char *argv[])
* @param[in] pajeTraceFilename The name of the Pajé trace to generate
* @param[in] csvJobsFilename The name of the CSV output file about jobs
* @param[in] csvScheduleFilename The name of the CSV output file about the schedule
* @param[in] smpi_used The flag to indicate the presence of SMPI job
* @return The msg_error_t result of the inner call of MSG_main() (MSG_OK on success)
*/
msg_error_t deploy_all(const char *platform_file, const char * masterHostName, const char * pajeTraceFilename, const char * csvJobsFilename, const char * csvScheduleFilename)
msg_error_t deploy_all(const char *platform_file, const char * masterHostName, const char * pajeTraceFilename, const char * csvJobsFilename, const char * csvScheduleFilename, bool smpi_used)
{
MSG_config("host/model", "ptask_L07");
if (!smpi_used)
MSG_config("host/model", "ptask_L07");
MSG_create_environment(platform_file);
xbt_dynar_t all_hosts = MSG_hosts_as_dynar();
......@@ -895,6 +899,9 @@ int main(int argc, char *argv[])
MSG_init(&argc, argv);
// Register all smpi jobs app and init SMPI
bool smpi_used = register_smpi_app_instances();
open_uds(mainArgs.socketFilename, mainArgs.nbConnectTries, mainArgs.connectDelay);
char * pajeFilename;
......@@ -908,7 +915,7 @@ int main(int argc, char *argv[])
ret = asprintf(&csvScheduleFilename, "%s_schedule.csv", mainArgs.exportPrefix);
xbt_assert(ret != -1, "asprintf failed (not enough memory?)");
msg_error_t res = deploy_all(mainArgs.platformFilename, mainArgs.masterHostName, pajeFilename, csvJobsFilename, csvScheduleFilename);
msg_error_t res = deploy_all(mainArgs.platformFilename, mainArgs.masterHostName, pajeFilename, csvJobsFilename, csvScheduleFilename, smpi_used);
free(pajeFilename);
free(csvJobsFilename);
......
......@@ -44,6 +44,21 @@ int killerDelay(int argc, char *argv[])
return 0;
}
int smpi_replay(int argc, char *argv[])
{
//just to verify given argv
int index;
for(index = 0; index < argc; index++) {
printf("The %d is %s\n",index,argv[index]);
}
smpi_replay_run(&argc, &argv);
//printf("smpi_rank %d\n", smpi_comm_rank(MPI_));
return 0;
}
/**
* @brief Executes the profile of a given job
* @param[in] profile_str The name of the profile to execute
......@@ -196,7 +211,35 @@ int profile_exec(const char *profile_str, int job_id, int nb_res, msg_host_t *jo
}
else if (strcmp(profile->type, "smpi") == 0)
{
xbt_die("Cannot execute job %d: the profile type '%s' is not yet implemented", job_id, profile->type);
xbt_dynar_t traceFilenamesDynar = ((s_smpi_t *)(profile->data))->trace_filenames_dynar;
for (int i = 0; i < nb_res; i++)
{
char *str_instance_id = NULL;
int ret = asprintf(&str_instance_id, "%d", job_id);
xbt_assert(ret != -1, "asprintf failed (not enough memory?)");
char *str_rank_id = NULL;
ret = asprintf(&str_rank_id, "%d", i);
xbt_assert(ret != -1, "asprintf failed (not enough memory?)");
char *str_pname = NULL;
ret = asprintf(&str_pname, "%d_%d", job_id, i);
xbt_assert(ret != -1, "asprintf failed (not enough memory?)");
char **argv = xbt_new(char*, 5);
argv[0] = xbt_strdup("1"); // Fonction_replay_label (can be ignored, for log only),
argv[1] = str_instance_id; // Instance Id (application) job_id is used
argv[2] = str_rank_id; // Rank Id
argv[3] = xbt_dynar_get_as(traceFilenamesDynar, i, char *);
argv[4] = xbt_strdup("0"); //
MSG_process_create_with_arguments(str_pname, smpi_replay, NULL, job_res[i], 5, argv );
free(str_pname);
}
}
else
xbt_die("Cannot execute job %d: the profile type '%s' is unknown", job_id, profile->type);
......@@ -220,29 +263,6 @@ int job_exec(int job_id, int nb_res, int *res_idxs, msg_host_t *nodes, double wa
return ret;
}
int smpi_replay(int argc, char *argv[])
{
//just to verify given argv
int index;
for(index = 0; index < argc; index++) {
printf("The %d is %s\n",index,argv[index]);
}
printf("yop...\n");
smpi_replay_init(&argc, &argv);
//printf("smpi_rank %d\n", smpi_comm_rank(MPI_));
/* Actually do the simulation using smpi_action_trace_run */
smpi_action_trace_run(NULL);
//smpi_replay_finalize_instance();
smpi_replay_finalize();
return 0;
}
bool register_smpi_app_instances()
{
bool smpi_used = false;
......
......@@ -3,6 +3,7 @@
#include <string.h>
#include <errno.h>
#include <libgen.h>
#include "job.h"
#include "utils.h"
......@@ -17,6 +18,7 @@ int nb_jobs = 0;
xbt_dict_t profiles = NULL;
xbt_dynar_t jobs_dynar = NULL;
xbt_dict_t job_id_to_dynar_pos = NULL;
char *dirname_workload_file = NULL;
/**
* @brief Compares two jobs according to their submission time
......@@ -43,6 +45,8 @@ json_t *load_json_workload_profile(char *filename)
filename = "../workload_profiles/test_workload_profile.json";
}
dirname_workload_file = dirname(xbt_strdup(filename));
root = json_load_file(filename, 0, &error);
if(!root)
......@@ -288,24 +292,38 @@ void retrieve_profiles(json_t *root)
xbt_assert(e != NULL, "The smpi profile '%s' must have a 'trace' field", key);
xbt_assert(json_typeof(e) == JSON_STRING, "The 'trace' field of the smpi profile '%s' must be a string", key);
//! Retrieves filename of each traces
const char* filename = json_string_value(e);
char* filename = NULL;
char* trace_filename = NULL;
FILE *fp = NULL;
ssize_t read;
char *line = NULL;
size_t n = 0;
char *dname = NULL;
dirname(xbt_strdup(filename));
int ret = asprintf(&filename, "%s/%s", dirname_workload_file, json_string_value(e));
xbt_assert(ret != -1, "asprintf failed (not enough memory?)");
dname = dirname(xbt_strdup(filename));
xbt_dynar_t traceFilenamesDynar = xbt_dynar_new(sizeof(char *), NULL);
fp = fopen(filename, "r");
if (fp == NULL)
xbt_die("Cannot open %s: %s", filename, strerror(errno));
while ((read = xbt_getline(&line,&n,fp)) != -1){
xbt_str_trim(line, NULL);
xbt_dynar_push(traceFilenamesDynar, line);
printf("line: %s\n", line);
ret = asprintf(&trace_filename, "%s/%s", dname, line);
xbt_assert(ret != -1, "asprintf failed (not enough memory?)");
xbt_dynar_push_as(traceFilenamesDynar, char *, trace_filename);
}
smpi_prof->trace_filenames_dynar = traceFilenamesDynar;
smpi_prof->nb_traces = xbt_dynar_length(traceFilenamesDynar);
free(filename);
free(dname);
}
else
......@@ -314,6 +332,7 @@ void retrieve_profiles(json_t *root)
iter = json_object_iter_next(j_profiles, iter);
}
free(dirname_workload_file);
XBT_INFO("%d profiles had been read from the JSON file", xbt_dict_size(profiles));
}
......@@ -337,6 +356,12 @@ void freeProfile(void * profile)
xbt_free(data->seq);
}
else if (strcmp(prof->type, "smpi") == 0)
{
s_smpi_t * data = prof->data;
xbt_dynar_free(&data->trace_filenames_dynar);
}
free(prof->type);
free(prof->data);
......
0 init
0 send 1 1e6
0 recv 1 1e6
0 send 1 1e6
0 finalize
1 init
1 recv 0 1e6
1 compute 1e9
1 Isend 0 1e6
1 Irecv 0 1e6
1 wait
1 finalize
2/actions0.txt
2/actions1.txt
{
"version": 0,
"command:": "",
"date": "Tue, 11 Mar 2015 9:44:30 +0100",
"description": "workload with profile file for test",
"nb_res": 4,
"jobs": [
{"id":1, "subtime":10, "walltime": 100, "res": 2, "profile": "1"}
],
"profiles": {
"1": {
"type": "smpi",
"trace": "app_1/traces2.txt"
},
"6": {
"type": "smpi",
"trace": "app_1/traces2.txt"
}
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment