Commit b92d2ef8 authored by S. Lackner

[sched] Rework event logging and serialisation

parent c2eaf32e
"""
batsim.sched.events
~~~~~~~~~~~~~~~~~~~
This module provides handling of scheduling events.
"""
import logging
import json
import csv
from .logging import Logger
class LoggingEvent:
    """Class for storing data about events triggered by the scheduler.

    :param time: the simulation time when the event occurred

    :param level: the importance level of the event

    :param open_jobs: the number of open jobs

    :param processed_jobs: the number of processed jobs (completed, killed, etc.)

    :param msg: the actual message of the event

    :param type: the type of the event (`str`)

    :param data: additional data attached to the event (`dict`)
    """

    def __init__(
            self,
            time,
            level,
            open_jobs,
            processed_jobs,
            msg,
            type,
            data):
        self.time = time
        self.open_jobs = open_jobs
        self.processed_jobs = processed_jobs
        self.level = level
        self.msg = msg
        self.type = type
        self.data = data

    def to_message(self):
        """Returns a human readable message presentation of this event."""
        return "[{:.6f}] {}/{} <{}> ({})".format(
            self.time, self.processed_jobs, self.open_jobs,
            self.type, self.msg)

    def __str__(self):
        """Serialise this event as a single ``;``-separated CSV row.

        The `data` dict is rendered as a JSON object; keys and values that
        cannot be serialised directly fall back to their `str` form.
        """
        data = []
        for k, v in self.data.items():
            # json.dumps raises TypeError for unserialisable input (e.g. a
            # nested dict with non-str keys) and ValueError for circular
            # references; the fallback default (o.__dict__) raises
            # AttributeError for objects without a __dict__.  All three must
            # be caught so a bad payload cannot crash the event logger.
            try:
                k = json.dumps(k, default=lambda o: o.__dict__)
            except (AttributeError, TypeError, ValueError):
                k = json.dumps(str(k), default=lambda o: o.__dict__)

            if hasattr(v, "to_json_dict"):
                # Objects providing their own JSON representation must be
                # serialisable; failing here is a programming error.
                v = v.to_json_dict()
                try:
                    v = json.dumps(v, default=lambda o: o.__dict__)
                except (AttributeError, TypeError, ValueError):
                    raise ValueError(
                        "Object could not be serialised: {}".format(v))
            else:
                try:
                    v = json.dumps(v, default=lambda o: o.__dict__)
                except (AttributeError, TypeError, ValueError):
                    v = json.dumps(str(v), default=lambda o: o.__dict__)

            data.append("{}: {}".format(k, v))
        data = "{" + ", ".join(data) + "}"

        return "{:.6f};{};{};{};{};{};{}".format(
            self.time, self.level, self.processed_jobs, self.open_jobs,
            json.dumps(self.type),
            json.dumps(self.msg),
            data)

    @classmethod
    def from_entries(cls, parts):
        """Construct an event from the fields of one CSV row.

        :param parts: a sequence of 7 strings in the column order written
                      by `__str__` (time, level, processed_jobs, open_jobs,
                      type, msg, JSON-encoded data).
        """
        time = float(parts[0])
        level = int(parts[1])
        processed_jobs = int(parts[2])
        open_jobs = int(parts[3])
        type = parts[4]
        msg = parts[5]
        data = json.loads(parts[6])
        # Use cls so subclasses get instances of their own type.
        return cls(time, level, open_jobs, processed_jobs, msg, type, data)
class EventLogger(Logger):
    """Logger for events which will only log to files and will write the log messages
    without any additional formatting.
    """

    def __init__(self, *args, **kwargs):
        # Events go to files only; suppress the console stream handler.
        super().__init__(*args, streamhandler=False, **kwargs)

    @property
    def file_formatter(self):
        # Emit the raw message: events are already fully serialised rows.
        return logging.Formatter("%(message)s")
def load_events_from_file(in_file):
    """Load scheduler events from a ``;``-separated CSV file.

    :param in_file: the path of the events file written by the `EventLogger`

    :return: a list of `LoggingEvent` objects; blank rows are skipped.
    """
    # newline="" is required by the csv module so that quoted fields
    # containing embedded newlines are parsed correctly.
    with open(in_file, 'r', newline='') as f:
        reader = csv.reader(f, delimiter=';')
        return [LoggingEvent.from_entries(row) for row in reader if row]
......@@ -7,76 +7,6 @@
import logging
import os
import json
class LoggingEvent:
"""Class for storing data about events triggered by the scheduler.
:param time: the simulation time when the event occurred
:param level: the importance level of the event
:param open_jobs: the number of open jobs
:param processed_jobs: the number of processed jobs (completed, killed, etc.)
:param msg: the actual message of the event
:param type: the type of the event (`str`)
:param data: additional data attached to the event (`dict`)
"""
def __init__(
self,
time,
level,
open_jobs,
processed_jobs,
msg,
type,
data):
self.time = time
self.open_jobs = open_jobs
self.processed_jobs = processed_jobs
self.level = level
self.msg = msg
self.type = type
self.data = data
def to_message(self):
"""Returns a human readable message presentation of this event."""
return "[{:.6f}] {}/{} <{}> ({})".format(
self.time, self.processed_jobs, self.open_jobs,
self.type, self.msg)
def __str__(self):
data = []
for k, v in self.data.items():
try:
k = json.dumps(k, default=lambda o: o.__dict__)
except (AttributeError, ValueError):
k = json.dumps(str(k), default=lambda o: o.__dict__)
if hasattr(v, "to_json_dict"):
v = v.to_json_dict()
try:
v = json.dumps(v, default=lambda o: o.__dict__)
except (AttributeError, ValueError):
raise ValueError(
"Object could not be serialised: {}".format(v))
else:
try:
v = json.dumps(v, default=lambda o: o.__dict__)
except (AttributeError, ValueError):
v = json.dumps(str(v), default=lambda o: o.__dict__)
data.append("{}: {}".format(k, v))
data = "{" + ", ".join(data) + "}"
return "{:.6f};{};{};{};{};{};{}".format(
self.time, self.level, self.processed_jobs, self.open_jobs,
self.type, self.msg, data)
class Logger:
......@@ -171,16 +101,3 @@ class Logger:
def error(self, *args, **kwargs):
"""Writes a error message to the logger."""
self._logger.error(*args, **kwargs)
class EventLogger(Logger):
"""Logger for events which will only log to files and will write the log messages
without any additional formatting.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, streamhandler=False, **kwargs)
@property
def file_formatter(self):
return logging.Formatter('%(message)s')
......@@ -19,7 +19,8 @@ from .reply import ConsumedEnergyReply
from .utils import DictWrapper
from .messages import Message
from .utils import ListView
from .logging import LoggingEvent, Logger, EventLogger
from .logging import Logger
from .events import LoggingEvent, EventLogger
from .workloads import WorkloadDescription
......@@ -215,18 +216,16 @@ class Scheduler(metaclass=ABCMeta):
self._options = options
debug = self.options.get("debug", False)
export_prefix = self.options.get("export-prefix", "out")
write_events = bool(self.options.get("write-events", False))
self._log_debug_events = self.options.get("log-debug-events", False)
# Create the logger
self._logger = Logger(self, debug=debug)
self._last_published_event = None
self._event_logger = None
if write_events:
self._event_logger = EventLogger(
self, "Events", debug=debug,
to_file="{}_last_events.csv".format(export_prefix),
append_to_file="{}_events.csv".format(export_prefix))
self._event_logger = EventLogger(
self, "Events", debug=debug,
to_file="{}_events.csv".format(export_prefix))
self._sched_jobs_logger = EventLogger(
self,
......@@ -465,37 +464,34 @@ class Scheduler(metaclass=ABCMeta):
"""Writes a debug message to the logging facility."""
self._logger.debug(self._format_log_msg(msg, **kwargs))
event = self._format_event_msg(1, msg, **kwargs)
if self._event_logger:
if self._log_debug_events:
self._event_logger.info(event)
def info(self, msg, **kwargs):
"""Writes a info message to the logging facility."""
self._logger.info(self._format_log_msg(msg, **kwargs))
event = self._format_event_msg(2, msg, **kwargs)
if self._event_logger:
self._event_logger.info(event)
self._event_logger.info(event)
def warn(self, msg, **kwargs):
"""Writes a warn message to the logging facility."""
self._logger.warn(self._format_log_msg(msg, **kwargs))
event = self._format_event_msg(3, msg, **kwargs)
if self._event_logger:
self._event_logger.info(event)
self._event_logger.info(event)
def error(self, msg, **kwargs):
"""Writes a error message to the logging facility."""
self._logger.error(self._format_log_msg(msg, **kwargs))
event = self._format_event_msg(4, msg, **kwargs)
if self._event_logger:
self._event_logger.info(event)
self._event_logger.info(event)
def fatal(self, msg, **kwargs):
"""Writes a fatal message to the logging facility and terminates the scheduler."""
error_msg = self._format_log_msg(msg, **kwargs)
self._logger.error(error_msg)
event = self._format_event_msg(5, msg, **kwargs)
if self._event_logger:
self._event_logger.info(event)
self._event_logger.info(event)
raise ValueError("Fatal error: {}".format(error_msg))
def _on_pre_init(self):
......
......@@ -10,9 +10,10 @@ import os
import pandas
from batsim.batsim import Batsim
from batsim.sched.events import load_events_from_file
def merge_by_parent_job(in_jobs, in_sched_jobs, out_jobs, **kwargs):
def merge_by_parent_job(in_jobs, in_sched_jobs, in_events, out_jobs, **kwargs):
"""Function used as function in `process_jobs` to merge jobs with the same parent job id."""
idx = 0
......@@ -59,7 +60,8 @@ def merge_by_parent_job(in_jobs, in_sched_jobs, out_jobs, **kwargs):
r1["allocated_processors"])
def process_jobs(in_jobs, in_sched_jobs, functions=[], float_precision=6,
def process_jobs(in_jobs, in_sched_jobs, in_events,
functions=[], float_precision=6,
output_separator=",", **kwargs):
"""Tool for processing the job results.
......@@ -67,6 +69,8 @@ def process_jobs(in_jobs, in_sched_jobs, functions=[], float_precision=6,
:param in_sched_jobs: the file name of the jobs file written by PyBatsim
:param in_events: the file name of the events file written by PyBatsim
:param functions: the functions which should be used for processing the jobs
and generating new data files.
......@@ -83,6 +87,7 @@ def process_jobs(in_jobs, in_sched_jobs, functions=[], float_precision=6,
open(in_sched_jobs, 'r') as in_sched_jobs_file:
in_jobs_data = pandas.read_csv(in_jobs_file, sep=",")
in_sched_jobs_data = pandas.read_csv(in_sched_jobs_file, sep=";")
in_events_data = load_events_from_file(in_events)
for f in functions:
out_jobs = "{}_{}.csv".format(
......@@ -95,7 +100,8 @@ def process_jobs(in_jobs, in_sched_jobs, functions=[], float_precision=6,
index=in_jobs_data.index)
out_jobs_data.drop(out_jobs_data.index, inplace=True)
f(in_jobs_data, in_sched_jobs_data, out_jobs_data, **kwargs)
f(in_jobs_data, in_sched_jobs_data,
in_events_data, out_jobs_data, **kwargs)
out_jobs_data.to_csv(
out_jobs_file,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment