From b3c9f86b022a7f09aff050ef36fc47b8e79178f7 Mon Sep 17 00:00:00 2001
From: WILLIAMS Harvey <hwilliam@inria.fr>
Date: Tue, 6 May 2025 15:55:01 +0200
Subject: [PATCH 1/2] Working TCP example

---
 Cargo.lock                         | 128 +++++++++++++++++++++++++++
 Cargo.toml                         |   2 +-
 examples/tcp/Cargo.toml            |  14 +++
 examples/tcp/src/main.rs           |  30 +++++++
 examples/tcp/src/tcp/actor.rs      | 129 +++++++++++++++++++++++++++
 examples/tcp/src/tcp/connection.rs | 134 +++++++++++++++++++++++++++++
 examples/tcp/src/tcp/listen.rs     |  33 +++++++
 examples/tcp/src/tcp/mod.rs        |  37 ++++++++
 8 files changed, 506 insertions(+), 1 deletion(-)
 create mode 100644 examples/tcp/Cargo.toml
 create mode 100644 examples/tcp/src/main.rs
 create mode 100644 examples/tcp/src/tcp/actor.rs
 create mode 100644 examples/tcp/src/tcp/connection.rs
 create mode 100644 examples/tcp/src/tcp/listen.rs
 create mode 100644 examples/tcp/src/tcp/mod.rs

diff --git a/Cargo.lock b/Cargo.lock
index 02a1cd8..ebf809b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -67,6 +67,95 @@ version = "1.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 
+[[package]]
+name = "futures"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-executor",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
+name = "futures-channel"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
+dependencies = [
+ "futures-core",
+ "futures-sink",
+]
+
+[[package]]
+name = "futures-core"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
+
+[[package]]
+name = "futures-executor"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
+dependencies = [
+ "futures-core",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
+name = "futures-io"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
+
+[[package]]
+name = "futures-macro"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "futures-sink"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
+
+[[package]]
+name = "futures-task"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
+
+[[package]]
+name = "futures-util"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-io",
+ "futures-macro",
+ "futures-sink",
+ "futures-task",
+ "memchr",
+ "pin-project-lite",
+ "pin-utils",
+ "slab",
+]
+
 [[package]]
 name = "gimli"
 version = "0.31.1"
@@ -162,6 +251,12 @@ version = "0.2.16"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
 
+[[package]]
+name = "pin-utils"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
+
 [[package]]
 name = "proc-macro2"
 version = "1.0.94"
@@ -218,6 +313,15 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "slab"
+version = "0.4.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
+dependencies = [
+ "autocfg",
+]
+
 [[package]]
 name = "smallvec"
 version = "1.14.0"
@@ -245,6 +349,17 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "tcp"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "futures",
+ "rust-networking-examples",
+ "tokio",
+ "tokio-util",
+]
+
 [[package]]
 name = "tokio"
 version = "1.44.1"
@@ -274,6 +389,19 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "tokio-util"
+version = "0.7.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df"
+dependencies = [
+ "bytes",
+ "futures-core",
+ "futures-sink",
+ "pin-project-lite",
+ "tokio",
+]
+
 [[package]]
 name = "unicode-ident"
 version = "1.0.18"
diff --git a/Cargo.toml b/Cargo.toml
index 0198f9d..df86bbf 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,7 +6,7 @@ edition = "2021"
 [workspace]
 members = [
     "examples/local"
-]
+, "examples/tcp"]
 
 [dependencies]
 tokio = { version = "1.44.1", features = ["full"] }
diff --git a/examples/tcp/Cargo.toml b/examples/tcp/Cargo.toml
new file mode 100644
index 0000000..beb6965
--- /dev/null
+++ b/examples/tcp/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "tcp"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+# For the interface
+rust-networking-examples = { path = "../../" }
+
+# Async related
+tokio = { version = "1.44.1", features = ["full"] }
+tokio-util = { version = "0.7.15", features = ["codec"] }
+async-trait = "0.1.87"
+futures = "0.3.31"
diff --git a/examples/tcp/src/main.rs b/examples/tcp/src/main.rs
new file mode 100644
index 0000000..c7d8709
--- /dev/null
+++ b/examples/tcp/src/main.rs
@@ -0,0 +1,30 @@
+use std::time::Duration;
+
+use tcp::TcpConnector;
+use rust_networking_examples::Network;
+use tokio::time::sleep;
+
+mod tcp;
+
+#[tokio::main]
+async fn main() {
+    let mut c_a = TcpConnector::new("127.0.0.1:3000");
+    let mut c_b = TcpConnector::new("127.0.0.1:3001");
+
+    let mut r_a = c_a.recv().await;
+
+    tokio::spawn(async move {
+        while let Some((from, msg)) = r_a.recv().await {
+            println!("Got {}, {:?}", from, msg);
+        }
+    });
+
+    sleep(Duration::from_secs(2)).await;
+
+    c_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec());
+    c_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec());
+    c_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec());
+    
+    sleep(Duration::from_secs(10)).await;
+    println!("Closing");
+}
diff --git a/examples/tcp/src/tcp/actor.rs b/examples/tcp/src/tcp/actor.rs
new file mode 100644
index 0000000..7d10540
--- /dev/null
+++ b/examples/tcp/src/tcp/actor.rs
@@ -0,0 +1,129 @@
+use tokio::net::TcpStream;
+use tokio::select;
+use std::collections::HashMap;
+use tokio::sync::{mpsc, oneshot};
+
+use super::connection::{ConnectionHandle, ReceivedEvent};
+use super::listen::listen_tcp;
+use super::DEFAULT_CHANNEL_SIZE;
+
+/// Messages that the Net actor knows how to process
+pub enum NetMessage {
+    NewConnection {
+        from: String,
+        socket: TcpStream
+    },
+    SendMessage {
+        to: String,
+        message: Vec<u8>,
+    },
+    ReceiveMessage {
+        from: String,
+        message: Vec<u8> 
+    },
+    RegisterMessageHandler {
+        respond_to: oneshot::Sender<mpsc::Receiver<(String, Vec<u8>)>>
+    }
+}
+
+// Map address:port to connection handle(s)
+type Connections = HashMap<String, Vec<ConnectionHandle>>; 
+
+/// Net actor, orchestrates the RPS, Listen and Connection Actors
+struct Actor {
+    /// Wallet message handlers
+    emit: Vec<mpsc::Sender<(String, Vec<u8>)>>,
+
+    /// Map of addr -> communication channel for currently active connection agents
+    connections: Connections, 
+
+    connection_sender: mpsc::Sender<ReceivedEvent>,
+}
+
+#[derive(Clone)]
+pub struct ActorHandle {
+    pub sender: mpsc::Sender<NetMessage>
+}
+
+impl ActorHandle {
+    pub fn new(bind_address: String) -> Self {
+        // Channel to communicate with parent
+        let (sender, mut receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
+
+        // Channel to receive incoming messages from connections
+        let (sender_connection, mut receiver_connection) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
+
+        tokio::spawn(async move {
+            // Channel to accept new connections
+            let mut listening = listen_tcp(&bind_address).await;
+
+            // Event loop
+            let mut actor = Actor::new(sender_connection);
+
+            loop {
+                select! {
+                    event = receiver.recv() => {
+                        match event {
+                            Some(msg) => actor.handle_message(msg),
+                            None => break, // Exit
+                        };
+                    },
+                    Some((from, socket)) = listening.recv() => {
+                        actor.handle_message(NetMessage::NewConnection { from, socket });
+                    },
+                    Some(ReceivedEvent { from, message }) = receiver_connection.recv() => {
+                        actor.handle_message(NetMessage::ReceiveMessage { from, message });
+                    }
+                }
+            }
+        });
+
+        ActorHandle { sender } 
+    }
+}
+
+
+impl Actor {
+    pub fn new(connection_sender: mpsc::Sender<ReceivedEvent>) -> Self {
+        Actor { emit: vec![], connections: Connections::new(), connection_sender }
+    }
+
+    pub fn handle_message(&mut self, msg: NetMessage) {
+        match msg {
+            NetMessage::NewConnection { from, socket } => {
+                let handle = ConnectionHandle::from_socket(from.clone(), socket, self.connection_sender.clone());
+
+                self.connections
+                    .entry(from)
+                    .or_default()
+                    .push(handle);
+            },
+            NetMessage::SendMessage { to, message } => {
+                // Purge stale connection handles that have already closed.
+                self.connections.entry(to.clone()).and_modify(|handles| {
+                    handles.retain(|h| h.open());
+                });
+
+                // Send message to the first connection handle we find, or open a new one.
+                self.connections
+                    .entry(to.clone())
+                    .or_insert(vec![ConnectionHandle::new(to, self.connection_sender.clone())])
+                    .first_mut()
+                    .unwrap()
+                    .send(message);
+            },
+            NetMessage::ReceiveMessage { from, message } => {
+                self.emit
+                    .iter_mut()
+                    .for_each(|sender| { let _ = sender.try_send((from.clone(), message.clone())); });
+            },
+            NetMessage::RegisterMessageHandler { respond_to } => {
+                let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
+
+                self.emit.push(sender);
+
+                respond_to.send(receiver).expect("Channel should be alive");
+            },
+        }
+    }
+}
\ No newline at end of file
diff --git a/examples/tcp/src/tcp/connection.rs b/examples/tcp/src/tcp/connection.rs
new file mode 100644
index 0000000..56c1a62
--- /dev/null
+++ b/examples/tcp/src/tcp/connection.rs
@@ -0,0 +1,134 @@
+use futures::{sink::SinkExt, StreamExt};
+use tokio::{io::{self, split}, net::TcpStream, select, sync::mpsc};
+use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
+
+use super::DEFAULT_CHANNEL_SIZE;
+
+// The event emitted by this actor, denoting a message and who it was received from.
+pub struct ReceivedEvent {
+    pub from: String,
+    pub message: Vec<u8>
+}
+
+pub enum ConnectionHandleMessage {
+    Send(Vec<u8>),
+    Receive(Vec<u8>)
+}
+
+#[derive(Clone)]
+pub struct ConnectionHandle {
+    pub sender: mpsc::Sender<ConnectionHandleMessage>
+}
+
+impl ConnectionHandle {
+    pub fn from_socket(address: String, socket: TcpStream, emit: mpsc::Sender<ReceivedEvent>) -> Self {
+        let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
+
+        event_loop(receiver, address, Some(socket), emit);
+
+        ConnectionHandle { sender }
+    }
+
+    pub fn new(address: String, emit: mpsc::Sender<ReceivedEvent>) -> Self {
+        let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
+
+        event_loop(receiver, address, None, emit);
+
+        ConnectionHandle { sender }
+    }
+
+    pub fn open(&self) -> bool {
+        !self.sender.is_closed()
+    }
+
+    pub fn send(&mut self, message: Vec<u8>) {
+        self.sender.try_send(ConnectionHandleMessage::Send(message)).expect("Actor should be alive");
+    }
+}
+
+/// Function for creating the actor and then running the event loop
+fn event_loop(mut receiver: mpsc::Receiver<ConnectionHandleMessage>, address: String, socket: Option<TcpStream>, emit: mpsc::Sender<ReceivedEvent>) {
+    tokio::spawn(async move {
+        let socket = match socket {
+            Some(s) => s,
+            None => {
+                TcpStream::connect(address.clone())
+                .await
+                .expect("Failed to connect to address")
+            },
+        };
+
+        // Start connection actor
+        let (writer, mut reader) = frame_and_split_socket(socket);
+
+        // Writer becomes and channel that we can send messsages down
+        let writer = start_writer_task(writer);
+
+        let mut actor = Actor { address: address.clone(), emit: emit.clone(), writer };
+
+        loop {
+            select!{
+                received = reader.next() => {
+                    if let Some(message) = received {
+                        let bytes = message.expect("Message should be readble");
+                        actor.handle_message(ConnectionHandleMessage::Receive(bytes.into()));
+                    } else {
+                        break; // Connection finished
+                    }
+                },
+                Some(msg) = receiver.recv() => {
+                    actor.handle_message(msg);
+                },
+                else => break // Actor handle dropped
+            }
+        }
+    });
+}
+
+struct Actor {
+    address: String,
+    emit: mpsc::Sender<ReceivedEvent>,
+    writer: mpsc::Sender<Vec<u8>>,
+}
+
+impl Actor {
+    pub fn handle_message(&mut self, msg: ConnectionHandleMessage) {
+        match msg {
+            ConnectionHandleMessage::Send(message) => {
+                self.writer.try_send(message).expect("Writer task should be running");
+            },
+            ConnectionHandleMessage::Receive(message) => {
+                self.emit.try_send(ReceivedEvent { from: self.address.clone(), message }).expect("Should be able to emit events");
+            },
+        }
+    }
+}
+
+
+type ConnectionReader = FramedRead<io::ReadHalf<TcpStream>, LengthDelimitedCodec>;
+type ConnectionWriter = FramedWrite<io::WriteHalf<TcpStream>, LengthDelimitedCodec>;
+
+/// Split TcpStream into two seperate ownership objects; a writer (for sending) and a reader (for receiving).
+fn frame_and_split_socket(socket: TcpStream) -> (ConnectionWriter, ConnectionReader) {
+    // Split the TcpStream into read and write halves
+    let (read_half, write_half) = split(socket);
+
+    // Create FramedRead for receiving and FramedWrite for sending
+    let reader = FramedRead::new(read_half, LengthDelimitedCodec::new());
+    let writer = FramedWrite::new(write_half, LengthDelimitedCodec::new());
+
+    (writer, reader)
+}
+
+/// To write to the underlying TCP socket, we need an async context. This is setup by this function.
+fn start_writer_task(mut writer: ConnectionWriter) -> mpsc::Sender<Vec<u8>> {
+    let (sender, mut receiver) = mpsc::channel::<Vec<u8>>(DEFAULT_CHANNEL_SIZE);
+
+    tokio::spawn(async move {
+        while let Some(bytes) = receiver.recv().await {
+            writer.send(bytes.into()).await.expect("Should have sent message");
+        }
+    });
+
+    sender
+}
\ No newline at end of file
diff --git a/examples/tcp/src/tcp/listen.rs b/examples/tcp/src/tcp/listen.rs
new file mode 100644
index 0000000..595de1a
--- /dev/null
+++ b/examples/tcp/src/tcp/listen.rs
@@ -0,0 +1,33 @@
+use tokio::select;
+use tokio::sync::mpsc;
+use tokio::net::{TcpListener, TcpStream};
+
+use super::DEFAULT_CHANNEL_SIZE;
+
+
+/// `listen_tcp` spawns an Actor that waits for inbound requests to open TCP connections, accepts them and then starts a connection actor to exchange messages with them.
+pub async fn listen_tcp(listen_addr: &str) -> mpsc::Receiver<(String, TcpStream)> {
+    let listener = TcpListener::bind(listen_addr).await.expect("Couldn't bind to listen address");
+
+    let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
+
+    tokio::spawn(async move {
+        loop {
+            select! {
+                // Accept all inbound TCP connections
+                incoming = listener.accept() => { 
+                    match incoming {
+                        Ok((socket, socket_addr)) => {
+                            sender.send((socket_addr.to_string(), socket)).await.expect("Parent actor should be alive and accepting new connections");
+                        },
+                        Err(error) => {
+                            println!("TCP : error accepting connection {}", error);
+                        }
+                    }
+                }
+            }
+        }
+    });
+
+    receiver
+}
\ No newline at end of file
diff --git a/examples/tcp/src/tcp/mod.rs b/examples/tcp/src/tcp/mod.rs
new file mode 100644
index 0000000..1ba975c
--- /dev/null
+++ b/examples/tcp/src/tcp/mod.rs
@@ -0,0 +1,37 @@
+// Actors
+mod actor;
+mod listen;
+mod connection;
+
+pub use actor::ActorHandle;
+use rust_networking_examples::Network;
+use tokio::sync::{mpsc, oneshot};
+
+const DEFAULT_CHANNEL_SIZE: usize = 1024;
+
+pub struct TcpConnector {
+    handle: ActorHandle
+}
+
+impl TcpConnector {
+    pub fn new(bind_address: &str) -> TcpConnector {
+        TcpConnector { handle: ActorHandle::new(String::from(bind_address)) }
+    }
+}
+
+#[async_trait::async_trait]
+impl Network for TcpConnector {
+    /// Send a message to the identifier specified by the 'to' field.
+    fn send(&mut self, to: &str, message: Vec<u8>) {
+        self.handle.sender.try_send(actor::NetMessage::SendMessage { to: String::from(to), message }).expect("Actor should be alive");
+    }
+
+    /// Generate one or more receivers to catch inbound messages.
+    async fn recv(&mut self) -> mpsc::Receiver<(String, Vec<u8>)> {
+        let (respond_to, receiver) = oneshot::channel();
+
+        self.handle.sender.try_send(actor::NetMessage::RegisterMessageHandler { respond_to }).expect("Actor should be alive");
+
+        receiver.await.expect("Should have receievd channel back")
+    }
+}
-- 
GitLab


From 53ec3a30727ba33e78d1bd0be3b0314a8a487f36 Mon Sep 17 00:00:00 2001
From: WILLIAMS Harvey <hwilliam@inria.fr>
Date: Wed, 7 May 2025 16:45:47 +0200
Subject: [PATCH 2/2] Documentation

---
 Cargo.toml                     |  5 +++--
 examples/tcp/src/README.md     | 12 +++++++++++
 examples/tcp/src/main.rs       | 30 ++++++++++++++------------
 examples/tcp/src/tcp/actor.rs  | 39 ++++++++++++++++++++--------------
 examples/tcp/src/tcp/listen.rs |  2 +-
 examples/tcp/src/tcp/mod.rs    |  4 ++--
 6 files changed, 57 insertions(+), 35 deletions(-)
 create mode 100644 examples/tcp/src/README.md

diff --git a/Cargo.toml b/Cargo.toml
index df86bbf..eacb326 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -5,8 +5,9 @@ edition = "2021"
 
 [workspace]
 members = [
-    "examples/local"
-, "examples/tcp"]
+    "examples/local",
+    "examples/tcp"
+]
 
 [dependencies]
 tokio = { version = "1.44.1", features = ["full"] }
diff --git a/examples/tcp/src/README.md b/examples/tcp/src/README.md
new file mode 100644
index 0000000..9d692b1
--- /dev/null
+++ b/examples/tcp/src/README.md
@@ -0,0 +1,12 @@
+# examples/tcp
+
+An implementation of the network interface, using TCP sockets. Identifiers follow the format of `{IP}:{port}`.
+
+Identifers now behave slightly differently to reflect the realities of using raw sockets, in contrast to `examples/local` where they consistently map 1-1 to an instance of `Connector`. On receiving messages:
+- `{IP}` may be either an IPV4 address or an IPV6 address, depending on your OS and networking environment. The `{port}` is the source port for the tcp connection.
+- `{port}`, will be unpredictable and assigned by the operating system of which other peer's `Connector` instance is located.   
+- `Connector` may have one or more open sockets with another `Connector` instance, each with a distinct source port number.
+
+When designing your application, you may want to be able to retain some form of unique identifier for each Connector instance on the network:
+- If you can be certain that only one instance of `Connector` is behind each IP address, perhaps on a local network such as Grid5000 in which each node has at most one connector; the IP address (with the port stripped out) may be suitable to identify peers. 
+- Otherwise, you may want to define some messaging protocol on top of the interface yourself. For instance using public key cryptography to consistently distinguish peers. 
\ No newline at end of file
diff --git a/examples/tcp/src/main.rs b/examples/tcp/src/main.rs
index c7d8709..6197894 100644
--- a/examples/tcp/src/main.rs
+++ b/examples/tcp/src/main.rs
@@ -1,30 +1,32 @@
+use tokio::time::sleep;
 use std::time::Duration;
 
 use tcp::TcpConnector;
 use rust_networking_examples::Network;
-use tokio::time::sleep;
 
 mod tcp;
 
 #[tokio::main]
 async fn main() {
-    let mut c_a = TcpConnector::new("127.0.0.1:3000");
-    let mut c_b = TcpConnector::new("127.0.0.1:3001");
-
-    let mut r_a = c_a.recv().await;
+    // Create two tcp connectors that listen on ports 3000 and 3001.
+    let mut conn_a = TcpConnector::new("127.0.0.1:3000");
+    let mut conn_b = TcpConnector::new("127.0.0.1:3001");
 
+    // Spawn a task to print out messages received by the first tcp connector
     tokio::spawn(async move {
-        while let Some((from, msg)) = r_a.recv().await {
-            println!("Got {}, {:?}", from, msg);
+        // Register a receiver channel on the first tcp connector
+        let mut recv_a = conn_a.recv().await;
+
+        while let Some((from, msg)) = recv_a.recv().await {
+            println!("conn_a received {:?} from {}", msg, from);
         }
     });
 
-    sleep(Duration::from_secs(2)).await;
-
-    c_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec());
-    c_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec());
-    c_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec());
+    // Send a message multiple times from conn_b to conn_a
+    conn_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec());
+    conn_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec());
+    conn_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec());
     
-    sleep(Duration::from_secs(10)).await;
-    println!("Closing");
+    // Allow some time for conn_a to receive the messages
+    sleep(Duration::from_secs(2)).await;
 }
diff --git a/examples/tcp/src/tcp/actor.rs b/examples/tcp/src/tcp/actor.rs
index 7d10540..d611a61 100644
--- a/examples/tcp/src/tcp/actor.rs
+++ b/examples/tcp/src/tcp/actor.rs
@@ -7,8 +7,8 @@ use super::connection::{ConnectionHandle, ReceivedEvent};
 use super::listen::listen_tcp;
 use super::DEFAULT_CHANNEL_SIZE;
 
-/// Messages that the Net actor knows how to process
-pub enum NetMessage {
+/// Messages that this actor knows how to process
+pub enum HandleMessage {
     NewConnection {
         from: String,
         socket: TcpStream
@@ -29,20 +29,24 @@ pub enum NetMessage {
 // Map address:port to connection handle(s)
 type Connections = HashMap<String, Vec<ConnectionHandle>>; 
 
+/// Represents a collection of channels that we use to emit messages received over the network
+type MessageHandlers = Vec<mpsc::Sender<(String, Vec<u8>)>>;
+
 /// Net actor, orchestrates the RPS, Listen and Connection Actors
 struct Actor {
-    /// Wallet message handlers
-    emit: Vec<mpsc::Sender<(String, Vec<u8>)>>,
+    /// Application handlers to broadcast messages down.
+    handlers: MessageHandlers,
 
-    /// Map of addr -> communication channel for currently active connection agents
+    /// Holds ownership of all the, currently live, connection actors.
     connections: Connections, 
 
+    /// A sender channel to be passed to each connection actor on creation.
     connection_sender: mpsc::Sender<ReceivedEvent>,
 }
 
 #[derive(Clone)]
 pub struct ActorHandle {
-    pub sender: mpsc::Sender<NetMessage>
+    pub sender: mpsc::Sender<HandleMessage>
 }
 
 impl ActorHandle {
@@ -69,10 +73,10 @@ impl ActorHandle {
                         };
                     },
                     Some((from, socket)) = listening.recv() => {
-                        actor.handle_message(NetMessage::NewConnection { from, socket });
+                        actor.handle_message(HandleMessage::NewConnection { from, socket });
                     },
                     Some(ReceivedEvent { from, message }) = receiver_connection.recv() => {
-                        actor.handle_message(NetMessage::ReceiveMessage { from, message });
+                        actor.handle_message(HandleMessage::ReceiveMessage { from, message });
                     }
                 }
             }
@@ -85,12 +89,13 @@ impl ActorHandle {
 
 impl Actor {
     pub fn new(connection_sender: mpsc::Sender<ReceivedEvent>) -> Self {
-        Actor { emit: vec![], connections: Connections::new(), connection_sender }
+        Actor { handlers: MessageHandlers::new(), connections: Connections::new(), connection_sender }
     }
 
-    pub fn handle_message(&mut self, msg: NetMessage) {
+    pub fn handle_message(&mut self, msg: HandleMessage) {
         match msg {
-            NetMessage::NewConnection { from, socket } => {
+            HandleMessage::NewConnection { from, socket } => {
+                // Spawn a new connection handle to service this new tcp socket
                 let handle = ConnectionHandle::from_socket(from.clone(), socket, self.connection_sender.clone());
 
                 self.connections
@@ -98,7 +103,7 @@ impl Actor {
                     .or_default()
                     .push(handle);
             },
-            NetMessage::SendMessage { to, message } => {
+            HandleMessage::SendMessage { to, message } => {
                 // Purge stale connection handles that have already closed.
                 self.connections.entry(to.clone()).and_modify(|handles| {
                     handles.retain(|h| h.open());
@@ -112,15 +117,17 @@ impl Actor {
                     .unwrap()
                     .send(message);
             },
-            NetMessage::ReceiveMessage { from, message } => {
-                self.emit
+            HandleMessage::ReceiveMessage { from, message } => {
+                // Broadcast to the application handlers when we receive a message from a connection actor.
+                self.handlers
                     .iter_mut()
                     .for_each(|sender| { let _ = sender.try_send((from.clone(), message.clone())); });
             },
-            NetMessage::RegisterMessageHandler { respond_to } => {
+            HandleMessage::RegisterMessageHandler { respond_to } => {
+                // Register an application handler
                 let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
 
-                self.emit.push(sender);
+                self.handlers.push(sender);
 
                 respond_to.send(receiver).expect("Channel should be alive");
             },
diff --git a/examples/tcp/src/tcp/listen.rs b/examples/tcp/src/tcp/listen.rs
index 595de1a..59a04bc 100644
--- a/examples/tcp/src/tcp/listen.rs
+++ b/examples/tcp/src/tcp/listen.rs
@@ -5,7 +5,7 @@ use tokio::net::{TcpListener, TcpStream};
 use super::DEFAULT_CHANNEL_SIZE;
 
 
-/// `listen_tcp` spawns an Actor that waits for inbound requests to open TCP connections, accepts them and then starts a connection actor to exchange messages with them.
+/// `listen_tcp` spawns an Actor that waits for inbound requests to open TCP connections, accepts them and then emits them on the returned channel.
 pub async fn listen_tcp(listen_addr: &str) -> mpsc::Receiver<(String, TcpStream)> {
     let listener = TcpListener::bind(listen_addr).await.expect("Couldn't bind to listen address");
 
diff --git a/examples/tcp/src/tcp/mod.rs b/examples/tcp/src/tcp/mod.rs
index 1ba975c..c27e16b 100644
--- a/examples/tcp/src/tcp/mod.rs
+++ b/examples/tcp/src/tcp/mod.rs
@@ -23,14 +23,14 @@ impl TcpConnector {
 impl Network for TcpConnector {
     /// Send a message to the identifier specified by the 'to' field.
     fn send(&mut self, to: &str, message: Vec<u8>) {
-        self.handle.sender.try_send(actor::NetMessage::SendMessage { to: String::from(to), message }).expect("Actor should be alive");
+        self.handle.sender.try_send(actor::HandleMessage::SendMessage { to: String::from(to), message }).expect("Actor should be alive");
     }
 
     /// Generate one or more receivers to catch inbound messages.
     async fn recv(&mut self) -> mpsc::Receiver<(String, Vec<u8>)> {
         let (respond_to, receiver) = oneshot::channel();
 
-        self.handle.sender.try_send(actor::NetMessage::RegisterMessageHandler { respond_to }).expect("Actor should be alive");
+        self.handle.sender.try_send(actor::HandleMessage::RegisterMessageHandler { respond_to }).expect("Actor should be alive");
 
         receiver.await.expect("Should have receievd channel back")
     }
-- 
GitLab