Commit 4783e1b7 authored by S. Lackner's avatar S. Lackner

[sched] extend resource management in compute resources and special resources

parent 788949b8
......@@ -6,7 +6,7 @@
"""
import sys
from .resource import Resources, Resource
from .resource import Resources, Resource, ComputeResource
from .utils import ListView
......@@ -23,6 +23,7 @@ class Allocation:
def __init__(self, start_time, walltime=None, resources=[], job=None):
self._job = None
self._resources = []
self._special_resources = []
self._scheduler = None
......@@ -57,6 +58,11 @@ class Allocation:
"""The list of assigned resources."""
return ListView(self._resources)
@property
def special_resources(self):
"""The list of assigned special resources."""
return ListView(self._special_resources)
@property
def allocated(self):
"""Whether or not this allocation is currently allocated (active)."""
......@@ -199,7 +205,10 @@ class Allocation:
"""
assert not self.allocated and not self.previously_allocated, "Allocation is in invalid state"
resource._do_add_allocation(self)
self._resources.append(resource)
if not isinstance(resource, ComputeResource):
self._special_resources.append(resource)
else:
self._resources.append(resource)
def remove_resource(self, resource):
"""Removes a resource from this allocation.
......@@ -208,13 +217,18 @@ class Allocation:
"""
assert not self.allocated and not self.previously_allocated, "Allocation is in invalid state"
resource._do_remove_allocation(self)
self._resources.remove(resource)
if not isinstance(resource, ComputeResource):
self._special_resources.remove(resource)
else:
self._resources.remove(resource)
def remove_all_resources(self):
"""Removes all resources from this allocation."""
assert not self.allocated and not self.previously_allocated, "Allocation is in invalid state"
for r in self._resources.copy():
self.remove_resource(r)
for r in self._special_resources.copy():
self.remove_resource(r)
def allocate(self, scheduler, range1, *more_ranges):
"""Mark node ranges from this allocation as `computing` and set the allocation
......
......@@ -10,8 +10,43 @@ from enum import Enum
from .utils import ObserveList, filter_list, ListView, build_filter
class ResourceRequirement:
"""Resource requirements are additional requirements which have to be fulfilled
when filtering for resources.
Can be registered as handlers in the scheduler (`find_resource_handler`).
Functions which search for resources for jobs make use of `ResourceRequirement`
objects to specify the requirements based on properties set in the job.
:param resources: the list of resources which can be used to fulfil the resource
requirement. Alternatively only a single resource object.
:param min: the minimum number of resources which have to be selected
:param max: the maximum number of resources which have to be selected
:param filter: a resource filter function to limit the resource combinations
:param num: when set this will set min and max
"""
def __init__(self, resources, min=1, max=None, filter=None, num=None):
if num is not None:
num = abs(num)
min = num
max = num
if not isinstance(resources, list):
resources = [resources]
self.resources = resources
self.min = min
self.max = max
self.filter = filter
class Resource:
"""A resource is a machine managed by the resource manager.
"""A resource is a limited resource managed by the scheduler.
:param scheduler: the associated scheduler managing this resource.
......@@ -19,49 +54,44 @@ class Resource:
:param name: the name of this resource.
:param state: the default state of this resource.
:param properties: the dict of additional properties of this resource
:param resources_list: the main resources list where this resource is contained
:param time_sharing: whether this resource allows time sharing (if `None`, the
value set in Batsim will be used)
"""
class State(Enum):
"""The states of a machine."""
SLEEPING = 0
IDLE = 1
COMPUTING = 2
TRANSITING_FROM_SLEEPING_TO_COMPUTING = 3
TRANSITING_FROM_COMPUTING_TO_SLEEPING = 4
TIME_DELTA = 0.00000000001
def __init__(self, scheduler, id, name, state, properties, resources_list):
def __init__(
self,
scheduler,
name,
id=None,
resources_list=None,
time_sharing=None):
self._scheduler = scheduler
self._id = id
self._name = name
self._resources_list = resources_list
self._time_sharing = time_sharing
try:
self._state = Resource.State[(state or "").upper()]
except KeyError:
scheduler.fatal("Invalid machine state: {id}, {name}={state}",
id=id,
name=name,
state=state,
type="invalid_machine_state")
self._properties = properties
self._resources_list = resources_list
self._allocations = set()
self._pstate = None
self._old_pstate = None
self._pstate_update_in_progress = False
@property
def id(self):
"""The id of this resource."""
return self._id
@property
def time_sharing(self):
"""Whether this resource can be shared."""
if self._time_sharing is None:
return self._scheduler.has_time_sharing
return self._time_sharing
@property
def name(self):
"""The name of this resource."""
......@@ -78,50 +108,38 @@ class Resource:
return ListView(self._allocations)
@property
def computing(self):
"""Whether or not this resource is currently computing in some of its resources."""
def active(self):
"""Whether or not this resource is currently active in some of its resources."""
for alloc in self._allocations:
if self in alloc.allocated_resources:
return True
return False
@property
def pstate_update_in_progress(self):
"""Whether or not a pstate update is currently in progress (sent to Batsim but still pending)."""
return self._pstate_update_in_progress
@property
def old_pstate(self):
"""Returns the previous pstate."""
return self._old_pstate
@property
def pstate(self):
"""Returns the current pstate."""
return self._pstate
@pstate.setter
def pstate(self, newval):
if not self.pstate_update_in_progress:
self._old_pstate = self._pstate
scheduler._batsim.set_resource_state([self.id], self._pstate)
self._pstate_update_in_progress = True
self._pstate = newval
self._resources_list.update_element(self)
@property
def resources(self):
"""Returns a list containing only the resource (for compatibility with the `Resources` class)."""
return [self]
def find_first_time_to_fit_job(self, job, time=None):
"""Finds the first time after which the job can start.
:param job: the job to find a time slot for
:param time: the starting time after which a time slot is needed
"""
raise NotImplementedError(
"Implement this function in a concrete Resources class")
def time_free(self, time=None):
"""Get time how long this resource is still free until the next reservation starts, `0` if the resource is allocated, and
`Inf` if this resource has no allocations.
"""Get time how long this resource is still free until the next reservation starts, `0`
if the resource is allocated, and `Inf` if this resource has no allocations.
Please note that this probably makes less sense for resources which allow time sharing
since this function will only look for times where there is not a single allocation
using this resource.
:param time: the first time step to consider (default: the current time in the simulation)
"""
result = float("Inf")
......@@ -139,46 +157,6 @@ class Resource:
return result
def find_first_time_to_fit_walltime(self, requested_walltime, time=None):
"""Finds the first time after which the requested walltime is available for a job start.
:param requested_walltime: the size of the requested time slot
:param time: the starting time after which a time slot is needed
"""
if time is None:
time = self._scheduler.time
time_updated = True
while time_updated:
time_updated = False
# Search the earliest time when a slot for an allocation is
# available
for alloc in self._allocations:
if alloc.start_time <= time and alloc.end_time >= time:
time = alloc.end_time + 0.000001
time_updated = True
# Check whether or not the full requested walltime fits into the
# slot, otherwise move the slot at the end of the found conflicting
# allocation and then repeat the loop.
estimated_end_time = time + requested_walltime
for alloc in self._allocations:
if alloc.start_time > time and alloc.start_time < (
estimated_end_time + 0.000001):
time = alloc.end_time + 0.000001
estimated_end_time = time + requested_walltime
time_updated = True
return time
def _update_pstate_change(self, pstate):
"""Update the pstate when called through a Batsim event.
:param pstate: the new pstate
"""
self._old_pstate = self._pstate
self._pstate = pstate
self._pstate_update_in_progress = False
self._resources_list.update_element(self)
def _do_add_allocation(self, allocation):
"""Adds an allocation to this resource.
......@@ -187,15 +165,17 @@ class Resource:
:param allocation: the allocation to be added
"""
# If time sharing is not enabled: check that allocations do not overlap
if not self._scheduler.has_time_sharing:
if not self.time_sharing:
for alloc in self._allocations:
if alloc.overlaps_with(allocation):
self._scheduler.fatal(
"Overlapping resource allocation while time-sharing is not enabled, {own} overlaps with {other}",
"Overlapping resource allocation in resource {resource} while time-sharing is not allowed, {own} overlaps with {other}",
resource=self,
own=alloc,
other=allocation)
self._allocations.add(allocation)
self._resources_list.update_element(self)
if self._resources_list:
self._resources_list.update_element(self)
def _do_remove_allocation(self, allocation):
"""Removes an allocation from this resource.
......@@ -203,14 +183,17 @@ class Resource:
:param allocation: the allocation to be removed.
"""
self._allocations.remove(allocation)
self._resources_list.update_element(self)
if self._resources_list:
self._resources_list.update_element(self)
def _do_allocate_allocation(self, allocation):
"""Hook which is called when an allocation becomes active.
:param allocation: the allocation which becomes active
"""
self._resources_list.update_element(self)
if self._resources_list:
self._resources_list.update_element(self)
self.on_allocate(allocation)
def _do_free_allocation(self, allocation):
"""Hook which is called when an previously active allocation is freed.
......@@ -218,7 +201,9 @@ class Resource:
:param allocation: the allocation which is freed
"""
self._allocations.remove(allocation)
self._resources_list.update_element(self)
if self._resources_list:
self._resources_list.update_element(self)
self.on_free(allocation)
def __str__(self):
return (
......@@ -227,6 +212,120 @@ class Resource:
self.id, self.name, self.pstate,
[str(a) for a in self.allocations]))
def on_allocate(self, allocation):
pass
def on_free(self, allocation):
pass
class ComputeResource(Resource):
"""A compute resource is a machine managed by the resource manager (Batsim).
:param state: the default state of this resource.
:param properties: the dict of additional properties of this resource
"""
class State(Enum):
"""The states of a machine."""
SLEEPING = 0
IDLE = 1
COMPUTING = 2
TRANSITING_FROM_SLEEPING_TO_COMPUTING = 3
TRANSITING_FROM_COMPUTING_TO_SLEEPING = 4
def __init__(self, *args, state, properties, **kwargs):
# ComputeResources are only time_shared if Batsim allows this, which is
# the default if set to False.
super().__init__(*args, **kwargs)
try:
self._state = ComputeResource.State[(state or "").upper()]
except KeyError:
scheduler.fatal("Invalid machine state: {id}, {name}={state}",
id=id,
name=name,
state=state,
type="invalid_machine_state")
self._properties = properties
self._pstate = None
self._old_pstate = None
self._pstate_update_in_progress = False
def find_first_time_to_fit_job(self, job, time=None):
return self.find_first_time_to_fit_walltime(job.requested_time, time)
def find_first_time_to_fit_walltime(self, requested_walltime, time=None):
"""Finds the first time after which the requested walltime is available for a job start.
:param requested_walltime: the size of the requested time slot
:param time: the starting time after which a time slot is needed
"""
if time is None:
time = self._scheduler.time
time_updated = True
while time_updated:
time_updated = False
# Search the earliest time when a slot for an allocation is
# available
for alloc in self._allocations:
if alloc.start_time <= time and alloc.estimated_end_time >= time:
time = alloc.estimated_end_time + Resource.TIME_DELTA
time_updated = True
# Check whether or not the full requested walltime fits into the
# slot, otherwise move the slot at the end of the found conflicting
# allocation and then repeat the loop.
estimated_end_time = time + requested_walltime
for alloc in self._allocations:
if alloc.start_time > time and alloc.start_time < (
estimated_end_time + Resource.TIME_DELTA):
time = alloc.estimated_end_time + Resource.TIME_DELTA
estimated_end_time = time + requested_walltime
time_updated = True
return time
@property
def pstate_update_in_progress(self):
"""Whether or not a pstate update is currently in progress (sent to Batsim but still pending)."""
return self._pstate_update_in_progress
@property
def old_pstate(self):
"""Returns the previous pstate."""
return self._old_pstate
@property
def pstate(self):
"""Returns the current pstate."""
return self._pstate
@pstate.setter
def pstate(self, newval):
if not self.pstate_update_in_progress:
self._old_pstate = self._pstate
scheduler._batsim.set_resource_state([self.id], self._pstate)
self._pstate_update_in_progress = True
self._pstate = newval
if self._resources_list:
self._resources_list.update_element(self)
def _update_pstate_change(self, pstate):
"""Update the pstate when called through a Batsim event.
:param pstate: the new pstate
"""
self._old_pstate = self._pstate
self._pstate = pstate
self._pstate_update_in_progress = False
if self._resources_list:
self._resources_list.update_element(self)
class Resources(ObserveList):
"""Helper class implementing parts of the python list API to manage the resources.
......@@ -254,9 +353,19 @@ class Resources(ObserveList):
return self.filter(allocated=True)
@property
def computing(self):
"""The list of all computing resources (resources which are allocated and active in an allocation)."""
return self.filter(computing=True)
def active(self):
"""The list of all active resources (resources which are allocated and active in an allocation)."""
return self.filter(active=True)
@property
def compute(self):
"""The list of all compute resources (normal hosts)."""
return self.filter(compute=True)
@property
def special(self):
"""The list of all special resources (managed by the scheduler logic)."""
return self.filter(special=True)
def __getitem__(self, items):
"""Returns either a slice of resources or returns a resource based on a given resource id."""
......@@ -280,13 +389,12 @@ class Resources(ObserveList):
def find_first_time_and_resources_to_fit_walltime(
self,
requested_walltime,
job,
time,
min_matches=None,
max_matches=None,
time_sharing=False,
filter=None):
"""Find sufficient resources and the earlierst start time to fit a walltime and resource requirements.
"""Find sufficient resources and the earlierst start time to fit a job and its resource requirements.
:param requested_walltime: the walltime which should fit in the allocation
......@@ -298,62 +406,86 @@ class Resources(ObserveList):
:param filter: the filter to be applied when a set of resources was found
"""
if time_sharing:
raise ValueError(
"Finding allocations when time sharing is enabled is currently not implemented")
# There are not enough resources available
if min_matches is not None and len(self) < min_matches:
return time, self.create()
while True:
sufficient_resources_found = False
found_resources = []
earliest_time_available = None
time_changed = False
for r in self._data:
new_time = r.find_first_time_to_fit_walltime(
requested_walltime, time)
if new_time == time:
found_resources.append(r)
if min_matches is None or len(
found_resources) >= min_matches:
sufficient_resources_found = True
else:
if earliest_time_available is None:
earliest_time_available = new_time
def do_find(job, time, res, min_matches, max_matches, filter):
# There are not enough resources available
if min_matches is not None and len(res) < min_matches:
return time, []
while True:
sufficient_resources_found = False
found_resources = []
earliest_time_available = None
time_changed = False
for r in res:
new_time = r.find_first_time_to_fit_job(
job, time)
if new_time == time:
found_resources.append(r)
if min_matches is None or len(
found_resources) >= min_matches:
sufficient_resources_found = True
else:
earliest_time_available = min(
earliest_time_available, new_time)
if sufficient_resources_found:
if filter:
found_resources = build_filter(
filter, min_entries=min_matches,
max_entries=max_matches, walltime=requested_walltime,
current_time=time)(found_resources)
if found_resources and (
min_matches is None or len(found_resources) >= min_matches):
break
if earliest_time_available is None:
earliest_time_available = new_time
else:
earliest_time_available = min(
earliest_time_available, new_time)
if sufficient_resources_found:
if filter:
found_resources = build_filter(
filter, min_entries=min_matches,
max_entries=max_matches, job=job,
current_time=time)(found_resources)
if found_resources and (
min_matches is None or len(found_resources) >= min_matches):
break
else:
time = earliest_time_available
else:
time = earliest_time_available
else:
break
elif earliest_time_available:
time = earliest_time_available
found_length = len(found_resources)
break
elif earliest_time_available:
time = earliest_time_available
if max_matches is not None:
found_resources = found_resources[:max_matches]
found_length = len(found_resources)
assert found_length <= max_matches
if min_matches is not None:
assert found_length >= min_matches
if max_matches is not None:
found_resources = found_resources[:max_matches]
found_length = len(found_resources)
assert found_length <= max_matches
if min_matches is not None:
assert found_length >= min_matches
return time, self.create(found_resources)
return time, found_resources
while True:
result = set()
new_time, found_res = do_find(job,
time,
self._data,
min_matches,
max_matches,
filter)
s_found_all = []
new_time2 = new_time
for h in job._scheduler.get_find_resource_handlers:
reqs = h(job._scheduler, job)
for r in reqs:
new_time2, s_found = do_find(job,
new_time2,
r.resources,
r.min,
r.max,
r.filter)
s_found_all += s_found
if new_time == new_time2:
return new_time, self.create(set(found_res + s_found_all))
else:
time = max(new_time, new_time2)
def find_with_earliest_start_time(
self, job, allow_future_allocations=False,
......@@ -367,10 +499,8 @@ class Resources(ObserveList):
:param filter: the filter to be applied when a set of resources was found
"""
start_time, found_resources = self.find_first_time_and_resources_to_fit_walltime(
job.requested_time, max(job._scheduler.time, job.submit_time), job.requested_resources, job.requested_resources,
job._scheduler.has_time_sharing,
filter)
start_time, found_resources = self.find_first_time_and_resources_to_fit_walltime(job, max(
job._scheduler.time, job.submit_time), job.requested_resources, job.requested_resources, filter)
if not allow_future_allocations and start_time != job._scheduler.time:
found_resources = self.create()
......@@ -391,7 +521,9 @@ class Resources(ObserveList):
*args,
free=False,
allocated=False,
computing=False