Mentions légales du service

Skip to content
Snippets Groups Projects
Commit b3c9f86b authored by WILLIAMS Harvey's avatar WILLIAMS Harvey
Browse files

Working TCP example

parent 82de5902
Branches
No related tags found
1 merge request!1examples/tcp
...@@ -67,6 +67,95 @@ version = "1.0.0" ...@@ -67,6 +67,95 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "futures"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-macro"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
[[package]]
name = "futures-task"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-util"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]] [[package]]
name = "gimli" name = "gimli"
version = "0.31.1" version = "0.31.1"
...@@ -162,6 +251,12 @@ version = "0.2.16" ...@@ -162,6 +251,12 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.94" version = "1.0.94"
...@@ -218,6 +313,15 @@ dependencies = [ ...@@ -218,6 +313,15 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "slab"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
dependencies = [
"autocfg",
]
[[package]] [[package]]
name = "smallvec" name = "smallvec"
version = "1.14.0" version = "1.14.0"
...@@ -245,6 +349,17 @@ dependencies = [ ...@@ -245,6 +349,17 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "tcp"
version = "0.1.0"
dependencies = [
"async-trait",
"futures",
"rust-networking-examples",
"tokio",
"tokio-util",
]
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.44.1" version = "1.44.1"
...@@ -274,6 +389,19 @@ dependencies = [ ...@@ -274,6 +389,19 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "tokio-util"
version = "0.7.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "unicode-ident" name = "unicode-ident"
version = "1.0.18" version = "1.0.18"
......
...@@ -6,7 +6,7 @@ edition = "2021" ...@@ -6,7 +6,7 @@ edition = "2021"
[workspace] [workspace]
members = [ members = [
"examples/local" "examples/local"
] , "examples/tcp"]
[dependencies] [dependencies]
tokio = { version = "1.44.1", features = ["full"] } tokio = { version = "1.44.1", features = ["full"] }
......
[package]
name = "tcp"
version = "0.1.0"
edition = "2021"
[dependencies]
# For the interface
rust-networking-examples = { path = "../../" }
# Async related
tokio = { version = "1.44.1", features = ["full"] }
tokio-util = { version = "0.7.15", features = ["codec"] }
async-trait = "0.1.87"
futures = "0.3.31"
use std::time::Duration;
use tcp::TcpConnector;
use rust_networking_examples::Network;
use tokio::time::sleep;
mod tcp;
#[tokio::main]
async fn main() {
let mut c_a = TcpConnector::new("127.0.0.1:3000");
let mut c_b = TcpConnector::new("127.0.0.1:3001");
let mut r_a = c_a.recv().await;
tokio::spawn(async move {
while let Some((from, msg)) = r_a.recv().await {
println!("Got {}, {:?}", from, msg);
}
});
sleep(Duration::from_secs(2)).await;
c_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec());
c_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec());
c_b.send("127.0.0.1:3000", "Hello world!".as_bytes().to_vec());
sleep(Duration::from_secs(10)).await;
println!("Closing");
}
use tokio::net::TcpStream;
use tokio::select;
use std::collections::HashMap;
use tokio::sync::{mpsc, oneshot};
use super::connection::{ConnectionHandle, ReceivedEvent};
use super::listen::listen_tcp;
use super::DEFAULT_CHANNEL_SIZE;
/// Messages that the Net actor knows how to process
pub enum NetMessage {
NewConnection {
from: String,
socket: TcpStream
},
SendMessage {
to: String,
message: Vec<u8>,
},
ReceiveMessage {
from: String,
message: Vec<u8>
},
RegisterMessageHandler {
respond_to: oneshot::Sender<mpsc::Receiver<(String, Vec<u8>)>>
}
}
// Map address:port to connection handle(s)
type Connections = HashMap<String, Vec<ConnectionHandle>>;
/// Net actor, orchestrates the RPS, Listen and Connection Actors
struct Actor {
/// Wallet message handlers
emit: Vec<mpsc::Sender<(String, Vec<u8>)>>,
/// Map of addr -> communication channel for currently active connection agents
connections: Connections,
connection_sender: mpsc::Sender<ReceivedEvent>,
}
#[derive(Clone)]
pub struct ActorHandle {
pub sender: mpsc::Sender<NetMessage>
}
impl ActorHandle {
pub fn new(bind_address: String) -> Self {
// Channel to communicate with parent
let (sender, mut receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
// Channel to receive incoming messages from connections
let (sender_connection, mut receiver_connection) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
tokio::spawn(async move {
// Channel to accept new connections
let mut listening = listen_tcp(&bind_address).await;
// Event loop
let mut actor = Actor::new(sender_connection);
loop {
select! {
event = receiver.recv() => {
match event {
Some(msg) => actor.handle_message(msg),
None => break, // Exit
};
},
Some((from, socket)) = listening.recv() => {
actor.handle_message(NetMessage::NewConnection { from, socket });
},
Some(ReceivedEvent { from, message }) = receiver_connection.recv() => {
actor.handle_message(NetMessage::ReceiveMessage { from, message });
}
}
}
});
ActorHandle { sender }
}
}
impl Actor {
pub fn new(connection_sender: mpsc::Sender<ReceivedEvent>) -> Self {
Actor { emit: vec![], connections: Connections::new(), connection_sender }
}
pub fn handle_message(&mut self, msg: NetMessage) {
match msg {
NetMessage::NewConnection { from, socket } => {
let handle = ConnectionHandle::from_socket(from.clone(), socket, self.connection_sender.clone());
self.connections
.entry(from)
.or_default()
.push(handle);
},
NetMessage::SendMessage { to, message } => {
// Purge stale connection handles that have already closed.
self.connections.entry(to.clone()).and_modify(|handles| {
handles.retain(|h| h.open());
});
// Send message to the first connection handle we find, or open a new one.
self.connections
.entry(to.clone())
.or_insert(vec![ConnectionHandle::new(to, self.connection_sender.clone())])
.first_mut()
.unwrap()
.send(message);
},
NetMessage::ReceiveMessage { from, message } => {
self.emit
.iter_mut()
.for_each(|sender| { let _ = sender.try_send((from.clone(), message.clone())); });
},
NetMessage::RegisterMessageHandler { respond_to } => {
let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
self.emit.push(sender);
respond_to.send(receiver).expect("Channel should be alive");
},
}
}
}
\ No newline at end of file
use futures::{sink::SinkExt, StreamExt};
use tokio::{io::{self, split}, net::TcpStream, select, sync::mpsc};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use super::DEFAULT_CHANNEL_SIZE;
// The event emitted by this actor, denoting a message and who it was received from.
pub struct ReceivedEvent {
pub from: String,
pub message: Vec<u8>
}
pub enum ConnectionHandleMessage {
Send(Vec<u8>),
Receive(Vec<u8>)
}
#[derive(Clone)]
pub struct ConnectionHandle {
pub sender: mpsc::Sender<ConnectionHandleMessage>
}
impl ConnectionHandle {
pub fn from_socket(address: String, socket: TcpStream, emit: mpsc::Sender<ReceivedEvent>) -> Self {
let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
event_loop(receiver, address, Some(socket), emit);
ConnectionHandle { sender }
}
pub fn new(address: String, emit: mpsc::Sender<ReceivedEvent>) -> Self {
let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
event_loop(receiver, address, None, emit);
ConnectionHandle { sender }
}
pub fn open(&self) -> bool {
!self.sender.is_closed()
}
pub fn send(&mut self, message: Vec<u8>) {
self.sender.try_send(ConnectionHandleMessage::Send(message)).expect("Actor should be alive");
}
}
/// Function for creating the actor and then running the event loop
fn event_loop(mut receiver: mpsc::Receiver<ConnectionHandleMessage>, address: String, socket: Option<TcpStream>, emit: mpsc::Sender<ReceivedEvent>) {
tokio::spawn(async move {
let socket = match socket {
Some(s) => s,
None => {
TcpStream::connect(address.clone())
.await
.expect("Failed to connect to address")
},
};
// Start connection actor
let (writer, mut reader) = frame_and_split_socket(socket);
// Writer becomes and channel that we can send messsages down
let writer = start_writer_task(writer);
let mut actor = Actor { address: address.clone(), emit: emit.clone(), writer };
loop {
select!{
received = reader.next() => {
if let Some(message) = received {
let bytes = message.expect("Message should be readble");
actor.handle_message(ConnectionHandleMessage::Receive(bytes.into()));
} else {
break; // Connection finished
}
},
Some(msg) = receiver.recv() => {
actor.handle_message(msg);
},
else => break // Actor handle dropped
}
}
});
}
struct Actor {
address: String,
emit: mpsc::Sender<ReceivedEvent>,
writer: mpsc::Sender<Vec<u8>>,
}
impl Actor {
pub fn handle_message(&mut self, msg: ConnectionHandleMessage) {
match msg {
ConnectionHandleMessage::Send(message) => {
self.writer.try_send(message).expect("Writer task should be running");
},
ConnectionHandleMessage::Receive(message) => {
self.emit.try_send(ReceivedEvent { from: self.address.clone(), message }).expect("Should be able to emit events");
},
}
}
}
type ConnectionReader = FramedRead<io::ReadHalf<TcpStream>, LengthDelimitedCodec>;
type ConnectionWriter = FramedWrite<io::WriteHalf<TcpStream>, LengthDelimitedCodec>;
/// Split TcpStream into two seperate ownership objects; a writer (for sending) and a reader (for receiving).
fn frame_and_split_socket(socket: TcpStream) -> (ConnectionWriter, ConnectionReader) {
// Split the TcpStream into read and write halves
let (read_half, write_half) = split(socket);
// Create FramedRead for receiving and FramedWrite for sending
let reader = FramedRead::new(read_half, LengthDelimitedCodec::new());
let writer = FramedWrite::new(write_half, LengthDelimitedCodec::new());
(writer, reader)
}
/// To write to the underlying TCP socket, we need an async context. This is setup by this function.
fn start_writer_task(mut writer: ConnectionWriter) -> mpsc::Sender<Vec<u8>> {
let (sender, mut receiver) = mpsc::channel::<Vec<u8>>(DEFAULT_CHANNEL_SIZE);
tokio::spawn(async move {
while let Some(bytes) = receiver.recv().await {
writer.send(bytes.into()).await.expect("Should have sent message");
}
});
sender
}
\ No newline at end of file
use tokio::select;
use tokio::sync::mpsc;
use tokio::net::{TcpListener, TcpStream};
use super::DEFAULT_CHANNEL_SIZE;
/// `listen_tcp` spawns an Actor that waits for inbound requests to open TCP connections, accepts them and then starts a connection actor to exchange messages with them.
pub async fn listen_tcp(listen_addr: &str) -> mpsc::Receiver<(String, TcpStream)> {
let listener = TcpListener::bind(listen_addr).await.expect("Couldn't bind to listen address");
let (sender, receiver) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
tokio::spawn(async move {
loop {
select! {
// Accept all inbound TCP connections
incoming = listener.accept() => {
match incoming {
Ok((socket, socket_addr)) => {
sender.send((socket_addr.to_string(), socket)).await.expect("Parent actor should be alive and accepting new connections");
},
Err(error) => {
println!("TCP : error accepting connection {}", error);
}
}
}
}
}
});
receiver
}
\ No newline at end of file
// Actors
mod actor;
mod listen;
mod connection;
pub use actor::ActorHandle;
use rust_networking_examples::Network;
use tokio::sync::{mpsc, oneshot};
const DEFAULT_CHANNEL_SIZE: usize = 1024;
pub struct TcpConnector {
handle: ActorHandle
}
impl TcpConnector {
pub fn new(bind_address: &str) -> TcpConnector {
TcpConnector { handle: ActorHandle::new(String::from(bind_address)) }
}
}
#[async_trait::async_trait]
impl Network for TcpConnector {
/// Send a message to the identifier specified by the 'to' field.
fn send(&mut self, to: &str, message: Vec<u8>) {
self.handle.sender.try_send(actor::NetMessage::SendMessage { to: String::from(to), message }).expect("Actor should be alive");
}
/// Generate one or more receivers to catch inbound messages.
async fn recv(&mut self) -> mpsc::Receiver<(String, Vec<u8>)> {
let (respond_to, receiver) = oneshot::channel();
self.handle.sender.try_send(actor::NetMessage::RegisterMessageHandler { respond_to }).expect("Actor should be alive");
receiver.await.expect("Should have receievd channel back")
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment