diff --git a/message/src/message/message.rs b/message/src/message/message.rs index 41a40ee0..8c740ee4 100644 --- a/message/src/message/message.rs +++ b/message/src/message/message.rs @@ -33,3 +33,9 @@ impl AsRef<[u8]> for Message { self.bytes.as_ref() } } + +impl From> for Bytes { + fn from(m: Message) -> Self { + m.bytes.into_raw() + } +} diff --git a/p2p/src/net/channel.rs b/p2p/src/net/channel.rs index 792e5351..fb255f55 100644 --- a/p2p/src/net/channel.rs +++ b/p2p/src/net/channel.rs @@ -1,6 +1,6 @@ -use message::{Payload, Message}; +use tokio_core::io::{write_all, WriteAll}; use session::Session; -use io::{SharedTcpStream, WriteMessage, write_message, read_any_message, ReadAnyMessage}; +use io::{SharedTcpStream, read_any_message, ReadAnyMessage}; use util::PeerInfo; pub struct Channel { @@ -18,10 +18,8 @@ impl Channel { } } - pub fn write_message(&self, payload: &T) -> WriteMessage where T: Payload { - // TODO: some tracing here - let message = Message::new(self.peer_info.magic, self.peer_info.version, payload).expect("failed to create outgoing message"); - write_message(self.stream.clone(), message) + pub fn write_message(&self, message: T) -> WriteAll where T: AsRef<[u8]> { + write_all(self.stream.clone(), message) } pub fn read_message(&self) -> ReadAnyMessage { diff --git a/p2p/src/net/connections.rs b/p2p/src/net/connections.rs index 06ae7c85..c3ed4c17 100644 --- a/p2p/src/net/connections.rs +++ b/p2p/src/net/connections.rs @@ -9,6 +9,8 @@ use session::{SessionFactory}; use util::{Direction, PeerInfo}; use PeerId; +const SYNCHRONOUS_RESPONSES: bool = true; + #[derive(Default)] pub struct Connections { /// Incremental peer counter. @@ -51,7 +53,7 @@ impl Connections { magic: connection.magic, }; - let session = T::new_session(context, peer_info.clone()); + let session = T::new_session(context, peer_info.clone(), SYNCHRONOUS_RESPONSES); let channel = Arc::new(Channel::new(connection.stream, peer_info, session)); self.channels.write().insert(id, channel.clone()); channel diff --git a/p2p/src/net/mod.rs b/p2p/src/net/mod.rs index ab8855b0..196364e4 100644 --- a/p2p/src/net/mod.rs +++ b/p2p/src/net/mod.rs @@ -5,6 +5,7 @@ mod connect; mod connection; mod connection_counter; mod connections; +mod peer_context; pub use self::accept_connection::{AcceptConnection, accept_connection}; pub use self::channel::Channel; @@ -13,3 +14,4 @@ pub use self::connect::{Connect, connect}; pub use self::connection::Connection; pub use self::connection_counter::ConnectionCounter; pub use self::connections::Connections; +pub use self::peer_context::PeerContext; diff --git a/p2p/src/net/peer_context.rs b/p2p/src/net/peer_context.rs new file mode 100644 index 00000000..05e6f87f --- /dev/null +++ b/p2p/src/net/peer_context.rs @@ -0,0 +1,116 @@ +use std::sync::Arc; +use parking_lot::Mutex; +use message::{Payload, Message}; +use p2p::Context; +use util::{PeerInfo, ConfigurableSynchronizer, ResponseQueue, Synchronizer, Responses}; + +pub struct PeerContext { + context: Arc, + info: PeerInfo, + synchronizer: Mutex, + response_queue: Mutex, +} + +impl PeerContext { + pub fn new(context: Arc, info: PeerInfo, synchronous: bool) -> Self { + PeerContext { + context: context, + info: info, + synchronizer: Mutex::new(ConfigurableSynchronizer::new(synchronous)), + response_queue: Mutex::default(), + } + } + + fn to_message(&self, payload: &T) -> Message where T: Payload { + Message::new(self.info.magic, self.info.version, payload).expect("failed to create outgoing message") + } + + fn send_awaiting(&self, sync: &mut ConfigurableSynchronizer, queue: &mut ResponseQueue, start_id: u32) { + let mut next_id = start_id; + loop { + next_id = next_id.overflowing_add(1).0; + match queue.responses(next_id) { + Some(Responses::Finished(messages)) => { + assert!(sync.permission_for_response(next_id)); + for message in messages { + let send = Context::send_message_to_peer(self.context.clone(), self.info.id, message); + self.context.spawn(send); + } + }, + Some(Responses::Ignored) => { + assert!(sync.permission_for_response(next_id)); + }, + Some(Responses::Unfinished(messages)) => { + assert!(sync.is_permitted(next_id)); + for message in messages { + let send = Context::send_message_to_peer(self.context.clone(), self.info.id, message); + self.context.spawn(send); + } + break; + }, + None => { + break; + } + } + } + } + + /// Request is always automatically send. + pub fn send_request(&self, payload: &T) where T: Payload { + let send = Context::send_to_peer(self.context.clone(), self.info.id, payload); + self.context.spawn(send); + } + + pub fn declare_response(&self) -> u32 { + let d = self.synchronizer.lock().declare_response(); + trace!("declared response: {}", d); + d + } + + pub fn send_response_inline(&self, payload: &T) where T: Payload { + let id = self.declare_response(); + self.send_response(payload, id, true); + } + + /// Do not wait for response with given id. + pub fn ignore_response(&self, id: u32) { + let mut sync = self.synchronizer.lock(); + let mut queue = self.response_queue.lock(); + if sync.permission_for_response(id) { + self.send_awaiting(&mut sync, &mut queue, id); + } else { + queue.push_ignored_response(id); + } + } + + /// Responses are sent in order defined by synchronizer. + pub fn send_response(&self, payload: &T, id: u32, is_final: bool) where T: Payload { + trace!("response ready: {}, id: {}, final: {}", T::command(), id, is_final); + let mut sync = self.synchronizer.lock(); + let mut queue = self.response_queue.lock(); + if is_final { + if sync.permission_for_response(id) { + let send = Context::send_to_peer(self.context.clone(), self.info.id, payload); + self.context.spawn(send); + self.send_awaiting(&mut sync, &mut queue, id); + } else { + queue.push_finished_response(id, self.to_message(payload).into()); + } + } else { + if sync.is_permitted(id) { + let send = Context::send_to_peer(self.context.clone(), self.info.id, payload); + self.context.spawn(send); + } else { + queue.push_unfinished_response(id, self.to_message(payload).into()); + } + } + } + + pub fn info(&self) -> &PeerInfo { + &self.info + } + + pub fn global(&self) -> &Arc { + &self.context + } +} diff --git a/p2p/src/p2p.rs b/p2p/src/p2p.rs index aa34e47f..02abf779 100644 --- a/p2p/src/p2p.rs +++ b/p2p/src/p2p.rs @@ -9,7 +9,7 @@ use tokio_core::net::{TcpListener, TcpStream}; use tokio_core::reactor::{Handle, Remote, Timeout, Interval}; use abstract_ns::Resolver; use ns_dns_tokio::DnsResolver; -use message::{Payload, MessageResult}; +use message::{Payload, MessageResult, Message}; use message::common::Services; use net::{connect, Connections, Channel, Config as NetConfig, accept_connection, ConnectionCounter}; use util::{NodeTable, Node, Direction}; @@ -293,7 +293,22 @@ impl Context { /// Send message to a channel with given peer id. pub fn send_to_peer(context: Arc, peer: PeerId, payload: &T) -> IoFuture<()> where T: Payload { match context.connections.channel(peer) { - Some(channel) => Context::send(context, channel, payload), + Some(channel) => { + let info = channel.peer_info(); + let message = Message::new(info.magic, info.version, payload).expect("failed to create outgoing message"); + Context::send(context, channel, message) + }, + None => { + // peer no longer exists. + // TODO: should we return error here? + finished(()).boxed() + } + } + } + + pub fn send_message_to_peer(context: Arc, peer: PeerId, message: T) -> IoFuture<()> where T: AsRef<[u8]> + Send + 'static { + match context.connections.channel(peer) { + Some(channel) => Context::send(context, channel, message), None => { // peer no longer exists. // TODO: should we return error here? @@ -303,13 +318,13 @@ impl Context { } /// Send message using given channel. - pub fn send(_context: Arc, channel: Arc, payload: &T) -> IoFuture<()> where T: Payload { - trace!("Sending {} message to {}", T::command(), channel.peer_info().address); - channel.write_message(payload).then(move |result| { + pub fn send(_context: Arc, channel: Arc, message: T) -> IoFuture<()> where T: AsRef<[u8]> + Send + 'static { + //trace!("Sending {} message to {}", T::command(), channel.peer_info().address); + channel.write_message(message).then(move |result| { match result { Ok(_) => { // successful send - trace!("Sent {} message to {}", T::command(), channel.peer_info().address); + //trace!("Sent {} message to {}", T::command(), channel.peer_info().address); finished(()).boxed() }, Err(err) => { diff --git a/p2p/src/protocol/addr.rs b/p2p/src/protocol/addr.rs index dcf3753d..05f8ee79 100644 --- a/p2p/src/protocol/addr.rs +++ b/p2p/src/protocol/addr.rs @@ -4,30 +4,26 @@ use bytes::Bytes; use message::{Error, Command, deserialize_payload, Payload}; use message::types::{GetAddr, Addr}; use protocol::Protocol; -use p2p::Context; -use util::{Direction, PeerInfo}; +use net::PeerContext; +use util::Direction; pub struct AddrProtocol { /// Context - context: Arc, - /// Connected peer info. - info: PeerInfo, + context: Arc, } impl AddrProtocol { - pub fn new(context: Arc, info: PeerInfo) -> Self { + pub fn new(context: Arc) -> Self { AddrProtocol { context: context, - info: info, } } } impl Protocol for AddrProtocol { fn initialize(&mut self) { - if let Direction::Outbound = self.info.direction { - let send = Context::send_to_peer(self.context.clone(), self.info.id, &GetAddr); - self.context.spawn(send); + if let Direction::Outbound = self.context.info().direction { + self.context.send_request(&GetAddr); } } @@ -35,20 +31,19 @@ impl Protocol for AddrProtocol { // normal nodes send addr message only after they receive getaddr message // meanwhile seednodes, surprisingly, send addr message even before they are asked for it if command == &GetAddr::command() { - let _: GetAddr = try!(deserialize_payload(payload, self.info.version)); - let entries = self.context.node_table_entries().into_iter().map(Into::into).collect(); + let _: GetAddr = try!(deserialize_payload(payload, self.context.info().version)); + let entries = self.context.global().node_table_entries().into_iter().map(Into::into).collect(); let addr = Addr::new(entries); - let send = Context::send_to_peer(self.context.clone(), self.info.id, &addr); - self.context.spawn(send); + self.context.send_response_inline(&addr); } else if command == &Addr::command() { - let addr: Addr = try!(deserialize_payload(payload, self.info.version)); + let addr: Addr = try!(deserialize_payload(payload, self.context.info().version)); match addr { Addr::V0(_) => { unreachable!("This version of protocol is not supported!"); }, Addr::V31402(addr) => { let nodes = addr.addresses.into_iter().map(Into::into).collect(); - self.context.update_node_table(nodes); + self.context.global().update_node_table(nodes); }, } } @@ -58,18 +53,15 @@ impl Protocol for AddrProtocol { pub struct SeednodeProtocol { /// Context - context: Arc, - /// Connected peer info, - info: PeerInfo, + context: Arc, /// Indicates if disconnecting has been scheduled. disconnecting: bool, } impl SeednodeProtocol { - pub fn new(context: Arc, info: PeerInfo) -> Self { + pub fn new(context: Arc) -> Self { SeednodeProtocol { context: context, - info: info, disconnecting: false, } } @@ -81,9 +73,9 @@ impl Protocol for SeednodeProtocol { // We can't disconenct after first read. Let's delay it by 60 seconds. if !self.disconnecting && command == &Addr::command() { self.disconnecting = true; - let context = self.context.clone(); - let peer = self.info.id; - self.context.execute_after(Duration::new(60, 0), move || { + let context = self.context.global().clone(); + let peer = self.context.info().id; + self.context.global().execute_after(Duration::new(60, 0), move || { context.close_channel(peer); }); } diff --git a/p2p/src/protocol/ping.rs b/p2p/src/protocol/ping.rs index 7b50fc49..76022bdb 100644 --- a/p2p/src/protocol/ping.rs +++ b/p2p/src/protocol/ping.rs @@ -4,26 +4,12 @@ use message::{Error, Payload, deserialize_payload}; use message::types::{Ping, Pong}; use message::common::Command; use protocol::Protocol; -use util::{PeerId, PeerInfo}; +use net::PeerContext; use util::nonce::{NonceGenerator, RandomNonce}; -use p2p::Context; -pub trait PingContext: Send + Sync { - fn send_to_peer(context: Arc, peer: PeerId, payload: &T) where Self: Sized, T: Payload; -} - -impl PingContext for Context { - fn send_to_peer(context: Arc, peer: PeerId, payload: &T) where T: Payload { - let send = Context::send_to_peer(context.clone(), peer, payload); - context.spawn(send); - } -} - -pub struct PingProtocol { +pub struct PingProtocol { /// Context context: Arc, - /// Connected peer info. - info: PeerInfo, /// Nonce generator. nonce_generator: T, /// Last nonce sent in the ping message. @@ -31,32 +17,31 @@ pub struct PingProtocol { } impl PingProtocol { - pub fn new(context: Arc, info: PeerInfo) -> Self { + pub fn new(context: Arc) -> Self { PingProtocol { context: context, - info: info, nonce_generator: RandomNonce::default(), last_ping_nonce: None, } } } -impl Protocol for PingProtocol where T: NonceGenerator + Send, C: PingContext { +impl Protocol for PingProtocol { fn initialize(&mut self) { // bitcoind always sends ping, let's do the same let nonce = self.nonce_generator.get(); self.last_ping_nonce = Some(nonce); let ping = Ping::new(nonce); - PingContext::send_to_peer(self.context.clone(), self.info.id, &ping); + self.context.send_request(&ping); } fn on_message(&mut self, command: &Command, payload: &Bytes) -> Result<(), Error> { if command == &Ping::command() { - let ping: Ping = try!(deserialize_payload(payload, self.info.version)); + let ping: Ping = try!(deserialize_payload(payload, self.context.info().version)); let pong = Pong::new(ping.nonce); - PingContext::send_to_peer(self.context.clone(), self.info.id, &pong); + self.context.send_response_inline(&pong); } else if command == &Pong::command() { - let pong: Pong = try!(deserialize_payload(payload, self.info.version)); + let pong: Pong = try!(deserialize_payload(payload, self.context.info().version)); if Some(pong.nonce) != self.last_ping_nonce.take() { return Err(Error::InvalidCommand) } @@ -65,111 +50,3 @@ impl Protocol for PingProtocol where T: NonceGenerator + Send, C: Pi Ok(()) } } - -#[cfg(test)] -mod tests { - use std::sync::Arc; - use parking_lot::Mutex; - use bytes::Bytes; - use message::{Payload, serialize_payload, Magic}; - use message::types::{Ping, Pong}; - use util::{PeerId, PeerInfo, Direction}; - use util::nonce::StaticNonce; - use protocol::Protocol; - use super::{PingProtocol, PingContext}; - - #[derive(Default)] - struct TestPingContext { - version: u32, - messages: Mutex>, - } - - impl PingContext for TestPingContext { - fn send_to_peer(context: Arc, peer: PeerId, payload: &T) where T: Payload { - let value = (peer, serialize_payload(payload, context.version).unwrap()); - context.messages.lock().push(value); - } - } - - #[test] - fn test_ping_init() { - let ping_context = Arc::new(TestPingContext::default()); - let peer = 99; - let nonce = 1000; - let expected_message = serialize_payload(&Ping::new(nonce), 0).unwrap(); - let mut ping_protocol = PingProtocol { - context: ping_context.clone(), - info: PeerInfo { - id: peer, - address: "0.0.0.0:8080".parse().unwrap(), - direction: Direction::Inbound, - version: 0, - magic: Magic::Testnet, - }, - nonce_generator: StaticNonce::new(nonce), - last_ping_nonce: None, - }; - - ping_protocol.initialize(); - let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone(); - assert_eq!(messages.len(), 1); - assert_eq!(messages[0].0, peer); - assert_eq!(messages[0].1, expected_message); - assert_eq!(ping_protocol.last_ping_nonce, Some(nonce)); - } - - #[test] - fn test_ping_on_message_ping() { - let ping_context = Arc::new(TestPingContext::default()); - let peer = 99; - let nonce = 1000; - let command = "ping".into(); - let message = serialize_payload(&Ping::new(nonce), 0).unwrap(); - let expected_message = serialize_payload(&Pong::new(nonce), 0).unwrap(); - let mut ping_protocol = PingProtocol { - context: ping_context.clone(), - info: PeerInfo { - id: peer, - address: "0.0.0.0:8080".parse().unwrap(), - direction: Direction::Inbound, - version: 0, - magic: Magic::Testnet, - }, - nonce_generator: StaticNonce::new(nonce), - last_ping_nonce: None, - }; - - assert!(ping_protocol.on_message(&command, &message).is_ok()); - let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone(); - assert_eq!(messages.len(), 1); - assert_eq!(messages[0].0, peer); - assert_eq!(messages[0].1, expected_message); - assert_eq!(ping_protocol.last_ping_nonce, None); - } - - #[test] - fn test_ping_on_message_pong() { - let ping_context = Arc::new(TestPingContext::default()); - let peer = 99; - let nonce = 1000; - let command = "pong".into(); - let message = serialize_payload(&Pong::new(nonce), 0).unwrap(); - let mut ping_protocol = PingProtocol { - context: ping_context.clone(), - info: PeerInfo { - id: peer, - address: "0.0.0.0:8080".parse().unwrap(), - direction: Direction::Inbound, - version: 0, - magic: Magic::Testnet, - }, - nonce_generator: StaticNonce::new(nonce), - last_ping_nonce: Some(nonce), - }; - - assert!(ping_protocol.on_message(&command, &message).is_ok()); - let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone(); - assert_eq!(messages.len(), 0); - assert_eq!(ping_protocol.last_ping_nonce, None); - } -} diff --git a/p2p/src/protocol/sync.rs b/p2p/src/protocol/sync.rs index 64687d99..d69fab57 100644 --- a/p2p/src/protocol/sync.rs +++ b/p2p/src/protocol/sync.rs @@ -2,8 +2,7 @@ use std::sync::Arc; use bytes::Bytes; use message::{Command, Error, Payload, types, deserialize_payload}; use protocol::Protocol; -use util::{PeerInfo, PeerId}; -use p2p::Context; +use net::PeerContext; pub type InboundSyncConnectionRef = Box; pub type OutboundSyncConnectionRef = Box; @@ -19,13 +18,13 @@ pub trait InboundSyncConnection : Send + Sync { fn start_sync_session(&self, version: u32); fn close_session(&self); fn on_inventory(&self, message: types::Inv); - fn on_getdata(&self, message: types::GetData); - fn on_getblocks(&self, message: types::GetBlocks); - fn on_getheaders(&self, message: types::GetHeaders); + fn on_getdata(&self, message: types::GetData, id: u32); + fn on_getblocks(&self, message: types::GetBlocks, id: u32); + fn on_getheaders(&self, message: types::GetHeaders, id: u32); fn on_transaction(&self, message: types::Tx); fn on_block(&self, message: types::Block); fn on_headers(&self, message: types::Headers); - fn on_mempool(&self, message: types::MemPool); + fn on_mempool(&self, message: types::MemPool, id: u32); fn on_filterload(&self, message: types::FilterLoad); fn on_filteradd(&self, message: types::FilterAdd); fn on_filterclear(&self, message: types::FilterClear); @@ -40,14 +39,14 @@ pub trait InboundSyncConnection : Send + Sync { } pub trait OutboundSyncConnection : Send + Sync { - fn send_inventory(&self, message: &types::Inv); + fn send_inventory(&self, message: &types::Inv, id: u32, is_final: bool); fn send_getdata(&self, message: &types::GetData); fn send_getblocks(&self, message: &types::GetBlocks); fn send_getheaders(&self, message: &types::GetHeaders); fn send_transaction(&self, message: &types::Tx); - fn send_block(&self, message: &types::Block); - fn send_headers(&self, message: &types::Headers); - fn send_mempool(&self, message: &types::MemPool); + fn send_block(&self, message: &types::Block, id: u32, is_final: bool); + fn send_headers(&self, message: &types::Headers, id: u32, is_final: bool); + fn send_mempool(&self, message: &types::MemPool, id: u32, is_final: bool); fn send_filterload(&self, message: &types::FilterLoad); fn send_filteradd(&self, message: &types::FilterAdd); fn send_filterclear(&self, message: &types::FilterClear); @@ -58,206 +57,213 @@ pub trait OutboundSyncConnection : Send + Sync { fn send_compact_block(&self, message: &types::CompactBlock); fn send_get_block_txn(&self, message: &types::GetBlockTxn); fn send_block_txn(&self, message: &types::BlockTxn); - fn send_notfound(&self, message: &types::NotFound); + fn send_notfound(&self, message: &types::NotFound, id: u32, is_final: bool); + fn ignored(&self, id: u32); } struct OutboundSync { - context: Arc, - peer: PeerId, + context: Arc, } impl OutboundSync { - pub fn new(context: Arc, peer: PeerId) -> OutboundSync { + pub fn new(context: Arc) -> OutboundSync { OutboundSync { context: context, - peer: peer, } } - pub fn send_message(&self, message: &T) where T: Payload { - let send = Context::send_to_peer(self.context.clone(), self.peer, message); - self.context.spawn(send); - } - pub fn boxed(self) -> Box { Box::new(self) } } impl OutboundSyncConnection for OutboundSync { - fn send_inventory(&self, message: &types::Inv) { - self.send_message(message); + fn send_inventory(&self, message: &types::Inv, id: u32, is_final: bool) { + self.context.send_response(message, id, is_final); } fn send_getdata(&self, message: &types::GetData) { - self.send_message(message); + self.context.send_request(message); } fn send_getblocks(&self, message: &types::GetBlocks) { - self.send_message(message); + self.context.send_request(message); } fn send_getheaders(&self, message: &types::GetHeaders) { - self.send_message(message); + self.context.send_request(message); } fn send_transaction(&self, message: &types::Tx) { - self.send_message(message); + self.context.send_request(message); } - fn send_block(&self, message: &types::Block) { - self.send_message(message); + fn send_block(&self, message: &types::Block, id: u32, is_final: bool) { + self.context.send_response(message, id, is_final); } - fn send_headers(&self, message: &types::Headers) { - self.send_message(message); + fn send_headers(&self, message: &types::Headers, id: u32, is_final: bool) { + self.context.send_response(message, id, is_final); } - fn send_mempool(&self, message: &types::MemPool) { - self.send_message(message); + fn send_mempool(&self, message: &types::MemPool, id: u32, is_final: bool) { + self.context.send_response(message, id, is_final); } fn send_filterload(&self, message: &types::FilterLoad) { - self.send_message(message); + self.context.send_request(message); } fn send_filteradd(&self, message: &types::FilterAdd) { - self.send_message(message); + self.context.send_request(message); } fn send_filterclear(&self, message: &types::FilterClear) { - self.send_message(message); + self.context.send_request(message); } fn send_merkleblock(&self, message: &types::MerkleBlock) { - self.send_message(message); + self.context.send_request(message); } fn send_sendheaders(&self, message: &types::SendHeaders) { - self.send_message(message); + self.context.send_request(message); } fn send_feefilter(&self, message: &types::FeeFilter) { - self.send_message(message); + self.context.send_request(message); } fn send_send_compact(&self, message: &types::SendCompact) { - self.send_message(message); + self.context.send_request(message); } fn send_compact_block(&self, message: &types::CompactBlock) { - self.send_message(message); + self.context.send_request(message); } fn send_get_block_txn(&self, message: &types::GetBlockTxn) { - self.send_message(message); + self.context.send_request(message); } fn send_block_txn(&self, message: &types::BlockTxn) { - self.send_message(message); + self.context.send_request(message); } - fn send_notfound(&self, message: &types::NotFound) { - self.send_message(message); + fn send_notfound(&self, message: &types::NotFound, id: u32, is_final: bool) { + self.context.send_response(message, id, is_final); + } + + fn ignored(&self, id: u32) { + self.context.ignore_response(id); } } pub struct SyncProtocol { inbound_connection: InboundSyncConnectionRef, - info: PeerInfo, + context: Arc, } impl SyncProtocol { - pub fn new(context: Arc, info: PeerInfo) -> Self { - let outbound_connection = OutboundSync::new(context.clone(), info.id).boxed(); - let inbound_connection = context.create_sync_session(0, outbound_connection); + pub fn new(context: Arc) -> Self { + let outbound_connection = OutboundSync::new(context.clone()).boxed(); + let inbound_connection = context.global().create_sync_session(0, outbound_connection); SyncProtocol { inbound_connection: inbound_connection, - info: info, + context: context, } } } impl Protocol for SyncProtocol { fn initialize(&mut self) { - self.inbound_connection.start_sync_session(self.info.version); + self.inbound_connection.start_sync_session(self.context.info().version); } fn on_message(&mut self, command: &Command, payload: &Bytes) -> Result<(), Error> { + let version = self.context.info().version; if command == &types::Inv::command() { - let message: types::Inv = try!(deserialize_payload(payload, self.info.version)); + let message: types::Inv = try!(deserialize_payload(payload, version)); self.inbound_connection.on_inventory(message); } else if command == &types::GetData::command() { - let message: types::GetData = try!(deserialize_payload(payload, self.info.version)); - self.inbound_connection.on_getdata(message); + let message: types::GetData = try!(deserialize_payload(payload, version)); + let id = self.context.declare_response(); + trace!("declared response {} for request: {}", id, types::GetData::command()); + self.inbound_connection.on_getdata(message, id); } else if command == &types::GetBlocks::command() { - let message: types::GetBlocks = try!(deserialize_payload(payload, self.info.version)); - self.inbound_connection.on_getblocks(message); + let message: types::GetBlocks = try!(deserialize_payload(payload, version)); + let id = self.context.declare_response(); + trace!("declared response {} for request: {}", id, types::GetBlocks::command()); + self.inbound_connection.on_getblocks(message, id); } else if command == &types::GetHeaders::command() { - let message: types::GetHeaders = try!(deserialize_payload(payload, self.info.version)); - self.inbound_connection.on_getheaders(message); + let message: types::GetHeaders = try!(deserialize_payload(payload, version)); + let id = self.context.declare_response(); + trace!("declared response {} for request: {}", id, types::GetHeaders::command()); + self.inbound_connection.on_getheaders(message, id); } else if command == &types::Tx::command() { - let message: types::Tx = try!(deserialize_payload(payload, self.info.version)); + let message: types::Tx = try!(deserialize_payload(payload, version)); self.inbound_connection.on_transaction(message); } else if command == &types::Block::command() { - let message: types::Block = try!(deserialize_payload(payload, self.info.version)); + let message: types::Block = try!(deserialize_payload(payload, version)); self.inbound_connection.on_block(message); } else if command == &types::MemPool::command() { - let message: types::MemPool = try!(deserialize_payload(payload, self.info.version)); - self.inbound_connection.on_mempool(message); + let message: types::MemPool = try!(deserialize_payload(payload, version)); + let id = self.context.declare_response(); + trace!("declared response {} for request: {}", id, types::MemPool::command()); + self.inbound_connection.on_mempool(message, id); } else if command == &types::Headers::command() { - let message: types::Headers = try!(deserialize_payload(payload, self.info.version)); + let message: types::Headers = try!(deserialize_payload(payload, version)); self.inbound_connection.on_headers(message); } else if command == &types::FilterLoad::command() { - let message: types::FilterLoad = try!(deserialize_payload(payload, self.info.version)); + let message: types::FilterLoad = try!(deserialize_payload(payload, version)); self.inbound_connection.on_filterload(message); } else if command == &types::FilterAdd::command() { - let message: types::FilterAdd = try!(deserialize_payload(payload, self.info.version)); + let message: types::FilterAdd = try!(deserialize_payload(payload, version)); self.inbound_connection.on_filteradd(message); } else if command == &types::FilterClear::command() { - let message: types::FilterClear = try!(deserialize_payload(payload, self.info.version)); + let message: types::FilterClear = try!(deserialize_payload(payload, version)); self.inbound_connection.on_filterclear(message); } else if command == &types::MerkleBlock::command() { - let message: types::MerkleBlock = try!(deserialize_payload(payload, self.info.version)); + let message: types::MerkleBlock = try!(deserialize_payload(payload, version)); self.inbound_connection.on_merkleblock(message); } else if command == &types::SendHeaders::command() { - let message: types::SendHeaders = try!(deserialize_payload(payload, self.info.version)); + let message: types::SendHeaders = try!(deserialize_payload(payload, version)); self.inbound_connection.on_sendheaders(message); } else if command == &types::FeeFilter::command() { - let message: types::FeeFilter = try!(deserialize_payload(payload, self.info.version)); + let message: types::FeeFilter = try!(deserialize_payload(payload, version)); self.inbound_connection.on_feefilter(message); } else if command == &types::SendCompact::command() { - let message: types::SendCompact = try!(deserialize_payload(payload, self.info.version)); + let message: types::SendCompact = try!(deserialize_payload(payload, version)); self.inbound_connection.on_send_compact(message); } else if command == &types::CompactBlock::command() { - let message: types::CompactBlock = try!(deserialize_payload(payload, self.info.version)); + let message: types::CompactBlock = try!(deserialize_payload(payload, version)); self.inbound_connection.on_compact_block(message); } else if command == &types::GetBlockTxn::command() { - let message: types::GetBlockTxn = try!(deserialize_payload(payload, self.info.version)); + let message: types::GetBlockTxn = try!(deserialize_payload(payload, version)); self.inbound_connection.on_get_block_txn(message); } else if command == &types::BlockTxn::command() { - let message: types::BlockTxn = try!(deserialize_payload(payload, self.info.version)); + let message: types::BlockTxn = try!(deserialize_payload(payload, version)); self.inbound_connection.on_block_txn(message); } else if command == &types::NotFound::command() { - let message: types::NotFound = try!(deserialize_payload(payload, self.info.version)); + let message: types::NotFound = try!(deserialize_payload(payload, version)); self.inbound_connection.on_notfound(message); } Ok(()) diff --git a/p2p/src/session.rs b/p2p/src/session.rs index dc960d1a..81ac5117 100644 --- a/p2p/src/session.rs +++ b/p2p/src/session.rs @@ -3,42 +3,47 @@ use parking_lot::Mutex; use bytes::Bytes; use message::{Command, Error}; use p2p::Context; +use net::PeerContext; use protocol::{Protocol, PingProtocol, SyncProtocol, AddrProtocol, SeednodeProtocol}; -use util::{PeerInfo}; +use util::PeerInfo; pub trait SessionFactory { - fn new_session(context: Arc, info: PeerInfo) -> Session; + fn new_session(context: Arc, info: PeerInfo, synchronous: bool) -> Session; } pub struct SeednodeSessionFactory; impl SessionFactory for SeednodeSessionFactory { - fn new_session(context: Arc, info: PeerInfo) -> Session { - let ping = PingProtocol::new(context.clone(), info.clone()).boxed(); - let addr = AddrProtocol::new(context.clone(), info.clone()).boxed(); - let seed = SeednodeProtocol::new(context.clone(), info).boxed(); - Session::new(vec![ping, addr, seed]) + fn new_session(context: Arc, info: PeerInfo, synchronous: bool) -> Session { + let peer_context = Arc::new(PeerContext::new(context, info, synchronous)); + let ping = PingProtocol::new(peer_context.clone()).boxed(); + let addr = AddrProtocol::new(peer_context.clone()).boxed(); + let seed = SeednodeProtocol::new(peer_context.clone()).boxed(); + Session::new(peer_context, vec![ping, addr, seed]) } } pub struct NormalSessionFactory; impl SessionFactory for NormalSessionFactory { - fn new_session(context: Arc, info: PeerInfo) -> Session { - let ping = PingProtocol::new(context.clone(), info.clone()).boxed(); - let addr = AddrProtocol::new(context.clone(), info.clone()).boxed(); - let sync = SyncProtocol::new(context, info).boxed(); - Session::new(vec![ping, addr, sync]) + fn new_session(context: Arc, info: PeerInfo, synchronous: bool) -> Session { + let peer_context = Arc::new(PeerContext::new(context, info, synchronous)); + let ping = PingProtocol::new(peer_context.clone()).boxed(); + let addr = AddrProtocol::new(peer_context.clone()).boxed(); + let sync = SyncProtocol::new(peer_context.clone()).boxed(); + Session::new(peer_context, vec![ping, addr, sync]) } } pub struct Session { + _peer_context: Arc, protocols: Mutex>>, } impl Session { - pub fn new(protocols: Vec>) -> Self { + pub fn new(peer_context: Arc, protocols: Vec>) -> Self { Session { + _peer_context: peer_context, protocols: Mutex::new(protocols), } } diff --git a/p2p/src/util/mod.rs b/p2p/src/util/mod.rs index 8672f720..ec7c641c 100644 --- a/p2p/src/util/mod.rs +++ b/p2p/src/util/mod.rs @@ -2,8 +2,10 @@ pub mod nonce; pub mod time; mod node_table; mod peer; +mod response_queue; mod synchronizer; pub use self::node_table::{NodeTable, Node}; pub use self::peer::{PeerId, PeerInfo, Direction}; +pub use self::response_queue::{ResponseQueue, Responses}; pub use self::synchronizer::{Synchronizer, ConfigurableSynchronizer}; diff --git a/p2p/src/util/response_queue.rs b/p2p/src/util/response_queue.rs new file mode 100644 index 00000000..0f361010 --- /dev/null +++ b/p2p/src/util/response_queue.rs @@ -0,0 +1,45 @@ +use std::collections::{HashMap, HashSet}; +use bytes::Bytes; + +/// Queue of out-of-order responses. Each peer has it's own queue. +#[derive(Debug, Default)] +pub struct ResponseQueue { + unfinished: HashMap>, + finished: HashMap>, + ignored: HashSet, +} + +pub enum Responses { + Unfinished(Vec), + Finished(Vec), + Ignored, +} + +impl ResponseQueue { + pub fn push_unfinished_response(&mut self, id: u32, response: Bytes) { + self.unfinished.entry(id).or_insert_with(Vec::new).push(response) + } + + pub fn push_finished_response(&mut self, id: u32, response: Bytes) { + let mut responses = self.unfinished.remove(&id).unwrap_or_default(); + responses.push(response); + let previous = self.finished.insert(id, responses); + assert!(previous.is_none(), "logic error; same finished response should never be pushed twice"); + } + + pub fn push_ignored_response(&mut self, id: u32) { + assert!(self.ignored.insert(id), "logic error; same response should never be ignored twice"); + } + + pub fn responses(&mut self, id: u32) -> Option { + self.unfinished.remove(&id).map(Responses::Unfinished) + .or_else(|| self.finished.remove(&id).map(Responses::Finished)) + .or_else(|| { + if self.ignored.remove(&id) { + Some(Responses::Ignored) + } else { + None + } + }) + } +} diff --git a/p2p/src/util/synchronizer.rs b/p2p/src/util/synchronizer.rs index f4e78671..791c9b66 100644 --- a/p2p/src/util/synchronizer.rs +++ b/p2p/src/util/synchronizer.rs @@ -11,6 +11,9 @@ pub trait Synchronizer: Send { /// Declare sending response in future. fn declare_response(&mut self) -> u32; + /// Returns true if permission for response is granted, but without marking response as sent. + fn is_permitted(&self, id: u32) -> bool; + /// Returns true if permission for sending response is granted. fn permission_for_response(&mut self, id: u32) -> bool; } @@ -29,6 +32,10 @@ impl Synchronizer for FifoSynchronizer { result } + fn is_permitted(&self, id: u32) -> bool { + id == self.next_to_grant + } + fn permission_for_response(&mut self, id: u32) -> bool { // there should be an assertion here, assert!(id < self.declared_responses), // but it's impossible to write an assertion if the value may overflow @@ -54,6 +61,10 @@ impl Synchronizer for NoopSynchronizer { result } + fn is_permitted(&self, _id: u32) -> bool { + true + } + fn permission_for_response(&mut self, _id: u32) -> bool { true } @@ -81,18 +92,8 @@ impl ThresholdSynchronizer { to_grant_max: declared, } } -} - -impl Synchronizer for ThresholdSynchronizer { - fn declare_response(&mut self) -> u32 { - self.inner.declare_response() - } - - fn permission_for_response(&mut self, id: u32) -> bool { - if self.inner.permission_for_response(id) { - return true; - } + fn within_threshold(&self, id: u32) -> bool { if self.to_grant_min <= self.to_grant_max { // if max is bigger then min, id must be in range [min, max) self.to_grant_min <= id && id < self.to_grant_max @@ -104,6 +105,20 @@ impl Synchronizer for ThresholdSynchronizer { } } +impl Synchronizer for ThresholdSynchronizer { + fn declare_response(&mut self) -> u32 { + self.inner.declare_response() + } + + fn is_permitted(&self, id: u32) -> bool { + self.inner.is_permitted(id) || self.within_threshold(id) + } + + fn permission_for_response(&mut self, id: u32) -> bool { + self.inner.permission_for_response(id) || self.within_threshold(id) + } +} + #[derive(Debug)] enum InnerSynchronizer { Noop(NoopSynchronizer), @@ -170,6 +185,13 @@ impl Synchronizer for ConfigurableSynchronizer { } } + fn is_permitted(&self, id: u32) -> bool { + match self.inner { + InnerSynchronizer::Noop(ref s) => s.is_permitted(id), + InnerSynchronizer::Threshold(ref s) => s.is_permitted(id), + } + } + fn permission_for_response(&mut self, id: u32) -> bool { match self.inner { InnerSynchronizer::Threshold(ref mut s) => s.permission_for_response(id), diff --git a/sync/src/inbound_connection.rs b/sync/src/inbound_connection.rs index 02b89e4d..4f79ecc6 100644 --- a/sync/src/inbound_connection.rs +++ b/sync/src/inbound_connection.rs @@ -29,16 +29,16 @@ impl InboundSyncConnection for InboundConnection { self.local_node.on_peer_inventory(self.peer_index, message); } - fn on_getdata(&self, message: types::GetData) { - self.local_node.on_peer_getdata(self.peer_index, message); + fn on_getdata(&self, message: types::GetData, id: u32) { + self.local_node.on_peer_getdata(self.peer_index, message, id); } - fn on_getblocks(&self, message: types::GetBlocks) { - self.local_node.on_peer_getblocks(self.peer_index, message); + fn on_getblocks(&self, message: types::GetBlocks, id: u32) { + self.local_node.on_peer_getblocks(self.peer_index, message, id); } - fn on_getheaders(&self, message: types::GetHeaders) { - self.local_node.on_peer_getheaders(self.peer_index, message); + fn on_getheaders(&self, message: types::GetHeaders, id: u32) { + self.local_node.on_peer_getheaders(self.peer_index, message, id); } fn on_transaction(&self, message: types::Tx) { @@ -53,8 +53,8 @@ impl InboundSyncConnection for InboundConnection { self.local_node.on_peer_headers(self.peer_index, message); } - fn on_mempool(&self, message: types::MemPool) { - self.local_node.on_peer_mempool(self.peer_index, message); + fn on_mempool(&self, message: types::MemPool, id: u32) { + self.local_node.on_peer_mempool(self.peer_index, message, id); } fn on_filterload(&self, message: types::FilterLoad) { diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index 4672039d..b9b00742 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -93,23 +93,24 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon // TODO: process unknown transactions, etc... } - pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData) { + pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData, id: u32) { trace!(target: "sync", "Got `getdata` message from peer#{}", peer_index); - self.server.serve_getdata(peer_index, message); + self.server.serve_getdata(peer_index, message, id); } - pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks) { + pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32) { trace!(target: "sync", "Got `getblocks` message from peer#{}", peer_index); - self.server.serve_getblocks(peer_index, message); + self.server.serve_getblocks(peer_index, message, id); } - pub fn on_peer_getheaders(&self, peer_index: usize, message: types::GetHeaders) { + pub fn on_peer_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32) { trace!(target: "sync", "Got `getheaders` message from peer#{}", peer_index); // do not serve getheaders requests until we are synchronized if self.client.lock().state().is_synchronizing() { + self.executor.lock().execute(SynchronizationTask::Ignore(peer_index, id)); return; } @@ -124,7 +125,7 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon need_wait }; - self.server.serve_getheaders(peer_index, message); + self.server.serve_getheaders(peer_index, message, id); if need_wait { self.server.wait_peer_requests_completed(peer_index); } @@ -149,10 +150,10 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon } } - pub fn on_peer_mempool(&self, peer_index: usize, _message: types::MemPool) { + pub fn on_peer_mempool(&self, peer_index: usize, _message: types::MemPool, id: u32) { trace!(target: "sync", "Got `mempool` message from peer#{}", peer_index); - self.server.serve_mempool(peer_index); + self.server.serve_mempool(peer_index, id); } pub fn on_peer_filterload(&self, peer_index: usize, _message: types::FilterLoad) { @@ -238,14 +239,14 @@ mod tests { } impl OutboundSyncConnection for DummyOutboundSyncConnection { - fn send_inventory(&self, _message: &types::Inv) {} + fn send_inventory(&self, _message: &types::Inv, _id: u32, _is_final: bool) {} fn send_getdata(&self, _message: &types::GetData) {} fn send_getblocks(&self, _message: &types::GetBlocks) {} fn send_getheaders(&self, _message: &types::GetHeaders) {} fn send_transaction(&self, _message: &types::Tx) {} - fn send_block(&self, _message: &types::Block) {} - fn send_headers(&self, _message: &types::Headers) {} - fn send_mempool(&self, _message: &types::MemPool) {} + fn send_block(&self, _message: &types::Block, _id: u32, _is_final: bool) {} + fn send_headers(&self, _message: &types::Headers, _id: u32, _is_final: bool) {} + fn send_mempool(&self, _message: &types::MemPool, _id: u32, _is_final: bool) {} fn send_filterload(&self, _message: &types::FilterLoad) {} fn send_filteradd(&self, _message: &types::FilterAdd) {} fn send_filterclear(&self, _message: &types::FilterClear) {} @@ -256,7 +257,8 @@ mod tests { fn send_compact_block(&self, _message: &types::CompactBlock) {} fn send_get_block_txn(&self, _message: &types::GetBlockTxn) {} fn send_block_txn(&self, _message: &types::BlockTxn) {} - fn send_notfound(&self, _message: &types::NotFound) {} + fn send_notfound(&self, _message: &types::NotFound, _id: u32, _is_final: bool) {} + fn ignored(&self, _id: u32) {} } fn create_local_node() -> (Core, Handle, Arc>, Arc, LocalNode>) { @@ -294,9 +296,10 @@ mod tests { hash: genesis_block_hash.clone(), } ]; + let dummy_id = 0; local_node.on_peer_getdata(peer_index, types::GetData { inventory: inventory.clone() - }); + }, dummy_id); // => `getdata` is served let tasks = server.take_tasks(); assert_eq!(tasks, vec![(peer_index, ServerTask::ServeGetData(inventory))]); diff --git a/sync/src/synchronization_executor.rs b/sync/src/synchronization_executor.rs index e1cc9a6f..819d75d9 100644 --- a/sync/src/synchronization_executor.rs +++ b/sync/src/synchronization_executor.rs @@ -7,6 +7,7 @@ use message::types; use primitives::hash::H256; use p2p::OutboundSyncConnectionRef; use synchronization_chain::ChainRef; +use synchronization_server::ServerTaskIndex; use local_node::PeersConnections; pub type LocalSynchronizationTaskExecutorRef = Arc>; @@ -24,13 +25,15 @@ pub enum Task { /// Request blocks headers using full getheaders.block_locator_hashes. RequestBlocksHeaders(usize), /// Send block. - SendBlock(usize, Block), + SendBlock(usize, Block, ServerTaskIndex), /// Send notfound - SendNotFound(usize, Vec), + SendNotFound(usize, Vec, ServerTaskIndex), /// Send inventory - SendInventory(usize, Vec), + SendInventory(usize, Vec, ServerTaskIndex), /// Send headers - SendHeaders(usize, Vec), + SendHeaders(usize, Vec, ServerTaskIndex), + /// Notify io about ignored request + Ignore(usize, u32), } /// Synchronization tasks executor @@ -75,7 +78,6 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { }; if let Some(connection) = self.peers.get_mut(&peer_index) { - let connection = &mut *connection; trace!(target: "sync", "Querying {} unknown blocks from peer#{}", getdata.inventory.len(), peer_index); connection.send_getdata(&getdata); } @@ -89,53 +91,54 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { }; if let Some(connection) = self.peers.get_mut(&peer_index) { - let connection = &mut *connection; trace!(target: "sync", "Request blocks hashes from peer#{} using getheaders", peer_index); connection.send_getheaders(&getheaders); } }, - Task::SendBlock(peer_index, block) => { + Task::SendBlock(peer_index, block, id) => { let block_message = types::Block { block: block, }; if let Some(connection) = self.peers.get_mut(&peer_index) { - let connection = &mut *connection; trace!(target: "sync", "Sending block {:?} to peer#{}", block_message.block.hash(), peer_index); - connection.send_block(&block_message); + connection.send_block(&block_message, id.raw(), id.is_final()); } }, - Task::SendNotFound(peer_index, unknown_inventory) => { + Task::SendNotFound(peer_index, unknown_inventory, id) => { let notfound = types::NotFound { inventory: unknown_inventory, }; if let Some(connection) = self.peers.get_mut(&peer_index) { - let connection = &mut *connection; trace!(target: "sync", "Sending notfound to peer#{} with {} items", peer_index, notfound.inventory.len()); - connection.send_notfound(¬found); + connection.send_notfound(¬found, id.raw(), id.is_final()); } }, - Task::SendInventory(peer_index, inventory) => { + Task::SendInventory(peer_index, inventory, id) => { let inventory = types::Inv { inventory: inventory, }; if let Some(connection) = self.peers.get_mut(&peer_index) { - let connection = &mut *connection; trace!(target: "sync", "Sending inventory to peer#{} with {} items", peer_index, inventory.inventory.len()); - connection.send_inventory(&inventory); + connection.send_inventory(&inventory, id.raw(), id.is_final()); } }, - Task::SendHeaders(peer_index, headers) => { + Task::SendHeaders(peer_index, headers, id) => { let headers = types::Headers { headers: headers, }; if let Some(connection) = self.peers.get_mut(&peer_index) { - let connection = &mut *connection; trace!(target: "sync", "Sending headers to peer#{} with {} items", peer_index, headers.headers.len()); - connection.send_headers(&headers); + connection.send_headers(&headers, id.raw(), id.is_final()); + } + }, + Task::Ignore(peer_index, id) => { + if let Some(connection) = self.peers.get_mut(&peer_index) { + trace!(target: "sync", "Ignoring request from peer#{} with id {}", peer_index, id); + connection.ignored(id); } }, } @@ -194,4 +197,4 @@ pub mod tests { self.waiter.notify_one(); } } -} \ No newline at end of file +} diff --git a/sync/src/synchronization_server.rs b/sync/src/synchronization_server.rs index aec9d303..1dc7d82f 100644 --- a/sync/src/synchronization_server.rs +++ b/sync/src/synchronization_server.rs @@ -14,10 +14,10 @@ use message::types; /// Synchronization requests server trait pub trait Server : Send + 'static { - fn serve_getdata(&self, peer_index: usize, message: types::GetData); - fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks); - fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders); - fn serve_mempool(&self, peer_index: usize); + fn serve_getdata(&self, peer_index: usize, message: types::GetData, id: u32); + fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32); + fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32); + fn serve_mempool(&self, peer_index: usize, id: u32); fn wait_peer_requests_completed(&self, peer_index: usize); } @@ -42,10 +42,59 @@ struct ServerQueue { is_stopping: AtomicBool, queue_ready: Arc, peers_queue: VecDeque, - tasks_queue: HashMap>, + tasks_queue: HashMap>, peer_waiters: HashMap>, } +/// `ServerTask` index. +#[derive(Debug, PartialEq)] +pub enum ServerTaskIndex { + /// `Partial` is used when server needs to send more than one response for request. + Partial(u32), + /// `Final` task task can be preceded by many `Partial` tasks with the same id. + Final(u32), +} + +impl ServerTaskIndex { + pub fn raw(&self) -> u32 { + match *self { + ServerTaskIndex::Partial(id) => id, + ServerTaskIndex::Final(id) => id, + } + } + + pub fn is_final(&self) -> bool { + match *self { + ServerTaskIndex::Partial(_) => false, + ServerTaskIndex::Final(_) => true, + } + } +} + +/// Server tests together with unique id assigned to it +#[derive(Debug, PartialEq)] +pub struct IndexedServerTask { + /// Task itself. + task: ServerTask, + /// Task id. + id: ServerTaskIndex, +} + +impl IndexedServerTask { + fn new(task: ServerTask, id: ServerTaskIndex) -> Self { + IndexedServerTask { + task: task, + id: id, + } + } +} + +impl IndexedServerTask { + fn ignore(id: u32) -> Self { + IndexedServerTask::new(ServerTask::Ignore, ServerTaskIndex::Final(id)) + } +} + #[derive(Debug, PartialEq)] pub enum ServerTask { ServeGetData(Vec), @@ -54,6 +103,7 @@ pub enum ServerTask { ServeMempool, ReturnNotFound(Vec), ReturnBlock(H256), + Ignore, } impl SynchronizationServer { @@ -97,11 +147,18 @@ impl SynchronizationServer { }) }; - match server_task { - // `getdata` => `notfound` + `block` + ... - Some((peer_index, ServerTask::ServeGetData(inventory))) => { + let (peer_index, indexed_task) = match server_task { + Some((peer_index, indexed_task)) => (peer_index, indexed_task), + // no tasks after wake-up => stopping or pausing + _ => continue, + }; + + match indexed_task.task { + // `getdata` => `notfound` + `block` + ... + ServerTask::ServeGetData(inventory) => { let mut unknown_items: Vec = Vec::new(); - let mut new_tasks: Vec = Vec::new(); + let mut new_tasks: Vec = Vec::new(); + let task_id = indexed_task.id.raw(); { let chain = chain.read(); let storage = chain.storage(); @@ -109,7 +166,10 @@ impl SynchronizationServer { match item.inv_type { InventoryType::MessageBlock => { match storage.block_number(&item.hash) { - Some(_) => new_tasks.push(ServerTask::ReturnBlock(item.hash.clone())), + Some(_) => { + let task = IndexedServerTask::new(ServerTask::ReturnBlock(item.hash.clone()), ServerTaskIndex::Partial(task_id)); + new_tasks.push(task); + }, None => unknown_items.push(item), } }, @@ -120,18 +180,25 @@ impl SynchronizationServer { // respond with `notfound` message for unknown data if !unknown_items.is_empty() { trace!(target: "sync", "Going to respond with notfound with {} items to peer#{}", unknown_items.len(), peer_index); - new_tasks.push(ServerTask::ReturnNotFound(unknown_items)); + let task = IndexedServerTask::new(ServerTask::ReturnNotFound(unknown_items), ServerTaskIndex::Partial(task_id)); + new_tasks.push(task); } // schedule data responses if !new_tasks.is_empty() { trace!(target: "sync", "Going to respond with data with {} items to peer#{}", new_tasks.len(), peer_index); + // mark last task as the final one + if let Some(task) = new_tasks.last_mut() { + task.id = ServerTaskIndex::Final(task_id); + } queue.lock().add_tasks(peer_index, new_tasks); + } else { + executor.lock().execute(Task::Ignore(peer_index, task_id)); } // inform that we have processed task for peer queue.lock().task_processed(peer_index); }, // `getblocks` => `inventory` - Some((peer_index, ServerTask::ServeGetBlocks(best_block, hash_stop))) => { + ServerTask::ServeGetBlocks(best_block, hash_stop) => { let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_block, &hash_stop, 500); if !blocks_hashes.is_empty() { trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", blocks_hashes.len(), peer_index); @@ -139,25 +206,29 @@ impl SynchronizationServer { inv_type: InventoryType::MessageBlock, hash: hash, }).collect(); - executor.lock().execute(Task::SendInventory(peer_index, inventory)); + executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id)); + } else { + executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw())); } // inform that we have processed task for peer queue.lock().task_processed(peer_index); }, // `getheaders` => `headers` - Some((peer_index, ServerTask::ServeGetHeaders(best_block, hash_stop))) => { + ServerTask::ServeGetHeaders(best_block, hash_stop) => { // What if we have no common blocks with peer at all? Maybe drop connection or penalize peer? // https://github.com/ethcore/parity-bitcoin/pull/91#discussion_r86734568 let blocks_headers = SynchronizationServer::blocks_headers_after(&chain, &best_block, &hash_stop, 2000); if !blocks_headers.is_empty() { trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_headers.len(), peer_index); - executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers)); + executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers, indexed_task.id)); + } else { + executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw())); } // inform that we have processed task for peer queue.lock().task_processed(peer_index); }, // `mempool` => `inventory` - Some((peer_index, ServerTask::ServeMempool)) => { + ServerTask::ServeMempool => { let inventory: Vec<_> = chain.read() .memory_pool() .get_transactions_ids() @@ -169,27 +240,32 @@ impl SynchronizationServer { .collect(); if !inventory.is_empty() { trace!(target: "sync", "Going to respond with {} memory-pool transactions ids to peer#{}", inventory.len(), peer_index); - executor.lock().execute(Task::SendInventory(peer_index, inventory)); + executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id)); + } else { + executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw())); } // inform that we have processed task for peer queue.lock().task_processed(peer_index); }, // `notfound` - Some((peer_index, ServerTask::ReturnNotFound(inventory))) => { - executor.lock().execute(Task::SendNotFound(peer_index, inventory)); + ServerTask::ReturnNotFound(inventory) => { + executor.lock().execute(Task::SendNotFound(peer_index, inventory, indexed_task.id)); // inform that we have processed task for peer queue.lock().task_processed(peer_index); }, // `block` - Some((peer_index, ServerTask::ReturnBlock(block_hash))) => { + ServerTask::ReturnBlock(block_hash) => { let block = chain.read().storage().block(db::BlockRef::Hash(block_hash)) .expect("we have checked that block exists in ServeGetData; db is append-only; qed"); - executor.lock().execute(Task::SendBlock(peer_index, block)); + executor.lock().execute(Task::SendBlock(peer_index, block, indexed_task.id)); // inform that we have processed task for peer queue.lock().task_processed(peer_index); }, - // no tasks after wake-up => stopping or pausing - None => (), + // ignore + ServerTask::Ignore => { + executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw())); + queue.lock().task_processed(peer_index); + }, } } } @@ -273,32 +349,38 @@ impl Drop for SynchronizationServer { } impl Server for SynchronizationServer { - fn serve_getdata(&self, peer_index: usize, message: types::GetData) { - self.queue.lock().add_task(peer_index, ServerTask::ServeGetData(message.inventory)); + fn serve_getdata(&self, peer_index: usize, message: types::GetData, id: u32) { + let task = IndexedServerTask::new(ServerTask::ServeGetData(message.inventory), ServerTaskIndex::Final(id)); + self.queue.lock().add_task(peer_index, task); } - fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) { + fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32) { if let Some(best_common_block) = self.locate_known_block_hash(message.block_locator_hashes) { trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash); - self.queue.lock().add_task(peer_index, ServerTask::ServeGetBlocks(best_common_block, message.hash_stop)); + let task = IndexedServerTask::new(ServerTask::ServeGetBlocks(best_common_block, message.hash_stop), ServerTaskIndex::Final(id)); + self.queue.lock().add_task(peer_index, task); } else { trace!(target: "sync", "No common blocks with peer#{}", peer_index); + self.queue.lock().add_task(peer_index, IndexedServerTask::ignore(id)); } } - fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders) { + fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32) { if let Some(best_common_block) = self.locate_known_block_header(message.block_locator_hashes) { trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash); - self.queue.lock().add_task(peer_index, ServerTask::ServeGetHeaders(best_common_block, message.hash_stop)); + let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(best_common_block, message.hash_stop), ServerTaskIndex::Final(id)); + self.queue.lock().add_task(peer_index, task); } else { trace!(target: "sync", "No common blocks headers with peer#{}", peer_index); + self.queue.lock().add_task(peer_index, IndexedServerTask::ignore(id)); } } - fn serve_mempool(&self, peer_index: usize) { - self.queue.lock().add_task(peer_index, ServerTask::ServeMempool); + fn serve_mempool(&self, peer_index: usize, id: u32) { + let task = IndexedServerTask::new(ServerTask::ServeMempool, ServerTaskIndex::Final(id)); + self.queue.lock().add_task(peer_index, task); } fn wait_peer_requests_completed(&self, peer_index: usize) { @@ -322,7 +404,7 @@ impl ServerQueue { } } - pub fn next_task(&mut self) -> Option<(usize, ServerTask)> { + pub fn next_task(&mut self) -> Option<(usize, IndexedServerTask)> { self.peers_queue.pop_front() .map(|peer| { let (peer_task, no_tasks_left) = { @@ -353,7 +435,7 @@ impl ServerQueue { } } - pub fn add_task(&mut self, peer_index: usize, task: ServerTask) { + pub fn add_task(&mut self, peer_index: usize, task: IndexedServerTask) { match self.tasks_queue.entry(peer_index) { Entry::Occupied(mut entry) => { let add_to_peers_queue = entry.get().is_empty(); @@ -372,7 +454,7 @@ impl ServerQueue { self.queue_ready.notify_one(); } - pub fn add_tasks(&mut self, peer_index: usize, tasks: Vec) { + pub fn add_tasks(&mut self, peer_index: usize, tasks: Vec) { match self.tasks_queue.entry(peer_index) { Entry::Occupied(mut entry) => { let add_to_peers_queue = entry.get().is_empty(); @@ -442,7 +524,7 @@ pub mod tests { use synchronization_executor::Task; use synchronization_executor::tests::DummyTaskExecutor; use synchronization_chain::Chain; - use super::{Server, ServerTask, SynchronizationServer}; + use super::{Server, ServerTask, SynchronizationServer, ServerTaskIndex}; pub struct DummyServer { tasks: Mutex>, @@ -461,25 +543,25 @@ pub mod tests { } impl Server for DummyServer { - fn serve_getdata(&self, peer_index: usize, message: types::GetData) { + fn serve_getdata(&self, peer_index: usize, message: types::GetData, _id: u32) { self.tasks.lock().push((peer_index, ServerTask::ServeGetData(message.inventory))); } - fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) { + fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, _id: u32) { self.tasks.lock().push((peer_index, ServerTask::ServeGetBlocks(db::BestBlock { number: 0, hash: message.block_locator_hashes[0].clone(), }, message.hash_stop))); } - fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders) { + fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, _id: u32) { self.tasks.lock().push((peer_index, ServerTask::ServeGetHeaders(db::BestBlock { number: 0, hash: message.block_locator_hashes[0].clone(), }, message.hash_stop))); } - fn serve_mempool(&self, peer_index: usize) { + fn serve_mempool(&self, peer_index: usize, _id: u32) { self.tasks.lock().push((peer_index, ServerTask::ServeMempool)); } @@ -504,12 +586,13 @@ pub mod tests { hash: H256::default(), } ]; + let dummy_id = 0; server.serve_getdata(0, types::GetData { inventory: inventory.clone(), - }); + }, dummy_id); // => respond with notfound let tasks = DummyTaskExecutor::wait_tasks(executor); - assert_eq!(tasks, vec![Task::SendNotFound(0, inventory)]); + assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::Final(dummy_id))]); } #[test] @@ -522,12 +605,13 @@ pub mod tests { hash: test_data::genesis().hash(), } ]; + let dummy_id = 0; server.serve_getdata(0, types::GetData { inventory: inventory.clone(), - }); + }, dummy_id); // => respond with block let tasks = DummyTaskExecutor::wait_tasks(executor); - assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis())]); + assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis(), ServerTaskIndex::Final(dummy_id))]); } #[test] @@ -535,14 +619,15 @@ pub mod tests { let (_, executor, server) = create_synchronization_server(); // when asking for blocks hashes let genesis_block_hash = test_data::genesis().hash(); + let dummy_id = 5; server.serve_getblocks(0, types::GetBlocks { version: 0, block_locator_hashes: vec![genesis_block_hash.clone()], hash_stop: H256::default(), - }); + }, dummy_id); // => no response let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout - assert_eq!(tasks, vec![]); + assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]); } #[test] @@ -550,18 +635,19 @@ pub mod tests { let (chain, executor, server) = create_synchronization_server(); chain.write().insert_best_block(test_data::block_h1().hash(), test_data::block_h1()).expect("Db write error"); // when asking for blocks hashes + let dummy_id = 0; server.serve_getblocks(0, types::GetBlocks { version: 0, block_locator_hashes: vec![test_data::genesis().hash()], hash_stop: H256::default(), - }); + }, dummy_id); // => responds with inventory let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: test_data::block_h1().hash(), }]; let tasks = DummyTaskExecutor::wait_tasks(executor); - assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]); + assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::Final(dummy_id))]); } #[test] @@ -569,14 +655,15 @@ pub mod tests { let (_, executor, server) = create_synchronization_server(); // when asking for blocks hashes let genesis_block_hash = test_data::genesis().hash(); + let dummy_id = 6; server.serve_getheaders(0, types::GetHeaders { version: 0, block_locator_hashes: vec![genesis_block_hash.clone()], hash_stop: H256::default(), - }); + }, dummy_id); // => no response let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout - assert_eq!(tasks, vec![]); + assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]); } #[test] @@ -584,27 +671,29 @@ pub mod tests { let (chain, executor, server) = create_synchronization_server(); chain.write().insert_best_block(test_data::block_h1().hash(), test_data::block_h1()).expect("Db write error"); // when asking for blocks hashes + let dummy_id = 0; server.serve_getheaders(0, types::GetHeaders { version: 0, block_locator_hashes: vec![test_data::genesis().hash()], hash_stop: H256::default(), - }); + }, dummy_id); // => responds with headers let headers = vec![ test_data::block_h1().block_header, ]; let tasks = DummyTaskExecutor::wait_tasks(executor); - assert_eq!(tasks, vec![Task::SendHeaders(0, headers)]); + assert_eq!(tasks, vec![Task::SendHeaders(0, headers, ServerTaskIndex::Final(dummy_id))]); } #[test] fn server_mempool_do_not_responds_inventory_when_empty_memory_pool() { let (_, executor, server) = create_synchronization_server(); // when asking for memory pool transactions ids - server.serve_mempool(0); + let dummy_id = 9; + server.serve_mempool(0, dummy_id); // => no response let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout - assert_eq!(tasks, vec![]); + assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]); } #[test] @@ -615,13 +704,14 @@ pub mod tests { let transaction_hash = transaction.hash(); chain.write().memory_pool_mut().insert_verified(transaction); // when asking for memory pool transactions ids - server.serve_mempool(0); + let dummy_id = 0; + server.serve_mempool(0, dummy_id); // => respond with inventory let inventory = vec![InventoryVector { inv_type: InventoryType::MessageTx, hash: transaction_hash, }]; let tasks = DummyTaskExecutor::wait_tasks(executor); - assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]); + assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::Final(dummy_id))]); } }