scheduler.py 18.8 KB
Newer Older
1 2 3 4 5 6 7 8 9
"""
    batsim.sched.scheduler
    ~~~~~~~~~~~~~~~~~~~~~~

    This module provides a high-level interface for implementing schedulers for Batsim.
    It contains a basic scheduler used by Pybatsim directly and a high-level scheduler API which will
    interact with the basic scheduler to provide a richer interface.

"""
10
import logging
11
import os
12
from abc import ABCMeta, abstractmethod
S. Lackner's avatar
S. Lackner committed
13

14
from batsim.batsim import BatsimScheduler
S. Lackner's avatar
S. Lackner committed
15

16
from .resource import Resources, Resource
17
from .job import Job, Jobs
18
from .reply import ConsumedEnergyReply
19 20
from .utils import DictWrapper
from .messages import Message
21

S. Lackner's avatar
S. Lackner committed
22

23
class BaseBatsimScheduler(BatsimScheduler):
    """The basic Pybatsim scheduler.

    It receives the low-level callbacks, translates low-level job and
    resource identifiers into the high-level objects managed by the
    high-level scheduler, forwards the event to the matching hook of the
    high-level scheduler and (for most events) triggers a scheduling
    iteration afterwards.

    :param scheduler: the high-level scheduler which uses this basic scheduler.

    :param options: the options given to the launcher.
    """

    def __init__(self, scheduler, options):
        self._scheduler = scheduler
        self._options = options

        # Maps the id of a submitted job to its high-level job object. Entries
        # are added on submission and removed on completion or kill.
        self._jobmap = {}

    def onAfterBatsimInit(self):
        """Hand the Batsim handle to the high-level scheduler and run its init hooks."""
        self._scheduler.debug(
            "decision process is executing after batsim init", type="on_init")
        self._scheduler._batsim = self.bs
        self._scheduler._update_time()
        self._scheduler._on_pre_init()
        self._scheduler.on_init()
        self._scheduler._on_post_init()

    def onSimulationBegins(self):
        """Log the beginning of the simulation."""
        self._scheduler.info(
            "Simulation begins",
            type="simulation_begins_received")

    def onSimulationEnds(self):
        """Log the end of the simulation and run the end hooks of the scheduler."""
        self._scheduler._update_time()
        self._scheduler.info(
            "Simulation ends",
            type="simulation_ends_received")
        self._scheduler._on_pre_end()
        self._scheduler.on_end()
        self._scheduler._on_post_end()

    def onNOP(self):
        """Forward a NOP to the scheduler and run a scheduling iteration."""
        self._scheduler._update_time()
        self._scheduler.debug(
            "decision process received NOP",
            type="nop_received")
        self._scheduler.on_nop()
        self._scheduler._do_schedule()

    def onJobsKilled(self, jobs):
        """Mark the killed jobs as completed and notify the scheduler.

        :param jobs: the killed low-level jobs (each must expose an `id`
                     which was previously seen in `onJobSubmission`)

        :raises KeyError: if one of the jobs is not known to this scheduler
        """
        self._scheduler._update_time()
        self._scheduler.debug(
            "decision process received jobs kills({jobs})",
            jobs=jobs,
            type="jobs_killed_received2")

        # Translate the low-level jobs into the tracked high-level job objects
        # (and stop tracking them). The high-level objects, not the low-level
        # ones, must be completed and passed to the hook.
        jobobjs = [self._jobmap.pop(job.id) for job in jobs]

        self._scheduler.info("The following jobs were killed: ({jobs})",
                             jobs=jobobjs, type="jobs_killed_received")

        for jobobj in jobobjs:
            jobobj._do_complete_job()

        self._scheduler.on_jobs_killed(jobobjs)
        self._scheduler._do_schedule()

    def onJobSubmission(self, job):
        """Wrap a submitted low-level job in a high-level job and notify the scheduler.

        :param job: the submitted low-level job
        """
        self._scheduler._update_time()
        self._scheduler.debug(
            "decision process received job submission({job})",
            job=job,
            type="job_submission_received2")
        newjob = Job(
            batsim_job=job,
            scheduler=self._scheduler,
            jobs_list=self._scheduler.jobs)
        self._jobmap[job.id] = newjob

        self._scheduler.jobs.add(newjob)

        self._scheduler.info("Received job submission from Batsim ({job})",
                             job=newjob, type="job_submission_received")

        if newjob.is_dynamic_job:
            # A dynamically submitted job replaces the pending submission
            # request with the same id; the request's properties are moved
            # over before the request is dropped.
            for job2 in self._scheduler.jobs.dynamic_submission_request:
                if job.id == job2.id:
                    newjob.move_properties_from(job2)
                    self._scheduler.jobs.remove(job2)
                    break
        self._scheduler.on_job_submission(newjob)
        self._scheduler._do_schedule()

    def onJobCompletion(self, job):
        """Mark the completed job as completed and notify the scheduler.

        :param job: the completed low-level job

        :raises KeyError: if the job is not known to this scheduler
        """
        self._scheduler._update_time()
        self._scheduler.debug(
            "decision process received job completion({job})",
            job=job,
            type="job_completion_received2")
        jobobj = self._jobmap.pop(job.id)

        self._scheduler.info("Job has completed its execution ({job})",
                             job=jobobj, type="job_completion_received")

        jobobj._do_complete_job()

        self._scheduler.on_job_completion(jobobj)
        self._scheduler._do_schedule()

    def onJobMessage(self, timestamp, job, message):
        """Attach a message sent by a job to its high-level job object.

        :param timestamp: the timestamp stored together with the message

        :param job: the low-level job which sent the message

        :param message: the message data

        :raises KeyError: if the job is not known to this scheduler
        """
        self._scheduler._update_time()
        self._scheduler.debug(
            "decision process received from job message({job} => {message})",
            job=job,
            message=message,
            type="job_message_received2")
        jobobj = self._jobmap[job.id]
        self._scheduler.info(
            "Got from job message({job} => {message})",
            job=jobobj,
            message=message,
            type="job_message_received")
        jobobj.messages.append(Message(timestamp, message))
        self._scheduler._do_schedule()

    def onMachinePStateChanged(self, nodeid, pstate):
        """Update the pstate of a resource and notify the scheduler.

        :param nodeid: the key used to look up the resource in the
                       scheduler's resources

        :param pstate: the new pstate
        """
        self._scheduler._update_time()
        resource = self._scheduler.resources[nodeid]
        self._scheduler.info(
            "Resource state was updated ({resource}) to {pstate}",
            resource=resource,
            pstate=pstate,
            type="pstate_change_received")

        resource.update_pstate_change(pstate)

        self._scheduler.on_machine_pstate_changed(resource, pstate)
        self._scheduler._do_schedule()

    def onReportEnergyConsumed(self, consumed_energy):
        """Wrap the reported energy in a reply object and notify the scheduler.

        The reply is also handed to the scheduling iteration so that it is
        available through the `reply` property of the scheduler.

        :param consumed_energy: the consumed energy reported by Batsim
        """
        self._scheduler._update_time()
        self._scheduler.info(
            "Received reply from Batsim (energy_consumed={energy_consumed})",
            energy_consumed=consumed_energy,
            type="reply_energy_received")

        # `ConsumedEnergyReply` is the reply class imported by this module
        # (`BatsimReply` is not in scope here).
        # NOTE(review): assumes the constructor accepts `consumed_energy` as
        # keyword argument - confirm against `.reply`.
        reply = ConsumedEnergyReply(consumed_energy=consumed_energy)
        self._scheduler.on_report_energy_consumed(reply)
        self._scheduler._do_schedule(reply)
172 173 174


class Scheduler(metaclass=ABCMeta):
    """The high-level scheduler which should be inherited from by concrete scheduler
    implementations. All important Batsim functions are either available in the scheduler or used
    by the job/resource objects.

    Concrete implementations have to provide the `schedule` method. The
    attribute `_batsim` is set from the outside by the basic scheduler once
    Batsim was initialised; members which access it must not be used before.

    :param options: the options given to the launcher.

    """

    class Event:
        """Class for storing data about events triggered by the scheduler.

        :param time: the simulation time when the event occurred

        :param level: the importance level of the event

        :param msg: the actual message of the event

        :param type: the type of the event (`str`)

        :param data: additional data attached to the event (`dict`)
        """

        def __init__(self, time, level, msg, type, data):
            self.time = time
            self.level = level
            self.msg = msg
            self.type = type
            self.data = data

        def __str__(self):
            # The event is rendered as one semicolon-separated line, so
            # semicolons inside the attached data are replaced by commas.
            data = ";".join(
                "{}={}".format(
                    str(k).replace(";", ","),
                    str(v).replace(";", ","))
                for k, v in self.data.items())
            return "{:.6f};{};{};{};{}".format(
                self.time, self.level, self.type, self.msg, data)

    def __init__(self, options):
        self._options = options

        self._init_logger()
        self._events = []

        # Use the basic Pybatsim scheduler to wrap the Batsim API
        self._scheduler = BaseBatsimScheduler(self, options)

        self._time = 0

        self._reply = None

        # Time consumed after each scheduling iteration. A missing or falsy
        # option falls back to a tiny positive default.
        self._sched_delay = float(
            options.get("sched_delay", None) or 0.00000000000001)

        self._jobs = Jobs()
        self._resources = Resources()

        self.debug("Scheduler initialised", type="scheduler_initialised")

    def _init_logger(self):
        """Set up the message logger (stream) and the event logger (csv files).

        Reads the options `debug` (bool or one of the strings "y", "yes",
        "true", "1", case-insensitive) and `export_prefix` (prefix of the
        event files, default "out").
        """
        debug = self.options.get("debug", False)
        if isinstance(debug, str):
            debug = debug.lower() in ["y", "yes", "true", "1"]
        level = logging.DEBUG if debug else logging.INFO

        self._logger = logging.getLogger(self.__class__.__name__)
        self._logger.setLevel(level)

        formatter = logging.Formatter(
            '[%(name)s::%(levelname)s] %(message)s')

        # Add the stream handler. Its level follows the logger level,
        # otherwise debug messages would be filtered out by the handler even
        # when the debug option is set.
        handler = logging.StreamHandler()
        handler.setLevel(level)
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

        self._event_logger = logging.getLogger(
            self.__class__.__name__ + "Events")
        self._event_logger.setLevel(logging.DEBUG)
        formatter = logging.Formatter('%(message)s')

        export_prefix = self.options.get("export_prefix", "out")

        # Add the persistent event logging handler (not purged between runs)
        handler = logging.FileHandler(
            "{}_events.csv".format(export_prefix))
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(formatter)
        self._event_logger.addHandler(handler)

        # Add the event logging handler for the last run (purged when the next
        # run starts)
        filename_lastschedule = "{}_last_events.csv".format(
            export_prefix)
        try:
            os.remove(filename_lastschedule)
        except OSError:
            # Best effort: the file usually does not exist on the first run.
            pass
        handler = logging.FileHandler(filename_lastschedule)
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(formatter)
        self._event_logger.addHandler(handler)

    @property
    def events(self):
        """The events happened in the scheduler (`tuple`)"""
        return tuple(self._events)

    @property
    def hpst(self):
        """The hpst (high-performance storage tier) host managed by Batsim.

        Only available after `_on_pre_init` was executed.
        """
        return self._hpst

    @property
    def lcst(self):
        """The lcst (large-capacity storage tier) host managed by Batsim.

        Only available after `_on_pre_init` was executed.
        """
        return self._lcst

    @property
    def pfs(self):
        """The pfs (parallel file system) host managed by Batsim. This is an alias
        to the host of the large-capacity storage tier.
        """
        return self.lcst

    @property
    def options(self):
        """The options given to the launcher."""
        return self._options

    @property
    def resources(self):
        """The searchable collection of resources."""
        return self._resources

    @property
    def jobs(self):
        """The searchable collection of jobs."""
        return self._jobs

    @property
    def reply(self):
        """The last reply from Batsim (or None)."""
        return self._reply

    @property
    def time(self):
        """The current simulation time."""
        return self._time

    @property
    def has_time_sharing(self):
        """Whether or not time sharing is enabled."""
        return self._batsim.time_sharing

    def run_scheduler_at(self, time):
        """Wake the scheduler at the given point in time (of the simulation)."""
        self._batsim.wake_me_up_at(time)

    def request_consumed_energy(self):
        """Request the consumed energy from Batsim."""
        self._batsim.request_consumed_energy()

    def __call__(self):
        """Return the underlying Pybatsim scheduler."""
        return self._scheduler

    def _format_log_msg(self, msg, **kwargs):
        """Format `msg` with `kwargs` and prefix it with the current simulation time."""
        msg = msg.format(**kwargs)
        return "{:.6f} | {}".format(self.time, msg)

    def _format_event_msg(self, level, msg, type="msg", **kwargs):
        """Create, store and publish an event and return its string representation.

        :param level: the importance level of the event

        :param msg: the message template (formatted with `kwargs`)

        :param type: the type of the event

        :param kwargs: the data attached to the event
        """
        msg = msg.format(**kwargs)

        event = Scheduler.Event(self.time, level, msg, type, kwargs)

        self._events.append(event)
        self.on_event(event)

        return str(event)

    def debug(self, msg, **kwargs):
        """Writes a debug message to the logging facility."""
        self._logger.debug(self._format_log_msg(msg, **kwargs))
        self._event_logger.info(self._format_event_msg(1, msg, **kwargs))

    def info(self, msg, **kwargs):
        """Writes an info message to the logging facility."""
        self._logger.info(self._format_log_msg(msg, **kwargs))
        self._event_logger.info(self._format_event_msg(2, msg, **kwargs))

    def warn(self, msg, **kwargs):
        """Writes a warn message to the logging facility."""
        # `Logger.warn` is a deprecated alias (removed in Python 3.13).
        self._logger.warning(self._format_log_msg(msg, **kwargs))
        self._event_logger.info(self._format_event_msg(3, msg, **kwargs))

    def error(self, msg, **kwargs):
        """Writes an error message to the logging facility."""
        self._logger.error(self._format_log_msg(msg, **kwargs))
        self._event_logger.info(self._format_event_msg(4, msg, **kwargs))

    def fatal(self, msg, **kwargs):
        """Writes a fatal message to the logging facility and terminates the scheduler.

        :raises ValueError: always, carrying the formatted message
        """
        error_msg = self._format_log_msg(msg, **kwargs)
        self._logger.error(error_msg)
        self._event_logger.info(self._format_event_msg(5, msg, **kwargs))
        raise ValueError("Fatal error: {}".format(error_msg))

    def _on_pre_init(self):
        """The _on_pre_init method called during the start-up phase of the scheduler.
        It registers the resources known to Batsim and wraps the storage tier hosts.
        If the _on_pre_init method is overridden the super method should be called with:
        `super()._on_pre_init()`
        """
        for r in self._batsim.resources:
            self._resources.add(Resource(self,
                                         r["id"],
                                         r["name"],
                                         r["state"],
                                         r["properties"],
                                         self.resources))
        self.info(
            "{num_resources} resources registered",
            num_resources=len(
                self.resources),
            type="resources_registered")

        self._hpst = DictWrapper(self._batsim.hpst)
        self._lcst = DictWrapper(self._batsim.lcst)

    def on_init(self):
        """The init method called during the start-up phase of the scheduler."""
        pass

    def _on_post_init(self):
        """The _on_post_init method called during the start-up phase of the scheduler.
        If the _on_post_init method is overridden the super method should be called with:
        `super()._on_post_init()`
        """
        pass

    def _pre_schedule(self):
        """The _pre_schedule method called during the scheduling phase of the scheduler.
        If the _pre_schedule method is overridden the super method should be called with:
        `super()._pre_schedule()`
        """
        self.debug(
            "Starting scheduling iteration",
            type="scheduling_iteration_started")

    @abstractmethod
    def schedule(self):
        """The schedule method called during the scheduling phase of the scheduler."""
        pass

    def _post_schedule(self):
        """The _post_schedule method called during the scheduling phase of the scheduler.
        If the _post_schedule method is overridden the super method should be called with:
        `super()._post_schedule()`
        """
        if self.jobs.open:
            self.debug(
                "{num_jobs} jobs open at end of scheduling iteration",
                num_jobs=len(
                    self.jobs.open),
                type="jobs_open_at_end")

        self.debug(
            "Ending scheduling iteration",
            type="scheduling_iteration_ended")

    def _update_time(self):
        """Synchronise the scheduler time with the time reported by Batsim."""
        self._time = self._batsim.time()

    def _do_schedule(self, reply=None):
        """Internal method to execute a scheduling iteration.

        :param reply: the reply set by Batsim (most of the time there is no reply object)
        """
        self._reply = reply
        self._pre_schedule()
        self.schedule()
        self._post_schedule()

        # Fast forward the time after the iteration. The time can be set through
        # a scheduler starting option.
        self._batsim.consume_time(self._sched_delay)

    def _on_pre_end(self):
        """The _on_pre_end method called during the shut-down phase of the scheduler.
        If the _on_pre_end method is overridden the super method should be called with:
        `super()._on_pre_end()`
        """
        if self.jobs.open:
            self.warn(
                "{num_jobs} jobs still in state open at end of simulation",
                num_jobs=len(
                    self.jobs.open),
                type="open_jobs_warning")

    def on_end(self):
        """The end method called during the shut-down phase of the scheduler."""
        pass

    def _on_post_end(self):
        """The _on_post_end method called during the shut-down phase of the scheduler.
        If the _on_post_end method is overridden the super method should be called with:
        `super()._on_post_end()`
        """
        pass

    def on_nop(self):
        """Hook similar to the low-level API."""
        pass

    def on_jobs_killed(self, jobs):
        """Hook similar to the low-level API.

        :param jobs: the killed jobs (higher-level job objects)
        """
        pass

    def on_job_submission(self, job):
        """Hook similar to the low-level API.

        :param job: the submitted job (higher-level job object)
        """
        pass

    def on_job_completion(self, job):
        """Hook similar to the low-level API.

        :param job: the completed job (higher-level job object)
        """
        pass

    def on_machine_pstate_changed(self, resource, pstate):
        """Hook similar to the low-level API.

        :param resource: the changed resource (higher-level resource object)

        :param pstate: the new pstate
        """
        pass

    def on_report_energy_consumed(self, consumed_energy):
        """Hook similar to the low-level API.

        :param consumed_energy: the consumed energy (higher-level reply object)
        """
        pass

    def on_event(self, event):
        """Hook called on each event triggered by the scheduler.

        :param event: the triggered event (class: `Scheduler.Event`)
        """
        pass

537

538
def as_scheduler(*args, on_init=None, on_end=None, base_classes=None, **kwargs):
    """Decorator to convert a function to a scheduler class.

    The function should accept the scheduler as first argument and optionally
    *args and **kwargs arguments which will be given from additional arguments
    to the call of the decorator.

    :param args: additional arguments passed to the scheduler function (in each iteration)

    :param on_init: callables invoked with the scheduler as only argument at the
                    end of `_on_pre_init` (default: none)

    :param on_end: callables invoked with the scheduler as only argument at the
                   end of `_on_pre_end` (default: none)

    :param base_classes: additional classes to use as base classes for the scheduler;
                         `Scheduler` is always added as the last base class
                         unless it is already listed (default: none)

    :param kwargs: additional arguments passed to the scheduler function (in each iteration)

    :returns: a decorator which turns the decorated function into a
              `Scheduler` subclass named after the function
    """
    # `None` defaults avoid sharing mutable default lists between calls.
    if on_init is None:
        on_init = []
    if on_end is None:
        on_end = []

    # Work on a copy so that the caller's collection is never modified; any
    # iterable (not only lists) is accepted.
    bases = [] if base_classes is None else list(base_classes)
    if Scheduler not in bases:
        # Listing the same base class twice would raise a TypeError.
        bases.append(Scheduler)

    def convert_to_scheduler(schedule_function):
        class InheritedScheduler(*bases):
            def __init__(self, *init_args, **init_kwargs):
                super().__init__(*init_args, **init_kwargs)

            def _on_pre_init(self):
                super()._on_pre_init()
                for callback in on_init:
                    callback(self)

            def schedule(self):
                schedule_function(self, *args, **kwargs)

            def _on_pre_end(self):
                super()._on_pre_end()
                for callback in on_end:
                    callback(self)

        # Let the generated class appear under the name of the function.
        InheritedScheduler.__name__ = schedule_function.__name__
        InheritedScheduler.__qualname__ = schedule_function.__qualname__
        InheritedScheduler.__doc__ = schedule_function.__doc__

        return InheritedScheduler
    return convert_to_scheduler