From b3c9f86b022a7f09aff050ef36fc47b8e79178f7 Mon Sep 17 00:00:00 2001 From: WILLIAMS Harvey <hwilliam@inria.fr> Date: Tue, 6 May 2025 15:55:01 +0200 Subject: [PATCH 1/2] Working TCP example --- Cargo.lock | 128 +++++++++++++++++++++++++++ Cargo.toml | 2 +- examples/tcp/Cargo.toml | 14 +++ examples/tcp/src/main.rs | 30 +++++++ examples/tcp/src/tcp/actor.rs | 129 +++++++++++++++++++++++++++ examples/tcp/src/tcp/connection.rs | 134 +++++++++++++++++++++++++++++ examples/tcp/src/tcp/listen.rs | 33 +++++++ examples/tcp/src/tcp/mod.rs | 37 ++++++++ 8 files changed, 506 insertions(+), 1 deletion(-) create mode 100644 examples/tcp/Cargo.toml create mode 100644 examples/tcp/src/main.rs create mode 100644 examples/tcp/src/tcp/actor.rs create mode 100644 examples/tcp/src/tcp/connection.rs create mode 100644 examples/tcp/src/tcp/listen.rs create mode 100644 examples/tcp/src/tcp/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 02a1cd8..ebf809b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -67,6 +67,95 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "gimli" version = "0.31.1" @@ -162,6 +251,12 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "proc-macro2" version = "1.0.94" @@ -218,6 +313,15 @@ dependencies = [ "libc", ] +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + [[package]] name = "smallvec" version = "1.14.0" @@ -245,6 +349,17 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "tcp" +version = "0.1.0" +dependencies = [ + "async-trait", + "futures", + "rust-networking-examples", + "tokio", + "tokio-util", +] + [[package]] name = "tokio" version = "1.44.1" @@ -274,6 +389,19 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-util" +version = "0.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "unicode-ident" version = "1.0.18" diff --git a/Cargo.toml b/Cargo.toml index 0198f9d..df86bbf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [workspace] members = [ "examples/local" -] +, "examples/tcp"] [dependencies] tokio = { version = "1.44.1", features = ["full"] } diff --git a/examples/tcp/Cargo.toml b/examples/tcp/Cargo.toml new file mode 100644 index 0000000..beb6965 --- /dev/null +++ b/examples/tcp/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "tcp" +version = "0.1.0" +edition = "2021" + +[dependencies] +# For the interface +rust-networking-examples = { path = "../../" } + +# Async related +tokio = { version = "1.44.1", features = ["full"] } +tokio-util = { version = "0.7.15", features = ["codec"] } +async-trait = "0.1.87" +futures = "0.3.31" diff --git a/examples/tcp/src/main.rs b/examples/tcp/src/main.rs new file mode 100644 index 0000000..c7d8709 --- /dev/null +++ b/examples/tcp/src/main.rs @@ -0,0 +1,30 @@ +use std::time::Duration; + +use tcp::TcpConnector; +use rust_networking_examples::Network; +use tokio::time::sleep; + +mod tcp; + +#[tokio::main] +async fn main() { + let mut c_a = TcpConnector::new("127.0.0.1:3000"); + let mut c_b = TcpConnector::new("127.0.0.1:3001"); + + let mut r_a = c_a.recv().await; + + tokio::spawn(async move { + while let Some((from, msg)) = r_a.recv().await { + println!("Got {}, {:?}", from, msg); + } + }); + + sleep(Duration::from_secs(2)).await; + + c_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec()); + c_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec()); + c_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec()); + + sleep(Duration::from_secs(10)).await; + println!("Closing"); +} diff --git a/examples/tcp/src/tcp/actor.rs b/examples/tcp/src/tcp/actor.rs new file mode 100644 index 0000000..7d10540 --- /dev/null +++ b/examples/tcp/src/tcp/actor.rs @@ -0,0 +1,129 @@ +use tokio::net::TcpStream; +use tokio::select; +use std::collections::HashMap; +use tokio::sync::{mpsc, oneshot}; + +use super::connection::{ConnectionHandle, ReceivedEvent}; +use super::listen::listen_tcp; +use super::DEFAULT_CHANNEL_SIZE; + +/// Messages that the Net actor knows how to process +pub enum NetMessage { + NewConnection { + from: String, + socket: TcpStream + }, + SendMessage { + to: String, + message: Vec<u8>, + }, + ReceiveMessage { + from: String, + message: Vec<u8> + }, + RegisterMessageHandler { + respond_to: oneshot::Sender<mpsc::Receiver<(String, Vec<u8>)>> + } +} + +// Map address:port to connection handle(s) +type Connections = HashMap<String, Vec<ConnectionHandle>>; + +/// Net actor, orchestrates the RPS, Listen and Connection Actors +struct Actor { + /// Wallet message handlers + emit: Vec<mpsc::Sender<(String, Vec<u8>)>>, + + /// Map of addr -> communication channel for currently active connection agents + connections: Connections, + + connection_sender: mpsc::Sender<ReceivedEvent>, +} + +#[derive(Clone)] +pub struct ActorHandle { + pub sender: mpsc::Sender<NetMessage> +} + +impl ActorHandle { + pub fn new(bind_address: String) -> Self { + // Channel to communicate with parent + let (sender, mut receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE); + + // Channel to receive incoming messages from connections + let (sender_connection, mut receiver_connection) = mpsc::channel(DEFAULT_CHANNEL_SIZE); + + tokio::spawn(async move { + // Channel to accept new connections + let mut listening = listen_tcp(&bind_address).await; + + // Event loop + let mut actor = Actor::new(sender_connection); + + loop { + select! { + event = receiver.recv() => { + match event { + Some(msg) => actor.handle_message(msg), + None => break, // Exit + }; + }, + Some((from, socket)) = listening.recv() => { + actor.handle_message(NetMessage::NewConnection { from, socket }); + }, + Some(ReceivedEvent { from, message }) = receiver_connection.recv() => { + actor.handle_message(NetMessage::ReceiveMessage { from, message }); + } + } + } + }); + + ActorHandle { sender } + } +} + + +impl Actor { + pub fn new(connection_sender: mpsc::Sender<ReceivedEvent>) -> Self { + Actor { emit: vec![], connections: Connections::new(), connection_sender } + } + + pub fn handle_message(&mut self, msg: NetMessage) { + match msg { + NetMessage::NewConnection { from, socket } => { + let handle = ConnectionHandle::from_socket(from.clone(), socket, self.connection_sender.clone()); + + self.connections + .entry(from) + .or_default() + .push(handle); + }, + NetMessage::SendMessage { to, message } => { + // Purge stale connection handles that have already closed. + self.connections.entry(to.clone()).and_modify(|handles| { + handles.retain(|h| h.open()); + }); + + // Send message to the first connection handle we find, or open a new one. + self.connections + .entry(to.clone()) + .or_insert(vec![ConnectionHandle::new(to, self.connection_sender.clone())]) + .first_mut() + .unwrap() + .send(message); + }, + NetMessage::ReceiveMessage { from, message } => { + self.emit + .iter_mut() + .for_each(|sender| { let _ = sender.try_send((from.clone(), message.clone())); }); + }, + NetMessage::RegisterMessageHandler { respond_to } => { + let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE); + + self.emit.push(sender); + + respond_to.send(receiver).expect("Channel should be alive"); + }, + } + } +} \ No newline at end of file diff --git a/examples/tcp/src/tcp/connection.rs b/examples/tcp/src/tcp/connection.rs new file mode 100644 index 0000000..56c1a62 --- /dev/null +++ b/examples/tcp/src/tcp/connection.rs @@ -0,0 +1,134 @@ +use futures::{sink::SinkExt, StreamExt}; +use tokio::{io::{self, split}, net::TcpStream, select, sync::mpsc}; +use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; + +use super::DEFAULT_CHANNEL_SIZE; + +// The event emitted by this actor, denoting a message and who it was received from. +pub struct ReceivedEvent { + pub from: String, + pub message: Vec<u8> +} + +pub enum ConnectionHandleMessage { + Send(Vec<u8>), + Receive(Vec<u8>) +} + +#[derive(Clone)] +pub struct ConnectionHandle { + pub sender: mpsc::Sender<ConnectionHandleMessage> +} + +impl ConnectionHandle { + pub fn from_socket(address: String, socket: TcpStream, emit: mpsc::Sender<ReceivedEvent>) -> Self { + let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE); + + event_loop(receiver, address, Some(socket), emit); + + ConnectionHandle { sender } + } + + pub fn new(address: String, emit: mpsc::Sender<ReceivedEvent>) -> Self { + let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE); + + event_loop(receiver, address, None, emit); + + ConnectionHandle { sender } + } + + pub fn open(&self) -> bool { + !self.sender.is_closed() + } + + pub fn send(&mut self, message: Vec<u8>) { + self.sender.try_send(ConnectionHandleMessage::Send(message)).expect("Actor should be alive"); + } +} + +/// Function for creating the actor and then running the event loop +fn event_loop(mut receiver: mpsc::Receiver<ConnectionHandleMessage>, address: String, socket: Option<TcpStream>, emit: mpsc::Sender<ReceivedEvent>) { + tokio::spawn(async move { + let socket = match socket { + Some(s) => s, + None => { + TcpStream::connect(address.clone()) + .await + .expect("Failed to connect to address") + }, + }; + + // Start connection actor + let (writer, mut reader) = frame_and_split_socket(socket); + + // Writer becomes and channel that we can send messsages down + let writer = start_writer_task(writer); + + let mut actor = Actor { address: address.clone(), emit: emit.clone(), writer }; + + loop { + select!{ + received = reader.next() => { + if let Some(message) = received { + let bytes = message.expect("Message should be readble"); + actor.handle_message(ConnectionHandleMessage::Receive(bytes.into())); + } else { + break; // Connection finished + } + }, + Some(msg) = receiver.recv() => { + actor.handle_message(msg); + }, + else => break // Actor handle dropped + } + } + }); +} + +struct Actor { + address: String, + emit: mpsc::Sender<ReceivedEvent>, + writer: mpsc::Sender<Vec<u8>>, +} + +impl Actor { + pub fn handle_message(&mut self, msg: ConnectionHandleMessage) { + match msg { + ConnectionHandleMessage::Send(message) => { + self.writer.try_send(message).expect("Writer task should be running"); + }, + ConnectionHandleMessage::Receive(message) => { + self.emit.try_send(ReceivedEvent { from: self.address.clone(), message }).expect("Should be able to emit events"); + }, + } + } +} + + +type ConnectionReader = FramedRead<io::ReadHalf<TcpStream>, LengthDelimitedCodec>; +type ConnectionWriter = FramedWrite<io::WriteHalf<TcpStream>, LengthDelimitedCodec>; + +/// Split TcpStream into two seperate ownership objects; a writer (for sending) and a reader (for receiving). +fn frame_and_split_socket(socket: TcpStream) -> (ConnectionWriter, ConnectionReader) { + // Split the TcpStream into read and write halves + let (read_half, write_half) = split(socket); + + // Create FramedRead for receiving and FramedWrite for sending + let reader = FramedRead::new(read_half, LengthDelimitedCodec::new()); + let writer = FramedWrite::new(write_half, LengthDelimitedCodec::new()); + + (writer, reader) +} + +/// To write to the underlying TCP socket, we need an async context. This is setup by this function. +fn start_writer_task(mut writer: ConnectionWriter) -> mpsc::Sender<Vec<u8>> { + let (sender, mut receiver) = mpsc::channel::<Vec<u8>>(DEFAULT_CHANNEL_SIZE); + + tokio::spawn(async move { + while let Some(bytes) = receiver.recv().await { + writer.send(bytes.into()).await.expect("Should have sent message"); + } + }); + + sender +} \ No newline at end of file diff --git a/examples/tcp/src/tcp/listen.rs b/examples/tcp/src/tcp/listen.rs new file mode 100644 index 0000000..595de1a --- /dev/null +++ b/examples/tcp/src/tcp/listen.rs @@ -0,0 +1,33 @@ +use tokio::select; +use tokio::sync::mpsc; +use tokio::net::{TcpListener, TcpStream}; + +use super::DEFAULT_CHANNEL_SIZE; + + +/// `listen_tcp` spawns an Actor that waits for inbound requests to open TCP connections, accepts them and then starts a connection actor to exchange messages with them. +pub async fn listen_tcp(listen_addr: &str) -> mpsc::Receiver<(String, TcpStream)> { + let listener = TcpListener::bind(listen_addr).await.expect("Couldn't bind to listen address"); + + let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE); + + tokio::spawn(async move { + loop { + select! { + // Accept all inbound TCP connections + incoming = listener.accept() => { + match incoming { + Ok((socket, socket_addr)) => { + sender.send((socket_addr.to_string(), socket)).await.expect("Parent actor should be alive and accepting new connections"); + }, + Err(error) => { + println!("TCP : error accepting connection {}", error); + } + } + } + } + } + }); + + receiver +} \ No newline at end of file diff --git a/examples/tcp/src/tcp/mod.rs b/examples/tcp/src/tcp/mod.rs new file mode 100644 index 0000000..1ba975c --- /dev/null +++ b/examples/tcp/src/tcp/mod.rs @@ -0,0 +1,37 @@ +// Actors +mod actor; +mod listen; +mod connection; + +pub use actor::ActorHandle; +use rust_networking_examples::Network; +use tokio::sync::{mpsc, oneshot}; + +const DEFAULT_CHANNEL_SIZE: usize = 1024; + +pub struct TcpConnector { + handle: ActorHandle +} + +impl TcpConnector { + pub fn new(bind_address: &str) -> TcpConnector { + TcpConnector { handle: ActorHandle::new(String::from(bind_address)) } + } +} + +#[async_trait::async_trait] +impl Network for TcpConnector { + /// Send a message to the identifier specified by the 'to' field. + fn send(&mut self, to: &str, message: Vec<u8>) { + self.handle.sender.try_send(actor::NetMessage::SendMessage { to: String::from(to), message }).expect("Actor should be alive"); + } + + /// Generate one or more receivers to catch inbound messages. + async fn recv(&mut self) -> mpsc::Receiver<(String, Vec<u8>)> { + let (respond_to, receiver) = oneshot::channel(); + + self.handle.sender.try_send(actor::NetMessage::RegisterMessageHandler { respond_to }).expect("Actor should be alive"); + + receiver.await.expect("Should have receievd channel back") + } +} -- GitLab From 53ec3a30727ba33e78d1bd0be3b0314a8a487f36 Mon Sep 17 00:00:00 2001 From: WILLIAMS Harvey <hwilliam@inria.fr> Date: Wed, 7 May 2025 16:45:47 +0200 Subject: [PATCH 2/2] Documentation --- Cargo.toml | 5 +++-- examples/tcp/src/README.md | 12 +++++++++++ examples/tcp/src/main.rs | 30 ++++++++++++++------------ examples/tcp/src/tcp/actor.rs | 39 ++++++++++++++++++++-------------- examples/tcp/src/tcp/listen.rs | 2 +- examples/tcp/src/tcp/mod.rs | 4 ++-- 6 files changed, 57 insertions(+), 35 deletions(-) create mode 100644 examples/tcp/src/README.md diff --git a/Cargo.toml b/Cargo.toml index df86bbf..eacb326 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,8 +5,9 @@ edition = "2021" [workspace] members = [ - "examples/local" -, "examples/tcp"] + "examples/local", + "examples/tcp" +] [dependencies] tokio = { version = "1.44.1", features = ["full"] } diff --git a/examples/tcp/src/README.md b/examples/tcp/src/README.md new file mode 100644 index 0000000..9d692b1 --- /dev/null +++ b/examples/tcp/src/README.md @@ -0,0 +1,12 @@ +# examples/tcp + +An implementation of the network interface, using TCP sockets. Identifiers follow the format of `{IP}:{port}`. + +Identifers now behave slightly differently to reflect the realities of using raw sockets, in contrast to `examples/local` where they consistently map 1-1 to an instance of `Connector`. On receiving messages: +- `{IP}` may be either an IPV4 address or an IPV6 address, depending on your OS and networking environment. The `{port}` is the source port for the tcp connection. +- `{port}`, will be unpredictable and assigned by the operating system of which other peer's `Connector` instance is located. +- `Connector` may have one or more open sockets with another `Connector` instance, each with a distinct source port number. + +When designing your application, you may want to be able to retain some form of unique identifier for each Connector instance on the network: +- If you can be certain that only one instance of `Connector` is behind each IP address, perhaps on a local network such as Grid5000 in which each node has at most one connector; the IP address (with the port stripped out) may be suitable to identify peers. +- Otherwise, you may want to define some messaging protocol on top of the interface yourself. For instance using public key cryptography to consistently distinguish peers. \ No newline at end of file diff --git a/examples/tcp/src/main.rs b/examples/tcp/src/main.rs index c7d8709..6197894 100644 --- a/examples/tcp/src/main.rs +++ b/examples/tcp/src/main.rs @@ -1,30 +1,32 @@ +use tokio::time::sleep; use std::time::Duration; use tcp::TcpConnector; use rust_networking_examples::Network; -use tokio::time::sleep; mod tcp; #[tokio::main] async fn main() { - let mut c_a = TcpConnector::new("127.0.0.1:3000"); - let mut c_b = TcpConnector::new("127.0.0.1:3001"); - - let mut r_a = c_a.recv().await; + // Create two tcp connectors that listen on ports 3000 and 3001. + let mut conn_a = TcpConnector::new("127.0.0.1:3000"); + let mut conn_b = TcpConnector::new("127.0.0.1:3001"); + // Spawn a task to print out messages received by the first tcp connector tokio::spawn(async move { - while let Some((from, msg)) = r_a.recv().await { - println!("Got {}, {:?}", from, msg); + // Register a receiver channel on the first tcp connector + let mut recv_a = conn_a.recv().await; + + while let Some((from, msg)) = recv_a.recv().await { + println!("conn_a received {:?} from {}", msg, from); } }); - sleep(Duration::from_secs(2)).await; - - c_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec()); - c_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec()); - c_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec()); + // Send a message multiple times from conn_b to conn_a + conn_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec()); + conn_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec()); + conn_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec()); - sleep(Duration::from_secs(10)).await; - println!("Closing"); + // Allow some time for conn_a to receive the messages + sleep(Duration::from_secs(2)).await; } diff --git a/examples/tcp/src/tcp/actor.rs b/examples/tcp/src/tcp/actor.rs index 7d10540..d611a61 100644 --- a/examples/tcp/src/tcp/actor.rs +++ b/examples/tcp/src/tcp/actor.rs @@ -7,8 +7,8 @@ use super::connection::{ConnectionHandle, ReceivedEvent}; use super::listen::listen_tcp; use super::DEFAULT_CHANNEL_SIZE; -/// Messages that the Net actor knows how to process -pub enum NetMessage { +/// Messages that this actor knows how to process +pub enum HandleMessage { NewConnection { from: String, socket: TcpStream @@ -29,20 +29,24 @@ pub enum NetMessage { // Map address:port to connection handle(s) type Connections = HashMap<String, Vec<ConnectionHandle>>; +/// Represents a collection of channels that we use to emit messages received over the network +type MessageHandlers = Vec<mpsc::Sender<(String, Vec<u8>)>>; + /// Net actor, orchestrates the RPS, Listen and Connection Actors struct Actor { - /// Wallet message handlers - emit: Vec<mpsc::Sender<(String, Vec<u8>)>>, + /// Application handlers to broadcast messages down. + handlers: MessageHandlers, - /// Map of addr -> communication channel for currently active connection agents + /// Holds ownership of all the, currently live, connection actors. connections: Connections, + /// A sender channel to be passed to each connection actor on creation. connection_sender: mpsc::Sender<ReceivedEvent>, } #[derive(Clone)] pub struct ActorHandle { - pub sender: mpsc::Sender<NetMessage> + pub sender: mpsc::Sender<HandleMessage> } impl ActorHandle { @@ -69,10 +73,10 @@ impl ActorHandle { }; }, Some((from, socket)) = listening.recv() => { - actor.handle_message(NetMessage::NewConnection { from, socket }); + actor.handle_message(HandleMessage::NewConnection { from, socket }); }, Some(ReceivedEvent { from, message }) = receiver_connection.recv() => { - actor.handle_message(NetMessage::ReceiveMessage { from, message }); + actor.handle_message(HandleMessage::ReceiveMessage { from, message }); } } } @@ -85,12 +89,13 @@ impl ActorHandle { impl Actor { pub fn new(connection_sender: mpsc::Sender<ReceivedEvent>) -> Self { - Actor { emit: vec![], connections: Connections::new(), connection_sender } + Actor { handlers: MessageHandlers::new(), connections: Connections::new(), connection_sender } } - pub fn handle_message(&mut self, msg: NetMessage) { + pub fn handle_message(&mut self, msg: HandleMessage) { match msg { - NetMessage::NewConnection { from, socket } => { + HandleMessage::NewConnection { from, socket } => { + // Spawn a new connection handle to service this new tcp socket let handle = ConnectionHandle::from_socket(from.clone(), socket, self.connection_sender.clone()); self.connections @@ -98,7 +103,7 @@ impl Actor { .or_default() .push(handle); }, - NetMessage::SendMessage { to, message } => { + HandleMessage::SendMessage { to, message } => { // Purge stale connection handles that have already closed. self.connections.entry(to.clone()).and_modify(|handles| { handles.retain(|h| h.open()); @@ -112,15 +117,17 @@ impl Actor { .unwrap() .send(message); }, - NetMessage::ReceiveMessage { from, message } => { - self.emit + HandleMessage::ReceiveMessage { from, message } => { + // Broadcast to the application handlers when we receive a message from a connection actor. + self.handlers .iter_mut() .for_each(|sender| { let _ = sender.try_send((from.clone(), message.clone())); }); }, - NetMessage::RegisterMessageHandler { respond_to } => { + HandleMessage::RegisterMessageHandler { respond_to } => { + // Register an application handler let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE); - self.emit.push(sender); + self.handlers.push(sender); respond_to.send(receiver).expect("Channel should be alive"); }, diff --git a/examples/tcp/src/tcp/listen.rs b/examples/tcp/src/tcp/listen.rs index 595de1a..59a04bc 100644 --- a/examples/tcp/src/tcp/listen.rs +++ b/examples/tcp/src/tcp/listen.rs @@ -5,7 +5,7 @@ use tokio::net::{TcpListener, TcpStream}; use super::DEFAULT_CHANNEL_SIZE; -/// `listen_tcp` spawns an Actor that waits for inbound requests to open TCP connections, accepts them and then starts a connection actor to exchange messages with them. +/// `listen_tcp` spawns an Actor that waits for inbound requests to open TCP connections, accepts them and then emits them on the returned channel. pub async fn listen_tcp(listen_addr: &str) -> mpsc::Receiver<(String, TcpStream)> { let listener = TcpListener::bind(listen_addr).await.expect("Couldn't bind to listen address"); diff --git a/examples/tcp/src/tcp/mod.rs b/examples/tcp/src/tcp/mod.rs index 1ba975c..c27e16b 100644 --- a/examples/tcp/src/tcp/mod.rs +++ b/examples/tcp/src/tcp/mod.rs @@ -23,14 +23,14 @@ impl TcpConnector { impl Network for TcpConnector { /// Send a message to the identifier specified by the 'to' field. fn send(&mut self, to: &str, message: Vec<u8>) { - self.handle.sender.try_send(actor::NetMessage::SendMessage { to: String::from(to), message }).expect("Actor should be alive"); + self.handle.sender.try_send(actor::HandleMessage::SendMessage { to: String::from(to), message }).expect("Actor should be alive"); } /// Generate one or more receivers to catch inbound messages. async fn recv(&mut self) -> mpsc::Receiver<(String, Vec<u8>)> { let (respond_to, receiver) = oneshot::channel(); - self.handle.sender.try_send(actor::NetMessage::RegisterMessageHandler { respond_to }).expect("Actor should be alive"); + self.handle.sender.try_send(actor::HandleMessage::RegisterMessageHandler { respond_to }).expect("Actor should be alive"); receiver.await.expect("Should have receievd channel back") } -- GitLab