Commit f4a60679 authored by Martin Quinson's avatar Martin Quinson
Browse files

Explicitly manage the messages' lifetime

parent 52eacc76
......@@ -11,7 +11,7 @@
cmake_minimum_required(VERSION 2.8.10)
project (Tansiv C CXX)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14 -g")
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/tools/cmake/")
# order of linked library is important !
# https://stackoverflow.com/questions/45135/why-does-the-order-in-which-libraries-are-linked-sometimes-cause-errors-in-gcc
......
......@@ -34,9 +34,9 @@ public:
VmsInterface(bool stop_condition = false);
~VmsInterface();
bool vmActive();
std::vector<Message> goTo(double deadline);
std::vector<Message*> goTo(double deadline);
std::string getHostOfVm(std::string vm_name);
void deliverMessage(Message m);
void deliverMessage(Message* m);
void end_simulation(bool must_unlink = true, bool must_exit = true);
void register_vm(std::string host_name, std::string vm_name, std::string file, std::vector<std::string> args);
const std::vector<std::string> get_dead_vm_hosts();
......
......@@ -9,7 +9,7 @@ vsg::VmsInterface* vms_interface;
std::vector<simgrid::s4u::CommPtr> pending_comms;
std::vector<vsg::Message> pending_messages;
std::vector<vsg::Message*> pending_messages;
static std::vector<simgrid::s4u::ActorPtr> receivers;
......@@ -55,14 +55,14 @@ static double get_next_event()
return next_event_time;
}
static void sender(std::string const& mailbox_name, vsg::Message m)
static void sender(std::string const& mailbox_name, vsg::Message* m)
{
XBT_INFO("sending (size %u) from vm [%s], to vm [%s] (on pm [%s])", m.size, m.src.c_str(), m.dest.c_str(),
XBT_INFO("sending (size %u) from vm [%s], to vm [%s] (on pm [%s])", m->size, m->src.c_str(), m->dest.c_str(),
mailbox_name.c_str());
int msg_size = m.size;
simgrid::s4u::CommPtr comm = simgrid::s4u::Mailbox::by_name(mailbox_name)->put_async(&m, msg_size);
int msg_size = m->size;
simgrid::s4u::CommPtr comm = simgrid::s4u::Mailbox::by_name(mailbox_name)->put_async(m, msg_size);
pending_comms.push_back(comm);
pending_messages.push_back(m);
comm->wait();
......@@ -100,8 +100,9 @@ static void receiver(std::vector<std::string> args)
myself->daemonize();
while (true) {
vsg::Message* m = mailbox->get<vsg::Message>();
XBT_INFO("[receiver] delivering data from vm [%s] to vm [%s]", m->src.c_str(), m->dest.c_str());
// The received message is freed in the deliverMessage() function, so don't access it from here, as we don't know
// anything about the actors' ordering
mailbox->get<vsg::Message>();
}
}
......@@ -136,31 +137,31 @@ static void vm_coordinator()
XBT_DEBUG("next deadline = %f [time+min_latency=%f, next_reception_time=%f]", deadline, time + min_latency,
next_reception_time);
std::vector<vsg::Message> messages = vms_interface->goTo(deadline);
for (vsg::Message const& m : messages) {
std::vector<vsg::Message*> messages = vms_interface->goTo(deadline);
for (vsg::Message* m : messages) {
time = simgrid::s4u::Engine::get_clock();
double send_timeeps = m.sent_time + std::numeric_limits<double>::epsilon();
double send_timeeps = m->sent_time + std::numeric_limits<double>::epsilon();
xbt_assert(
m.sent_time + send_timeeps >= time,
m->sent_time + send_timeeps >= time,
"violation of the causality constraint : trying to send a message at time %f[%f] whereas we are already "
"at time %f[%f]",
m.sent_time, send_timeeps, time, time);
if (m.sent_time > time) {
XBT_DEBUG("going to time %f", m.sent_time);
simgrid::s4u::this_actor::sleep_until(m.sent_time);
m->sent_time, send_timeeps, time, time);
if (m->sent_time > time) {
XBT_DEBUG("going to time %f", m->sent_time);
simgrid::s4u::this_actor::sleep_until(m->sent_time);
}
std::string src_host = vms_interface->getHostOfVm(m.src);
xbt_assert(not src_host.empty(), "The VM %s tries to send a message but we do not know its PM", m.src.c_str());
std::string src_host = vms_interface->getHostOfVm(m->src);
xbt_assert(not src_host.empty(), "The VM %s tries to send a message but we do not know its PM", m->src.c_str());
std::string dest_host = vms_interface->getHostOfVm(m.dest);
std::string dest_host = vms_interface->getHostOfVm(m->dest);
if (not dest_host.empty()) {
simgrid::s4u::ActorPtr actor =
simgrid::s4u::Actor::create("sender", simgrid::s4u::Host::by_name(src_host), sender, dest_host, m);
// For the simulation to end with the coordinator actor, we daemonize all the other actors.
actor->daemonize();
} else {
XBT_WARN("the VM %s tries to send a message to the unknown VM %s", m.src.c_str(), m.dest.c_str());
XBT_WARN("the VM %s tries to send a message to the unknown VM %s", m->src.c_str(), m->dest.c_str());
}
}
......@@ -174,13 +175,13 @@ static void vm_coordinator()
changed_pos >=
0) { // deadline was on next_reception_time, ie, latency was high enough for the next msg to arrive before this
simgrid::s4u::CommPtr comm = pending_comms[changed_pos];
vsg::Message m = pending_messages[changed_pos];
vsg::Message* m = pending_messages[changed_pos];
pending_comms.erase(pending_comms.begin() + changed_pos);
pending_messages.erase(pending_messages.begin() + changed_pos);
XBT_DEBUG("[coordinator]: delivering data from vm [%s] to vm [%s] (size=%d)", m.src.c_str(), m.dest.c_str(),
m.size);
XBT_DEBUG("[coordinator]: delivering data from vm [%s] to vm [%s] (size=%d)", m->src.c_str(), m->dest.c_str(),
m->size);
vms_interface->deliverMessage(m);
changed_pos = simgrid::s4u::Comm::test_any(&pending_comms);
......@@ -224,5 +225,7 @@ int main(int argc, char* argv[])
e.run();
delete vms_interface;
return 0;
}
......@@ -13,9 +13,9 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(vm_interface, "Logging specific to the VmsInterface
namespace vsg {
bool sortMessages(Message i, Message j)
bool sortMessages(Message* i, Message* j)
{
return i.sent_time < j.sent_time;
return i->sent_time < j->sent_time;
}
vsg_time simgridToVmTime(double simgrid_time)
......@@ -140,7 +140,7 @@ bool VmsInterface::vmActive()
return (!vm_sockets.empty() && !simulate_until_any_stop) || (!a_vm_stopped && simulate_until_any_stop);
}
std::vector<Message> VmsInterface::goTo(double deadline)
std::vector<Message*> VmsInterface::goTo(double deadline)
{
// Beforehand, forget about the VMs that bailed out recently.
// We hope that the coordinator cleaned the SimGrid side in between
......@@ -158,7 +158,7 @@ std::vector<Message> VmsInterface::goTo(double deadline)
}
// then, we pick up all the messages send by the VM until they reach the deadline
std::vector<Message> messages;
std::vector<Message*> messages;
XBT_DEBUG("getting the message send by the VMs");
for (auto kv : vm_sockets) {
uint32_t vm_flag = 0;
......@@ -200,7 +200,7 @@ std::vector<Message> VmsInterface::goTo(double deadline)
data, sizeof(data), vm_name.c_str(), src_addr, send_packet.packet.dst, dst_addr,
send_packet.send_time.seconds, send_packet.send_time.useconds);
Message m = Message(send_packet, data);
Message* m = new Message(send_packet, data);
// NB: packet_size is the size used by SimGrid to simulate the transfer of the data on the network.
// It does NOT correspond to the size of the data transfered to/from the VM on the REAL socket.
......@@ -215,7 +215,7 @@ std::vector<Message> VmsInterface::goTo(double deadline)
for (auto sock_name : vm_sockets_trash)
vm_sockets.erase(sock_name);
XBT_DEBUG("forwarding all the %d messages to SimGrid", messages.size());
XBT_DEBUG("forwarding all the %lu messages to SimGrid", messages.size());
std::sort(messages.begin(), messages.end(), sortMessages);
return messages;
......@@ -246,20 +246,20 @@ const std::vector<std::string> VmsInterface::get_dead_vm_hosts()
return dead_hosts;
}
void VmsInterface::deliverMessage(Message m)
void VmsInterface::deliverMessage(Message* m)
{
if (vm_sockets.find(m.dest) != vm_sockets.end()) {
int socket = vm_sockets[m.dest];
uint32_t deliver_flag = vsg_msg_in_type::DeliverPacket;
struct vsg_deliver_packet deliver_packet = {.packet = m.send_packet.packet};
vsg_deliver_send(socket, deliver_packet, m.data);
XBT_VERB("message from vm %s delivered to vm %s size=%ld", m.src.c_str(), m.dest.c_str(),
m.send_packet.packet.size);
if (vm_sockets.find(m->dest) != vm_sockets.end()) {
int socket = vm_sockets[m->dest];
struct vsg_deliver_packet deliver_packet = {.packet = m->send_packet.packet};
vsg_deliver_send(socket, deliver_packet, m->data);
XBT_VERB("message from vm %s delivered to vm %s size=%u", m->src.c_str(), m->dest.c_str(),
m->send_packet.packet.size);
} else {
XBT_WARN("message from vm %s was not delivered to vm %s because it already stopped its execution", m.src.c_str(),
m.dest.c_str());
XBT_WARN("message from vm %s was not delivered to vm %s because it already stopped its execution", m->src.c_str(),
m->dest.c_str());
}
delete m;
}
Message::Message(vsg_send_packet send_packet, uint8_t* payload)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment