Commit 854187f2 authored by Christoph Conrads
Browse files

Server: add TCP server/launcher communication

This commit breaks the Melissa DA launcher.
parent 7f122d3b
Pipeline #405198 failed with stage
in 1 minute and 56 seconds
# Object library `melissa_common`, built from the sources listed below.
# NOTE(review): both melissa_messages.c and melissa_messages.cxx are listed; this
# looks like the removed and the added line of a diff shown together -- confirm
# which of the two the build is meant to use.
add_library(melissa_common OBJECT
Part.cxx
Socket.cxx
ZeroMQ.cxx
melissa_utils.c
utils.cxx
melissa_messages.c
melissa_messages.cxx
)
......@@ -18,7 +19,7 @@ install(FILES melissa_da_stype.h DESTINATION include)
# comm4py
add_library(melissa_comm4py SHARED melissa_comm4py.c melissa_utils.c melissa_messages.c)
add_library(melissa_comm4py SHARED melissa_comm4py.c melissa_utils.c melissa_messages.cxx)
target_link_libraries(melissa_comm4py PUBLIC zmq)
install(
TARGETS melissa_comm4py
......
// Copyright 2022 Institut National de Recherche en Informatique et en Automatique (Inria)
#include "Socket.h"
#include <cassert>
#include <cerrno>
#include <limits>
#include <unistd.h>
namespace melissa
{
Socket::Socket(int fd) : fd_(fd) {
assert(fd >= 0);
}
/// Destroy the socket object.
///
/// The owner is expected to have called close() beforehand; debug builds
/// enforce this with an assertion. When assertions are compiled out (NDEBUG),
/// a descriptor that is still open is released here instead of being leaked.
Socket::~Socket() {
    assert(fd_ < 0);

    if(fd_ >= 0)
    {
        // Call ::close directly: inside a base-class destructor a virtual
        // call cannot reach an override in a derived class anymore.
        ::close(fd_);
        fd_ = -1;
    }
}
/// Close the socket through close_impl() and mark it as closed.
///
/// Must only be called while the descriptor is still open (asserted).
/// @return whatever close_impl() returned
int Socket::close() {
    assert(fd_ >= 0);

    const int status = close_impl();
    // The descriptor is invalidated regardless of the status reported above.
    fd_ = -1;
    return status;
}
/// Forward a write request to write_impl().
///
/// A null `buffer` is only accepted together with `count == 0` (asserted).
/// @return the value returned by write_impl()
ssize_t Socket::write(const void* buffer, std::size_t count) {
    assert(count == 0 || buffer != nullptr);

    return write_impl(buffer, count);
}
/// Forward a read request to read_impl().
///
/// A null `buffer` is only accepted together with `count == 0` (asserted).
/// @return the value returned by read_impl()
ssize_t Socket::read(void* buffer, std::size_t count) {
    assert(count == 0 || buffer != nullptr);

    return read_impl(buffer, count);
}
/// Default close hook: calls ::close on the stored descriptor.
/// @return the result of ::close
int Socket::close_impl() {
    const int status = ::close(fd_);
    return status;
}
/// Default write hook: a single ::write call on the stored descriptor.
/// @return the result of ::write
ssize_t Socket::write_impl(const void* buffer, std::size_t count) {
    const ssize_t num_written = ::write(fd_, buffer, count);
    return num_written;
}
/// Default read hook: a single ::read call on the stored descriptor.
/// @return the result of ::read
ssize_t Socket::read_impl(void* buffer, std::size_t count) {
    const ssize_t num_read = ::read(fd_, buffer, count);
    return num_read;
}
TcpSocket::TcpSocket(int fd) : Socket(fd) {
}
ssize_t TcpSocket::write_impl(const void* buffer, std::size_t count) {
if(count > std::numeric_limits<PlfInteger>::max())
{
errno = EMSGSIZE;
return -1;
}
auto prefix = static_cast<PlfInteger>(count);
auto ret = Socket::write_impl(&prefix, sizeof(prefix));
if(ret <= 0)
{
return ret;
}
if(ret != sizeof(prefix))
{
close();
errno = EPIPE;
return -1;
}
return Socket::write_impl(buffer, count);
}
}
#ifndef SOCKET_H_
#define SOCKET_H_
#include <cstddef>
#include <cstdint>
#include <sys/types.h>
namespace melissa
{
/// Non-copyable wrapper around a file descriptor.
///
/// The public close()/write()/read() entry points check their preconditions
/// and then delegate to the virtual *_impl hooks, which derived socket types
/// may override.
struct Socket
{
    /// Wrap the descriptor `fd`.
    explicit Socket(int fd);

    /// @return the wrapped file descriptor (-1 once close() has been called)
    int fileno() const {
        return fd_;
    }

    Socket(const Socket&) = delete;

    /// Virtual because derived sockets are handled through `Socket*`;
    /// destroying a derived object through a base pointer would otherwise be
    /// undefined behaviour.
    virtual ~Socket();

    void operator= (const Socket&) = delete;

    /// Close the descriptor via close_impl() and mark the socket as closed.
    int close();
    /// Write `count` bytes from `buffer` via write_impl().
    ssize_t write(const void* buffer, std::size_t count);
    /// Read up to `count` bytes into `buffer` via read_impl().
    ssize_t read(void* buffer, std::size_t count);

    /// Overridable low-level hooks behind close(), write() and read().
    virtual int close_impl();
    virtual ssize_t write_impl(const void* buffer, std::size_t count);
    virtual ssize_t read_impl(void* buffer, std::size_t count);

    /// The wrapped descriptor; negative when the socket is closed.
    int fd_ = -1;
};
/// Socket flavour used for SCTP connections; adds nothing to the Socket base
/// apart from its own type.
struct SctpSocket : public Socket
{
    explicit SctpSocket(int fd)
        : Socket{fd}
    {
    }
};
struct TcpSocket : public Socket
{
/// integer type for prefix length framing
using PlfInteger = std::uint32_t;
explicit TcpSocket(int fd);
virtual ssize_t write_impl(const void* buffer, std::size_t count);
};
}
#endif
......@@ -178,7 +178,7 @@ void poll_message(char* message, size_t len) // TODO: why parsing it like this!
// Sends a hello message through send_message_hello().
// NOTE(review): two calls appear below, one passing g_serverfd and one passing
// NULL; they look like the pre- and post-commit lines of a diff shown together.
// The void*-based send_message_hello in this change casts its first argument
// to melissa::Socket* and calls write() on it without a null check, so the
// NULL variant would dereference a null pointer -- confirm and fix.
void send_hello()
{
assert(g_serverfd >= 0);
send_message_hello(g_serverfd, 0);
send_message_hello(NULL, 0);
}
void close_message()
......
......@@ -16,6 +16,7 @@
#include "melissa_messages.h"
#include "melissa_utils.h"
#include "Socket.h"
#include <assert.h>
#include <limits.h>
......@@ -32,25 +33,30 @@ int get_message_type(const char* buff)
return msg_type;
}
// Sends a message consisting of the single int32_t value HELLO.
// NOTE(review): this span shows two signatures and two return statements
// (descriptor-based and Socket-based); they look like the pre- and post-commit
// lines of a diff shown together.
// In the Socket-based variant p_socket is cast to melissa::Socket* without a
// null check, and the ssize_t result of Socket::write is narrowed to int.
// `flags` is not used by any line shown here.
int send_message_hello(int fd, int flags)
int send_message_hello(void* p_socket, int flags)
{
auto p = reinterpret_cast<melissa::Socket*>(p_socket);
int32_t message[] = { HELLO };
return write(fd, message, sizeof(message));
return p->write(message, sizeof(message));
}
// Sends a message consisting of the single int32_t value STOP.
// NOTE(review): this span shows two signatures and two return statements
// (descriptor-based and Socket-based); they look like the pre- and post-commit
// lines of a diff shown together.
// In the Socket-based variant p_socket is cast to melissa::Socket* without a
// null check, and the ssize_t result of Socket::write is narrowed to int.
// `flags` is not used by any line shown here.
int send_message_stop(int fd, int flags)
int send_message_stop(void* p_socket, int flags)
{
auto p = reinterpret_cast<melissa::Socket*>(p_socket);
int32_t message[] = { STOP };
return write(fd, message, sizeof(message));
return p->write(message, sizeof(message));
}
// Sends a three-word int32_t message: SIMU_STATUS, simu_id, status.
// NOTE(review): this span shows two signatures and two return statements
// (descriptor-based and Socket-based); they look like the pre- and post-commit
// lines of a diff shown together.
// In the Socket-based variant p_socket is cast to melissa::Socket* without a
// null check, and the ssize_t result of Socket::write is narrowed to int.
// `flags` is not used by any line shown here.
int send_message_simu_status(int simu_id, int status, int fd, int flags)
int send_message_simu_status(
int simu_id, int status, void* p_socket, int flags)
{
auto p = reinterpret_cast<melissa::Socket*>(p_socket);
int32_t message[] = { SIMU_STATUS, simu_id, status };
return write(fd, message, sizeof(message));
return p->write(message, sizeof(message));
}
int send_message_server_name(const char* node_name, int rank, int fd, int flags)
int send_message_server_name(
const char* node_name, int rank, void* p_socket, int flags)
{
int32_t header[] = { SERVER, rank };
size_t msg_size = sizeof(header) + strlen(node_name) + 1;
......@@ -65,7 +71,9 @@ int send_message_server_name(const char* node_name, int rank, int fd, int flags)
memcpy(msg, header, sizeof(header));
strcpy(msg + sizeof(header), node_name);
ssize_t ret = write(fd, msg, msg_size);
auto p = reinterpret_cast<melissa::Socket*>(p_socket);
ssize_t ret = p->write(msg, msg_size);
free(msg);
return ret;
......
......@@ -30,13 +30,13 @@ extern "C" {
int get_message_type(const char* buff);
int send_message_hello(int fd, int flags);
int send_message_hello(void* socket, int flags);
int send_message_stop(int fd, int flags);
int send_message_stop(void* socket, int flags);
int send_message_simu_status(int simu_id, int status, int fd, int flags);
int send_message_simu_status(int simu_id, int status, void* socket, int flags);
int send_message_server_name(const char* node_name, int rank, int fd, int
int send_message_server_name(const char* node_name, int rank, void* socket, int
flags);
#ifdef __cplusplus
......
......@@ -185,7 +185,7 @@ def run_melissa_da_study(
envs = additional_env.copy()
envs['MELISSA_TIMING_NULL'] = str(start_time)
envs['MELISSA_LAUNCHER_HOST'] = node_name
envs['MELISSA_LAUNCHER_HOST'] = self.node_name
envs['MELISSA_LAUNCHER_PORT'] = str(g_port)
join_dicts(envs, additional_server_env)
......
......@@ -10,6 +10,7 @@
#include "utils.h"
#include "melissa_utils.h" // melissa_utils from melissa-sa for melissa_get_node_name
#include "melissa_messages.h"
#include "Socket.h"
#include <cstdint>
#include <cstdio>
......@@ -25,6 +26,24 @@ LauncherConnection::LauncherConnection(void*, std::string)
{
updateLauncherDueDate();
const char* maybe_launcher_protocol = std::getenv(
"MELISSA_LAUNCHER_PROTOCOL");
if(!maybe_launcher_protocol)
{
std::fprintf(stderr,
"environment variable MELISSA_LAUNCHER_PROTOCOL not set\n");
std::exit(EXIT_FAILURE);
}
auto protocol_str = std::string(maybe_launcher_protocol);
if(protocol_str != "SCTP" && protocol_str != "TCP")
{
std::fprintf(stderr,
"unknown server/launcher communication protocol %s\n",
protocol_str.c_str());
std::exit(EXIT_FAILURE);
}
const char* maybe_launcher_host = std::getenv("MELISSA_LAUNCHER_HOST");
if(!maybe_launcher_host)
{
......@@ -52,10 +71,13 @@ LauncherConnection::LauncherConnection(void*, std::string)
launcher_host = "127.0.0.1";
}
L("Establishing connection to launcher on %s:3000-3002",
launcher_host.c_str());
L("Establishing connection to launcher on %s://%s:%s",
protocol_str.c_str(), launcher_host.c_str(), launcher_port_str);
launcherfd = socket(AF_INET, SOCK_STREAM, IPPROTO_SCTP);
auto protocol =
(protocol_str == "SCTP") ? IPPROTO_SCTP :
(protocol_str == "TCP") ? IPPROTO_TCP : -1;
auto launcherfd = socket(AF_INET, SOCK_STREAM, protocol);
if(launcherfd < 0)
{
melissa_die(__FILE__, __LINE__, "socket");
......@@ -65,7 +87,7 @@ LauncherConnection::LauncherConnection(void*, std::string)
std::memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_SCTP;
hints.ai_protocol = protocol;
hints.ai_flags = AI_NUMERICSERV | AI_ADDRCONFIG;
addrinfo* addrs = nullptr;
......@@ -115,24 +137,38 @@ LauncherConnection::LauncherConnection(void*, std::string)
std::exit(EXIT_FAILURE);
}
std::fprintf(stderr, "server connected to launcher\n");
L("server connected to launcher");
if(protocol == IPPROTO_SCTP)
{
p_socket = new melissa::SctpSocket(launcherfd);
}
else
{
p_socket = new melissa::TcpSocket(launcherfd);
}
// Send the first message
send_message_server_name(hostname, comm_rank, launcherfd, 0);
send_message_server_name(hostname, comm_rank, p_socket, 0);
updateLauncherNextMessageDate();
D("Successfully connected to launcher");
}
// Sends a stop message to the launcher, then closes the connection; a failing
// close ends in melissa_die().
// NOTE(review): the launcherfd-based and the p_socket-based lines below look
// like the pre- and post-commit lines of a diff shown together.
// NOTE(review): the constructor allocates p_socket with `new`, but no line
// shown here deletes it -- confirm whether the object is released elsewhere.
LauncherConnection::~LauncherConnection()
{
send_message_stop(launcherfd, 0);
send_message_stop(p_socket, 0);
if(close(launcherfd))
if(p_socket->close())
{
melissa_die(__FILE__, __LINE__, "close");
}
}
/// Returns the file descriptor of the launcher socket.
/// The socket pointer must have been set (asserted).
int LauncherConnection::getTextPuller() {
    assert(p_socket != nullptr);

    const int descriptor = p_socket->fileno();
    return descriptor;
}
void LauncherConnection::updateLauncherDueDate()
{
due_date_launcher = time(NULL) + LAUNCHER_TIMEOUT;
......@@ -146,7 +182,7 @@ bool LauncherConnection::checkLauncherDueDate()
void LauncherConnection::receiveText()
{
char buffer[16] = {0};
if(read(launcherfd, buffer, sizeof(buffer)) < 0)
if(p_socket->read(buffer, sizeof(buffer)) < 0)
{
melissa_die(__FILE__, __LINE__, "read");
}
......@@ -174,12 +210,12 @@ void LauncherConnection::ping()
{
if (time(NULL) > next_message_date_to_launcher)
{
send_message_hello(launcherfd, 0);
send_message_hello(p_socket, 0);
updateLauncherNextMessageDate();
}
}
// Reports `status` for runner `runner_id` to the launcher through
// send_message_simu_status(); the result is passed to ZMQ_CHECK.
// NOTE(review): the launcherfd-based and the p_socket-based call below look
// like the pre- and post-commit lines of a diff shown together.
// NOTE(review): ZMQ_CHECK is defined elsewhere; presumably it validates ZeroMQ
// return codes -- confirm it is appropriate for a socket write result.
void LauncherConnection::notify(const int runner_id, const int status)
{
ZMQ_CHECK(send_message_simu_status(runner_id, status, launcherfd, 0));
ZMQ_CHECK(send_message_simu_status(runner_id, status, p_socket, 0));
}
......@@ -8,36 +8,32 @@
#ifndef LAUNCHERCONNECTION_H_
#define LAUNCHERCONNECTION_H_
#include <ctime>
#include <string>
// Forward declaration of Socket so that this header can hold a pointer to it
// without including Socket.h.
// NOTE(review): `class Socket;` and `struct Socket;` look like the pre- and
// post-commit lines of a diff shown together.
// NOTE(review): this declares melissa::communication::Socket, whereas the
// p_socket member of LauncherConnection is typed melissa::Socket* and Socket.h
// declares the struct directly in namespace melissa -- confirm the intended
// namespace, otherwise melissa::Socket is undeclared at the point of use.
namespace melissa
{
namespace communication
{
class Socket;
struct Socket;
}
}
#include <string>
class LauncherConnection
{
private:
int launcherfd = -1;
melissa::communication::Socket* p_socket = nullptr;
melissa::Socket* p_socket = nullptr;
const int LAUNCHER_TIMEOUT=60; // seconds
const int LAUNCHER_PING_INTERVAL=8; // seconds
time_t due_date_launcher; // seconds
time_t next_message_date_to_launcher; // seconds
std::time_t due_date_launcher; // seconds
std::time_t next_message_date_to_launcher; // seconds
void updateLauncherNextMessageDate();
public:
LauncherConnection(void * context, std::string launcher_host);
virtual ~LauncherConnection();
inline int getTextPuller() {
return launcherfd;
}
int getTextPuller();
void receiveText();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment