/**
 * @file jobs_execution.cpp
 * @brief Contains functions related to the execution of the jobs
 */

#include <cstring>
#include <regex>
#include <string>
#include <vector>

#include "jobs_execution.hpp"
#include "jobs.hpp"

#include <simgrid/plugins/energy.h>

#include <simgrid/msg.h>
#include <smpi/smpi.h>

XBT_LOG_NEW_DEFAULT_CATEGORY(jobs_execution, "jobs_execution"); //!< Logging

// Unqualified standard names (string, to_string, regex...) are used in this file.
using namespace std;

int smpi_replay_process(int argc, char *argv[])
{
21 22 23
    SMPIReplayProcessArguments * args = (SMPIReplayProcessArguments *) MSG_process_get_data(MSG_process_self());

    if (args->semaphore != NULL)
24
    {
25
        MSG_sem_acquire(args->semaphore);
26
    }
27

28
    XBT_INFO("Launching smpi_replay_run");
29
    smpi_replay_run(&argc, &argv);
30
    XBT_INFO("smpi_replay_run finished");
31 32

    if (args->semaphore != NULL)
33
    {
34
        MSG_sem_release(args->semaphore);
35
    }
36

37
    args->job->execution_processes.erase(MSG_process_self());
38
    delete args;
39 40 41 42
    return 0;
}

int execute_profile(BatsimContext *context,
Millian Poquet's avatar
Millian Poquet committed
43
                    const std::string & profile_name,
Millian Poquet's avatar
Millian Poquet committed
44
                    const SchedulingAllocation * allocation,
45
                    CleanExecuteProfileData * cleanup_data,
46
                    double * remaining_time)
47
{
48 49 50
    Workload * workload = context->workloads.at(allocation->job_id.workload_name);
    Job * job = workload->jobs->at(allocation->job_id.job_number);
    Profile * profile = workload->profiles->at(profile_name);
51 52
    int nb_res = job->required_nb_res;

53 54
    if (profile->type == ProfileType::MSG_PARALLEL ||
        profile->type == ProfileType::MSG_PARALLEL_HOMOGENEOUS ||
55 56
        profile->type == ProfileType::MSG_PARALLEL_HOMOGENEOUS_PFS_MULTIPLE_TIERS ||
        profile->type == ProfileType::MSG_DATA_STAGING)
57
    {
58 59 60 61
        double * computation_amount = nullptr;
        double * communication_amount = nullptr;
        string task_name_prefix;
        std::vector<msg_host_t> hosts_to_use = allocation->hosts;
62

63 64 65 66
        if (profile->type == ProfileType::MSG_PARALLEL)
        {
            task_name_prefix = "p ";
            MsgParallelProfileData * data = (MsgParallelProfileData *)profile->data;
67

68 69 70
            // These amounts are deallocated by SG
            computation_amount = xbt_new(double, nb_res);
            communication_amount = xbt_new(double, nb_res*nb_res);
71

72 73 74 75 76
            // Let us retrieve the matrices from the profile
            memcpy(computation_amount, data->cpu, sizeof(double) * nb_res);
            memcpy(communication_amount, data->com, sizeof(double) * nb_res * nb_res);
        }
        else if (profile->type == ProfileType::MSG_PARALLEL_HOMOGENEOUS)
77
        {
78 79 80 81 82 83 84 85 86 87
            task_name_prefix = "phg ";
            MsgParallelHomogeneousProfileData * data = (MsgParallelHomogeneousProfileData *)profile->data;

            double cpu = data->cpu;
            double com = data->com;

            // These amounts are deallocated by SG
            computation_amount = xbt_new(double, nb_res);
            communication_amount = nullptr;
            if(com > 0)
88
            {
89
                communication_amount = xbt_new(double, nb_res * nb_res);
90
            }
91 92 93 94

            // Let us fill the local computation and communication matrices
            int k = 0;
            for (int y = 0; y < nb_res; ++y)
Millian Poquet's avatar
Millian Poquet committed
95
            {
96 97
                computation_amount[y] = cpu;
                if(communication_amount != nullptr)
98
                {
99 100 101
                    for (int x = 0; x < nb_res; ++x)
                    {
                        if (x == y)
102
                        {
103
                            communication_amount[k++] = 0;
104
                        }
105
                        else
106
                        {
107
                            communication_amount[k++] = com;
108
                        }
109
                    }
110
                }
111 112
            }
        }
113
        else if (profile->type == ProfileType::MSG_PARALLEL_HOMOGENEOUS_PFS_MULTIPLE_TIERS)
114
        {
115 116
            task_name_prefix = "pfs_tiers ";
            MsgParallelHomogeneousPFSMultipleTiersProfileData * data = (MsgParallelHomogeneousPFSMultipleTiersProfileData *)profile->data;
117

118 119
            double cpu = 0;
            double size = data->size;
120

121 122 123
            // The PFS machine will also be used
            nb_res = nb_res + 1;
            int pfs_id = nb_res - 1;
124

125
            // Add the pfs_machine
126 127 128 129 130 131 132 133 134 135
            switch(data->host) {
            case MsgParallelHomogeneousPFSMultipleTiersProfileData::Host::HPST:
                hosts_to_use.push_back(context->machines.hpst_machine()->host);
                break;
            case MsgParallelHomogeneousPFSMultipleTiersProfileData::Host::LCST:
                hosts_to_use.push_back(context->machines.pfs_machine()->host);
                break;
            default:
                xbt_die("Should not be reached");
            }
136

137 138 139 140
            // These amounts are deallocated by SG
            computation_amount = xbt_new(double, nb_res);
            communication_amount = nullptr;
            if(size > 0)
Millian Poquet's avatar
Millian Poquet committed
141
            {
142
                communication_amount = xbt_new(double, nb_res*nb_res);
Millian Poquet's avatar
Millian Poquet committed
143
            }
144

145 146 147 148 149 150 151 152 153
            // Let us fill the local computation and communication matrices
            int k = 0;
            for (int y = 0; y < nb_res; ++y)
            {
                computation_amount[y] = cpu;
                if(communication_amount != nullptr)
                {
                    for (int x = 0; x < nb_res; ++x)
                    {
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
                        switch(data->direction) {
                        case MsgParallelHomogeneousPFSMultipleTiersProfileData::Direction::TO_STORAGE:
                            // Communications are done towards the PFS host, which is the last resource (to the storage)
                            if (x != pfs_id)
                            {
                                communication_amount[k++] = 0;
                            }
                            else
                            {
                                communication_amount[k++] = size;
                            }
                            break;
                        case MsgParallelHomogeneousPFSMultipleTiersProfileData::Direction::FROM_STORAGE:
                            // Communications are done towards the job allocation (from the storage)
                            if (x != pfs_id)
                            {
                                communication_amount[k++] = size;
                            }
                            else
                            {
                                communication_amount[k++] = 0;
                            }
                            break;
                        default:
                            xbt_die("Should not be reached");
179
                        }
180 181 182 183
                    }
                }
            }
        }
184 185 186 187 188 189 190 191
        else if (profile->type == ProfileType::MSG_DATA_STAGING)
        {
            task_name_prefix = "data_staging ";
            MsgDataStagingProfileData * data = (MsgDataStagingProfileData *)profile->data;

            double cpu = 0;
            double size = data->size;

192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
            // The PFS machine will also be used
            nb_res = 2;
            int pfs_id = nb_res - 1;

            hosts_to_use = std::vector<msg_host_t>();

            // Add the pfs_machine
            switch(data->direction) {
            case MsgDataStagingProfileData::Direction::LCST_TO_HPST:
                hosts_to_use.push_back(context->machines.pfs_machine()->host);
                hosts_to_use.push_back(context->machines.hpst_machine()->host);
                break;
            case MsgDataStagingProfileData::Direction::HPST_TO_LCST:
                hosts_to_use.push_back(context->machines.hpst_machine()->host);
                hosts_to_use.push_back(context->machines.pfs_machine()->host);
                break;
            default:
                xbt_die("Should not be reached");
            }

            // These amounts are deallocated by SG
            computation_amount = xbt_new(double, nb_res);
            communication_amount = nullptr;
            if(size > 0)
            {
                communication_amount = xbt_new(double, nb_res*nb_res);
            }

            // Let us fill the local computation and communication matrices
            int k = 0;
            for (int y = 0; y < nb_res; ++y)
            {
                computation_amount[y] = cpu;
                if(communication_amount != nullptr)
                {
                    for (int x = 0; x < nb_res; ++x)
                    {
                        // Communications are done towards the last resource
                        if (x != pfs_id)
                        {
                            communication_amount[k++] = 0;
                        }
                        else
                        {
                            communication_amount[k++] = size;
                        }
                    }
                }
            }
241
        }
242

243
        string task_name = task_name_prefix + to_string(job->number) + "'" + job->profile + "'";
244 245 246
        XBT_INFO("Creating task '%s'", task_name.c_str());

        msg_task_t ptask = MSG_parallel_task_create(task_name.c_str(),
247 248
                                                    nb_res,
                                                    hosts_to_use.data(),
249 250
                                                    computation_amount,
                                                    communication_amount, NULL);
251

252 253 254
        // If the process gets killed, the following data may need to be freed
        cleanup_data->task = ptask;

255
        double time_before_execute = MSG_get_clock();
256
        XBT_INFO("Executing task '%s'", MSG_task_get_name(ptask));
257
        msg_error_t err = MSG_parallel_task_execute_with_timeout(ptask, *remaining_time);
258
        *remaining_time = *remaining_time - (MSG_get_clock() - time_before_execute);
259

260
        int ret = profile->return_code;
261 262
        if (err == MSG_OK) {}
        else if (err == MSG_TIMEOUT)
263
        {
264
            ret = -1;
265
        }
266
        else
267
        {
268
            xbt_die("A task execution had been stopped by an unhandled way (err = %d)", err);
269
        }
270 271 272

        XBT_INFO("Task '%s' finished", MSG_task_get_name(ptask));
        MSG_task_destroy(ptask);
273 274 275 276

        // The task has been executed, the data does need to be freed in the cleanup function anymore
        cleanup_data->task = nullptr;

277 278 279 280 281 282 283 284
        return ret;
    }
    else if (profile->type == ProfileType::SEQUENCE)
    {
        SequenceProfileData * data = (SequenceProfileData *) profile->data;

        for (int i = 0; i < data->repeat; i++)
        {
285
            for (unsigned int j = 0; j < data->sequence.size(); j++)
286
            {
287 288 289
                int ret_last_profile = execute_profile(context, data->sequence[j], allocation,
                                    cleanup_data, remaining_time);
                if (ret_last_profile != 0)
290
                {
291
                    return ret_last_profile;
292
                }
293 294
            }
        }
295
        return profile->return_code;
296
    }
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368
    else if (profile->type == ProfileType::SCHEDULER_SEND)
    {
        SchedulerSendProfileData * data = (SchedulerSendProfileData *) profile->data;

        XBT_INFO("Sending message to the scheduler: %s", data->message.c_str());
        context->proto_writer->append_from_job_message(job->id, data->message, MSG_get_clock());

        return profile->return_code;
    }
    else if (profile->type == ProfileType::SCHEDULER_RECV)
    {
        SchedulerRecvProfileData * data = (SchedulerRecvProfileData *) profile->data;

        string profile_to_execute = "";
        bool has_messages = false;

        XBT_INFO("Trying to receive message from scheduler");
        if (job->incoming_message_buffer.empty()) {
            if (data->on_timeout == "") {
                XBT_INFO("Waiting for message from scheduler");
                while (true) {
                    static double sleeptime = 0.00001;
                    if (sleeptime < *remaining_time)
                    {
                        MSG_process_sleep(sleeptime);
                        *remaining_time = *remaining_time - sleeptime;
                    }
                    else
                    {
                        XBT_INFO("Job has reached walltime without receiving message from scheduler");
                        MSG_process_sleep(*remaining_time);
                        *remaining_time = 0;
                        return -1;
                    }
                    if (!job->incoming_message_buffer.empty()) {
                        XBT_INFO("Finally got message from scheduler");
                        has_messages = true;
                        break;
                    }
                }
            } else {
                XBT_INFO("Timeout on waiting for message from scheduler");
                profile_to_execute = data->on_timeout;
            }
        } else {
            has_messages = true;
        }
        
        if (has_messages) {
            string first_message = job->incoming_message_buffer.front();
            job->incoming_message_buffer.pop_front();

            regex msg_regex(data->regex);
            if (regex_match(first_message, msg_regex)) {
                XBT_INFO("Message from scheduler matches");
                profile_to_execute = data->on_success;
            } else {
                XBT_INFO("Message from scheduler does not match");
                profile_to_execute = data->on_failure;
            }
        }

        if (profile_to_execute != "") {
            XBT_INFO("Execute profile: %s", profile_to_execute.c_str());
            int ret_last_profile = execute_profile(context, profile_to_execute, allocation,
                                    cleanup_data, remaining_time);
            if (ret_last_profile != 0) {
                return ret_last_profile;
            }
        }
        return profile->return_code;
    }
369 370 371 372 373 374 375 376 377 378
    else if (profile->type == ProfileType::DELAY)
    {
        DelayProfileData * data = (DelayProfileData *) profile->data;

        if (data->delay < *remaining_time)
        {
            XBT_INFO("Sleeping the whole task length");
            MSG_process_sleep(data->delay);
            XBT_INFO("Sleeping done");
            *remaining_time = *remaining_time - data->delay;
379
            return profile->return_code;
380 381 382 383 384 385 386
        }
        else
        {
            XBT_INFO("Sleeping until walltime");
            MSG_process_sleep(*remaining_time);
            XBT_INFO("Walltime reached");
            *remaining_time = 0;
387
            return -1;
388 389 390 391 392
        }
    }
    else if (profile->type == ProfileType::SMPI)
    {
        SmpiProfileData * data = (SmpiProfileData *) profile->data;
393
        msg_sem_t sem = MSG_sem_init(1);
394

395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412
        int nb_ranks = data->trace_filenames.size();

        // Let's use the default mapping is none is provided (round-robin on hosts, as we do not
        // know the number of cores on each host)
        if (job->smpi_ranks_to_hosts_mapping.empty())
        {
            job->smpi_ranks_to_hosts_mapping.resize(nb_ranks);
            int host_to_use = 0;
            for (int i = 0; i < nb_ranks; ++i)
            {
                job->smpi_ranks_to_hosts_mapping[i] = host_to_use;
                ++host_to_use %= job->required_nb_res; // ++ is done first
            }
        }

        xbt_assert(nb_ranks == (int) job->smpi_ranks_to_hosts_mapping.size(),
                   "Invalid job %s: SMPI ranks_to_host mapping has an invalid size, as it should "
                   "use %d MPI ranks but the ranking states that there are %d ranks.",
Millian Poquet's avatar
Millian Poquet committed
413
                   job->id.c_str(), nb_ranks, (int) job->smpi_ranks_to_hosts_mapping.size());
414 415

        for (int i = 0; i < nb_ranks; ++i)
416 417
        {
            char *str_instance_id = NULL;
418
            int ret = asprintf(&str_instance_id, "%s!%d", job->workload->name.c_str(), job->number);
Millian Poquet's avatar
Millian Poquet committed
419
            (void) ret; // Avoids a warning if assertions are ignored
420 421 422 423 424 425 426
            xbt_assert(ret != -1, "asprintf failed (not enough memory?)");

            char *str_rank_id  = NULL;
            ret = asprintf(&str_rank_id, "%d", i);
            xbt_assert(ret != -1, "asprintf failed (not enough memory?)");

            char *str_pname = NULL;
427
            ret = asprintf(&str_pname, "%d_%d", job->number, i);
428 429 430 431 432 433
            xbt_assert(ret != -1, "asprintf failed (not enough memory?)");

            char **argv = xbt_new(char*, 5);
            argv[0] = xbt_strdup("1"); // Fonction_replay_label (can be ignored, for log only),
            argv[1] = str_instance_id; // Instance Id (application) job_id is used
            argv[2] = str_rank_id;     // Rank Id
434
            argv[3] = xbt_strdup((char*) data->trace_filenames[i].c_str());
435
            argv[4] = xbt_strdup("0"); //
436 437

            msg_host_t host_to_use = allocation->hosts[job->smpi_ranks_to_hosts_mapping[i]];
438 439 440
            SMPIReplayProcessArguments * message = new SMPIReplayProcessArguments;
            message->semaphore = NULL;
            message->job = job;
441 442 443 444

            XBT_INFO("Hello!");

            if (i == 0)
445
            {
446
                message->semaphore = sem;
447
            }
448

449 450
            msg_process_t process = MSG_process_create_with_arguments(str_pname, smpi_replay_process,
                                                                      message, host_to_use, 5, argv);
451
            job->execution_processes.insert(process);
452

453
            // todo: avoid memory leaks
454
            free(str_pname);
Millian Poquet's avatar
Millian Poquet committed
455

456
        }
457 458
        MSG_sem_acquire(sem);
        free(sem);
459
        return profile->return_code;
460 461
    }
    else
Millian Poquet's avatar
Millian Poquet committed
462 463
        xbt_die("Cannot execute job %s: the profile type '%s' is unknown",
                job->id.c_str(), job->profile.c_str());
464

465
    return 1;
466 467 468 469 470 471 472
}

int execute_job_process(int argc, char *argv[])
{
    (void) argc;
    (void) argv;

473
    // Retrieving input parameters
474 475
    ExecuteJobProcessArguments * args = (ExecuteJobProcessArguments *) MSG_process_get_data(MSG_process_self());

476 477
    Workload * workload = args->context->workloads.at(args->allocation->job_id.workload_name);
    Job * job = workload->jobs->at(args->allocation->job_id.job_number);
478
    job->starting_time = MSG_get_clock();
479
    job->allocation = args->allocation->machine_ids;
Millian Poquet's avatar
Millian Poquet committed
480
    double remaining_time = (double)job->walltime;
481

482 483 484
    // If energy is enabled, let us compute the energy used by the machines before running the job
    if (args->context->energy_used)
    {
485
        job->consumed_energy = consumed_energy_on_machines(args->context, job->allocation);
486
        // Let's trace the consumed energy
487
        args->context->energy_tracer.add_job_start(MSG_get_clock(), job->number);
488 489 490
    }

    // Job computation
491 492 493
    args->context->machines.update_machines_on_job_run(job,
                                                       args->allocation->machine_ids,
                                                       args->context);
494
    CleanExecuteProfileData * cleanup_data = new CleanExecuteProfileData;
495
    cleanup_data->exec_process_args = args;
496
    SIMIX_process_on_exit(MSG_process_self(), execute_profile_cleanup, cleanup_data);
497 498
    job->return_code = execute_profile(args->context, job->profile, args->allocation, cleanup_data, &remaining_time);
    if (job->return_code == 0)
499
    {
500
        XBT_INFO("Job %s finished in time (success)", job->id.c_str());
501 502
        job->state = JobState::JOB_STATE_COMPLETED_SUCCESSFULLY;
    }
503 504 505 506 507
    else if (job->return_code > 0)
    {
        XBT_INFO("Job %s finished in time (failed)", job->id.c_str());
        job->state = JobState::JOB_STATE_COMPLETED_FAILED;
    }
508 509
    else
    {
Millian Poquet's avatar
Millian Poquet committed
510 511
        XBT_INFO("Job %s had been killed (walltime %g reached)",
                 job->id.c_str(), (double) job->walltime);
512
        job->state = JobState::JOB_STATE_COMPLETED_KILLED;
513
        job->kill_reason = "Walltime reached";
514
        if (args->context->trace_schedule)
Millian Poquet's avatar
Millian Poquet committed
515
        {
516 517
            args->context->paje_tracer.add_job_kill(job, args->allocation->machine_ids,
                                                    MSG_get_clock(), true);
Millian Poquet's avatar
Millian Poquet committed
518
        }
519 520
    }

521
    args->context->machines.update_machines_on_job_end(job, args->allocation->machine_ids,
522
                                                       args->context);
523 524 525 526 527 528
    job->runtime = MSG_get_clock() - job->starting_time;
    if (job->runtime == 0)
    {
        XBT_WARN("Job '%s' computed in null time. Putting epsilon instead.", job->id.c_str());
        job->runtime = Rational(1e-5);
    }
529

530 531 532 533
    // If energy is enabled, let us compute the energy used by the machines after running the job
    if (args->context->energy_used)
    {
        long double consumed_energy_before = job->consumed_energy;
534
        job->consumed_energy = consumed_energy_on_machines(args->context, job->allocation);
535 536

        // The consumed energy is the difference (consumed_energy_after_job - consumed_energy_before_job)
537
        job->consumed_energy -= job->consumed_energy - consumed_energy_before;
538 539

        // Let's trace the consumed energy
540
        args->context->energy_tracer.add_job_end(MSG_get_clock(), job->number);
541 542
    }

543 544 545 546 547
    if (args->notify_server_at_end)
    {
        // Let us tell the server that the job completed
        JobCompletedMessage * message = new JobCompletedMessage;
        message->job_id = args->allocation->job_id;
548

549 550
        send_message("server", IPMessageType::JOB_COMPLETED, (void*)message);
    }
551

552
    job->execution_processes.erase(MSG_process_self());
553 554
    return 0;
}
555 556 557 558 559 560 561 562 563 564 565 566 567

int waiter_process(int argc, char *argv[])
{
    (void) argc;
    (void) argv;

    WaiterProcessArguments * args = (WaiterProcessArguments *) MSG_process_get_data(MSG_process_self());

    double curr_time = MSG_get_clock();

    if (curr_time < args->target_time)
    {
        double time_to_wait = args->target_time - curr_time;
568 569
        // Sometimes time_to_wait is so small that it does not affect MSG_process_sleep. The value of 1e-5 have been found on trial-error.
        if(time_to_wait < 1e-5)
Millian Poquet's avatar
Millian Poquet committed
570
        {
571
            time_to_wait = 1e-5;
Millian Poquet's avatar
Millian Poquet committed
572
        }
573 574 575 576 577
        XBT_INFO("Sleeping %g seconds to reach time %g", time_to_wait, args->target_time);
        MSG_process_sleep(time_to_wait);
        XBT_INFO("Sleeping done");
    }
    else
Millian Poquet's avatar
Millian Poquet committed
578
    {
579
        XBT_INFO("Time %g is already reached, skipping sleep", args->target_time);
Millian Poquet's avatar
Millian Poquet committed
580
    }
581 582 583 584 585 586

    send_message("server", IPMessageType::WAITING_DONE);
    delete args;

    return 0;
}
587

588 589 590 591 592 593 594
int execute_profile_cleanup(void * unknown, void * data)
{
    (void) unknown;

    CleanExecuteProfileData * cleanup_data = (CleanExecuteProfileData *) data;
    xbt_assert(cleanup_data != nullptr);

595
    XBT_DEBUG("before freeing computation amount %p", cleanup_data->computation_amount);
596
    xbt_free(cleanup_data->computation_amount);
597
    XBT_DEBUG("before freeing communication amount %p", cleanup_data->communication_amount);
598
    xbt_free(cleanup_data->communication_amount);
599

600 601 602 603 604 605 606 607 608
    if (cleanup_data->exec_process_args != nullptr)
    {
        XBT_DEBUG("before deleting exec_process_args->allocation %p",
                  cleanup_data->exec_process_args->allocation);
        delete cleanup_data->exec_process_args->allocation;
        XBT_DEBUG("before deleting exec_process_args %p", cleanup_data->exec_process_args);
        delete cleanup_data->exec_process_args;
    }

609 610 611 612 613 614
    if (cleanup_data->task != nullptr)
    {
        XBT_WARN("Not cleaning the task data to avoid a SG deadlock :(");
        //MSG_task_destroy(cleanup_data->task);
    }

615
    XBT_DEBUG("before deleting cleanup_data %p", cleanup_data);
616 617 618 619 620
    delete cleanup_data;

    return 0;
}

621 622 623 624 625 626 627
int killer_process(int argc, char *argv[])
{
    (void) argc;
    (void) argv;

    KillerProcessArguments * args = (KillerProcessArguments *) MSG_process_get_data(MSG_process_self());

628 629 630 631 632 633
    for (const JobIdentifier & job_id : args->jobs_ids)
    {
        Job * job = args->context->workloads.job_at(job_id);
        Profile * profile = args->context->workloads.at(job_id.workload_name)->profiles->at(job->profile);
        (void) profile;

634 635
        xbt_assert(job->state == JobState::JOB_STATE_RUNNING ||
                   job->state == JobState::JOB_STATE_COMPLETED_KILLED ||
636 637
                   job->state == JobState::JOB_STATE_COMPLETED_SUCCESSFULLY ||
                   job->state == JobState::JOB_STATE_COMPLETED_FAILED,
638 639 640 641 642 643 644 645
                   "Bad kill: job %s is not running", job->id.c_str());

        if (job->state == JobState::JOB_STATE_RUNNING)
        {
            // Let's kill all the involved processes
            xbt_assert(job->execution_processes.size() > 0);
            for (msg_process_t process : job->execution_processes)
            {
646
                XBT_INFO("Killing process '%s'", MSG_process_get_name(process));
647 648
                MSG_process_kill(process);
            }
649
            job->execution_processes.clear();
650 651 652 653 654

            // Let's update the job information
            job->state = JobState::JOB_STATE_COMPLETED_KILLED;
            job->kill_reason = "Killed from killer_process (probably requested by the decision process)";

655 656 657
            args->context->machines.update_machines_on_job_end(job,
                                                               job->allocation,
                                                               args->context);
658 659
            job->runtime = (Rational)MSG_get_clock() - job->starting_time;

660 661 662 663 664 665 666 667
            xbt_assert(job->runtime >= 0, "Negative runtime of killed job '%s' (%g)!",
                       job->id.c_str(), (double)job->runtime);
            if (job->runtime == 0)
            {
                XBT_WARN("Killed job '%s' has a null runtime. Putting epsilon instead.",
                         job->id.c_str());
                job->runtime = Rational(1e-5);
            }
668 669 670 671 672

            // If energy is enabled, let us compute the energy used by the machines after running the job
            if (args->context->energy_used)
            {
                long double consumed_energy_before = job->consumed_energy;
673
                job->consumed_energy = consumed_energy_on_machines(args->context, job->allocation);
674 675 676 677 678 679 680 681

                // The consumed energy is the difference (consumed_energy_after_job - consumed_energy_before_job)
                job->consumed_energy = job->consumed_energy - consumed_energy_before;

                // Let's trace the consumed energy
                args->context->energy_tracer.add_job_end(MSG_get_clock(), job->number);
            }
        }
682
    }
683 684 685 686 687 688 689 690

    KillingDoneMessage * message = new KillingDoneMessage;
    message->jobs_ids = args->jobs_ids;
    send_message("server", IPMessageType::KILLING_DONE, (void*)message);
    delete args;

    return 0;
}