Commit f7bb7d8b authored by Millian Poquet's avatar Millian Poquet

New strategy to stop jobs once their walltime is reached. It uses SimGrid...

New strategy to stop jobs once their walltime is reached. It uses SimGrid remote exception calls, which have been fixed in SG revision 'ebf5094d45dec1fc10623f4fd67ff47ff0bca554' (2015-04-08). The latest unstable version of SG is therefore needed.
parent 8fe182ad
......@@ -2,7 +2,7 @@
* Installation
** dependencies
- simgrid 3.12 (minimun)
- simgrid (lastest unstable version)
- Jansson: a C library for encoding, decoding and manipulating JSON data.
apt-get install libjansson4
apt-get install libjansson-dev
......
/* Copyright (c) 2007-2014. The SimGrid Team and OAR Team
* All rights reserved. */
#include <msg/msg.h>
#include <simgrid/msg.h>
#include <xbt/sysdep.h> /* calloc, printf */
/* Create a log channel to have nice outputs. */
#include <xbt/log.h>
#include <xbt/asserts.h>
#include <math.h>
XBT_LOG_NEW_DEFAULT_CATEGORY(batexec, "Batexec");
#include "job.h"
......@@ -25,6 +27,9 @@ msg_host_t *nodes;
*/
static int job_launcher(int argc, char *argv[])
{
(void) argc;
(void) argv;
unsigned int job_index;
s_job_t * job;
......@@ -36,7 +41,7 @@ static int job_launcher(int argc, char *argv[])
for (int i = 0; i < job->nb_res; i++)
res_idxs[i] = i;
job_exec(job->id, job->nb_res, res_idxs, nodes, NULL);
job_exec(job->id, job->nb_res, res_idxs, nodes, INFINITY);
XBT_INFO("Job id %d, job simulation time: %f", job->id, MSG_get_clock() - t);
free(res_idxs);
}
......@@ -49,8 +54,6 @@ msg_error_t deploy_all(const char *platform_file)
msg_error_t res = MSG_OK;
xbt_dynar_t all_hosts;
msg_host_t first_host;
msg_host_t host;
int i;
MSG_config("workstation/model", "ptask_L07");
MSG_create_environment(platform_file);
......@@ -77,7 +80,6 @@ msg_error_t deploy_all(const char *platform_file)
int main(int argc, char *argv[])
{
msg_error_t res = MSG_OK;
int i;
json_t *json_workload_profile;
......
This diff is collapsed.
#pragma once
#include <msg/msg.h>
#include <simgrid/msg.h>
/**
* @brief Types of tasks exchanged between nodes.
......@@ -37,16 +37,4 @@ typedef struct s_lauch_data
int jobID; //! The job identification number
int reservedNodeCount; //! The number of reserved nodes
int * reservedNodesIDs; //! The nodes on which the job will be run
msg_process_t killerProcess; //! The SG killer process
struct s_kill_data * killerData; //! The data used by the killer
xbt_dict_t dataToRelease; //! The data used by the launcher which will need to be released (used when killing the job)
} s_launch_data_t;
/**
* @brief Data structure used to kill a job
*/
typedef struct s_kill_data
{
msg_process_t launcherProcess; //! The SG launcher process
s_launch_data_t * launcherData; //! The data used by the launcher
} s_kill_data_t;
......@@ -2,7 +2,7 @@
#include <stdio.h>
#include <sys/types.h> /* ssize_t, needed by xbt/str.h, included by msg/msg.h */
#include <msg/msg.h>
#include <simgrid/msg.h>
/**
* @brief Handles bufferized writings into a single file
......
......@@ -9,7 +9,44 @@
XBT_LOG_NEW_DEFAULT_CATEGORY(job, "job");
void profile_exec(const char *profile_str, int job_id, int nb_res, msg_host_t *job_res, xbt_dict_t * allocatedStuff)
//! The input data of a killerDelay
typedef struct s_killer_delay_data
{
msg_task_t task; //! The task that will be cancelled if the walltime is reached
double walltime; //! The number of seconds to wait before cancelling the task
} KillerDelayData;
int killerDelay(int argc, char *argv[])
{
(void) argc;
(void) argv;
KillerDelayData * data = MSG_process_get_data(MSG_process_self());
// The sleep can either stop normally (res=MSG_OK) or be cancelled when the task execution completed (res=MSG_TASK_CANCELED)
msg_error_t res = MSG_process_sleep(data->walltime);
if (res == MSG_OK)
{
// If we had time to sleep until walltime (res=MSG_OK), the task execution is not over and must be cancelled
MSG_task_cancel(data->task);
}
free(data);
return 0;
}
/**
* @brief Executes the profile of a given job
* @param[in] profile_str The name of the profile to execute
* @param[in] job_id The job number
* @param[in] nb_res The number of resources on which the job will be executed
* @param[in] job_res The resources on which the job will be executed
* @param[in,out] remainingTime The time remaining to execute the full profile
* @return 1 if the profile had been executed in time, 0 if the walltime had been reached
*/
int profile_exec(const char *profile_str, int job_id, int nb_res, msg_host_t *job_res, double * remainingTime)
{
double *computation_amount;
double *communication_amount;
......@@ -33,15 +70,34 @@ void profile_exec(const char *profile_str, int job_id, int nb_res, msg_host_t *j
asprintf(&tname, "p %d", job_id);
XBT_INFO("Creating task '%s'", tname);
//ptask = malloc(sizeof(msg_task_t *));
ptask = MSG_parallel_task_create(tname,
nb_res, job_res,
computation_amount,
communication_amount, NULL);
free(tname);
xbt_dict_set(*allocatedStuff, "task", (void*)ptask, freeTask);
// Let's spawn a process which will wait until walltime and cancel the task if needed
KillerDelayData * killData = malloc(sizeof(KillerDelayData));
killData->task = ptask;
killData->walltime = *remainingTime;
msg_process_t killProcess = MSG_process_create("killer", killerDelay, killData, MSG_host_self());
double timeBeforeExecute = MSG_get_clock();
msg_error_t err = MSG_parallel_task_execute(ptask);
xbt_assert(err == MSG_OK);
*remainingTime = *remainingTime - (MSG_get_clock() - timeBeforeExecute);
int ret = 1;
if (err == MSG_OK)
SIMIX_process_throw(killProcess, cancel_error, 0, "wake up");
else if (err == MSG_TASK_CANCELED)
ret = 0;
else
xbt_die("A task execution had been stopped by an unhandled way (err = %d)", err);
MSG_task_destroy(ptask);
return ret;
}
else if (strcmp(profile->type, "msg_par_hg") == 0)
{
......@@ -65,15 +121,33 @@ void profile_exec(const char *profile_str, int job_id, int nb_res, msg_host_t *j
asprintf(&tname, "hg %d", job_id);
XBT_INFO("Creating task '%s'", tname);
//ptask = malloc(sizeof(msg_task_t *));
ptask = MSG_parallel_task_create(tname,
nb_res, job_res,
computation_amount,
communication_amount, NULL);
free(tname);
xbt_dict_set(*allocatedStuff, "task", (void*)ptask, freeTask);
// Let's spawn a process which will wait until walltime and cancel the task if needed
KillerDelayData * killData = malloc(sizeof(KillerDelayData));
killData->task = ptask;
killData->walltime = *remainingTime;
msg_process_t killProcess = MSG_process_create("killer", killerDelay, killData, MSG_host_self());
double timeBeforeExecute = MSG_get_clock();
msg_error_t err = MSG_parallel_task_execute(ptask);
xbt_assert(err == MSG_OK);
*remainingTime = *remainingTime - (MSG_get_clock() - timeBeforeExecute);
int ret = 1;
if (err == MSG_OK)
SIMIX_process_throw(killProcess, cancel_error, 0, "wake up");
else if (err == MSG_TASK_CANCELED)
ret = 0;
else
xbt_die("A task execution had been stopped by an unhandled way (err = %d)", err);
MSG_task_destroy(ptask);
return ret;
}
else if (strcmp(profile->type, "composed") == 0)
{
......@@ -85,79 +159,46 @@ void profile_exec(const char *profile_str, int job_id, int nb_res, msg_host_t *j
XBT_DEBUG("composed: nb: %d, lg_seq: %d", nb, lg_seq);
for (int i = 0; i < nb; i++)
{
for (int j = 0; j < lg_seq; j++)
{
profile_exec(seq[j], job_id, nb_res, job_res, allocatedStuff);
}
}
if (profile_exec(seq[j], job_id, nb_res, job_res, remainingTime) == 0)
return 0;
}
else if (strcmp(profile->type, "delay") == 0)
{
double delay = ((s_delay_t *)(profile->data))->delay;
MSG_process_sleep(delay);
if (delay < *remainingTime)
{
MSG_process_sleep(delay);
return 1;
}
else
{
MSG_process_sleep(*remainingTime);
*remainingTime = 0;
return 0;
}
}
else if (strcmp(profile->type, "smpi") == 0)
{
xbt_die("Profile with type %s is not yet implemented", profile->type);
}
else
{
xbt_die("Profile with type %s is not supported", profile->type);
}
}
/**
* \brief Load workload with jobs' profiles file
*/
return 0;
}
void job_exec(int job_id, int nb_res, int *res_idxs, msg_host_t *nodes, xbt_dict_t * allocatedStuff)
int job_exec(int job_id, int nb_res, int *res_idxs, msg_host_t *nodes, double walltime)
{
int dictCreatedHere = 0;
if (allocatedStuff == NULL)
{
allocatedStuff = malloc(sizeof(xbt_dict_t));
*allocatedStuff = xbt_dict_new();
dictCreatedHere = 1;
}
s_job_t * job = jobFromJobID(job_id);
XBT_INFO("job_exec: jobID %d, job=%p", job_id, job);
msg_host_t * job_res = (msg_host_t *) malloc(nb_res * sizeof(s_msg_host_t));
xbt_dict_set(*allocatedStuff, "hosts", job_res, free);
for(int i = 0; i < nb_res; i++)
job_res[i] = nodes[res_idxs[i]];
profile_exec(job->profile, job_id, nb_res, job_res, allocatedStuff);
int ret = profile_exec(job->profile, job_id, nb_res, job_res, &walltime);
if (dictCreatedHere)
{
xbt_dict_free(allocatedStuff);
free(allocatedStuff);
}
free(job_res);
return ret;
}
void freeTask(void * task)
{
msg_task_t t = (msg_task_t) task;
const static int doLeak = 0;
if (doLeak)
{
XBT_INFO("freeing task '%s' (with memory leak)", MSG_task_get_name(t));
XBT_INFO("freeing task (with memory leak) done");
}
else
{
// todo: make this work -> SG mailing list? cancelling the task might be better BEFORE killing the associated process...
XBT_INFO("freeing task '%s'", MSG_task_get_name(t));
//MSG_task_cancel(t);
MSG_task_destroy(t);
XBT_INFO("freeing task done");
}
}
\ No newline at end of file
......@@ -4,7 +4,7 @@
#include <sys/types.h> /* ssize_t, needed by xbt/str.h, included by msg/msg.h */
#include <msg/msg.h>
#include <simgrid/msg.h>
#include <xbt.h>
#include <xbt/sysdep.h>
#include <xbt/log.h>
......@@ -60,6 +60,13 @@ typedef struct s_delay
double delay;
} s_delay_t;
void job_exec(int job_id, int nb_res, int *res_idxs, msg_host_t *nodes, xbt_dict_t * allocatedStuff);
void freeTask(void * task);
\ No newline at end of file
/**
* @brief Executes a job
* @param[in] job_id The job number
* @param[in] nb_res The number of resources on which the job will be executed
* @param[in] res_idxs The resources on which the job will be executed (indexes of the nodes parameter)
* @param[in] nodes The array of hosts which can be used to execute the job
* @param[in] walltime The maximum execution time of the job
* @return 1 if the job finished in time, 0 if the walltime had been reached
*/
int job_exec(int job_id, int nb_res, int *res_idxs, msg_host_t *nodes, double walltime);
......@@ -82,13 +82,13 @@ void retrieve_jobs(json_t *root) // todo: sort jobs by ascending submission time
job->submission_time = json_number_to_double(json_object_get(j,"subtime"));
job->walltime = json_number_to_double(json_object_get(j,"walltime"));
asprintf(&(job->profile), "%s", json_string_value(json_object_get(j,"profile"))); // todo: clean
XBT_INFO("Read profile '%s' from job %d", job->profile, job->id);
/*XBT_INFO("Read profile '%s' from job %d", job->profile, job->id);*/
job->runtime = -1;
job->nb_res = json_number_to_double(json_object_get(j,"res"));
job->state = JOB_STATE_NOT_SUBMITTED;
XBT_INFO("Read job: id=%d, subtime=%lf, walltime=%lf, profile='%s', nb_res=%d",
job->id, job->submission_time, job->walltime, job->profile, job->nb_res);
/*XBT_INFO("Read job: id=%d, subtime=%lf, walltime=%lf, profile='%s', nb_res=%d",
job->id, job->submission_time, job->walltime, job->profile, job->nb_res);*/
if (!jobExists(job->id))
{
......@@ -97,7 +97,7 @@ void retrieve_jobs(json_t *root) // todo: sort jobs by ascending submission time
int * insertPosition = (int *) malloc(sizeof(int));
*insertPosition = xbt_dynar_length(jobs_dynar) - 1;
xbt_dict_set(job_id_to_dynar_pos, job->id_str, insertPosition, free);
XBT_INFO("Added to map '%s'->%d", job->id_str, *insertPosition);
/*XBT_INFO("Added to map '%s'->%d", job->id_str, *insertPosition);*/
}
else
{
......@@ -349,4 +349,4 @@ s_job_t * jobFromJobID(int jobID)
free(jobName);
return *((s_job_t **) xbt_dynar_get_ptr(jobs_dynar, *dynarPosition));
}
\ No newline at end of file
}
......@@ -2,7 +2,7 @@
* All rights reserved. */
#pragma once
#include <msg/msg.h>
#include <simgrid/msg.h>
#include <xbt.h>
#include <xbt/sysdep.h>
#include <xbt/log.h>
......@@ -35,4 +35,4 @@ void initializeJobStructures();
void freeJobStructures();
int jobExists(int jobID);
s_job_t * jobFromJobID(int jobID);
\ No newline at end of file
s_job_t * jobFromJobID(int jobID);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment