Commit 4c5651b9 authored by RILLING Louis's avatar RILLING Louis
Browse files

tansiv: Provide vsg_init() with a callback to notify deadline changes

Deadline changes are notified at the end of deadline handling, right
before unfreezing execution time.
parent 87b671bb
Pipeline #235415 canceled with stages
in 9 minutes and 25 seconds
......@@ -40,6 +40,10 @@ void recv_cb(uintptr_t arg)
*callback_called = true;
};
void deadline_cb(uintptr_t arg, struct timespec deadline)
{
}
vsg_context* init_vsg(int argc, char* argv[])
{
std::string src_str = "10.0.0.1";
......@@ -47,7 +51,7 @@ vsg_context* init_vsg(int argc, char* argv[])
int vsg_argc = 6;
const char* const vsg_argv[] = {"-a", CONNECTION_SOCKET_NAME, "-n", src_str.c_str(), "-t", "1970-01-01T00:00:00"};
std::atomic<bool> callback_called(false);
vsg_context* context = vsg_init(vsg_argc, vsg_argv, NULL, recv_cb, (uintptr_t)&callback_called);
vsg_context* context = vsg_init(vsg_argc, vsg_argv, NULL, recv_cb, (uintptr_t)&callback_called, deadline_cb, 0);
if (!context) {
die("Unable to initialize the context", 0);
......
......@@ -36,6 +36,10 @@ void recv_cb(uintptr_t arg)
*callback_called = true;
};
void deadline_cb(uintptr_t arg, struct timespec deadline)
{
}
int main(int argc, char* argv[])
{
// initialization phase
......@@ -50,7 +54,7 @@ int main(int argc, char* argv[])
int vsg_argc = 6;
const char* const vsg_argv[] = {"-a", CONNECTION_SOCKET_NAME, "-n", src_str, "-t", "1970-01-01T00:00:00"};
std::atomic<bool> callback_called(false);
vsg_context* context = vsg_init(vsg_argc, vsg_argv, NULL, recv_cb, (uintptr_t)&callback_called);
vsg_context* context = vsg_init(vsg_argc, vsg_argv, NULL, recv_cb, (uintptr_t)&callback_called, deadline_cb, 0);
if (!context) {
die("Unable to initialize the context", 0);
......
......@@ -85,6 +85,7 @@ dependencies = [
"chrono",
"libc",
"log",
"seq_lock",
"static_assertions",
"tansiv-client",
]
......
......@@ -20,3 +20,4 @@ static_assertions = "0.3.1"
[dev-dependencies]
tansiv-client = {version = "0.1.0", path = "../tansiv-client", features = ["test-helpers"]}
seq_lock = {version = "0.1.0", path = "../../seq_lock"}
......@@ -10,9 +10,11 @@
struct vsg_context;
typedef void (*vsg_recv_cb)(uintptr_t recv_cb_arg);
typedef void (*vsg_deadline_cb)(uintptr_t deadline_cb_arg, struct timespec deadline);
struct vsg_context* vsg_init(int argc, const char* const argv[], int* next_arg_p, vsg_recv_cb recv_cb,
uintptr_t recv_cb_arg);
struct vsg_context* vsg_init(int argc, const char* const argv[], int* next_arg_p,
vsg_recv_cb recv_cb, uintptr_t recv_cb_arg,
vsg_deadline_cb, uintptr_t deadline_cb_arg);
void vsg_cleanup(struct vsg_context* context);
int vsg_start(const struct vsg_context* context, struct timespec* offset);
......
......@@ -41,12 +41,21 @@ unsafe fn parse_os_args<F, T>(argc: c_int, argv: *const *const c_char, parse: F)
}
type CRecvCallback = unsafe extern "C" fn(uintptr_t);
type CDeadlineCallback = unsafe extern "C" fn(uintptr_t, libc::timespec);
fn duration_to_timespec(duration: std::time::Duration) -> libc::timespec {
libc::timespec {
tv_sec: duration.as_secs() as libc::time_t,
tv_nsec: duration.subsec_nanos() as libc::c_long,
}
}
#[no_mangle]
pub unsafe extern fn vsg_init(argc: c_int, argv: *const *const c_char, next_arg_p: *mut c_int, recv_callback: CRecvCallback, recv_callback_arg: uintptr_t) -> *const Context {
let callback: tansiv_client::RecvCallback = Box::new(move || recv_callback(recv_callback_arg));
pub unsafe extern fn vsg_init(argc: c_int, argv: *const *const c_char, next_arg_p: *mut c_int, recv_callback: CRecvCallback, recv_callback_arg: uintptr_t, deadline_callback: CDeadlineCallback, deadline_callback_arg: uintptr_t) -> *const Context {
let recv_callback: tansiv_client::RecvCallback = Box::new(move || recv_callback(recv_callback_arg));
let deadline_callback: tansiv_client::DeadlineCallback = Box::new(move |deadline| deadline_callback(deadline_callback_arg, duration_to_timespec(deadline)));
match parse_os_args(argc, argv, |args| tansiv_client::init(args, callback)) {
match parse_os_args(argc, argv, |args| tansiv_client::init(args, recv_callback, deadline_callback)) {
Ok((context, next_arg)) => {
if let Some(next_arg_p) = next_arg_p.as_mut() {
*next_arg_p = next_arg;
......@@ -306,6 +315,7 @@ mod test {
use libc::{timespec, timeval};
#[allow(unused_imports)]
use log::{error, info};
use seq_lock::SeqLock;
use std::ffi::CString;
use std::pin::Pin;
use std::os::raw::{c_char, c_int};
......@@ -451,6 +461,9 @@ mod test {
extern "C" fn dummy_recv_callback(_arg: uintptr_t) -> () {
}
extern "C" fn dummy_deadline_callback(_arg: uintptr_t, _deadline: timespec) -> () {
}
// C-style version of tansiv_client::test_helpers::RecvNotifier
// Ugly
struct RecvNotifier(AtomicBool);
......@@ -493,7 +506,7 @@ mod test {
let mut next_arg: c_int = 0;
let actor = TestActorDesc::new("titi", TestActor::dummy_actor);
let args = valid_args!();
let context = unsafe { vsg_init(args.argc(), args.argv(), &mut next_arg, dummy_recv_callback, 0) };
let context = unsafe { vsg_init(args.argc(), args.argv(), &mut next_arg, dummy_recv_callback, 0, dummy_deadline_callback, 0) };
assert!(!context.is_null());
assert_eq!(args.argc(), next_arg);
......@@ -507,7 +520,7 @@ mod test {
let actor = TestActorDesc::new("titi", TestActor::dummy_actor);
let args = valid_args!();
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), dummy_recv_callback, 0) };
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), dummy_recv_callback, 0, dummy_deadline_callback, 0) };
assert!(!context.is_null());
unsafe { vsg_cleanup(context) };
......@@ -521,7 +534,7 @@ mod test {
let mut next_arg: c_int = 0;
let actor = TestActorDesc::new("titi", TestActor::dummy_actor);
let args = invalid_args!();
let context = unsafe { vsg_init(args.argc(), args.argv(), &mut next_arg, dummy_recv_callback, 0) };
let context = unsafe { vsg_init(args.argc(), args.argv(), &mut next_arg, dummy_recv_callback, 0, dummy_deadline_callback, 0) };
assert!(context.is_null());
assert_eq!(0, next_arg);
......@@ -534,7 +547,7 @@ mod test {
let actor = TestActorDesc::new("titi", TestActor::dummy_actor);
let args = invalid_args!();
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), dummy_recv_callback, 0) };
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), dummy_recv_callback, 0, dummy_deadline_callback, 0) };
assert!(context.is_null());
drop(actor);
......@@ -557,13 +570,24 @@ mod test {
tv_usec: std::i64::MAX,
};
const START_ACTOR_DEADLINE_TIMESPEC: timespec = timespec {
tv_sec: START_ACTOR_DEADLINE.as_secs() as libc::time_t,
tv_nsec: START_ACTOR_DEADLINE.subsec_nanos() as libc::c_long,
};
extern "C" fn simple_deadline_callback(arg: uintptr_t, deadline: timespec) -> () {
let record_deadline = unsafe { (arg as *const SeqLock::<timespec>).as_ref().unwrap() };
record_deadline.write(|_| deadline);
}
#[test]
fn start_stop() {
init();
let actor = TestActorDesc::new("titi", start_actor);
let args = valid_args!();
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), dummy_recv_callback, 0) };
let deadline = SeqLock::new(timespec { tv_sec: 0, tv_nsec: 0, });
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), dummy_recv_callback, 0, simple_deadline_callback, &deadline as *const _ as uintptr_t) };
assert!(!context.is_null());
let mut offset = TIMESPEC_POISON;
......@@ -571,6 +595,9 @@ mod test {
assert_eq!(0, res);
assert_eq!(0, offset.tv_sec);
assert_eq!(0, offset.tv_nsec);
let deadline = deadline.read(|d| d);
assert!(START_ACTOR_DEADLINE_TIMESPEC.tv_sec == deadline.tv_sec &&
START_ACTOR_DEADLINE_TIMESPEC.tv_nsec == deadline.tv_nsec);
let res: c_int = unsafe { vsg_stop(context) };
assert_eq!(0, res);
......@@ -593,7 +620,7 @@ mod test {
let actor = TestActorDesc::new("titi", start_actor);
let args = valid_args!();
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), dummy_recv_callback, 0) };
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), dummy_recv_callback, 0, dummy_deadline_callback, 0) };
assert!(!context.is_null());
let res: c_int = unsafe { vsg_start(context, std::ptr::null_mut()) };
......@@ -620,7 +647,7 @@ mod test {
let actor = TestActorDesc::new("titi", recv_one_msg_actor);
let args = valid_args!();
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), dummy_recv_callback, 0) };
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), dummy_recv_callback, 0, dummy_deadline_callback, 0) };
assert!(!context.is_null());
let res: c_int = unsafe { vsg_start(context, std::ptr::null_mut()) };
......@@ -644,7 +671,7 @@ mod test {
let actor = TestActorDesc::new("titi", recv_one_msg_actor);
let args = valid_args!();
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), dummy_recv_callback, 0) };
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), dummy_recv_callback, 0, dummy_deadline_callback, 0) };
assert!(!context.is_null());
let res: c_int = unsafe { vsg_start(context, std::ptr::null_mut()) };
......@@ -667,7 +694,7 @@ mod test {
let actor = TestActorDesc::new("titi", recv_one_msg_actor);
let args = valid_args!();
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), dummy_recv_callback, 0) };
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), dummy_recv_callback, 0, dummy_deadline_callback, 0) };
assert!(!context.is_null());
let res: c_int = unsafe { vsg_start(context, std::ptr::null_mut()) };
......@@ -695,7 +722,7 @@ mod test {
let actor = TestActorDesc::new("titi", recv_one_msg_actor);
let args = valid_args!();
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), dummy_recv_callback, 0) };
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), dummy_recv_callback, 0, dummy_deadline_callback, 0) };
assert!(!context.is_null());
let res: c_int = unsafe { vsg_start(context, std::ptr::null_mut()) };
......@@ -740,7 +767,7 @@ mod test {
let args = valid_args!();
let recv_notifier = RecvNotifier::new();
let recv_notifier = recv_notifier.pin();
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), RecvNotifier::callback, RecvNotifier::get_callback_arg(&recv_notifier)) };
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), RecvNotifier::callback, RecvNotifier::get_callback_arg(&recv_notifier), dummy_deadline_callback, 0) };
assert!(!context.is_null());
let res: c_int = unsafe { vsg_start(context, std::ptr::null_mut()) };
......@@ -778,7 +805,7 @@ mod test {
let args = valid_args!();
let recv_notifier = RecvNotifier::new();
let recv_notifier = recv_notifier.pin();
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), RecvNotifier::callback, RecvNotifier::get_callback_arg(&recv_notifier)) };
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), RecvNotifier::callback, RecvNotifier::get_callback_arg(&recv_notifier), dummy_deadline_callback, 0) };
assert!(!context.is_null());
let res: c_int = unsafe { vsg_start(context, std::ptr::null_mut()) };
......@@ -814,7 +841,7 @@ mod test {
let args = valid_args!();
let recv_notifier = RecvNotifier::new();
let recv_notifier = recv_notifier.pin();
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), RecvNotifier::callback, RecvNotifier::get_callback_arg(&recv_notifier)) };
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), RecvNotifier::callback, RecvNotifier::get_callback_arg(&recv_notifier), dummy_deadline_callback, 0) };
assert!(!context.is_null());
let res: c_int = unsafe { vsg_start(context, std::ptr::null_mut()) };
......@@ -850,7 +877,7 @@ mod test {
let args = valid_args!();
let recv_notifier = RecvNotifier::new();
let recv_notifier = recv_notifier.pin();
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), RecvNotifier::callback, RecvNotifier::get_callback_arg(&recv_notifier)) };
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), RecvNotifier::callback, RecvNotifier::get_callback_arg(&recv_notifier), dummy_deadline_callback, 0) };
assert!(!context.is_null());
let res: c_int = unsafe { vsg_start(context, std::ptr::null_mut()) };
......@@ -884,7 +911,7 @@ mod test {
let args = valid_args!();
let recv_notifier = RecvNotifier::new();
let recv_notifier = recv_notifier.pin();
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), RecvNotifier::callback, RecvNotifier::get_callback_arg(&recv_notifier)) };
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), RecvNotifier::callback, RecvNotifier::get_callback_arg(&recv_notifier), dummy_deadline_callback, 0) };
assert!(!context.is_null());
let res: c_int = unsafe { vsg_start(context, std::ptr::null_mut()) };
......@@ -920,7 +947,7 @@ mod test {
let args = valid_args!();
let recv_notifier = RecvNotifier::new();
let recv_notifier = recv_notifier.pin();
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), RecvNotifier::callback, RecvNotifier::get_callback_arg(&recv_notifier)) };
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), RecvNotifier::callback, RecvNotifier::get_callback_arg(&recv_notifier), dummy_deadline_callback, 0) };
assert!(!context.is_null());
let res: c_int = unsafe { vsg_start(context, std::ptr::null_mut()) };
......@@ -975,7 +1002,7 @@ mod test {
let args = valid_args!();
let recv_notifier = RecvNotifier::new();
let recv_notifier = recv_notifier.pin();
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), RecvNotifier::callback, RecvNotifier::get_callback_arg(&recv_notifier)) };
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), RecvNotifier::callback, RecvNotifier::get_callback_arg(&recv_notifier), dummy_deadline_callback, 0) };
assert!(!context.is_null());
let res: c_int = unsafe { vsg_start(context, std::ptr::null_mut()) };
......@@ -1013,7 +1040,7 @@ mod test {
let args = valid_args!();
let recv_notifier = RecvNotifier::new();
let recv_notifier = recv_notifier.pin();
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), RecvNotifier::callback, RecvNotifier::get_callback_arg(&recv_notifier)) };
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), RecvNotifier::callback, RecvNotifier::get_callback_arg(&recv_notifier), dummy_deadline_callback, 0) };
assert!(!context.is_null());
let res: c_int = unsafe { vsg_start(context, std::ptr::null_mut()) };
......@@ -1051,7 +1078,7 @@ mod test {
let args = valid_args!();
let recv_notifier = RecvNotifier::new();
let recv_notifier = recv_notifier.pin();
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), RecvNotifier::callback, RecvNotifier::get_callback_arg(&recv_notifier)) };
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), RecvNotifier::callback, RecvNotifier::get_callback_arg(&recv_notifier), dummy_deadline_callback, 0) };
assert!(!context.is_null());
let res: c_int = unsafe { vsg_start(context, std::ptr::null_mut()) };
......@@ -1089,7 +1116,7 @@ mod test {
let args = valid_args!();
let recv_notifier = RecvNotifier::new();
let recv_notifier = recv_notifier.pin();
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), RecvNotifier::callback, RecvNotifier::get_callback_arg(&recv_notifier)) };
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), RecvNotifier::callback, RecvNotifier::get_callback_arg(&recv_notifier), dummy_deadline_callback, 0) };
assert!(!context.is_null());
let res: c_int = unsafe { vsg_start(context, std::ptr::null_mut()) };
......@@ -1129,7 +1156,7 @@ mod test {
let actor = TestActorDesc::new("titi", recv_one_msg_actor);
let args = valid_args!();
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), dummy_recv_callback, 0) };
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), dummy_recv_callback, 0, dummy_deadline_callback, 0) };
assert!(!context.is_null());
let res: c_int = unsafe { vsg_start(context, std::ptr::null_mut()) };
......@@ -1159,7 +1186,7 @@ mod test {
let actor = TestActorDesc::new("titi", recv_one_msg_actor);
let args = valid_args!();
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), dummy_recv_callback, 0) };
let context = unsafe { vsg_init(args.argc(), args.argv(), std::ptr::null_mut(), dummy_recv_callback, 0, dummy_deadline_callback, 0) };
assert!(!context.is_null());
let res: c_int = unsafe { vsg_start(context, std::ptr::null_mut()) };
......
......@@ -12,6 +12,12 @@ void recv_cb(uintptr_t arg)
*flag = true;
}
void deadline_cb(uintptr_t arg, struct timespec deadline)
{
struct timespec *recorded_deadline = (struct timespec *)arg;
*recorded_deadline = deadline;
}
void die(const char* msg, int error)
{
fprintf(stderr, "%s", msg);
......@@ -26,10 +32,11 @@ int main(int argc, const char* argv[])
struct timespec offset;
struct timeval time;
int flag = false;
struct timespec deadline;
unsigned char msg[] = "Foo msg";
int res;
context = vsg_init(argc, argv, NULL, recv_cb, (uintptr_t)&flag);
context = vsg_init(argc, argv, NULL, recv_cb, (uintptr_t)&flag, deadline_cb, (uintptr_t)&deadline);
if (!context)
die("vsg_init() failed", 0);
......
......@@ -44,6 +44,7 @@ impl From<output_msg_set::Error> for Error {
pub type Result<T> = std::result::Result<T, Error>;
pub type RecvCallback = Box<dyn Fn() -> () + Send + Sync>;
pub type DeadlineCallback = Box<dyn Fn(Duration) -> () + Send + Sync>;
#[derive(Debug)]
struct Packet {
......@@ -78,6 +79,8 @@ pub struct Context {
input_queue: WaitfreeArrayQueue<Packet>,
// No concurrency, read-only: called only by the deadline handler
recv_callback: RecvCallback,
// No concurrency, read-only: called only by ::start() and the deadline handler
deadline_callback: DeadlineCallback,
// Concurrency:
// - read-only by application code,
// - read-write by the deadline handler, using interior mutability
......@@ -106,7 +109,7 @@ enum AfterDeadline {
}
impl Context {
fn new(config: &Config, recv_callback: RecvCallback) -> Result<Arc<Context>> {
fn new(config: &Config, recv_callback: RecvCallback, deadline_callback: DeadlineCallback) -> Result<Arc<Context>> {
let address = config.address;
let connector = ConnectorImpl::new(config)?;
let input_queue = WaitfreeArrayQueue::new(config.num_buffers.get());
......@@ -119,6 +122,7 @@ impl Context {
connector: Mutex::new(connector),
input_queue: input_queue,
recv_callback: recv_callback,
deadline_callback: deadline_callback,
timer_context: timer_context,
output_buffer_pool: output_buffer_pool,
outgoing_messages: outgoing_messages,
......@@ -220,6 +224,10 @@ impl Context {
(self.recv_callback)();
}
if let AfterDeadline::NextDeadline(deadline) = after_deadline {
(self.deadline_callback)(deadline);
}
deadline_handler_debug!("Context::at_deadline() after_deadline = {:?}", after_deadline);
after_deadline
}
......@@ -285,7 +293,7 @@ impl Context {
}
}
pub fn init<I>(args: I, recv_callback: RecvCallback) -> Result<Arc<Context>>
pub fn init<I>(args: I, recv_callback: RecvCallback, deadline_callback: DeadlineCallback) -> Result<Arc<Context>>
where I: IntoIterator,
I::Item: Into<std::ffi::OsString> + Clone {
use structopt::StructOpt;
......@@ -302,14 +310,14 @@ pub fn init<I>(args: I, recv_callback: RecvCallback) -> Result<Arc<Context>>
let config = Config::from_iter_safe(args).or_else(|e| Err(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
debug!("{:?}", config);
Context::new(&config, recv_callback)
Context::new(&config, recv_callback, deadline_callback)
}
#[cfg(any(test, feature = "test-helpers"))]
#[macro_use]
pub mod test_helpers {
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;
use super::connector::{MsgIn, MsgOut};
#[cfg(feature = "test-helpers")]
......@@ -362,6 +370,9 @@ pub mod test_helpers {
pub fn dummy_recv_callback() -> () {
}
pub fn dummy_deadline_callback(_deadline: Duration) -> () {
}
pub const START_ACTOR_DEADLINE: Duration = Duration::from_nanos(100000);
pub fn start_actor(actor: &mut TestActor) -> TestResult<()> {
......@@ -445,6 +456,43 @@ pub mod test_helpers {
}
}
struct DeadlineNotifierData {
deadline: seq_lock::SeqLock<Duration>,
num_called: AtomicUsize,
}
#[derive(Clone)]
pub struct DeadlineNotifier(Arc<DeadlineNotifierData>);
impl DeadlineNotifier {
pub const INITIAL_DEADLINE: Duration = Duration::from_secs(0);
pub fn new() -> DeadlineNotifier {
DeadlineNotifier(Arc::new(DeadlineNotifierData {
deadline: seq_lock::SeqLock::new(Self::INITIAL_DEADLINE),
num_called: AtomicUsize::new(0),
}))
}
pub fn notify(&self, deadline: Duration) -> () {
self.0.deadline.write(|_| deadline);
self.0.num_called.fetch_add(1, Ordering::SeqCst);
}
pub fn get_callback(&self) -> crate::DeadlineCallback {
let cb_notifier = self.clone();
Box::new(move |deadline| cb_notifier.notify(deadline))
}
pub fn deadline(&self) -> Duration {
self.0.deadline.read(|d| d)
}
pub fn num_called(&self) -> usize {
self.0.num_called.load(Ordering::SeqCst)
}
}
static INIT: std::sync::Once = std::sync::Once::new();
pub fn init() {
......@@ -458,6 +506,7 @@ pub mod test_helpers {
mod test {
#[allow(unused_imports)]
use log::{error, info};
use std::time::Duration;
use super::connector::Connector;
use super::{connector::test_helpers::*, test_helpers::*};
......@@ -466,7 +515,7 @@ mod test {
init();
let actor = TestActorDesc::new("titi", TestActor::dummy_actor);
super::init(valid_args!(), Box::new(dummy_recv_callback))
super::init(valid_args!(), Box::new(dummy_recv_callback), Box::new(dummy_deadline_callback))
.expect("init failed");
// assert_eq!(chrono::NaiveDateTime::from_timestamp(0, 0), context.0.simulation_offset);
......@@ -479,7 +528,7 @@ mod test {
init();
let actor = TestActorDesc::new("titi", TestActor::dummy_actor);
super::init(invalid_args!(), Box::new(dummy_recv_callback))
super::init(invalid_args!(), Box::new(dummy_recv_callback), Box::new(dummy_deadline_callback))
.expect_err("init returned a context");
drop(actor);
......@@ -490,8 +539,11 @@ mod test {
init();
let actor = TestActorDesc::new("titi", start_actor);
let context = super::init(valid_args!(), Box::new(dummy_recv_callback))
let deadline_notifier = DeadlineNotifier::new();
let context = super::init(valid_args!(), Box::new(dummy_recv_callback), deadline_notifier.get_callback())
.expect("init failed");
assert_eq!(DeadlineNotifier::INITIAL_DEADLINE, deadline_notifier.deadline());
assert_eq!(0, deadline_notifier.num_called());
let offset = context.start()
.expect("start failed");
......@@ -507,7 +559,8 @@ mod test {
init();
let actor = TestActorDesc::new("titi", start_actor);
let context = super::init(valid_args!(), Box::new(dummy_recv_callback))
let deadline_notifier = DeadlineNotifier::new();
let context = super::init(valid_args!(), Box::new(dummy_recv_callback), deadline_notifier.get_callback())
.expect("init failed");
context.start()
......@@ -517,6 +570,7 @@ mod test {
super::error::Error::AlreadyStarted => (),
_ => assert!(false),
}
assert_eq!(1, deadline_notifier.num_called());
context.stop();
......@@ -528,7 +582,8 @@ mod test {
init();
let actor = TestActorDesc::new("titi", recv_one_msg_actor);
let context = super::init(valid_args!(), Box::new(dummy_recv_callback))
let deadline_notifier = DeadlineNotifier::new();
let context = super::init(valid_args!(), Box::new(dummy_recv_callback), deadline_notifier.get_callback())
.expect("init failed");
context.start()
......@@ -539,6 +594,9 @@ mod test {
.expect("send failed");
context.stop();
let num_called = deadline_notifier.num_called();
assert!(num_called > 0);
assert_eq!((num_called as u32) * RECV_ONE_MSG_ACTOR_SLICE, deadline_notifier.deadline());
drop(actor);
}
......@@ -548,7 +606,7 @@ mod test {
init();
let actor = TestActorDesc::new("titi", recv_one_msg_actor);
let context = super::init(valid_args!(), Box::new(dummy_recv_callback))
let context = super::init(valid_args!(), Box::new(dummy_recv_callback), Box::new(dummy_deadline_callback))
.expect("init failed");
context.start()
......@@ -580,7 +638,8 @@ mod test {
let actor = TestActorDesc::new("titi", |actor| send_one_msg_actor(actor, EXPECTED_MSG));
let recv_notifier = RecvNotifier::new();
let context = super::init(valid_args!(), recv_notifier.get_callback())
let deadline_notifier = DeadlineNotifier::new();
let context = super::init(valid_args!(), recv_notifier.get_callback(), deadline_notifier.get_callback())
.expect("init failed");
context.start()
......@@ -595,6 +654,9 @@ mod test {
assert_eq!(dst, remote_vsg_address!());
assert_eq!(buffer, EXPECTED_MSG);
assert_eq!(1, deadline_notifier.num_called());
assert_eq!(SEND_ONE_MSG_ACTOR_DELAY, deadline_notifier.deadline());
context.stop();
drop(actor);
......@@ -607,7 +669,8 @@ mod test {
let actor = TestActorDesc::new("titi", |actor| send_one_delayed_msg_actor(actor, EXPECTED_MSG, slice_micros, delay_micros));
let recv_notifier = RecvNotifier::new();
let context = super::init(valid_args!(), recv_notifier.get_callback())
let deadline_notifier = DeadlineNotifier::new();
let context = super::init(valid_args!(), recv_notifier.get_callback(), deadline_notifier.get_callback())
.expect("init failed");
context.start()
......@@ -627,6 +690,9 @@ mod test {
assert!(delay_micros <= total_usec, "Message received too early: before {}us instead of after {}us", total_usec, delay_micros);
assert!(total_usec <= 10 * delay_micros, "Message received really too late: short before {}us instead of short after {}us", total_usec, delay_micros);