// TODOS:
// TODO 1. refactoring
//   DONE: use zmq cpp?? --> no
// lower depth of if trees!
// no long comments
// TODO 2. error proneness
// TODO: check ret values!
// TODO: heavily test fault tolerance with a good test case.
// TODO 3. check with real world sim and DA.
// TODO: clean up L logs and D debug logs
//
// TODO: check for other erase bugs...(erasing from a container while iterating over
// the same container)
#include <map>
#include <string>
#include <cstring>

#include <cstdlib>
#include <cassert>

#include <utility>
#include <vector>

#include <list>
#include <memory>
#include <algorithm>

#include "zmq.h"

#include "melissa-da_config.h"

#include "Field.h"

#ifdef WITH_FTI
#   include "FTmodule.h"
#endif

#include "messages.h"
#include "Part.h"
#include "utils.h"

#include "melissa_utils.h"  // melissa_utils from melissa-sa for melissa_get_node_name
#include "melissa_messages.h"

#include "memory.h"
#include "MpiManager.h"

#include <time.h>

// one could also use a set which might be faster but we would need to
// define hash functions for all the classes we put in this container.
#include <set>

#include "Assimilator.h"

#include "ServerTiming.h"

#include "LauncherConnection.h"

// Activate this include manually before build if you need score-p  user regions for
// polling. At the same time you will need to use the scorep wrappers for compilation.
// see compile.sh and profiling/ for more information
#if 0
#include <scorep/SCOREP_User.h>
#else
#define SCOREP_USER_REGION_DEFINE(...)
#define SCOREP_USER_REGION_BEGIN(...)
#define SCOREP_USER_REGION_END(...)
#endif

extern int ENSEMBLE_SIZE;

int ENSEMBLE_SIZE = 5;

#ifdef WITH_FTI
FTmodule FT;
#endif
MpiManager mpi;

AssimilatorType ASSIMILATOR_TYPE=ASSIMILATOR_DUMMY;

std::shared_ptr<LauncherConnection> launcher;

// in seconds:
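// A runner rank must deliver a scheduled subtask within MAX_RUNNER_TIMEOUT seconds of
// it being handed out (see get_due_date() and check_due_dates() below); otherwise the
// state is rescheduled on another runner and the late runner gets killed.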
long long MAX_RUNNER_TIMEOUT = 5;

const int TAG_NEW_TASK = 42;
const int TAG_KILL_RUNNER = 43;
const int TAG_RANK_FINISHED = 44;
const int TAG_ALL_FINISHED = 45;

int highest_received_task_id = 0;


// only important on ranks != 0:
int highest_sent_task_id = 0;

size_t IDENTITY_SIZE = 0;

void * context;
void * data_response_socket;

unsigned int assimilation_cycles = 0;  // only used for logging stats at the end.

/// Will be counted as announced by the chosen assimilator. This does not necessarily
/// correspond to any time counting done in the runner code.
/// Will also be checkpointed for restarts...
int current_step = 0;  // will effectively start at 1.

// This is important if there are fewer model task runners than ensemble members:
// at the beginning, one ensemble state is generated per model task runner.
int current_nsteps = 1;

int get_due_date() {
    time_t seconds;
    seconds = time (NULL);
    // low: we might use a bigger data type here...
    return static_cast<int>(seconds + MAX_RUNNER_TIMEOUT);
}

struct Task
{
    int state_id;
    int runner_id;
};

bool operator<(const Task &lhs, const Task &rhs) {
    return lhs.state_id < rhs.state_id || (lhs.state_id == rhs.state_id &&
                                           lhs.runner_id < rhs.runner_id);
}
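
// std::set<Task> (used for the killed set below) needs a strict weak ordering;
// operator< above orders tasks lexicographically by (state_id, runner_id).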

// when a runner from this list connects we respond with a kill message. If one rank
// receives a kill message it has to call exit so that all runner ranks quit.
std::set<Task> killed;

std::unique_ptr<Field> field(nullptr);

#ifdef REPORT_TIMING
std::unique_ptr<ServerTiming> timing(nullptr);
#endif

void my_free(void * data, void * hint)
{
    free(data);
}

struct RunnerRankConnection
{
    void * connection_identity;
    RunnerRankConnection(void * identity) {
        connection_identity = identity;
    }
    void launch_sub_task(const int runner_rank, const int state_id) {
        // get state and send it to the runner rank...
        assert(connection_identity != NULL);
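
        // The reply is a ZeroMQ multipart message addressed via the stored connection
        // identity: [identity][empty delimiter][header of 4 ints: state_id,
        // current_step, a message flag (CHANGE_STATE here), nsteps][state data] and,
        // if the runner rank owns hidden state, one more frame carrying it.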
        zmq_msg_t identity_msg;
        zmq_msg_t empty_msg;
        zmq_msg_t header_msg;
        zmq_msg_t data_msg;
        zmq_msg_t data_msg_hidden;
        zmq_msg_init_data(&identity_msg, connection_identity,
                          IDENTITY_SIZE, my_free, NULL);
        zmq_msg_send(&identity_msg, data_response_socket, ZMQ_SNDMORE);
        zmq_msg_init(&empty_msg);
        zmq_msg_send(&empty_msg, data_response_socket, ZMQ_SNDMORE);

        ZMQ_CHECK(zmq_msg_init_size(&header_msg, 4 * sizeof(int)));
        int * header = reinterpret_cast<int*>(zmq_msg_data(
                                                  &header_msg));
        header[0] = state_id;
        header[1] = current_step;
        header[2] = CHANGE_STATE;
        header[3] = current_nsteps;
        zmq_msg_send(&header_msg, data_response_socket, ZMQ_SNDMORE);
        // we do not know when it will really send. send is non blocking!

        const Part & part = field->getPart(runner_rank);
        zmq_msg_init_data(&data_msg,
                          field->ensemble_members.at(
                              state_id).state_analysis.data() +
                          part.local_offset_server,
                          part.send_count *
                          sizeof(double), NULL, NULL);

        if (runner_rank == 0)
        {
            trigger(START_PROPAGATE_STATE, state_id);
        }

        const Part & hidden_part = field->getPartHidden(runner_rank);
        D("-> Server sending %lu + %lu hidden bytes for state %d, timestep=%d",
          part.send_count *
          sizeof(double),
          hidden_part.send_count * sizeof(double),
          header[0], header[1]);
        D(
            "local server offset %lu, local runner offset %lu, sendcount=%lu",
            field->getPart(runner_rank).local_offset_server,
            field->getPart(runner_rank).local_offset_runner,
            field->getPart(runner_rank).send_count);
        // print_vector(field->ensemble_members.at(state_id).state_analysis);
        D("content=[%f,%f,%f,%f,%f...]",
          field->ensemble_members.at(
              state_id).state_analysis.data()[0],
          field->ensemble_members.at(
              state_id).state_analysis.data()[1],
          field->ensemble_members.at(
              state_id).state_analysis.data()[2],
          field->ensemble_members.at(
              state_id).state_analysis.data()[3],
          field->ensemble_members.at(
              state_id).state_analysis.data()[4]);
        int flag;
        if (hidden_part.send_count > 0)
        {
            flag = ZMQ_SNDMORE;
        }
        else
        {
            flag = 0;
        }
        ZMQ_CHECK(zmq_msg_send(&data_msg, data_response_socket, flag));
        if (flag == ZMQ_SNDMORE)
        {
            zmq_msg_init_data(&data_msg_hidden,
                              field->ensemble_members.at(
                                  state_id).state_hidden.data() +
                              hidden_part.local_offset_server,
                              hidden_part.send_count *
                              sizeof(double), NULL, NULL);
            double * tmp = field->ensemble_members.at(
                state_id).state_hidden.data();
            tmp += hidden_part.local_offset_server;
            // D("Hidden values to send:");
            // print_vector(std::vector<double>(tmp, tmp +
            // hidden_part.send_count));
            ZMQ_CHECK(zmq_msg_send(&data_msg_hidden, data_response_socket, 0));
        }

        // close connection:
        // but do not free it. send is going to free it.
        connection_identity = NULL;
    }

    // TODO: clean up error messages in the case of ending runners on the api side...
    void stop(const int end_flag=END_RUNNER) {
        // some connection_identities will be 0 if some runner ranks are connected to another server rank at the moment.
        if (connection_identity == NULL)
            return;

        zmq_msg_t identity_msg;
        zmq_msg_t empty_msg;
        zmq_msg_t header_msg;

        zmq_msg_init_data(&identity_msg, connection_identity,
                          IDENTITY_SIZE, my_free, NULL);
        zmq_msg_send(&identity_msg, data_response_socket, ZMQ_SNDMORE);

        zmq_msg_init(&empty_msg);
        zmq_msg_send(&empty_msg, data_response_socket, ZMQ_SNDMORE);

        zmq_msg_init_size(&header_msg, 4 * sizeof(int));
        int * header = reinterpret_cast<int*>(zmq_msg_data(
                                                  &header_msg));
        header[0] = -1;
        header[1] = current_step;
        header[2] = end_flag;
        header[3] = 0;          // nsteps

        zmq_msg_send(&header_msg, data_response_socket, 0);

        D("Send end message");
        // but don't delete it. this is done in the zmq_msg_send.
        connection_identity = NULL;
    }
};


struct Runner  // Server perspective of a Model task runner
{
    // model task runner ranks
    std::map<int, RunnerRankConnection> connected_runner_ranks;

    void stop(int end_flag) {
        for (auto cs = connected_runner_ranks.begin(); cs !=
             connected_runner_ranks.end(); cs++)
        {
            D("xxx end connected runner rank...");
            cs->second.stop(end_flag);
        }
    }

    ~Runner() {
        // try to kill remaining runners if there are still some.
        stop(KILL_RUNNER);
    }
};

std::map<int, std::shared_ptr<Runner> > idle_runners;  // may also contain runners that are only partially idle (not all of their ranks connected)

std::map<int, std::shared_ptr<Runner> >::iterator get_completely_idle_runner()
{
    // finds a runner that is completely idle. Returns idle_runners.end() if none was
    // found.
    auto result = std::find_if(idle_runners.begin(), idle_runners.end(),
                               [](std::pair<int, std::shared_ptr<Runner> > elem) {
                                   return elem.second->connected_runner_ranks.size() ==
                                          field->connected_runner_ranks.size();
                               });
    return result;
}

std::set<int> unscheduled_tasks;

// only important on rank 0:
// if == comm_size we start the update step. It is reset when a state is rescheduled!
int finished_ranks = -1;

/// Used to link states to runner ids
struct NewTask
{
    int runner_id;
    int state_id;
    int due_date;
    int task_id;
};


/// used to transmit new tasks to clients. A subtask is the part of the state that is
/// sent to / expected to be received by this server rank from a single simulation rank.
struct SubTask
{
    int runner_id;
    int runner_rank;
    int state_id;
    int due_date;
    // low: set different due dates so not all ranks communicate at the same time to the server when it gets bypassed ;)
    SubTask(NewTask &new_task, int runner_rank_) {
        runner_id = new_task.runner_id;
        runner_rank = runner_rank_;
        state_id = new_task.state_id;
        due_date = new_task.due_date;
    }
};

// REM: works as fifo! (that is why we are not using sets here: they do not keep the order.)
// FIFO of scheduled tasks: some are already running on some model task runner ranks, some are not yet.
// these are checked for the due dates!
// if we get results for the whole task we remove it from the scheduled_tasks list.
typedef std::list<std::shared_ptr<SubTask> > SubTaskList;
SubTaskList scheduled_sub_tasks;  // could these be ordered sets? That would prevent adding the same subtask twice. No: (ordered) sets order by key and not by time of insertion.
SubTaskList running_sub_tasks;
SubTaskList finished_sub_tasks;  // TODO: why do we need to store this? actually not needed.
// TODO: extract fault tolerant n to m code to use it elsewhere?
void register_runner_id(zmq_msg_t &msg, const int * buf,
                        void * configuration_socket,
                        char * data_response_port_names) {
    static int highest_runner_id = 0;
    assert(zmq_msg_size(&msg) == sizeof(int));

    trigger(ADD_RUNNER, buf[1]);

    D("Server registering Runner ID %d", buf[1]);

    zmq_msg_t msg_reply1, msg_reply2;
    zmq_msg_init_size(&msg_reply1, 3 * sizeof(int));

    // At the moment we request field registration from runner id 0. TODO: be fault
    // tolerant during server init too? - actually we do not want to: faults during
    // init may make it crash...
    int request_register_field =  highest_runner_id == 0 ? 1 : 0;

    int * out_buf = reinterpret_cast<int*>(zmq_msg_data(&msg_reply1));
    out_buf[0] = highest_runner_id++;              // every model task runner gets a different runner id.
    out_buf[1] = request_register_field;
    out_buf[2] = comm_size;
    zmq_msg_send(&msg_reply1, configuration_socket, ZMQ_SNDMORE);
    zmq_msg_init_data(&msg_reply2, data_response_port_names,
                      comm_size * MPI_MAX_PROCESSOR_NAME * sizeof(char),
                      NULL, NULL);
    zmq_msg_send(&msg_reply2, configuration_socket, 0);
}

std::vector<int> global_index_map;
std::vector<int> global_index_map_hidden;

void register_field(zmq_msg_t &msg, const int * buf,
                    void * configuration_socket)
{
    assert(phase == PHASE_INIT);              // we accept new fields only if in initialization phase.
    assert(zmq_msg_size(&msg) == sizeof(int) + sizeof(int) +
           MPI_MAX_PROCESSOR_NAME * sizeof(char));
    assert(field == nullptr);              // we accept only one field for now.

    int runner_comm_size = buf[1];

    char field_name[MPI_MAX_PROCESSOR_NAME];
    strcpy(field_name, reinterpret_cast<const char*>(&buf[2]));
    zmq_msg_close(&msg);

    field = std::make_unique<Field>(field_name, runner_comm_size,
                                    ENSEMBLE_SIZE);

    D("Server registering Field %s, runner_comm_size = %d", field_name,
      runner_comm_size);
    assert_more_zmq_messages(configuration_socket);
    zmq_msg_init(&msg);
    zmq_msg_recv(&msg, configuration_socket, 0);
    assert(zmq_msg_size(&msg) == runner_comm_size * sizeof(size_t));
    memcpy (field->local_vect_sizes_runner.data(), zmq_msg_data(&msg),
            runner_comm_size * sizeof(size_t));
    zmq_msg_close(&msg);

    // always await a hidden state
    assert_more_zmq_messages(configuration_socket);
    zmq_msg_init(&msg);
    zmq_msg_recv(&msg, configuration_socket, 0);
    assert(zmq_msg_size(&msg) == runner_comm_size * sizeof(size_t));
    memcpy (field->local_vect_sizes_runner_hidden.data(), zmq_msg_data(&msg),
            runner_comm_size * sizeof(size_t));
    zmq_msg_close(&msg);

    // always await global_index_map now
    global_index_map.resize(sum_vec(field->local_vect_sizes_runner));
    assert_more_zmq_messages(configuration_socket);
    zmq_msg_init(&msg);
    zmq_msg_recv(&msg, configuration_socket, 0);
    assert(zmq_msg_size(&msg) == global_index_map.size() * sizeof(int));
    memcpy (global_index_map.data(), zmq_msg_data(&msg),
            global_index_map.size() * sizeof(int));
    zmq_msg_close(&msg);

    global_index_map_hidden.resize(sum_vec(field->local_vect_sizes_runner_hidden));
    assert_more_zmq_messages(configuration_socket);
    zmq_msg_init(&msg);
    zmq_msg_recv(&msg, configuration_socket, 0);
    assert(zmq_msg_size(&msg) == global_index_map_hidden.size() * sizeof(int));
    memcpy (global_index_map_hidden.data(), zmq_msg_data(&msg),
            global_index_map_hidden.size() * sizeof(int));

    // scatter index map is done in broadcast field info

    // msg is closed outside by caller...

    field->name = field_name;
    // ack
    zmq_msg_t msg_reply;
    zmq_msg_init(&msg_reply);
    zmq_msg_send(&msg_reply, configuration_socket, 0);
}

void answer_configuration_message(void * configuration_socket,
                                  char * data_response_port_names)
{
    zmq_msg_t msg;
    zmq_msg_init(&msg);
    zmq_msg_recv(&msg, configuration_socket, 0);
    int * buf = reinterpret_cast<int*>(zmq_msg_data(&msg));
    if (buf[0] == REGISTER_RUNNER_ID)
    {
        register_runner_id(msg, buf, configuration_socket,
                           data_response_port_names);
    }
    else if (buf[0] == REGISTER_FIELD)
    {
        register_field(msg, buf, configuration_socket);
    }
    else
    {
        // Bad message type
        assert(false);
        exit(1);
    }
    zmq_msg_close(&msg);
}

void scatter_index_map(size_t global_vect_size, size_t local_vect_size, int global_index_map_data[], int local_index_map_data[])
{
    size_t local_vect_sizes_server[comm_size];
    calculate_local_vect_sizes_server(comm_size, global_vect_size,
            local_vect_sizes_server);
    int scounts[comm_size];
    // transform size_t to mpi's int
    std::copy(local_vect_sizes_server, local_vect_sizes_server+comm_size, scounts);
    int displs [comm_size];
    int last_displ = 0;
    for (int i = 0; i < comm_size; ++i)
    {
        displs[i] = last_displ;
        last_displ += scounts[i];
    }
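
    // All ranks compute the same scounts/displs; rank 0 acts as root and hands each
    // server rank i its slice (scounts[i] ints starting at displs[i]) of the global
    // index map, mirroring how the state vector itself is split across server ranks.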

    MPI_Scatterv( global_index_map_data, scounts, displs, MPI_INT,
            local_index_map_data, local_vect_size, MPI_INT,
            0, mpi.comm());
}

void broadcast_field_information_and_calculate_parts() {
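    // Only rank 0 learned the field layout during runner registration; the other
    // server ranks receive it here through a fixed sequence of broadcasts (field
    // name, runner_comm_size, visible and hidden local vector sizes) followed by two
    // scatters of the index maps (steps numbered 1-6 in the comments below).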
    char field_name[MPI_MAX_PROCESSOR_NAME];
    int runner_comm_size;      // Strange bug: if this variable is declared in the if / else scope it does not work! It gets overwritten by the MPI_Bcast of runner_comm_size.

    if (comm_rank == 0)
    {
        strcpy(field_name, field->name.c_str());
        runner_comm_size =
            field->local_vect_sizes_runner.size();
    }

    MPI_Bcast(field_name, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, 0,
              mpi.comm());                                                             // 1:fieldname
    MPI_Bcast(&runner_comm_size, 1, MPI_INT, 0,
              mpi.comm());                                                                     // 2:runner_comm_size

    if (comm_rank != 0)
    {
        field = std::make_unique<Field>(field_name, runner_comm_size,
                                        ENSEMBLE_SIZE);
    }
    D("local_vect_sizes");
    // print_vector(field->local_vect_sizes_runner);

    MPI_Bcast(field->local_vect_sizes_runner.data(),
              runner_comm_size,
              my_MPI_SIZE_T, 0, mpi.comm());                                                             // 3:local_vect_sizes_runner

    MPI_Bcast(field->local_vect_sizes_runner_hidden.data(),
              runner_comm_size,
              my_MPI_SIZE_T, 0, mpi.comm());                                                             // 4:local_vect_sizes_runner_hidden

    field->calculate_parts(comm_size);

    // 5 and 6: Scatter the field transform (index_maps)
    scatter_index_map(field->globalVectSize(), field->local_vect_size,
            global_index_map.data(), field->local_index_map.data());

    //printf("rank %d index map:", comm_rank);
    //print_vector(field->local_index_map);

    scatter_index_map(field->globalVectSizeHidden(), field->local_vect_size_hidden,
            global_index_map_hidden.data(), field->local_index_map_hidden.data());

    //printf("rank %d hidden index map:", comm_rank);
    //print_vector(field->local_index_map_hidden);
}

/// returns true if the sub_task could be sent on a connection.
bool try_launch_subtask(std::shared_ptr<SubTask> &sub_task) {
    // tries to send this task.
    auto found_runner = idle_runners.find(sub_task->runner_id);
    if (found_runner == idle_runners.end())
    {
        D("could not send: Did not find idle runner");
        return false;
    }
    auto found_rank = found_runner->second->connected_runner_ranks.find(
        sub_task->runner_rank);
    if (found_rank == found_runner->second->connected_runner_ranks.end())
    {
        D("could not send: Did not find rank");
        return false;
    }
    D("Send after adding subtask! to runner_id %d", sub_task->runner_id);

    found_rank->second.launch_sub_task(sub_task->runner_rank,
                                       sub_task->state_id);

    found_runner->second->connected_runner_ranks.erase(found_rank);
    if (found_runner->second->connected_runner_ranks.empty())
    {
        idle_runners.erase(found_runner);
    }

    if (sub_task->runner_rank == 0)
    {
        trigger(STOP_IDLE_RUNNER, sub_task->runner_id);
    }

    return true;
}

void kill_task(Task t) {
    L("killing state %d runner %d", t.state_id, t.runner_id);
    unscheduled_tasks.insert(t.state_id);
    idle_runners.erase(t.runner_id);
    killed.emplace(t);
    auto f = [&t](std::shared_ptr<SubTask> &task) {
                 return task->state_id == t.state_id &&
                        task->runner_id == t.runner_id;
             };
    running_sub_tasks.remove_if(f);
    finished_sub_tasks.remove_if(f);
    scheduled_sub_tasks.remove_if(f);

    // there might be more states scheduled on this runner! Those will trigger their own due date violations and will be rescheduled later ;)
    // if we killed just by state id we would need to synchronize the killing (what if the state was already rescheduled on the next runner and then the kill message arrives... we would kill it on the next runner too...)
}

/// adds a subtask for each runner rank.
// either adds the subtasks to the list of scheduled subtasks or runs them directly, adding them to the running subtasks.
void add_sub_tasks(NewTask &new_task) {
    int ret = unscheduled_tasks.erase(new_task.state_id);
    assert(ret == 1);
    auto &csr = field->connected_runner_ranks;
    assert(csr.size() > 0);      // connected runner ranks must be initialized...
    for (auto it = csr.begin(); it != csr.end(); it++)
    {
        std::shared_ptr<SubTask> sub_task (new SubTask(new_task, *it));
        D("Adding subtask for runner rank %d", *it);

        if (try_launch_subtask(sub_task))
        {
            running_sub_tasks.push_back(sub_task);
        }
        else
        {
            scheduled_sub_tasks.push_back(sub_task);
        }
    }
}

/// schedules a new task on a model task runner and tries to run it.
static int task_id = 1; // low: after each update step one could reset the task id and also the highest sent task id and so on to never get overflows!
bool schedule_new_task(const int runner_id)
{
    assert(comm_rank == 0);
    if (unscheduled_tasks.size() <= 0)
    {
        return false;
    }
    task_id++;
    int state_id = *(unscheduled_tasks.begin());
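
    // Rank 0 picks the next unscheduled state, stamps it with a due date and a task
    // id, schedules its own subtasks and then forwards the NewTask to every other
    // server rank so they can schedule the matching subtasks for their state parts.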

    NewTask new_task({runner_id, state_id, get_due_date(), task_id});

    L("Schedule task with task id %d", task_id);

    finished_ranks = 0;

    add_sub_tasks(new_task);
    // Send new scheduled task to all server ranks! This makes sure that everybody receives it!
    MPI_Request requests[comm_size - 1];
    for (int receiving_rank = 1; receiving_rank < comm_size;
         receiving_rank++)
    {
        // REM: MPI_Ssend to be sure that all messages are received!
        // MPI_Ssend(&new_task, sizeof(NewTask), MPI_BYTE, receiving_rank, TAG_NEW_TASK, mpi.comm());
        MPI_Isend(&new_task, sizeof(NewTask), MPI_BYTE, receiving_rank,
                  TAG_NEW_TASK, mpi.comm(),
                  &requests[receiving_rank-1]);
    }
    int ret = MPI_Waitall(comm_size - 1, requests, MPI_STATUSES_IGNORE);
    assert(ret == MPI_SUCCESS);

    return true;
}

/// checks if the server added new tasks... if so tries to run them.
void check_schedule_new_tasks()
{
    assert(comm_rank != 0);
    int received;

    MPI_Iprobe(0, TAG_NEW_TASK, mpi.comm(), &received,
               MPI_STATUS_IGNORE);
    if (!received)
        return;

    D("Got task to send...");

    // we are not finished anymore; when we finish again, the finished message will be resent:

    NewTask new_task;
    MPI_Recv(&new_task, sizeof(new_task), MPI_BYTE, 0, TAG_NEW_TASK,
             mpi.comm(), MPI_STATUS_IGNORE);

    highest_received_task_id = std::max(new_task.task_id,
                                        highest_received_task_id);

    // Remove all tasks with the same id!
    // REM: we assume that we receive new_task messages in the right order! This is guaranteed by the Isend on rank 0 and the MPI_Waitall behind it ;)
    auto f = [&new_task] (std::shared_ptr<SubTask> st) {
                 if (st->state_id == new_task.state_id)
                 {
                     bool is_new = killed.emplace(Task(
                                                      {st->
                                                       state_id,
                                                       st->
                                                       runner_id}))
                                   .second;
                     if (is_new)
                     {
                         L(
                             "The state %d was before scheduled on runner id %d as we reschedule now we know that this runnerid was killed.",
                             st->state_id, st->runner_id);
                         // REM: not necessary to resend. rank0 should already know it from its own!
                         unscheduled_tasks.insert(st->state_id);
                         idle_runners.erase(st->runner_id);
                         killed.insert(Task({st->state_id,
                                             st->runner_id}));
                     }
                     return true;
                 }
                 else
                 {
                     return false;
                 }
             };

    scheduled_sub_tasks.remove_if(f);
    running_sub_tasks.remove_if(f);
    finished_sub_tasks.remove_if(f);

    add_sub_tasks(new_task);
}


void end_all_runners()
{
    for (auto runner_it = idle_runners.begin(); runner_it !=
         idle_runners.end(); runner_it++)
    {
        runner_it->second->stop(END_RUNNER);
    }
}

void init_new_timestep()
{
    size_t connections =
        field->connected_runner_ranks.size();
    // init or finished....
    assert(assimilation_cycles == 0 || finished_sub_tasks.size() == ENSEMBLE_SIZE *
           connections);

    assert(scheduled_sub_tasks.size() == 0);
    assert(running_sub_tasks.size() == 0);
    finished_sub_tasks.clear();
    highest_received_task_id = 0;
    highest_sent_task_id = 0;
    task_id = 1;
    current_step += current_nsteps;
    trigger(START_ITERATION, current_step);
    for (auto it = idle_runners.begin(); it != idle_runners.end(); it++)
    {
        trigger(START_IDLE_RUNNER, it->first);
    }

    assert(unscheduled_tasks.size() == 0);
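
    // each ensemble member becomes one task that has to be propagated by a runner in
    // this assimilation cycle: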
    for (int i = 0; i < ENSEMBLE_SIZE; i++)
    {
        unscheduled_tasks.insert(i);
    }
}

void check_due_dates() {
    time_t now;
    now = time (NULL);

    std::set<Task> to_kill;

    auto check_date = [&now,&to_kill] (std::shared_ptr<SubTask>& it) {
                          if (now > it->due_date)
                          {
                              to_kill.emplace(Task({it->state_id,
                                                    it->runner_id}));
                          }
                      };

    std::for_each(scheduled_sub_tasks.begin(), scheduled_sub_tasks.end(),
                  check_date);
    std::for_each(running_sub_tasks.begin(), running_sub_tasks.end(),
                  check_date);

    if (to_kill.size() > 0)
    {
        L("Need to redo %lu states", to_kill.size());
    }

    for (auto it = to_kill.begin(); it != to_kill.end(); it++)
    {
        L("Due date passed for state id %d , runner_id %d at %lu s ",
          it->state_id, it->runner_id, now);

        kill_task(*it);

        if (comm_rank == 0)
        {
            trigger(REMOVE_RUNNER, it->runner_id);

            // reschedule directly if possible
            if (get_completely_idle_runner() != idle_runners.end())
            {
                L(
                    "Rescheduling after due date violation detected by rank 0");
                // REM: schedule only once as there is only one more task to do after the kill. Later we might want to schedule multiple times if we clear runners that are still scheduled or running on the broken runner id...
                schedule_new_task(idle_runners.begin()->first);
            }

        }
        else
        {
            // Send to rank 0 that the runner that calculated this state is to be killed

            L("Sending kill request to rank 0");

            int buf[2] = { it->state_id, it->runner_id};
            // bsend does not work...
            // MPI_Bsend(buf, 2, MPI_INT, 0, TAG_KILL_RUNNER,
            // mpi.comm());
            // if BSend does not find memory use the send version
            MPI_Send(buf, 2, MPI_INT, 0, TAG_KILL_RUNNER,
                     mpi.comm());
            L("Finished kill request to rank 0");
        }
    }
}

void check_kill_requests() {
    assert(comm_rank == 0);
    for (int detector_rank = 1; detector_rank < comm_size; detector_rank++)
    {
        int received;
        MPI_Iprobe(detector_rank, TAG_KILL_RUNNER, mpi.comm(),
                   &received, MPI_STATUS_IGNORE);
        if (!received)
            continue;

        int buf[2];
        MPI_Recv(buf, 2, MPI_INT, detector_rank, TAG_KILL_RUNNER,
                 mpi.comm(), MPI_STATUS_IGNORE);
        Task t({buf[0], buf[1]});
        L("Got state_id to kill... %d, killing runner_id %d",
          t.state_id, t.runner_id);
        bool is_new = killed.emplace(t).second;
        if (is_new)
        {
            trigger(REMOVE_RUNNER, t.runner_id);
        }
        else
        {
            // don't kill it a second time!
            L("I already knew this");
            continue;
        }

        // REM: do not do intelligent killing of other runners. no worries, their due dates will fail soon too ;)
        kill_task(t);

        // reschedule directly if possible
        if (get_completely_idle_runner() != idle_runners.end())
        {
            L(
                "Rescheduling after due date violation detected by detector_rank %d",
                detector_rank);
            // REM: schedule only once as there is only one more task to do after the kill. Later we might want to schedule multiple times if we clear runners that are still scheduled or running on the broken runner id...
            schedule_new_task(idle_runners.begin()->first);
        }
    }
}

void handle_data_response(std::shared_ptr<Assimilator> & assimilator) {
    // TODO: move to send and receive function as on api side... maybe use zproto library?
    zmq_msg_t identity_msg, empty_msg, header_msg, data_msg, data_msg_hidden;
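
    // A runner rank's answer arrives as a multipart message: [identity][empty
    // delimiter][header: runner_id, runner_rank, state_id, timestep, field name]
    // [state data] and, when the runner rank holds hidden state, an extra data frame.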
    zmq_msg_init(&identity_msg);
    zmq_msg_init(&empty_msg);
    zmq_msg_init(&header_msg);
    zmq_msg_init(&data_msg);

    zmq_msg_recv(&identity_msg, data_response_socket, 0);

    assert_more_zmq_messages(data_response_socket);
    zmq_msg_recv(&empty_msg, data_response_socket, 0);
    assert_more_zmq_messages(data_response_socket);
    zmq_msg_recv(&header_msg, data_response_socket, 0);
    assert_more_zmq_messages(data_response_socket);
    zmq_msg_recv(&data_msg, data_response_socket, 0);

    assert(IDENTITY_SIZE == 0 || IDENTITY_SIZE == zmq_msg_size(
               &identity_msg));

    IDENTITY_SIZE = zmq_msg_size(&identity_msg);

    void * identity = malloc(IDENTITY_SIZE);
    memcpy(identity, zmq_msg_data(&identity_msg), zmq_msg_size(
               &identity_msg));

    assert(zmq_msg_size(&header_msg) == 4 * sizeof(int) +
           MPI_MAX_PROCESSOR_NAME * sizeof(char));
    int * header_buf = reinterpret_cast<int*>(zmq_msg_data(&header_msg));
    int runner_id = header_buf[0];
    int runner_rank = header_buf[1];
    int runner_state_id = header_buf[2];      // = ensemble_member_id;
    int runner_timestep = header_buf[3];
    char field_name[MPI_MAX_PROCESSOR_NAME];
    strcpy(field_name, reinterpret_cast<char*>(&header_buf[4]));

    // good runner_rank, good state id?
    auto running_sub_task = std::find_if(running_sub_tasks.begin(),
                                         running_sub_tasks.end(),
                                         [runner_id, runner_rank,
                                          runner_state_id](
                                             std::shared_ptr<SubTask> &
                                             st){
        return st->runner_id == runner_id && st->state_id ==
        runner_state_id && st->runner_rank == runner_rank;
    });

    RunnerRankConnection csr(identity);
    auto found = std::find_if(killed.begin(), killed.end(), [runner_id] (
                                  Task p) {
        return p.runner_id == runner_id;
    });
    if (found != killed.end())
    {
        L(
            "Ending Model Task Runner killed by timeout violation runner=%d",
            runner_id);
        csr.stop(KILL_RUNNER);
    }
    else
    {
        assert(runner_timestep == 0 || running_sub_task !=
               running_sub_tasks.end());

        // This is necessary if a task was finished on rank 0 and then crashed on another rank: rank 0 needs to undo it!
        if (running_sub_task != running_sub_tasks.end())
        {
            // only if we are not in timestep  0:
            finished_sub_tasks.push_back(*running_sub_task);

            running_sub_tasks.remove(*running_sub_task);
        }

        // good timestep? There are 2 cases: timestep 0 or good timestep...
        assert (runner_timestep == 0 || runner_timestep ==
                current_step);


        // Save state part in background_states.
        assert(field->name == field_name);
        const Part & part = field->getPart(runner_rank);
        assert(zmq_msg_size(&data_msg) == part.send_count *
               sizeof(double));
        D(
            "<- Server received %lu/%lu bytes of %s from runner id %d, runner rank %d, state id %d, timestep=%d",
            zmq_msg_size(&data_msg), part.send_count *
            sizeof(double),
            field_name, runner_id, runner_rank, runner_state_id,
            runner_timestep);
        D("local server offset %lu, sendcount=%lu",
          part.local_offset_server, part.send_count);


        // D("values[0] = %.3f", reinterpret_cast<double*>(zmq_msg_data(&data_msg))[0]);
        // D("values[1] = %.3f", reinterpret_cast<double*>(zmq_msg_data(&data_msg))[1]);
        // D("values[2] = %.3f", reinterpret_cast<double*>(zmq_msg_data(&data_msg))[2]);
        // D("values[3] = %.3f", reinterpret_cast<double*>(zmq_msg_data(&data_msg))[3]);
        // D("values[4] = %.3f", reinterpret_cast<double*>(zmq_msg_data(&data_msg))[4]);

        const Part & hidden_part = field->getPartHidden(runner_rank);
        double * values_hidden = nullptr;

        if (hidden_part.send_count > 0)
        {
            assert_more_zmq_messages(data_response_socket);
            zmq_msg_init(&data_msg_hidden);
            zmq_msg_recv(&data_msg_hidden, data_response_socket, 0);
            assert(zmq_msg_size(&data_msg_hidden) == hidden_part.send_count *
                   sizeof(double));
            values_hidden = reinterpret_cast<double*>(zmq_msg_data(
                                                          &data_msg_hidden));
            // D("hidden values received:");
            // print_vector(std::vector<double>(values_hidden, values_hidden +
            // hidden_part.send_count));
        }


        if (runner_timestep == current_step)
        {