From e31ee2f78feb7255d0aa845368f2c29f471258ee Mon Sep 17 00:00:00 2001 From: WILLIAMS Harvey <hwilliam@inria.fr> Date: Thu, 13 Mar 2025 17:36:17 +0100 Subject: [PATCH] Pushing local communication example --- .vscode/settings.json | 5 + Cargo.lock | 304 +++++++++++++++++++++++++++++ Cargo.toml | 6 + examples/local/Cargo.toml | 12 ++ examples/local/src/application.rs | 0 examples/local/src/main.rs | 20 ++ examples/local/src/net/dispatch.rs | 101 ++++++++++ examples/local/src/net/mod.rs | 35 ++++ src/lib.rs | 6 +- 9 files changed, 487 insertions(+), 2 deletions(-) create mode 100644 .vscode/settings.json create mode 100644 examples/local/Cargo.toml create mode 100644 examples/local/src/application.rs create mode 100644 examples/local/src/main.rs create mode 100644 examples/local/src/net/dispatch.rs create mode 100644 examples/local/src/net/mod.rs diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..e736c43 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,5 @@ +{ + "rust-analyzer.linkedProjects": [ + "Cargo.toml", + ], +} \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index ce5a5e5..02a1cd8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "addr2line" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + [[package]] name = "async-trait" version = "0.1.87" @@ -13,6 +28,140 @@ dependencies = [ "syn", ] +[[package]] +name = "autocfg" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" + +[[package]] +name = "backtrace" +version = "0.3.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-targets", +] + +[[package]] +name = "bitflags" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" + +[[package]] +name = "bytes" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "gimli" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" + +[[package]] +name = "libc" +version = "0.2.171" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" + +[[package]] +name = "local" +version = "0.1.0" +dependencies = [ + "async-trait", + "rust-networking-examples", + "tokio", +] + +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "miniz_oxide" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e3e04debbb59698c15bacbb6d93584a8c0ca9cc3213cb423d31f760d8843ce5" +dependencies = [ + "adler2", +] + +[[package]] +name = "mio" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" +dependencies = [ + "libc", + "wasi", + "windows-sys", +] + +[[package]] +name = "object" +version = "0.36.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" +dependencies = [ + "memchr", +] + +[[package]] +name = "parking_lot" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" + [[package]] name = "proc-macro2" version = "1.0.94" @@ -31,11 +180,58 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "redox_syscall" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b8c0c260b63a8219631167be35e6a988e9554dbd323f8bd08439c8ed1302bd1" +dependencies = [ + "bitflags", +] + [[package]] name = "rust-networking-examples" version = "0.1.0" dependencies = [ "async-trait", + "tokio", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + +[[package]] +name = "smallvec" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd" + +[[package]] +name = "socket2" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" +dependencies = [ + "libc", + "windows-sys", ] [[package]] @@ -49,8 +245,116 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "tokio" +version = "1.44.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f382da615b842244d4b8738c82ed1275e6c5dd90c459a30941cd07080b06c91a" +dependencies = [ + "backtrace", + "bytes", + "libc", + "mio", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys", +] + +[[package]] +name = "tokio-macros" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "unicode-ident" version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" diff --git a/Cargo.toml b/Cargo.toml index af6e6a1..0198f9d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,5 +3,11 @@ name = "rust-networking-examples" version = "0.1.0" edition = "2021" +[workspace] +members = [ + "examples/local" +] + [dependencies] +tokio = { version = "1.44.1", features = ["full"] } async-trait = "0.1.87" diff --git a/examples/local/Cargo.toml b/examples/local/Cargo.toml new file mode 100644 index 0000000..8f1f3f0 --- /dev/null +++ b/examples/local/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "local" +version = "0.1.0" +edition = "2021" + +[dependencies] +# For the interface +rust-networking-examples = { path = "../../" } + +# Async related +tokio = { version = "1.44.1", features = ["full"] } +async-trait = "0.1.87" \ No newline at end of file diff --git a/examples/local/src/application.rs b/examples/local/src/application.rs new file mode 100644 index 0000000..e69de29 diff --git a/examples/local/src/main.rs b/examples/local/src/main.rs new file mode 100644 index 0000000..1f49bad --- /dev/null +++ b/examples/local/src/main.rs @@ -0,0 +1,20 @@ +use rust_networking_examples::Network; + +mod application; +mod net; + +use net::Net; + +/// Our main function decorated with the tokio::main attribute to setup an async runtime. +#[tokio::main] +async fn main() { + let mut net_a = Net::new("a").await; + let mut net_b = Net::new("b").await; + + let mut recv_b = net_b.recv().await; + net_a.send("b", "Hello world!".as_bytes().to_vec()); + + while let Some((from, msg)) = recv_b.recv().await { + println!("{}, {:?}", from, msg); + } +} diff --git a/examples/local/src/net/dispatch.rs b/examples/local/src/net/dispatch.rs new file mode 100644 index 0000000..1bd19b0 --- /dev/null +++ b/examples/local/src/net/dispatch.rs @@ -0,0 +1,101 @@ +use tokio::sync::{mpsc, oneshot}; +use std::{collections::HashMap, sync::LazyLock}; + +/// We use a static dispatch handle which other actors can freely clone and use. +pub static DISPATCHER: LazyLock<DispatchHandle> = LazyLock::new(DispatchHandle::new); + +/// The buffer size of the channel used to communicate with the dispatch actor. +const ACTOR_MESSAGE_BUFFER: usize = 1024; +/// The buffer size of the channels used to dispatch messages +const HANDLER_MESSAGE_BUFFER: usize = 1024; + +enum DispatchMessage { + /// Instruct dispatcher to send a message to a specific address + SendMessage { + to: String, + message: Vec<u8> + }, + /// Instruct dispatcher to provide a receiver through which messages to 'address' will be received. + RegisterHandler { + address: String, + respond_to: oneshot::Sender<mpsc::Receiver<(String, Vec<u8>)>>, + } +} + +/// The handle to the dispatch actor, note the 'Clone' trait. +#[derive(Clone)] +pub struct DispatchHandle { + sender: mpsc::Sender<DispatchMessage> +} + +impl DispatchHandle { + pub fn new() -> DispatchHandle { + // Open a multiple sender, single consumer channel to communicate with the actor. + let (sender, mut receiver) = mpsc::channel(ACTOR_MESSAGE_BUFFER); + + // Event loop for the dispatch actor, exits when all handles are dropped. + tokio::spawn(async move { + let mut actor = Actor::default(); + + while let Some(message) = receiver.recv().await { + actor.handle_message(message); + } + }); + + // Return the handle + DispatchHandle { sender } + } + + /// Utility function to register a new address + pub async fn register(&mut self, address: &str) -> mpsc::Receiver<(String, Vec<u8>)> { + let (sender, receiver) = oneshot::channel(); + + { + let message = DispatchMessage::RegisterHandler { address: address.to_string(), respond_to: sender }; + + self.sender.try_send(message).expect("Actor should be alive."); + } + + receiver.await.expect("Channel should have been registered") + } + + /// Utility function to send a message + pub fn send_message(&mut self, to: &str, message: Vec<u8>) { + let message = DispatchMessage::SendMessage { to: to.to_string(), message }; + + self.sender.try_send(message).expect("Actor should be alive."); + } +} + +/// The dispatch actor itself, that performs message routing. +#[derive(Default)] +struct Actor { + senders: HashMap<String, Vec<mpsc::Sender<(String, Vec<u8>)>>> +} + +impl Actor { + pub fn handle_message(&mut self, message: DispatchMessage) { + match message { + DispatchMessage::SendMessage { to, message} => { + // Look for senders to dispatch message to + if let Some(senders) = self.senders.get_mut(&to) { + for sender in senders.iter_mut() { + sender.try_send((to.clone(), message.clone())).expect("Expected sender channel to be closed and not full.") + } + } else { + panic!("No sender with address {} to dispatch message to.", to); + } + }, + DispatchMessage::RegisterHandler { address, respond_to } => { + // Create a new channel + let (sender, receiver) = mpsc::channel(HANDLER_MESSAGE_BUFFER); + + // Place sender in 'senders' + self.senders.entry(address).or_default().push(sender); + + // Send back the new receiver + respond_to.send(receiver).expect("Return channel has closed prematurely."); + }, + } + } +} \ No newline at end of file diff --git a/examples/local/src/net/mod.rs b/examples/local/src/net/mod.rs new file mode 100644 index 0000000..0f4fea2 --- /dev/null +++ b/examples/local/src/net/mod.rs @@ -0,0 +1,35 @@ +mod dispatch; + +use dispatch::{DispatchHandle, DISPATCHER}; +use rust_networking_examples::Network; +use tokio::sync::mpsc; + +/// Handle for sending and receiving on our network simulated using async channels. +#[derive(Clone)] +pub struct Net { + /// Handle to our dispatcher, for sending messager + dispatcher: DispatchHandle, + + /// Our address + address: String, +} + +impl Net { + pub async fn new(address: &str) -> Net { + Net { dispatcher: DISPATCHER.clone(), address: address.to_string() } + } +} + +// Here we implement our Network interface. +#[async_trait::async_trait] +impl Network for Net { + fn send(&mut self, to: &str, message: Vec<u8>) { + // Send message to the dispatch actor + self.dispatcher.send_message(to, message); + } + + async fn recv(&mut self) -> mpsc::Receiver<(String, Vec<u8>)> { + // Register a new sender with the dispatch actor + self.dispatcher.register(&self.address).await + } +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 889360b..cd19324 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,11 +1,13 @@ +use tokio::sync::mpsc; + /// Our Network interface which is accomplished through a language feature of Rust called 'traits' /// /// Note the async_trait annotation; this adds support for defining async functions as part of your interface. The stable version of rust currently doesn't have this, so we rely on a crate for as a workaround. #[async_trait::async_trait] pub trait Network { /// Send a message to the identifier specified by the 'to' field. - fn send(to: &str, message: Vec<u8>); + fn send(&mut self, to: &str, message: Vec<u8>); /// Wait for a message to be received - async fn recv() -> Option<(String, Vec<u8>)>; + async fn recv(&mut self) -> mpsc::Receiver<(String, Vec<u8>)>; } \ No newline at end of file -- GitLab