diff --git a/Cargo.toml b/Cargo.toml index df86bbfc06e4e1e5a55589ca227a4cf21f44f1fb..eacb326b38bd39323b35402ce16040c126df59d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,8 +5,9 @@ edition = "2021" [workspace] members = [ - "examples/local" -, "examples/tcp"] + "examples/local", + "examples/tcp" +] [dependencies] tokio = { version = "1.44.1", features = ["full"] } diff --git a/examples/tcp/src/README.md b/examples/tcp/src/README.md new file mode 100644 index 0000000000000000000000000000000000000000..9d692b15f815a435b29a4dde5cd65b2d9e8b51c6 --- /dev/null +++ b/examples/tcp/src/README.md @@ -0,0 +1,12 @@ +# examples/tcp + +An implementation of the network interface, using TCP sockets. Identifiers follow the format of `{IP}:{port}`. + +Identifers now behave slightly differently to reflect the realities of using raw sockets, in contrast to `examples/local` where they consistently map 1-1 to an instance of `Connector`. On receiving messages: +- `{IP}` may be either an IPV4 address or an IPV6 address, depending on your OS and networking environment. The `{port}` is the source port for the tcp connection. +- `{port}`, will be unpredictable and assigned by the operating system of which other peer's `Connector` instance is located. +- `Connector` may have one or more open sockets with another `Connector` instance, each with a distinct source port number. + +When designing your application, you may want to be able to retain some form of unique identifier for each Connector instance on the network: +- If you can be certain that only one instance of `Connector` is behind each IP address, perhaps on a local network such as Grid5000 in which each node has at most one connector; the IP address (with the port stripped out) may be suitable to identify peers. +- Otherwise, you may want to define some messaging protocol on top of the interface yourself. For instance using public key cryptography to consistently distinguish peers. \ No newline at end of file diff --git a/examples/tcp/src/main.rs b/examples/tcp/src/main.rs index c7d87096bb4f63f38568ceb374a11f587a64ad2f..61978948c8315f3e1bb266518a38a3fff0eb871e 100644 --- a/examples/tcp/src/main.rs +++ b/examples/tcp/src/main.rs @@ -1,30 +1,32 @@ +use tokio::time::sleep; use std::time::Duration; use tcp::TcpConnector; use rust_networking_examples::Network; -use tokio::time::sleep; mod tcp; #[tokio::main] async fn main() { - let mut c_a = TcpConnector::new("127.0.0.1:3000"); - let mut c_b = TcpConnector::new("127.0.0.1:3001"); - - let mut r_a = c_a.recv().await; + // Create two tcp connectors that listen on ports 3000 and 3001. + let mut conn_a = TcpConnector::new("127.0.0.1:3000"); + let mut conn_b = TcpConnector::new("127.0.0.1:3001"); + // Spawn a task to print out messages received by the first tcp connector tokio::spawn(async move { - while let Some((from, msg)) = r_a.recv().await { - println!("Got {}, {:?}", from, msg); + // Register a receiver channel on the first tcp connector + let mut recv_a = conn_a.recv().await; + + while let Some((from, msg)) = recv_a.recv().await { + println!("conn_a received {:?} from {}", msg, from); } }); - sleep(Duration::from_secs(2)).await; - - c_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec()); - c_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec()); - c_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec()); + // Send a message multiple times from conn_b to conn_a + conn_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec()); + conn_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec()); + conn_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec()); - sleep(Duration::from_secs(10)).await; - println!("Closing"); + // Allow some time for conn_a to receive the messages + sleep(Duration::from_secs(2)).await; } diff --git a/examples/tcp/src/tcp/actor.rs b/examples/tcp/src/tcp/actor.rs index 7d105401ff50eb09f5b2a3df653a338a5d5d2914..d611a612cd7155462b683a1aa31065a3adc0ff1a 100644 --- a/examples/tcp/src/tcp/actor.rs +++ b/examples/tcp/src/tcp/actor.rs @@ -7,8 +7,8 @@ use super::connection::{ConnectionHandle, ReceivedEvent}; use super::listen::listen_tcp; use super::DEFAULT_CHANNEL_SIZE; -/// Messages that the Net actor knows how to process -pub enum NetMessage { +/// Messages that this actor knows how to process +pub enum HandleMessage { NewConnection { from: String, socket: TcpStream @@ -29,20 +29,24 @@ pub enum NetMessage { // Map address:port to connection handle(s) type Connections = HashMap<String, Vec<ConnectionHandle>>; +/// Represents a collection of channels that we use to emit messages received over the network +type MessageHandlers = Vec<mpsc::Sender<(String, Vec<u8>)>>; + /// Net actor, orchestrates the RPS, Listen and Connection Actors struct Actor { - /// Wallet message handlers - emit: Vec<mpsc::Sender<(String, Vec<u8>)>>, + /// Application handlers to broadcast messages down. + handlers: MessageHandlers, - /// Map of addr -> communication channel for currently active connection agents + /// Holds ownership of all the, currently live, connection actors. connections: Connections, + /// A sender channel to be passed to each connection actor on creation. connection_sender: mpsc::Sender<ReceivedEvent>, } #[derive(Clone)] pub struct ActorHandle { - pub sender: mpsc::Sender<NetMessage> + pub sender: mpsc::Sender<HandleMessage> } impl ActorHandle { @@ -69,10 +73,10 @@ impl ActorHandle { }; }, Some((from, socket)) = listening.recv() => { - actor.handle_message(NetMessage::NewConnection { from, socket }); + actor.handle_message(HandleMessage::NewConnection { from, socket }); }, Some(ReceivedEvent { from, message }) = receiver_connection.recv() => { - actor.handle_message(NetMessage::ReceiveMessage { from, message }); + actor.handle_message(HandleMessage::ReceiveMessage { from, message }); } } } @@ -85,12 +89,13 @@ impl ActorHandle { impl Actor { pub fn new(connection_sender: mpsc::Sender<ReceivedEvent>) -> Self { - Actor { emit: vec![], connections: Connections::new(), connection_sender } + Actor { handlers: MessageHandlers::new(), connections: Connections::new(), connection_sender } } - pub fn handle_message(&mut self, msg: NetMessage) { + pub fn handle_message(&mut self, msg: HandleMessage) { match msg { - NetMessage::NewConnection { from, socket } => { + HandleMessage::NewConnection { from, socket } => { + // Spawn a new connection handle to service this new tcp socket let handle = ConnectionHandle::from_socket(from.clone(), socket, self.connection_sender.clone()); self.connections @@ -98,7 +103,7 @@ impl Actor { .or_default() .push(handle); }, - NetMessage::SendMessage { to, message } => { + HandleMessage::SendMessage { to, message } => { // Purge stale connection handles that have already closed. self.connections.entry(to.clone()).and_modify(|handles| { handles.retain(|h| h.open()); @@ -112,15 +117,17 @@ impl Actor { .unwrap() .send(message); }, - NetMessage::ReceiveMessage { from, message } => { - self.emit + HandleMessage::ReceiveMessage { from, message } => { + // Broadcast to the application handlers when we receive a message from a connection actor. + self.handlers .iter_mut() .for_each(|sender| { let _ = sender.try_send((from.clone(), message.clone())); }); }, - NetMessage::RegisterMessageHandler { respond_to } => { + HandleMessage::RegisterMessageHandler { respond_to } => { + // Register an application handler let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE); - self.emit.push(sender); + self.handlers.push(sender); respond_to.send(receiver).expect("Channel should be alive"); }, diff --git a/examples/tcp/src/tcp/listen.rs b/examples/tcp/src/tcp/listen.rs index 595de1a017382ecdd5c6220c5768886f0750c48a..59a04bc0200cce02f79e66a578c1925b2fed19c7 100644 --- a/examples/tcp/src/tcp/listen.rs +++ b/examples/tcp/src/tcp/listen.rs @@ -5,7 +5,7 @@ use tokio::net::{TcpListener, TcpStream}; use super::DEFAULT_CHANNEL_SIZE; -/// `listen_tcp` spawns an Actor that waits for inbound requests to open TCP connections, accepts them and then starts a connection actor to exchange messages with them. +/// `listen_tcp` spawns an Actor that waits for inbound requests to open TCP connections, accepts them and then emits them on the returned channel. pub async fn listen_tcp(listen_addr: &str) -> mpsc::Receiver<(String, TcpStream)> { let listener = TcpListener::bind(listen_addr).await.expect("Couldn't bind to listen address"); diff --git a/examples/tcp/src/tcp/mod.rs b/examples/tcp/src/tcp/mod.rs index 1ba975ccbda2872a0d453e46c1173c53768ff056..c27e16bb98476b816981d3af229e0b60801e4455 100644 --- a/examples/tcp/src/tcp/mod.rs +++ b/examples/tcp/src/tcp/mod.rs @@ -23,14 +23,14 @@ impl TcpConnector { impl Network for TcpConnector { /// Send a message to the identifier specified by the 'to' field. fn send(&mut self, to: &str, message: Vec<u8>) { - self.handle.sender.try_send(actor::NetMessage::SendMessage { to: String::from(to), message }).expect("Actor should be alive"); + self.handle.sender.try_send(actor::HandleMessage::SendMessage { to: String::from(to), message }).expect("Actor should be alive"); } /// Generate one or more receivers to catch inbound messages. async fn recv(&mut self) -> mpsc::Receiver<(String, Vec<u8>)> { let (respond_to, receiver) = oneshot::channel(); - self.handle.sender.try_send(actor::NetMessage::RegisterMessageHandler { respond_to }).expect("Actor should be alive"); + self.handle.sender.try_send(actor::HandleMessage::RegisterMessageHandler { respond_to }).expect("Actor should be alive"); receiver.await.expect("Should have receievd channel back") }