From 078a71ba1505a69cadbade7cff19b153cb4a809d Mon Sep 17 00:00:00 2001 From: debris Date: Wed, 9 Nov 2016 21:42:17 +0100 Subject: [PATCH 1/2] simplified threshold and configurable synchronizers --- p2p/src/util/synchronizer.rs | 78 ++++++++++++++---------------------- 1 file changed, 29 insertions(+), 49 deletions(-) diff --git a/p2p/src/util/synchronizer.rs b/p2p/src/util/synchronizer.rs index df9b85f5..f4e78671 100644 --- a/p2p/src/util/synchronizer.rs +++ b/p2p/src/util/synchronizer.rs @@ -66,23 +66,19 @@ impl Synchronizer for NoopSynchronizer { #[derive(Debug)] struct ThresholdSynchronizer { inner: FifoSynchronizer, - start_id: u32, - threshold: u32, + to_grant_min: u32, + to_grant_max: u32, } impl ThresholdSynchronizer { - fn new(next_to_grant: u32, threshold: u32) -> Self { - // let's mark all ids in threshold as declared - // this may cause some ids, to be skipped, but we don't care - // it won't affect correct execution of the program - let declared = next_to_grant + threshold; + fn new(declared: u32, threshold: u32) -> Self { ThresholdSynchronizer { inner: FifoSynchronizer { declared_responses: declared, next_to_grant: declared, }, - start_id: next_to_grant, - threshold: threshold, + to_grant_min: declared.overflowing_sub(threshold).0, + to_grant_max: declared, } } } @@ -97,14 +93,19 @@ impl Synchronizer for ThresholdSynchronizer { return true; } - id.overflowing_sub(self.start_id).0 < self.threshold || - self.start_id.overflowing_sub(id).0 < self.threshold + if self.to_grant_min <= self.to_grant_max { + // if max is bigger then min, id must be in range [min, max) + self.to_grant_min <= id && id < self.to_grant_max + } else { + // otherwise if is in range [min, u32::max_value()] || [0, max) + (self.to_grant_min <= id && id <= u32::max_value()) || + id < self.to_grant_max + } } } #[derive(Debug)] enum InnerSynchronizer { - Fifo(FifoSynchronizer), Noop(NoopSynchronizer), Threshold(ThresholdSynchronizer), } @@ -112,7 +113,7 @@ enum InnerSynchronizer { impl InnerSynchronizer { pub fn new(sync: bool) -> Self { if sync { - InnerSynchronizer::Fifo(FifoSynchronizer::default()) + InnerSynchronizer::Threshold(ThresholdSynchronizer::new(0, 0)) } else { InnerSynchronizer::Noop(NoopSynchronizer::default()) } @@ -123,8 +124,6 @@ impl InnerSynchronizer { pub struct ConfigurableSynchronizer { /// Inner synchronizer which is currently used inner: InnerSynchronizer, - /// Id of next response which is likely to be granted permission. - probably_next_to_grant: u32, } impl Default for ConfigurableSynchronizer { @@ -137,7 +136,6 @@ impl ConfigurableSynchronizer { pub fn new(sync: bool) -> Self { ConfigurableSynchronizer { inner: InnerSynchronizer::new(sync), - probably_next_to_grant: 0, } } @@ -145,22 +143,15 @@ impl ConfigurableSynchronizer { /// from last_processed response will still be granted permissions. pub fn change_sync_policy(&mut self, sync: bool) { let new_inner = match self.inner { - InnerSynchronizer::Fifo(ref s) if sync == false => { - self.probably_next_to_grant = s.next_to_grant; - InnerSynchronizer::Noop(NoopSynchronizer { - declared_responses: s.declared_responses, - }) - }, InnerSynchronizer::Threshold(ref s) if sync == false => { - self.probably_next_to_grant = s.inner.next_to_grant; InnerSynchronizer::Noop(NoopSynchronizer { declared_responses: s.inner.declared_responses, }) }, - InnerSynchronizer::Noop(_) if sync == true => { + InnerSynchronizer::Noop(ref s) if sync == true => { let threshold = ThresholdSynchronizer::new( - self.probably_next_to_grant, - CONFIGURABLE_SYNCHRONIZER_THRESHOLD + s.declared_responses, + CONFIGURABLE_SYNCHRONIZER_THRESHOLD, ); InnerSynchronizer::Threshold(threshold) }, @@ -174,7 +165,6 @@ impl ConfigurableSynchronizer { impl Synchronizer for ConfigurableSynchronizer { fn declare_response(&mut self) -> u32 { match self.inner { - InnerSynchronizer::Fifo(ref mut s) => s.declare_response(), InnerSynchronizer::Noop(ref mut s) => s.declare_response(), InnerSynchronizer::Threshold(ref mut s) => s.declare_response(), } @@ -182,12 +172,8 @@ impl Synchronizer for ConfigurableSynchronizer { fn permission_for_response(&mut self, id: u32) -> bool { match self.inner { - InnerSynchronizer::Fifo(ref mut s) => s.permission_for_response(id), InnerSynchronizer::Threshold(ref mut s) => s.permission_for_response(id), - InnerSynchronizer::Noop(ref mut s) => { - self.probably_next_to_grant = id.overflowing_add(1).0; - s.permission_for_response(id) - }, + InnerSynchronizer::Noop(ref mut s) => s.permission_for_response(id), } } } @@ -195,7 +181,7 @@ impl Synchronizer for ConfigurableSynchronizer { #[cfg(test)] mod tests { use super::{ - Synchronizer, FifoSynchronizer, NoopSynchronizer, ConfigurableSynchronizer, ThresholdSynchronizer, CONFIGURABLE_SYNCHRONIZER_THRESHOLD + Synchronizer, FifoSynchronizer, NoopSynchronizer, ConfigurableSynchronizer, ThresholdSynchronizer }; #[test] @@ -235,12 +221,10 @@ mod tests { assert!(!s.permission_for_response(id2)); assert!(s.permission_for_response(id1)); assert!(s.permission_for_response(id2)); - // historic permissions - assert!(!s.permission_for_response(0)); + // historic permissions, order does not matter assert!(s.permission_for_response(1)); - assert!(s.permission_for_response(2)); - assert!(s.permission_for_response(3)); - assert!(!s.permission_for_response(4)); + assert!(s.permission_for_response(0)); + assert!(!s.permission_for_response(2)); } #[test] @@ -268,11 +252,13 @@ mod tests { assert!(s.permission_for_response(id2)); assert!(s.permission_for_response(id0)); + + let d0 = s.declare_response(); + let d1 = s.declare_response(); + // process messages synchronously again s.change_sync_policy(true); - let last_async = id2; - // let's check again if we can process them only synchronously let id0 = s.declare_response(); let id1 = s.declare_response(); @@ -284,14 +270,8 @@ mod tests { assert!(s.permission_for_response(id1)); assert!(s.permission_for_response(id2)); - - // there might be ~10 unhandled messages, - // let's check if we can process them out of order (eg. in reverse) - for i in (0..CONFIGURABLE_SYNCHRONIZER_THRESHOLD - 1).into_iter().rev() { - assert!(s.permission_for_response(last_async + i)); - } - - // the next one should fail - assert!(!s.permission_for_response(last_async + CONFIGURABLE_SYNCHRONIZER_THRESHOLD)); + // order of requests before changing to policy to sync should not matter + assert!(s.permission_for_response(d1)); + assert!(s.permission_for_response(d0)); } } From 6b389292221538f222df96ecba9a1ea744b5029d Mon Sep 17 00:00:00 2001 From: debris Date: Wed, 9 Nov 2016 22:36:29 +0100 Subject: [PATCH 2/2] cleanup session init and on_message --- message/src/common/magic.rs | 2 +- p2p/src/net/channel.rs | 30 +++++++------------ p2p/src/net/connections.rs | 15 ++++++++-- p2p/src/p2p.rs | 6 ++-- p2p/src/protocol/addr.rs | 37 ++++++++++++----------- p2p/src/protocol/mod.rs | 5 ++-- p2p/src/protocol/ping.rs | 58 +++++++++++++++++++++++-------------- p2p/src/protocol/sync.rs | 53 ++++++++++++++++----------------- p2p/src/session.rs | 29 +++++++++---------- p2p/src/util/peer.rs | 5 +++- 10 files changed, 128 insertions(+), 112 deletions(-) diff --git a/message/src/common/magic.rs b/message/src/common/magic.rs index fdf7228d..c600da2d 100644 --- a/message/src/common/magic.rs +++ b/message/src/common/magic.rs @@ -11,7 +11,7 @@ const MAGIC_REGTEST: u32 = 0xDAB5BFFA; /// Bitcoin network /// https://bitcoin.org/en/glossary/mainnet -#[derive(Debug, PartialEq, Clone, Copy)] +#[derive(Debug, PartialEq, Eq, Clone, Copy)] pub enum Magic { /// The original and main network for Bitcoin transactions, where satoshis have real economic value. Mainnet, diff --git a/p2p/src/net/channel.rs b/p2p/src/net/channel.rs index ef473d7e..792e5351 100644 --- a/p2p/src/net/channel.rs +++ b/p2p/src/net/channel.rs @@ -1,41 +1,31 @@ -use message::{Payload, Magic, Message}; -use net::Connection; +use message::{Payload, Message}; use session::Session; use io::{SharedTcpStream, WriteMessage, write_message, read_any_message, ReadAnyMessage}; -use util::Direction; -use {PeerId, PeerInfo}; +use util::PeerInfo; pub struct Channel { - version: u32, - magic: Magic, + stream: SharedTcpStream, peer_info: PeerInfo, session: Session, - stream: SharedTcpStream, } impl Channel { - pub fn new(connection: Connection, peer_id: PeerId, session: Session, direction: Direction) -> Self { + pub fn new(stream: SharedTcpStream, peer_info: PeerInfo, session: Session) -> Self { Channel { - version: connection.version, - magic: connection.magic, - peer_info: PeerInfo { - address: connection.address, - id: peer_id, - direction: direction, - }, + stream: stream, + peer_info: peer_info, session: session, - stream: connection.stream, } } pub fn write_message(&self, payload: &T) -> WriteMessage where T: Payload { // TODO: some tracing here - let message = Message::new(self.magic, self.version, payload).expect("failed to create outgoing message"); + let message = Message::new(self.peer_info.magic, self.peer_info.version, payload).expect("failed to create outgoing message"); write_message(self.stream.clone(), message) } pub fn read_message(&self) -> ReadAnyMessage { - read_any_message(self.stream.clone(), self.magic) + read_any_message(self.stream.clone(), self.peer_info.magic) } pub fn shutdown(&self) { @@ -43,11 +33,11 @@ impl Channel { } pub fn version(&self) -> u32 { - self.version + self.peer_info.version } pub fn peer_info(&self) -> PeerInfo { - self.peer_info + self.peer_info.clone() } pub fn session(&self) -> &Session { diff --git a/p2p/src/net/connections.rs b/p2p/src/net/connections.rs index 890051ca..06ae7c85 100644 --- a/p2p/src/net/connections.rs +++ b/p2p/src/net/connections.rs @@ -6,7 +6,7 @@ use parking_lot::RwLock; use net::{Connection, Channel}; use p2p::Context; use session::{SessionFactory}; -use util::Direction; +use util::{Direction, PeerInfo}; use PeerId; #[derive(Default)] @@ -42,8 +42,17 @@ impl Connections { /// Returnes a shared pointer to it. pub fn store(&self, context: Arc, connection: Connection, direction: Direction) -> Arc where T: SessionFactory { let id = self.peer_counter.fetch_add(1, Ordering::AcqRel); - let session = T::new_session(context, id); - let channel = Arc::new(Channel::new(connection, id, session, direction)); + + let peer_info = PeerInfo { + id: id, + address: connection.address, + direction: direction, + version: connection.version, + magic: connection.magic, + }; + + let session = T::new_session(context, peer_info.clone()); + let channel = Arc::new(Channel::new(connection.stream, peer_info, session)); self.channels.write().insert(id, channel.clone()); channel } diff --git a/p2p/src/p2p.rs b/p2p/src/p2p.rs index fbe92d9a..aa34e47f 100644 --- a/p2p/src/p2p.rs +++ b/p2p/src/p2p.rs @@ -141,7 +141,7 @@ impl Context { let channel = context.connections.store::(context.clone(), connection, Direction::Outbound); // initialize session and then start reading messages - channel.session().initialize(channel.clone()); + channel.session().initialize(); Context::on_message(context, channel) }, Ok(DeadlineStatus::Meet(Err(_))) => { @@ -191,7 +191,7 @@ impl Context { let channel = context.connections.store::(context.clone(), connection, Direction::Inbound); // initialize session and then start reading messages - channel.session().initialize(channel.clone()); + channel.session().initialize(); Context::on_message(context.clone(), channel) }, Ok(DeadlineStatus::Meet(Err(err))) => { @@ -261,7 +261,7 @@ impl Context { // successful read trace!("Received {} message from {}", command, channel.peer_info().address); // handle message and read the next one - match channel.session().on_message(channel.clone(), command, payload) { + match channel.session().on_message(command, payload) { Ok(_) => { context.node_table.write().note_used(&channel.peer_info().address); let on_message = Context::on_message(context.clone(), channel); diff --git a/p2p/src/protocol/addr.rs b/p2p/src/protocol/addr.rs index a3161182..dcf3753d 100644 --- a/p2p/src/protocol/addr.rs +++ b/p2p/src/protocol/addr.rs @@ -5,44 +5,43 @@ use message::{Error, Command, deserialize_payload, Payload}; use message::types::{GetAddr, Addr}; use protocol::Protocol; use p2p::Context; -use util::Direction; -use PeerId; +use util::{Direction, PeerInfo}; pub struct AddrProtocol { /// Context context: Arc, - /// Connected peer id. - peer: PeerId, + /// Connected peer info. + info: PeerInfo, } impl AddrProtocol { - pub fn new(context: Arc, peer: PeerId) -> Self { + pub fn new(context: Arc, info: PeerInfo) -> Self { AddrProtocol { context: context, - peer: peer, + info: info, } } } impl Protocol for AddrProtocol { - fn initialize(&mut self, direction: Direction, _version: u32) { - if let Direction::Outbound = direction { - let send = Context::send_to_peer(self.context.clone(), self.peer, &GetAddr); + fn initialize(&mut self) { + if let Direction::Outbound = self.info.direction { + let send = Context::send_to_peer(self.context.clone(), self.info.id, &GetAddr); self.context.spawn(send); } } - fn on_message(&mut self, command: &Command, payload: &Bytes, version: u32) -> Result<(), Error> { + fn on_message(&mut self, command: &Command, payload: &Bytes) -> Result<(), Error> { // normal nodes send addr message only after they receive getaddr message // meanwhile seednodes, surprisingly, send addr message even before they are asked for it if command == &GetAddr::command() { - let _: GetAddr = try!(deserialize_payload(payload, version)); + let _: GetAddr = try!(deserialize_payload(payload, self.info.version)); let entries = self.context.node_table_entries().into_iter().map(Into::into).collect(); let addr = Addr::new(entries); - let send = Context::send_to_peer(self.context.clone(), self.peer, &addr); + let send = Context::send_to_peer(self.context.clone(), self.info.id, &addr); self.context.spawn(send); } else if command == &Addr::command() { - let addr: Addr = try!(deserialize_payload(payload, version)); + let addr: Addr = try!(deserialize_payload(payload, self.info.version)); match addr { Addr::V0(_) => { unreachable!("This version of protocol is not supported!"); @@ -60,30 +59,30 @@ impl Protocol for AddrProtocol { pub struct SeednodeProtocol { /// Context context: Arc, - /// Connected peer id. - peer: PeerId, + /// Connected peer info, + info: PeerInfo, /// Indicates if disconnecting has been scheduled. disconnecting: bool, } impl SeednodeProtocol { - pub fn new(context: Arc, peer: PeerId) -> Self { + pub fn new(context: Arc, info: PeerInfo) -> Self { SeednodeProtocol { context: context, - peer: peer, + info: info, disconnecting: false, } } } impl Protocol for SeednodeProtocol { - fn on_message(&mut self, command: &Command, _payload: &Bytes, _version: u32) -> Result<(), Error> { + fn on_message(&mut self, command: &Command, _payload: &Bytes) -> Result<(), Error> { // Seednodes send addr message more than once with different addresses. // We can't disconenct after first read. Let's delay it by 60 seconds. if !self.disconnecting && command == &Addr::command() { self.disconnecting = true; let context = self.context.clone(); - let peer = self.peer; + let peer = self.info.id; self.context.execute_after(Duration::new(60, 0), move || { context.close_channel(peer); }); diff --git a/p2p/src/protocol/mod.rs b/p2p/src/protocol/mod.rs index 6b5c7234..54588f36 100644 --- a/p2p/src/protocol/mod.rs +++ b/p2p/src/protocol/mod.rs @@ -6,17 +6,16 @@ use bytes::Bytes; use message::Error; use message::common::Command; -use util::Direction; pub use self::addr::{AddrProtocol, SeednodeProtocol}; pub use self::ping::PingProtocol; pub use self::sync::{SyncProtocol, InboundSyncConnection, InboundSyncConnectionRef, OutboundSyncConnection, OutboundSyncConnectionRef, LocalSyncNode, LocalSyncNodeRef}; pub trait Protocol: Send { /// Initialize the protocol. - fn initialize(&mut self, _direction: Direction, _version: u32) {} + fn initialize(&mut self) {} /// Handle the message. - fn on_message(&mut self, command: &Command, payload: &Bytes, version: u32) -> Result<(), Error>; + fn on_message(&mut self, command: &Command, payload: &Bytes) -> Result<(), Error>; /// On disconnect. fn on_close(&mut self) {} diff --git a/p2p/src/protocol/ping.rs b/p2p/src/protocol/ping.rs index 3ec6b56e..7b50fc49 100644 --- a/p2p/src/protocol/ping.rs +++ b/p2p/src/protocol/ping.rs @@ -4,10 +4,9 @@ use message::{Error, Payload, deserialize_payload}; use message::types::{Ping, Pong}; use message::common::Command; use protocol::Protocol; -use util::Direction; +use util::{PeerId, PeerInfo}; use util::nonce::{NonceGenerator, RandomNonce}; use p2p::Context; -use PeerId; pub trait PingContext: Send + Sync { fn send_to_peer(context: Arc, peer: PeerId, payload: &T) where Self: Sized, T: Payload; @@ -23,8 +22,8 @@ impl PingContext for Context { pub struct PingProtocol { /// Context context: Arc, - /// Connected peer id. - peer: PeerId, + /// Connected peer info. + info: PeerInfo, /// Nonce generator. nonce_generator: T, /// Last nonce sent in the ping message. @@ -32,10 +31,10 @@ pub struct PingProtocol { } impl PingProtocol { - pub fn new(context: Arc, peer: PeerId) -> Self { + pub fn new(context: Arc, info: PeerInfo) -> Self { PingProtocol { context: context, - peer: peer, + info: info, nonce_generator: RandomNonce::default(), last_ping_nonce: None, } @@ -43,21 +42,21 @@ impl PingProtocol { } impl Protocol for PingProtocol where T: NonceGenerator + Send, C: PingContext { - fn initialize(&mut self, _direction: Direction, _version: u32) { + fn initialize(&mut self) { // bitcoind always sends ping, let's do the same let nonce = self.nonce_generator.get(); self.last_ping_nonce = Some(nonce); let ping = Ping::new(nonce); - PingContext::send_to_peer(self.context.clone(), self.peer, &ping); + PingContext::send_to_peer(self.context.clone(), self.info.id, &ping); } - fn on_message(&mut self, command: &Command, payload: &Bytes, version: u32) -> Result<(), Error> { + fn on_message(&mut self, command: &Command, payload: &Bytes) -> Result<(), Error> { if command == &Ping::command() { - let ping: Ping = try!(deserialize_payload(payload, version)); + let ping: Ping = try!(deserialize_payload(payload, self.info.version)); let pong = Pong::new(ping.nonce); - PingContext::send_to_peer(self.context.clone(), self.peer, &pong); + PingContext::send_to_peer(self.context.clone(), self.info.id, &pong); } else if command == &Pong::command() { - let pong: Pong = try!(deserialize_payload(payload, version)); + let pong: Pong = try!(deserialize_payload(payload, self.info.version)); if Some(pong.nonce) != self.last_ping_nonce.take() { return Err(Error::InvalidCommand) } @@ -72,12 +71,11 @@ mod tests { use std::sync::Arc; use parking_lot::Mutex; use bytes::Bytes; - use message::{Payload, serialize_payload}; + use message::{Payload, serialize_payload, Magic}; use message::types::{Ping, Pong}; - use util::Direction; + use util::{PeerId, PeerInfo, Direction}; use util::nonce::StaticNonce; use protocol::Protocol; - use PeerId; use super::{PingProtocol, PingContext}; #[derive(Default)] @@ -101,12 +99,18 @@ mod tests { let expected_message = serialize_payload(&Ping::new(nonce), 0).unwrap(); let mut ping_protocol = PingProtocol { context: ping_context.clone(), - peer: peer, + info: PeerInfo { + id: peer, + address: "0.0.0.0:8080".parse().unwrap(), + direction: Direction::Inbound, + version: 0, + magic: Magic::Testnet, + }, nonce_generator: StaticNonce::new(nonce), last_ping_nonce: None, }; - ping_protocol.initialize(Direction::Inbound, 0); + ping_protocol.initialize(); let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone(); assert_eq!(messages.len(), 1); assert_eq!(messages[0].0, peer); @@ -124,12 +128,18 @@ mod tests { let expected_message = serialize_payload(&Pong::new(nonce), 0).unwrap(); let mut ping_protocol = PingProtocol { context: ping_context.clone(), - peer: peer, + info: PeerInfo { + id: peer, + address: "0.0.0.0:8080".parse().unwrap(), + direction: Direction::Inbound, + version: 0, + magic: Magic::Testnet, + }, nonce_generator: StaticNonce::new(nonce), last_ping_nonce: None, }; - assert!(ping_protocol.on_message(&command, &message, 0).is_ok()); + assert!(ping_protocol.on_message(&command, &message).is_ok()); let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone(); assert_eq!(messages.len(), 1); assert_eq!(messages[0].0, peer); @@ -146,12 +156,18 @@ mod tests { let message = serialize_payload(&Pong::new(nonce), 0).unwrap(); let mut ping_protocol = PingProtocol { context: ping_context.clone(), - peer: peer, + info: PeerInfo { + id: peer, + address: "0.0.0.0:8080".parse().unwrap(), + direction: Direction::Inbound, + version: 0, + magic: Magic::Testnet, + }, nonce_generator: StaticNonce::new(nonce), last_ping_nonce: Some(nonce), }; - assert!(ping_protocol.on_message(&command, &message, 0).is_ok()); + assert!(ping_protocol.on_message(&command, &message).is_ok()); let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone(); assert_eq!(messages.len(), 0); assert_eq!(ping_protocol.last_ping_nonce, None); diff --git a/p2p/src/protocol/sync.rs b/p2p/src/protocol/sync.rs index f3257e0f..64687d99 100644 --- a/p2p/src/protocol/sync.rs +++ b/p2p/src/protocol/sync.rs @@ -2,9 +2,8 @@ use std::sync::Arc; use bytes::Bytes; use message::{Command, Error, Payload, types, deserialize_payload}; use protocol::Protocol; -use util::Direction; +use util::{PeerInfo, PeerId}; use p2p::Context; -use PeerId; pub type InboundSyncConnectionRef = Box; pub type OutboundSyncConnectionRef = Box; @@ -165,98 +164,100 @@ impl OutboundSyncConnection for OutboundSync { pub struct SyncProtocol { inbound_connection: InboundSyncConnectionRef, + info: PeerInfo, } impl SyncProtocol { - pub fn new(context: Arc, peer: PeerId) -> Self { - let outbound_connection = OutboundSync::new(context.clone(), peer).boxed(); + pub fn new(context: Arc, info: PeerInfo) -> Self { + let outbound_connection = OutboundSync::new(context.clone(), info.id).boxed(); let inbound_connection = context.create_sync_session(0, outbound_connection); SyncProtocol { inbound_connection: inbound_connection, + info: info, } } } impl Protocol for SyncProtocol { - fn initialize(&mut self, _direction: Direction, version: u32) { - self.inbound_connection.start_sync_session(version); + fn initialize(&mut self) { + self.inbound_connection.start_sync_session(self.info.version); } - fn on_message(&mut self, command: &Command, payload: &Bytes, version: u32) -> Result<(), Error> { + fn on_message(&mut self, command: &Command, payload: &Bytes) -> Result<(), Error> { if command == &types::Inv::command() { - let message: types::Inv = try!(deserialize_payload(payload, version)); + let message: types::Inv = try!(deserialize_payload(payload, self.info.version)); self.inbound_connection.on_inventory(message); } else if command == &types::GetData::command() { - let message: types::GetData = try!(deserialize_payload(payload, version)); + let message: types::GetData = try!(deserialize_payload(payload, self.info.version)); self.inbound_connection.on_getdata(message); } else if command == &types::GetBlocks::command() { - let message: types::GetBlocks = try!(deserialize_payload(payload, version)); + let message: types::GetBlocks = try!(deserialize_payload(payload, self.info.version)); self.inbound_connection.on_getblocks(message); } else if command == &types::GetHeaders::command() { - let message: types::GetHeaders = try!(deserialize_payload(payload, version)); + let message: types::GetHeaders = try!(deserialize_payload(payload, self.info.version)); self.inbound_connection.on_getheaders(message); } else if command == &types::Tx::command() { - let message: types::Tx = try!(deserialize_payload(payload, version)); + let message: types::Tx = try!(deserialize_payload(payload, self.info.version)); self.inbound_connection.on_transaction(message); } else if command == &types::Block::command() { - let message: types::Block = try!(deserialize_payload(payload, version)); + let message: types::Block = try!(deserialize_payload(payload, self.info.version)); self.inbound_connection.on_block(message); } else if command == &types::MemPool::command() { - let message: types::MemPool = try!(deserialize_payload(payload, version)); + let message: types::MemPool = try!(deserialize_payload(payload, self.info.version)); self.inbound_connection.on_mempool(message); } else if command == &types::Headers::command() { - let message: types::Headers = try!(deserialize_payload(payload, version)); + let message: types::Headers = try!(deserialize_payload(payload, self.info.version)); self.inbound_connection.on_headers(message); } else if command == &types::FilterLoad::command() { - let message: types::FilterLoad = try!(deserialize_payload(payload, version)); + let message: types::FilterLoad = try!(deserialize_payload(payload, self.info.version)); self.inbound_connection.on_filterload(message); } else if command == &types::FilterAdd::command() { - let message: types::FilterAdd = try!(deserialize_payload(payload, version)); + let message: types::FilterAdd = try!(deserialize_payload(payload, self.info.version)); self.inbound_connection.on_filteradd(message); } else if command == &types::FilterClear::command() { - let message: types::FilterClear = try!(deserialize_payload(payload, version)); + let message: types::FilterClear = try!(deserialize_payload(payload, self.info.version)); self.inbound_connection.on_filterclear(message); } else if command == &types::MerkleBlock::command() { - let message: types::MerkleBlock = try!(deserialize_payload(payload, version)); + let message: types::MerkleBlock = try!(deserialize_payload(payload, self.info.version)); self.inbound_connection.on_merkleblock(message); } else if command == &types::SendHeaders::command() { - let message: types::SendHeaders = try!(deserialize_payload(payload, version)); + let message: types::SendHeaders = try!(deserialize_payload(payload, self.info.version)); self.inbound_connection.on_sendheaders(message); } else if command == &types::FeeFilter::command() { - let message: types::FeeFilter = try!(deserialize_payload(payload, version)); + let message: types::FeeFilter = try!(deserialize_payload(payload, self.info.version)); self.inbound_connection.on_feefilter(message); } else if command == &types::SendCompact::command() { - let message: types::SendCompact = try!(deserialize_payload(payload, version)); + let message: types::SendCompact = try!(deserialize_payload(payload, self.info.version)); self.inbound_connection.on_send_compact(message); } else if command == &types::CompactBlock::command() { - let message: types::CompactBlock = try!(deserialize_payload(payload, version)); + let message: types::CompactBlock = try!(deserialize_payload(payload, self.info.version)); self.inbound_connection.on_compact_block(message); } else if command == &types::GetBlockTxn::command() { - let message: types::GetBlockTxn = try!(deserialize_payload(payload, version)); + let message: types::GetBlockTxn = try!(deserialize_payload(payload, self.info.version)); self.inbound_connection.on_get_block_txn(message); } else if command == &types::BlockTxn::command() { - let message: types::BlockTxn = try!(deserialize_payload(payload, version)); + let message: types::BlockTxn = try!(deserialize_payload(payload, self.info.version)); self.inbound_connection.on_block_txn(message); } else if command == &types::NotFound::command() { - let message: types::NotFound = try!(deserialize_payload(payload, version)); + let message: types::NotFound = try!(deserialize_payload(payload, self.info.version)); self.inbound_connection.on_notfound(message); } Ok(()) diff --git a/p2p/src/session.rs b/p2p/src/session.rs index 83c71240..dc960d1a 100644 --- a/p2p/src/session.rs +++ b/p2p/src/session.rs @@ -3,21 +3,20 @@ use parking_lot::Mutex; use bytes::Bytes; use message::{Command, Error}; use p2p::Context; -use net::Channel; use protocol::{Protocol, PingProtocol, SyncProtocol, AddrProtocol, SeednodeProtocol}; -use PeerId; +use util::{PeerInfo}; pub trait SessionFactory { - fn new_session(context: Arc, peer: PeerId) -> Session; + fn new_session(context: Arc, info: PeerInfo) -> Session; } pub struct SeednodeSessionFactory; impl SessionFactory for SeednodeSessionFactory { - fn new_session(context: Arc, peer: PeerId) -> Session { - let ping = PingProtocol::new(context.clone(), peer).boxed(); - let addr = AddrProtocol::new(context.clone(), peer).boxed(); - let seed = SeednodeProtocol::new(context.clone(), peer).boxed(); + fn new_session(context: Arc, info: PeerInfo) -> Session { + let ping = PingProtocol::new(context.clone(), info.clone()).boxed(); + let addr = AddrProtocol::new(context.clone(), info.clone()).boxed(); + let seed = SeednodeProtocol::new(context.clone(), info).boxed(); Session::new(vec![ping, addr, seed]) } } @@ -25,10 +24,10 @@ impl SessionFactory for SeednodeSessionFactory { pub struct NormalSessionFactory; impl SessionFactory for NormalSessionFactory { - fn new_session(context: Arc, peer: PeerId) -> Session { - let ping = PingProtocol::new(context.clone(), peer).boxed(); - let addr = AddrProtocol::new(context.clone(), peer).boxed(); - let sync = SyncProtocol::new(context, peer).boxed(); + fn new_session(context: Arc, info: PeerInfo) -> Session { + let ping = PingProtocol::new(context.clone(), info.clone()).boxed(); + let addr = AddrProtocol::new(context.clone(), info.clone()).boxed(); + let sync = SyncProtocol::new(context, info).boxed(); Session::new(vec![ping, addr, sync]) } } @@ -44,17 +43,17 @@ impl Session { } } - pub fn initialize(&self, channel: Arc) { + pub fn initialize(&self) { for protocol in self.protocols.lock().iter_mut() { - protocol.initialize(channel.peer_info().direction, channel.version()); + protocol.initialize(); } } - pub fn on_message(&self, channel: Arc, command: Command, payload: Bytes) -> Result<(), Error> { + pub fn on_message(&self, command: Command, payload: Bytes) -> Result<(), Error> { self.protocols.lock() .iter_mut() .map(|protocol| { - protocol.on_message(&command, &payload, channel.version()) + protocol.on_message(&command, &payload) }) .collect::, Error>>() .map(|_| ()) diff --git a/p2p/src/util/peer.rs b/p2p/src/util/peer.rs index 70550784..f19d746b 100644 --- a/p2p/src/util/peer.rs +++ b/p2p/src/util/peer.rs @@ -1,4 +1,5 @@ use std::net::SocketAddr; +use message::Magic; pub type PeerId = usize; @@ -8,10 +9,12 @@ pub enum Direction { Outbound, } -#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[derive(Debug, PartialEq, Eq, Clone)] pub struct PeerInfo { pub id: PeerId, pub address: SocketAddr, pub direction: Direction, + pub version: u32, + pub magic: Magic, }