Commit 2bb7e9f1 authored by SIMONIN Matthieu's avatar SIMONIN Matthieu
Browse files

Allow to transfer any data

Initially only std::string was allowed to go through simgrid.

For this purpose I introduced a Message object which mimic the initial
struct used (struct message). The difference lies in the dynamic
attribute (data) to hold the raw payload (as uint8_t*) that needs to be
delivered.
parent b3996920
Pipeline #183613 passed with stages
in 14 minutes and 32 seconds
......@@ -86,6 +86,7 @@ VM_IMAGE=$VM_NAME.qcow2
TAP_NAME=tap${MAC:(-2)}
$QEMU \
--icount shift=auto \
--vsg mynet0,src=$IP \
-m 1g \
-drive file=$VM_IMAGE \
......
......@@ -3,6 +3,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <unistd.h>
extern "C" {
#include <fake_vm.h>
#include <vsg.h>
......@@ -74,7 +75,7 @@ int main(int argc, char* argv[])
uint32_t recv_dest;
uint32_t buffer_len = msg.length() + 1;
char buffer[buffer_len];
ret = vsg_recv(context, &src, &dest, &buffer_len, (uint8_t*)buffer);
ret = vsg_recv(context, &recv_src, &recv_dest, &buffer_len, (uint8_t*)buffer);
if (ret) {
die("vsg_recv() failed", ret);
}
......@@ -85,9 +86,9 @@ int main(int argc, char* argv[])
inet_ntop(AF_INET, &recv_dest, recv_dest_str, INET_ADDRSTRLEN);
// We trust our peer to have sent the final NUL byte... or we will see that he
// is a bad boy!
printf("From %s to %s: %s", recv_src_str, recv_dest_str, buffer);
exit(0);
printf("\n###### \n");
printf("Received from %s to %s: %s", recv_src_str, recv_dest_str, buffer);
printf("\n###### \n\n");
// vsg_stop block until stopped flag is set
// stopped flag is set, for instance, when EndSimulation is received
......
......@@ -10,13 +10,22 @@ extern "C" {
namespace vsg {
struct message {
class Message {
public:
Message(vsg_send_packet send_packet, uint8_t* payload);
Message(const Message& other);
Message(Message&& other);
Message& operator=(Message&& other);
~Message();
double sent_time;
std::string src; // decoded src
std::string dest; // decoded dest
uint32_t packet_size; // decoded packet size
vsg_packet packet; // (raw)packet info
std::string data; // raw data
vsg_send_packet send_packet;
// decoded attribute
std::string src;
std::string dest;
// keep size here for backward compatibility
uint32_t size;
// this will be dynamically allocated according to size
uint8_t* data;
};
class VmsInterface {
......@@ -25,9 +34,9 @@ public:
VmsInterface(bool stop_condition = false);
~VmsInterface();
bool vmActive();
std::vector<message> goTo(double deadline);
std::vector<Message> goTo(double deadline);
std::string getHostOfVm(std::string vm_name);
void deliverMessage(message m);
void deliverMessage(Message m);
void end_simulation(bool must_unlink = true, bool must_exit = true);
void register_vm(std::string host_name, std::string vm_name, std::string file, std::vector<std::string> args);
const std::vector<std::string> get_dead_vm_hosts();
......
......@@ -9,7 +9,7 @@ vsg::VmsInterface* vms_interface;
std::vector<simgrid::s4u::CommPtr> pending_comms;
std::vector<vsg::message> pending_messages;
std::vector<vsg::Message> pending_messages;
static std::vector<simgrid::s4u::ActorPtr> receivers;
......@@ -55,13 +55,13 @@ static double get_next_event()
return next_event_time;
}
static void sender(std::string mailbox_name, vsg::message m)
static void sender(std::string mailbox_name, vsg::Message m)
{
XBT_INFO("sending [%s] (size %lu) from vm [%s], to vm [%s] (on pm [%s])", m.data.c_str(), m.packet_size,
m.src.c_str(), m.dest.c_str(), mailbox_name.c_str());
XBT_INFO("sending (size %lu) from vm [%s], to vm [%s] (on pm [%s])", m.size, m.src.c_str(), m.dest.c_str(),
mailbox_name.c_str());
int msg_size = m.packet_size;
int msg_size = m.size;
simgrid::s4u::CommPtr comm = simgrid::s4u::Mailbox::by_name(mailbox_name)->put_async(&m, msg_size);
pending_comms.push_back(comm);
pending_messages.push_back(m);
......@@ -100,9 +100,8 @@ static void receiver(std::vector<std::string> args)
myself->daemonize();
while (true) {
vsg::message* m = static_cast<vsg::message*>(mailbox->get());
XBT_INFO("[receiver] delivering data [%s] from vm [%s] to vm [%s]", m->data.c_str(), m->src.c_str(),
m->dest.c_str());
vsg::Message* m = static_cast<vsg::Message*>(mailbox->get());
XBT_INFO("[receiver] delivering data from vm [%s] to vm [%s]", m->src.c_str(), m->dest.c_str());
}
}
......@@ -141,10 +140,8 @@ static void vm_coordinator()
XBT_DEBUG("next deadline = %f [time+min_latency=%f, next_reception_time=%f]", deadline, time + min_latency,
next_reception_time);
std::vector<vsg::message> messages = vms_interface->goTo(deadline);
for (vsg::message m : messages) {
std::vector<vsg::Message> messages = vms_interface->goTo(deadline);
for (vsg::Message m : messages) {
time = simgrid::s4u::Engine::get_clock();
double send_timeeps = m.sent_time + std::numeric_limits<double>::epsilon();
xbt_assert(
......@@ -177,20 +174,18 @@ static void vm_coordinator()
double time = simgrid::s4u::Engine::get_clock();
simgrid::s4u::this_actor::sleep_until(deadline);
}
int changed_pos = simgrid::s4u::Comm::test_any(&pending_comms);
while (
changed_pos >=
0) { // deadline was on next_reception_time, ie, latency was high enough for the next msg to arrive before this
simgrid::s4u::CommPtr comm = pending_comms[changed_pos];
vsg::message m = pending_messages[changed_pos];
vsg::Message m = pending_messages[changed_pos];
pending_comms.erase(pending_comms.begin() + changed_pos);
pending_messages.erase(pending_messages.begin() + changed_pos);
XBT_DEBUG("[coordinator]: delivering data [%s] from vm [%s] to vm [%s]", m.data.c_str(), m.src.c_str(),
m.dest.c_str());
XBT_DEBUG("[coordinator]: delivering data from vm [%s] to vm [%s] (size=%d)", m.src.c_str(), m.dest.c_str(),
m.size);
vms_interface->deliverMessage(m);
changed_pos = simgrid::s4u::Comm::test_any(&pending_comms);
......
......@@ -13,7 +13,7 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(vm_interface, "Logging specific to the VmsInterface
namespace vsg {
bool sortMessages(message i, message j)
bool sortMessages(Message i, Message j)
{
return i.sent_time < j.sent_time;
}
......@@ -140,7 +140,7 @@ bool VmsInterface::vmActive()
return (!vm_sockets.empty() && !simulate_until_any_stop) || (!a_vm_stopped && simulate_until_any_stop);
}
std::vector<message> VmsInterface::goTo(double deadline)
std::vector<Message> VmsInterface::goTo(double deadline)
{
// Beforehand, forget about the VMs that bailed out recently.
// We hope that the coordinator cleaned the SimGrid side in between
......@@ -158,7 +158,7 @@ std::vector<message> VmsInterface::goTo(double deadline)
}
// then, we pick up all the messages send by the VM until they reach the deadline
std::vector<message> messages;
std::vector<Message> messages;
XBT_DEBUG("getting the message send by the VMs");
for (auto kv : vm_sockets) {
uint32_t vm_flag = 0;
......@@ -188,7 +188,7 @@ std::vector<message> VmsInterface::goTo(double deadline)
// - and the data transfer, that correspond to the data actually send through the (simulated) network
// (nb: we use vm_name.length() to determine the size of the destination address because we assume all the vm id
// to have the same size)
char data[send_packet.packet.size];
uint8_t data[send_packet.packet.size];
if (recv(vm_socket, data, sizeof(data), MSG_WAITALL) <= 0) {
XBT_ERROR("can not receive the data of the message from VM %s. The socket may be closed", vm_name.c_str());
end_simulation();
......@@ -200,16 +200,10 @@ std::vector<message> VmsInterface::goTo(double deadline)
data, sizeof(data), vm_name.c_str(), src_addr, send_packet.packet.dst, dst_addr,
send_packet.send_time.seconds, send_packet.send_time.useconds);
struct message m;
Message m = Message(send_packet, data);
// NB: packet_size is the size used by SimGrid to simulate the transfer of the data on the network.
// It does NOT correspond to the size of the data transfered to/from the VM on the REAL socket.
m.packet_size = sizeof(data);
m.data.append(data);
m.packet = send_packet.packet;
m.src = std::string(src_addr);
m.dest = std::string(dst_addr);
m.sent_time = vmToSimgridTime(send_packet.send_time);
messages.push_back(m);
} else {
XBT_ERROR("unknown message received from VM %s : %lu", vm_name.c_str(), vm_flag);
......@@ -221,8 +215,7 @@ std::vector<message> VmsInterface::goTo(double deadline)
for (auto sock_name : vm_sockets_trash)
vm_sockets.erase(sock_name);
XBT_DEBUG("forwarding all the messages to SimGrid");
XBT_DEBUG("forwarding all the %d messages to SimGrid", messages.size());
std::sort(messages.begin(), messages.end(), sortMessages);
return messages;
......@@ -253,20 +246,74 @@ const std::vector<std::string> VmsInterface::get_dead_vm_hosts()
return dead_hosts;
}
void VmsInterface::deliverMessage(message m)
void VmsInterface::deliverMessage(Message m)
{
if (vm_sockets.find(m.dest) != vm_sockets.end()) {
int socket = vm_sockets[m.dest];
uint32_t deliver_flag = vsg_msg_in_type::DeliverPacket;
std::string data = m.data;
struct vsg_deliver_packet deliver_packet = {.packet = m.packet};
vsg_deliver_send(socket, deliver_packet, (uint8_t*)data.c_str());
struct vsg_deliver_packet deliver_packet = {.packet = m.send_packet.packet};
vsg_deliver_send(socket, deliver_packet, m.data);
XBT_VERB("message from vm %s delivered to vm %s", m.src.c_str(), m.dest.c_str());
XBT_VERB("message from vm %s delivered to vm %s size=%ld", m.src.c_str(), m.dest.c_str(),
m.send_packet.packet.size);
} else {
XBT_WARN("message from vm %s was not delivered to vm %s because it already stopped its execution", m.src.c_str(),
m.dest.c_str());
}
}
Message::Message(vsg_send_packet send_packet, uint8_t* payload)
: send_packet(send_packet), size(send_packet.packet.size)
{
// -- compute sent time the sent_time
this->sent_time = vmToSimgridTime(send_packet.send_time);
// -- then src and dest and make them a std::string
char dst_addr[INET_ADDRSTRLEN];
char src_addr[INET_ADDRSTRLEN];
vsg_decode_src_dst(send_packet, src_addr, dst_addr);
this->src = std::string(src_addr);
this->dest = std::string(dst_addr);
// -- finally handle the payload
this->data = new uint8_t[this->size];
memcpy(this->data, payload, this->size);
// printf("Creating new Message@%p: size=%d, data@%p\n", this, this->size, this->data);
};
Message::Message(const Message& other) : Message(other.send_packet, other.data)
{
// printf("Copied Message[%p]: size=%d, data@%p from message[%p]\n", this, this->size, this->data, &other);
}
Message::Message(Message&& other) : data(nullptr)
{
// uses the assignement
*this = std::move(other);
// printf("Moved Message[%p]: size=%d, data@%p from Message[%p]\n", this, this->size, this->data, &other);
}
Message& Message::operator=(Message&& other)
{
if (this != &other) {
delete[] this->data;
this->data = other.data;
this->size = other.size;
this->src = other.src;
this->dest = other.dest;
this->send_packet = other.send_packet;
this->sent_time = other.sent_time;
other.data = nullptr;
}
// printf("Moved assigned Message[%p]: size=%d, data@%p from message[%p]\n", this, this->size, this->data, &other);
return *this;
}
Message::~Message()
{
if (this->data != nullptr) {
delete[] this->data;
// printf("Destructing message[%p]: size=%d, data@%p\n", this, this->size, this->data);
}
}
} // namespace vsg
#include "log.h"
#include "vsg.h"
#include "log.h"
#include <arpa/inet.h>
#include <limits.h>
#include <math.h>
......@@ -10,6 +10,18 @@
#include <sys/un.h>
#include <unistd.h>
void dump_packet(const uint8_t* buf, size_t size)
{
printf("Dumping packet at %p size %ld \n", buf, size);
// c compatible dump
printf("{");
for (int i = 0; i < size - 1; i++) {
printf("0x%02x,", buf[i]);
}
printf("0x%02x}", buf[size - 1]);
printf("\n");
}
/**
*
* Debug purpose only,
......
......@@ -51,6 +51,8 @@ struct vsg_deliver_packet {
struct vsg_packet packet;
};
void dump_packet(const uint8_t*, size_t);
void vsg_pg_port(in_port_t, uint8_t*, int, uint8_t*);
void vsg_upg_port(void*, int, in_port_t*, uint8_t**);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment