Commit b6fed293 authored by RILLING Louis's avatar RILLING Louis
Browse files

fake-vm: Change the recv API to avoid making applications recv messages in signal context

API changes:
- input messages are now queued internally and not handed to the recv
  callback;
- the input queue has a bounded size and new messages are silently
  dropped if the queue is full;
- instead of being called for every input message, the recv callback is
  now called at most once per deadline and iff the queue was previously
  empty and now contains some messages, thus acting like an
  edge-triggered interrupt notifying that some messages are ready to be
  read;
- the recv callback now takes no argument in native Rust, and only a
  user provided intptr_t argument in FFI API;
- in FFI API vsg_init() takes the user-provided callback argument as
  additional argument;
- two new non-blocking operations are provided:
	- Context::poll / vsg_poll() checks if the input queue contains
	  some messages;
	- Context::recv / vsg_recv() picks the next message in the input
	  queue, if available.

Tests are adapted and completed to use the new API.
Example send.cpp is completed to receive and display the message sent by
the peer.
parent d7b62c2a
......@@ -13,8 +13,6 @@ using namespace std;
// Addresses used in this program
#define ADDR_FMT "10.0.%d.1"
std::atomic<bool> callback_called(false);
void die(const char* msg, int error)
{
fprintf(stderr, "%s", msg);
......@@ -31,9 +29,10 @@ void make_addr(char* addr, int id)
}
}
void recv_cb(const struct vsg_context* context, uint32_t msglen, const uint8_t* msg)
void recv_cb(uintptr_t arg)
{
callback_called = true;
std::atomic<bool>* callback_called = (std::atomic<bool>*)arg;
*callback_called = true;
};
int main(int argc, char* argv[])
......@@ -49,7 +48,8 @@ int main(int argc, char* argv[])
uint32_t src = inet_addr(src_str);
int vsg_argc = 6;
const char* const vsg_argv[] = {"-a", CONNECTION_SOCKET_NAME, "-n", src_str, "-t", "1970-01-01T00:00:00"};
vsg_context* context = vsg_init(vsg_argc, vsg_argv, NULL, recv_cb);
std::atomic<bool> callback_called(false);
vsg_context* context = vsg_init(vsg_argc, vsg_argv, NULL, recv_cb, (uintptr_t)&callback_called);
if (!context) {
die("Unable to initialize the context", 0);
......@@ -69,6 +69,24 @@ int main(int argc, char* argv[])
// yes, ...
while (!callback_called.load()) {
}
uint32_t recv_src;
uint32_t recv_dest;
uint32_t buffer_len = msg.length() + 1;
char buffer[buffer_len];
ret = vsg_recv(context, &src, &dest, &buffer_len, (uint8_t*)buffer);
if (ret) {
die("vsg_recv() failed", ret);
}
char recv_src_str[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &recv_src, recv_src_str, INET_ADDRSTRLEN);
char recv_dest_str[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &recv_dest, recv_dest_str, INET_ADDRSTRLEN);
// We trust our peer to have sent the final NUL byte... or we will see that he
// is a bad boy!
printf("From %s to %s: %s", recv_src_str, recv_dest_str, buffer);
exit(0);
// vsg_stop block until stopped flag is set
......
......@@ -6,9 +6,9 @@
struct vsg_context;
typedef void (*vsg_recv_cb)(const struct vsg_context* context, uint32_t msglen, const uint8_t* msg);
typedef void (*vsg_recv_cb)(uintptr_t recv_cb_arg);
struct vsg_context* vsg_init(int argc, const char* const argv[], int* next_arg_p, vsg_recv_cb recv_cb);
struct vsg_context* vsg_init(int argc, const char* const argv[], int* next_arg_p, vsg_recv_cb recv_cb, uintptr_t recv_cb_arg);
void vsg_cleanup(struct vsg_context* context);
int vsg_start(const struct vsg_context* context);
......@@ -16,5 +16,7 @@ int vsg_stop(const struct vsg_context* context);
int vsg_gettimeofday(const struct vsg_context* context, struct timeval* timeval, void* timezone);
int vsg_send(const struct vsg_context* context, uint32_t src, uint32_t dest, uint32_t msglen, const uint8_t* msg);
int vsg_recv(const struct vsg_context* context, uint32_t* src, uint32_t* dest, uint32_t* msglen, uint8_t* msg);
int vsg_poll(const struct vsg_context* context);
#endif /* __FAKE_VM_H__ */
This diff is collapsed.
#include <errno.h>
#include <fake_vm.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
void recv_cb(const struct vsg_context* context, uint32_t msglen, const uint8_t* msg) {}
void recv_cb(uintptr_t arg)
{
int *flag = (int *)arg;
*flag = true;
}
void die(const char* msg, int error)
{
......@@ -17,10 +23,11 @@ int main(int argc, const char* argv[])
{
struct vsg_context* context;
struct timeval time;
int flag = false;
unsigned char msg[] = "Foo msg";
int res;
context = vsg_init(argc, argv, NULL, recv_cb);
context = vsg_init(argc, argv, NULL, recv_cb, (uintptr_t)&flag);
if (!context)
die("vsg_init() failed", 0);
......@@ -38,6 +45,15 @@ int main(int argc, const char* argv[])
if (res)
die("vsg_send() failed", res);
while ((res = vsg_poll(context)) == EAGAIN) {}
if (res)
die("vsg_poll() failed", res);
uint32_t msglen = sizeof(msg);
res = vsg_recv(context, &src, &dest, &msglen, msg);
if (res)
die("vsg_recv() failed", res);
res = vsg_stop(context);
if (res)
die("vsg_stop() failed", res);
......
......@@ -4,6 +4,7 @@ use std::{error, fmt, io};
pub enum Error {
AlreadyStarted,
NoMemoryAvailable,
NoMessageAvailable,
ProtocolViolation,
SizeTooBig,
IoError(io::Error),
......@@ -23,6 +24,7 @@ impl fmt::Display for Error {
let msg = match simple {
Error::AlreadyStarted => "Already Started",
Error::NoMemoryAvailable => "No memory available",
Error::NoMessageAvailable => "No message available",
Error::ProtocolViolation => "Protocol violation",
Error::SizeTooBig => "Size too big",
Error::IoError(_) => unimplemented!(),
......
......@@ -40,7 +40,7 @@ impl From<output_msg_set::Error> for Error {
pub type Result<T> = std::result::Result<T, Error>;
pub type RecvCallback = Box<Fn(&Context, &[u8]) -> () + Send + Sync>;
pub type RecvCallback = Box<dyn Fn() -> () + Send + Sync>;
// #[derive(Debug)]
// pub struct Destination {
......@@ -154,6 +154,29 @@ impl InnerContext {
self.outgoing_messages.insert(send_time, src, dest, buffer)?;
Ok(())
}
fn recv<'a, 'b>(&'a self, msg: &'b mut [u8]) -> Result<(VsgAddress, VsgAddress, &'b mut [u8])> {
match self.input_queue.pop() {
Some(Packet { src, dst, payload, }) => {
if msg.len() >= payload.len() {
let msg = &mut msg[..payload.len()];
msg.copy_from_slice(&payload);
Ok((src, dst, msg))
} else {
Err(Error::SizeTooBig)
}
},
None => Err(Error::NoMessageAvailable),
}
}
fn poll(&self) -> Option<()> {
if self.input_queue.is_empty() {
None
} else {
Some(())
}
}
}
#[derive(Debug)]
......@@ -231,7 +254,7 @@ impl Context {
// Third, receive messages from others, followed by next deadline
let input_queue = &self.0.input_queue;
// let may_notify = input_queue.is_empty();
let may_notify = input_queue.is_empty();
let after_deadline = loop {
let msg = connector.recv();
......@@ -246,11 +269,8 @@ impl Context {
}
};
// if may_notify && !input_queue.is_empty() {
// (self.0.recv_callback)(self.0.recv_token);
// }
for packet in input_queue.iter() {
(self.0.recv_callback)(self, &packet.payload);
if may_notify && !input_queue.is_empty() {
(self.0.recv_callback)();
}
after_deadline
......@@ -282,8 +302,12 @@ impl Context {
self.0.send(src, dest, msg)
}
pub fn poll(&self, buffer: &mut [u8]) -> Result<()> {
unimplemented!()
pub fn recv<'a, 'b>(&'a self, msg: &'b mut [u8]) -> Result<(VsgAddress, VsgAddress, &'b mut [u8])> {
self.0.recv(msg)
}
pub fn poll(&self) -> Option<()> {
self.0.poll()
}
}
......@@ -301,6 +325,8 @@ pub fn init<I>(args: I, recv_callback: RecvCallback) -> Result<Box<Context>>
#[cfg(any(test, feature = "test-helpers"))]
#[macro_use]
pub mod test_helpers {
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use super::connector::{MsgIn, MsgOut};
#[cfg(feature = "test-helpers")]
......@@ -350,7 +376,7 @@ pub mod test_helpers {
}
}
pub fn dummy_recv_callback(_context: &super::Context, _packet: &[u8]) -> () {
pub fn dummy_recv_callback() -> () {
}
pub fn start_actor(actor: &mut TestActor) -> TestResult<()> {
......@@ -373,6 +399,58 @@ pub mod test_helpers {
actor.send(MsgIn::EndSimulation)
}
pub fn send_one_msg_actor(actor: &mut TestActor, msg: &[u8]) -> TestResult<()> {
send_one_delayed_msg_actor(actor, msg, 100, 100)
}
pub fn send_one_delayed_msg_actor(actor: &mut TestActor, msg: &[u8], slice_micros: u64, delay_micros: u64) -> TestResult<()> {
let buffer_pool = crate::BufferPool::new(msg.len(), 1);
let mut buffer = TestActor::check(buffer_pool.allocate_buffer(msg.len()), "Buffer allocation failed")?;
(&mut buffer).copy_from_slice(msg);
let mut next_deadline_micros = slice_micros;
while next_deadline_micros < delay_micros {
actor.send(MsgIn::GoToDeadline(Duration::from_micros(slice_micros)))?;
loop {
match actor.recv()? {
MsgOut::AtDeadline => break,
_ => (),
}
}
next_deadline_micros += slice_micros;
}
let next_slice = delay_micros - (next_deadline_micros - slice_micros);
actor.send(MsgIn::GoToDeadline(Duration::from_micros(next_slice)))?;
actor.send(MsgIn::DeliverPacket(buffer))?;
actor.send(MsgIn::EndSimulation)
}
#[derive(Clone)]
pub struct RecvNotifier(Arc<AtomicBool>);
impl RecvNotifier {
pub fn new() -> RecvNotifier {
RecvNotifier(Arc::new(AtomicBool::new(false)))
}
pub fn notify(&self) -> () {
self.0.store(true, Ordering::SeqCst);
}
pub fn wait(&self, pause_slice_micros: u64) -> () {
while !self.0.load(Ordering::Acquire) {
std::thread::sleep(Duration::from_micros(pause_slice_micros));
}
self.0.store(false, Ordering::SeqCst);
}
pub fn get_callback(&self) -> crate::RecvCallback {
let cb_notifier = self.clone();
Box::new(move || cb_notifier.notify())
}
}
static INIT: std::sync::Once = std::sync::ONCE_INIT;
......@@ -500,6 +578,141 @@ mod test {
drop(actor);
}
#[test]
fn recv() {
init();
const EXPECTED_MSG: &[u8] = b"Foo msg";
let mut buffer = [0u8; EXPECTED_MSG.len()];
let actor = TestActorDesc::new("titi", |actor| send_one_msg_actor(actor, EXPECTED_MSG));
let recv_notifier = RecvNotifier::new();
let context = super::init(valid_args!(), recv_notifier.get_callback())
.expect("init failed");
context.start()
.expect("start failed");
recv_notifier.wait(1000);
let (src, dst, buffer) = context.recv(&mut buffer)
.expect("recv failed");
// FIXME: Use the remote address when available in the protocol
assert_eq!(src, local_vsg_address!());
assert_eq!(dst, local_vsg_address!());
assert_eq!(buffer, EXPECTED_MSG);
context.stop();
drop(actor);
}
fn recv_one_delay(slice_micros: u64, delay_micros: u64) {
const EXPECTED_MSG: &[u8] = b"Foo msg";
let mut buffer = [0u8; EXPECTED_MSG.len()];
let actor = TestActorDesc::new("titi", |actor| send_one_delayed_msg_actor(actor, EXPECTED_MSG, slice_micros, delay_micros));
let recv_notifier = RecvNotifier::new();
let context = super::init(valid_args!(), recv_notifier.get_callback())
.expect("init failed");
context.start()
.expect("start failed");
recv_notifier.wait(delay_micros / 10);
let (src, dst, buffer) = context.recv(&mut buffer)
.expect("recv failed");
let tv = context.gettimeofday();
// FIXME: Use the remote address when available in the protocol
assert_eq!(src, local_vsg_address!());
assert_eq!(dst, local_vsg_address!());
assert_eq!(buffer, EXPECTED_MSG);
let total_usec = tv.tv_sec as u64 * 1_000_000 + tv.tv_usec as u64;
assert!(delay_micros <= total_usec && total_usec <= 10 * delay_micros);
context.stop();
drop(actor);
}
#[test]
fn recv_delayed() {
init();
for delay_slices in 1..=100 {
recv_one_delay(100, delay_slices * 100);
}
}
#[test]
fn recv_too_big() {
init();
const EXPECTED_MSG: &[u8] = b"Foo msg";
const ORIG_BUFFER: &[u8] = b"fOO~MS";
let mut buffer: [u8; ORIG_BUFFER.len()] = Default::default();
buffer.copy_from_slice(ORIG_BUFFER);
let actor = TestActorDesc::new("titi", |actor| send_one_msg_actor(actor, EXPECTED_MSG));
let recv_notifier = RecvNotifier::new();
let context = super::init(valid_args!(), recv_notifier.get_callback())
.expect("init failed");
context.start()
.expect("start failed");
recv_notifier.wait(1000);
match context.recv(&mut buffer).expect_err("recv should have failed") {
crate::error::Error::SizeTooBig => (),
_ => assert!(false),
}
assert_eq!(buffer, ORIG_BUFFER);
context.stop();
drop(actor);
}
#[test]
fn poll() {
init();
const EXPECTED_MSG: &[u8] = b"Foo msg";
let mut buffer = [0u8; EXPECTED_MSG.len()];
let actor = TestActorDesc::new("titi", |actor| send_one_msg_actor(actor, EXPECTED_MSG));
let context = super::init(valid_args!(), Box::new(dummy_recv_callback))
.expect("init failed");
context.start()
.expect("start failed");
while !context.poll().is_some() {
std::thread::sleep(std::time::Duration::from_micros(1000));
}
let (src, dst, buffer) = context.recv(&mut buffer)
.expect("recv failed");
// FIXME: Use the remote address when available in the protocol
assert_eq!(src, local_vsg_address!());
assert_eq!(dst, local_vsg_address!());
assert_eq!(buffer, EXPECTED_MSG);
context.stop();
drop(actor);
}
#[test]
fn gettimeofday() {
init();
......
......@@ -41,12 +41,6 @@ using namespace std;
typedef void scenario(int);
/* For use in deliver_one test. */
std::atomic<bool> message_delivered(false);
/* For use in send_deliver_pg_port. */
std::atomic<uint32_t> port_delivered(0);
pid_t simple_actor(scenario f)
{
// I don't care about the status...
......@@ -146,7 +140,7 @@ private:
vsg_context* context;
};
void recv_cb(const struct vsg_context* context, uint32_t msglen, const uint8_t* msg)
void recv_cb(uintptr_t arg)
{
// Try not to deadlock with libc's stdout
const char hey[] = "callback called\n";
......@@ -290,7 +284,7 @@ void TestTansiv::testVsgStart(void)
int argc = 6;
const char* const argv[] = {"-a", SOCKET_ACTOR, "-n", SRC, "-t", "1970-01-01T00:00:00"};
vsg_context* context = vsg_init(argc, argv, NULL, recv_cb);
vsg_context* context = vsg_init(argc, argv, NULL, recv_cb, 0);
CPPUNIT_ASSERT(context != NULL);
int ret = vsg_start(context);
......@@ -309,7 +303,7 @@ void TestTansiv::testVsgSend(void)
int argc = 6;
const char* const argv[] = {"-a", SOCKET_ACTOR, "-n", SRC, "-t", "1970-01-01T00:00:00"};
vsg_context* context = vsg_init(argc, argv, NULL, recv_cb);
vsg_context* context = vsg_init(argc, argv, NULL, recv_cb, 0);
int ret = vsg_start(context);
std::string msg = MESSAGE;
in_addr_t src = inet_addr(SRC);
......@@ -333,7 +327,7 @@ void TestTansiv::testVsgSendEnsureRaise(void)
int argc = 6;
const char* const argv[] = {"-a", SOCKET_ACTOR, "-n", SRC, "-t", "1970-01-01T00:00:00"};
vsg_context* context = vsg_init(argc, argv, NULL, recv_cb);
vsg_context* context = vsg_init(argc, argv, NULL, recv_cb, 0);
int ret = vsg_start(context);
in_addr_t src = inet_addr(SRC);
in_addr_t dest = inet_addr(DEST);
......@@ -377,13 +371,21 @@ void TestTansiv::testVsgPiggyBackPort(void)
CPPUNIT_ASSERT_EQUAL(msg, actual_msg);
}
void recv_cb_pg(const struct vsg_context* context, uint32_t msglen, const uint8_t* msg)
in_port_t recv_pg(const struct vsg_context* context)
{
uint8_t payload[sizeof(MESSAGE) + sizeof(in_port_t)];
uint32_t payload_len = sizeof(payload);
int ret = vsg_recv(context, NULL, NULL, &payload_len, payload);
if (ret) {
// Return 0 as an error port number
return 0;
}
// un-piggyback
in_port_t recv_port;
uint8_t* recv_payload;
vsg_upg_port((void*)msg, msglen, &recv_port, &recv_payload);
port_delivered = recv_port;
vsg_upg_port((void*)payload, payload_len, &recv_port, &recv_payload);
return recv_port;
};
/*
......@@ -398,7 +400,7 @@ void TestTansiv::testVsgSendPiggyBackPort(void)
int argc = 6;
const char* const argv[] = {"-a", SOCKET_ACTOR, "-n", SRC, "-t", "1970-01-01T00:00:00"};
vsg_context* context = vsg_init(argc, argv, NULL, recv_cb_pg);
vsg_context* context = vsg_init(argc, argv, NULL, recv_cb, 0);
int ret = vsg_start(context);
in_port_t port = 5000;
std::string msg = MESSAGE;
......@@ -413,16 +415,16 @@ void TestTansiv::testVsgSendPiggyBackPort(void)
// fire!
vsg_send(context, src, dest, payload_length, payload);
// loop until our atomic is set
// loop until some message arrives
// this shouldn't take long ...
for (int i = 0; i < 3; i++) {
if (port_delivered.load() > 0)
if (vsg_poll(context) == 0)
break;
sleep(1);
}
// test the receive port
CPPUNIT_ASSERT_EQUAL((uint32_t)port, port_delivered.load());
CPPUNIT_ASSERT_EQUAL(port, recv_pg(context));
vsg_stop(context);
vsg_cleanup(context);
......@@ -430,9 +432,10 @@ void TestTansiv::testVsgSendPiggyBackPort(void)
finalize(pid);
}
void recv_cb_atomic(const struct vsg_context* context, uint32_t msglen, const uint8_t* msg)
void recv_cb_atomic(uintptr_t arg)
{
message_delivered = true;
std::atomic<bool>* message_delivered = (std::atomic<bool>*)arg;
*message_delivered = true;
};
void TestTansiv::testVsgDeliver(void)
......@@ -442,7 +445,8 @@ void TestTansiv::testVsgDeliver(void)
int argc = 6;
const char* const argv[] = {"-a", SOCKET_ACTOR, "-n", SRC, "-t", "1970-01-01T00:00:00"};
vsg_context* context = vsg_init(argc, argv, NULL, recv_cb_atomic);
std::atomic<bool> message_delivered(false);
vsg_context* context = vsg_init(argc, argv, NULL, recv_cb_atomic, (uintptr_t)&message_delivered);
int ret = vsg_start(context);
// loop until our atomic is set to true
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment