Commit 3c2b9f89 authored by S. Lackner's avatar S. Lackner

[sched] Improve event handling and replace out_sched_jobs by using the event log

parent bb15afdb
......@@ -8,8 +8,10 @@
import logging
import json
import csv
import io
from .logging import Logger
from .utils import ObserveList, filter_list
class LoggingEvent:
......@@ -54,33 +56,42 @@ class LoggingEvent:
self.type, self.msg)
def __str__(self):
data = []
for k, v in self.data.items():
def conv_obj(o):
try:
k = json.dumps(k, default=lambda o: o.__dict__)
return o.__dict__
except (AttributeError, ValueError):
k = json.dumps(str(k), default=lambda o: o.__dict__)
return str(o)
data = {}
for k, v in self.data.items():
k = str(k)
if hasattr(v, "to_json_dict"):
v = v.to_json_dict()
try:
v = json.dumps(v, default=lambda o: o.__dict__)
except (AttributeError, ValueError):
raise ValueError(
"Object could not be serialised: {}".format(v))
else:
try:
v = json.dumps(v, default=lambda o: o.__dict__)
except (AttributeError, ValueError):
v = json.dumps(str(v), default=lambda o: o.__dict__)
data.append("{}: {}".format(k, v))
data = "{" + ", ".join(data) + "}"
return "{:.6f};{};{};{};{};{};{}".format(
self.time, self.level, self.processed_jobs, self.open_jobs,
json.dumps(self.type),
json.dumps(self.msg),
data)
elif hasattr(v, "__iter__") and not isinstance(v, str):
new_v = []
for e in v:
if hasattr(e, "to_json_dict"):
e = e.to_json_dict()
new_v.append(e)
v = new_v
data[k] = v
try:
data = json.dumps(data, default=lambda o: conv_obj(o))
except Exception as e:
raise ValueError(
"Error while dumping json data: {}"
.format(data))
output = io.StringIO()
csvdata = [self.time, self.level, self.processed_jobs, self.open_jobs,
self.type, self.msg, data]
writer = csv.writer(
output,
quoting=csv.QUOTE_NONNUMERIC,
delimiter=';')
writer.writerow(csvdata)
return output.getvalue().strip()
@classmethod
def from_entries(cls, parts):
......@@ -90,12 +101,80 @@ class LoggingEvent:
open_jobs = int(parts[3])
type = parts[4]
msg = parts[5]
data = json.loads(parts[6])
try:
data = json.loads(parts[6])
except Exception:
raise ValueError(
"Error while parsing data entry in line: {}"
.format(parts))
return LoggingEvent(time, level, open_jobs, processed_jobs, msg, type,
data)
class EventList(ObserveList):
    """A list of scheduler logging events with search support."""

    def filter(
            self,
            *args,
            time_after=None,
            time_at=None,
            time_before=None,
            level=None,
            type=None,
            **kwargs):
        """Filter the event list to search for specific events.

        An event is kept as soon as it satisfies any one of the given
        criteria; the checks are applied in this order: ``time_after``,
        ``time_before``, ``time_at``, ``level``, ``type``.  With no
        criteria given, every event passes through.

        :param time_after: Search for events after a specified time.
        :param time_at: Search for events at a specified time.
        :param time_before: Search for events before a specified time.
        :param level: Search for events with a given logging level.
        :param type: Search for events with a given event type.
        """
        # Collect only the predicates whose criterion was supplied.
        checks = [
            pred for arg, pred in (
                (time_after, lambda e: e.time > time_after),
                (time_before, lambda e: e.time < time_before),
                (time_at, lambda e: e.time == time_at),
                (level, lambda e: e.level == level),
                (type, lambda e: e.type == type),
            ) if arg is not None
        ]

        def filter_events(events, **kwargs):
            if not checks:
                # No criteria: pass everything through unchanged.
                yield from events
            else:
                for event in events:
                    # OR semantics: the first matching criterion wins.
                    if any(pred(event) for pred in checks):
                        yield event

        return self.create(
            filter_list(
                self._data,
                [filter_events],
                *args,
                **kwargs))
class EventLogger(Logger):
"""Logger for events which will only log to files and will write the log messages
without any additional formatting.
......@@ -110,10 +189,10 @@ class EventLogger(Logger):
def load_events_from_file(in_file):
    """Load a scheduler event log previously written as CSV.

    :param in_file: path of the semicolon-separated event file.

    :return: an `EventList` holding one `LoggingEvent` per non-empty row.
    """
    events = EventList()
    with open(in_file, 'r') as f:
        # QUOTE_NONNUMERIC mirrors the writer configuration used when the
        # events were dumped, so numeric columns come back as floats.
        reader = csv.reader(f, quoting=csv.QUOTE_NONNUMERIC, delimiter=';')
        for row in reader:
            if row:  # skip blank lines
                events.add(LoggingEvent.from_entries(row))
    return events
......@@ -479,8 +479,6 @@ class Job:
self._scheduler.info(
"Rejecting job ({job}), reason={reason}",
job=self, reason=self.rejected_reason, type="job_rejection")
self._scheduler._log_job(
self._scheduler.time, self, "rejected", reason)
self._scheduler._batsim.reject_jobs([self._batsim_job])
del self._scheduler._scheduler._jobmap[self._batsim_job.id]
......@@ -593,8 +591,8 @@ class Job:
Job.State.COMPLETED_KILLED]:
self._batsim_job.finish_time = self._scheduler.time
self._batsim_job.kill_reason = kill_reason
self._batsim_job.return_code = return_code or 0 if state == Job.State.COMPLETED_SUCCESSFULLY else 1
self._scheduler._log_job(self._scheduler.time, self, "completed")
self._batsim_job.return_code = (
return_code or 0 if state == Job.State.COMPLETED_SUCCESSFULLY else 1)
self._jobs_list.update_element(self)
def __str__(self):
......@@ -626,8 +624,25 @@ class Job:
if self.state is not None:
state = self.state.name
split_id = self.id.split(Batsim.WORKLOAD_JOB_SEPARATOR)
parent_id = ""
parent_workload_name = ""
parent_number = ""
if self.parent_job:
parent_id = self.parent_job.id
parent_split_id = parent_id.split(Batsim.WORKLOAD_JOB_SEPARATOR)
parent_workload_name = parent_split_id[0]
parent_number = parent_split_id[1]
return {
"id": self.id,
"workload_name": split_id[0],
"number": split_id[1],
"parent_id": parent_id,
"parent_workload_name": parent_workload_name,
"parent_number": parent_number,
"queue_number": self.number,
"submit_time": self.submit_time,
"requested_time": self.requested_time,
......@@ -637,6 +652,7 @@ class Job:
"start_time": self.start_time,
"finish_time": self.finish_time,
"state": state,
"success": True if self.success else False,
"kill_reason": self.kill_reason,
"return_code": self.return_code,
"comment": self.comment
......
......@@ -20,7 +20,7 @@ from .utils import DictWrapper
from .messages import Message
from .utils import ListView
from .logging import Logger
from .events import LoggingEvent, EventLogger
from .events import LoggingEvent, EventLogger, EventList
from .workloads import WorkloadDescription
......@@ -90,7 +90,6 @@ class BaseBatsimScheduler(BatsimScheduler):
for job in jobobjs:
job._do_complete_job()
self._scheduler._log_job(self._scheduler.time, job, "killed")
self._scheduler.on_jobs_killed(jobobjs)
self._scheduler._do_schedule()
......@@ -147,7 +146,6 @@ class BaseBatsimScheduler(BatsimScheduler):
self._scheduler.info("Job has completed its execution ({job})",
job=jobobj, type="job_completion_received")
self._scheduler._log_job(self._scheduler.time, jobobj, "completed")
jobobj._do_complete_job()
......@@ -227,14 +225,7 @@ class Scheduler(metaclass=ABCMeta):
self, "Events", debug=debug,
to_file="{}_events.csv".format(export_prefix))
self._sched_jobs_logger = EventLogger(
self,
"SchedJobs",
debug=debug,
to_file="{}_sched_jobs.csv".format(export_prefix))
self._log_job_header()
self._events = []
self._events = EventList()
# Use the basic Pybatsim scheduler to wrap the Batsim API
self._scheduler = BaseBatsimScheduler(self, options)
......@@ -261,7 +252,7 @@ class Scheduler(metaclass=ABCMeta):
@property
def events(self):
"""The events happened in the scheduler."""
return ListView(self._events)
return self._events
@property
def dynamic_workload(self):
......@@ -377,7 +368,7 @@ class Scheduler(metaclass=ABCMeta):
event = LoggingEvent(self.time, level, open_jobs, processed_jobs,
msg, type, kwargs)
self._events.append(event)
self._events.add(event)
event_str = event.to_message()
......@@ -401,65 +392,6 @@ class Scheduler(metaclass=ABCMeta):
return str(event)
def _log_job_header(self):
header = [
"time",
"full_job_id",
"workload_name",
"job_id",
"full_parent_job_id",
"parent_workload_name",
"parent_job_id",
"submission_time",
"requested_number_of_processors",
"requested_time",
"success",
"starting_time",
"finish_time",
"comment",
"type",
"reason"
]
self._sched_jobs_logger.info(";".join([str(i) for i in header]))
def _log_job(
self,
time,
job,
type_of_completion,
reason_for_completion=""):
full_parent_job_id = ""
parent_job_id = ""
parent_workload_name = ""
if job.parent_job:
full_parent_job_id = job.parent_job.id
split_parent = full_parent_job_id.split(
Batsim.WORKLOAD_JOB_SEPARATOR)
parent_workload_name = split_parent[0]
parent_job_id = split_parent[1]
id = job.id.split(Batsim.WORKLOAD_JOB_SEPARATOR)
msg = [
time, # time
job.id, # full_job_id
id[0], # workload_name
id[1], # job_id
full_parent_job_id, # full_parent_job_id
parent_workload_name, # parent_workload_name
parent_job_id, # parent_job_id
job.submit_time, # submission_time
job.requested_resources, # requested_number_of_processors
job.requested_time, # requested_time
1 if job.success else 0, # success
job.start_time, # starting_time
job.finish_time, # finish_time
job.comment or "", # comment
type_of_completion, # type
reason_for_completion # reason
]
msg = ["" if s is None else s for s in msg]
self._sched_jobs_logger.info(";".join([str(i) for i in msg]))
def debug(self, msg, **kwargs):
"""Writes a debug message to the logging facility."""
self._logger.debug(self._format_log_msg(msg, **kwargs))
......
......@@ -24,6 +24,28 @@ class ObserveList:
"""A view of the content of the list."""
return ListView(self._data)
def get(self, idx, default=None):
"""Returns the element at the specified index.
:param idx: the index
:param default: the default value if the list does have less elements.
"""
try:
return self._data[idx]
except IndexError:
return default
@property
def first(self):
"""Return the first element."""
return self.get(0)
@property
def last(self):
"""Return the last element."""
return self.get(len(self._data) - 1)
def _check_new_elem(self, element):
"""Checks whether a new element should be added.
......
......@@ -13,7 +13,7 @@ from batsim.batsim import Batsim
from batsim.sched.events import load_events_from_file
def merge_by_parent_job(in_jobs, in_sched_jobs, in_events, out_jobs, **kwargs):
def merge_by_parent_job(in_batsim_jobs, in_sched_events, out_jobs, **kwargs):
"""Function used as function in `process_jobs` to merge jobs with the same parent job id."""
idx = 0
......@@ -22,25 +22,22 @@ def merge_by_parent_job(in_jobs, in_sched_jobs, in_events, out_jobs, **kwargs):
out_jobs.loc[idx] = args
idx += 1
for i1, r1 in in_jobs.iterrows():
submit_events = in_sched_events.filter(type="job_submission_received")
for i1, r1 in in_batsim_jobs.iterrows():
job_id = r1["job_id"]
workload_name = r1["workload_name"]
sched_job = in_sched_jobs.loc[in_sched_jobs['full_job_id'] == str(
workload_name) + Batsim.WORKLOAD_JOB_SEPARATOR + str(job_id)].iloc[0]
full_job_id = str(
workload_name) + Batsim.WORKLOAD_JOB_SEPARATOR + str(job_id)
parent_workload_name = sched_job["parent_workload_name"]
parent_job_id = sched_job["parent_job_id"]
event = submit_events.filter(
cond=lambda ev: ev.data["job"]["id"] == full_job_id).first
job_obj = event.data["job"]
if not pandas.isnull(parent_job_id):
try:
workload_name = str(int(parent_workload_name))
except ValueError:
workload_name = str(parent_workload_name)
try:
job_id = str(int(parent_job_id))
except ValueError:
job_id = str(parent_job_id)
if job_obj["parent_id"]:
job_id = str(job_obj["parent_number"])
workload_name = str(job_obj["parent_workload_name"])
add_job(
job_id,
......@@ -60,16 +57,14 @@ def merge_by_parent_job(in_jobs, in_sched_jobs, in_events, out_jobs, **kwargs):
r1["allocated_processors"])
def process_jobs(in_jobs, in_sched_jobs, in_events,
def process_jobs(in_batsim_jobs, in_sched_events,
functions=[], float_precision=6,
output_separator=",", **kwargs):
"""Tool for processing the job results.
:param in_jobs: the file name of the jobs file written by Batsim
:param in_sched_jobs: the file name of the jobs file written by PyBatsim
:param in_batsim_jobs: the file name of the jobs file written by Batsim
:param in_events: the file name of the events file written by PyBatsim
:param in_sched_events: the file name of the events file written by PyBatsim.sched
:param functions: the functions which should be used for processing the jobs
and generating new data files.
......@@ -83,25 +78,22 @@ def process_jobs(in_jobs, in_sched_jobs, in_events,
"""
result_files = []
with open(in_jobs, 'r') as in_jobs_file, \
open(in_sched_jobs, 'r') as in_sched_jobs_file:
in_jobs_data = pandas.read_csv(in_jobs_file, sep=",")
in_sched_jobs_data = pandas.read_csv(in_sched_jobs_file, sep=";")
in_events_data = load_events_from_file(in_events)
with open(in_batsim_jobs, 'r') as in_batsim_jobs_file:
in_batsim_jobs_data = pandas.read_csv(in_batsim_jobs_file, sep=",")
in_sched_events_data = load_events_from_file(in_sched_events)
for f in functions:
out_jobs = "{}_{}.csv".format(
os.path.splitext(in_jobs)[0], f.__name__)
os.path.splitext(in_batsim_jobs)[0], f.__name__)
result_files.append(out_jobs)
with open(out_jobs, 'w') as out_jobs_file:
out_jobs_data = pandas.DataFrame(
data=None,
columns=in_jobs_data.columns,
index=in_jobs_data.index)
columns=in_batsim_jobs_data.columns,
index=in_batsim_jobs_data.index)
out_jobs_data.drop(out_jobs_data.index, inplace=True)
f(in_jobs_data, in_sched_jobs_data,
in_events_data, out_jobs_data, **kwargs)
f(in_batsim_jobs_data, in_sched_events_data, out_jobs_data, **kwargs)
out_jobs_data.to_csv(
out_jobs_file,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment