Commit bdf81a7e authored by FRIEDEMANN Sebastian's avatar FRIEDEMANN Sebastian
Browse files

Try to use zmq cpp

This was abandoned as the cpp library is not always up to date.
parent 0804499b
......@@ -12,7 +12,7 @@
#include <vector>
#include <mpi.h>
#include "zmq.h"
#include "zmq.hpp"
#include "../common/messages.h"
#include "../common/n_to_m.h"
......@@ -37,9 +37,6 @@ size_t IDENTITY_SIZE = 0;
int comm_size;
int comm_rank;
void * context;
void * data_response_socket;
int current_timestamp = 0; // will effectively start at 1.
long long get_due_date() {
......@@ -188,7 +185,7 @@ struct ConnectedSimulationRank {
/// returns true if a new was sent.
bool try_to_start_task(int simu_rank) {
bool try_to_start_task(int simu_rank, zmq::socket_t &data_response_socket) {
// already working?
if (current_task != WANT_WORK)
......@@ -208,13 +205,13 @@ struct ConnectedSimulationRank {
header[1] = current_timestamp;
header[2] = CHANGE_STATE;
zmq_msg_t identity_msg;
zmq_msg_t empty_msg;
zmq_msg_t header_msg;
zmq_msg_t data_msg;
zmq::message_t identity_msg(connection_identity, IDENTITY_SIZE, my_free, NULL);
data_response_socket.send(identity_msg, ZMQ_SNDMORE);
zmq::message_t empty_msg(0);
data_response_socket.send(empty_msg, ZMQ_SNDMORE);
zmq::message_t header_msg;
zmq::message_t data_msg;
zmq_msg_init_data(&identity_msg, connection_identity, IDENTITY_SIZE, my_free, NULL);
zmq_msg_send(&identity_msg, data_response_socket, ZMQ_SNDMORE);
zmq_msg_init(&empty_msg);
zmq_msg_send(&empty_msg, data_response_socket, ZMQ_SNDMORE);
......@@ -301,9 +298,9 @@ struct Simulation // Model process runner
}
}
void try_to_start_task() {// todo: replace fields.begin() by field as there will be only on field soon.
void try_to_start_task(zmq::socket_t &socket) {// todo: replace fields.begin() by field as there will be only on field soon.
for (auto cs = connected_simulation_ranks.begin(); cs != connected_simulation_ranks.end(); cs++) {
cs->second.try_to_start_task(cs->first);
cs->second.try_to_start_task(cs->first, socket);
}
}
......@@ -534,7 +531,7 @@ int main(int argc, char * argv[])
zmq_version (&major, &minor, &patch);
D("Current 0MQ version is %d.%d.%d", major, minor, patch);
MPI_Init(NULL, NULL);
context = zmq_ctx_new ();
zmq::context_t ctx;
MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
......@@ -547,17 +544,17 @@ int main(int argc, char * argv[])
// Start sockets:
if (comm_rank == 0)
{
configuration_socket = zmq_socket(context, ZMQ_REP);
int rc = zmq_bind(configuration_socket, "tcp://*:4000"); // to be put into MELISSA_SERVER_MASTER_NODE on simulation start
zmq::socket_t configuration_socket(ctx, ZMQ_REP);
int rc = configuration_socket.bind("tcp://*:4000"); // to be put into MELISSA_SERVER_MASTER_NODE on simulation start
ZMQ_CHECK(rc);
assert(rc == 0);
}
data_response_socket = zmq_socket(context, ZMQ_ROUTER);
zmq::socket_t data_response_socket(ctx, ZMQ_ROUTER);
char data_response_port_name[MPI_MAX_PROCESSOR_NAME];
sprintf(data_response_port_name, "tcp://*:%d", 5000+comm_rank);
zmq_bind(data_response_socket, data_response_port_name);
data_response_socket.bind(data_response_port_name);
char hostname[MPI_MAX_PROCESSOR_NAME];
melissa_get_node_name(hostname, MPI_MAX_PROCESSOR_NAME);
......@@ -697,7 +694,7 @@ int main(int argc, char * argv[])
zmq_msg_close(&data_msg);
// Check if we can answer directly with new data... means starting of a new model task
bool got_task = simu.connected_simulation_ranks[simu_rank].try_to_start_task(simu_rank);
bool got_task = simu.connected_simulation_ranks[simu_rank].try_to_start_task(simu_rank, data_response_socket);
// If we could not start a new model task try to schedule a new one. This is initiated by server rank 0
if (!got_task && comm_rank == 0)
......@@ -706,7 +703,7 @@ int main(int argc, char * argv[])
schedule_new_task(simu_id);
// try to run it directly:
simu.connected_simulation_ranks[simu_rank].try_to_start_task(simu_rank);
simu.connected_simulation_ranks[simu_rank].try_to_start_task(simu_rank, data_response_socket);
}
}
......@@ -742,7 +739,7 @@ int main(int argc, char * argv[])
{
if (schedule_new_task(simu_it->first)) {
// normally we arrive always here if there are not more model task runners than ensemble members.
simu_it->second.try_to_start_task();
simu_it->second.try_to_start_task(data_response_socket);
}
}
}
......@@ -787,7 +784,6 @@ int main(int argc, char * argv[])
{
zmq_close(configuration_socket);
}
zmq_ctx_destroy(context);
MPI_Finalize();
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment