Mentions légales du service

Skip to content
Snippets Groups Projects
Commit a25f1f08 authored by MERCIER Michael's avatar MERCIER Michael
Browse files

Merge branch '6-tests-are-broken' into 'master'

Resolve "Tests are broken: Depens on workloads and platforms not on the repo"

Closes #6

See merge request !7
parents 68d41ed6 3d0c6648
No related branches found
No related tags found
1 merge request!7Resolve "Tests are broken: Depens on workloads and platforms not on the repo"
......@@ -136,11 +136,11 @@ class Batsim(object):
self.nb_jobs_scheduled += 1
def execute_jobs(self, jobs, io_jobs=None):
""" args:jobs: list of jobs to execute (job.allocation MUST be set) """
""" args:jobs: list of jobs to execute
job.allocation MUST be not None and should be a non-empty ProcSet"""
for job in jobs:
assert job.allocation is not None
message = {
"timestamp": self.time(),
"type": "EXECUTE_JOB",
......
......@@ -67,11 +67,11 @@ class LoggingEvent:
return output.getvalue().strip()
def to_csv_line(self):
def conv_obj(o):
'''def conv_obj(o):
try:
return o.__dict__
except (AttributeError, ValueError):
return str(o)
return str(o)'''
data = {}
for k, v in self.data.items():
......@@ -87,12 +87,12 @@ class LoggingEvent:
new_v.append(e)
v = new_v
data[k] = v
try:
'''try:
data = json.dumps(data, default=lambda o: conv_obj(o))
except Exception as e:
raise ValueError(
"Error while dumping json data: {}"
.format(data))
.format(data))'''
output = io.StringIO()
csvdata = [self.time, self.level, self.processed_jobs, self.open_jobs,
......
......@@ -113,7 +113,7 @@ class BaseBatsimScheduler(BatsimScheduler):
except KeyError:
pass
if workload:
job_description = workload[job_id]
job_description = workload[job.id]
job_description.job = newjob
newjob._workload_description = workload
......@@ -620,6 +620,9 @@ class Scheduler(metaclass=ABCMeta):
self._dynamic_workload.prepare()
job.submit(self)
def notify_submission_finished(self):
self._batsim.notify_submission_finished()
def as_scheduler(*args, on_init=[], on_end=[], base_classes=[], **kwargs):
"""Decorator to convert a function to a scheduler class.
......
......@@ -102,13 +102,13 @@ class JobDescription:
assert not self.submitted, "Job was already submitted and can not be modified"
has_workload = bool(self.workload)
if has_workload:
self.workload.remove_job(self)
#if has_workload:
# self.workload.remove_job(self)
self._id = id
if has_workload:
self.workload.add_job(self)
#if has_workload:
# self.workload.add_job(self)
@property
def res(self):
......@@ -224,11 +224,10 @@ class JobDescription:
for p in self._additional_profiles}
scheduler._batsim.submit_job(
self.id,
str(self.id),
self.res,
self.walltime,
self.profile.name,
self.workload.name,
self.workload.name+'!'+self.profile.name,
self.subtime,
self.profile.to_dict())
scheduler._batsim.submit_profiles(
......@@ -389,6 +388,9 @@ class WorkloadDescription:
elif job.id > self._last_job_id:
self._last_job_id = job.id + 1
jid = job._workload.name + '!' + str(job.id)
job.id = jid
self._jobmap[job.id] = job
def remove_job(self, job):
......@@ -396,7 +398,7 @@ class WorkloadDescription:
:param job: The `JobDescription`.
"""
assert not job.self.submitted, "Job was already submitted and can not be removed from the workload"
assert not job.submitted, "Job was already submitted and can not be removed from the workload"
self._jobs.remove(job)
job._workload = None
......
from batsim.batsim import BatsimScheduler
from sortedcontainers import SortedSet
from procset import ProcSet, ProcInt
from procset import ProcSet
class ValidatingMachine(BatsimScheduler):
......@@ -22,18 +22,20 @@ class ValidatingMachine(BatsimScheduler):
self.scheduler.onAfterBatsimInit()
def onSimulationBegins(self):
self.nb_res = self.bs.nb_res
self.nb_res = self.bs.nb_compute_resources
self.availableResources = SortedSet(range(self.nb_res))
self.jobs_waiting = []
self.previousAllocations = dict()
# intercept job start
self.bs_start_jobs_continuous = self.bs.start_jobs_continuous
self.bs.start_jobs_continuous = self.start_jobs_continuous
self.bs_start_jobs = self.bs.start_jobs
self.bs.start_jobs = self.start_jobs
# save real job start function
self.real_start_jobs = self.bs.start_jobs
self.real_execute_jobs = self.bs.execute_jobs
# intercept job start
self.scheduler.bs = self.bs
self.scheduler.bs.start_jobs = self.start_jobs_valid
self.scheduler.bs.execute_jobs = self.execute_jobs_valid
self.scheduler.onSimulationBegins()
def onSimulationEnds(self):
......@@ -67,18 +69,17 @@ class ValidatingMachine(BatsimScheduler):
def onRequestedCall(self):
self.scheduler.onRequestedCall()
def start_jobs_continuous(self, allocs):
jobs = []
for (job, (first_res, last_res)) in allocs:
self.previousAllocations[job.id] = range(first_res, last_res + 1)
def start_jobs_valid(self, jobs, res):
for j in jobs:
try:
self.jobs_waiting.remove(job)
self.jobs_waiting.remove(j)
except KeyError:
raise ValueError(
"Job {} was not waiting (waiting: {})".format(
job, [
j2.id for j2 in self.jobs_waiting]))
for r in range(first_res, last_res + 1):
j, [j2.id for j2 in self.jobs_waiting]))
self.previousAllocations[j.id] = res[j.id]
for r in res[j.id]:
try:
self.availableResources.remove(r)
except KeyError:
......@@ -86,12 +87,10 @@ class ValidatingMachine(BatsimScheduler):
"Resource {} was not available (available: {})".format(
r, list(
self.availableResources)))
job.allocation = ProcInt(first_res, last_res)
jobs.append(job)
self.bs.execute_jobs(jobs)
j.allocation = ProcSet(*res[j.id])
self.real_execute_jobs(jobs)
def start_jobs(self, jobs, res):
def execute_jobs_valid(self, jobs, io_jobs=None):
for j in jobs:
try:
self.jobs_waiting.remove(j)
......@@ -99,8 +98,8 @@ class ValidatingMachine(BatsimScheduler):
raise ValueError(
"Job {} was not waiting (waiting: {})".format(
j, [j2.id for j2 in self.jobs_waiting]))
self.previousAllocations[j.id] = res[j.id]
for r in res[j.id]:
self.previousAllocations[j.id] = j.allocation
for r in j.allocation:
try:
self.availableResources.remove(r)
except KeyError:
......@@ -108,5 +107,4 @@ class ValidatingMachine(BatsimScheduler):
"Resource {} was not available (available: {})".format(
r, list(
self.availableResources)))
job.allocation = ProcSet(*res[job.id])
self.bs.execute_jobs(jobs)
self.real_execute_jobs(jobs, io_jobs)
\ No newline at end of file
......@@ -6,7 +6,7 @@ This scheduler consider job as rectangle.
from batsim.batsim import BatsimScheduler
from sortedcontainers import SortedListWithKey
from procset import ProcInt
from procset import ProcSet
INFINITY = float('inf')
......@@ -286,7 +286,7 @@ class EasyBackfill(BatsimScheduler):
if len(allocs) > 0:
jobs = []
for (job, (first_res, last_res)) in allocs:
job.allocation = ProcInt(first_res, last_res)
job.allocation = ProcSet((first_res, last_res))
jobs.append(job)
self.bs.execute_jobs(jobs)
......
......@@ -2,7 +2,7 @@ from batsim.batsim import BatsimScheduler, Batsim
import sys
import os
from procset import ProcSet, ProcInt
from procset import ProcSet
from itertools import islice
......@@ -20,7 +20,7 @@ class FillerSched(BatsimScheduler):
self.sched_delay = 0.005
self.openJobs = set()
self.availableResources = ProcSet(ProcInt(0,self.bs.nb_compute_resources-1))
self.availableResources = ProcSet((0,self.bs.nb_compute_resources-1))
def scheduleJobs(self):
......@@ -34,6 +34,7 @@ class FillerSched(BatsimScheduler):
nb_res_req = job.requested_resources
if nb_res_req <= len(self.availableResources):
# Retrieve the *nb_res_req* first availables resources
job_alloc = ProcSet(*islice(self.availableResources, nb_res_req))
job.allocation = job_alloc
scheduledJobs.append(job)
......
......@@ -3,7 +3,7 @@ from schedulers.intervalContainer import *
import math
from enum import Enum
from procset import ProcInt
from procset import ProcSet
class State(Enum):
......@@ -190,7 +190,7 @@ class EasyEnergyBudget(EasyBackfill):
else:
jobs = []
for (job, (first_res, last_res)) in allocs:
job.allocation = ProcInt(first_res, last_res)
job.allocation = ProcSet((first_res, last_res))
jobs.append(job)
self.bs.execute_jobs(jobs)
......@@ -527,7 +527,7 @@ def nodes_states_WantingToStartJob_SwitchedON(self, start, end, fromState, toSta
del self.waiting_allocs[a[0]]
jobs = []
for (job, (first_res, last_res)) in allocs_to_start:
job.allocation = ProcInt(first_res, last_res)
job.allocation = ProcSet((first_res, last_res))
jobs.append(job)
self.bs.execute_jobs(jobs)
......
......@@ -46,10 +46,31 @@ class UserCommand(Command):
class TestCommand(UserCommand):
description = 'Run tests'
user_options = []
user_options = [
('batsim-bin=', None, 'Path/to/batsim'),
('workloads-basedir=', None, 'Path/to/batsim/workloads'),
('platforms-basedir=', None, 'Path/to/batsim/platforms')
]
def initialize_options(self):
self.batsim_bin = None
self.workloads_basedir = None
self.platforms_basedir = None
def finalize_options(self):
self.args = []
if self.batsim_bin is not None:
self.args.append('BATSIMBIN=--batsim-bin=' + str(self.batsim_bin))
if self.workloads_basedir is not None:
self.args.append('WORKLOADSDIR=--workloads-basedir=' + str(self.workloads_basedir))
if self.platforms_basedir is not None:
self.args.append('PLATFORMSDIR=--platforms-basedir=' + str(self.platforms_basedir))
def run(self):
self.run_external_command("make", cwd="tests")
if len(self.args) > 0:
self.run_external_command("make", *self.args ,cwd="tests")
else:
self.run_external_command("make", cwd="tests")
class DocCommand(UserCommand):
......
......@@ -32,7 +32,7 @@ gantts_okular: $(GANTTS_OUT)
python3 ../../../../evalys/evalys/gantt.py $* $*/gantt.png
generate:
python3 generate.py
python3 generate.py $(BATSIMBIN) $(WORKLOADSDIR) $(PLATFORMSDIR)
clean:
rm tests/*/out_jobs.csv
......
......@@ -7,7 +7,7 @@ import os.path
import json
import copy
'''
def generate_energy(
workloads_basedir,
platforms_basedir,
......@@ -94,7 +94,7 @@ def generate_energy(
},
"scheduler": copy.deepcopy(s)
} for s in schedulers for w in workloads_to_use]
'''
def generate_basic(
workloads_basedir,
......@@ -151,7 +151,7 @@ def generate_sched_static(
options):
schedulers = []
schedulers += [
''' Problems in parent_job status
{
"name_expe": "sched_delayProfilesAsTasks",
"name": "schedDelayProfilesAsTasks",
......@@ -159,8 +159,11 @@ def generate_sched_static(
"protection": True,
"interpreter": "coverage",
"options": {
}
},
"dynamic": True
},
'''
schedulers += [
{
"name_expe": "sched_fillerSched",
"name": "schedFiller",
......@@ -168,7 +171,8 @@ def generate_sched_static(
"protection": True,
"interpreter": "coverage",
"options": {
}
},
"dynamic":False
},
{
"name_expe": "sched_backfilling",
......@@ -177,7 +181,8 @@ def generate_sched_static(
"protection": True,
"interpreter": "coverage",
"options": {
}
},
"dynamic":False
},
]
......@@ -208,7 +213,7 @@ def generate_sched_static(
"job_submission": {
"forward_profiles": True,
"from_scheduler": {
"enabled": True,
"enabled": s["dynamic"],
"acknowledge": True
}
}
......@@ -282,7 +287,7 @@ def generate_sched_script(
"job_submission": {
"forward_profiles": True,
"from_scheduler": {
"enabled": True,
"enabled": False,
"acknowledge": True
}
}
......@@ -367,12 +372,13 @@ def generate_sched(
batsim_bin,
batsim_args,
options)
''' Pyhton is not finding the path to batsim.sched.workloads
generate_sched_script(
workloads_basedir,
platforms_basedir,
batsim_bin,
batsim_args,
options)
options)'''
generate_sched_dynamic(
workloads_basedir,
platforms_basedir,
......@@ -449,13 +455,14 @@ def main(args):
batsim_args,
options)
''' easyEnergyBudget scheduler is no longer maintained
if energy:
generate_energy(
workloads_basedir,
platforms_basedir,
batsim_bin,
batsim_args,
options)
options)'''
if sched:
generate_sched(
......
......@@ -38,6 +38,7 @@ class DynamicTestScheduler(Scheduler):
])]))
w.submit(self)
self.notify_submission_finished()
def schedule(self):
return filler_sched(self,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment