server.cpp 43.3 KB
Newer Older
Millian Poquet's avatar
Millian Poquet committed
1 2 3 4 5
/**
 * @file server.cpp
 * @brief Contains functions related to the general orchestration of the simulation
 */

6 7 8 9 10 11 12
#include "server.hpp"

#include <string>

#include <boost/algorithm/string.hpp>

#include <simgrid/msg.h>
13
#include <simgrid/s4u.hpp>
14 15 16 17 18 19

#include "context.hpp"
#include "ipp.hpp"
#include "network.hpp"
#include "jobs_execution.hpp"

Millian Poquet's avatar
Millian Poquet committed
20
XBT_LOG_NEW_DEFAULT_CATEGORY(server, "server"); //!< Logging
21 22 23

using namespace std;

24
/**
 * @brief Entry point of the server actor, which orchestrates the simulation
 * @details Announces the beginning of the simulation to the decision process,
 *          then receives messages on the "server" mailbox and dispatches each
 *          of them to its handler until the end of the simulation has been
 *          acknowledged. Whenever the scheduler is ready and the protocol
 *          writer holds pending events, a "Scheduler REQ-REP" actor is spawned
 *          to forward them.
 * @param[in,out] context The BatsimContext
 */
void server_process(BatsimContext * context)
{
    ServerData * data = new ServerData;
    data->context = context;

    // Let's tell the Decision process that the simulation is about to begin
    // (and that some data can be read from the data storage)
    context->proto_writer->append_simulation_begins(context->machines,
                                                    context->workloads,
                                                    context->config_json,
                                                    context->allow_compute_sharing,
                                                    context->allow_storage_sharing,
                                                    MSG_get_clock());

    string send_buffer = context->proto_writer->generate_current_message(MSG_get_clock());
    context->proto_writer->clear();

    simgrid::s4u::Actor::create("Scheduler REQ-REP", simgrid::s4u::this_actor::get_host(),
                                request_reply_scheduler_process,
                                context, send_buffer);
    data->sched_ready = false;

    // Let's prepare a handler map to react on events
    std::map<IPMessageType, std::function<void(ServerData *, IPMessage *)>> handler_map;
    handler_map[IPMessageType::JOB_SUBMITTED] = server_on_job_submitted;
    handler_map[IPMessageType::JOB_REGISTERED_BY_DP] = server_on_register_job;
    handler_map[IPMessageType::PROFILE_REGISTERED_BY_DP] = server_on_register_profile;
    handler_map[IPMessageType::JOB_COMPLETED] = server_on_job_completed;
    handler_map[IPMessageType::PSTATE_MODIFICATION] = server_on_pstate_modification;
    handler_map[IPMessageType::SCHED_EXECUTE_JOB] = server_on_execute_job;
    handler_map[IPMessageType::SCHED_CHANGE_JOB_STATE] = server_on_change_job_state;
    handler_map[IPMessageType::TO_JOB_MSG] = server_on_to_job_msg;
    handler_map[IPMessageType::FROM_JOB_MSG] = server_on_from_job_msg;
    handler_map[IPMessageType::SCHED_REJECT_JOB] = server_on_reject_job;
    handler_map[IPMessageType::SCHED_KILL_JOB] = server_on_kill_jobs;
    handler_map[IPMessageType::SCHED_CALL_ME_LATER] = server_on_call_me_later;
    handler_map[IPMessageType::SCHED_TELL_ME_ENERGY] = server_on_sched_tell_me_energy;
    handler_map[IPMessageType::SCHED_SET_JOB_METADATA] = server_on_set_job_metadata;
    handler_map[IPMessageType::SCHED_WAIT_ANSWER] = server_on_sched_wait_answer;
    handler_map[IPMessageType::WAIT_QUERY] = server_on_wait_query;
    handler_map[IPMessageType::SCHED_READY] = server_on_sched_ready;
    handler_map[IPMessageType::WAITING_DONE] = server_on_waiting_done;
    handler_map[IPMessageType::KILLING_DONE] = server_on_killing_done;
    handler_map[IPMessageType::SUBMITTER_HELLO] = server_on_submitter_hello;
    handler_map[IPMessageType::SUBMITTER_BYE] = server_on_submitter_bye;
    handler_map[IPMessageType::SWITCHED_ON] = server_on_switched;
    handler_map[IPMessageType::SWITCHED_OFF] = server_on_switched;
    handler_map[IPMessageType::END_DYNAMIC_REGISTER] = server_on_end_dynamic_register;
    handler_map[IPMessageType::CONTINUE_DYNAMIC_REGISTER] = server_on_continue_dynamic_register;

    /* Currently, there is one submitter per input file (workload or workflow).
       As workflows use an inner workload, calling nb_static_workloads() should
       be enough. The dynamic submitter (from the decision process) is not part
       of this count as it can finish then restart... */
    data->expected_nb_submitters = context->workloads.nb_static_workloads();

    // Is the simulation already finished? It can occur now if there is no job to execute.
    check_simulation_finished(data);

    // Simulation loop
    while (!data->end_of_simulation_ack_received)
    {
        // Wait and receive a message from a node or the request-reply process...
        IPMessage * message = receive_message("server");
        XBT_DEBUG("Server received a message of type %s:",
                 ip_message_type_to_string(message->type).c_str());

        // Handle the message. A single lookup is done, and the handler is
        // called in place (no std::function copy, no insertion of an empty
        // handler into the map when the type is unknown).
        const auto handler_it = handler_map.find(message->type);
        if (handler_it == handler_map.end())
        {
            // Explicit abort: an unknown message type must never be silently
            // dispatched, even when assertions are compiled out.
            xbt_die("The server does not know how to handle message type %s.",
                    ip_message_type_to_string(message->type).c_str());
        }
        handler_it->second(data, message);

        // Delete the message
        delete message;

        // Let's send a message to the scheduler if needed
        if (data->sched_ready && // The scheduler must be ready
            !context->proto_writer->is_empty() && // There must be something to send to the scheduler
            !data->end_of_simulation_ack_received // The simulation must NOT be finished
            )
        {
            string send_buffer = context->proto_writer->generate_current_message(MSG_get_clock());
            context->proto_writer->clear();

            simgrid::s4u::Actor::create("Scheduler REQ-REP", simgrid::s4u::this_actor::get_host(),
                                        request_reply_scheduler_process,
                                        context, send_buffer);
            data->sched_ready = false;

            // The buffer that has just been handed over contained SIMULATION_ENDS
            if (data->end_of_simulation_in_send_buffer)
            {
                data->end_of_simulation_in_send_buffer = false;
                data->end_of_simulation_sent = true;
            }
        }

    } // end of while

    XBT_INFO("Simulation is finished!");

    // Is simulation also finished for the decision process?
    xbt_assert(!data->end_of_simulation_in_send_buffer, "Left simulation loop, but the SIMULATION_ENDS message is still in the send buffer to the decision process.");
    xbt_assert(data->end_of_simulation_sent, "Left simulation loop, but the SIMULATION_ENDS message has not been sent to the scheduler.");
    xbt_assert(data->end_of_simulation_ack_received, "Left simulation loop, but the decision process did not ACK the SIMULATION_ENDS message.");

    // Are there still pending actions in Batsim?
    xbt_assert(data->sched_ready, "Left simulation loop, but a call to the decision process is ongoing.");
    xbt_assert(data->nb_running_jobs == 0, "Left simulation loop, but some jobs are running.");
    xbt_assert(data->nb_switching_machines == 0, "Left simulation loop, but some machines are being switched.");
    xbt_assert(data->nb_killers == 0, "Left simulation loop, but some killer processes (used to kill jobs) are running.");

    if (data->nb_waiters > 0)
        XBT_WARN("Left simulation loop, but some waiter processes (used to manage the CALL_ME_LATER message) are running.");

    // Consistency
    xbt_assert(data->nb_completed_jobs == data->nb_submitted_jobs, "All submitted jobs have not been completed (either executed and finished, or rejected).");

    delete data;
}
145

146 147 148 149
/**
 * @brief Handler of SUBMITTER_HELLO messages: registers a new submitter
 * @details The submitter is stored in the server data under its name, which
 *          is also used as its mailbox.
 * @param[in,out] data The server data
 * @param[in] task_data The received message (its data is a SubmitterHelloMessage)
 */
void server_on_submitter_hello(ServerData * data,
                               IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    xbt_assert(!data->end_of_simulation_in_send_buffer,
               "A new submitter said hello but the simulation is finished... Aborting.");

    const auto * hello = static_cast<SubmitterHelloMessage *>(task_data->data);
    const auto & name = hello->submitter_name;

    // Submitter names must be unique
    xbt_assert(data->submitters.count(name) == 0,
               "Invalid new submitter '%s': a submitter with the same name already exists!",
               name.c_str());

    ++data->nb_submitters;

    // Released when the submitter says goodbye
    ServerData::Submitter * new_submitter = new ServerData::Submitter;
    new_submitter->mailbox = name;
    new_submitter->should_be_called_back = hello->enable_callback_on_job_completion;
    data->submitters[name] = new_submitter;

    XBT_DEBUG("New submitter said hello. Number of polite submitters: %d",
             data->nb_submitters);
}
169

170 171 172 173 174 175 176 177 178 179 180 181
/**
 * @brief Handler of SUBMITTER_BYE messages: forgets a submitter that finished
 * @details Notifies the decision process once the expected number of
 *          submitters has finished, then checks whether the simulation is over.
 * @param[in,out] data The server data
 * @param[in] task_data The received message (its data is a SubmitterByeMessage)
 */
void server_on_submitter_bye(ServerData * data,
                             IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    SubmitterByeMessage * message = static_cast<SubmitterByeMessage *>(task_data->data);

    xbt_assert(data->submitters.count(message->submitter_name) == 1);

    // Release the submitter and drop its entry
    auto submitter_it = data->submitters.find(message->submitter_name);
    if (submitter_it != data->submitters.end())
    {
        delete submitter_it->second;
        data->submitters.erase(submitter_it);
    }

    ++data->nb_submitters_finished;
    if (message->is_workflow_submitter)
    {
        ++data->nb_workflow_submitters_finished;
    }

    XBT_DEBUG("A submitter said goodbye. Number of finished submitters: %d",
             data->nb_submitters_finished);

    const bool all_expected_submitters_done =
        (data->nb_submitters_finished == data->expected_nb_submitters);
    if (all_expected_submitters_done)
    {
        data->context->proto_writer->append_notify("no_more_static_job_to_submit", MSG_get_clock());
    }

    check_simulation_finished(data);
}
195

196 197 198 199 200
/**
 * @brief Handler of JOB_COMPLETED messages
 * @details Calls back the submitter of the job if it asked for it, updates the
 *          job counters, forwards the completion to the decision process and
 *          checks whether the simulation is over.
 * @param[in,out] data The server data
 * @param[in] task_data The received message (its data is a JobCompletedMessage)
 */
void server_on_job_completed(ServerData * data,
                             IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    JobCompletedMessage * message = static_cast<JobCompletedMessage *>(task_data->data);

    // Only jobs whose submitter requested a callback are stored in origin_of_jobs
    auto origin_it = data->origin_of_jobs.find(message->job_id);
    if (origin_it != data->origin_of_jobs.end())
    {
        SubmitterJobCompletionCallbackMessage * callback = new SubmitterJobCompletionCallbackMessage;
        callback->job_id = message->job_id;

        ServerData::Submitter * origin = origin_it->second;
        dsend_message(origin->mailbox, IPMessageType::SUBMITTER_CALLBACK, (void*) callback);

        data->origin_of_jobs.erase(origin_it);
    }

    // One job less is running, one more is completed
    --data->nb_running_jobs;
    xbt_assert(data->nb_running_jobs >= 0);
    ++data->nb_completed_jobs;
    xbt_assert(data->nb_completed_jobs + data->nb_running_jobs <= data->nb_submitted_jobs);

    Job * job = data->context->workloads.job_at(message->job_id);
    XBT_INFO("Job %s has COMPLETED. %d jobs completed so far",
             job->id.to_string().c_str(), data->nb_completed_jobs);

    // Forward the completion to the decision process
    data->context->proto_writer->append_job_completed(message->job_id.to_string(),
                                                      job_state_to_string(job->state),
                                                      job->allocation.to_string_hyphen(" "),
                                                      job->return_code,
                                                      MSG_get_clock());

    check_simulation_finished(data);
}
231

232 233 234 235 236 237 238 239 240 241
/**
 * @brief Handler of JOB_SUBMITTED messages
 * @details Marks every job of the message as submitted and forwards the
 *          submissions to the decision process. Submissions are ignored when
 *          the -k option is set and all workflows have completed.
 * @param[in,out] data The server data
 * @param[in] task_data The received message (its data is a JobSubmittedMessage)
 */
void server_on_job_submitted(ServerData * data,
                             IPMessage * task_data)
{
    // Ignore all submissions if -k was specified and all workflows have completed
    const bool has_workflows = (data->context->workflows.size() != 0);
    const bool all_workflows_done =
        (data->nb_workflow_submitters_finished == data->context->workflows.size());
    if (has_workflows && data->context->terminate_with_last_workflow && all_workflows_done)
    {
        XBT_INFO("Ignoring Job due to -k command-line option");
        return;
    }

    xbt_assert(task_data->data != nullptr);
    JobSubmittedMessage * message = static_cast<JobSubmittedMessage *>(task_data->data);

    xbt_assert(data->submitters.count(message->submitter_name) == 1);
    ServerData::Submitter * origin = data->submitters.at(message->submitter_name);

    for (JobIdentifier & job_id : message->job_ids)
    {
        // Remember who submitted the job if a completion callback is wanted
        if (origin->should_be_called_back)
        {
            xbt_assert(data->origin_of_jobs.count(job_id) == 0);
            data->origin_of_jobs[job_id] = origin;
        }

        // Let's retrieve the Job from memory (or add it into memory if it is dynamic)
        XBT_DEBUG("Job received: %s", job_id.to_string().c_str());
        XBT_DEBUG("Workloads: %s", data->context->workloads.to_string().c_str());

        xbt_assert(data->context->workloads.job_is_registered(job_id));
        Job * job = data->context->workloads.job_at(job_id);
        job->id = job_id;

        // Update control information
        job->state = JobState::JOB_STATE_SUBMITTED;
        ++data->nb_submitted_jobs;
        XBT_INFO("Job %s SUBMITTED. %d jobs submitted so far",
                 job_id.to_string().c_str(), data->nb_submitted_jobs);

        // Descriptions are only filled when redis is disabled
        string job_json_description;
        string profile_json_description;
        const bool redis_disabled = !data->context->redis_enabled;
        if (redis_disabled)
        {
            job_json_description = job->json_description;
            if (data->context->submission_forward_profiles)
            {
                profile_json_description = job->workload->profiles->at(job->profile)->json_description;
            }
        }

        data->context->proto_writer->append_job_submitted(job->id.to_string(),
                                                          job_json_description,
                                                          profile_json_description,
                                                          MSG_get_clock());
    }
}
289

290 291 292
/**
 * @brief Handler of PSTATE_MODIFICATION messages
 * @details Records the requested switch, traces it, then treats each machine
 *          of the request depending on its current pstate:
 *          - computation -> computation: the pstate is changed immediately;
 *          - computation -> sleep: a switch-off actor is spawned on the machine;
 *          - sleep -> computation: a switch-on actor is spawned on the machine.
 *          Energy support must be enabled (asserted).
 * @param[in,out] data The server data
 * @param[in] task_data The received message (its data is a PStateModificationMessage)
 */
void server_on_pstate_modification(ServerData * data,
                                   IPMessage * task_data)
{
    xbt_assert(data->context->energy_used,
               "Receiving a pstate modification request, which is forbidden as "
               "Batsim has not been launched with energy support "
               "(cf. batsim --help).");

    xbt_assert(task_data->data != nullptr);
    PStateModificationMessage * message = static_cast<PStateModificationMessage *>(task_data->data);

    data->context->current_switches.add_switch(message->machine_ids, message->new_pstate);
    data->context->energy_tracer.add_pstate_change(MSG_get_clock(), message->machine_ids,
                                                   message->new_pstate);

    // Let's quickly check whether this is a switchON or a switchOFF
    // Unknown transition states will be set to -42.
    // Only the first machine of the request is inspected here.
    int transition_state = -42;
    Machine * first_machine = data->context->machines[message->machine_ids.first_element()];
    if (first_machine->pstates[message->new_pstate] == PStateType::COMPUTATION_PSTATE)
    {
        transition_state = -1; // means we are switching to a COMPUTATION_PSTATE
    }
    else if (first_machine->pstates[message->new_pstate] == PStateType::SLEEP_PSTATE)
    {
        transition_state = -2; // means we are switching to a SLEEP_PSTATE
    }

    // The pstate is set to an invalid one to know the machines are in transition.
    data->context->pstate_tracer.add_pstate_change(MSG_get_clock(), message->machine_ids,
                                                   transition_state);

    // Let's mark that some switches have been requested
    data->context->nb_grouped_switches++;
    data->context->nb_machine_switches += message->machine_ids.size();

    for (auto machine_it = message->machine_ids.elements_begin();
         machine_it != message->machine_ids.elements_end();
         ++machine_it)
    {
        const int machine_id = *machine_it;
        Machine * machine = data->context->machines[machine_id];
        int curr_pstate = MSG_host_get_pstate(machine->host);

        if (machine->pstates[curr_pstate] == PStateType::COMPUTATION_PSTATE)
        {
            if (machine->pstates[message->new_pstate] == PStateType::COMPUTATION_PSTATE)
            {
                // Computation -> computation: done right away, no dedicated actor
                XBT_INFO("Switching machine %d ('%s') pstate : %d -> %d.", machine->id,
                         machine->name.c_str(), curr_pstate, message->new_pstate);
                MSG_host_set_pstate(machine->host, message->new_pstate);
                xbt_assert(MSG_host_get_pstate(machine->host) == message->new_pstate);

                // The decision process is only notified when mark_switch_as_done
                // returns true; the notified machines are those it filled in.
                IntervalSet all_switched_machines;
                if (data->context->current_switches.mark_switch_as_done(machine->id, message->new_pstate,
                                                                        all_switched_machines,
                                                                        data->context))
                {
                    data->context->proto_writer->append_resource_state_changed(all_switched_machines,
                                                                               std::to_string(message->new_pstate),
                                                                               MSG_get_clock());
                }
            }
            else if (machine->pstates[message->new_pstate] == PStateType::SLEEP_PSTATE)
            {
                machine->update_machine_state(MachineState::TRANSITING_FROM_COMPUTING_TO_SLEEPING);

                // The actor is named after what it does: it switches the machine OFF
                string pname = "switch OFF " + to_string(machine_id);
                simgrid::s4u::Actor::create(pname.c_str(), machine->host, switch_off_machine_process,
                                            data->context, machine_id, message->new_pstate);

                ++data->nb_switching_machines;
            }
            else
            {
                XBT_ERROR("Switching from a computation pstate to an invalid pstate on machine %d ('%s') : %d -> %d",
                          machine->id, machine->name.c_str(), curr_pstate, message->new_pstate);
            }
        }
        else if (machine->pstates[curr_pstate] == PStateType::SLEEP_PSTATE)
        {
            xbt_assert(machine->pstates[message->new_pstate] == PStateType::COMPUTATION_PSTATE,
                    "Switching from a sleep pstate to a non-computation pstate on machine %d ('%s') : %d -> %d, which is forbidden",
                    machine->id, machine->name.c_str(), curr_pstate, message->new_pstate);

            machine->update_machine_state(MachineState::TRANSITING_FROM_SLEEPING_TO_COMPUTING);

            // The actor is named after what it does: it switches the machine ON
            string pname = "switch ON " + to_string(machine_id);
            simgrid::s4u::Actor::create(pname.c_str(), machine->host, switch_on_machine_process,
                                        data->context, machine_id, message->new_pstate);

            ++data->nb_switching_machines;
        }
        else
        {
            XBT_ERROR("Machine %d ('%s') has an invalid pstate : %d", machine->id, machine->name.c_str(), curr_pstate);
        }
    }

    if (data->context->trace_machine_states)
    {
        data->context->machine_state_tracer.write_machine_states(MSG_get_clock());
    }
}
395

396 397 398 399 400 401 402
/**
 * @brief Handler of WAITING_DONE messages
 * @details Appends a "requested call" event for the decision process and
 *          decrements the number of waiters.
 * @param[in,out] data The server data
 * @param[in] task_data The received message (unused)
 */
void server_on_waiting_done(ServerData * data,
                            IPMessage * task_data)
{
    (void) task_data;

    auto * context = data->context;
    context->proto_writer->append_requested_call(MSG_get_clock());

    data->nb_waiters -= 1;
}
403

404 405 406 407 408
/**
 * @brief Handler of SCHED_READY messages: the scheduler can be called again
 * @details If the end of the simulation had already been sent, this message
 *          is treated as its acknowledgement.
 * @param[in,out] data The server data
 * @param[in] task_data The received message (unused)
 */
void server_on_sched_ready(ServerData * data,
                           IPMessage * task_data)
{
    (void) task_data;
    data->sched_ready = true;

    if (!data->end_of_simulation_sent)
    {
        return;
    }

    data->end_of_simulation_ack_received = true;
}
415

416 417 418 419 420 421
/**
 * @brief Handler of SCHED_WAIT_ANSWER messages
 * @details Forwards a copy of the answer to the mailbox named after the
 *          submitter stored in the message.
 * @param[in] data The server data (unused)
 * @param[in] task_data The received message (its data is a SchedWaitAnswerMessage)
 */
void server_on_sched_wait_answer(ServerData * data,
                                 IPMessage * task_data)
{
    (void) data;

    // A fresh copy is sent, as the received message is deleted by the server loop
    const auto * received = static_cast<SchedWaitAnswerMessage *>(task_data->data);
    SchedWaitAnswerMessage * forwarded = new SchedWaitAnswerMessage;
    *forwarded = *received;

    // Note: looking the submitter up in origin_of_wait_queries (and erasing
    // the entry afterwards) is currently disabled.
    dsend_message(forwarded->submitter_name, IPMessageType::SCHED_WAIT_ANSWER, (void*) forwarded);
}
427

428 429 430 431
/**
 * @brief Handler of SCHED_TELL_ME_ENERGY messages
 * @details Answers the decision process with the total energy consumed by
 *          the machines. Energy support must be enabled (asserted).
 * @param[in,out] data The server data
 * @param[in] task_data The received message (unused)
 */
void server_on_sched_tell_me_energy(ServerData * data,
                                    IPMessage * task_data)
{
    (void) task_data;

    auto * context = data->context;
    xbt_assert(context->energy_used,
               "Received a request about the energy consumption of the "
               "machines but energy simulation is not enabled. "
               "Try --help to enable it.");

    long double energy_so_far = context->machines.total_consumed_energy(context);
    context->proto_writer->append_answer_energy(energy_so_far, MSG_get_clock());
}
439

440 441 442 443 444 445
/**
 * @brief Handler of WAIT_QUERY messages (not implemented)
 * @details Always fails an assertion. The intended implementation (kept
 *          disabled) would read a WaitQueryMessage from task_data, look the
 *          submitter up by name and remember it in origin_of_wait_queries
 *          under the key {nb_resources, processing_time}.
 * @param[in] data The server data (unused)
 * @param[in] task_data The received message (unused)
 */
void server_on_wait_query(ServerData * data,
                          IPMessage * task_data)
{
    (void) task_data;
    (void) data;

    xbt_assert(false, "Unimplemented! TODO");
}
453

454 455
/**
 * @brief Handler of SWITCHED_ON and SWITCHED_OFF messages
 * @details Marks the switch of one machine as done. When mark_switch_as_done
 *          returns true, the machine states are traced (if enabled) and the
 *          decision process is notified about the machines it filled in.
 * @param[in,out] data The server data
 * @param[in] task_data The received message (its data is a SwitchMessage)
 */
void server_on_switched(ServerData * data,
                        IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    SwitchMessage * message = static_cast<SwitchMessage *>(task_data->data);

    xbt_assert(data->context->machines.exists(message->machine_id));
    Machine * machine = data->context->machines[message->machine_id];
    (void) machine; // Avoids a warning if assertions are ignored
    xbt_assert(MSG_host_get_pstate(machine->host) == message->new_pstate);

    auto * context = data->context;

    IntervalSet all_switched_machines;
    const bool switch_is_over = context->current_switches.mark_switch_as_done(message->machine_id,
                                                                              message->new_pstate,
                                                                              all_switched_machines,
                                                                              context);
    if (switch_is_over)
    {
        if (context->trace_machine_states)
        {
            context->machine_state_tracer.write_machine_states(MSG_get_clock());
        }

        context->proto_writer->append_resource_state_changed(all_switched_machines,
                                                             std::to_string(message->new_pstate),
                                                             MSG_get_clock());
    }

    // This machine is no longer switching, whether or not the whole switch is over
    --data->nb_switching_machines;
}
481

482 483 484 485 486
/**
 * @brief Handler of KILLING_DONE messages
 * @details Updates the counters for the jobs that are in the
 *          JOB_STATE_COMPLETED_KILLED state, sends a completion event for each
 *          of them, then sends one "job killed" event covering all the jobs of
 *          the message and checks whether the simulation is over.
 * @param[in,out] data The server data
 * @param[in] task_data The received message (its data is a KillingDoneMessage)
 */
void server_on_killing_done(ServerData * data,
                            IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    KillingDoneMessage * message = static_cast<KillingDoneMessage *>(task_data->data);

    vector<string> job_ids_str;
    vector<string> really_killed_job_ids_str;
    map<string, BatTask *> jobs_progress_str;
    job_ids_str.reserve(message->jobs_ids.size());

    // manage job Id list
    for (const JobIdentifier & job_id : message->jobs_ids)
    {
        const string job_id_str = job_id.to_string();
        job_ids_str.push_back(job_id_str);

        // store job progress from BatTask tree in str
        jobs_progress_str[job_id_str] = message->jobs_progress[job_id];

        const Job * job = data->context->workloads.job_at(job_id);
        if (job->state != JobState::JOB_STATE_COMPLETED_KILLED)
        {
            // Not in the killed state: nothing more to account for
            continue;
        }

        --data->nb_running_jobs;
        xbt_assert(data->nb_running_jobs >= 0);
        ++data->nb_completed_jobs;
        xbt_assert(data->nb_completed_jobs + data->nb_running_jobs <= data->nb_submitted_jobs);

        really_killed_job_ids_str.push_back(job_id_str);

        // also add a job complete message for the jobs that have really been
        // killed
        data->context->proto_writer->append_job_completed(
            job->id.to_string(),
            job_state_to_string(job->state),
            job->allocation.to_string_hyphen(" "),
            job->return_code,
            MSG_get_clock());
    }

    const string all_ids_joined = boost::algorithm::join(job_ids_str, ",");
    const string killed_ids_joined = boost::algorithm::join(really_killed_job_ids_str, ",");
    XBT_INFO("Jobs {%s} have been killed (the following ones have REALLY been killed: {%s})",
             all_ids_joined.c_str(),
             killed_ids_joined.c_str());

    data->context->proto_writer->append_job_killed(job_ids_str, jobs_progress_str, MSG_get_clock());
    --data->nb_killers;

    check_simulation_finished(data);
}
531

532
void server_on_end_dynamic_register(ServerData * data,
533 534 535
                                  IPMessage * task_data)
{
    (void) task_data;
Millian Poquet's avatar
Millian Poquet committed
536

537
    data->context->registration_sched_finished = true;
538

Millian Poquet's avatar
Millian Poquet committed
539
    check_simulation_finished(data);
540 541
}

542
void server_on_continue_dynamic_register(ServerData * data,
543 544 545
                                  IPMessage * task_data)
{
    (void) task_data;
546
    data->context->registration_sched_finished = false;
547

Millian Poquet's avatar
Millian Poquet committed
548
    check_simulation_finished(data);
549 550
}

551
void server_on_register_job(ServerData * data,
552 553 554
                          IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
555
    JobRegisteredByDPMessage * message = (JobRegisteredByDPMessage *) task_data->data;
556 557 558

    // Let's update global states
    ++data->nb_submitted_jobs;
559

MERCIER Michael's avatar
MERCIER Michael committed
560 561 562
    xbt_assert(data->context->workloads.exists(message->job_id.workload_name),
               "Internal error: Workload '%s' should exist.",
               message->job_id.workload_name.c_str());
563 564
    xbt_assert(!data->context->workloads.job_is_registered(message->job_id),
               "Cannot register new job '%s', it already exists in the workload.", message->job_id.to_string().c_str());
565

566 567
    Workload * workload = data->context->workloads.at(message->job_id.workload_name);

568 569 570 571 572 573 574
    // Create the job.
    XBT_DEBUG("Parsing user-submitted job %s", message->job_id.to_string().c_str());
    Job * job = Job::from_json(message->job_description, workload,
                               "Invalid JSON job submitted by the scheduler");
    xbt_assert(job->id.job_name == message->job_id.job_name, "Internal error");
    xbt_assert(job->id.workload_name == message->job_id.workload_name, "Internal error");

575 576 577
    if (!workload->profiles->exists(job->profile))
    {
        xbt_die(
578
                   "Dynamically registered job '%s' has no profile: "
579
                   "Workload '%s' has no profile named '%s'. "
580
                   "When registering a dynamic job, its profile should already exist. "
581
                   "If the profile is also dynamic, it can be registered with the REGISTER_PROFILE "
582
                   "message but you must ensure that the profile is sent (non-strictly) before "
583
                   "the REGISTER_JOB message.",
584 585 586
                   job->id.to_string().c_str(),
                   workload->name.c_str(), job->profile.c_str());
    }
587 588

    workload->check_single_job_validity(job);
589 590
    workload->jobs->add_job(job);
    job->state = JobState::JOB_STATE_SUBMITTED;
591

592
    if (data->context->registration_sched_ack)
593
    {
594
        // TODO Sleep until submit time is reached before sending the ack (JOB_SUBMITTED)
595
        string job_json_description, profile_json_description;
596

597
        if (!data->context->redis_enabled)
598
        {
599 600
            job_json_description = job->json_description;
            if (data->context->submission_forward_profiles)
601
            {
602
                profile_json_description = job->workload->profiles->at(job->profile)->json_description;
603
            }
604
        }
605

606 607
        data->context->proto_writer->append_job_submitted(job->id.to_string(),
                                                          job_json_description,
608 609 610 611
                                                          profile_json_description,
                                                          MSG_get_clock());
    }
}
612

613
void server_on_register_profile(ServerData * data,
614 615 616
                          IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
617
    ProfileRegisteredByDPMessage * message = (ProfileRegisteredByDPMessage *) task_data->data;
618 619
    (void) message;

620 621 622 623 624 625 626 627 628 629 630 631
    // Retrieve the workload, or create if it does not exist yet
    Workload * workload = nullptr;
    if (data->context->workloads.exists(message->workload_name))
    {
        workload = data->context->workloads.at(message->workload_name);
    }
    else
    {
        workload = Workload::new_dynamic_workload(message->workload_name);
        data->context->workloads.insert_workload(workload->name, workload);
    }

632
    XBT_DEBUG("New dynamically registered profile %s to workload %s",
633 634
                message->profile_name.c_str(),
                message->workload_name.c_str());
635

636 637
    if (!workload->profiles->exists(message->profile_name))
    {
638
        XBT_INFO("Adding dynamically registered profile %s to workload %s",
639 640 641 642 643 644 645 646 647
                message->profile_name.c_str(),
                message->workload_name.c_str());
        Profile * profile = Profile::from_json(message->profile_name,
                                               message->profile,
                                               "Invalid JSON profile received from the scheduler");
        workload->profiles->add_profile(message->profile_name, profile);
    }
    else
    {
648
        xbt_die("Invalid profile registration: the profile '%s' of workload '%s' that was registered has already been registered (old profile: %s)",
649 650 651 652
            message->profile_name.c_str(),
            message->workload_name.c_str(),
            workload->profiles->at(message->profile_name)->json_description.c_str());
    }
653 654
}

655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671
/**
 * @brief Stores the metadata sent by the decision process into a job
 * @details Aborts if the job named in the message is not registered.
 * @param[in,out] data The server data (its context holds the workloads)
 * @param[in] task_data The received message, whose data is read as a SetJobMetadataMessage
 */
void server_on_set_job_metadata(ServerData * data,
                                IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    SetJobMetadataMessage * msg = (SetJobMetadataMessage *)task_data->data;
    auto & known_workloads = data->context->workloads;

    JobIdentifier target_id(msg->job_id);
    if (!known_workloads.job_is_registered(target_id))
    {
        xbt_die("The job '%s' does not exist, cannot set its metadata", msg->job_id.to_string().c_str());
    }

    Job * target_job = known_workloads.job_at(target_id);
    target_job->metadata = msg->metadata;

    XBT_DEBUG("Metadata of job '%s' has been set", msg->job_id.to_string().c_str());
}

672
void server_on_change_job_state(ServerData * data,
Millian Poquet's avatar
Millian Poquet committed
673
                                IPMessage * task_data)
674 675 676 677
{
    xbt_assert(task_data->data != nullptr);
    ChangeJobStateMessage * message = (ChangeJobStateMessage *) task_data->data;

678 679 680 681
    if (!(data->context->workloads.job_is_registered(message->job_id)))
    {
        xbt_die("The job '%s' does not exist.", message->job_id.to_string().c_str());
    }
682 683
    Job * job = data->context->workloads.job_at(message->job_id);

MERCIER Michael's avatar
MERCIER Michael committed
684 685 686
    XBT_INFO("Change job state: Job %s to state %s",
             job->id.to_string().c_str(),
             message->job_state.c_str());
687

688
    JobState new_state = job_state_from_string(message->job_state);
689

Millian Poquet's avatar
Millian Poquet committed
690 691
    switch (job->state)
    {
692
    case JobState::JOB_STATE_SUBMITTED:
Millian Poquet's avatar
Millian Poquet committed
693 694
        switch (new_state)
        {
695 696 697 698 699 700 701 702 703 704
        case JobState::JOB_STATE_RUNNING:
            job->starting_time = MSG_get_clock();
            data->nb_running_jobs++;
            xbt_assert(data->nb_running_jobs <= data->nb_submitted_jobs);
            break;
        case JobState::JOB_STATE_REJECTED:
            data->nb_completed_jobs++;
            xbt_assert(data->nb_completed_jobs + data->nb_running_jobs <= data->nb_submitted_jobs);
            break;
        default:
Millian Poquet's avatar
Millian Poquet committed
705 706 707
            xbt_assert(false,
                       "Can only change the state of a submitted job to running or rejected. "
                       "State was %s", job_state_to_string(job->state).c_str());
708 709 710
        }
        break;
    case JobState::JOB_STATE_RUNNING:
Millian Poquet's avatar
Millian Poquet committed
711 712
        switch (new_state)
        {
713
        case JobState::JOB_STATE_COMPLETED_SUCCESSFULLY:
714
        case JobState::JOB_STATE_COMPLETED_FAILED:
715
        case JobState::JOB_STATE_COMPLETED_WALLTIME_REACHED:
716 717 718 719 720 721 722 723
        case JobState::JOB_STATE_COMPLETED_KILLED:
            job->runtime = MSG_get_clock() - job->starting_time;
            data->nb_running_jobs--;
            xbt_assert(data->nb_running_jobs >= 0);
            data->nb_completed_jobs++;
            xbt_assert(data->nb_completed_jobs + data->nb_running_jobs <= data->nb_submitted_jobs);
            break;
        default:
Millian Poquet's avatar
Millian Poquet committed
724 725 726 727
            xbt_assert(false,
                       "Can only change the state of a running job to completed "
                       "(successfully, failed, and killed). State was %s",
                       job_state_to_string(job->state).c_str());
728 729 730
        }
        break;
    default:
Millian Poquet's avatar
Millian Poquet committed
731 732 733
        xbt_assert(false,
                   "Can only change the state of a submitted or running job. State was %s",
                   job_state_to_string(job->state).c_str());
734
    }
Millian Poquet's avatar
Millian Poquet committed
735

736 737
    job->state = new_state;

Millian Poquet's avatar
Millian Poquet committed
738
    check_simulation_finished(data);
739 740
}

741 742 743 744 745 746
/**
 * @brief Forwards a message from the decision process to a job
 * @details The message is appended to the job's incoming message buffer.
 *          Aborts if the targeted job is not registered.
 * @param[in,out] data The server data (its context holds the workloads)
 * @param[in] task_data The received message, whose data is read as a ToJobMessage
 */
void server_on_to_job_msg(ServerData * data,
                          IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    ToJobMessage * msg = (ToJobMessage *) task_data->data;
    auto & known_workloads = data->context->workloads;

    if (!known_workloads.job_is_registered(msg->job_id))
    {
        xbt_die("The job '%s' does not exist, cannot send a message to that job.",
                msg->job_id.to_string().c_str());
    }

    Job * recipient = known_workloads.job_at(msg->job_id);
    XBT_INFO("Send message to job: Job '%s' message='%s'",
             recipient->id.to_string().c_str(),
             msg->message.c_str());

    recipient->incoming_message_buffer.push_back(msg->message);

    check_simulation_finished(data);
}

763 764 765 766 767 768 769 770
/**
 * @brief Forwards a message coming from a job to the decision process
 * @details The message is appended to the protocol writer of the context,
 *          timestamped with the current simulation time.
 * @param[in,out] data The server data
 * @param[in] task_data The received message, whose data is read as a FromJobMessage
 */
void server_on_from_job_msg(ServerData * data,
                          IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    FromJobMessage * message = (FromJobMessage *) task_data->data;

    // Validate the job before looking it up, as the other handlers of this file do
    xbt_assert(data->context->workloads.job_is_registered(message->job_id),
               "The job '%s' does not exist, cannot forward its message to the scheduler.",
               message->job_id.to_string().c_str());

    Job * job = data->context->workloads.job_at(message->job_id);

    XBT_INFO("Send message to scheduler: Job %s",
             job->id.to_string().c_str());

    data->context->proto_writer->append_from_job_message(message->job_id.to_string(),
                                                         message->message,
                                                         MSG_get_clock());

    check_simulation_finished(data);
}

781 782 783 784 785
/**
 * @brief Handles the rejection of a job by the decision process
 * @details The job must be registered (fatal otherwise) and is asserted to be in
 *          the JOB_STATE_SUBMITTED state. It is then marked as rejected and
 *          counted as completed.
 * @param[in,out] data The server data
 * @param[in] task_data The received message, whose data is read as a JobRejectedMessage
 */
void server_on_reject_job(ServerData * data,
                          IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    JobRejectedMessage * msg = (JobRejectedMessage *) task_data->data;
    auto & known_workloads = data->context->workloads;

    if (!known_workloads.job_is_registered(msg->job_id))
    {
        xbt_die("Job '%s' does not exist.", msg->job_id.to_string().c_str());
    }

    Job * rejected = known_workloads.job_at(msg->job_id);
    xbt_assert(rejected->state == JobState::JOB_STATE_SUBMITTED,
               "Invalid rejection received: job '%s' cannot be rejected at the present time. "
               "To be rejected, a job must be submitted and not allocated yet.",
               rejected->id.to_string().c_str());

    // A rejected job counts as completed
    rejected->state = JobState::JOB_STATE_REJECTED;
    data->nb_completed_jobs++;

    XBT_INFO("Job '%s' has been rejected",
             rejected->id.to_string().c_str());

    check_simulation_finished(data);
}
807

808 809 810 811 812
/**
 * @brief Handles a kill request from the decision process
 * @details Jobs whose kill has already been requested are ignored. The remaining
 *          ones are flagged as kill-requested and handed to a newly created
 *          killer_process actor. The actor is created even when no job remains.
 * @param[in,out] data The server data (its number of killers is incremented)
 * @param[in] task_data The received message, whose data is read as a KillJobMessage
 */
void server_on_kill_jobs(ServerData * data,
                         IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    KillJobMessage * msg = (KillJobMessage *) task_data->data;
    auto & known_workloads = data->context->workloads;

    std::vector<JobIdentifier> pending_kills;

    for (const JobIdentifier & id : msg->jobs_ids)
    {
        xbt_assert(known_workloads.job_is_registered(id),
                   "Trying to kill job '%s' but it does not exist.", id.to_string().c_str());

        Job * victim = known_workloads.job_at(id);

        // Skip jobs whose kill has already been requested
        if (victim->kill_requested)
        {
            continue;
        }

        // Only running or completed jobs can be targeted by a kill
        xbt_assert(victim->state == JobState::JOB_STATE_RUNNING || victim->is_complete(),
                   "Invalid KILL_JOB: job_id '%s' refers to a job not being executed nor completed.",
                   id.to_string().c_str());

        // Remember the request and pass the job to the killer process
        victim->kill_requested = true;
        pending_kills.push_back(id);
    }

    simgrid::s4u::Actor::create("killer_process", simgrid::s4u::this_actor::get_host(),
                                killer_process, data->context, pending_kills);
    ++data->nb_killers;
}
843

844 845 846 847 848
/**
 * @brief Handles a wake-up request from the decision process
 * @details Creates a waiter_process actor on the master machine's host for the
 *          requested target time, which is asserted to be in the future.
 * @param[in,out] data The server data (its number of waiters is incremented)
 * @param[in] task_data The received message, whose data is read as a CallMeLaterMessage
 */
void server_on_call_me_later(ServerData * data,
                             IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    CallMeLaterMessage * msg = (CallMeLaterMessage *) task_data->data;

    xbt_assert(msg->target_time > MSG_get_clock(),
               "You asked to be awaken in the past! (you ask: %f, it is: %f)",
               msg->target_time, MSG_get_clock());

    // The actor is named after the time it waits for
    string waiter_name = "waiter ";
    waiter_name += to_string(msg->target_time);

    simgrid::s4u::Actor::create(waiter_name.c_str(),
                                data->context->machines.master_machine()->host,
                                waiter_process, msg->target_time, data);
    ++data->nb_waiters;
}
860

861 862 863 864 865 866
/**
 * @brief Handles a job execution request from the decision process
 * @details The allocation is validated, the job is marked as running and an
 *          execute_job_process actor is created on the host of the first
 *          allocated machine. The validation steps are:
 *          - the job and its profile must be registered, and the job must be in
 *            the JOB_STATE_SUBMITTED state;
 *          - the allocation must contain at least one machine;
 *          - when compute (resp. storage) sharing is disabled, allocated compute
 *            (resp. storage) machines must not be computing any job;
 *          - when energy is used, every allocated machine must be in a computation
 *            pstate and be either computing or idle;
 *          - unless the profile is PARALLEL_HOMOGENEOUS_TOTAL_AMOUNT (or a sequence
 *            made only of such profiles), the custom mapping size (or, without
 *            mapping, the number of allocated machines) must match the number of
 *            requested resources.
 * @param[in,out] data The server data
 * @param[in] task_data The received message, whose data is read as an ExecuteJobMessage
 */
void server_on_execute_job(ServerData * data,
                           IPMessage * task_data)
{
    xbt_assert(task_data->data != nullptr);
    ExecuteJobMessage * message = (ExecuteJobMessage *) task_data->data;
    SchedulingAllocation * allocation = message->allocation;

    xbt_assert(data->context->workloads.job_is_registered(allocation->job_id),
               "Trying to execute job '%s', which is not registered in the workload!",
               allocation->job_id.to_string().c_str());

    xbt_assert(data->context->workloads.job_profile_is_registered(allocation->job_id),
               "Trying to execute job '%s', in which the profile is not registered in the workload!",
               allocation->job_id.to_string().c_str());

    Job * job = data->context->workloads.job_at(allocation->job_id);
    xbt_assert(job->state == JobState::JOB_STATE_SUBMITTED,
               "Cannot execute job '%s': its state (%s) is not JOB_STATE_SUBMITTED.",
               job->id.to_string().c_str(), job_state_to_string(job->state).c_str());

    // The executor actor is spawned on the first allocated machine, so there must be one
    xbt_assert(allocation->machine_ids.size() > 0,
               "Job '%s' allocation is invalid: no machine has been allocated.",
               job->id.to_string().c_str());

    job->state = JobState::JOB_STATE_RUNNING;

    data->nb_running_jobs++;
    xbt_assert(data->nb_running_jobs <= data->nb_submitted_jobs);

    if (!data->context->allow_compute_sharing || !data->context->allow_storage_sharing)
    {
        // Check that machines on which sharing is disabled are not already in use
        for (auto machine_id_it = allocation->machine_ids.elements_begin(); machine_id_it != allocation->machine_ids.elements_end(); ++machine_id_it)
        {
            int machine_id = *machine_id_it;
            const Machine * machine = data->context->machines[machine_id];
            if (machine->has_role(roles::Permissions::COMPUTE_NODE) && !data->context->allow_compute_sharing)
            {
                xbt_assert(machine->jobs_being_computed.empty(),
                           "Job '%s': Invalid allocation: machine %d ('%s') is currently computing jobs (these ones:"
                           " {%s}) whereas time-sharing on compute machines is disabled (try --help to display the available options).",
                           job->id.to_string().c_str(), machine->id, machine->name.c_str(),
                           machine->jobs_being_computed_as_string().c_str());
            }
            if (machine->has_role(roles::Permissions::STORAGE) && !data->context->allow_storage_sharing)
            {
                xbt_assert(machine->jobs_being_computed.empty(),
                           "Job '%s': Invalid allocation: machine %d ('%s') is currently computing jobs (these ones:"
                           " {%s}) whereas time-sharing on storage machines is disabled (try --help to display the available options).",
                           job->id.to_string().c_str(), machine->id, machine->name.c_str(),
                           machine->jobs_being_computed_as_string().c_str());
            }
        }
    }

    if (data->context->energy_used)
    {
        // Check that every machine is in a computation pstate
        for (auto machine_id_it = allocation->machine_ids.elements_begin(); machine_id_it != allocation->machine_ids.elements_end(); ++machine_id_it)
        {
            int machine_id = *machine_id_it;
            Machine * machine = data->context->machines[machine_id];
            int ps = MSG_host_get_pstate(machine->host);
            (void) ps; // Avoids a warning if assertions are ignored
            xbt_assert(machine->has_pstate(ps));
            xbt_assert(machine->pstates[ps] == PStateType::COMPUTATION_PSTATE,
                       "Invalid job allocation: machine %d ('%s') is not in a computation pstate (ps=%d)",
                       machine->id, machine->name.c_str(), ps);
            xbt_assert(machine->state == MachineState::COMPUTING || machine->state == MachineState::IDLE,
                       "Invalid job allocation: machine %d ('%s') cannot compute jobs now (the machine is"
                       " neither computing nor being idle)", machine->id, machine->name.c_str());
        }
    }

    // Only PARALLEL_HOMOGENEOUS_TOTAL_AMOUNT profile, or a sequence of
    // those profile, is able to manage the following scenario: The scheduler
    // allocated a different number of resources than the number of requested
    // resources.
    Profile * current_profile = job->workload->profiles->at(job->profile);

    bool nb_res_may_differ = (current_profile->type == ProfileType::PARALLEL_HOMOGENEOUS_TOTAL_AMOUNT);
    if (current_profile->type == ProfileType::SEQUENCE)
    {
        // A sequence qualifies only if each of its profiles qualifies
        nb_res_may_differ = true;
        const auto & sub_profiles = ((SequenceProfileData *) current_profile->data)->sequence;
        for (const auto & sub_profile_name : sub_profiles)
        {
            if (job->workload->profiles->at(sub_profile_name)->type != ProfileType::PARALLEL_HOMOGENEOUS_TOTAL_AMOUNT)
            {
                nb_res_may_differ = false;
                break;
            }
        }
    }

    // Every other profile must receive exactly the requested number of resources.
    // (The check must not be restricted to sequence profiles: it applies by default.)
    if (!nb_res_may_differ)
    {
        if (allocation->mapping.size() != 0)
        {
            xbt_assert((unsigned int)allocation->mapping.size() == job->requested_nb_res,
                       "Job '%s' allocation is invalid. The decision process set a custom mapping for this job, "
                       "but the custom mapping size (%d) does not match the job requested number of machines (%d).",
                       job->id.to_string().c_str(), (int)allocation->mapping.size(), job->requested_nb_res);
        }
        else
        {
            xbt_assert((unsigned int)allocation->machine_ids.size() == job->requested_nb_res,
                       "Job '%s' allocation is invalid. The job requires %d machines but only %d were given (%s). "
                       "Using a different number of machines than the one requested is prevented by default. "
                       "If you meant to use multiple executors per machine, please specify a custom execution mapping "
                       "specifying which allocated machine each executor should use.",
                       job->id.to_string().c_str(), job->requested_nb_res, (int)allocation->machine_ids.size(),
                       allocation->machine_ids.to_string_hyphen().c_str());
        }
    }

    // Launch the job executor on the first allocated machine and keep track of it
    string pname = "job_" + job->id.to_string();
    auto actor = simgrid::s4u::Actor::create(pname.c_str(),
                                             data->context->machines[allocation->machine_ids.first_element()]->host,
                                             execute_job_process, data->context, allocation, true, message->io_profile);
    job->execution_actors.insert(actor);
}
984

Millian Poquet's avatar
Millian Poquet committed
985 986 987
bool is_simumation_finished(const ServerData * data)
{
    return (data->nb_submitters_finished == data->expected_nb_submitters) && // All static submitters have finished
988
           (!data->context->registration_sched_enabled || data->context->registration_sched_finished) && // Dynamic submissions are disabled or finished