Commit 23df20d3 authored by S. Lackner's avatar S. Lackner

[code] Huge change on how sub jobs are handled.

The class DynamicJobRequest is removed since it is much simpler to
manage all kind of dynamic jobs through the new workloads. However,
dynamic job requests are no longer found through 'scheduler.jobs' and
have to be found in the particular workload object (either through
job.sub_jobs_workload, job.sub_jobs, or scheduler.dynamic_workload)
parent 93392681
This diff is collapsed.
......@@ -244,6 +244,11 @@ class Scheduler(metaclass=ABCMeta):
"""The events happened in the scheduler."""
return ListView(self._events)
@property
def dynamic_workload(self):
"""The workload of dynamic job submissions of this scheduler."""
return self._dynamic_workload
@property
def hpst(self):
"""The hpst (high-performance storage tier) host managed by Batsim."""
......
......@@ -43,9 +43,9 @@ class DelayProfilesAsTasks(Scheduler):
# Create all sub jobs and submit them to Batsim
for task in tasks:
job.create_sub_job(job.requested_resources,
job.submit_sub_job(job.requested_resources,
walltimes_each,
Profiles.Delay(task[1])).submit(self)
Profiles.Delay(task[1]))
# This scheduler only handles delay jobs
else:
self.fatal(
......
......@@ -86,7 +86,7 @@ def generate_energy(workloads_basedir, platforms_basedir, options):
} for s in schedulers for w in workloads_to_use]
def generate_sched(workloads_basedir, platforms_basedir, options):
def generate_sched_static(workloads_basedir, platforms_basedir, options):
schedulers = []
schedulers += [
......@@ -144,10 +144,107 @@ def generate_sched(workloads_basedir, platforms_basedir, options):
} for s in schedulers for w in workloads_to_use]
def generate_sched_script(workloads_basedir, platforms_basedir, options):
schedulers = []
schedulers += [
{
"name_expe": "sched_fillerSched",
"name": "fillerSched2",
"verbosity": 0,
"protection": True,
"interpreter": "coverage",
"options": {
}
},
{
"name_expe": "sched_backfilling",
"name": "easyBackfill2",
"verbosity": 0,
"protection": True,
"interpreter": "coverage",
"options": {
}
},
]
options += [{
"batsim_bin": "tests/run_batsim.sh",
"platform": os.path.join(platforms_basedir, "simple_coalloc_platform.xml"),
"workload_script": {
"path": "tests/workloads/generated_workload.py",
},
# where all output files (stdins, stderrs, csvs...) will be outputed.
"output_dir": "SELF",
# if set to "SELF" then output on the same dir as this option file.
"batsim": {
"pfs-host": "lcst_host",
"hpst-host": "hpst_host",
"config-file": "tests/config_noredis_dynamic.json",
"export": "out", # The export filename prefix used to generate simulation output
"energy-plugin": False, # Enables energy-aware experiments
"disable-schedule-tracing": True, # remove paje output
"verbosity": "information" # Sets the Batsim verbosity level. Available values
# are : quiet, network-only,
# information (default), debug.
},
"scheduler": copy.deepcopy(s)
} for s in schedulers]
def generate_sched_dynamic(workloads_basedir, platforms_basedir, options):
schedulers = []
schedulers += [
{
"name_expe": "sched_dynamic",
"name": "tests/schedulers/dynamicTestScheduler.py",
"verbosity": 0,
"protection": True,
"interpreter": "coverage",
"options": {
}
},
]
options += [{
"batsim_bin": "tests/run_batsim.sh",
"platform": os.path.join(platforms_basedir, "simple_coalloc_platform.xml"),
# where all output files (stdins, stderrs, csvs...) will be outputed.
"output_dir": "SELF",
# if set to "SELF" then output on the same dir as this option file.
"batsim": {
"pfs-host": "lcst_host",
"hpst-host": "hpst_host",
"config-file": "tests/config_noredis_dynamic.json",
"export": "out", # The export filename prefix used to generate simulation output
"energy-plugin": False, # Enables energy-aware experiments
"disable-schedule-tracing": True, # remove paje output
"verbosity": "information" # Sets the Batsim verbosity level. Available values
# are : quiet, network-only,
# information (default), debug.
},
"scheduler": copy.deepcopy(s)
} for s in schedulers]
def generate_sched(workloads_basedir, platforms_basedir, options):
generate_sched_static(workloads_basedir, platforms_basedir, options)
generate_sched_script(workloads_basedir, platforms_basedir, options)
generate_sched_dynamic(workloads_basedir, platforms_basedir, options)
def do_generate(options):
for opt in options:
try:
workload_name = opt["workload"]
except KeyError:
try:
workload_name = opt["workload_script"]["path"]
except KeyError:
workload_name = ""
opt["scheduler"]["name_expe"] += "_" + os.path.splitext(
os.path.basename(opt["workload"]))[0]
os.path.basename(workload_name))[0]
new_dir = "tests/" + opt["scheduler"]["name_expe"]
try:
......
"""
Scheduler used in tests to submit a dynamic workload which originates from the scheduler
(No external workload file exists).
"""
from batsim.sched import Scheduler
from batsim.sched import Profiles
from batsim.sched.workloads import WorkloadDescription
from batsim.sched.algorithms.filling import filler_sched
class DynamicTestScheduler(Scheduler):
def on_init(self):
self.submit_dynamic_job(
walltime=10,
res=2,
id=42,
profile=Profiles.Delay(7))
self.submit_dynamic_job(walltime=10, res=2, profile=Profiles.Delay(7))
self.submit_dynamic_job(walltime=10, res=2, profile=Profiles.Delay(7))
w = WorkloadDescription(name="TestWorkload")
w.new_job(subtime=0, walltime=10, res=4, profile=Profiles.Delay(5))
w.new_job(subtime=0, walltime=11, res=4, profile=Profiles.Delay(10))
w.new_job(walltime=60, res=4, profile=Profiles.Sequence([
Profiles.Delay(15),
Profiles.Delay(5),
Profiles.Delay(10),
Profiles.Delay(20)]))
w.new_job(walltime=60, res=4, profile=Profiles.Sequence([
Profiles.Delay(5),
Profiles.Sequence([
Profiles.Delay(15),
Profiles.Delay(5),
])]))
w.submit(self)
def schedule(self):
return filler_sched(self, abort_on_first_nonfitting=True)
"""
Generated workload for a simple workload using delay and sequence profiles with nested
delays.
"""
from batsim.sched.workloads import generate_workload
from batsim.sched.profiles import Profiles
def complex_profile():
def if_timeout():
yield Profiles.Delay(1, ret=2)
def if_failed():
yield Profiles.Delay(1, ret=1)
def if_success():
yield Profiles.Delay(10)
yield Profiles.Delay(5)
yield Profiles.Delay(10)
yield Profiles.Send({"msg": "Some message"})
yield Profiles.Receive.if_recv("OK", if_success, if_failed, if_timeout)
def generated_test_workload(w):
j1 = w.new_job(subtime=0, walltime=10, res=4, profile=Profiles.Delay(5),
comment="Test comment")
j2 = w.new_job(subtime=0, walltime=10, res=4, profile=Profiles.Delay(5))
j3 = w.new_job(
id=10,
subtime=0,
walltime=10,
res=4,
profile=Profiles.Delay(5))
j4 = w.new_job(subtime=0, walltime=10, res=4, profile=Profiles.Delay(5))
w.new_job(
walltime=30, res=4, deps=[j1.id, j2.id, j3.id, j4.id, "42", 43],
profile=Profiles.Sequence(
[Profiles.Delay(5),
Profiles.Delay(5),
Profiles.Delay(10),
Profiles.Delay(5)]))
w.new_job(
walltime=200,
res=16,
profile=Profiles.profile_from_generator(complex_profile))
w.new_job(
id=42,
subtime=200,
walltime=10,
res=32,
profile=Profiles.Delay(5))
w.new_job(
id=43,
subtime=200,
walltime=10,
res=32,
profile=Profiles.Delay(5))
generate_workload(
generated_test_workload, nb_res=32,
description="This is a simple test workload generated by a python script using batsim.sched.workloads")
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment