Commit 8bb9f240 authored by Millian Poquet's avatar Millian Poquet

Protocol update: the scheduler can now ask for the current total consumed...

Protocol update: the scheduler can now ask for the current total consumed energy and Batsim can give the answer. It has been tested via fcfs_sleeper.py, which asks for the consumed energy instead of NOPping with probability 1/2.
parent e01bc917
......@@ -44,7 +44,9 @@ Each MSG_CONTENT follows this syntax:
| 1+ | P | Sched->Batsim | MACHINE_ID=PSTATE | Ask to change the pstate of a machine.
| 1+ | p | Batsim->Sched | MACHINE_ID=PSTATE | Tells the scheduler the pstate of a machine has changed.
| 1+ | R | Sched->Batsim | JOB_ID | Job rejection: the scheduler tells that one (static) job will not be computed.
| 1+ | n | Sched->Batim | TIME | NOP me later: the scheduler asks to be awaken at simulation time TIME.
| 1+ | n | Sched->Batsim | TIME | NOP me later: the scheduler asks to be awaken at simulation time TIME.
| 1+ | E | Sched->Batsim | No content | Ask Batsim to give the total consumed energy (from time 0 to now) in Joules. Works only in energy mode.
| 1+ | e | Batsim->Sched | CONSUMED_ENERGY | Batsim gives the total consumed energy (from time 0 to now) in Joules. Works only in energy mode.
# Message Examples #
......
......@@ -607,17 +607,7 @@ void exportScheduleToCSV(const string &filename, double scheduling_time, BatsimC
}
}
long double total_consumed_energy = 0;
if (context->energy_used)
{
for (const Machine * m : context->machines.machines())
{
total_consumed_energy += MSG_host_get_consumed_energy(m->host);
}
}
else
total_consumed_energy = -1;
long double total_consumed_energy = context->machines.total_consumed_energy(context);
char * buf;
int ret = asprintf(&buf, "%d,%d,%d,%d,%lf,%lf,%lf,%lf,%lf,%Lg\n",
......
......@@ -5,7 +5,7 @@ import socket
import sys
import os
import json
from random import sample
import random
from sortedcontainers import SortedSet
from enum import Enum
......@@ -65,6 +65,9 @@ def read_bat_msg(connection):
elif data[1] == 'p':
data2 = data[2].split('=')
pstate_changed[int(data2[0])] = int(data2[1])
elif data[1] == 'e':
consumed_energy = float(data[2])
print("Consumed energy is " + str(consumed_energy) + " J")
else:
raise Exception("Unknow submessage type" + data[1] )
......@@ -91,7 +94,10 @@ def send_bat_msg(connection, now, jids_toLaunch, jobs, pstates_to_change):
msg += "|" + str(now) + ":P:" + part
if not didSomething:
msg += "|" + str(now) +":N"
if random.choice([0,1]) == 0:
msg += "|" + str(now) +":N"
else:
msg += "|" + str(now) +":E"
print(msg)
lg = struct.pack("i",int(len(msg)))
......
......@@ -47,6 +47,9 @@ std::string ipMessageTypeToString(IPMessageType type)
case IPMessageType::SCHED_NOP_ME_LATER:
s = "SCHED_NOP_ME_LATER";
break;
case IPMessageType::SCHED_TELL_ME_ENERGY:
s = "SCHED_TELL_ME_ENERGY";
break;
case IPMessageType::SCHED_READY:
s = "SCHED_READY";
break;
......@@ -113,6 +116,9 @@ IPMessage::~IPMessage()
case IPMessageType::SCHED_NOP:
{
} break;
case IPMessageType::SCHED_TELL_ME_ENERGY:
{
} break;
case IPMessageType::SCHED_READY:
{
} break;
......
......@@ -13,19 +13,20 @@ struct BatsimContext;
enum class IPMessageType
{
JOB_SUBMITTED //!< Submitter -> Server. The submitter tells the server a new job has been submitted.
,JOB_COMPLETED //!< Launcher/killer -> Server. The launcher tells the server a job has been completed.
,PSTATE_MODIFICATION//!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occured (a pstate modification).
,SCHED_ALLOCATION //!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occured (a job allocation).
,SCHED_REJECTION //!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occured (a job rejection).
,SCHED_NOP //!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occured (a NOP message).
,SCHED_NOP_ME_LATER //!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occured (a NOP_ME_LATTER message).
,SCHED_READY //!< SchedulerHandler -> Server. The scheduler handler tells the server that the scheduler is ready (messages can be sent to it).
,WAITING_DONE //!< Waiter -> server. The waiter tells the server that the target time has been reached.
,SUBMITTER_HELLO //!< Submitter -> Server. The submitter tells it starts submitting to the server.
,SUBMITTER_BYE //!< Submitter -> Server. The submitter tells it stops submitting to the server.
,SWITCHED_ON //!< SwitcherON -> Server. The switcherON process tells the server the machine pstate has been changed
,SWITCHED_OFF //!< SwitcherOFF -> Server. The switcherOFF process tells the server the machine pstate has been changed.
JOB_SUBMITTED //!< Submitter -> Server. The submitter tells the server a new job has been submitted.
,JOB_COMPLETED //!< Launcher/killer -> Server. The launcher tells the server a job has been completed.
,PSTATE_MODIFICATION //!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occured (a pstate modification).
,SCHED_ALLOCATION //!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occured (a job allocation).
,SCHED_REJECTION //!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occured (a job rejection).
,SCHED_NOP //!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occured (a NOP message).
,SCHED_NOP_ME_LATER //!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occured (a NOP_ME_LATTER message).
,SCHED_TELL_ME_ENERGY //!< SchedulerHandler -> Server. The scheduler handler tells the server a scheduling event occured (a TELL_ME_CONSUMED_ENERGY message).
,SCHED_READY //!< SchedulerHandler -> Server. The scheduler handler tells the server that the scheduler is ready (messages can be sent to it).
,WAITING_DONE //!< Waiter -> server. The waiter tells the server that the target time has been reached.
,SUBMITTER_HELLO //!< Submitter -> Server. The submitter tells it starts submitting to the server.
,SUBMITTER_BYE //!< Submitter -> Server. The submitter tells it stops submitting to the server.
,SWITCHED_ON //!< SwitcherON -> Server. The switcherON process tells the server the machine pstate has been changed
,SWITCHED_OFF //!< SwitcherOFF -> Server. The switcherOFF process tells the server the machine pstate has been changed.
};
struct JobSubmittedMessage
......
......@@ -232,6 +232,21 @@ const Machine *Machines::masterMachine() const
return _masterMachine;
}
long double Machines::total_consumed_energy(BatsimContext *context) const
{
long double total_consumed_energy = 0;
if (context->energy_used)
{
for (const Machine * m : _machines)
total_consumed_energy += MSG_host_get_consumed_energy(m->host);
}
else
total_consumed_energy = -1;
return total_consumed_energy;
}
void Machines::updateMachinesOnJobRun(int jobID, const std::vector<int> & usedMachines)
{
for (int machineID : usedMachines)
......
......@@ -58,6 +58,7 @@ public:
const std::vector<Machine *> & machines() const;
const Machine * masterMachine() const;
long double total_consumed_energy(BatsimContext * context) const;
private:
std::vector<Machine *> _machines;
......
......@@ -352,6 +352,15 @@ int request_reply_scheduler_process(int argc, char *argv[])
} break; // End of case received_stamp == PSTATE_SET
case TELL_ME_CONSUMED_ENERGY:
{
xbt_assert(parts2.size() == 2, "Invalid event received ('%s'): messages to ask the consumed energy must be composed of 2 parts separated by ':'",
event_string.c_str());
xbt_assert(context->energy_used, "A message to ask the consumed energy has been received whereas energy is not currently used by Batsim."
" You can use the energy plugin of Batsim via a command-line option, try --help to display those options.");
send_message("server", IPMessageType::SCHED_TELL_ME_ENERGY);
} break; // End of case received_stamp == TELL_ME_CONSUMED_ENERGY
default:
{
xbt_die("Invalid event received ('%s') : unhandled network stamp received ('%c')", event_string.c_str(), received_stamp);
......
......@@ -12,8 +12,10 @@ enum NetworkStamp : char
STATIC_JOB_COMPLETION = 'C',
PSTATE_SET = 'P',
NOP_ME_LATER = 'n',
TELL_ME_CONSUMED_ENERGY = 'E',
PSTATE_HAS_BEEN_SET = 'p'
PSTATE_HAS_BEEN_SET = 'p',
CONSUMED_ENERGY = 'e'
};
class UnixDomainSocket
......
......@@ -226,8 +226,8 @@ int uds_server_process(int argc, char *argv[])
const Machine * machine = context->machines[machineID];
xbt_assert(machine->jobs_being_computed.empty(),
"Invalid job allocation: machine %d ('%s') is currently computing jobs (these ones:"
" {%s}) whereas space sharing is forbidden. Space sharing can be enabled via an option"
" (rerun with --help to display the available options)", machine->id, machine->name.c_str(),
" {%s}) whereas space sharing is forbidden. Space sharing can be enabled via an option,"
" try --help to display the available options", machine->id, machine->name.c_str(),
machine->jobs_being_computed_as_string().c_str());
}
}
......@@ -262,7 +262,7 @@ int uds_server_process(int argc, char *argv[])
case IPMessageType::WAITING_DONE:
{
send_buffer += "|" + std::to_string(MSG_get_clock()) + ":N";
XBT_DEBUG( "Message to send to scheduler: '%s'", send_buffer.c_str());
XBT_DEBUG("Message to send to scheduler: '%s'", send_buffer.c_str());
} break; // end of case WAITING_DONE
case IPMessageType::SCHED_READY:
......@@ -282,7 +282,7 @@ int uds_server_process(int argc, char *argv[])
send_buffer += "|" + std::to_string(MSG_get_clock()) + ":p:" +
std::to_string(machine->id) + "=" + std::to_string(message->new_pstate);
XBT_DEBUG( "Message to send to scheduler : '%s'", send_buffer.c_str());
XBT_DEBUG("Message to send to scheduler : '%s'", send_buffer.c_str());
--nb_switching_machines;
} break; // end of case SWITCHED_ON
......@@ -298,10 +298,19 @@ int uds_server_process(int argc, char *argv[])
send_buffer += "|" + std::to_string(MSG_get_clock()) + ":p:" +
std::to_string(machine->id) + "=" + std::to_string(message->new_pstate);
XBT_DEBUG( "Message to send to scheduler : '%s'", send_buffer.c_str());
XBT_DEBUG("Message to send to scheduler : '%s'", send_buffer.c_str());
--nb_switching_machines;
} break; // end of case SWITCHED_ON
case IPMessageType::SCHED_TELL_ME_ENERGY:
{
long double total_consumed_energy = context->machines.total_consumed_energy(context);
send_buffer += "|" + std::to_string(MSG_get_clock()) + ":e:" +
std::to_string(total_consumed_energy);
XBT_DEBUG("Message to send to scheduler : '%s'", send_buffer.c_str());
}
} // end of switch
delete task_data;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment