Commit 88364adb authored by SIMONIN Matthieu's avatar SIMONIN Matthieu
Browse files

Merge branch 'dev/direct_com' into 'master'

Dev/direct com

See merge request !8
parents e0ff954a 1d6db17d
Pipeline #223471 failed with stages
in 9 minutes and 28 seconds
......@@ -11,7 +11,7 @@ std::vector<simgrid::s4u::CommPtr> pending_comms;
std::vector<vsg::Message*> pending_messages;
static std::vector<simgrid::s4u::ActorPtr> receivers;
static std::vector<simgrid::s4u::ActorPtr> tansiv_actors;
const std::string vsg_vm_name = "vsg_vm";
......@@ -24,8 +24,8 @@ static double compute_min_latency()
}
double min_latency = std::numeric_limits<double>::infinity();
for (simgrid::s4u::ActorPtr const& sender : receivers) {
for (simgrid::s4u::ActorPtr const& receiver : receivers) {
for (simgrid::s4u::ActorPtr const& sender : tansiv_actors) {
for (simgrid::s4u::ActorPtr const& receiver : tansiv_actors) {
if (sender != receiver) {
std::vector<simgrid::s4u::Link*> links;
double latency = 0;
......@@ -55,20 +55,7 @@ static double get_next_event()
return next_event_time;
}
static void sender(std::string const& mailbox_name, vsg::Message* m)
{
XBT_INFO("sending (size %u) from vm [%s], to vm [%s] (on pm [%s])", m->size, m->src.c_str(), m->dest.c_str(),
mailbox_name.c_str());
int msg_size = m->size;
simgrid::s4u::CommPtr comm = simgrid::s4u::Mailbox::by_name(mailbox_name)->put_async(m, msg_size);
pending_comms.push_back(comm);
pending_messages.push_back(m);
comm->wait();
}
static void receiver(std::vector<std::string> args)
static void tansiv_actor(std::vector<std::string> args)
{
XBT_INFO("running receiver");
......@@ -87,21 +74,7 @@ static void receiver(std::vector<std::string> args)
// IMPORTANT: before any simcall, we register the VM to the interface. This way, the coordinator actor will start
// AFTER all the registrations.
vms_interface->register_vm(mailbox_name, args[1], args[2], fork_command);
receivers.push_back(simgrid::s4u::Actor::self());
simgrid::s4u::ActorPtr myself = simgrid::s4u::Actor::self();
simgrid::s4u::Mailbox* mailbox = simgrid::s4u::Mailbox::by_name(mailbox_name);
// this actor is a permanent receiver on its host mailbox.
mailbox->set_receiver(myself);
// For the simulation to end with the coordinator actor, we daemonize all the other actors.
myself->daemonize();
while (true) {
// The received message is freed in the deliverMessage() function, so don't access it from here, as we don't know
// anything about the actors' ordering
mailbox->get<vsg::Message>();
}
tansiv_actors.push_back(simgrid::s4u::Actor::self());
}
static void vm_coordinator()
......@@ -118,10 +91,10 @@ static void vm_coordinator()
for (auto const& host : vms_interface->get_dead_vm_hosts()) {
auto erased_section_begin =
std::remove_if(receivers.begin(), receivers.end(),
std::remove_if(tansiv_actors.begin(), tansiv_actors.end(),
[host](const simgrid::s4u::ActorPtr& o) { return (o->get_host()->get_name() == host); });
receivers.erase(erased_section_begin, receivers.end());
tansiv_actors.erase(erased_section_begin, tansiv_actors.end());
deads = true;
}
if (deads)
......@@ -149,15 +122,17 @@ static void vm_coordinator()
simgrid::s4u::this_actor::sleep_until(m->sent_time);
}
std::string src_host = vms_interface->getHostOfVm(m->src);
xbt_assert(not src_host.empty(), "The VM %s tries to send a message but we do not know its PM", m->src.c_str());
std::string dest_host = vms_interface->getHostOfVm(m->dest);
if (not dest_host.empty()) {
simgrid::s4u::ActorPtr actor =
simgrid::s4u::Actor::create("sender", simgrid::s4u::Host::by_name(src_host), sender, dest_host, m);
// For the simulation to end with the coordinator actor, we daemonize all the other actors.
actor->daemonize();
std::string src_host_name = vms_interface->getHostOfVm(m->src);
xbt_assert(not src_host_name.empty(), "The VM %s tries to send a message but we do not know its PM",
m->src.c_str());
std::string dest_host_name = vms_interface->getHostOfVm(m->dest);
if (not dest_host_name.empty()) {
auto src_host = simgrid::s4u::Host::by_name(src_host_name);
auto dest_host = simgrid::s4u::Host::by_name(dest_host_name);
auto comm = simgrid::s4u::Comm::sendto_async(src_host, dest_host, m->size);
pending_comms.push_back(comm);
pending_messages.push_back(m);
} else {
XBT_WARN("the VM %s tries to send a message to the unknown VM %s", m->src.c_str(), m->dest.c_str());
}
......@@ -178,8 +153,8 @@ static void vm_coordinator()
pending_comms.erase(pending_comms.begin() + changed_pos);
pending_messages.erase(pending_messages.begin() + changed_pos);
XBT_DEBUG("[coordinator]: delivering data from vm [%s] to vm [%s] (size=%d)", m->src.c_str(), m->dest.c_str(),
m->size);
XBT_INFO("[coordinator]: delivering data from vm [%s] to vm [%s] (size=%d)", m->src.c_str(), m->dest.c_str(),
m->size);
vms_interface->deliverMessage(m);
changed_pos = simgrid::s4u::Comm::test_any(&pending_comms);
......@@ -215,7 +190,7 @@ int main(int argc, char* argv[])
vms_interface = new vsg::VmsInterface();
e.register_function(vsg_vm_name, &receiver);
e.register_function(vsg_vm_name, &tansiv_actor);
simgrid::s4u::Actor::create("vm_coordinator", e.get_all_hosts()[0], vm_coordinator);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment