scheduler.py 22 KB
Newer Older
1 2 3 4 5 6 7 8 9
"""
    batsim.sched.scheduler
    ~~~~~~~~~~~~~~~~~~~~~~

    This module provides a high-level interface for implementing schedulers for Batsim.
    It contains a basic scheduler used by Pybatsim directly and a high-level scheduler API which will
    interact with the basic scheduler to provide a richer interface.

"""
10
from abc import ABCMeta, abstractmethod
S. Lackner's avatar
S. Lackner committed
11

12
from batsim.batsim import BatsimScheduler, Batsim
S. Lackner's avatar
S. Lackner committed
13
from batsim.network import NetworkHandler
14
from batsim.tools.launcher import launch_scheduler_main
S. Lackner's avatar
S. Lackner committed
15

16
from .resource import Resources, ComputeResource
17
from .job import Job, Jobs
18
from .reply import ConsumedEnergyReply
19 20
from .utils import DictWrapper
from .messages import Message
21
from .utils import ListView
22
from .logging import Logger
23
from .events import LoggingEvent, EventLogger, EventList
24
from .workloads import WorkloadDescription
25

S. Lackner's avatar
S. Lackner committed
26

27
class BaseBatsimScheduler(BatsimScheduler):
    """The basic Pybatsim scheduler.

    It receives the low-level ``on...`` callbacks, refreshes the simulation
    time of the high-level scheduler, logs the event and forwards it to the
    matching hook of the high-level scheduler.

    :param scheduler: the high-level scheduler which uses this basic scheduler.

    :param options: the options given to the launcher.
    """

    def __init__(self, scheduler, options):
        super().__init__(options)

        self._scheduler = scheduler

        # Maps the Batsim job id to the high-level job object. Entries are
        # added on submission and removed on completion or kill.
        self._jobmap = {}
        # Running counter used as the `number` of newly created job objects.
        self._next_job_number = 0

    def onSimulationBegins(self):
        """Hand the Batsim handle to the scheduler and run its init hooks."""
        self._scheduler.info(
            "Simulation begins",
            type="simulation_begins_received")
        # The Batsim handle must be set before the time can be updated.
        self._scheduler._batsim = self.bs
        self._scheduler._update_time()
        self._scheduler._on_pre_init()
        self._scheduler.on_init()
        self._scheduler._on_post_init()

    def onSimulationEnds(self):
        """Run the end hooks of the high-level scheduler."""
        self._scheduler._update_time()
        self._scheduler.info(
            "Simulation ends",
            type="simulation_ends_received")
        self._scheduler._on_pre_end()
        self._scheduler.on_end()
        self._scheduler._on_post_end()

    def onDeadlock(self):
        """Log the deadlock and forward it to the high-level scheduler."""
        self._scheduler.debug(
            "batsim has reached a deadlock or is not responding",
            type="deadlock")
        self._scheduler.on_deadlock()

    def onJobsKilled(self, jobs):
        """Mark every killed job as completed and notify the scheduler.

        :param jobs: the killed low-level jobs (``id`` and ``progress`` are read).

        :raises KeyError: if a job id is not known to this scheduler.
        """
        self._scheduler._update_time()
        self._scheduler.debug(
            "decision process received jobs kills({jobs})",
            jobs=jobs,
            type="jobs_killed_received2")

        jobobjs = []
        for job in jobs:
            jobobj = self._jobmap.pop(job.id)
            jobobj.progress = job.progress
            jobobjs.append(jobobj)

        self._scheduler.info("The following jobs were killed: ({jobs})",
                             jobs=jobobjs, type="jobs_killed_received")

        # Complete *every* killed job. Completing only the loop variable
        # after the loop would handle just the last job and fail on an
        # empty list.
        for jobobj in jobobjs:
            jobobj._do_complete_job()

        self._scheduler.on_jobs_killed(jobobjs)
        self._scheduler._do_schedule()

    def onJobSubmission(self, job):
        """Wrap the submitted low-level job and notify the scheduler.

        :param job: the low-level job submitted by Batsim.
        """
        self._scheduler._update_time()
        self._scheduler.debug(
            "decision process received job submission({job})",
            job=job,
            type="job_submission_received2")

        newjob = Job(
            number=self._next_job_number,
            batsim_job=job,
            scheduler=self._scheduler,
            jobs_list=self._scheduler.jobs)
        self._jobmap[job.id] = newjob
        self._next_job_number += 1

        self._scheduler.jobs.add(newjob)

        # The part of the job id before the separator is the workload name.
        # The part after it is not needed here and is deliberately not parsed.
        workload_name = job.id.split(Batsim.WORKLOAD_JOB_SEPARATOR)[0]

        # Only workloads registered in the scheduler's workload map are known
        # here (according to the original comment: dynamic jobs).
        workload = self._scheduler._workload_map.get(workload_name)
        if workload:
            job_description = workload[job.id]
            job_description.job = newjob
            newjob._workload_description = workload

        self._scheduler.info(
            "Received job submission from Batsim (job={job}, open_jobs_in_queue={open_jobs_in_queue})",
            job=newjob,
            open_jobs_in_queue=len(self._scheduler.jobs.open),
            type="job_submission_received")

        self._scheduler.on_job_submission(newjob)
        self._scheduler._do_schedule()

    def onJobCompletion(self, job):
        """Mark the job as completed and notify the scheduler.

        :param job: the completed low-level job.

        :raises KeyError: if the job id is not known to this scheduler.
        """
        self._scheduler._update_time()
        self._scheduler.debug(
            "decision process received job completion({job})",
            job=job,
            type="job_completion_received2")
        jobobj = self._jobmap.pop(job.id)

        self._scheduler.info("Job has completed its execution ({job})",
                             job=jobobj, type="job_completion_received")

        jobobj._do_complete_job()

        self._scheduler.on_job_completion(jobobj)
        self._scheduler._do_schedule()

    def onJobMessage(self, timestamp, job, message):
        """Store a message sent by a job and notify the scheduler.

        :param timestamp: the timestamp passed on to the `Message` object.

        :param job: the sending low-level job.

        :param message: the message payload.

        :raises KeyError: if the job id is not known to this scheduler.
        """
        self._scheduler._update_time()
        self._scheduler.debug(
            "decision process received from job message({job} => {message})",
            job=job,
            message=message,
            type="job_message_received2")
        jobobj = self._jobmap[job.id]
        self._scheduler.info(
            "Got from job message({job} => {message})",
            job=jobobj,
            message=message,
            type="job_message_received")
        msg = Message(timestamp, message)
        jobobj.messages.append(msg)
        self._scheduler.on_job_message(jobobj, msg)
        self._scheduler._do_schedule()

    def onMachinePStateChanged(self, nodeid, pstate):
        """Update the pstate of a resource and notify the scheduler.

        :param nodeid: the key of the resource in the scheduler's resources.

        :param pstate: the new pstate.
        """
        self._scheduler._update_time()
        resource = self._scheduler.resources[nodeid]
        self._scheduler.info(
            "Resource state was updated ({resource}) to {pstate}",
            resource=resource,
            pstate=pstate,
            type="pstate_change_received")

        resource.update_pstate_change(pstate)

        self._scheduler.on_machine_pstate_changed(resource, pstate)
        self._scheduler._do_schedule()

    def onReportEnergyConsumed(self, consumed_energy):
        """Wrap the consumed energy into a reply object and notify the scheduler.

        :param consumed_energy: the consumed energy reported by Batsim.
        """
        self._scheduler._update_time()
        self._scheduler.info(
            "Received reply from Batsim (energy_consumed={energy_consumed})",
            energy_consumed=consumed_energy,
            type="reply_energy_received")

        # `ConsumedEnergyReply` is the reply class this module imports; the
        # previously used `BatsimReply` is not in scope here.
        # NOTE(review): assumes the constructor accepts the keyword
        # `consumed_energy` - confirm against `.reply`.
        reply = ConsumedEnergyReply(consumed_energy=consumed_energy)
        self._scheduler.on_report_energy_consumed(reply)
        # The reply is made available to the scheduling iteration.
        self._scheduler._do_schedule(reply)

    def onAddResources(self, resources):
        """Forward added resources to the scheduler.

        :param resources: the added resources.
        """
        self._scheduler._update_time()
        self._scheduler.info(
            "Received add Resources message: {resources}",
            resources=resources,
            type="add_resources_received")
        self._scheduler.on_add_resources(resources)

    def onRemoveResources(self, resources):
        """Forward removed resources to the scheduler.

        :param resources: the removed resources.
        """
        # Refresh the time like every other handler so that the log entry
        # below carries the current simulation time.
        self._scheduler._update_time()
        self._scheduler.info(
            "Received remove Resources message: {resources}",
            resources=resources,
            type="remove_resources_received")
        self._scheduler.on_remove_resources(resources)

    def onRequestedCall(self):
        """Forward a requested call to the scheduler."""
        self._scheduler._update_time()
        self._scheduler.info(
            "Received Requested Call message",
            type="requested_call_received")
        self._scheduler.on_requested_call()

213 214

class Scheduler(metaclass=ABCMeta):
    """The high-level scheduler which should be inherited from by concrete scheduler
    implementations. All important Batsim functions are either available in the scheduler or used
    by the job/resource objects.

    :param options: the options given to the launcher (a dict; defaults to a
                    new empty dict).

    """

    @classmethod
    def launch_main(cls, **kwargs):
        """Initialise this scheduler class and run it as if it were started with the launcher."""
        launch_scheduler_main(cls, **kwargs)

    def __init__(self, options=None):
        # Create a fresh dict per instance: a mutable default argument would
        # be shared between all schedulers created without options.
        if options is None:
            options = {}
        self._options = options
        debug = self.options.get("debug", False)
        export_prefix = self.options.get("export-prefix", "out")
        self._log_debug_events = self.options.get("log-debug-events", False)

        # Create the logger
        self._logger = Logger(self, debug=debug)

        self._last_published_event = None
        self._event_logger = EventLogger(
            self, "Events", debug=debug,
            to_file="{}_events.csv".format(export_prefix))
        self._event_logger.info(LoggingEvent.get_csv_header())

        self._events = EventList()

        # Use the basic Pybatsim scheduler to wrap the Batsim API
        self._scheduler = BaseBatsimScheduler(self, options)

        self._time = 0

        self._reply = None

        # Time consumed after each scheduling iteration. A missing or falsy
        # option falls back to a tiny non-zero delay.
        self._sched_delay = float(
            options.get("sched-delay") or 0.000000000000000000001)

        self._jobs = Jobs()
        self._resources = Resources()

        # Filled in `_on_pre_init`; initialised here so that the `machines`
        # property does not raise before the simulation has begun.
        self._machines = None

        self._find_resource_handler = []

        self._workload_map = {}
        self._dynamic_workload = WorkloadDescription(name="DYNAMIC_WORKLOAD")

        # NOTE: `self._batsim` is intentionally not set here. It is assigned
        # when the simulation begins, and `_format_event_msg` relies on the
        # resulting AttributeError to detect the uninitialised state.

        self.debug("Scheduler initialised", type="scheduler_initialised")

    @property
    def events(self):
        """The events happened in the scheduler."""
        return self._events

    @property
    def machines(self):
        """The registered machines in Batsim (`None` before the pre-init phase)."""
        return self._machines

    @property
    def dynamic_workload(self):
        """The workload of dynamic job submissions of this scheduler."""
        return self._dynamic_workload

    @property
    def options(self):
        """The options given to the launcher."""
        return self._options

    @property
    def resources(self):
        """The searchable collection of resources."""
        return self._resources

    @property
    def jobs(self):
        """The searchable collection of jobs."""
        return self._jobs

    @property
    def reply(self):
        """The last reply from Batsim (or None)."""
        return self._reply

    @property
    def time(self):
        """The current simulation time."""
        return self._time

    @property
    def has_time_sharing(self):
        """Whether or not time sharing is enabled."""
        return self._batsim.time_sharing

    @property
    def get_find_resource_handlers(self):
        """The functions to find resource requirements for jobs.

        Note that this is a property, so it is accessed without calling it.
        """
        return self._find_resource_handler

    def register_find_resource_handler(self, handler):
        """Adds a resource handler for searching resource requirements for jobs.

        :param handler: a function which should return an iterable
                        (or generator) containing `ResourceRequirement`
                        objects. The function should determine
                        absolutely necessary resource requirements
                        needed by this job. For example, when all jobs
                        should always allocate a specific external
                        special resource like allocating I/O nodes
                        not managed by Batsim.
                        Signature: scheduler, job
        """
        self._find_resource_handler.append(handler)

    def unregister_find_resource_handler(self, handler):
        """Removes a resource handler.

        :param handler: the function to be removed

        :raises ValueError: if the handler was not registered
        """
        self._find_resource_handler.remove(handler)

    def run_scheduler_at(self, time):
        """Wake the scheduler at the given point in time (of the simulation)."""
        self._batsim.wake_me_up_at(time)

    def request_consumed_energy(self):
        """Request the consumed energy from Batsim."""
        self._batsim.request_consumed_energy()

    def __call__(self):
        """Return the underlying Pybatsim scheduler."""
        return self._scheduler

    def _format_log_msg(self, msg, **kwargs):
        """Format `msg` with `kwargs` and prefix it with the current time.

        Keyword arguments not referenced by `msg` (e.g. `type`) are ignored.
        """
        msg = msg.format(**kwargs)
        return "{:.6f} | {}".format(self.time, msg)

    def _format_event_msg(self, level, msg, type="msg", **kwargs):
        """Create, record and publish the event for a log message.

        :param level: the numeric level stored in the event

        :param msg: the message template, formatted with `kwargs`

        :param type: the type stored in the event

        :param kwargs: the data of the event (also used to format `msg`)

        :returns: the event as CSV line
        """
        msg = msg.format(**kwargs)

        try:
            open_jobs = self._batsim.nb_jobs_received
            processed_jobs = (self._batsim.nb_jobs_completed +
                              self._batsim.nb_jobs_failed +
                              self._batsim.nb_jobs_rejected +
                              self._batsim.nb_jobs_timeout +
                              self._batsim.nb_jobs_killed +
                              len(self._batsim.jobs_manually_changed))
        except AttributeError:
            # Batsim is not initialised
            open_jobs = 0
            processed_jobs = 0

        event = LoggingEvent(self.time, level, open_jobs, processed_jobs,
                             msg, type, kwargs)

        self._events.add(event)

        event_str = str(event)

        try:
            do_publish = True

            # Skip publishing when neither the time has advanced nor the job
            # counters have changed since the last published event.
            last = self._last_published_event
            if last is not None:
                if (last.time >= event.time and
                        last.open_jobs == event.open_jobs and
                        last.processed_jobs == event.processed_jobs):
                    do_publish = False

            if do_publish:
                self._batsim.publish_event(event_str)
                self._last_published_event = event
        except AttributeError:
            # Batsim is not initialised
            pass

        self.on_event(event)

        return event.to_csv_line()

    def debug(self, msg, **kwargs):
        """Writes a debug message to the logging facility.

        Does nothing unless the option `log-debug-events` is set.
        """
        if self._log_debug_events:
            self._logger.debug(self._format_log_msg(msg, **kwargs))
            event = self._format_event_msg(1, msg, **kwargs)
            self._event_logger.info(event)

    def info(self, msg, **kwargs):
        """Writes a info message to the logging facility."""
        self._logger.info(self._format_log_msg(msg, **kwargs))
        event = self._format_event_msg(2, msg, **kwargs)
        self._event_logger.info(event)

    def warn(self, msg, **kwargs):
        """Writes a warn message to the logging facility."""
        self._logger.warn(self._format_log_msg(msg, **kwargs))
        event = self._format_event_msg(3, msg, **kwargs)
        self._event_logger.info(event)

    def error(self, msg, **kwargs):
        """Writes a error message to the logging facility."""
        self._logger.error(self._format_log_msg(msg, **kwargs))
        event = self._format_event_msg(4, msg, **kwargs)
        self._event_logger.info(event)

    def fatal(self, msg, **kwargs):
        """Writes a fatal message to the logging facility and terminates the scheduler.

        :raises ValueError: always, containing the formatted message
        """
        error_msg = self._format_log_msg(msg, **kwargs)
        self._logger.error(error_msg)
        event = self._format_event_msg(5, msg, **kwargs)
        self._event_logger.info(event)
        raise ValueError("Fatal error: {}".format(error_msg))

    def _on_pre_init(self):
        """The _on_pre_init method called during the start-up phase of the scheduler.
        If the _on_pre_init method is overridden the super method should be called with:
        `super()._on_pre_init()`
        """
        # Mirror every resource known to Batsim as a high-level resource.
        for r in self._batsim.resources.values():
            self._resources.add(ComputeResource(self,
                                                id=r["id"],
                                                name=r["name"],
                                                resources_list=self.resources,
                                                state=r["state"],
                                                properties=r["properties"]))
        self.info(
            "{num_resources} resources registered",
            num_resources=len(self.resources),
            type="resources_registered")

        self._machines = DictWrapper(self._batsim.machines)

    def on_init(self):
        """The init method called during the start-up phase of the scheduler."""
        pass

    def _on_post_init(self):
        """The _on_post_init method called during the start-up phase of the scheduler.
        If the _on_post_init method is overridden the super method should be called with:
        `super()._on_post_init()`
        """
        pass

    def _pre_schedule(self):
        """The _pre_schedule method called during the scheduling phase of the scheduler.
        If the _pre_schedule method is overridden the super method should be called with:
        `super()._pre_schedule()`
        """
        self.debug(
            "Starting scheduling iteration",
            type="scheduling_iteration_started")

        if self.jobs.open:
            self.debug(
                "{num_jobs} jobs open at start of scheduling iteration",
                num_jobs=len(self.jobs.open),
                type="jobs_open_at_start")

    @abstractmethod
    def schedule(self):
        """The schedule method called during the scheduling phase of the scheduler."""
        pass

    def _post_schedule(self):
        """The _post_schedule method called during the scheduling phase of the scheduler.
        If the _post_schedule method is overridden the super method should be called with:
        `super()._post_schedule()`
        """
        if self.jobs.open:
            self.debug(
                "{num_jobs} jobs open at end of scheduling iteration",
                num_jobs=len(self.jobs.open),
                type="jobs_open_at_end")

        self.debug(
            "Ending scheduling iteration",
            type="scheduling_iteration_ended")

    def _update_time(self):
        """Set the scheduler time to the current time reported by Batsim."""
        self._time = self._batsim.time()

    def _do_schedule(self, reply=None):
        """Internal method to execute a scheduling iteration.

        :param reply: the reply set by Batsim (most of the time there is no reply object)
        """
        self._reply = reply
        self._pre_schedule()
        self.schedule()
        self._post_schedule()

        # Fast forward the time after the iteration. The time can be set through
        # a scheduler starting option.
        self._batsim.consume_time(self._sched_delay)

    def _on_pre_end(self):
        """The _on_pre_end method called during the shut-down phase of the scheduler.
        If the _on_pre_end method is overridden the super method should be called with:
        `super()._on_pre_end()`
        """
        if self.jobs.open:
            self.warn(
                "{num_jobs} jobs still in state open at end of simulation",
                num_jobs=len(self.jobs.open),
                type="open_jobs_warning")

    def on_end(self):
        """The end method called during the shut-down phase of the scheduler."""
        pass

    def _on_post_end(self):
        """The _on_post_end method called during the shut-down phase of the scheduler.
        If the _on_post_end method is overridden the super method should be called with:
        `super()._on_post_end()`
        """
        pass

    def on_nop(self):
        """Hook similar to the low-level API."""
        pass

    def on_deadlock(self):
        """Hook similar to the low-level API.

        :raises ValueError: always, unless the hook is overridden
        """
        raise ValueError("Batsim has reached a deadlock")

    def on_jobs_killed(self, jobs):
        """Hook similar to the low-level API.

        :param jobs: the killed jobs (higher-level job objects)
        """
        pass

    def on_job_submission(self, job):
        """Hook similar to the low-level API.

        :param job: the submitted job (higher-level job object)
        """
        pass

    def on_job_completion(self, job):
        """Hook similar to the low-level API.

        :param job: the completed job (higher-level job object)
        """
        pass

    def on_job_message(self, job, message):
        """Hook similar to the low-level API.

        :param job: the sending job

        :param message: the sent message
        """
        pass

    def on_machine_pstate_changed(self, resource, pstate):
        """Hook similar to the low-level API.

        :param resource: the changed resource (higher-level resource object)

        :param pstate: the new pstate
        """
        pass

    def on_report_energy_consumed(self, consumed_energy):
        """Hook similar to the low-level API.

        :param consumed_energy: the consumed energy (higher-level reply object)
        """
        pass

    def on_add_resources(self, resources):
        """Hook similar to the low-level API.

        :param resources: a procset of resources
        """
        pass

    def on_remove_resources(self, resources):
        """Hook similar to the low-level API.

        :param resources: a procset of resources
        """
        pass

    def on_requested_call(self):
        """Hook similar to the low-level API."""
        pass

    def on_event(self, event):
        """Hook called on each event triggered by the scheduler.

        :param event: the triggered event (class: `LoggingEvent`)
        """
        pass

    def register_dynamic_job(self, *args, **kwargs):
        """Create a job in the dynamic workload and submit it.

        All arguments are passed on to `new_job` of the dynamic workload.

        :returns: the newly created job description
        """
        job = self._dynamic_workload.new_job(*args, **kwargs)
        self._dynamic_workload.prepare()
        job.submit(self)
        return job

    def notify_registration_finished(self):
        """Forward the call to `notify_registration_finished` of Batsim."""
        self._batsim.notify_registration_finished()
MOMMESSIN Clement's avatar
MOMMESSIN Clement committed
625

626

627
def as_scheduler(*args, on_init=(), on_end=(), base_classes=(), **kwargs):
    """Decorator to convert a function to a scheduler class.

    The function should accept the scheduler as first argument and optionally
    `*args` and `**kwargs` arguments which will be given from additional arguments
    to the call of the decorator.

    :param args: additional arguments passed to the scheduler function (in each iteration)

    :param on_init: an iterable of functions called with the scheduler as
                    only argument at the end of the pre-init phase

    :param on_end: an iterable of functions called with the scheduler as
                   only argument at the end of the pre-end phase

    :param base_classes: an iterable of classes to use as base classes for the
                         scheduler; `Scheduler` is appended automatically if
                         it is not already contained

    :param kwargs: additional arguments passed to the scheduler function (in each iteration)

    :returns: the decorator which creates the scheduler class
    """
    # Work on a copy so that the caller's sequence is never modified; `list()`
    # also accepts tuples and other iterables.
    bases = list(base_classes)
    # Listing `Scheduler` twice would raise a duplicate base class error.
    if Scheduler not in bases:
        bases.append(Scheduler)

    def convert_to_scheduler(schedule_function):
        class InheritedScheduler(*bases):

            def _on_pre_init(self):
                super()._on_pre_init()
                for init_hook in on_init:
                    init_hook(self)

            def schedule(self):
                schedule_function(self, *args, **kwargs)

            def _on_pre_end(self):
                super()._on_pre_end()
                for end_hook in on_end:
                    end_hook(self)

        # Make the generated class present itself like the decorated function.
        InheritedScheduler.__name__ = schedule_function.__name__
        InheritedScheduler.__qualname__ = schedule_function.__qualname__
        InheritedScheduler.__doc__ = schedule_function.__doc__

        return InheritedScheduler
    return convert_to_scheduler