Attention, une mise à jour du service GitLab va être effectuée le mardi 18 janvier (et non lundi 17 comme annoncé précédemment) entre 18h00 et 18h30. Cette mise à jour va générer une interruption du service dont nous ne maîtrisons pas complètement la durée, mais qui ne devrait pas excéder quelques minutes.

Commit 631325f4 authored by RILLING Louis
Browse files

fake-vm: Remove one layer of wrapper and allocation around Context

parent f726e1fe
......@@ -8,6 +8,7 @@ use libc::{self, uintptr_t};
use log::{debug, error};
use static_assertions::const_assert;
use std::os::raw::{c_char, c_int};
use std::sync::Arc;
unsafe fn parse_os_args<F, T>(argc: c_int, argv: *const *const c_char, parse: F) -> Result<(T, c_int)>
where F: FnOnce(&mut dyn Iterator<Item = std::borrow::Cow<'static, std::ffi::OsStr>>) -> Result<T>
......@@ -42,7 +43,7 @@ unsafe fn parse_os_args<F, T>(argc: c_int, argv: *const *const c_char, parse: F)
type CRecvCallback = unsafe extern "C" fn(uintptr_t);
#[no_mangle]
pub unsafe extern fn vsg_init(argc: c_int, argv: *const *const c_char, next_arg_p: *mut c_int, recv_callback: CRecvCallback, recv_callback_arg: uintptr_t) -> *mut Context {
pub unsafe extern fn vsg_init(argc: c_int, argv: *const *const c_char, next_arg_p: *mut c_int, recv_callback: CRecvCallback, recv_callback_arg: uintptr_t) -> *const Context {
let callback: tansiv_client::RecvCallback = Box::new(move || recv_callback(recv_callback_arg));
match parse_os_args(argc, argv, |args| tansiv_client::init(args, callback)) {
......@@ -50,7 +51,7 @@ pub unsafe extern fn vsg_init(argc: c_int, argv: *const *const c_char, next_arg_
if let Some(next_arg_p) = next_arg_p.as_mut() {
*next_arg_p = next_arg;
}
Box::into_raw(context)
Arc::into_raw(context)
},
Err(e) => {
error!("vsg_init failed: {}", e);
......@@ -60,9 +61,9 @@ pub unsafe extern fn vsg_init(argc: c_int, argv: *const *const c_char, next_arg_
}
#[no_mangle]
pub unsafe extern fn vsg_cleanup(context: *mut Context) {
if let Some(context) = context.as_mut() {
drop(Box::from_raw(context));
pub unsafe extern fn vsg_cleanup(context: *const Context) {
if !context.is_null() {
drop(Arc::from_raw(context));
}
}
......
......@@ -65,10 +65,10 @@ impl Packet {
}
}
// InnerContext must be accessed concurrently from application code and the deadline handler. To
// Context must be accessed concurrently from application code and the deadline handler. To
// enable this, all fields are either read-only or implement thread and signal handler-safe
// interior mutability.
struct InnerContext {
pub struct Context {
// Read-only
address: libc::in_addr_t,
// No concurrency: (mut) accessed only by the deadline handler
......@@ -96,14 +96,20 @@ struct InnerContext {
start_once: Once,
}
impl std::fmt::Debug for InnerContext {
impl std::fmt::Debug for Context {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::result::Result<(), std::fmt::Error> {
write!(f, "InnerContext {{ address: {:0x}, connector: {:?}, input_queue: {:?}, timer_context: {:?}, output_buffer_pool: {:?}, outgoing_messages: {:?}, start_once: {:?} }}", self.address, self.connector, self.input_queue, self.timer_context, self.output_buffer_pool, self.outgoing_messages, self.start_once)
write!(f, "Context {{ address: {:0x}, connector: {:?}, input_queue: {:?}, timer_context: {:?}, output_buffer_pool: {:?}, outgoing_messages: {:?}, start_once: {:?} }}", self.address, self.connector, self.input_queue, self.timer_context, self.output_buffer_pool, self.outgoing_messages, self.start_once)
}
}
impl InnerContext {
fn new(config: &Config, recv_callback: RecvCallback) -> Result<InnerContext> {
#[derive(Debug)]
enum AfterDeadline {
NextDeadline(Duration),
EndSimulation,
}
impl Context {
fn new(config: &Config, recv_callback: RecvCallback) -> Result<Arc<Context>> {
let address = config.address;
let connector = ConnectorImpl::new(&config)?;
let input_queue = WaitfreeArrayQueue::new(config.num_buffers.get());
......@@ -111,7 +117,7 @@ impl InnerContext {
let output_buffer_pool = BufferPool::new(crate::MAX_PACKET_SIZE, config.num_buffers.get());
let outgoing_messages = OutputMsgSet::new(config.num_buffers.get());
Ok(InnerContext {
let context = Arc::new(Context {
address: address,
connector: Mutex::new(connector),
input_queue: input_queue,
......@@ -120,89 +126,24 @@ impl InnerContext {
output_buffer_pool: output_buffer_pool,
outgoing_messages: outgoing_messages,
start_once: Once::new(),
})
}
// Starts the simulation clock, arming the timer for the first deadline
// received from the coordinator.
// Writing Ok(...?) converts the timer's error type into this crate's Error.
fn start(&self, deadline: Duration) -> Result<()> {
Ok(self.timer_context.start(deadline)?)
}
// Returns the application-adjusted current time as a libc::timeval
// (whole seconds + sub-second microseconds), mirroring gettimeofday(2).
fn gettimeofday(&self) -> libc::timeval {
let adjusted_time = self.timer_context.application_now();
libc::timeval {
// `as` casts assume the i64 timestamp fits libc::time_t — NOTE(review):
// confirm on 32-bit targets where time_t may be 32 bits.
tv_sec: adjusted_time.timestamp() as libc::time_t,
tv_usec: adjusted_time.timestamp_subsec_micros() as libc::suseconds_t,
}
}
// Queues `msg` for delivery to `dst` at the next deadline: copies the payload
// into a buffer from the pool, timestamps it with the current simulation time,
// and inserts it into the outgoing message set.
// Errors propagate from buffer allocation (pool exhausted / size) and insertion.
fn send(&self, dst: libc::in_addr_t, msg: &[u8]) -> Result<()> {
let mut buffer = self.output_buffer_pool.allocate_buffer(msg.len())?;
buffer.copy_from_slice(msg);
let send_time = self.timer_context.simulation_now();
// It is possible that the deadline is reached just after recording the send time and
// before inserting the message, which leads to sending the message at the next deadline.
// This would violate the property that send times must be after the previous deadline
// (included) and (strictly) before the current deadline. To solve this, ::at_deadline()
// takes the latest time between the recorded time and the previous deadline.
self.outgoing_messages.insert(send_time, self.address, dst, buffer)?;
Ok(())
}
// Pops the next queued incoming packet and copies its payload into `msg`.
// Returns (src, dst, filled prefix of `msg`) on success.
// Errors: SizeTooBig if `msg` is smaller than the payload (the packet is
// dropped in that case since it was already popped); NoMessageAvailable if
// the input queue is empty.
fn recv<'a, 'b>(&'a self, msg: &'b mut [u8]) -> Result<(libc::in_addr_t, libc::in_addr_t, &'b mut [u8])> {
match self.input_queue.pop() {
Some(Packet { src, dst, payload, }) => {
if msg.len() >= payload.len() {
// Shadowing narrows `msg` to exactly the payload length before copying.
let msg = &mut msg[..payload.len()];
msg.copy_from_slice(&payload);
Ok((src, dst, msg))
} else {
Err(Error::SizeTooBig)
}
},
None => Err(Error::NoMessageAvailable),
}
}
// Non-blocking readiness check: Some(()) if at least one incoming packet is
// queued, None otherwise. Does not consume any packet.
fn poll(&self) -> Option<()> {
if self.input_queue.is_empty() {
None
} else {
Some(())
}
}
}
// Outcome of processing a deadline: either keep running until the next
// deadline (with the duration until it), or end the simulation.
#[derive(Debug)]
enum AfterDeadline {
NextDeadline(Duration),
EndSimulation,
}
#[derive(Debug)]
pub struct Context(Arc<InnerContext>);
impl Context {
fn new(config: &Config, recv_callback: RecvCallback) -> Result<Box<Context>> {
let inner_context = InnerContext::new(config, recv_callback)?;
let context = Box::new(Context(Arc::new(inner_context)));
});
timer::register(&context)?;
Ok(context)
}
pub fn start(&self) -> Result<()> {
let context = &self.0;
let mut res = Err(Error::AlreadyStarted);
context.start_once.call_once(|| res = (|| {
let mut connector = context.connector.lock().unwrap();
self.start_once.call_once(|| res = (|| {
let mut connector = self.connector.lock().unwrap();
let msg = connector.recv()?;
// The deadline handler can fire and try to lock connector at any time once self.0.start()
// is called so we must unlock connector before.
drop(connector);
match msg {
MsgIn::GoToDeadline(deadline) => context.start(deadline),
// Writing Ok(...?) helps the compiler to know how to convert std::io::Error to Error
MsgIn::GoToDeadline(deadline) => Ok(self.timer_context.start(deadline)?),
_ => Err(Error::ProtocolViolation),
}
})());
......@@ -211,16 +152,16 @@ impl Context {
}
pub fn stop(&self) {
self.0.timer_context.stop()
self.timer_context.stop()
}
fn at_deadline(&self) -> AfterDeadline {
let mut connector = self.0.connector.lock().unwrap();
let mut connector = self.connector.lock().unwrap();
// First, send all messages from this last time slice to others
let messages = self.0.outgoing_messages.drain();
let previous_deadline = self.0.timer_context.simulation_previous_deadline();
let current_deadline = self.0.timer_context.simulation_next_deadline();
let messages = self.outgoing_messages.drain();
let previous_deadline = self.timer_context.simulation_previous_deadline();
let current_deadline = self.timer_context.simulation_next_deadline();
for (send_time, src, dst, payload) in messages {
let send_time = if send_time < previous_deadline {
// This message was time-stamped before the previous deadline but inserted after.
......@@ -247,7 +188,7 @@ impl Context {
}
// Third, receive messages from others, followed by next deadline
let input_queue = &self.0.input_queue;
let input_queue = &self.input_queue;
let may_notify = input_queue.is_empty();
let after_deadline = loop {
......@@ -264,7 +205,7 @@ impl Context {
};
if may_notify && !input_queue.is_empty() {
(self.0.recv_callback)();
(self.recv_callback)();
}
after_deadline
......@@ -274,7 +215,7 @@ impl Context {
match msg {
MsgIn::DeliverPacket(src, dst, packet) => {
// let size = packet.len();
if self.0.input_queue.push(Packet::new(src, dst, packet)).is_err() {
if self.input_queue.push(Packet::new(src, dst, packet)).is_err() {
// info!("Dropping input packet from {} of {} bytes", src, size);
}
None
......@@ -285,23 +226,52 @@ impl Context {
}
pub fn gettimeofday(&self) -> libc::timeval {
self.0.gettimeofday()
let adjusted_time = self.timer_context.application_now();
libc::timeval {
tv_sec: adjusted_time.timestamp() as libc::time_t,
tv_usec: adjusted_time.timestamp_subsec_micros() as libc::suseconds_t,
}
}
pub fn send(&self, dst: libc::in_addr_t, msg: &[u8]) -> Result<()> {
self.0.send(dst, msg)
let mut buffer = self.output_buffer_pool.allocate_buffer(msg.len())?;
buffer.copy_from_slice(msg);
let send_time = self.timer_context.simulation_now();
// It is possible that the deadline is reached just after recording the send time and
// before inserting the message, which leads to sending the message at the next deadline.
// This would violate the property that send times must be after the previous deadline
// (included) and (strictly) before the current deadline. To solve this, ::at_deadline()
// takes the latest time between the recorded time and the previous deadline.
self.outgoing_messages.insert(send_time, self.address, dst, buffer)?;
Ok(())
}
pub fn recv<'a, 'b>(&'a self, msg: &'b mut [u8]) -> Result<(libc::in_addr_t, libc::in_addr_t, &'b mut [u8])> {
self.0.recv(msg)
match self.input_queue.pop() {
Some(Packet { src, dst, payload, }) => {
if msg.len() >= payload.len() {
let msg = &mut msg[..payload.len()];
msg.copy_from_slice(&payload);
Ok((src, dst, msg))
} else {
Err(Error::SizeTooBig)
}
},
None => Err(Error::NoMessageAvailable),
}
}
pub fn poll(&self) -> Option<()> {
self.0.poll()
if self.input_queue.is_empty() {
None
} else {
Some(())
}
}
}
pub fn init<I>(args: I, recv_callback: RecvCallback) -> Result<Box<Context>>
pub fn init<I>(args: I, recv_callback: RecvCallback) -> Result<Arc<Context>>
where I: IntoIterator,
I::Item: Into<std::ffi::OsString> + Clone {
use structopt::StructOpt;
......@@ -759,7 +729,7 @@ mod test {
let context = super::init(valid_args_h1!(), Box::new(dummy_recv_callback))
.expect("init() failed");
let mut connector = context.0.connector.lock().unwrap();
let mut connector = context.connector.lock().unwrap();
loop {
let msg = connector.recv();
......
......@@ -10,7 +10,7 @@ use std::sync::{Arc, Mutex, RwLock, Weak};
use std::sync::Once;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration as StdDuration;
use crate::{Context, InnerContext};
use crate::Context;
// Newtype wrapping a Duration behind a SeqLock — presumably so the deadline
// signal handler can read it without blocking; NOTE(review): confirm SeqLock's
// reader guarantees in signal-handler context.
#[derive(Debug)]
struct AdjustedTime(SeqLock<Duration>)
......@@ -225,19 +225,19 @@ impl Drop for TimerContext {
}
lazy_static! {
static ref CONTEXT: RwLock<Weak<InnerContext>> = RwLock::new(Weak::new());
static ref CONTEXT: RwLock<Weak<Context>> = RwLock::new(Weak::new());
}
#[cfg(not(any(test, feature = "test-helpers")))]
static INIT: Once = Once::new();
#[cfg(not(any(test, feature = "test-helpers")))]
pub fn register(context: &Context) -> Result<()> {
pub fn register(context: &Arc<Context>) -> Result<()> {
let mut success = false;
INIT.call_once(|| {
// Signal handler safety: This is the only place where CONTEXT is write-locked.
let mut uniq_context = CONTEXT.write().unwrap();
*uniq_context = Arc::downgrade(&context.0);
*uniq_context = Arc::downgrade(&context);
success = true;
});
......@@ -251,9 +251,9 @@ pub fn register(context: &Context) -> Result<()> {
// Let individual tests overwrite each other's context in turn.
// Assumes that tests are run in a single thread
#[cfg(any(test, feature = "test-helpers"))]
pub fn register(context: &Context) -> Result<()> {
pub fn register(context: &Arc<Context>) -> Result<()> {
let mut uniq_context = CONTEXT.write().unwrap();
*uniq_context = Arc::downgrade(&context.0);
*uniq_context = Arc::downgrade(&context);
Ok(())
}
......@@ -261,13 +261,12 @@ extern "C" fn deadline_handler(_: libc::c_int) {
use crate::AfterDeadline;
if let Some(context) = CONTEXT.read().unwrap().upgrade() {
let context = Context(context);
let freeze_time = context.0.timer_context.freeze_time();
let freeze_time = context.timer_context.freeze_time();
match context.at_deadline() {
AfterDeadline::NextDeadline(time_to_deadline) => {
context.0.timer_context.thaw_time_to_deadline(Some(freeze_time), time_to_deadline).expect("thaw_time_to_deadline failed")
context.timer_context.thaw_time_to_deadline(Some(freeze_time), time_to_deadline).expect("thaw_time_to_deadline failed")
},
AfterDeadline::EndSimulation => context.0.timer_context.stopped.store(true, Ordering::Release),
AfterDeadline::EndSimulation => context.timer_context.stopped.store(true, Ordering::Release),
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment