From d618e904c983e1e09e1139989df99e9f01d1d045 Mon Sep 17 00:00:00 2001 From: debris Date: Fri, 14 Oct 2016 10:55:28 +0200 Subject: [PATCH] p2p reading and dispatching messages --- p2p/src/io/mod.rs | 2 + p2p/src/io/read_message_stream.rs | 64 +++++++++++++++++++++++++++++++ p2p/src/net/channel.rs | 36 +++++++++++++++++ p2p/src/net/connections.rs | 8 ++-- p2p/src/net/messages.rs | 51 ++++++++++++++++++------ p2p/src/net/mod.rs | 2 + p2p/src/p2p.rs | 13 ++----- 7 files changed, 150 insertions(+), 26 deletions(-) create mode 100644 p2p/src/io/read_message_stream.rs create mode 100644 p2p/src/net/channel.rs diff --git a/p2p/src/io/mod.rs b/p2p/src/io/mod.rs index c046d20f..5bffdd4c 100644 --- a/p2p/src/io/mod.rs +++ b/p2p/src/io/mod.rs @@ -1,6 +1,7 @@ mod handshake; mod read_header; mod read_message; +mod read_message_stream; mod read_payload; mod sharedtcpstream; mod write_message; @@ -11,5 +12,6 @@ pub use self::handshake::{ pub use self::read_header::{read_header, ReadHeader}; pub use self::read_payload::{read_payload, ReadPayload}; pub use self::read_message::{read_message, ReadMessage}; +pub use self::read_message_stream::{read_message_stream, ReadMessageStream}; pub use self::sharedtcpstream::SharedTcpStream; pub use self::write_message::{write_message, WriteMessage}; diff --git a/p2p/src/io/read_message_stream.rs b/p2p/src/io/read_message_stream.rs new file mode 100644 index 00000000..57d033e6 --- /dev/null +++ b/p2p/src/io/read_message_stream.rs @@ -0,0 +1,64 @@ +use std::io; +use futures::{Future, Poll, Async}; +use futures::stream::Stream; +use tokio_core::io::{read_exact, ReadExact}; +use crypto::checksum; +use message::{Error, MessageHeader, MessageResult}; +use message::common::{Magic, Command}; +use bytes::Bytes; +use io::{read_header, ReadHeader}; + +pub fn read_message_stream(a: A, magic: Magic) -> ReadMessageStream where A: io::Read { + ReadMessageStream { + state: ReadMessageStreamState::ReadHeader(read_header(a, magic)), + magic: magic, + } +} + +pub enum ReadMessageStreamState { + ReadHeader(ReadHeader), + ReadPayload { + header: MessageHeader, + future: ReadExact + }, +} + +pub struct ReadMessageStream { + state: ReadMessageStreamState, + magic: Magic, +} + +impl Stream for ReadMessageStream where A: io::Read { + type Item = MessageResult<(Command, Bytes)>; + type Error = io::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + let (next, result) = match self.state { + ReadMessageStreamState::ReadHeader(ref mut header) => { + let (stream, header) = try_ready!(header.poll()); + let header = match header { + Ok(header) => header, + Err(err) => return Ok(Some(Err(err)).into()), + }; + let future = read_exact(stream, Bytes::new_with_len(header.len as usize)); + let next = ReadMessageStreamState::ReadPayload { + header: header, + future: future, + }; + (next, Async::NotReady) + }, + ReadMessageStreamState::ReadPayload { ref mut header, ref mut future } => { + let (stream, bytes) = try_ready!(future.poll()); + if checksum(&bytes) != header.checksum { + return Ok(Some(Err(Error::InvalidChecksum)).into()); + } + let future = read_header(stream, self.magic); + let next = ReadMessageStreamState::ReadHeader(future); + (next, Some(Ok((header.command.clone(), bytes))).into()) + }, + }; + + self.state = next; + Ok(result) + } +} diff --git a/p2p/src/net/channel.rs b/p2p/src/net/channel.rs new file mode 100644 index 00000000..60e8de2c --- /dev/null +++ b/p2p/src/net/channel.rs @@ -0,0 +1,36 @@ +use std::io; +use futures::Poll; +use futures::stream::Stream; +use parking_lot::Mutex; +use bytes::Bytes; +use message::{MessageResult, PayloadType}; +use message::common::Command; +use net::Connection; +use io::{read_message_stream, ReadMessageStream, SharedTcpStream, WriteMessage}; + +pub struct Channel { + connection: Connection, + message_stream: Mutex>, +} + +impl Channel { + pub fn new(connection: Connection) -> Self { + let stream = read_message_stream(connection.stream.clone(), connection.magic); + Channel { + connection: connection, + message_stream: Mutex::new(stream), + } + } + + pub fn write_message(&self, payload: &T) -> WriteMessage where T: PayloadType { + self.connection.write_message(payload) + } + + pub fn poll_message(&self) -> Poll)>, io::Error> { + self.message_stream.lock().poll() + } + + pub fn version(&self) -> u32 { + self.connection.version + } +} diff --git a/p2p/src/net/connections.rs b/p2p/src/net/connections.rs index 5daaf8b0..51e8cc62 100644 --- a/p2p/src/net/connections.rs +++ b/p2p/src/net/connections.rs @@ -6,12 +6,12 @@ use futures::{finished, Future}; use futures_cpupool::CpuPool; use tokio_core::reactor::Handle; use message::PayloadType; -use net::Connection; +use net::{Connection, Channel}; use PeerId; pub struct Connections { peer_counter: AtomicUsize, - channels: RwLock>>, + channels: RwLock>>, } impl Connections { @@ -45,7 +45,7 @@ impl Connections { } /// Returns safe (nonblocking) copy of channels. - pub fn channels(&self) -> HashMap> { + pub fn channels(&self) -> HashMap> { self.channels.read().clone() } @@ -57,7 +57,7 @@ impl Connections { /// Stores new channel. pub fn store(&self, connection: Connection) { let id = self.peer_counter.fetch_add(1, Ordering::AcqRel); - self.channels.write().insert(id, Arc::new(connection)); + self.channels.write().insert(id, Arc::new(Channel::new(connection))); } /// Removes channel with given id. diff --git a/p2p/src/net/messages.rs b/p2p/src/net/messages.rs index 769c8f9e..1c654f27 100644 --- a/p2p/src/net/messages.rs +++ b/p2p/src/net/messages.rs @@ -3,7 +3,6 @@ use std::sync::Weak; use bytes::Bytes; use futures::{Poll, Async}; use futures::stream::Stream; -use message::MessageResult; use message::common::Command; use net::Connections; use PeerId; @@ -13,6 +12,17 @@ pub struct MessagesHandler { connections: Weak, } +fn next_to_poll(channels: usize, last_polled: usize) -> usize { + // it's irrelevant if we sometimes poll the same peer + if channels > last_polled + 1 { + // let's poll the next peer + last_polled + 1 + } else { + // let's move to the first channel + 0 + } +} + impl MessagesHandler { pub fn new(connections: Weak) -> Self { MessagesHandler { @@ -23,7 +33,7 @@ impl MessagesHandler { } impl Stream for MessagesHandler { - type Item = (MessageResult<(Command, Bytes)>, u32, PeerId); + type Item = (Command, Bytes, u32, PeerId); type Error = io::Error; fn poll(&mut self) -> Poll, Self::Error> { @@ -38,17 +48,34 @@ impl Stream for MessagesHandler { return Ok(Async::NotReady); } - // it's irrelevant if we sometimes poll the same peer - let to_poll = if channels.len() > self.last_polled + 1 { - // let's poll the next peer - self.last_polled + 1 - } else { - // let's move to the first channel - 0 - }; + let mut to_poll = next_to_poll(channels.len(), self.last_polled); + let mut result = None; - let (id, channel) = channels.into_iter().nth(to_poll).expect("to_poll < channels.len()"); - unimplemented!(); + while result.is_none() && to_poll != self.last_polled { + let (id, channel) = channels.iter().nth(to_poll).expect("to_poll < channels.len()"); + let status = channel.poll_message(); + + match status { + Ok(Async::Ready(Some(Ok((command, message))))) => { + result = Some((command, message, channel.version(), *id)); + }, + Ok(Async::NotReady) => { + // no messages yet, try next channel + to_poll = next_to_poll(channels.len(), to_poll); + }, + _ => { + // channel has been closed or there was error + connections.remove(*id); + to_poll = next_to_poll(channels.len(), to_poll); + }, + } + } + + self.last_polled = to_poll; + match result.is_some() { + true => Ok(Async::Ready(result)), + false => Ok(Async::NotReady), + } } } diff --git a/p2p/src/net/mod.rs b/p2p/src/net/mod.rs index e3d4fa8e..beb49bff 100644 --- a/p2p/src/net/mod.rs +++ b/p2p/src/net/mod.rs @@ -1,3 +1,4 @@ +mod channel; mod config; mod connect; mod connection; @@ -6,6 +7,7 @@ mod messages; mod listen; mod subscriber; +pub use self::channel::Channel; pub use self::config::Config; pub use self::connect::{Connect, connect}; pub use self::connection::Connection; diff --git a/p2p/src/p2p.rs b/p2p/src/p2p.rs index 07a182fb..a1775cf7 100644 --- a/p2p/src/p2p.rs +++ b/p2p/src/p2p.rs @@ -78,16 +78,9 @@ impl P2P { let subscriber = self.subscriber.clone(); let connections = self.connections.clone(); let incoming_future = incoming.for_each(move |result| { - match result { - (Ok((command, payload)), version, peerid) => { - let handled = subscriber.try_handle(&payload, version, command, peerid); - if let Err(err) = handled { - connections.remove(peerid); - } - }, - (Err(err), version, peerid) => { - connections.remove(peerid); - }, + let (command, payload, version, peerid) = result; + if let Err(_err) = subscriber.try_handle(&payload, version, command, peerid) { + connections.remove(peerid); } Ok(()) }).then(|_| {