batsim.py 26.7 KB
Newer Older
1
# from __future__ import print_function
2

3
from enum import Enum
MERCIER Michael's avatar
MERCIER Michael committed
4
from copy import deepcopy
5

6
import json
7
import sys
Millian Poquet's avatar
Millian Poquet committed
8

9
from .network import NetworkHandler
Millian Poquet's avatar
Millian Poquet committed
10

11
from procset import ProcSet
12
import redis
13
import zmq
14
import logging
15 16


17

18 19
class Batsim(object):

20
    WORKLOAD_JOB_SEPARATOR = "!"
21
    ATTEMPT_JOB_SEPARATOR = "#"
22
    WORKLOAD_JOB_SEPARATOR_REPLACEMENT = "%"
S. Lackner's avatar
S. Lackner committed
23

Millian Poquet's avatar
Millian Poquet committed
24
    def __init__(self, scheduler,
                 network_handler=None,
                 event_handler=None,
                 validatingmachine=None):
        """Set up the Batsim proxy and process the first Batsim message.

        scheduler: object receiving the ``on*`` callbacks; it gets a
            back-reference to this instance through its ``bs`` attribute.
        network_handler: handler used to exchange messages with Batsim.
            When None, a ``NetworkHandler('tcp://*:28000')`` is created.
        event_handler: handler used by ``publish_event``. When None, a
            ``NetworkHandler('tcp://127.0.0.1:28001', type=zmq.PUB)`` is
            created.
        validatingmachine: optional callable; when given, the scheduler
            actually used is ``validatingmachine(scheduler)``.
        """
        log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        logging.basicConfig(format=log_format)
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.DEBUG)

        self.running_simulation = False

        # Fall back to the default endpoints when no handler is supplied.
        if network_handler is None:
            network_handler = NetworkHandler('tcp://*:28000')
        if event_handler is None:
            event_handler = NetworkHandler(
                'tcp://127.0.0.1:28001', type=zmq.PUB)
        self.network = network_handler
        self.event_publisher = event_handler

        # Jobs known to this proxy, indexed by job id.
        self.jobs = {}

        sys.setrecursionlimit(10000)

        self.scheduler = (
            scheduler if validatingmachine is None
            else validatingmachine(scheduler))

        # Public counters: submissions...
        self.nb_jobs_submitted = 0
        self.nb_jobs_submitted_from_batsim = 0
        self.nb_jobs_submitted_from_scheduler = 0
        self.nb_jobs_in_submission = 0
        # ...decisions taken by the scheduler...
        self.nb_jobs_scheduled = 0
        self.nb_jobs_rejected = 0
        self.nb_jobs_killed = 0
        # ...and completion outcomes.
        self.nb_jobs_completed = 0
        self.nb_jobs_successful = 0
        self.nb_jobs_failed = 0
        self.nb_jobs_timeout = 0

        self.jobs_manually_changed = set()
        self.no_more_static_jobs = False

        self.network.bind()
        self.event_publisher.bind()

        self.scheduler.bs = self

        # Handle the first message from Batsim before handing control to
        # the scheduler's post-initialisation hook.
        self._read_bat_msg()
        self.scheduler.onAfterBatsimInit()
79

80 81 82 83 84 85
    def publish_event(self, event):
        """Forward ``event`` to the event publisher.

        The string is handed to ``self.event_publisher.send_string`` so that
        subscribed listeners (e.g. external processes observing the
        simulation) can receive it.
        """
        publisher = self.event_publisher
        publisher.send_string(event)

86 87 88 89
    def time(self):
        return self._current_time

    def consume_time(self, t):
90
        self._current_time += float(t)
91 92
        return self._current_time

93
    def wake_me_up_at(self, time):
Millian Poquet's avatar
Millian Poquet committed
94 95 96 97
        self._events_to_send.append(
            {"timestamp": self.time(),
             "type": "CALL_ME_LATER",
             "data": {"timestamp": time}})
98

99
    def notify_registration_finished(self):
100 101 102 103
        self._events_to_send.append({
            "timestamp": self.time(),
            "type": "NOTIFY",
            "data": {
104
                    "type": "registration_finished",
105 106 107
            }
        })

108
    def notify_registration_continue(self):
109 110 111 112
        self._events_to_send.append({
            "timestamp": self.time(),
            "type": "NOTIFY",
            "data": {
113
                    "type": "continue_registration",
114 115 116
            }
        })

117 118 119 120 121 122 123 124 125 126
    def send_message_to_job(self, job, message):
        self._events_to_send.append({
            "timestamp": self.time(),
            "type": "TO_JOB_MSG",
            "data": {
                    "job_id": job.id,
                    "msg": message,
            }
        })

127
    def start_jobs(self, jobs, res):
128
        """ args:res: is list of int (resources ids) """
129 130
        for job in jobs:
            self._events_to_send.append({
Millian Poquet's avatar
Millian Poquet committed
131 132 133
                "timestamp": self.time(),
                "type": "EXECUTE_JOB",
                "data": {
134
                        "job_id": job.id,
135
                        "alloc": str(ProcSet(*res[job.id]))
136
                }
Millian Poquet's avatar
Millian Poquet committed
137
            }
138 139
            )
            self.nb_jobs_scheduled += 1
140

141
    def execute_jobs(self, jobs, io_jobs=None):
142
        """ args:jobs: list of jobs to execute
MOMMESSIN Clement's avatar
MOMMESSIN Clement committed
143
            job.allocation MUST be not None and should be a non-empty ProcSet"""
MERCIER Michael's avatar
MERCIER Michael committed
144 145 146

        for job in jobs:
            assert job.allocation is not None
147
            message = {
MERCIER Michael's avatar
MERCIER Michael committed
148 149 150 151 152
                "timestamp": self.time(),
                "type": "EXECUTE_JOB",
                "data": {
                        "job_id": job.id,
                        "alloc": str(job.allocation)
153
                }
Millian Poquet's avatar
Millian Poquet committed
154
            }
155 156 157 158
            if io_jobs is not None and job.id in io_jobs:
                message["data"]["additional_io_job"] = io_jobs[job.id]

            self._events_to_send.append(message)
159
            self.nb_jobs_scheduled += 1
160

MERCIER Michael's avatar
MERCIER Michael committed
161

S. Lackner's avatar
S. Lackner committed
162 163
    def reject_jobs(self, jobs):
        """Reject the given jobs."""
MERCIER Michael's avatar
MERCIER Michael committed
164
        assert len(jobs) > 0, "The list of jobs to reject is empty"
S. Lackner's avatar
S. Lackner committed
165 166 167 168 169 170 171 172 173 174
        for job in jobs:
            self._events_to_send.append({
                "timestamp": self.time(),
                "type": "REJECT_JOB",
                "data": {
                        "job_id": job.id,
                }
            })
            self.nb_jobs_rejected += 1

175
    def change_job_state(self, job, state):
176 177 178 179 180 181 182 183 184
        """Change the state of a job."""
        self._events_to_send.append({
            "timestamp": self.time(),
            "type": "CHANGE_JOB_STATE",
            "data": {
                    "job_id": job.id,
                    "job_state": state.name,
            }
        })
185
        self.jobs_manually_changed.add(job)
186

S. Lackner's avatar
S. Lackner committed
187 188
    def kill_jobs(self, jobs):
        """Kill the given jobs."""
MERCIER Michael's avatar
MERCIER Michael committed
189
        assert len(jobs) > 0, "The list of jobs to kill is empty"
MERCIER Michael's avatar
MERCIER Michael committed
190 191
        for job in jobs:
            job.job_state = Job.State.IN_KILLING
S. Lackner's avatar
S. Lackner committed
192 193 194 195
        self._events_to_send.append({
            "timestamp": self.time(),
            "type": "KILL_JOB",
            "data": {
S. Lackner's avatar
S. Lackner committed
196
                    "job_ids": [job.id for job in jobs],
S. Lackner's avatar
S. Lackner committed
197 198 199
            }
        })

200
    def register_profiles(self, workload_name, profiles):
201 202 203
        for profile_name, profile in profiles.items():
            msg = {
                "timestamp": self.time(),
204
                "type": "REGISTER_PROFILE",
205 206 207 208 209 210 211
                "data": {
                    "workload_name": workload_name,
                    "profile_name": profile_name,
                    "profile": profile,
                }
            }
            self._events_to_send.append(msg)
212 213
            if not workload_name in self.profiles:
                self.profiles[workload_name] = {}
214
                self.logger.debug("A new dynamic workload of name '{}' has been created".format(workload_name))
215
            self.logger.debug("Registering profile: {}".format(msg["data"]))
216
            self.profiles[workload_name][profile_name] = profile
S. Lackner's avatar
S. Lackner committed
217

218
    def register_job(
            self,
            id,
            res,
            walltime,
            profile_name,
            subtime=None):
        """Queue a REGISTER_JOB event and track the new job locally.

        id: id of the job; used as key in ``self.jobs``.
        res, walltime, profile_name: copied into the job description under
            the ``res``, ``walltime`` and ``profile`` keys.
        subtime: submission time; defaults to the current time when None.

        The job is built with ``Job.from_json_dict``, put in the
        ``IN_SUBMISSON`` state and ``nb_jobs_in_submission`` is incremented.
        """
        submission_time = self.time() if subtime is None else subtime

        job_dict = {
            "profile": profile_name,
            "id": id,
            "res": res,
            "walltime": walltime,
            "subtime": submission_time,
        }
        self._events_to_send.append({
            "timestamp": self.time(),
            "type": "REGISTER_JOB",
            "data": {
                "job_id": id,
                "job": job_dict,
            },
        })

        registered = Job.from_json_dict(job_dict)
        registered.job_state = Job.State.IN_SUBMISSON
        self.jobs[id] = registered
        self.nb_jobs_in_submission += 1
MERCIER Michael's avatar
MERCIER Michael committed
247

248
    def set_resource_state(self, resources, state):
249
        """ args:resources: is a list of resource numbers or intervals as strings (e.g., "1-5").
250 251 252 253 254 255 256
            args:state: is a state identifier configured in the platform specification.
        """

        self._events_to_send.append({
            "timestamp": self.time(),
            "type": "SET_RESOURCE_STATE",
            "data": {
S. Lackner's avatar
S. Lackner committed
257 258
                    "resources": " ".join([str(r) for r in resources]),
                    "state": str(state)
259 260 261
            }
        })

262
    def get_job_and_profile(self, event):
        """Return the ``(job, profile)`` pair described by ``event``.

        When Redis is enabled the pair is fetched through ``self.redis``
        using the job id found in the event. Otherwise the job is built from
        ``event["data"]["job"]`` and the profile is
        ``event["data"]["profile"]``, or an empty dict when that key is
        absent.
        """
        if self.redis_enabled:
            return self.redis.get_job_and_profile(event["data"]["job_id"])

        payload = event["data"]
        job = Job.from_json_dict(payload["job"])
        profile = payload["profile"] if "profile" in payload else {}
        return job, profile
MERCIER Michael's avatar
MERCIER Michael committed
276

277

278
    def request_consumed_energy(self): #TODO CHANGE NAME 
279 280
        self._events_to_send.append(
            {
281
                "timestamp": self.time(),
MERCIER Michael's avatar
MERCIER Michael committed
282
                "type": "QUERY",
283 284 285 286 287
                "data": {
                    "requests": {"consumed_energy": {}}
                }
            }
        )
288

289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
    def request_air_temperature_all(self):
        self._events_to_send.append(
            {
                "timestamp": self.time(),
                "type": "QUERY",
                "data": {
                    "requests": {"air_temperature_all": {}}
                }
            }
        )

    def request_processor_temperature_all(self):
        self._events_to_send.append(
            {
                "timestamp": self.time(),
                "type": "QUERY",
                "data": {
                    "requests": {"processor_temperature_all": {}}
                }
            }
        )

311

MERCIER Michael's avatar
MERCIER Michael committed
312 313 314 315 316 317 318 319 320 321 322
    def notify_resources_added(self, resources):
        self._events_to_send.append(
            {
                "timestamp": self.time(),
                "type": "RESOURCES_ADDED",
                "data": {
                    "resources": resources
                }
            }
        )

MERCIER Michael's avatar
MERCIER Michael committed
323 324 325 326 327 328 329 330 331 332 333
    def notify_resources_removed(self, resources):
        self._events_to_send.append(
            {
                "timestamp": self.time(),
                "type": "RESOURCES_REMOVED",
                "data": {
                    "resources": resources
                }
            }
        )

334
    def set_job_metadata(self, job_id, metadata):
335
        # Consume some time to be sure that the job was created before the
336
        # metadata is set
MERCIER Michael's avatar
MERCIER Michael committed
337

338 339 340 341 342 343 344 345 346 347
        self._events_to_send.append(
            {
                "timestamp": self.time(),
                "type": "SET_JOB_METADATA",
                "data": {
                    "job_id": str(job_id),
                    "metadata": str(metadata)
                }
            }
        )
348 349
        self.jobs[job_id].metadata = metadata

350

351
    def resubmit_job(self, job):
MERCIER Michael's avatar
MERCIER Michael committed
352 353 354
        """
        The given job is resubmited but in a dynamic workload. The name of this
        workload is "resubmit=N" where N is the number of resubmission.
355 356
        The job metadata is filled with a dict that contains the original job
        full id in "parent_job" and the number of resubmissions in "nb_resubmit".
MERCIER Michael's avatar
MERCIER Michael committed
357 358 359 360 361 362 363 364 365 366
        """

        if job.metadata is None:
            metadata = {"parent_job" : job.id, "nb_resubmit": 1}
        else:
            metadata = deepcopy(job.metadata)
            if "parent_job" not in metadata:
                metadata["parent_job"] = job.id
            metadata["nb_resubmit"] = metadata["nb_resubmit"] + 1

367
        # Keep the current workload and add a resubmit number
368
        splitted_id = job.id.split(Batsim.ATTEMPT_JOB_SEPARATOR)
369 370
        if len(splitted_id) == 1:
            new_job_name = deepcopy(job.id)
371 372 373
        else:
            # This job as already an attempt number
            new_job_name = splitted_id[0]
374
            assert splitted_id[1] == str(metadata["nb_resubmit"] - 1)
375
        new_job_name =  new_job_name + Batsim.ATTEMPT_JOB_SEPARATOR + str(metadata["nb_resubmit"])
376
        # log in job metadata parent job and nb resubmit
377

378
        self.register_job(
379
                new_job_name,
380 381
                job.requested_resources,
                job.requested_time,
382
                job.profile)
383

384
        self.set_job_metadata(new_job_name, metadata)
385

386
    def do_next_event(self):
387
        return self._read_bat_msg()
388 389

    def start(self):
390 391 392
        cont = True
        while cont:
            cont = self.do_next_event()
393 394

    def _read_bat_msg(self):
        """Receive one message from Batsim, dispatch its events and reply.

        The method:

        1. waits for a message on ``self.network`` (blocking only while no
           simulation is running); ``scheduler.onDeadlock()`` is called
           each time nothing is received;
        2. updates the current time from the message's "now" field and
           resets the list of events to send;
        3. calls ``scheduler.onBeforeEvents()``, then the scheduler callback
           matching each event, then ``scheduler.onNoMoreEvents()``;
        4. sends back the events queued meanwhile, sorted by timestamp;
        5. closes the network and the event publisher once a
           SIMULATION_ENDS event has been received.

        Returns False when SIMULATION_ENDS was part of the message, True
        otherwise.

        Raises Exception on an unknown event type or on a malformed resource
        interval in a RESOURCE_STATE_CHANGED event.
        """
        msg = None
        while msg is None:
            msg = self.network.recv(blocking=not self.running_simulation)
            if msg is None:
                self.scheduler.onDeadlock()
        self.logger.info("Message Received from Batsim: {}".format(msg))

        self._current_time = msg["now"]

        if "air_temperatures" in msg:
            self.air_temperatures = msg["air_temperatures"]

        self._events_to_send = []

        finished_received = False

        self.scheduler.onBeforeEvents()

        for event in msg["events"]:
            event_type = event["type"]
            event_data = event.get("data", {})
            if event_type == "SIMULATION_BEGINS":
                assert not self.running_simulation, "A simulation is already running (is more than one instance of Batsim active?!)"
                self.running_simulation = True
                self.nb_resources = event_data["nb_resources"]
                self.nb_compute_resources = event_data["nb_compute_resources"]
                self.nb_storage_resources = event_data["nb_storage_resources"]
                compute_resources = event_data["compute_resources"]
                storage_resources = event_data["storage_resources"]
                self.machines = {"compute": compute_resources, "storage": storage_resources}
                self.batconf = event_data["config"]
                self.time_sharing_on_compute = event_data["allow_time_sharing_on_compute"]
                self.time_sharing_on_storage = event_data["allow_time_sharing_on_storage"]
                self.profiles_forwarded_on_submission = self.batconf["profiles-forwarded-on-submission"]
                self.dynamic_job_registration_enabled = self.batconf["dynamic-jobs-enabled"]
                self.ack_of_dynamic_jobs = self.batconf["dynamic-jobs-acknowledged"]

                if self.dynamic_job_registration_enabled:
                    self.logger.warning("Dynamic registration of jobs is ENABLED. The scheduler must send a NOTIFY event of type 'registration_finished' to let Batsim end the simulation.")

                self.redis_enabled = self.batconf["redis-enabled"]
                redis_hostname = self.batconf["redis-hostname"]
                redis_port = self.batconf["redis-port"]
                redis_prefix = self.batconf["redis-prefix"]

                if self.redis_enabled:
                    self.redis = DataStorage(redis_prefix, redis_hostname,
                                             redis_port)

                # Retro compatibility for old Batsim API > 1.0 < 3.0
                if "resources_data" in event_data:
                    res_key = "resources_data"
                else:
                    res_key = "compute_resources"
                self.resources = {
                    res["id"]: res for res in event_data[res_key]}
                self.storage_resources = {
                    res["id"]: res for res in event_data["storage_resources"]}

                self.profiles = event_data["profiles"]

                self.workloads = event_data["workloads"]

                self.scheduler.onSimulationBegins()

            elif event_type == "SIMULATION_ENDS":
                assert self.running_simulation, "No simulation is currently running"
                self.running_simulation = False
                self.logger.info("All jobs have been submitted and completed!")
                finished_received = True
                self.scheduler.onSimulationEnds()
            elif event_type == "JOB_SUBMITTED":
                # Received WORKLOAD_NAME!JOB_ID
                job_id = event_data["job_id"]
                job, profile = self.get_job_and_profile(event)
                job.job_state = Job.State.SUBMITTED
                self.nb_jobs_submitted += 1

                # Store profile if not already present
                if profile is not None:
                    if job.workload not in self.profiles:
                        self.profiles[job.workload] = {}
                    if job.profile not in self.profiles[job.workload]:
                        self.profiles[job.workload][job.profile] = profile

                # Keep a pointer in the job structure
                assert job.profile in self.profiles[job.workload]
                job.profile_dict = self.profiles[job.workload][job.profile]

                # Warning: override dynamic job but keep metadata
                if job_id in self.jobs:
                    # Logger.warn is a deprecated alias; use warning() as
                    # the rest of this method does.
                    self.logger.warning(
                        "The job '{}' was alredy in the job list. "
                        "Probaly a dynamic job that was submitted "
                        "before: \nOld job: {}\nNew job: {}".format(
                            job_id,
                            self.jobs[job_id],
                            job))
                    if self.jobs[job_id].job_state == Job.State.IN_SUBMISSON:
                        self.nb_jobs_in_submission = self.nb_jobs_in_submission - 1
                    # Keeping metadata and profile
                    job.metadata = self.jobs[job_id].metadata
                    self.nb_jobs_submitted_from_scheduler += 1
                else:
                    # This was submitted from batsim
                    self.nb_jobs_submitted_from_batsim += 1
                self.jobs[job_id] = job

                self.scheduler.onJobSubmission(job)
            elif event_type == "JOB_KILLED":
                # get progress
                killed_jobs = []
                for jid in event_data["job_ids"]:
                    j = self.jobs[jid]
                    # The job_progress can only be empty if the job has
                    # completed between the kill order and the kill itself.
                    # In that case do not put it in the killed jobs because
                    # it was already marked as complete.
                    if len(event_data["job_progress"]) != 0:
                        j.progress = event_data["job_progress"][jid]
                        killed_jobs.append(j)
                if len(killed_jobs) != 0:
                    self.scheduler.onJobsKilled(killed_jobs)
            elif event_type == "JOB_COMPLETED":
                job_id = event_data["job_id"]
                j = self.jobs[job_id]
                j.finish_time = event["timestamp"]

                # Unknown state names are mapped to Job.State.UNKNOWN
                try:
                    j.job_state = Job.State[event["data"]["job_state"]]
                except KeyError:
                    j.job_state = Job.State.UNKNOWN
                j.return_code = event["data"]["return_code"]

                # The scheduler is notified before the counters are updated
                self.scheduler.onJobCompletion(j)
                if j.job_state == Job.State.COMPLETED_WALLTIME_REACHED:
                    self.nb_jobs_timeout += 1
                elif j.job_state == Job.State.COMPLETED_FAILED:
                    self.nb_jobs_failed += 1
                elif j.job_state == Job.State.COMPLETED_SUCCESSFULLY:
                    self.nb_jobs_successful += 1
                elif j.job_state == Job.State.COMPLETED_KILLED:
                    self.nb_jobs_killed += 1
                self.nb_jobs_completed += 1
            elif event_type == "FROM_JOB_MSG":
                job_id = event_data["job_id"]
                j = self.jobs[job_id]
                timestamp = event["timestamp"]
                # Named job_msg so the received Batsim message is not shadowed
                job_msg = event_data["msg"]
                self.scheduler.onJobMessage(timestamp, j, job_msg)
            elif event_type == "RESOURCE_STATE_CHANGED":
                # "resources" is a space-separated list of "N" or "N-M" items
                intervals = event_data["resources"].split(" ")
                for interval in intervals:
                    nodes = interval.split("-")
                    if len(nodes) == 1:
                        nodeInterval = (int(nodes[0]), int(nodes[0]))
                    elif len(nodes) == 2:
                        nodeInterval = (int(nodes[0]), int(nodes[1]))
                    else:
                        raise Exception("Multiple intervals are not supported")
                    self.scheduler.onMachinePStateChanged(
                        nodeInterval, event_data["state"])
            elif event_type == "ANSWER":
                # Only the first matching answer key is reported
                if "consumed_energy" in event_data:
                    consumed_energy = event_data["consumed_energy"]
                    self.scheduler.onReportEnergyConsumed(consumed_energy)
                elif "processor_temperature_all" in event_data:
                    proc_temperature_all = event_data["processor_temperature_all"]
                    self.scheduler.onAnswerProcessorTemperatureAll(proc_temperature_all)
                elif "air_temperature_all" in event_data:
                    air_temperature_all = event_data["air_temperature_all"]
                    self.scheduler.onAnswerAirTemperatureAll(air_temperature_all)
            elif event_type == 'REQUESTED_CALL':
                self.scheduler.onRequestedCall()
            elif event_type == 'ADD_RESOURCES':
                self.scheduler.onAddResources(event_data["resources"])
            elif event_type == 'REMOVE_RESOURCES':
                self.scheduler.onRemoveResources(event_data["resources"])
            elif event_type == "NOTIFY":
                # Other notification types are ignored
                notify_type = event_data["type"]
                if notify_type == "no_more_static_job_to_submit":
                    self.scheduler.onNoMoreJobsInWorkloads()
            else:
                raise Exception("Unknown event type {}".format(event_type))

        self.scheduler.onNoMoreEvents()

        if len(self._events_to_send) > 0:
            # sort msgs by timestamp
            self._events_to_send = sorted(
                self._events_to_send, key=lambda queued: queued['timestamp'])

        new_msg = {
            "now": self._current_time,
            "events": self._events_to_send
        }
        self.network.send(new_msg)
        self.logger.info("Message Sent to Batsim: {}".format(new_msg))

        if finished_received:
            self.network.close()
            self.event_publisher.close()

        return not finished_received
601

602

603
class DataStorage(object):
    ''' High-level access to the Redis data storage system '''

    def __init__(self, prefix, hostname='localhost', port=6379):
        """Connect to Redis at ``hostname:port``; keys are built from ``prefix``."""
        self.redis = redis.StrictRedis(host=hostname, port=port)
        self.prefix = prefix

    def _prefixed(self, key):
        """Return ``key`` preceded by the instance prefix and a colon."""
        return '{iprefix}:{ukey}'.format(iprefix=self.prefix,
                                         ukey=key)

    def get(self, key):
        """Return the raw value stored under the prefixed ``key``.

        Fails an assertion when Redis holds no value for the key.
        """
        real_key = self._prefixed(key)
        stored = self.redis.get(real_key)
        assert(stored is not None), "Redis: No such key '{k}'".format(
            k=real_key)
        return stored

    def get_job_and_profile(self, job_id):
        """Return the ``(job, profile)`` pair stored for ``job_id``.

        The job is read from the ``job_<job_id>`` key and parsed with
        ``Job.from_json_string``. The profile is read from
        ``profile_<workload>!<profile>``, where the workload is the part of
        ``job_id`` before ``Batsim.WORKLOAD_JOB_SEPARATOR``, and decoded
        from JSON.
        """
        raw_job = self.get('job_{job_id}'.format(job_id=job_id))
        job = Job.from_json_string(raw_job.decode('utf-8'))

        workload_id = job_id.split(Batsim.WORKLOAD_JOB_SEPARATOR)[0]
        profile_key = 'profile_{workload_id}!{profile_id}'.format(
            workload_id=workload_id,
            profile_id=job.profile)
        raw_profile = self.get(profile_key)
        profile = json.loads(raw_profile.decode('utf-8'))

        return job, profile

    def set_job(self, job_id, subtime, walltime, res):
        """Store a JSON description of a job under the prefixed ``job_id``.

        NOTE(review): the key written here is ``<prefix>:<job_id>`` whereas
        get_job_and_profile reads ``<prefix>:job_<job_id>`` - confirm
        whether callers are expected to pass an id already starting with
        ``job_``.
        """
        description = {"id": job_id, "subtime": subtime,
                       "walltime": walltime, "res": res}
        self.redis.set(self._prefixed(job_id), json.dumps(description))
637 638 639


class Job(object):
    """A job as tracked on the scheduler side."""

    class State(Enum):
        """States a job can be in."""
        UNKNOWN = -1
        IN_SUBMISSON = 0
        SUBMITTED = 1
        RUNNING = 2
        COMPLETED_SUCCESSFULLY = 3
        COMPLETED_FAILED = 4
        COMPLETED_WALLTIME_REACHED = 5
        COMPLETED_KILLED = 6
        REJECTED = 7
        IN_KILLING = 8

    def __init__(
            self,
            id,
            subtime,
            walltime,
            res,
            profile,
            json_dict):
        """Store the job description; every runtime field starts unset.

        id: id of the job; the ``workload`` property is derived from it.
        subtime: stored as ``submit_time``.
        walltime: stored as ``requested_time``.
        res: stored as ``requested_resources``.
        profile: stored as ``profile``.
        json_dict: the dict the job was built from, kept as is.
        """
        # Description of the job
        self.id = id
        self.profile = profile
        self.submit_time = subtime
        self.requested_time = walltime
        self.requested_resources = res
        self.json_dict = json_dict

        # Runtime information, filled in later
        self.job_state = Job.State.UNKNOWN
        self.finish_time = None  # will be set on completion by batsim
        self.return_code = None
        self.progress = None
        self.profile_dict = None
        self.allocation = None
        self.metadata = None

    def __repr__(self):
        template = (
            "{{Job {0}; sub:{1} res:{2} reqtime:{3} prof:{4} "
            "state:{5} ret:{6} alloc:{7}, meta:{8}}}\n")
        fields = (
            self.id,
            self.submit_time,
            self.requested_resources,
            self.requested_time,
            self.profile,
            self.job_state,
            self.return_code,
            self.allocation,
            self.metadata,
        )
        return template.format(*fields)

    @property
    def workload(self):
        """Part of the job id located before the workload/job separator."""
        parts = self.id.split(Batsim.WORKLOAD_JOB_SEPARATOR)
        return parts[0]

    @staticmethod
    def from_json_string(json_str):
        """Build a Job from the JSON text of its description."""
        return Job.from_json_dict(json.loads(json_str))

    @staticmethod
    def from_json_dict(json_dict):
        """Build a Job from its description dict.

        The "id", "subtime", "res" and "profile" keys are required;
        "walltime" defaults to -1 when absent.
        """
        walltime = json_dict.get("walltime", -1)
        return Job(
            id=json_dict["id"],
            subtime=json_dict["subtime"],
            walltime=walltime,
            res=json_dict["res"],
            profile=json_dict["profile"],
            json_dict=json_dict)
    # def __eq__(self, other):
        # return self.id == other.id
    # def __ne__(self, other):
        # return not self.__eq__(other)
705 706 707


class BatsimScheduler(object):
    """Base class for schedulers: one ``on*`` callback per kind of event.

    Callbacks raising NotImplementedError must be overridden by a scheduler
    that can receive the corresponding event; the others have a default
    behaviour.
    """

    def __init__(self, options=None):
        """Store the scheduler options and set up ``self.logger``.

        options: dict of options kept in ``self.options``. When omitted,
            each instance gets its own empty dict (a shared ``{}`` default
            would leak options from one scheduler to the next).
        """
        self.options = {} if options is None else options

        FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        logging.basicConfig(format=FORMAT)
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.DEBUG)

    def onAfterBatsimInit(self):
        # You now have access to self.bs and all other functions
        pass

    def onSimulationBegins(self):
        pass

    def onSimulationEnds(self):
        pass

    def onDeadlock(self):
        raise ValueError(
            "[PYBATSIM]: Batsim is not responding (maybe deadlocked)")

    def onJobSubmission(self, job):
        raise NotImplementedError()

    def onJobCompletion(self, job):
        raise NotImplementedError()

    def onJobMessage(self, timestamp, job, message):
        raise NotImplementedError()

    def onJobsKilled(self, jobs):
        raise NotImplementedError()

    def onMachinePStateChanged(self, nodeid, pstate):
        raise NotImplementedError()

    def onReportEnergyConsumed(self, consumed_energy):
        raise NotImplementedError()

    def onAnswerProcessorTemperatureAll(self, proc_temperature_all):
        raise NotImplementedError()

    def onAnswerAirTemperatureAll(self, air_temperature_all):
        raise NotImplementedError()

    def onAddResources(self, to_add):
        raise NotImplementedError()

    def onRemoveResources(self, to_remove):
        raise NotImplementedError()

    def onRequestedCall(self):
        raise NotImplementedError()

    def onNoMoreJobsInWorkloads(self):
        # Record on the Batsim proxy that no more static jobs will arrive
        self.bs.no_more_static_jobs = True
        self.logger.info("There is no more static jobs in the workoad")

    def onBeforeEvents(self):
        pass

    def onNoMoreEvents(self):
        pass