task_execution.cpp 25.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/**
 * @file task_execution.cpp
 * @brief Contains functions related to the execution of the MSG profile tasks
 */

#include <simgrid/msg.h>
#include "jobs.hpp"
#include "profiles.hpp"
#include "ipp.hpp"
#include "context.hpp"
#include "jobs_execution.hpp"

XBT_LOG_NEW_DEFAULT_CATEGORY(task_execution, "task_execution"); //!< Logging

using namespace std;
16
using namespace roles;
17

18 19 20 21 22 23 24 25
/**
 * @brief Generate the communication and computaion matrix for the msg
 * parallel task profile. It also set the prefix name of the task.
 * @param[out] computation_amount the computation matrix to be simulated by the msg task
 * @param[out] communication_amount the communication matrix to be simulated by the msg task
 * @param[in] nb_res the number of resources the task have to run on
 * @param[in] profile_data the profile data
 */
26
void generate_msg_parallel_task(double *& computation_amount,
27 28 29
                                double *& communication_amount,
                                unsigned int nb_res,
                                void * profile_data)
30 31 32 33 34 35
{
    MsgParallelProfileData* data = (MsgParallelProfileData*)profile_data;
    // These amounts are deallocated by SG
    computation_amount = xbt_new(double, nb_res);
    communication_amount = xbt_new(double, nb_res* nb_res);

36
    // Retrieve the matrices from the profile
37 38 39 40
    memcpy(computation_amount, data->cpu, sizeof(double) * nb_res);
    memcpy(communication_amount, data->com, sizeof(double) * nb_res * nb_res);
}

41
/**
42 43
 * @brief Generates the communication and computaion matrix for the msg
 *        parallel homogeneous task profile. It also set the prefix name of the task.
44 45 46 47 48
 * @param[out] computation_amount the computation matrix to be simulated by the msg task
 * @param[out] communication_amount the communication matrix to be simulated by the msg task
 * @param[in] nb_res the number of resources the task have to run on
 * @param[in] profile_data the profile data
 */
49
void generate_msg_parallel_homogeneous(double *& computation_amount,
50 51 52
                                       double *& communication_amount,
                                       unsigned int nb_res,
                                       void * profile_data)
53
{
54
    MsgParallelHomogeneousProfileData* data = (MsgParallelHomogeneousProfileData*)profile_data;
55 56 57 58 59 60 61

    double cpu = data->cpu;
    double com = data->com;

    // These amounts are deallocated by SG
    computation_amount = xbt_new(double, nb_res);
    communication_amount = nullptr;
62 63
    if (com > 0)
    {
64 65 66 67 68
        communication_amount = xbt_new(double, nb_res* nb_res);
    }

    // Let us fill the local computation and communication matrices
    int k = 0;
69 70
    for (unsigned int y = 0; y < nb_res; ++y)
    {
71
        computation_amount[y] = cpu;
72 73 74 75 76 77
        if (communication_amount != nullptr)
        {
            for (unsigned int x = 0; x < nb_res; ++x)
            {
                if (x == y)
                {
78
                    communication_amount[k] = 0;
79 80 81
                }
                else
                {
82
                    communication_amount[k] = com;
83
                }
84
                k++;
85 86 87 88 89
            }
        }
    }
}

90
/**
Millian Poquet's avatar
Millian Poquet committed
91
 * @brief Generates the communication vector and computation matrix for the msg
92 93 94 95 96 97 98 99
 *        parallel homogeneous total amount task profile.
 *
 * @param[out] computation_amount the computation matrix to be simulated by the msg task
 * @param[out] communication_amount the communication matrix to be simulated by the msg task
 * @param[in] nb_res the number of resources the task have to run on
 * @param[in] profile_data the profile data
 *
 * @details It is like homogeneous profile but instead of giving what has
Millian Poquet's avatar
Millian Poquet committed
100 101
 *          to be done per host, the user gives the total amounts that should be spread
 *          homogeneously across the hosts.
102
 */
103
void generate_msg_parallel_homogeneous_total_amount(double *& computation_amount,
Millian Poquet's avatar
Millian Poquet committed
104 105 106
                                                    double *& communication_amount,
                                                    unsigned int nb_res,
                                                    void * profile_data)
107 108 109
{
    MsgParallelHomogeneousTotalAmountProfileData* data = (MsgParallelHomogeneousTotalAmountProfileData*)profile_data;

Millian Poquet's avatar
Millian Poquet committed
110 111
    const double spread_cpu = data->cpu / nb_res;
    const double spread_com = data->com / nb_res;
112 113 114 115

    // These amounts are deallocated by SG
    computation_amount = xbt_new(double, nb_res);
    communication_amount = nullptr;
Millian Poquet's avatar
Millian Poquet committed
116
    if (spread_com > 0)
117
    {
Millian Poquet's avatar
Millian Poquet committed
118
        communication_amount = xbt_new(double, nb_res * nb_res);
119 120
    }

Millian Poquet's avatar
Millian Poquet committed
121
    // Fill the local computation and communication matrices
122 123 124
    int k = 0;
    for (unsigned int y = 0; y < nb_res; ++y)
    {
Millian Poquet's avatar
Millian Poquet committed
125
        computation_amount[y] = spread_cpu;
126 127 128 129 130 131 132 133 134 135
        if (communication_amount != nullptr)
        {
            for (unsigned int x = 0; x < nb_res; ++x)
            {
                if (x == y)
                {
                    communication_amount[k] = 0;
                }
                else
                {
Millian Poquet's avatar
Millian Poquet committed
136
                    communication_amount[k] = spread_com;
137 138 139 140 141 142 143
                }
                k++;
            }
        }
    }
}

144 145
/**
 * @brief Generate the communication and computaion matrix for the msg
146
 *        parallel homogeneous task profile with a Parallel File System.
147
 *
148 149 150
 * @param[out] computation_amount the computation matrix to be simulated by the msg task
 * @param[out] communication_amount the communication matrix to be simulated by the msg task
 * @param[in,out] hosts_to_use the list of host to be used by the task
151 152
 * @param[in] storage_mapping mapping from label given in the profile and machine id
 * @param[in] profile_data the profile data
153
 * @param[in] context the batsim context
154 155
 *
 * @details Note that the number of resource is also altered because of the
156
 *          pfs node that is addded.
157
 */
158
void generate_msg_parallel_homogeneous_with_pfs(double *& computation_amount,
159
                                                double *& communication_amount,
160 161
                                                std::vector<msg_host_t> & hosts_to_use,
                                                std::map<std::string, int> storage_mapping,
162
                                                void * profile_data,
163
                                                BatsimContext * context)
164
{
165 166
    MsgParallelHomogeneousPFSProfileData* data =
            (MsgParallelHomogeneousPFSProfileData*)profile_data;
167 168

    // The PFS machine will also be used
169
    unsigned int nb_res = hosts_to_use.size() + 1;
170
    unsigned int pfs_id = nb_res - 1;
171 172

    // Add the pfs_machine
173 174 175 176 177 178 179 180 181 182
    int pfs_machine_id;
    if (storage_mapping.empty())
    {
        if (context->machines.storage_machines().size() == 1)
        {
            // No label given: Use the only storage available
            pfs_machine_id = context->machines.storage_machines().at(0)->id;
        }
        else
        {
183 184
            xbt_assert(false, "No storage/host mapping given and there is no"
                    "(or more than one) storage node available");
185 186 187 188 189
        }
    }
    else
    {
        pfs_machine_id = storage_mapping[data->storage_label];
190 191
        xbt_assert(context->machines[pfs_machine_id]->permissions == Permissions::STORAGE,
                "The given node (%d) is not a storage node", pfs_machine_id);
192 193
    }
    hosts_to_use.push_back(context->machines[pfs_machine_id]->host);
194 195 196 197

    // These amounts are deallocated by SG
    computation_amount = xbt_new(double, nb_res);
    communication_amount = nullptr;
198
    if (data->bytes_to_read > 0 || data->bytes_to_write > 0)
199
    {
200 201 202 203 204
        communication_amount = xbt_new(double, nb_res* nb_res);
    }

    // Let us fill the local computation and communication matrices
    int k = 0;
205
    for (unsigned int row = 0; row < nb_res; ++row)
206
    {
207
        computation_amount[row] = 0;
208 209
        if (communication_amount != nullptr)
        {
210
            for (unsigned int col = 0; col < nb_res; ++col)
211
            {
212 213
                // No intra node comm and no inter node comm if it's not the pfs
                if (col == row or (col != pfs_id and row != pfs_id))
214
                {
215 216 217 218
                    communication_amount[k] = 0;
                }
                // Writes
                else if (col == pfs_id)
219
                {
220 221 222 223
                    communication_amount[k] = data->bytes_to_write;
                }
                // Reads
                else if (row == pfs_id)
224
                {
225
                    communication_amount[k] = data->bytes_to_read;
226
                }
227
                k++;
228 229 230 231 232
            }
        }
    }
}

233 234 235 236 237 238 239 240 241
/**
 * @brief Generate the communication and computaion matrix for the msg
 * data staging task profile.
 * @details Note that the number of resource is also altered because only
 * the pfs and the hpst are involved in the transfer. It also set the prefix
 * name of the task.
 * @param[out] computation_amount the computation matrix to be simulated by the msg task
 * @param[out] communication_amount the communication matrix to be simulated by the msg task
 * @param[in,out] hosts_to_use the list of host to be used by the task
242 243
 * @param[in] storage_mapping mapping from label given in the profile and machine id
 * @param[in] profile_data the profile data
244 245
 * @param[in] context the batsim context
 */
246
void generate_msg_data_staginig_task(double *&  computation_amount,
247
                                     double *& communication_amount,
248 249
                                     std::vector<msg_host_t> & hosts_to_use,
                                     std::map<std::string, int> storage_mapping,
250
                                     void * profile_data,
251
                                     BatsimContext * context)
252 253 254 255
{
    MsgDataStagingProfileData* data = (MsgDataStagingProfileData*)profile_data;

    double cpu = 0;
256
    double nb_bytes = data->nb_bytes;
257 258

    // The PFS machine will also be used
259
    unsigned int nb_res = 2;
260
    unsigned int pfs_id = nb_res - 1;
261 262 263 264 265

    // reset the alloc to use only IO nodes
    hosts_to_use = std::vector<msg_host_t>();

    // Add the pfs_machine
266
    int from_machine_id = storage_mapping[data->from_storage_label];
267
    xbt_assert(context->machines[from_machine_id]->permissions == Permissions::STORAGE, "The given Storage for 'from' (%d) is not a storage node", from_machine_id);
268
    int to_machine_id = storage_mapping[data->to_storage_label];
269
    xbt_assert(context->machines[to_machine_id]->permissions == Permissions::STORAGE, "The given Storage for 'from' (%d) is not a storage node", to_machine_id);
270 271
    hosts_to_use.push_back(context->machines[from_machine_id]->host);
    hosts_to_use.push_back(context->machines[to_machine_id]->host);
272 273 274 275

    // These amounts are deallocated by SG
    computation_amount = xbt_new(double, nb_res);
    communication_amount = nullptr;
276
    if (nb_bytes > 0)
277
    {
278 279 280 281 282
        communication_amount = xbt_new(double, nb_res* nb_res);
    }

    // Let us fill the local computation and communication matrices
    int k = 0;
283
    for (unsigned int row = 0; row < nb_res; ++row)
284
    {
285
        computation_amount[row] = cpu;
286 287
        if (communication_amount != nullptr)
        {
288
            for (unsigned int col = 0; col < nb_res; ++col)
289
            {
290
                // Communications are done towards the last resource
291
                if (col == row or col != pfs_id)
292
                {
293
                    communication_amount[k] = 0;
294 295 296
                }
                else
                {
297
                    communication_amount[k] = nb_bytes;
298
                }
299
                k++;
300 301 302 303 304
            }
        }
    }
}

305 306 307 308 309 310 311 312 313
/**
 * @brief Debug print of a parallel task (via XBT_DEBUG)
 * @param[in] computation_vector The ptask computation vector
 * @param[in] communication_matrix The ptask communication matrix
 * @param[in] nb_res The number of hosts involved in the parallel task
 */
void debug_print_ptask(const double * computation_vector,
                       const double * communication_matrix,
                       unsigned int nb_res)
314 315 316
{
    string comp = "";
    string comm = "";
317
    int k = 0;
318 319
    for (unsigned int i=0; i < nb_res; i++)
    {
320 321 322 323 324
        if (computation_vector != nullptr)
        {
            comp += to_string(computation_vector[i]) + ", ";
        }
        if (communication_matrix != nullptr)
325
        {
326 327 328 329 330
            for (unsigned int j = 0; j < nb_res; j++)
            {
                comm += to_string(communication_matrix[k++]) + ", ";
            }
            comm += "\n";
331 332 333
        }
    }

334
    XBT_DEBUG("Generated matrices: \nCompute: \n%s\nComm:\n%s", comp.c_str(), comm.c_str());
335
}
336
/**
337 338 339 340 341 342 343
 * @brief
 * @param[out] computation_vector The computation vector to be simulated by the msg task
 * @param[out] communication_matrix The communication matrix to be simulated by the msg task
 * @param[in,out] hosts_to_use The list of host to be used by the task
 * @param[in] profile The profile to be converted to a compute/comm matrix
 * @param[in] storage_mapping The storage mapping
 * @param[in] context The BatsimContext
344
 */
345
void generate_matices_from_profile(double *& computation_vector,
346
                                   double *& communication_matrix,
347
                                   std::vector<msg_host_t> & hosts_to_use,
348 349
                                   Profile * profile,
                                   const std::map<std::string, int> * storage_mapping,
350
                                   BatsimContext * context)
351
{
352

353
    unsigned int nb_res = hosts_to_use.size();
354

355 356
    XBT_DEBUG("Number of hosts to use: %d", nb_res);

357 358 359
    switch(profile->type)
    {
    case ProfileType::MSG_PARALLEL:
360
        generate_msg_parallel_task(computation_vector,
361 362
                                   communication_matrix,
                                   nb_res,
363
                                   profile->data);
364 365
        break;
    case ProfileType::MSG_PARALLEL_HOMOGENEOUS:
366
        generate_msg_parallel_homogeneous(computation_vector,
367 368
                                          communication_matrix,
                                          nb_res,
369
                                          profile->data);
370
        break;
371
    case ProfileType::MSG_PARALLEL_HOMOGENEOUS_TOTAL_AMOUNT:
372
        generate_msg_parallel_homogeneous_total_amount(computation_vector,
373
                                                       communication_matrix,
374 375
                                                       nb_res,
                                                       profile->data);
376
        break;
377
    case ProfileType::MSG_PARALLEL_HOMOGENEOUS_PFS:
378
        generate_msg_parallel_homogeneous_with_pfs(computation_vector,
379 380
                                                   communication_matrix,
                                                   hosts_to_use,
381
                                                   *storage_mapping,
382 383
                                                   profile->data,
                                                   context);
384 385
        break;
    case ProfileType::MSG_DATA_STAGING:
386
        generate_msg_data_staginig_task(computation_vector,
387
                                        communication_matrix,
388
                                        hosts_to_use,
389
                                        *storage_mapping,
390 391
                                        profile->data,
                                        context);
392 393 394
        break;
    default:
        xbt_die("Should not be reached.");
395
    }
396
    debug_print_ptask(computation_vector, communication_matrix, hosts_to_use.size());
397 398 399

}

400 401 402 403 404 405 406 407 408
/**
 * @brief Checks if the machines allocated to a ptask can execute it
 * @param[in] alloc The machines on which the ptask should run
 * @param[in] computation_matrix The ptask communication matrix
 * @param[in] context The BatsimContext
 */
void check_ptask_execution_permission(const MachineRange & alloc,
                                      const double * computation_matrix,
                                      BatsimContext * context)
409 410 411
{
    // TODO: simplify the roles because it is very simple in the end
    // Enforce role permission
412 413

    // TODO: handle mapping (ptasks can be executed with non-unique hosts)
414 415 416 417 418 419 420 421 422 423 424 425 426
    for (unsigned int i = 0; i < alloc.size(); i++)
    {
        int machine_id = alloc[i];
        XBT_DEBUG("enforcing permission for machine id: %d", machine_id);
        Permissions perm = context->machines[machine_id]->permissions;
        if (computation_matrix[i] != 0)
        {
            XBT_DEBUG("found computation: %f", computation_matrix[i]);
            xbt_assert(perm == Permissions::COMPUTE_NODE,
                "Some computation (%f) is assigned to a storage node (id: %d)",
                computation_matrix[i], machine_id);
        }
    }
427 428 429 430 431 432 433 434 435 436 437
}

/**
 * @brief Executes a BatTask as an MSG parallel task (blocking call)
 * @param[in,out] btask The task to execute; its ptask field is set while it runs
 * @param[in] allocation The scheduling allocation to execute the task on
 * @param[in,out] remaining_time The remaining walltime (a negative value means
 *                "no walltime"); decreased by the execution duration when set
 * @param[in] context The BatsimContext
 * @param[in,out] cleanup_data Holds the MSG task while it runs so it can be
 *                freed if the process gets killed
 * @return The profile return code on success, -1 if the walltime was reached
 */
int execute_msg_task(BatTask * btask,
                     const SchedulingAllocation* allocation,
                     double * remaining_time,
                     BatsimContext * context,
                     CleanExecuteTaskData * cleanup_data)
{
    Profile * profile = btask->profile;
    std::vector<msg_host_t> hosts_to_use = allocation->hosts;

    double* computation_vector = nullptr;
    double* communication_matrix = nullptr;

    string task_name = profile_type_to_string(profile->type) + '_' + btask->parent_job->id.to_string() +
                       "_" + btask->profile->name;
    XBT_DEBUG("Generating comm/compute matrix for task '%s' with allocation %s",
            task_name.c_str(), allocation->machine_ids.to_string_hyphen().c_str());

    generate_matices_from_profile(computation_vector,
                                  communication_matrix,
                                  hosts_to_use,
                                  profile,
                                  & allocation->storage_mapping,
                                  context);

    check_ptask_execution_permission(allocation->machine_ids, computation_vector, context);

    //FIXME: This will not work for the PFS profiles
    // Manage additional io job
    if (btask->io_profile != nullptr)
    {
        Profile * io_profile = btask->io_profile;
        double* io_computation_vector = nullptr;
        double* io_communication_matrix = nullptr;

        // Typo fixed in the debug message ("alloaction")
        XBT_DEBUG("Generating comm/compute matrix for IO with allocation: %s",
                allocation->io_allocation.to_string_hyphen().c_str());
        std::vector<msg_host_t> io_hosts = allocation->io_hosts;
        generate_matices_from_profile(io_computation_vector,
                                      io_communication_matrix,
                                      io_hosts,
                                      io_profile,
                                      nullptr,
                                      context);

        // merge the two profiles
        // First get part of the allocation that do change or not in the job
        MachineRange immut_job_alloc = difference(allocation->machine_ids, allocation->io_allocation);
        MachineRange immut_io_alloc = difference(allocation->io_allocation, allocation->machine_ids);
        MachineRange to_merge_alloc = intersection(allocation->machine_ids, allocation->io_allocation);
        MachineRange new_alloc = union_itvs(allocation->machine_ids, allocation->io_allocation);

        // FIXME this does not work for profiles that changes the number of hosts: where the allocation and the host to use
        // are different
        // Maybe this and the IO profiles should be merged to simplify implementation
        XBT_DEBUG("Job+IO allocation: %s", new_alloc.to_string_hyphen().c_str());

        // Generate the new list of hosts
        vector<msg_host_t> new_hosts_to_use;
        for (unsigned int i = 0; i < new_alloc.size(); i++)
        {
            int machine_id = new_alloc[i];
            new_hosts_to_use.push_back(context->machines[machine_id]->host);
        }

        // Generate the new matrices
        unsigned int nb_res = new_hosts_to_use.size();

        // These amounts are deallocated by SG
        double * new_computation_vector = xbt_new(double, nb_res);
        double * new_communication_matrix = xbt_new(double, nb_res* nb_res);

        // Fill the computation and communication matrices.
        // NOTE(review): the merged matrix is filled column-by-column while the
        // source matrices were generated row-by-row; this looks correct only
        // while the generated matrices are symmetric — verify if new profiles
        // with asymmetric matrices are merged here.
        int k = 0;
        int col_job_host_index = 0;
        int row_job_host_index = 0;
        int col_io_host_index = 0;
        int row_io_host_index = 0;
        bool col_only_in_job;
        bool col_only_in_io;
        for (unsigned int col = 0; col < nb_res; ++col)
        {
            col_only_in_job = false;
            col_only_in_io = false;
            int curr_machine = new_alloc[col];
            XBT_DEBUG("Current machine in generation: %d", col);
            // Fill computation vector
            if (to_merge_alloc.contains(curr_machine))
            {
                new_computation_vector[col] = computation_vector[col_job_host_index++] + io_computation_vector[col_io_host_index++];
            }
            else if (immut_job_alloc.contains(curr_machine))
            {
                new_computation_vector[col] = computation_vector[col_job_host_index++];
                col_only_in_job = true;
            }
            else if (immut_io_alloc.contains(curr_machine))
            {
                new_computation_vector[col] = io_computation_vector[col_io_host_index++];
                col_only_in_io = true;
            }
            else
            {
                xbt_assert(false, "This should not happen");
            }

            // Fill communication matrix with merged values
            for (unsigned int row = 0; row < nb_res; ++row)
            {
                if (to_merge_alloc.contains(new_alloc[row]))
                {
                    if (col_only_in_job){
                        if (communication_matrix != nullptr)
                        {
                            new_communication_matrix[k] = communication_matrix[row_job_host_index++];
                        }
                        else
                        {
                            new_communication_matrix[k] = 0;
                        }
                    }
                    else if (col_only_in_io){
                        new_communication_matrix[k] = io_communication_matrix[row_io_host_index++];
                    }
                    else {
                        if (communication_matrix != nullptr)
                        {
                            new_communication_matrix[k] = communication_matrix[row_job_host_index++] + io_communication_matrix[row_io_host_index++];
                        }
                        else
                        {
                            new_communication_matrix[k] = io_communication_matrix[row_io_host_index++];
                        }
                    }
                }
                else if (immut_job_alloc.contains(new_alloc[row]))
                {
                    if (col_only_in_io or communication_matrix == nullptr){
                        new_communication_matrix[k] = 0;
                    }
                    else
                    {
                        new_communication_matrix[k] = communication_matrix[row_job_host_index++];
                    }
                }
                else if (immut_io_alloc.contains(new_alloc[row]))
                {
                    if (col_only_in_job){
                        new_communication_matrix[k] = 0;
                    }
                    else
                    {
                        new_communication_matrix[k] = io_communication_matrix[row_io_host_index++];
                    }
                }
                else
                {
                    xbt_assert(false, "This should not happen");
                }
                k++;
            }
        }

        // Leak fix: the pre-merge job and IO matrices are never handed over to
        // SimGrid (only the merged ones are, below), so they must be freed here.
        // xbt_free on a nullptr communication matrix is a no-op.
        xbt_free(computation_vector);
        xbt_free(communication_matrix);
        xbt_free(io_computation_vector);
        xbt_free(io_communication_matrix);

        // update variables with merged matrix
        communication_matrix = new_communication_matrix;
        computation_vector = new_computation_vector;
        hosts_to_use = new_hosts_to_use;
        XBT_DEBUG("Merged Job+IO matrices");
        debug_print_ptask(computation_vector, communication_matrix, hosts_to_use.size());

        check_ptask_execution_permission(new_alloc, computation_vector, context);
    }

    // Create the MSG task
    XBT_DEBUG("Creating MSG task '%s' on %zu resources", task_name.c_str(), hosts_to_use.size());
    msg_task_t ptask = MSG_parallel_task_create(task_name.c_str(), hosts_to_use.size(),
                                                hosts_to_use.data(), computation_vector,
                                                communication_matrix, NULL);

    // If the process gets killed, the following data may need to be freed
    cleanup_data->task = ptask;

    // Keep track of the task to get information on kill
    btask->ptask = ptask;

    // Execute the MSG task (blocking)
    msg_error_t err;
    if (*remaining_time < 0)
    {
        XBT_DEBUG("Executing task '%s' without walltime", MSG_task_get_name(ptask));
        err = MSG_parallel_task_execute(ptask);
    }
    else
    {
        double time_before_execute = MSG_get_clock();
        XBT_DEBUG("Executing task '%s' with walltime of %g", MSG_task_get_name(ptask), *remaining_time);
        err = MSG_parallel_task_execute_with_timeout(ptask, *remaining_time);
        *remaining_time = *remaining_time - (MSG_get_clock() - time_before_execute);
    }

    int ret;
    if (err == MSG_OK)
    {
        ret = profile->return_code;
    }
    else if (err == MSG_TIMEOUT)
    {
        // Walltime reached
        ret = -1;
    }
    else
    {
        xbt_die("A task execution had been stopped by an unhandled way (err = %d)", err);
    }

    XBT_DEBUG("Task '%s' finished", MSG_task_get_name(ptask));
    MSG_task_destroy(ptask);

    // The task has been executed, the data does not need to be freed in the cleanup function anymore
    cleanup_data->task = nullptr;

    return ret;
}