Commit a1707d6a authored by S. Lackner's avatar S. Lackner

[code] send status from pybatsim to experiment launcher and improve logging

parent 272ad336
......@@ -5,71 +5,12 @@ from enum import Enum
import json
import sys
import redis
from .network import NetworkHandler
import redis
import zmq
class NetworkHandler:
def __init__(
self,
socket_endpoint='tcp://*:28000',
verbose=0,
timeout=2000):
self.socket_endpoint = socket_endpoint
self.verbose = verbose
self.timeout = timeout
self.context = zmq.Context()
self.connection = None
def send(self, msg):
assert self.connection, "Connection not open"
if self.verbose > 0:
print("[PYBATSIM]: SEND_MSG\n {}".format(msg),
flush=True)
self.connection.send_string(json.dumps(msg))
def recv(self, blocking=False):
assert self.connection, "Connection not open"
if blocking or self.timeout is None or self.timeout <= 0:
self.connection.RCVTIMEO = -1
else:
self.connection.RCVTIMEO = self.timeout
try:
msg = self.connection.recv()
except zmq.error.Again:
return None
msg = json.loads(msg.decode('utf-8'))
if self.verbose > 0:
print('[PYBATSIM]: RECEIVED_MSG\n {}'.format(
json.dumps(msg, indent=2)), flush=True)
return msg
def open(self):
assert not self.connection, "Connection already open"
self.connection = self.context.socket(zmq.REP)
if self.verbose > 0:
print("[PYBATSIM]: binding to {addr}"
.format(addr=self.socket_endpoint), flush=True)
self.connection.bind(self.socket_endpoint)
def close(self):
if self.connection:
self.connection.close()
self.connection = None
def __enter__(self):
self.open()
return self
def __exit__(self, type, value, traceback):
self.close()
class Batsim(object):
WORKLOAD_JOB_SEPARATOR = "!"
......@@ -77,12 +18,17 @@ class Batsim(object):
def __init__(self, scheduler,
network_handler=None,
event_handler=None,
validatingmachine=None,
handle_dynamic_notify=True):
self.running_simulation = False
if network_handler is None:
network_handler = NetworkHandler()
network_handler = NetworkHandler('tcp://*:28000')
if event_handler is None:
event_handler = NetworkHandler(
'tcp://127.0.0.1:28001', type=zmq.PUB)
self.network = network_handler
self.event_publisher = event_handler
self.handle_dynamic_notify = handle_dynamic_notify
self.jobs = dict()
......@@ -110,7 +56,8 @@ class Batsim(object):
self.has_dynamic_job_submissions = False
self.network.open()
self.network.bind()
self.event_publisher.bind()
self.scheduler.bs = self
# import pdb; pdb.set_trace()
......@@ -120,6 +67,12 @@ class Batsim(object):
self.scheduler.onAfterBatsimInit()
self.initialized = True
def publish_event(self, event):
"""Sends a message to subscribed event listeners (e.g. external processes which want to
observe the simulation).
"""
self.event_publisher.send_string(event)
def time(self):
return self._current_time
......@@ -391,9 +344,7 @@ class Batsim(object):
elif event_type == "SIMULATION_ENDS":
assert self.running_simulation, "No simulation is currently running"
self.running_simulation = False
print(
"[PYBATSIM]: All jobs have been submitted and completed!",
flush=True)
print("All jobs have been submitted and completed!")
finished_received = True
self.scheduler.onSimulationEnds()
elif event_type == "JOB_SUBMITTED":
......@@ -477,6 +428,7 @@ class Batsim(object):
if finished_received:
self.network.close()
self.event_publisher.close()
return not finished_received
......
"""
batsim.network
~~~~~~~~~~~~~~
Handle zmq network connections.
"""
import zmq
import json
class NetworkHandler:
def __init__(
self,
socket_endpoint,
verbose=0,
timeout=2000,
type=zmq.REP):
self.socket_endpoint = socket_endpoint
self.verbose = verbose
self.timeout = timeout
self.context = zmq.Context()
self.connection = None
self.type = type
def send(self, msg):
self.send_string(json.dumps(msg))
def send_string(self, msg):
assert self.connection, "Connection not open"
if self.verbose > 0:
print("[PYBATSIM]: SEND_MSG\n {}".format(msg))
self.connection.send_string(msg)
def recv(self, blocking=False):
msg = self.recv_string(blocking=blocking)
if msg is not None:
msg = json.loads(msg)
return msg
def recv_string(self, blocking=False):
assert self.connection, "Connection not open"
if blocking or self.timeout is None or self.timeout <= 0:
self.connection.RCVTIMEO = -1
else:
self.connection.RCVTIMEO = self.timeout
try:
msg = self.connection.recv_string()
except zmq.error.Again:
return None
if self.verbose > 0:
print('[PYBATSIM]: RECEIVED_MSG\n {}'.format(msg))
return msg
def bind(self):
assert not self.connection, "Connection already open"
self.connection = self.context.socket(self.type)
if self.verbose > 0:
print("[PYBATSIM]: binding to {addr}"
.format(addr=self.socket_endpoint))
self.connection.bind(self.socket_endpoint)
def connect(self):
assert not self.connection, "Connection already open"
self.connection = self.context.socket(self.type)
if self.verbose > 0:
print("[PYBATSIM]: connecting to {addr}"
.format(addr=self.socket_endpoint))
self.connection.connect(self.socket_endpoint)
def subscribe(self, pattern=b''):
self.type = zmq.SUB
self.connect()
self.connection.setsockopt(zmq.SUBSCRIBE, pattern)
def close(self):
if self.connection:
self.connection.close()
self.connection = None
......@@ -282,19 +282,61 @@ class Allocation:
self._previously_allocated = True
def __str__(self):
jobid = None
data = self.to_json_dict()
return (
"<{} starttime:{} est_endtime:{} endtime: {} walltime:{} resources:{} special:{} allocated:{} job:{}>"
.format(
self.__class__.__name__,
data["start_time"],
data["estimated_end_time"],
data["end_time"],
data["walltime"],
[r.name for r in self.resources],
[r.name for r in self.special_resources],
[r.name for r in self.allocated_resources],
data["job"]))
def to_json_dict(self, recursive=True):
"""Returns a dict representation of this object.
:param recursive: whether object references should be resolved
"""
job = None
if self.job:
jobid = self.job.id
if recursive:
job = self.job.to_json_dict(recursive=False)
else:
job = self.job.id
resources = []
for r in self._resources:
resources.append(r.name)
if recursive:
resources.append(r.to_json_dict(recursive=False))
else:
resources.append(r.name)
special = []
for r in self._special_resources:
if recursive:
special.append(r.to_json_dict(recursive=False))
else:
special.append(r.name)
allocated = []
for r in self._allocated_resources:
allocated.append(r.name)
return (
"<Allocation starttime:{} endtime:{} walltime:{} resources:{} allocated:{} job:{}>"
.format(
self.start_time, self.end_time, self.walltime, resources, allocated, jobid))
if recursive:
allocated.append(r.to_json_dict(recursive=False))
else:
allocated.append(r.name)
return {
"start_time": self.start_time,
"estimated_end_time": self.estimated_end_time,
"end_time": self.end_time,
"walltime": self.walltime,
"resources": resources,
"special": special,
"allocated": allocated,
"job": job
}
......@@ -584,16 +584,49 @@ class Job:
self._jobs_list.update_element(self)
def __str__(self):
data = self.to_json_dict()
return (
"<Job {}; queue:{} sub:{} reqtime:{} res:{} prof:{} start:{} fin:{} stat:{} killreason:{} ret:{} comment:{}>"
.format(
self.id, self.number, self.submit_time, self.requested_time,
self.requested_resources, self.profile,
self.start_time,
self.finish_time, self.state,
self.kill_reason,
self.return_code,
self.comment))
data["id"], data["queue_number"], data["submit_time"], data["requested_time"],
data["requested_resources"], data["profile"],
data["start_time"],
data["finish_time"], data["state"],
data["kill_reason"],
data["return_code"],
data["comment"]))
def to_json_dict(self, recursive=True):
"""Returns a dict representation of this object.
:param recursive: whether object references should be resolved
"""
profile = None
profile_name = None
if self.profile is not None:
profile = self.profile.to_dict()
profile_name = self.profile.name
state = None
if self.state is not None:
state = self.state.name
return {
"id": self.id,
"queue_number": self.number,
"submit_time": self.submit_time,
"requested_time": self.requested_time,
"requested_resources": self.requested_resources,
"profile": profile,
"profile_name": profile_name,
"start_time": self.start_time,
"finish_time": self.finish_time,
"state": state,
"kill_reason": self.kill_reason,
"return_code": self.return_code,
"comment": self.comment
}
def submit_sub_job(self, *args, **kwargs):
job = self.sub_jobs_workload.new_job(*args, **kwargs)
......
......@@ -7,6 +7,7 @@
import logging
import os
import json
class LoggingEvent:
......@@ -16,6 +17,10 @@ class LoggingEvent:
:param level: the importance level of the event
:param open_jobs: the number of open jobs
:param processed_jobs: the number of processed jobs (completed, killed, etc.)
:param msg: the actual message of the event
:param type: the type of the event (`str`)
......@@ -23,20 +28,55 @@ class LoggingEvent:
:param data: additional data attached to the event (`dict`)
"""
def __init__(self, time, level, msg, type, data):
def __init__(
self,
time,
level,
open_jobs,
processed_jobs,
msg,
type,
data):
self.time = time
self.open_jobs = open_jobs
self.processed_jobs = processed_jobs
self.level = level
self.msg = msg
self.type = type
self.data = data
def to_message(self):
"""Returns a human readable message presentation of this event."""
return "[{:.6f}] {}/{} <{}> ({})".format(
self.time, self.processed_jobs, self.open_jobs,
self.type, self.msg)
def __str__(self):
data = ";".join(
["{}={}".format(
str(k).replace(";", ","),
str(v).replace(";", ",")) for k, v in self.data.items()])
return "{:.6f};{};{};{};{}".format(
self.time, self.level, self.type, self.msg, data)
data = []
for k, v in self.data.items():
try:
k = json.dumps(k, default=lambda o: o.__dict__)
except (AttributeError, ValueError):
k = json.dumps(str(k), default=lambda o: o.__dict__)
if hasattr(v, "to_json_dict"):
v = v.to_json_dict()
try:
v = json.dumps(v, default=lambda o: o.__dict__)
except (AttributeError, ValueError):
raise ValueError(
"Object could not be serialised: {}".format(v))
else:
try:
v = json.dumps(v, default=lambda o: o.__dict__)
except (AttributeError, ValueError):
v = json.dumps(str(v), default=lambda o: o.__dict__)
data.append("{}: {}".format(k, v))
data = "{" + ", ".join(data) + "}"
return "{:.6f};{};{};{};{};{};{}".format(
self.time, self.level, self.processed_jobs, self.open_jobs,
self.type, self.msg, data)
class Logger:
......
......@@ -206,11 +206,34 @@ class Resource:
self.on_free(allocation)
def __str__(self):
data = self.to_json_dict()
return (
"<Resource {}; name:{} pstate:{} allocs:{}>"
.format(
self.id, self.name, self.pstate,
[str(a) for a in self.allocations]))
"<{} {}; name={} allocs:{}>"
.format(self.__class__.__name__, data["id"], data["name"],
data["allocations"]))
def to_json_dict(self, recursive=True):
"""Returns a dict representation of this object.
:param recursive: whether object references should be resolved
"""
data = {
"id": self.id,
"name": self.name
}
if recursive:
allocations = []
for a in self.allocations:
if recursive:
allocations.append(a.to_json_dict(recursive=False))
else:
if a.job:
allocations.append(a.job.id)
else:
allocations.append([r.name for r in a.resources])
data["allocations"] = allocations
return data
def on_allocate(self, allocation):
pass
......
......@@ -351,12 +351,34 @@ class Scheduler(metaclass=ABCMeta):
def _format_event_msg(self, level, msg, type="msg", **kwargs):
msg = msg.format(**kwargs)
event = LoggingEvent(self.time, level, msg, type, kwargs)
try:
open_jobs = self._batsim.nb_jobs_received
processed_jobs = (self._batsim.nb_jobs_completed +
self._batsim.nb_jobs_failed +
self._batsim.nb_jobs_timeout +
self._batsim.nb_jobs_killed +
len(self._batsim.jobs_manually_changed))
except AttributeError:
# Batsim is not initialised
open_jobs = 0
processed_jobs = 0
event = LoggingEvent(self.time, level, open_jobs, processed_jobs,
msg, type, kwargs)
self._events.append(event)
event_str = event.to_message()
try:
self._batsim.publish_event(event_str)
except AttributeError:
# Batsim is not initialised
pass
self.on_event(event)
return str(event)
return event_str
def _log_job_header(self):
header = [
......
This diff is collapsed.
......@@ -9,6 +9,7 @@ Options:
-v --verbose Be verbose.
-p --protect Protect the scheduler using a validating machine.
-s --socket-endpoint=<endpoint> Batsim socket endpoint to use [default: tcp://*:28000]
-e --event-socket-endpoint=<endpoint> Socket endpoint to use to publish scheduler events [default: tcp://*:28001]
-o --options=<options_string> A Json string to pass to the scheduler [default: {}]
-t --timeout=<timeout> How long to wait for responses from Batsim [default: 2000]
'''
......@@ -26,6 +27,7 @@ from batsim.sched import as_scheduler
from batsim.validatingmachine import ValidatingMachine
from docopt import docopt
import zmq
def module_to_class(module):
......@@ -105,6 +107,7 @@ def main():
scheduler_filename = arguments['<scheduler>']
socket_endpoint = arguments['--socket-endpoint']
event_socket_endpoint = arguments['--event-socket-endpoint']
print("Starting simulation...", flush=True)
print("Scheduler:", scheduler_filename, flush=True)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment