Commit f06bb275 authored by S. Lackner's avatar S. Lackner

[sched] Smaller fixes and changes in regard to dynamic job submissions

parent c786b19c
......@@ -79,6 +79,8 @@ class Job:
"""
try:
return self._batsim_job.json_dict[key]
except AttributeError:
return default
except KeyError:
try:
return self._batsim_job.__dict__[key]
......@@ -463,12 +465,15 @@ class Job:
for a2 in r.allocations:
if a1 != a2:
if a1.overlaps_with(a2):
raise ValueError(
"Time sharing is not enabled in Batsim (resource allocations are overlapping)")
self._scheduler.fatal(
"Time sharing is not enabled in Batsim (resource allocations are overlapping: own={own}, other={other})",
own=a1, other=a2,
type="schedule_resource_allocations_overlapping")
if not self.allocation.fits_job_for_remaining_time(self):
raise ValueError(
"Job does not fit in the remaining time frame of the allocation")
self._scheduler.fatal(
"Job does not fit in the remaining time frame of the allocation ({job})",
job=self, type="job_does_not_fit")
# Abort job start if allocation is in the future
if self.allocation.start_time > self._scheduler.time:
......@@ -590,6 +595,10 @@ class DynamicJob(Job):
def id(self):
return self._job_id
@property
def start_time(self):
return None
@property
def submit_time(self):
return None
......@@ -627,12 +636,16 @@ class DynamicJob(Job):
return None
@property
def completed(self):
def runnable(self):
return False
@property
def submitted(self):
return self._submitted
def running(self):
return False
@property
def completed(self):
return False
@property
def is_dynamic_submission_request(self):
......@@ -733,6 +746,16 @@ class Jobs(ObserveList):
"""Returns all jobs which are submitted to Batsim."""
return self.filter(submitted=True)
@property
def dynamic_job(self):
"""Returns all jobs which are dynamic jobs."""
return self.filter(dynamic_job=True)
@property
def static_job(self):
"""Returns all jobs which are static jobs."""
return self.filter(static_job=True)
@property
def dynamic_submission_request(self):
"""Returns all jobs which are requests for dynamic submissions."""
......@@ -770,6 +793,8 @@ class Jobs(ObserveList):
scheduled=False,
killed=False,
submitted=False,
dynamic_job=False,
static_job=False,
dynamic_submission_request=False,
**kwargs):
"""Filter the jobs lists to search for jobs.
......@@ -790,6 +815,10 @@ class Jobs(ObserveList):
:param submitted: whether the job has been submitted.
:param dynamic_job: whether the job is a dynamic job.
:param static_job: whether the job is a static job.
:param dynamic_submission_request: whether the job is a request for dynamic submission.
"""
......@@ -801,6 +830,8 @@ class Jobs(ObserveList):
scheduled,
killed,
submitted,
dynamic_job,
static_job,
dynamic_submission_request]:
runnable = True
running = True
......@@ -810,38 +841,57 @@ class Jobs(ObserveList):
scheduled = True
killed = True
submitted = True
dynamic_job = True
static_job = True
dynamic_submission_request = True
# Filter jobs
def filter_jobs(jobs):
for j in jobs:
if j.is_dynamic_submission_request:
if dynamic_submission_request or submitted:
if dynamic_submission_request:
if j.is_dynamic_submission_request:
yield j
continue
if dynamic_job:
if j.is_dynamic_job:
yield j
continue
if static_job:
if not j.is_dynamic_job:
yield j
elif j.running:
if running or submitted:
continue
if running:
if j.running:
yield j
elif j.completed:
if completed or submitted:
continue
if completed:
if j.completed:
yield j
elif j.rejected:
if rejected or submitted:
continue
if rejected:
if j.rejected:
yield j
elif j.scheduled:
if scheduled or submitted:
continue
if scheduled:
if j.scheduled:
yield j
elif j.killed:
if killed or submitted:
continue
if killed:
if j.killed:
yield j
elif j.runnable:
if runnable or open or submitted:
continue
if runnable:
if j.runnable:
yield j
elif j.open:
if open or submitted:
continue
if open:
if j.open:
yield j
elif j.submitted:
if submitted:
continue
if submitted:
if j.submitted:
yield j
continue
return self.create(
filter_list(
......
......@@ -44,8 +44,11 @@ class Resource:
try:
self._state = Resource.State[(state or "").upper()]
except KeyError:
raise ValueError("Invalid machine state: {}, {}={}"
.format(id, name, state))
scheduler.fatal("Invalid machine state: {id}, {name}={state}",
id=id,
name=name,
state=state,
type="invalid_machine_state")
self._properties = properties
self._allocations = set()
......@@ -164,8 +167,10 @@ class Resource:
if not self._scheduler.has_time_sharing:
for alloc in self._allocations:
if alloc.overlaps_with(allocation):
raise ValueError(
"Overlapping resource allocation while time-sharing is not enabled")
self._scheduler.fatal(
"Overlapping resource allocation while time-sharing is not enabled, {own} overlaps with {other}",
own=alloc,
other=allocation)
self._allocations.add(allocation)
self._resources_list.update_element(self)
......@@ -358,14 +363,16 @@ class Resources(ObserveList):
# Filter after the resource type
def filter_free_or_allocated_resources(res):
for r in res:
if r.is_allocated:
if allocated:
yield r
elif r.computing:
if computing:
yield r
else:
if free:
if allocated:
if r.is_allocated:
yield j
continue
if computing:
if r.computing:
yield j
continue
if free:
if not r.is_allocated and not r.computing:
yield r
filter_objects.append(filter_free_or_allocated_resources)
......
......@@ -228,7 +228,7 @@ class Scheduler(metaclass=ABCMeta):
self._sched_delay = float(
options.get(
"sched_delay",
None) or 0.00000000000001)
None) or 0.000000000000000000001)
self._jobs = Jobs()
self._resources = Resources()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment