Commit 412b555b authored by Millian Poquet's avatar Millian Poquet

Gestion du kill de jobs. Deux process MSG sont créés : un pour le launch et un...

Gestion du kill de jobs. Deux process MSG sont créés : un pour le launch et un autre pour le kill. Ces deux process se connaissent (utilisation d'une barrière de synchro dans ce but). Ainsi, lorsqu'un des process termine, il tue l'autre. L'utilisation d'une structure de données pour stocker les allocations faites par le launcher devrait permettre d'éviter les fuites mémoires.
parent c6fb4b8a
......@@ -41,7 +41,7 @@ static int job_launcher(int argc, char *argv[])
res_idxs[i] = i;
}
job_exec(job_idx, res_idxs, nodes);
job_exec(job_idx, job.nb_res, res_idxs, nodes, NULL);
XBT_INFO("Job id %d, job simulation time: %f", job.id, MSG_get_clock() - t);
free(res_idxs);
}
......
This diff is collapsed.
#pragma once
#include <msg/msg.h>
/**
* Types of tasks exchanged between nodes.
*/
typedef enum {
ECHO,
FINALIZE,
LAUNCH_JOB,
JOB_SUBMITTED,
JOB_COMPLETED,
KILL_JOB,
SUSPEND_JOB,
SCHED_READY
} e_task_type_t;
/*
* Data attached with the tasks sent and received
*/
typedef struct s_task_data {
e_task_type_t type; // type of task
int job_idx;
void *data;
const char* src; // used for logging
} s_task_data_t, *task_data_t;
/**
* @brief Data structure used to launch a job
*/
typedef struct s_lauch_data
{
int jobID; //! The job identification number
int reservedNodeCount; //! The number of reserved nodes
int * reservedNodesIDs; //! The nodes on which the job will be run
msg_bar_t * barrier; //! The barrier used to know when the process can read its full data (especially the killer process)
msg_process_t killerProcess; //! The SG killer process
struct s_kill_data * killerData; //! The data used by the killer
xbt_dict_t dataToRelease; //! The data used by the launcher which will need to be released (used when killing the job)
} s_launch_data_t;
/**
* @brief Data structure used to kill a job
*/
typedef struct s_kill_data
{
msg_bar_t * barrier; //! The barrier used to know when the process can read its full data (especially the launcher process)
msg_process_t launcherProcess; //! The SG launcher process
s_launch_data_t * launcherData; //! The data used by the launcher
} s_kill_data_t;
/* Copyright (c) 2015. The OAR Team.
* All rights reserved. */
* All rights reserved. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
......@@ -9,122 +9,125 @@
XBT_LOG_NEW_DEFAULT_CATEGORY(job, "job");
void profile_exec(const char *profile_str, int nb_res, msg_host_t *job_res) {
void profile_exec(const char *profile_str, int nb_res, msg_host_t *job_res, xbt_dict_t * allocatedStuff)
{
double *computation_amount;
double *communication_amount;
msg_task_t ptask;
profile_t profile;
double *computation_amount;
double *communication_amount;
msg_task_t ptask;
int i, j;
profile_t profile = xbt_dict_get(profiles, profile_str);
profile = xbt_dict_get(profiles, profile_str);
if (strcmp(profile->type, "msg_par") == 0)
{
// These amounts are deallocated by SG
computation_amount = malloc(nb_res * sizeof(double));
communication_amount = malloc(nb_res * nb_res * sizeof(double));
if (strcmp(profile->type, "msg_par") == 0) {
computation_amount = malloc(nb_res * sizeof(double));
communication_amount = malloc(nb_res * nb_res * sizeof(double));
double *cpu = ((msg_par_t)(profile->data))->cpu;
double *com = ((msg_par_t)(profile->data))->com;
double *cpu = ((msg_par_t)(profile->data))->cpu;
double *com = ((msg_par_t)(profile->data))->com;
memcpy(computation_amount , cpu, nb_res * sizeof(double));
memcpy(communication_amount, com, nb_res * nb_res * sizeof(double));
memcpy(computation_amount , cpu, nb_res * sizeof(double));
memcpy(communication_amount, com, nb_res * nb_res * sizeof(double));
ptask = MSG_parallel_task_create("parallel task",
nb_res, job_res,
computation_amount,
communication_amount, NULL);
MSG_parallel_task_execute(ptask);
ptask = MSG_parallel_task_create("parallel task",
nb_res, job_res,
computation_amount,
communication_amount, NULL);
MSG_parallel_task_execute(ptask);
MSG_task_destroy(ptask);
} else if (strcmp(profile->type, "msg_par_hg") == 0) {
double cpu = ((msg_par_hg_t)(profile->data))->cpu;
double com = ((msg_par_hg_t)(profile->data))->com;
computation_amount = malloc(nb_res * sizeof(double));
communication_amount = malloc(nb_res * nb_res * sizeof(double));
XBT_DEBUG("msg_par_hg: nb_res: %d , cpu: %f , com: %f", nb_res, cpu, com);
for (i = 0; i < nb_res; i++) {
computation_amount[i] = cpu;
MSG_task_destroy(ptask);
}
for (j = 0; j < nb_res; j++) {
for (i = 0; i < nb_res; i++) {
communication_amount[nb_res * j + i] = com;
}
else if (strcmp(profile->type, "msg_par_hg") == 0)
{
double cpu = ((msg_par_hg_t)(profile->data))->cpu;
double com = ((msg_par_hg_t)(profile->data))->com;
// These amounts are deallocated by SG
computation_amount = malloc(nb_res * sizeof(double));
communication_amount = malloc(nb_res * nb_res * sizeof(double));
XBT_DEBUG("msg_par_hg: nb_res: %d , cpu: %f , com: %f", nb_res, cpu, com);
for (int i = 0; i < nb_res; i++)
computation_amount[i] = cpu;
for (int j = 0; j < nb_res; j++)
for (int i = 0; i < nb_res; i++)
communication_amount[nb_res * j + i] = com;
ptask = MSG_parallel_task_create("parallel task hg",
nb_res, job_res,
computation_amount,
communication_amount, NULL);
MSG_parallel_task_execute(ptask);
MSG_task_destroy(ptask);
}
ptask = MSG_parallel_task_create("parallel task hg",
nb_res, job_res,
computation_amount,
communication_amount, NULL);
MSG_parallel_task_execute(ptask);
MSG_task_destroy(ptask);
} else if (strcmp(profile->type, "composed") == 0) {
char buffer[20];
int nb = ((composed_prof_t)(profile->data))->nb;
int lg_seq = ((composed_prof_t)(profile->data))->lg_seq;
int *seq = ((composed_prof_t)(profile->data))->seq;
XBT_DEBUG("composed: nb: %d, lg_seq: %d", nb, lg_seq);
for(j = 0; j < lg_seq; j++) {
for(i = 0; i < nb; i++) {
sprintf(buffer, "%d", seq[j]);
profile_exec(buffer, nb_res, job_res);
}
else if (strcmp(profile->type, "composed") == 0)
{
char buffer[20];
int nb = ((composed_prof_t)(profile->data))->nb;
int lg_seq = ((composed_prof_t)(profile->data))->lg_seq;
int *seq = ((composed_prof_t)(profile->data))->seq;
XBT_DEBUG("composed: nb: %d, lg_seq: %d", nb, lg_seq);
for (int j = 0; j < lg_seq; j++)
{
for (int i = 0; i < nb; i++)
{
sprintf(buffer, "%d", seq[j]);
profile_exec(buffer, nb_res, job_res, allocatedStuff);
}
}
}
else if (strcmp(profile->type, "delay") == 0)
{
double delay = ((delay_t)(profile->data))->delay;
MSG_process_sleep(delay);
} else if (strcmp(profile->type, "delay") == 0) {
double delay = ((delay_t)(profile->data))->delay;
MSG_process_sleep(delay);
} else if (strcmp(profile->type, "smpi") == 0) {
xbt_die("Profile with type %s is not yet implemented", profile->type);
} else {
xbt_die("Profile with type %s is not supported", profile->type);
}
}
else if (strcmp(profile->type, "smpi") == 0)
{
xbt_die("Profile with type %s is not yet implemented", profile->type);
}
else
{
xbt_die("Profile with type %s is not supported", profile->type);
}
}
/**
* \brief Load workload with jobs' profiles file
*/
void job_exec(int job_idx, int *res_idxs, msg_host_t *nodes) {
msg_host_t *job_res;
s_job_t job;
int nb_res;
int i;
void job_exec(int job_idx, int nb_res, int *res_idxs, msg_host_t *nodes, xbt_dict_t * allocatedStuff)
{
int dictCreatedHere = 0;
job = jobs[job_idx];
XBT_DEBUG("Launch_job: idx %d, id %s profile %s", job_idx, jobs[job_idx].id_str, job.profile);
if (allocatedStuff == NULL)
{
allocatedStuff = malloc(sizeof(xbt_dict_t));
*allocatedStuff = xbt_dict_new();
dictCreatedHere = 1;
}
nb_res = job.nb_res;
s_job_t job = jobs[job_idx];
XBT_DEBUG("Launch_job: idx %d, id %s profile %s", job_idx, jobs[job_idx].id_str, job.profile);
job_res = (msg_host_t *)malloc( nb_res * sizeof(s_msg_host_t) );
msg_host_t * job_res = (msg_host_t *)malloc( nb_res * sizeof(s_msg_host_t) );
xbt_dict_set(*allocatedStuff, "hosts", job_res, free);
for(i = 0; i < nb_res; i++) {
job_res[i] = nodes[res_idxs[i]];
}
for(int i = 0; i < nb_res; i++)
job_res[i] = nodes[res_idxs[i]];
profile_exec(job.profile, nb_res, job_res);
profile_exec(job.profile, nb_res, job_res, allocatedStuff);
free(job_res);
if (dictCreatedHere)
{
xbt_dict_free(allocatedStuff);
free(allocatedStuff);
}
}
// TODO ! don't free manually set elements, they will be released automatically on reset !
\ No newline at end of file
/* Copyright (c) 2015. The OAR Team.
* All rights reserved. */
#ifndef BATSIM_JOB_H
#define BATSIM_JOB_H
#pragma once
#include <sys/types.h> /* ssize_t, needed by xbt/str.h, included by msg/msg.h */
......@@ -46,6 +45,4 @@ typedef struct s_delay {
double delay;
} s_delay_t, *delay_t;
void job_exec(int job_idx, int *res_idxs, msg_host_t *nodes);
#endif /* BATSIM_JOB_H */
void job_exec(int job_idx, int nb_res, int *res_idxs, msg_host_t *nodes, xbt_dict_t * allocatedStuff);
/* Copyright (c) 2015. The OAR Team.
* All rights reserved. */
#ifndef BATSIM_UTILS_H
#define BATSIM_UTILS_H
#pragma once
#include "msg/msg.h"
#include "xbt.h"
......@@ -27,4 +26,3 @@ json_t *load_json_workload_profile(char *filename);
void retrieve_jobs(json_t *root);
void retrieve_profiles(json_t *root);
#endif /* BATSIM_UTILS_H */
......@@ -8,7 +8,7 @@
"jobs": [
{"id":1, "subtime":10, "walltime": 100, "res": 4, "profile": "2"},
{"id":2, "subtime":20, "walltime": 100, "res": 4, "profile": "1"},
{"id":3, "subtime":30, "walltime": 100, "res": 4, "profile": "4"},
{"id":3, "subtime":30, "walltime": 3, "res": 4, "profile": "4"},
{"id":4, "subtime":40, "walltime": 100, "res": 4, "profile": "3"}
],
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment