p2p reading and dispatching messages
This commit is contained in:
parent
3c5d39c37b
commit
d618e904c9
|
@ -1,6 +1,7 @@
|
|||
mod handshake;
|
||||
mod read_header;
|
||||
mod read_message;
|
||||
mod read_message_stream;
|
||||
mod read_payload;
|
||||
mod sharedtcpstream;
|
||||
mod write_message;
|
||||
|
@ -11,5 +12,6 @@ pub use self::handshake::{
|
|||
pub use self::read_header::{read_header, ReadHeader};
|
||||
pub use self::read_payload::{read_payload, ReadPayload};
|
||||
pub use self::read_message::{read_message, ReadMessage};
|
||||
pub use self::read_message_stream::{read_message_stream, ReadMessageStream};
|
||||
pub use self::sharedtcpstream::SharedTcpStream;
|
||||
pub use self::write_message::{write_message, WriteMessage};
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
use std::io;
|
||||
use futures::{Future, Poll, Async};
|
||||
use futures::stream::Stream;
|
||||
use tokio_core::io::{read_exact, ReadExact};
|
||||
use crypto::checksum;
|
||||
use message::{Error, MessageHeader, MessageResult};
|
||||
use message::common::{Magic, Command};
|
||||
use bytes::Bytes;
|
||||
use io::{read_header, ReadHeader};
|
||||
|
||||
pub fn read_message_stream<A>(a: A, magic: Magic) -> ReadMessageStream<A> where A: io::Read {
|
||||
ReadMessageStream {
|
||||
state: ReadMessageStreamState::ReadHeader(read_header(a, magic)),
|
||||
magic: magic,
|
||||
}
|
||||
}
|
||||
|
||||
pub enum ReadMessageStreamState<A> {
|
||||
ReadHeader(ReadHeader<A>),
|
||||
ReadPayload {
|
||||
header: MessageHeader,
|
||||
future: ReadExact<A, Bytes>
|
||||
},
|
||||
}
|
||||
|
||||
pub struct ReadMessageStream<A> {
|
||||
state: ReadMessageStreamState<A>,
|
||||
magic: Magic,
|
||||
}
|
||||
|
||||
impl<A> Stream for ReadMessageStream<A> where A: io::Read {
|
||||
type Item = MessageResult<(Command, Bytes)>;
|
||||
type Error = io::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
let (next, result) = match self.state {
|
||||
ReadMessageStreamState::ReadHeader(ref mut header) => {
|
||||
let (stream, header) = try_ready!(header.poll());
|
||||
let header = match header {
|
||||
Ok(header) => header,
|
||||
Err(err) => return Ok(Some(Err(err)).into()),
|
||||
};
|
||||
let future = read_exact(stream, Bytes::new_with_len(header.len as usize));
|
||||
let next = ReadMessageStreamState::ReadPayload {
|
||||
header: header,
|
||||
future: future,
|
||||
};
|
||||
(next, Async::NotReady)
|
||||
},
|
||||
ReadMessageStreamState::ReadPayload { ref mut header, ref mut future } => {
|
||||
let (stream, bytes) = try_ready!(future.poll());
|
||||
if checksum(&bytes) != header.checksum {
|
||||
return Ok(Some(Err(Error::InvalidChecksum)).into());
|
||||
}
|
||||
let future = read_header(stream, self.magic);
|
||||
let next = ReadMessageStreamState::ReadHeader(future);
|
||||
(next, Some(Ok((header.command.clone(), bytes))).into())
|
||||
},
|
||||
};
|
||||
|
||||
self.state = next;
|
||||
Ok(result)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
use std::io;
|
||||
use futures::Poll;
|
||||
use futures::stream::Stream;
|
||||
use parking_lot::Mutex;
|
||||
use bytes::Bytes;
|
||||
use message::{MessageResult, PayloadType};
|
||||
use message::common::Command;
|
||||
use net::Connection;
|
||||
use io::{read_message_stream, ReadMessageStream, SharedTcpStream, WriteMessage};
|
||||
|
||||
pub struct Channel {
|
||||
connection: Connection,
|
||||
message_stream: Mutex<ReadMessageStream<SharedTcpStream>>,
|
||||
}
|
||||
|
||||
impl Channel {
|
||||
pub fn new(connection: Connection) -> Self {
|
||||
let stream = read_message_stream(connection.stream.clone(), connection.magic);
|
||||
Channel {
|
||||
connection: connection,
|
||||
message_stream: Mutex::new(stream),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn write_message<T>(&self, payload: &T) -> WriteMessage<T, SharedTcpStream> where T: PayloadType {
|
||||
self.connection.write_message(payload)
|
||||
}
|
||||
|
||||
pub fn poll_message(&self) -> Poll<Option<(MessageResult<(Command, Bytes)>)>, io::Error> {
|
||||
self.message_stream.lock().poll()
|
||||
}
|
||||
|
||||
pub fn version(&self) -> u32 {
|
||||
self.connection.version
|
||||
}
|
||||
}
|
|
@ -6,12 +6,12 @@ use futures::{finished, Future};
|
|||
use futures_cpupool::CpuPool;
|
||||
use tokio_core::reactor::Handle;
|
||||
use message::PayloadType;
|
||||
use net::Connection;
|
||||
use net::{Connection, Channel};
|
||||
use PeerId;
|
||||
|
||||
pub struct Connections {
|
||||
peer_counter: AtomicUsize,
|
||||
channels: RwLock<HashMap<PeerId, Arc<Connection>>>,
|
||||
channels: RwLock<HashMap<PeerId, Arc<Channel>>>,
|
||||
}
|
||||
|
||||
impl Connections {
|
||||
|
@ -45,7 +45,7 @@ impl Connections {
|
|||
}
|
||||
|
||||
/// Returns safe (nonblocking) copy of channels.
|
||||
pub fn channels(&self) -> HashMap<PeerId, Arc<Connection>> {
|
||||
pub fn channels(&self) -> HashMap<PeerId, Arc<Channel>> {
|
||||
self.channels.read().clone()
|
||||
}
|
||||
|
||||
|
@ -57,7 +57,7 @@ impl Connections {
|
|||
/// Stores new channel.
|
||||
pub fn store(&self, connection: Connection) {
|
||||
let id = self.peer_counter.fetch_add(1, Ordering::AcqRel);
|
||||
self.channels.write().insert(id, Arc::new(connection));
|
||||
self.channels.write().insert(id, Arc::new(Channel::new(connection)));
|
||||
}
|
||||
|
||||
/// Removes channel with given id.
|
||||
|
|
|
@ -3,7 +3,6 @@ use std::sync::Weak;
|
|||
use bytes::Bytes;
|
||||
use futures::{Poll, Async};
|
||||
use futures::stream::Stream;
|
||||
use message::MessageResult;
|
||||
use message::common::Command;
|
||||
use net::Connections;
|
||||
use PeerId;
|
||||
|
@ -13,6 +12,17 @@ pub struct MessagesHandler {
|
|||
connections: Weak<Connections>,
|
||||
}
|
||||
|
||||
fn next_to_poll(channels: usize, last_polled: usize) -> usize {
|
||||
// it's irrelevant if we sometimes poll the same peer
|
||||
if channels > last_polled + 1 {
|
||||
// let's poll the next peer
|
||||
last_polled + 1
|
||||
} else {
|
||||
// let's move to the first channel
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
impl MessagesHandler {
|
||||
pub fn new(connections: Weak<Connections>) -> Self {
|
||||
MessagesHandler {
|
||||
|
@ -23,7 +33,7 @@ impl MessagesHandler {
|
|||
}
|
||||
|
||||
impl Stream for MessagesHandler {
|
||||
type Item = (MessageResult<(Command, Bytes)>, u32, PeerId);
|
||||
type Item = (Command, Bytes, u32, PeerId);
|
||||
type Error = io::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
|
@ -38,17 +48,34 @@ impl Stream for MessagesHandler {
|
|||
return Ok(Async::NotReady);
|
||||
}
|
||||
|
||||
// it's irrelevant if we sometimes poll the same peer
|
||||
let to_poll = if channels.len() > self.last_polled + 1 {
|
||||
// let's poll the next peer
|
||||
self.last_polled + 1
|
||||
} else {
|
||||
// let's move to the first channel
|
||||
0
|
||||
};
|
||||
let mut to_poll = next_to_poll(channels.len(), self.last_polled);
|
||||
let mut result = None;
|
||||
|
||||
let (id, channel) = channels.into_iter().nth(to_poll).expect("to_poll < channels.len()");
|
||||
unimplemented!();
|
||||
while result.is_none() && to_poll != self.last_polled {
|
||||
let (id, channel) = channels.iter().nth(to_poll).expect("to_poll < channels.len()");
|
||||
let status = channel.poll_message();
|
||||
|
||||
match status {
|
||||
Ok(Async::Ready(Some(Ok((command, message))))) => {
|
||||
result = Some((command, message, channel.version(), *id));
|
||||
},
|
||||
Ok(Async::NotReady) => {
|
||||
// no messages yet, try next channel
|
||||
to_poll = next_to_poll(channels.len(), to_poll);
|
||||
},
|
||||
_ => {
|
||||
// channel has been closed or there was error
|
||||
connections.remove(*id);
|
||||
to_poll = next_to_poll(channels.len(), to_poll);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
self.last_polled = to_poll;
|
||||
match result.is_some() {
|
||||
true => Ok(Async::Ready(result)),
|
||||
false => Ok(Async::NotReady),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
mod channel;
|
||||
mod config;
|
||||
mod connect;
|
||||
mod connection;
|
||||
|
@ -6,6 +7,7 @@ mod messages;
|
|||
mod listen;
|
||||
mod subscriber;
|
||||
|
||||
pub use self::channel::Channel;
|
||||
pub use self::config::Config;
|
||||
pub use self::connect::{Connect, connect};
|
||||
pub use self::connection::Connection;
|
||||
|
|
|
@ -78,16 +78,9 @@ impl P2P {
|
|||
let subscriber = self.subscriber.clone();
|
||||
let connections = self.connections.clone();
|
||||
let incoming_future = incoming.for_each(move |result| {
|
||||
match result {
|
||||
(Ok((command, payload)), version, peerid) => {
|
||||
let handled = subscriber.try_handle(&payload, version, command, peerid);
|
||||
if let Err(err) = handled {
|
||||
connections.remove(peerid);
|
||||
}
|
||||
},
|
||||
(Err(err), version, peerid) => {
|
||||
connections.remove(peerid);
|
||||
},
|
||||
let (command, payload, version, peerid) = result;
|
||||
if let Err(_err) = subscriber.try_handle(&payload, version, command, peerid) {
|
||||
connections.remove(peerid);
|
||||
}
|
||||
Ok(())
|
||||
}).then(|_| {
|
||||
|
|
Loading…
Reference in New Issue