Commit 67826d23 authored by Millian Poquet

The protocol is now more formally explained in a markdown file (doc/proto_description.md). Two protocol features have been added: job rejection and the possibility for the scheduler to ask to be woken up at a given time. These features have not been tested yet.
parent e3c0a064
* TODO energy
* TODO test energy
* TODO test job rejection
* TODO test nop me later
* TODO file I/O
* TODO evolving/moldable/malleable job support
* TODO VM
......@@ -8,6 +10,9 @@
* TODO shared nodes
* TODO node sleep/wakeup
* DONE job rejection
* DONE allow the scheduler to ask when he wants to be NOPped
* DONE energy
* DONE smpi profile
* DONE msg_par_homogenous profile
* DONE job composed of sequence of profile [1, 5, 5]
......
# Introduction #
Batsim is run as a (Linux) process and communicates with a scheduler via a socket.
This socket is a Unix domain socket in STREAM mode.
Since the C++ version (commit b9b5e8543c3b6172 of the C++ branch, 2015-11-20),
Batsim handles the server side of the socket (previously, Batsim was the client).
Batsim opens a server socket on a file (not on a port, since Unix domain sockets are used).
Batsim then listens for one client socket and accepts its connection.
The protocol is a simple synchronous semi-textual protocol. Its behaviour can be summarized
as a request-reply protocol. When Batsim needs a scheduling decision, the following events occur:
1. Batsim stops the simulation.
2. Batsim sends a request to the scheduler.
3. Batsim waits for a reply from the scheduler.
4. Batsim receives and reads the reply.
5. Batsim resumes the simulation.
# Message Composition #
All messages sent in this protocol are assumed to have the format MSG_SIZE MSG_CONTENT where:
- MSG_SIZE is a 32-bit native-endianness unsigned integer, which stores the number of bytes of the MSG_CONTENT part.
- MSG_CONTENT is the actual message content. It is a (non null-terminated) string: a sequence of bytes interpreted as ASCII characters.
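The framing described above can be sketched in C++ as follows. This is a minimal illustration, not code from Batsim: the helper names `frame_message` and `unframe_message` are hypothetical, and the sketch assumes that both peers run on the same host (which Unix domain sockets imply), so native byte order is the same on both sides.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <limits>
#include <stdexcept>
#include <string>

// Hypothetical helper: prefixes `content` with MSG_SIZE, i.e. its byte count
// stored as a 32-bit unsigned integer in the host's native byte order.
std::string frame_message(const std::string & content)
{
    if (content.size() > std::numeric_limits<std::uint32_t>::max())
        throw std::length_error("MSG_CONTENT does not fit in a 32-bit MSG_SIZE");

    const std::uint32_t size = static_cast<std::uint32_t>(content.size());
    std::string framed(sizeof(size), '\0');
    std::memcpy(&framed[0], &size, sizeof(size)); // native endianness, no conversion
    framed += content;                             // no null terminator is appended
    assert(framed.size() == sizeof(size) + content.size());
    return framed;
}

// Hypothetical helper: extracts MSG_CONTENT from a buffer that starts with
// one complete frame (MSG_SIZE followed by MSG_SIZE bytes).
std::string unframe_message(const std::string & buffer)
{
    std::uint32_t size = 0;
    if (buffer.size() < sizeof(size))
        throw std::runtime_error("buffer too short to contain MSG_SIZE");
    std::memcpy(&size, buffer.data(), sizeof(size));
    if (buffer.size() - sizeof(size) < size)
        throw std::runtime_error("buffer too short to contain MSG_CONTENT");
    return buffer.substr(sizeof(size), size);
}
```

For instance, `frame_message("0:2|2:N")` yields 11 bytes (4 for MSG_SIZE, 7 for MSG_CONTENT), and `unframe_message` applied to that result gives back `"0:2|2:N"`.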
Each MSG_CONTENT follows this syntax:
{PROTO_VERSION}:{TIME_MSG}|{TIME_EVENT1}:{STAMP1}[:{STAMP_DEPENDENT_CONTENT1}][|{TIME_EVENT2}:{STAMP2}[:{STAMP_DEPENDENT_CONTENT2}][...]] where:
- PROTO_VERSION is the protocol version used in the message.
- TIME_MSG is the simulation time at which the message has been sent by the scheduler.
- TIME_EVENT1, TIME_EVENT2, ... TIME_EVENTn are the simulation times at which events 1 to n were supposed to be sent to the scheduler. These values must not be later than the corresponding TIME_MSG. Furthermore, all events must be in chronological order.
- STAMP1, STAMP2, ... STAMPn are the stamps of the events: a stamp identifies which event type should be parsed. Each stamp is one character long.
- STAMP_DEPENDENT_CONTENT1, STAMP_DEPENDENT_CONTENT2, ... STAMP_DEPENDENT_CONTENTn store additional information about the events. For example, when a job is completed, this field stores
the ID of the job which has just completed. This part is not mandatory: whether it is present depends on the
stamp.
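As an illustration of this syntax, here is a minimal parsing sketch in C++. The types `Event` and `Message` and the functions `split` and `parse_message` are hypothetical names, not taken from the Batsim sources. The sketch assumes that an event time equal to TIME_MSG is acceptable, which is what the examples of this document suggest.

```cpp
#include <cassert>   // only needed for the checks mentioned in the note below
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical types, not taken from the Batsim sources.
struct Event
{
    double time;          // TIME_EVENTi
    char stamp;           // STAMPi (always one character)
    std::string content;  // STAMP_DEPENDENT_CONTENTi, empty when absent
};

struct Message
{
    int version;                // PROTO_VERSION
    double time;                // TIME_MSG
    std::vector<Event> events;  // in chronological order
};

// Splits `s` on every occurrence of `sep`, keeping empty fields.
std::vector<std::string> split(const std::string & s, char sep)
{
    std::vector<std::string> parts;
    std::string::size_type start = 0;
    for (;;)
    {
        const std::string::size_type pos = s.find(sep, start);
        if (pos == std::string::npos)
        {
            parts.push_back(s.substr(start));
            return parts;
        }
        parts.push_back(s.substr(start, pos - start));
        start = pos + 1;
    }
}

Message parse_message(const std::string & content)
{
    const std::vector<std::string> chunks = split(content, '|');
    if (chunks.size() < 2)
        throw std::invalid_argument("expected a header followed by at least one event");

    const std::vector<std::string> header = split(chunks[0], ':');
    if (header.size() != 2)
        throw std::invalid_argument("header must be PROTO_VERSION:TIME_MSG");

    Message msg;
    msg.version = std::stoi(header[0]);
    msg.time = std::stod(header[1]);

    for (std::size_t i = 1; i < chunks.size(); ++i)
    {
        // Only the first two ':' are separators; the rest belongs to the content.
        const std::string & ev = chunks[i];
        const std::string::size_type first = ev.find(':');
        if (first == std::string::npos)
            throw std::invalid_argument("event must be TIME:STAMP[:CONTENT]");
        const std::string::size_type second = ev.find(':', first + 1);
        const std::string stamp = (second == std::string::npos)
            ? ev.substr(first + 1)
            : ev.substr(first + 1, second - first - 1);
        if (stamp.size() != 1)
            throw std::invalid_argument("a stamp must be exactly one character long");

        Event e;
        e.time = std::stod(ev.substr(0, first));
        e.stamp = stamp[0];
        if (second != std::string::npos)
            e.content = ev.substr(second + 1);

        if (e.time > msg.time)
            throw std::invalid_argument("event time is later than TIME_MSG");
        if (!msg.events.empty() && e.time < msg.events.back().time)
            throw std::invalid_argument("events are not in chronological order");
        msg.events.push_back(e);
    }
    return msg;
}
```

With this sketch, `parse_message("0:13|12:S:2|12.5:S:3|13:S:4")` produces a message of version 0, sent at time 13, holding three `S` events whose contents are `"2"`, `"3"` and `"4"`.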
# Message Stamps #
| Proto. version | Stamp | Direction     | Content syntax            | Meaning |
|----------------|-------|---------------|---------------------------|---------|
| 0+             | S     | Batsim->Sched | JOB_ID                    | Job submission: one (static) job is available and can now be allocated to resources. |
| 0+             | C     | Batsim->Sched | JOB_ID                    | Job completion: one (static) job finished its execution. |
| 0+             | J     | Sched->Batsim | JID1=MID1,MID2,MIDn[;...] | Job allocation: tells to put job JID1 on machines MID1, ..., MIDn. Several jobs may be allocated in the same event. |
| 0+             | N     | Both          | No content                | NOP: tells to do nothing / nothing happened. |
| 1+             | P     | Sched->Batsim | MACHINE_ID=PSTATE         | Asks to change the pstate of a machine. |
| 1+             | p     | Batsim->Sched | MACHINE_ID=PSTATE         | Tells the scheduler that the pstate of a machine has changed. |
| 1+             | R     | Sched->Batsim | JOB_ID                    | Job rejection: the scheduler tells that one (static) job will not be computed. |
| 1+             | n     | Sched->Batsim | TIME                      | NOP me later: the scheduler asks to be woken up at simulation time TIME. |
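The table above can be encoded as a small lookup structure, for example to reject a stamp that is not valid for a given protocol version or direction. The following C++ sketch is only an illustration: `Direction`, `StampInfo`, `stamp_table` and `is_stamp_allowed` are hypothetical names that do not exist in Batsim.

```cpp
#include <cassert>   // only needed for the checks mentioned in the note below
#include <map>

// Hypothetical names, used for illustration only.
enum class Direction { BatsimToSched, SchedToBatsim };

struct StampInfo
{
    int min_version;        // first protocol version that defines the stamp
    bool batsim_to_sched;   // may be sent by Batsim to the scheduler
    bool sched_to_batsim;   // may be sent by the scheduler to Batsim
};

// One entry per row of the stamp table.
const std::map<char, StampInfo> & stamp_table()
{
    static const std::map<char, StampInfo> table = {
        {'S', {0, true,  false}},  // job submission
        {'C', {0, true,  false}},  // job completion
        {'J', {0, false, true}},   // job allocation
        {'N', {0, true,  true}},   // NOP
        {'P', {1, false, true}},   // pstate modification request
        {'p', {1, true,  false}},  // pstate modification acknowledgement
        {'R', {1, false, true}},   // job rejection
        {'n', {1, false, true}},   // NOP me later
    };
    return table;
}

// Returns true if `stamp` exists in protocol `version` and may travel in `dir`.
bool is_stamp_allowed(char stamp, int version, Direction dir)
{
    const auto it = stamp_table().find(stamp);
    if (it == stamp_table().end())
        return false;
    const StampInfo & info = it->second;
    if (version < info.min_version)
        return false;
    return dir == Direction::BatsimToSched ? info.batsim_to_sched
                                           : info.sched_to_batsim;
}
```

For example, `is_stamp_allowed('R', 0, Direction::SchedToBatsim)` is false because job rejection only exists from protocol version 1 onwards, whereas `is_stamp_allowed('N', 0, dir)` is true for both directions.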
# Message Examples #
## Static Job Submission ##
Batsim -> Scheduler
0:10.000015|10.000015:S:1
0:13|12:S:2|12.5:S:3|13:S:4
## Static Job Completion ##
Batsim -> Scheduler
0:15.836694|15.836694:C:1
0:40.001320|25:C:2|38.002565:C:3
## Static Job Allocation ##
Scheduler -> Batsim
0:15.000015|15.000015:J:1=1,2,0,3;2=3
0:45.00132|45.00132:J:4=3,1,2,0
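The content of a job allocation event (`JID1=MID1,MID2,MIDn[;...]`) can be decoded as sketched below. The `Allocation` type and the `parse_allocations` function are hypothetical and written for illustration only; they are not the parsing code used by Batsim.

```cpp
#include <cassert>   // only needed for the checks mentioned in the note below
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical type: one job and the machines it is allocated to.
struct Allocation
{
    int job_id;
    std::vector<int> machine_ids;
};

// Parses "JID1=MID1,MID2,...;JID2=MID1,..." into a list of allocations,
// keeping the jobs and the machine IDs in the order in which they appear.
std::vector<Allocation> parse_allocations(const std::string & content)
{
    std::vector<Allocation> result;
    std::istringstream jobs(content);
    std::string job_part;

    while (std::getline(jobs, job_part, ';'))
    {
        const std::string::size_type eq = job_part.find('=');
        if (eq == std::string::npos || eq == 0 || eq + 1 == job_part.size())
            throw std::invalid_argument("expected JOB_ID=MID1,MID2,...: '" + job_part + "'");

        Allocation alloc;
        alloc.job_id = std::stoi(job_part.substr(0, eq));

        std::istringstream machines(job_part.substr(eq + 1));
        std::string machine_id;
        while (std::getline(machines, machine_id, ','))
            alloc.machine_ids.push_back(std::stoi(machine_id)); // throws on an empty or non-numeric ID

        result.push_back(alloc);
    }
    return result;
}
```

Applied to the content of the first example above, `parse_allocations("1=1,2,0,3;2=3")` returns two allocations: job 1 on machines 1, 2, 0 and 3, and job 2 on machine 3.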
## Static Job Rejection ##
Scheduler -> Batsim
1:50|50:R:5
## NOP ##
Scheduler -> Batsim or Batsim -> Scheduler
0:2|2:N
0:42|10:N|20:N|30:N|40:N
## PState Modification Request ##
Scheduler -> Batsim
1:50|50:P:0=2
1:70|60:P:0=0|60:P:1=2|70:P:2=0
## PState Modification Acknowledgement ##
Batsim -> Scheduler
1:50.000001|50.000001:p:0=2
1:60.000001|60.000001:p:0=0|60.000001:p:1=2
## NOP Me Later ##
Scheduler -> Batsim
1:100|100:n:142
Expected result (Batsim -> Scheduler):
1:142.000001|142.000001:N
\ No newline at end of file
......@@ -67,6 +67,16 @@ IPMessage::~IPMessage()
SchedulingAllocationMessage * msg = (SchedulingAllocationMessage *) data;
delete msg;
} break;
case IPMessageType::SCHED_REJECTION:
{
JobRejectedMessage * msg = (JobRejectedMessage *) data;
delete msg;
} break;
case IPMessageType::SCHED_NOP_ME_LATER:
{
NOPMeLaterMessage * msg = (NOPMeLaterMessage*) data;
delete msg;
} break;
case IPMessageType::SCHED_NOP:
{
} break;
......@@ -89,6 +99,9 @@ IPMessage::~IPMessage()
PStateModificationMessage * msg = (PStateModificationMessage *) data;
delete msg;
} break;
case IPMessageType::WAITING_DONE:
{
} break;
}
data = nullptr;
......
......@@ -17,8 +17,11 @@ enum class IPMessageType
,JOB_COMPLETED //!< Launcher/killer -> Server. The launcher tells the server a job has been completed.
,PSTATE_MODIFICATION//!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occurred (a pstate modification).
,SCHED_ALLOCATION //!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occurred (a job allocation).
,SCHED_REJECTION //!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occurred (a job rejection).
,SCHED_NOP //!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occurred (a NOP message).
,SCHED_NOP_ME_LATER //!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occurred (a NOP_ME_LATER message).
,SCHED_READY //!< SchedulerHandler -> Server. The scheduler handler tells the server that the scheduler is ready (messages can be sent to it).
,WAITING_DONE //!< Waiter -> Server. The waiter tells the server that the target time has been reached.
,SUBMITTER_HELLO //!< Submitter -> Server. The submitter tells it starts submitting to the server.
,SUBMITTER_BYE //!< Submitter -> Server. The submitter tells it stops submitting to the server.
,SWITCHED_ON //!< SwitcherON -> Server. The switcherON process tells the server the machine pstate has been changed
......@@ -35,6 +38,11 @@ struct JobCompletedMessage
int job_id; //!< The job ID
};
struct JobRejectedMessage
{
int job_id; //!< The job ID
};
struct SchedulingAllocation
{
int job_id;
......@@ -53,6 +61,11 @@ struct PStateModificationMessage
int new_pstate;
};
struct NOPMeLaterMessage
{
double target_time;
};
struct IPMessage
{
~IPMessage();
......@@ -94,6 +107,11 @@ struct JobSubmitterProcessArguments
BatsimContext * context;
};
struct WaiterProcessArguments
{
double target_time;
};
/**
* @brief Sends a message from the given process to the given mailbox
* @param[in] dst The destination mailbox
......
......@@ -18,6 +18,7 @@ enum class JobState
,JOB_STATE_RUNNING //!< The job has been scheduled and is currently being processed.
,JOB_STATE_COMPLETED_SUCCESSFULLY //!< The job execution finished before its walltime.
,JOB_STATE_COMPLETED_KILLED //!< The job execution time was longer than its walltime so the job had been killed.
,JOB_STATE_REJECTED //!< The job has been rejected by the scheduler.
};
struct Job
......
......@@ -258,3 +258,28 @@ int execute_job_process(int argc, char *argv[])
return 0;
}
int waiter_process(int argc, char *argv[])
{
(void) argc;
(void) argv;
WaiterProcessArguments * args = (WaiterProcessArguments *) MSG_process_get_data(MSG_process_self());
double curr_time = MSG_get_clock();
if (curr_time < args->target_time)
{
double time_to_wait = args->target_time - curr_time;
XBT_INFO("Sleeping %g seconds to reach time %g", time_to_wait, args->target_time);
MSG_process_sleep(time_to_wait);
XBT_INFO("Sleeping done");
}
else
XBT_INFO("Time %g is already reached, skipping sleep", args->target_time);
send_message("server", IPMessageType::WAITING_DONE);
delete args;
return 0;
}
......@@ -13,3 +13,5 @@ int execute_profile(BatsimContext * context,
double * remaining_time);
int execute_job_process(int argc, char *argv[]);
int waiter_process(int argc, char *argv[]);
......@@ -182,11 +182,13 @@ void Machines::createMachines(xbt_dynar_t hosts, BatsimContext *context, const s
const Machine * Machines::operator[](int machineID) const
{
xbt_assert(exists(machineID), "Cannot get machine %d: it does not exist", machineID);
return _machines[machineID];
}
Machine * Machines::operator[](int machineID)
{
xbt_assert(exists(machineID), "Cannot get machine %d: it does not exist", machineID);
return _machines[machineID];
}
......@@ -235,9 +237,7 @@ void Machines::updateMachinesOnJobRun(int jobID, const std::vector<int> & usedMa
if (!machine->jobs_being_computed.empty())
previous_top_job = *machine->jobs_being_computed.begin();
// cout << machine;
machine->jobs_being_computed.insert(jobID);
// cout << machine;
if (previous_top_job == -1 || previous_top_job != *machine->jobs_being_computed.begin())
{
......@@ -252,7 +252,6 @@ void Machines::updateMachinesOnJobEnd(int jobID, const std::vector<int> & usedMa
for (int machineID : usedMachines)
{
Machine * machine = _machines[machineID];
// cout << machine;
xbt_assert(!machine->jobs_being_computed.empty());
int previous_top_job = *machine->jobs_being_computed.begin();
......@@ -273,7 +272,6 @@ void Machines::updateMachinesOnJobEnd(int jobID, const std::vector<int> & usedMa
_tracer->set_machine_as_computing_job(machine->id, *machine->jobs_being_computed.begin(), MSG_get_clock());
}
// cout << machine;
}
}
......@@ -282,18 +280,6 @@ void Machines::setTracer(PajeTracer *tracer)
_tracer = tracer;
}
ostream & operator<<(ostream & out, const Machine & machine)
{
out << "Machine " << machine.id << ", ";
out << "state = " << machineStateToString(machine.state) << ", ";
out << "jobs = [";
std::copy(machine.jobs_being_computed.begin(), machine.jobs_being_computed.end(),
std::ostream_iterator<char>(out, " "));
out << "]" << endl;
return out;
}
string machineStateToString(MachineState state)
{
static const std::map<MachineState,std::string> conv =
......
#pragma once
#include <iostream>
#include <vector>
#include <set>
#include <map>
......@@ -39,8 +38,6 @@ struct Machine
void display_machine(bool is_energy_used) const;
};
std::ostream & operator<<(std::ostream & out, const Machine & machine);
class Machines
{
public:
......
......@@ -201,6 +201,19 @@ int request_reply_scheduler_process(int argc, char *argv[])
xbt_assert(parts2.size() == 2, "Invalid event received ('%s'): NOP messages must be composed of 2 parts separated by ':'", event_string.c_str());
send_message("server", IPMessageType::SCHED_NOP);
} break; // End of case received_stamp == NOP
case NOP_ME_LATER:
{
xbt_assert(parts2.size() == 3, "Invalid event received ('%s'): NOP_ME_LATER messages must be composed of 3 parts separated by ':'", event_string.c_str());
NOPMeLaterMessage * message = new NOPMeLaterMessage;
message->target_time = std::stod(parts2[2]);
if (message->target_time < MSG_get_clock())
XBT_WARN("Event '%s' tells to wait until time %g but it is already reached", event_string.c_str(), message->target_time);
send_message("server", IPMessageType::SCHED_NOP_ME_LATER, (void*) message);
} break; // End of case received_stamp == NOP_ME_LATER
case STATIC_JOB_ALLOCATION:
{
xbt_assert(parts2.size() == 3, "Invalid event received ('%s'): static job allocations must be composed of 3 parts separated by ':'",
......@@ -269,6 +282,27 @@ int request_reply_scheduler_process(int argc, char *argv[])
send_message("server", IPMessageType::SCHED_ALLOCATION, (void*) message);
} break; // End of case received_stamp == STATIC_JOB_ALLOCATION
case JOB_REJECTION:
{
xbt_assert(parts2.size() == 3, "Invalid event received ('%s'): static job rejections must be composed of 3 parts separated by ':'",
event_string.c_str());
// Let us create the message which will be sent to the server.
JobRejectedMessage * message = new JobRejectedMessage;
message->job_id = std::stoi(parts2[2]);
xbt_assert(context->jobs.exists(message->job_id), "Invalid event received ('%s'): job %d does not exist",
event_string.c_str(), message->job_id);
Job * job = context->jobs[message->job_id];
xbt_assert(job->state == JobState::JOB_STATE_SUBMITTED, "Invalid event received ('%s'): job %d cannot be"
" rejected now. For being rejected, a job must be submitted and not allocated yet.",
event_string.c_str(), job->id);
send_message("server", IPMessageType::SCHED_REJECTION, (void*) message);
} break; // End of case received_stamp == JOB_REJECTION
case PSTATE_SET:
{
xbt_assert(parts2.size() == 3, "Invalid event received ('%s'): pstate modifications must be composed of 3 parts separated by ':'",
......@@ -315,6 +349,7 @@ int request_reply_scheduler_process(int argc, char *argv[])
send_message("server", IPMessageType::PSTATE_MODIFICATION, (void*) message);
} break; // End of case received_stamp == PSTATE_SET
default:
{
xbt_die("Invalid event received ('%s') : unhandled network stamp received ('%c')", event_string.c_str(), received_stamp);
......@@ -341,6 +376,7 @@ int uds_server_process(int argc, char *argv[])
int nb_submitters = 0;
int nb_submitters_finished = 0;
int nb_running_jobs = 0;
const int protocol_version = 1;
bool sched_ready = true;
string send_buffer;
......@@ -401,10 +437,34 @@ int uds_server_process(int argc, char *argv[])
XBT_INFO("Job %d SUBMITTED. %d jobs submitted so far", job->id, nb_submitted_jobs);
send_buffer += "|" + std::to_string(MSG_get_clock()) + ":S:" + std::to_string(job->id);
XBT_INFO("Message to send to scheduler: %s", send_buffer.c_str());
XBT_INFO("Message to send to scheduler: '%s'", send_buffer.c_str());
} break; // end of case JOB_SUBMITTED
case IPMessageType::SCHED_REJECTION:
{
xbt_assert(task_data->data != nullptr);
JobRejectedMessage * message = (JobRejectedMessage *) task_data->data;
Job * job = context->jobs[message->job_id];
job->state = JobState::JOB_STATE_REJECTED;
nb_completed_jobs++;
XBT_INFO("Job %d has been rejected", job->id);
} break; // end of case SCHED_REJECTION
case IPMessageType::SCHED_NOP_ME_LATER:
{
xbt_assert(task_data->data != nullptr);
NOPMeLaterMessage * message = (NOPMeLaterMessage *) task_data->data;
WaiterProcessArguments * args = new WaiterProcessArguments;
args->target_time = message->target_time;
string pname = "waiter " + to_string(message->target_time);
MSG_process_create(pname.c_str(), waiter_process, (void*) args, context->machines.masterMachine()->host);
} break; // end of case SCHED_NOP_ME_LATER
case IPMessageType::PSTATE_MODIFICATION:
{
xbt_assert(task_data->data != nullptr);
......@@ -500,6 +560,24 @@ int uds_server_process(int argc, char *argv[])
nb_scheduled_jobs++;
xbt_assert(nb_scheduled_jobs <= nb_submitted_jobs);
if (context->energy_used)
{
// Check that every machine is in a computation pstate
for (const int & machineID : allocation.machine_ids)
{
Machine * machine = context->machines[machineID];
int ps = MSG_host_get_pstate(machine->host);
xbt_assert(machine->has_pstate(ps));
xbt_assert(machine->pstates[ps] == PStateType::COMPUTATION_PSTATE,
"Invalid job allocation: machine %d ('%s') is not in a computation pstate (ps=%d)",
machine->id, machine->name.c_str(), ps);
xbt_assert(machine->state == MachineState::COMPUTING || machine->state == MachineState::IDLE,
"Invalid job allocation: machine %d ('%s') cannot compute jobs now (the machine is"
" neither computing nor being idle)", machine->id, machine->name.c_str());
}
}
ExecuteJobProcessArguments * exec_args = new ExecuteJobProcessArguments;
exec_args->context = context;
exec_args->allocation = allocation;
......@@ -508,11 +586,19 @@ int uds_server_process(int argc, char *argv[])
}
} break; // end of case SCHED_ALLOCATION
case IPMessageType::WAITING_DONE:
{
send_buffer += "|" + std::to_string(MSG_get_clock()) + ":N";
XBT_INFO("Message to send to scheduler: '%s'", send_buffer.c_str());
} break; // end of case WAITING_DONE
case IPMessageType::SCHED_READY:
{
sched_ready = true;
} break; // end of case SCHED_READY
case IPMessageType::SWITCHED_ON:
{
xbt_assert(task_data->data != nullptr);
......@@ -526,6 +612,7 @@ int uds_server_process(int argc, char *argv[])
std::to_string(machine->id) + "=" + std::to_string(message->new_pstate);
XBT_INFO("Message to send to scheduler : '%s'", send_buffer.c_str());
} break; // end of case SWITCHED_ON
case IPMessageType::SWITCHED_OFF:
{
xbt_assert(task_data->data != nullptr);
......@@ -548,7 +635,7 @@ int uds_server_process(int argc, char *argv[])
{
RequestReplyProcessArguments * req_rep_args = new RequestReplyProcessArguments;
req_rep_args->context = context;
req_rep_args->send_buffer = "0:" + to_string(MSG_get_clock()) + send_buffer;
req_rep_args->send_buffer = to_string(protocol_version) + ":" + to_string(MSG_get_clock()) + send_buffer;
send_buffer.clear();
MSG_process_create("Scheduler REQ-REP", request_reply_scheduler_process, (void*)req_rep_args, MSG_host_self());
......
......@@ -10,7 +10,10 @@ enum NetworkStamp : char
NOP = 'N',
STATIC_JOB_SUBMISSION = 'S',
STATIC_JOB_COMPLETION = 'C',
PSTATE_SET = 'P'
PSTATE_SET = 'P',
NOP_ME_LATER = 'n',
PSTATE_HAS_BEEN_SET = 'p'
};
class UnixDomainSocket
......
......@@ -45,6 +45,8 @@ int switch_on_machine_process(int argc, char *argv[])
machine->id, machine->name.c_str(), pstate);
MSG_host_set_pstate(machine->host, pstate);
machine->state = MachineState::IDLE;
send_message("server", IPMessageType::SWITCHED_ON, (void *) args->message);
delete args;
return 0;
......@@ -88,6 +90,8 @@ int switch_off_machine_process(int argc, char *argv[])
machine->id, machine->name.c_str(), pstate);
MSG_host_set_pstate(machine->host, pstate);
machine->state = MachineState::SLEEPING;
send_message("server", IPMessageType::SWITCHED_OFF, (void *) args->message);
delete args;
return 0;
......