scheduler.py 20.4 KB
Newer Older
1 2 3 4 5 6 7 8 9
"""
    batsim.sched.scheduler
    ~~~~~~~~~~~~~~~~~~~~~~

    This module provides a high-level interface for implementing schedulers for Batsim.
    It contains a basic scheduler used by Pybatsim directly and a high-level scheduler API which will
    interact with the basic scheduler to provide a richer interface.

"""
10
from abc import ABCMeta, abstractmethod
S. Lackner's avatar
S. Lackner committed
11

12
from batsim.batsim import BatsimScheduler, Batsim
S. Lackner's avatar
S. Lackner committed
13

14
from .resource import Resources, Resource
15
from .job import Job, Jobs
16
from .reply import ConsumedEnergyReply
17 18
from .utils import DictWrapper
from .messages import Message
19
from .utils import ListView
20
from .logging import LoggingEvent, Logger, EventLogger
21
from .workloads import WorkloadDescription
22

S. Lackner's avatar
S. Lackner committed
23

24
class BaseBatsimScheduler(BatsimScheduler):
    """The basic Pybatsim scheduler.

    It receives the low-level callbacks, keeps a mapping from Batsim job ids
    to the high-level job objects created on submission, and forwards every
    event to the high-level scheduler (followed by a scheduling iteration
    where applicable).

    :param scheduler: the high-level scheduler which uses this basic scheduler.

    :param options: the options given to the launcher.
    """

    def __init__(self, scheduler, options):
        super().__init__(options)

        self._scheduler = scheduler

        # Batsim job id -> high-level job object (filled on submission,
        # emptied on completion or kill).
        self._jobmap = {}

    def onSimulationBegins(self):
        """Hand the Batsim handle to the high-level scheduler and run its init hooks."""
        self._scheduler.info(
            "Simulation begins",
            type="simulation_begins_received")
        # The handle must be set before `_update_time()`, which reads it.
        self._scheduler._batsim = self.bs
        self._scheduler._update_time()
        self._scheduler._on_pre_init()
        self._scheduler.on_init()
        self._scheduler._on_post_init()

    def onSimulationEnds(self):
        """Run the shut-down hooks of the high-level scheduler."""
        self._scheduler._update_time()
        self._scheduler.info(
            "Simulation ends",
            type="simulation_ends_received")
        self._scheduler._on_pre_end()
        self._scheduler.on_end()
        self._scheduler._on_post_end()

    def onDeadlock(self):
        """Report a deadlock to the high-level scheduler."""
        self._scheduler.debug(
            "batsim has reached a deadlock or is not responding",
            type="deadlock")
        self._scheduler.on_deadlock()

    def onNOP(self):
        """Forward a NOP and trigger a scheduling iteration."""
        self._scheduler._update_time()
        self._scheduler.debug(
            "decision process received NOP",
            type="nop_received")
        self._scheduler.on_nop()
        self._scheduler._do_schedule()

    def onJobsKilled(self, jobs):
        """Mark the killed jobs as completed and notify the high-level scheduler.

        :param jobs: the killed low-level Batsim jobs.

        :raises KeyError: if one of the jobs is not known to this scheduler.
        """
        self._scheduler._update_time()
        self._scheduler.debug(
            "decision process received jobs kills({jobs})",
            jobs=jobs,
            type="jobs_killed_received2")

        # Translate the low-level jobs into the high-level objects and forget
        # them. Everything below must operate on the high-level objects: the
        # low-level job is not what the hooks and the job log expect.
        jobobjs = [self._jobmap.pop(job.id) for job in jobs]

        self._scheduler.info("The following jobs were killed: ({jobs})",
                             jobs=jobobjs, type="jobs_killed_received")

        for jobobj in jobobjs:
            jobobj._do_complete_job()
            self._scheduler._log_job(self._scheduler.time, jobobj, "killed")

        self._scheduler.on_jobs_killed(jobobjs)
        self._scheduler._do_schedule()

    def onJobSubmission(self, job):
        """Wrap a submitted Batsim job in a high-level job and announce it.

        :param job: the submitted low-level Batsim job.
        """
        self._scheduler._update_time()
        self._scheduler.debug(
            "decision process received job submission({job})",
            job=job,
            type="job_submission_received2")

        newjob = Job(
            batsim_job=job,
            scheduler=self._scheduler,
            jobs_list=self._scheduler.jobs)
        self._jobmap[job.id] = newjob

        self._scheduler.jobs.add(newjob)

        # The full id has the form "<workload><separator><number>".
        job_id_split = job.id.split(Batsim.WORKLOAD_JOB_SEPARATOR)
        workload_name = job_id_split[0]
        job_id = int(job_id_split[1])

        workload = None
        try:
            # Will succeed if job is dynamic job
            workload = self._scheduler._workload_map[workload_name]
        except KeyError:
            pass
        if workload:
            # Link the workload's job description and the new job object.
            job_description = workload[job_id]
            job_description.job = newjob
            newjob._workload_description = workload

        self._scheduler.info("Received job submission from Batsim ({job})",
                             job=newjob, type="job_submission_received")

        self._scheduler.on_job_submission(newjob)
        self._scheduler._do_schedule()

    def onJobCompletion(self, job):
        """Mark a job as completed and notify the high-level scheduler.

        :param job: the completed low-level Batsim job.

        :raises KeyError: if the job is not known to this scheduler.
        """
        self._scheduler._update_time()
        self._scheduler.debug(
            "decision process received job completion({job})",
            job=job,
            type="job_completion_received2")
        jobobj = self._jobmap.pop(job.id)

        self._scheduler.info("Job has completed its execution ({job})",
                             job=jobobj, type="job_completion_received")
        self._scheduler._log_job(self._scheduler.time, jobobj, "completed")

        jobobj._do_complete_job()

        self._scheduler.on_job_completion(jobobj)
        self._scheduler._do_schedule()

    def onJobMessage(self, timestamp, job, message):
        """Store a message sent by a job and pass it to the high-level scheduler.

        :param timestamp: the timestamp handed over together with the message.

        :param job: the sending low-level Batsim job.

        :param message: the message payload.
        """
        self._scheduler._update_time()
        self._scheduler.debug(
            "decision process received from job message({job} => {message})",
            job=job,
            message=message,
            type="job_message_received2")
        jobobj = self._jobmap[job.id]
        self._scheduler.info(
            "Got from job message({job} => {message})",
            job=jobobj,
            message=message,
            type="job_message_received")
        msg = Message(timestamp, message)
        jobobj.messages.append(msg)
        self._scheduler.on_job_message(jobobj, msg)
        self._scheduler._do_schedule()

    def onMachinePStateChanged(self, nodeid, pstate):
        """Update the affected resource and notify the high-level scheduler.

        :param nodeid: the key used to look up the resource in the scheduler's
                       resource collection.

        :param pstate: the new pstate.
        """
        self._scheduler._update_time()
        resource = self._scheduler.resources[nodeid]
        self._scheduler.info(
            "Resource state was updated ({resource}) to {pstate}",
            resource=resource,
            pstate=pstate,
            type="pstate_change_received")

        resource.update_pstate_change(pstate)

        self._scheduler.on_machine_pstate_changed(resource, pstate)
        self._scheduler._do_schedule()

    def onReportEnergyConsumed(self, consumed_energy):
        """Wrap the reported energy in a reply object and schedule with it.

        :param consumed_energy: the consumed energy reported by Batsim.
        """
        self._scheduler._update_time()
        self._scheduler.info(
            "Received reply from Batsim (energy_consumed={energy_consumed})",
            energy_consumed=consumed_energy,
            type="reply_energy_received")

        # `BatsimReply` is not imported by this module, so the previous call
        # failed with a NameError; `ConsumedEnergyReply` is the reply class
        # the module imports.
        # NOTE(review): assumes the constructor accepts `consumed_energy=` -
        # confirm against `.reply`.
        reply = ConsumedEnergyReply(consumed_energy=consumed_energy)
        self._scheduler.on_report_energy_consumed(reply)
        self._scheduler._do_schedule(reply)
189 190 191


class Scheduler(metaclass=ABCMeta):
    """The high-level scheduler which should be inherited from by concrete scheduler
    implementations. All important Batsim functions are either available in the scheduler or used
    by the job/resource objects.

    Concrete schedulers must implement :meth:`schedule`; all `on_*` methods are
    optional hooks.

    :param options: the options given to the launcher (a dict; `None` is
                    treated as an empty dict).

    """

    def __init__(self, options=None):
        # A `{}` default would be one dict shared by every scheduler instance.
        if options is None:
            options = {}
        self._options = options
        debug_enabled = self.options.get("debug", False)
        export_prefix = self.options.get("export_prefix", "out")

        # Create the logger
        self._logger = Logger(self, debug=debug_enabled)

        self._event_logger = EventLogger(
            self,
            "Events",
            debug=debug_enabled,
            to_file="{}_last_events.csv".format(export_prefix),
            append_to_file="{}_events.csv".format(export_prefix))

        self._sched_jobs_logger = EventLogger(
            self,
            "SchedJobs",
            debug=debug_enabled,
            to_file="{}_sched_jobs.csv".format(export_prefix))
        self._log_job_header()

        self._events = []

        # Use the basic Pybatsim scheduler to wrap the Batsim API
        self._scheduler = BaseBatsimScheduler(self, options)

        self._time = 0

        self._reply = None

        # Assigned once the simulation begins; defined here so the attributes
        # always exist on the instance.
        self._batsim = None
        self._hpst = None
        self._lcst = None

        # Simulated time consumed after every scheduling iteration; falls back
        # to a tiny non-zero delay when the option is unset or falsy.
        self._sched_delay = float(options.get("sched_delay", None) or 1e-21)

        self._jobs = Jobs()
        self._resources = Resources()

        self._workload_map = {}
        self._dynamic_workload = WorkloadDescription(name="DYNAMIC_WORKLOAD")

        self.debug("Scheduler initialised", type="scheduler_initialised")

    @property
    def events(self):
        """The events happened in the scheduler."""
        return ListView(self._events)

    @property
    def dynamic_workload(self):
        """The workload of dynamic job submissions of this scheduler."""
        return self._dynamic_workload

    @property
    def hpst(self):
        """The hpst (high-performance storage tier) host managed by Batsim
        (`None` until the scheduler has been initialised by the simulation).
        """
        return self._hpst

    @property
    def lcst(self):
        """The lcst (large-capacity storage tier) host managed by Batsim
        (`None` until the scheduler has been initialised by the simulation).
        """
        return self._lcst

    @property
    def pfs(self):
        """The pfs (parallel file system) host managed by Batsim. This is an alias
        to the host of the large-capacity storage tier.
        """
        return self.lcst

    @property
    def options(self):
        """The options given to the launcher."""
        return self._options

    @property
    def resources(self):
        """The searchable collection of resources."""
        return self._resources

    @property
    def jobs(self):
        """The searchable collection of jobs."""
        return self._jobs

    @property
    def reply(self):
        """The last reply from Batsim (or None)."""
        return self._reply

    @property
    def time(self):
        """The current simulation time."""
        return self._time

    @property
    def has_time_sharing(self):
        """Whether or not time sharing is enabled."""
        return self._batsim.time_sharing

    def run_scheduler_at(self, time):
        """Wake the scheduler at the given point in time (of the simulation)."""
        self._batsim.wake_me_up_at(time)

    def request_consumed_energy(self):
        """Request the consumed energy from Batsim."""
        self._batsim.request_consumed_energy()

    def __call__(self):
        """Return the underlying Pybatsim scheduler."""
        return self._scheduler

    def _format_log_msg(self, msg, **kwargs):
        """Fill the placeholders of `msg` and prefix it with the current time."""
        msg = msg.format(**kwargs)
        return "{:.6f} | {}".format(self.time, msg)

    def _format_event_msg(self, level, msg, type="msg", **kwargs):
        """Record a `LoggingEvent` for the message and return its string form.

        The event is appended to the event list and passed to `on_event`.

        :param level: the numeric level of the event

        :param msg: the message template (`str.format` placeholders)

        :param type: the type tag of the event

        :param kwargs: the values for the placeholders (also stored in the event)
        """
        msg = msg.format(**kwargs)

        event = LoggingEvent(self.time, level, msg, type, kwargs)

        self._events.append(event)
        self.on_event(event)

        return str(event)

    def _log_job_header(self):
        """Write the column header of the scheduled-jobs log."""
        header = [
            "time",
            "full_job_id",
            "workload_name",
            "job_id",
            "full_parent_job_id",
            "parent_workload_name",
            "parent_job_id",
            "submission_time",
            "requested_number_of_processors",
            "requested_time",
            "success",
            "starting_time",
            "finish_time",
            "comment",
            "type",
            "reason",
        ]
        self._sched_jobs_logger.info(";".join(header))

    def _log_job(
            self,
            time,
            job,
            type_of_completion,
            reason_for_completion=""):
        """Write one row describing a finished job to the scheduled-jobs log.

        :param time: the time to record for the row

        :param job: the high-level job object

        :param type_of_completion: how the job ended (e.g. "completed", "killed")

        :param reason_for_completion: optional free-text reason
        """
        full_parent_job_id = ""
        parent_job_id = ""
        parent_workload_name = ""
        if job.parent_job:
            full_parent_job_id = job.parent_job.id
            split_parent = full_parent_job_id.split(
                Batsim.WORKLOAD_JOB_SEPARATOR)
            parent_workload_name = split_parent[0]
            parent_job_id = split_parent[1]

        split_id = job.id.split(Batsim.WORKLOAD_JOB_SEPARATOR)
        row = [
            time,                       # time
            job.id,                     # full_job_id
            split_id[0],                # workload_name
            split_id[1],                # job_id
            full_parent_job_id,         # full_parent_job_id
            parent_workload_name,       # parent_workload_name
            parent_job_id,              # parent_job_id
            job.submit_time,            # submission_time
            job.requested_resources,    # requested_number_of_processors
            job.requested_time,         # requested_time
            1 if job.success else 0,    # success
            job.start_time,             # starting_time
            job.finish_time,            # finish_time
            job.comment or "",          # comment
            type_of_completion,         # type
            reason_for_completion,      # reason
        ]
        # Missing values become empty cells instead of the text "None".
        self._sched_jobs_logger.info(
            ";".join("" if cell is None else str(cell) for cell in row))

    def debug(self, msg, **kwargs):
        """Writes a debug message to the logging facility."""
        self._logger.debug(self._format_log_msg(msg, **kwargs))
        self._event_logger.info(self._format_event_msg(1, msg, **kwargs))

    def info(self, msg, **kwargs):
        """Writes a info message to the logging facility."""
        self._logger.info(self._format_log_msg(msg, **kwargs))
        self._event_logger.info(self._format_event_msg(2, msg, **kwargs))

    def warn(self, msg, **kwargs):
        """Writes a warn message to the logging facility."""
        self._logger.warn(self._format_log_msg(msg, **kwargs))
        self._event_logger.info(self._format_event_msg(3, msg, **kwargs))

    def error(self, msg, **kwargs):
        """Writes a error message to the logging facility."""
        self._logger.error(self._format_log_msg(msg, **kwargs))
        self._event_logger.info(self._format_event_msg(4, msg, **kwargs))

    def fatal(self, msg, **kwargs):
        """Writes a fatal message to the logging facility and terminates the scheduler.

        :raises ValueError: always, carrying the formatted message
        """
        error_msg = self._format_log_msg(msg, **kwargs)
        self._logger.error(error_msg)
        self._event_logger.info(self._format_event_msg(5, msg, **kwargs))
        raise ValueError("Fatal error: {}".format(error_msg))

    def _on_pre_init(self):
        """The _on_pre_init method called during the start-up phase of the scheduler.
        If the _on_pre_init method is overridden the super method should be called with:
        `super()._on_pre_init()`
        """
        for r in self._batsim.resources:
            self._resources.add(Resource(self,
                                         r["id"],
                                         r["name"],
                                         r["state"],
                                         r["properties"],
                                         self.resources))
        self.info(
            "{num_resources} resources registered",
            num_resources=len(self.resources),
            type="resources_registered")

        self._hpst = DictWrapper(self._batsim.hpst)
        self._lcst = DictWrapper(self._batsim.lcst)

    def on_init(self):
        """The init method called during the start-up phase of the scheduler."""
        pass

    def _on_post_init(self):
        """The _on_post_init method called during the start-up phase of the scheduler.
        If the _on_post_init method is overridden the super method should be called with:
        `super()._on_post_init()`
        """
        pass

    def _pre_schedule(self):
        """The _pre_schedule method called during the scheduling phase of the scheduler.
        If the _pre_schedule method is overridden the super method should be called with:
        `super()._pre_schedule()`
        """
        self.debug(
            "Starting scheduling iteration",
            type="scheduling_iteration_started")

        if self.jobs.open:
            self.debug(
                "{num_jobs} jobs open at start of scheduling iteration",
                num_jobs=len(self.jobs.open),
                type="jobs_open_at_start")

    @abstractmethod
    def schedule(self):
        """The schedule method called during the scheduling phase of the scheduler."""
        pass

    def _post_schedule(self):
        """The _post_schedule method called during the scheduling phase of the scheduler.
        If the _post_schedule method is overridden the super method should be called with:
        `super()._post_schedule()`
        """
        if self.jobs.open:
            self.debug(
                "{num_jobs} jobs open at end of scheduling iteration",
                num_jobs=len(self.jobs.open),
                type="jobs_open_at_end")

        self.debug(
            "Ending scheduling iteration",
            type="scheduling_iteration_ended")

    def _update_time(self):
        """Synchronise the scheduler's time with the time reported by Batsim."""
        self._time = self._batsim.time()

    def _do_schedule(self, reply=None):
        """Internal method to execute a scheduling iteration.

        :param reply: the reply set by Batsim (most of the time there is no reply object)
        """
        self._reply = reply
        self._pre_schedule()
        self.schedule()
        self._post_schedule()

        # Fast forward the time after the iteration. The time can be set through
        # a scheduler starting option.
        self._batsim.consume_time(self._sched_delay)

    def _on_pre_end(self):
        """The _on_pre_end method called during the shut-down phase of the scheduler.
        If the _on_pre_end method is overridden the super method should be called with:
        `super()._on_pre_end()`
        """
        if self.jobs.open:
            self.warn(
                "{num_jobs} jobs still in state open at end of simulation",
                num_jobs=len(self.jobs.open),
                type="open_jobs_warning")

    def on_end(self):
        """The end method called during the shut-down phase of the scheduler."""
        pass

    def _on_post_end(self):
        """The _on_post_end method called during the shut-down phase of the scheduler.
        If the _on_post_end method is overridden the super method should be called with:
        `super()._on_post_end()`
        """
        pass

    def on_nop(self):
        """Hook similar to the low-level API."""
        pass

    def on_deadlock(self):
        """Hook called when Batsim reports a deadlock.

        :raises ValueError: always (default behaviour; override to handle deadlocks)
        """
        raise ValueError("Batsim has reached a deadlock")

    def on_jobs_killed(self, jobs):
        """Hook similar to the low-level API.

        :param jobs: the killed jobs (higher-level job objects)
        """
        pass

    def on_job_submission(self, job):
        """Hook similar to the low-level API.

        :param job: the submitted job (higher-level job object)
        """
        pass

    def on_job_completion(self, job):
        """Hook similar to the low-level API.

        :param job: the completed job (higher-level job object)
        """
        pass

    def on_job_message(self, job, message):
        """Hook similar to the low-level API.

        :param job: the sending job

        :param message: the sent message
        """
        pass

    def on_machine_pstate_changed(self, resource, pstate):
        """Hook similar to the low-level API.

        :param resource: the changed resource (higher-level resource object)

        :param pstate: the new pstate
        """
        pass

    def on_report_energy_consumed(self, consumed_energy):
        """Hook similar to the low-level API.

        :param consumed_energy: the consumed energy (higher-level reply object)
        """
        pass

    def on_event(self, event):
        """Hook called on each event triggered by the scheduler.

        :param event: the triggered event (class: `LoggingEvent`)
        """
        pass

    def submit_dynamic_job(self, *args, **kwargs):
        """Create a job in the dynamic workload and submit it.

        :param args: positional arguments forwarded to the workload's `new_job`

        :param kwargs: keyword arguments forwarded to the workload's `new_job`
        """
        job = self._dynamic_workload.new_job(*args, **kwargs)
        self._dynamic_workload.prepare()
        job.submit(self)

585

586
def as_scheduler(*args, on_init=(), on_end=(), base_classes=(), **kwargs):
    """Decorator to convert a function to a scheduler class.

    The function should accept the scheduler as first argument and optionally
    `*args` and `**kwargs` arguments which will be given from additional arguments
    to the call of the decorator.

    :param args: additional arguments passed to the scheduler function (in each iteration)

    :param on_init: callables invoked with the scheduler at the end of `_on_pre_init`

    :param on_end: callables invoked with the scheduler at the end of `_on_pre_end`

    :param base_classes: additional base classes of the generated scheduler class
                         (any iterable); they are placed before `Scheduler`

    :param kwargs: additional arguments passed to the scheduler function (in each iteration)
    """
    # Snapshot the hooks so that one-shot iterables (e.g. generators) are not
    # exhausted by the first scheduler instance.
    init_hooks = tuple(on_init)
    end_hooks = tuple(on_end)

    # Work on a private list: the caller's object is never modified and any
    # iterable (list, tuple, ...) is accepted.
    bases = list(base_classes)
    if Scheduler not in bases:
        # Listing the same base twice is a TypeError at class creation.
        bases.append(Scheduler)

    def convert_to_scheduler(schedule_function):
        class InheritedScheduler(*bases):
            def __init__(self, *init_args, **init_kwargs):
                super().__init__(*init_args, **init_kwargs)

            def _on_pre_init(self):
                super()._on_pre_init()
                for hook in init_hooks:
                    hook(self)

            def schedule(self):
                schedule_function(self, *args, **kwargs)

            def _on_pre_end(self):
                super()._on_pre_end()
                for hook in end_hooks:
                    hook(self)

        # Name the generated class after the decorated function.
        InheritedScheduler.__name__ = schedule_function.__name__

        return InheritedScheduler
    return convert_to_scheduler