introduced a Connection struct and a procedure to start the network

This commit is contained in:
Vladimir Komendantskiy 2018-04-01 22:29:12 +01:00
parent 9554c3c543
commit e01a80dfa7
6 changed files with 102 additions and 34 deletions

View File

@ -5,6 +5,7 @@ extern crate hbbft;
use hbbft::node::Node; use hbbft::node::Node;
use docopt::Docopt; use docopt::Docopt;
use std::collections::HashSet;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::vec::Vec; use std::vec::Vec;
@ -21,7 +22,7 @@ Usage:
#[derive(Debug)] #[derive(Debug)]
struct Args { struct Args {
bind_address: SocketAddr, bind_address: SocketAddr,
remote_addresses: Vec<SocketAddr>, remote_addresses: HashSet<SocketAddr>,
value: Option<Vec<u8>>, value: Option<Vec<u8>>,
} }

View File

@ -12,7 +12,7 @@ use task;
/// A communication task connects a remote node to the thread that manages the /// A communication task connects a remote node to the thread that manages the
/// consensus algorithm. /// consensus algorithm.
pub struct CommsTask<T: Send + Sync + From<Vec<u8>> + Into<Vec<u8>>> pub struct CommsTask<'a, T: Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
where Vec<u8>: From<T> where Vec<u8>: From<T>
{ {
/// The transmit side of the multiple producer channel from comms threads. /// The transmit side of the multiple producer channel from comms threads.
@ -20,16 +20,16 @@ where Vec<u8>: From<T>
/// The receive side of the multiple consumer channel to comms threads. /// The receive side of the multiple consumer channel to comms threads.
rx: spmc::Receiver<Message<T>>, rx: spmc::Receiver<Message<T>>,
/// The socket IO task. /// The socket IO task.
task: task::Task task: task::Task<'a>
} }
impl<T: Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>> impl<'a, T: Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
CommsTask<T> CommsTask<'a, T>
where Vec<u8>: From<T> where Vec<u8>: From<T>
{ {
pub fn new(tx: mpsc::Sender<Message<T>>, pub fn new(tx: mpsc::Sender<Message<T>>,
rx: spmc::Receiver<Message<T>>, rx: spmc::Receiver<Message<T>>,
stream: ::std::net::TcpStream) -> Self { stream: &'a ::std::net::TcpStream) -> Self {
CommsTask { CommsTask {
tx: tx, tx: tx,
rx: rx, rx: rx,

74
src/connection.rs Normal file
View File

@ -0,0 +1,74 @@
//! Connection data and initiation routines.
use std::collections::HashSet;
use std::fmt::Debug;
use std::io::{Read, Write, BufReader};
use std::net::{TcpStream, TcpListener, SocketAddr};
#[derive(Debug)]
pub struct Connection {
pub stream: TcpStream,
pub reader: BufReader<TcpStream>,
}
impl Connection {
pub fn new(stream: TcpStream) -> Self {
Connection {
// Create a read buffer of 1K bytes.
reader: BufReader::with_capacity(1024, stream.try_clone().unwrap()),
stream: stream
}
}
}
/// Connect this node to remote peers. A vector of successful connections is
/// returned.
pub fn make(bind_address: &SocketAddr,
remote_addresses: &HashSet<SocketAddr>) -> Vec<Connection>
{
// Connected remote nodes.
// let mut connected: Vec<SocketAddr> = Vec::new();
// Listen for incoming connections on a given TCP port.
let bind_address = bind_address;
let listener = TcpListener::bind(bind_address).unwrap();
// Initialise initial connection states.
let mut connections: Vec<Option<Connection>> =
(0 .. remote_addresses.len())
.into_iter()
.map(|_| None)
.collect();
let here_str = format!("{}", bind_address);
// Wait for all nodes with larger addresses to connect.
for (n, &address) in remote_addresses.iter().enumerate() {
let there_str = format!("{}", address);
if here_str < there_str {
connections[n] =
match listener.accept() {
Ok((stream, _)) => {
info!("Connected to {}", there_str);
Some(Connection::new(stream))
},
Err(_) => None
}
}
}
// Try to connect to all nodes with smaller addresses.
for (n, &address) in remote_addresses.iter().enumerate() {
let there_str = format!("{}", address);
if here_str > there_str {
connections[n] =
match TcpStream::connect(address) {
Ok(stream) => {
info!("Connected to {}", there_str);
Some(Connection::new(stream))
},
Err(_) => None
}
}
}
// remove Nones from connections
connections.into_iter().filter_map(|c| c).collect()
}

View File

@ -47,6 +47,7 @@ extern crate spmc;
extern crate crossbeam; extern crate crossbeam;
extern crate reed_solomon_erasure; extern crate reed_solomon_erasure;
mod connection;
mod errors; mod errors;
mod proto; mod proto;
mod task; mod task;

View File

@ -1,4 +1,5 @@
//! Networking controls of the consensus node. //! Networking controls of the consensus node.
use std::collections::HashSet;
use std::fmt::Debug; use std::fmt::Debug;
use std::hash::Hash; use std::hash::Hash;
use std::marker::{Send, Sync}; use std::marker::{Send, Sync};
@ -8,6 +9,7 @@ use std::sync::mpsc;
use spmc; use spmc;
use crossbeam; use crossbeam;
use connection;
use broadcast; use broadcast;
use proto::Message; use proto::Message;
use commst; use commst;
@ -17,7 +19,7 @@ pub struct Node<T> {
/// Incoming connection socket. /// Incoming connection socket.
addr: SocketAddr, addr: SocketAddr,
/// Sockets of remote nodes. TODO. /// Sockets of remote nodes. TODO.
remotes: Vec<SocketAddr>, remotes: HashSet<SocketAddr>,
/// Optionally, a value to be broadcast by this node. /// Optionally, a value to be broadcast by this node.
value: Option<T> value: Option<T>
} }
@ -27,8 +29,9 @@ impl<T: Clone + Debug + Eq + Hash + Send + Sync + From<Vec<u8>> + AsRef<[u8]>>
where Vec<u8>: From<T> where Vec<u8>: From<T>
{ {
/// Consensus node constructor. It only initialises initial parameters. /// Consensus node constructor. It only initialises initial parameters.
pub fn new(addr: SocketAddr, remotes: Vec<SocketAddr>, value: Option<T>) -> pub fn new(addr: SocketAddr,
Self remotes: HashSet<SocketAddr>,
value: Option<T>) -> Self
{ {
Node {addr, remotes, value} Node {addr, remotes, value}
} }
@ -36,8 +39,6 @@ where Vec<u8>: From<T>
/// Consensus node procedure implementing HoneyBadgerBFT. /// Consensus node procedure implementing HoneyBadgerBFT.
pub fn run(&self) -> Result<T, ()> pub fn run(&self) -> Result<T, ()>
{ {
// Listen for incoming connections on a given TCP port.
let listener = TcpListener::bind(&self.addr).unwrap();
// Multicast channel from the manager task to comms tasks. // Multicast channel from the manager task to comms tasks.
let (stx, srx): (spmc::Sender<Message<T>>, let (stx, srx): (spmc::Sender<Message<T>>,
spmc::Receiver<Message<T>>) = spmc::channel(); spmc::Receiver<Message<T>>) = spmc::channel();
@ -45,29 +46,20 @@ where Vec<u8>: From<T>
let (mtx, mrx): (mpsc::Sender<Message<T>>, let (mtx, mrx): (mpsc::Sender<Message<T>>,
mpsc::Receiver<Message<T>>) = mpsc::channel(); mpsc::Receiver<Message<T>>) = mpsc::channel();
let broadcast_value = self.value.to_owned(); let broadcast_value = self.value.to_owned();
let connections = connection::make(&self.addr, &self.remotes);
// All spawned threads will have exited by the end of the scope. // All spawned threads will have exited by the end of the scope.
crossbeam::scope(|scope| { crossbeam::scope(|scope| {
// Listen for incoming socket connections and start a comms task for // Start a comms task for each connection.
// each new connection. for c in connections.iter() {
for stream in listener.incoming() { info!("Creating a comms task for {:?}",
match stream { c.stream.peer_addr().unwrap());
Ok(stream) => { let tx = mtx.clone();
info!("New connection from {:?}", let rx = srx.clone();
stream.peer_addr().unwrap()); scope.spawn(move || {
let tx = mtx.clone(); commst::CommsTask::new(tx, rx, &c.stream).run();
let rx = srx.clone(); });
scope.spawn(move || {
commst::CommsTask::new(tx, rx, stream).run();
});
// TODO: break when all the remotes have joined
}
Err(e) => {
warn!("Failed to connect: {}", e);
}
}
} }
// broadcast stage // broadcast stage

View File

@ -57,14 +57,14 @@ fn decode_u32_from_be(buffer: &[u8]) -> Result<u32, Error> {
Ok(result) Ok(result)
} }
pub struct Task { pub struct Task<'a> {
stream: TcpStream, stream: &'a TcpStream,
buffer: [u8; 1024], buffer: [u8; 1024],
} }
/// A message handling task. /// A message handling task.
impl Task where { impl<'a> Task<'a> where {
pub fn new(stream: TcpStream) -> Task { pub fn new(stream: &'a TcpStream) -> Task<'a> {
Task { Task {
stream, stream,
buffer: [0; 1024] buffer: [0; 1024]