Commit 58445c90 authored by Millian Poquet

Launch/kill synchronization: a message-based approach is used instead of a barrier-based one.

parent 20382f6b
......@@ -44,7 +44,9 @@ char *task_type2str[] =
"JOB_COMPLETED",
"KILL_JOB",
"SUSPEND_JOB",
"SCHED_READY"
"SCHED_READY",
"LAUNCHER_INFORMATION",
"KILLER_INFORMATION"
};
int nb_nodes = 0;
......@@ -168,7 +170,7 @@ static int send_sched(int argc, char *argv[])
void send_message(const char *dst, e_task_type_t type, int job_id, void *data)
{
msg_task_t task_sent;
task_data_t req_data = xbt_new0(s_task_data_t,1);
s_task_data_t * req_data = xbt_new0(s_task_data_t,1);
req_data->type = type;
req_data->job_id = job_id;
req_data->data = data;
......@@ -200,18 +202,22 @@ static void task_free(struct msg_task ** task)
*/
static int launch_job(int argc, char *argv[])
{
// Let's read a first time the input to get the barrier
s_launch_data_t * data = MSG_process_get_data(MSG_process_self());
// Let's wait our data
msg_task_t task_received = NULL;
MSG_task_receive(&(task_received), MSG_process_get_name(MSG_process_self()));
// Let's wait on the barrier to be sure we can read our arguments
MSG_barrier_wait(*(data->barrier));
// Let's get our data
s_task_data_t * task_data = (s_task_data_t *) MSG_task_get_data(task_received);
xbt_assert(task_data->type == LAUNCHER_INFORMATION);
// After the second read, we know who the killerProcess is and its associated data
data = MSG_process_get_data(MSG_process_self());
s_launch_data_t * data = task_data->data;
s_kill_data_t * kdata = data->killerData;
int jobID = data->jobID;
s_job_t * job = jobFromJobID(jobID);
task_free(&task_received);
free(task_data);
XBT_INFO("Launching job %d", jobID);
// Let's run the job
......@@ -255,19 +261,23 @@ static int launch_job(int argc, char *argv[])
*/
static int kill_job(int argc, char *argv[])
{
// Let's read a first time the input to get the barrier
s_kill_data_t * data = MSG_process_get_data(MSG_process_self());
// Let's wait our data
msg_task_t task_received = NULL;
MSG_task_receive(&(task_received), MSG_process_get_name(MSG_process_self()));
// Let's wait on the barrier to be sure we can read our arguments
MSG_barrier_wait(*(data->barrier));
// Let's get our data
s_task_data_t * task_data = (s_task_data_t *) MSG_task_get_data(task_received);
xbt_assert(task_data->type == KILLER_INFORMATION);
// After the second read, we know who the launcherProcess is and its associated data
data = MSG_process_get_data(MSG_process_self());
s_kill_data_t * data = task_data->data;
s_launch_data_t * ldata = data->launcherData;
int jobID = ldata->jobID;
s_job_t * job = jobFromJobID(jobID);
double walltime = job->walltime;
task_free(&task_received);
free(task_data);
XBT_INFO("Sleeping for %lf seconds to possibly kill job %d", walltime, jobID);
// Let's sleep until the walltime is reached
......@@ -311,7 +321,7 @@ static int node(int argc, char *argv[])
const char *node_id = MSG_host_get_name(MSG_host_self());
msg_task_t task_received = NULL;
task_data_t task_data;
s_task_data_t * task_data;
int type = -1;
XBT_INFO("I am %s", node_id);
......@@ -320,51 +330,52 @@ static int node(int argc, char *argv[])
{
MSG_task_receive(&(task_received), node_id);
task_data = (task_data_t) MSG_task_get_data(task_received);
task_data = (s_task_data_t *) MSG_task_get_data(task_received);
type = task_data->type;
XBT_INFO("MSG_Task received %s, type %s", node_id, task_type2str[type]);
if (type == FINALIZE) break;
switch (type)
{
case FINALIZE:
{
return 0;
break;
}
case LAUNCH_JOB:
{
// Let's retrieve the launch data and create a kill data
s_launch_data_t * launchData = (s_launch_data_t *) task_data->data;
s_kill_data_t * killData = (s_kill_data_t *) malloc(sizeof(s_kill_data_t));
// Since the launcher needs to know the killer and vice versa, let's synchronize this knowledge by a barrier
msg_bar_t barrier = MSG_barrier_init(3);
launchData->barrier = killData->barrier = &barrier;
char * plname = NULL;
char * pkname = NULL;
asprintf(&plname, "launcher %d", launchData->jobID);
asprintf(&pkname, "killer %d", launchData->jobID);
// MSG process launching. These processes wait on the given barrier on their beginning
// MSG process launching. These processes wait because their data is incomplete
msg_process_t launcher = MSG_process_create(plname, launch_job, launchData, MSG_host_self());
msg_process_t killer = MSG_process_create(pkname, kill_job, killData, MSG_host_self());
free(plname);
free(pkname);
// The processes can now know each other
// Now that both processes exist, they know each other
launchData->killerProcess = killer;
launchData->killerData = killData;
killData->launcherProcess = launcher;
killData->launcherData = launchData;
XBT_INFO("job %d master: processes and data set", task_data->job_id);
// Let's send their data to the processes
send_message(plname, LAUNCHER_INFORMATION, launchData->jobID, launchData);
send_message(pkname, KILLER_INFORMATION, launchData->jobID, killData);
// Let's wake the processes then destroy the barrier
MSG_barrier_wait(barrier);
MSG_barrier_destroy(barrier);
free(plname);
free(pkname);
break;
}
default:
{
XBT_ERROR("Unhandled message type received (%d)", type);
}
}
task_free(&task_received);
}
......@@ -439,7 +450,7 @@ int server(int argc, char *argv[])
msg_host_t node;
msg_task_t task_received = NULL;
task_data_t task_data;
s_task_data_t * task_data;
int nb_completed_jobs = 0;
int nb_submitted_jobs = 0;
......@@ -459,7 +470,7 @@ int server(int argc, char *argv[])
// wait message node, submitter, scheduler...
MSG_task_receive(&(task_received), "server");
task_data = (task_data_t) MSG_task_get_data(task_received);
task_data = (s_task_data_t *) MSG_task_get_data(task_received);
XBT_INFO("Server receive Task/message type %s:", task_type2str[task_data->type]);
......
......@@ -5,7 +5,8 @@
/**
* Types of tasks exchanged between nodes.
*/
typedef enum {
typedef enum
{
ECHO,
FINALIZE,
LAUNCH_JOB,
......@@ -13,18 +14,21 @@ typedef enum {
JOB_COMPLETED,
KILL_JOB,
SUSPEND_JOB,
SCHED_READY
SCHED_READY,
LAUNCHER_INFORMATION,
KILLER_INFORMATION
} e_task_type_t;
/*
* Data attached with the tasks sent and received
*/
typedef struct s_task_data {
typedef struct s_task_data
{
e_task_type_t type; // type of task
int job_id;
void *data;
const char* src; // used for logging
} s_task_data_t, *task_data_t;
} s_task_data_t;
/**
* @brief Data structure used to launch a job
......@@ -34,7 +38,6 @@ typedef struct s_lauch_data
int jobID; //! The job identification number
int reservedNodeCount; //! The number of reserved nodes
int * reservedNodesIDs; //! The nodes on which the job will be run
msg_bar_t * barrier; //! The barrier used to know when the process can read its full data (especially the killer process)
msg_process_t killerProcess; //! The SG killer process
struct s_kill_data * killerData; //! The data used by the killer
xbt_dict_t dataToRelease; //! The data used by the launcher which will need to be released (used when killing the job)
......@@ -45,7 +48,6 @@ typedef struct s_lauch_data
*/
typedef struct s_kill_data
{
msg_bar_t * barrier; //! The barrier used to know when the process can read its full data (especially the launcher process)
msg_process_t launcherProcess; //! The SG launcher process
s_launch_data_t * launcherData; //! The data used by the launcher
} s_kill_data_t;
import struct
import socket
import sys
import os
import json
from random import sample
def create_uds(uds_name):
    """Create a listening Unix domain stream socket bound to `uds_name`.

    Any file already present at `uds_name` is removed first. A notice is
    written to stderr before binding.

    Args:
        uds_name: filesystem path of the socket to create.

    Returns:
        The bound socket, listening with a backlog of 1.

    Raises:
        OSError: if an existing file at `uds_name` could not be removed.
    """
    # Remove a leftover socket file. A failed unlink only matters if the
    # path is still there afterwards (i.e. it was not simply missing).
    try:
        os.unlink(uds_name)
    except OSError:
        still_present = os.path.exists(uds_name)
        if still_present:
            raise

    listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    # Announce the socket path on stderr, then bind and start listening.
    sys.stderr.write('starting up on %s\n' % uds_name)
    listener.bind(uds_name)
    listener.listen(1)
    return listener
def _recv_exact(connection, size):
    """Read up to exactly `size` bytes from `connection`.

    A stream socket may return fewer bytes than requested from a single
    recv() call, so keep reading until `size` bytes have arrived. If the
    peer closes the connection first, the bytes received so far (possibly
    none) are returned, so callers must compare the length with `size`.
    """
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = connection.recv(remaining)
        if not chunk:
            break
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def read_bat_msg(connection):
    """Read and parse one length-prefixed message sent by batsim.

    The message is a 4-byte native int (struct format "i") holding the
    payload length, followed by the payload itself:
    ``<version>:<now>|<field>:<type>:<job id>|...``. The first field of
    each sub-message is ignored; <type> is ``S`` (job submitted) or ``C``
    (job completed).

    Args:
        connection: object with a socket-like ``recv(size)`` method.

    Returns:
        A tuple ``(now, jobs_submitted, new_jobs_completed)`` where `now`
        is a float and the two others are lists of integer job ids.

    Raises:
        SystemExit: with status 1 if the connection is closed before a
            complete message could be read.
        Exception: if a sub-message has a type other than ``S`` or ``C``.
    """
    lg_str = _recv_exact(connection, 4)
    if len(lg_str) < 4:
        print("connection is closed by batsim core")
        sys.exit(1)
    lg = struct.unpack("i", lg_str)[0]

    msg = _recv_exact(connection, lg)
    if len(msg) < lg:
        # The peer went away in the middle of a message: parsing a
        # truncated payload would only produce misleading results.
        print("connection is closed by batsim core")
        sys.exit(1)
    if not isinstance(msg, str):
        # Python 3 sockets hand back bytes; decode so the text parsing
        # below works. NOTE(review): assumes an ASCII payload, as in the
        # sample exchanges of this file -- confirm against the protocol.
        msg = msg.decode("ascii")
    print('from batsim : %r' % msg)

    sub_msgs = msg.split('|')
    header = sub_msgs[0].split(":")
    version = int(header[0])
    now = float(header[1])
    print("version:  %s  now:  %s" % (version, now))

    jobs_submitted = []
    new_jobs_completed = []
    for sub_msg in sub_msgs[1:]:
        data = sub_msg.split(':')
        if data[1] == 'S':
            jobs_submitted.append(int(data[2]))
        elif data[1] == 'C':
            new_jobs_completed.append(int(data[2]))
        else:
            raise Exception("Unknow submessage type" + data[1])
    return (now, jobs_submitted, new_jobs_completed)
def send_bat_msg(connection, now, jids_toLaunch, jobs):
    """Build a scheduler reply, print it and send it over `connection`.

    The reply is ``0:<now>|<now>:J:<jid>=<r>,<r>;<jid>=<r>...`` when jobs
    are to be launched, or ``0:<now>|<now>:N`` when there is nothing to do.
    It is sent as a 4-byte native int (struct format "i") holding the
    message length, followed by the message itself.

    Args:
        connection: object with a socket-like ``sendall(data)`` method.
        now: timestamp written into the message via ``str(now)``.
        jids_toLaunch: ids of the jobs to launch; falsy means "do nothing".
        jobs: mapping from job id to the iterable of its resources.
    """
    stamp = str(now)
    if not jids_toLaunch:
        # Do nothing
        event = stamp + ":N"
    else:
        allocations = []
        for job_id in jids_toLaunch:
            allocation = str(job_id) + "="
            allocation += "".join(str(res) + "," for res in jobs[job_id])
            # Drop the last character (the trailing comma); jobs are then
            # joined with the semicolon separator.
            allocations.append(allocation[:-1])
        event = stamp + ":J:" + ";".join(allocations)
    msg = "0:" + stamp + "|" + event
    print(msg)

    size_prefix = struct.pack("i", int(len(msg)))
    connection.sendall(size_prefix)
    connection.sendall(msg)
def load_json_workload_profile(filename):
    """Load a JSON workload profile.

    Args:
        filename: path of a JSON file whose top-level object has the keys
            "jobs" and "nb_res".

    Returns:
        A tuple ``(jobs, nb_res)`` holding the values of those two keys.

    Raises:
        IOError/OSError: if the file cannot be opened.
        ValueError: if the file does not contain valid JSON.
        KeyError: if "jobs" or "nb_res" is missing.
    """
    # The with-block closes the file even if parsing fails.
    with open(filename) as wkp_file:
        wkp = json.load(wkp_file)
    return wkp["jobs"], wkp["nb_res"]
###
#
#
# Scripted scheduler: replays a fixed scenario against batsim and checks
# that every event received is the expected one.
server_address = '/tmp/bat_socket'

# The workload profile is given as the first command-line argument.
json_jobs, nb_res = load_json_workload_profile(sys.argv[1])
print("nb_res %s" % (nb_res,))

jobs_res_req = {j["id"]: j["res"] for j in json_jobs}
nb_jobs = len(jobs_res_req)
print("nb_jobs %s" % (nb_jobs,))

nb_completed_jobs = 0
jobs_res = {}
jobs_completed = []
jobs_waiting = []

sched_delay = 5.0

# Create the Unix domain socket and wait for the connection.
sock = create_uds(server_address)
print("waiting for a connection")
connection, client_address = sock.accept()

# One entry per exchange:
# (expected submitted jobs, expected completed jobs, jobs to launch in reply)
scenario = [
    ([1], [], [1]),  # Submission of 1, which is instantly allocated
    ([2], [], []),   # Submission of 2, but cannot be scheduled yet
    ([], [1], [2]),  # Completion of 1, allocation of 2
    ([], [2], []),   # Completion of 2, nothing to do
    ([3], [], [3]),  # Submission of 3, which is instantly allocated
    ([4], [], [4]),  # Submission of 4, which is instantly allocated whereas resources are busy
    ([], [3], []),   # Completion of 3, nothing to do
    ([], [4], []),   # Completion of 4, nothing to do
]

for expected_submitted, expected_completed, jobs_to_launch in scenario:
    # read_bat_msg already returns the timestamp as a float.
    now, jobs_submitted, new_jobs_completed = read_bat_msg(connection)
    assert jobs_submitted == expected_submitted
    assert new_jobs_completed == expected_completed

    # Every launched job is given resources 0 to 3.
    for jid in jobs_to_launch:
        jobs_res[jid] = [0, 1, 2, 3]
    send_bat_msg(connection, now, jobs_to_launch, jobs_res)

# One last read after the scenario; nothing is sent in reply.
now, jobs_submitted, new_jobs_completed = read_bat_msg(connection)
'''
0:10.000015|10.000015:S:1
0:10.000015|10.000015:J:1=0,1,2,3
0:20.002021|20.000030:S:2
0:20.002021|20.002021:N
0:20.839641|20.839641:C:1
0:20.839641|20.839641:J:2=0,1,2,3
0:25.008078|25.008078:C:2
0:25.008078|25.008078:N
0:30.000030|30.000030:S:3
0:30.000030|30.000030:J:3=0,1,2,3
0:40.002021|40.000030:S:4
0:40.002021|40.002021:J:4=0,1,2,3
0:43.005988|43.005988:C:3
0:43.005988|43.005988:N
0:143.005988|143.005988:C:4
0:143.005988|143.005988:N
'''
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment