Merge pull request #125 from ethcore/synchronizer

synchronizer
This commit is contained in:
Svyatoslav Nikolsky 2016-11-16 17:09:09 +03:00 committed by GitHub
commit 9b6f24849d
17 changed files with 545 additions and 361 deletions

View File

@ -33,3 +33,9 @@ impl<T> AsRef<[u8]> for Message<T> {
self.bytes.as_ref()
}
}
impl<T> From<Message<T>> for Bytes {
fn from(m: Message<T>) -> Self {
m.bytes.into_raw()
}
}

View File

@ -1,6 +1,6 @@
use message::{Payload, Message};
use tokio_core::io::{write_all, WriteAll};
use session::Session;
use io::{SharedTcpStream, WriteMessage, write_message, read_any_message, ReadAnyMessage};
use io::{SharedTcpStream, read_any_message, ReadAnyMessage};
use util::PeerInfo;
pub struct Channel {
@ -18,10 +18,8 @@ impl Channel {
}
}
pub fn write_message<T>(&self, payload: &T) -> WriteMessage<T, SharedTcpStream> where T: Payload {
// TODO: some tracing here
let message = Message::new(self.peer_info.magic, self.peer_info.version, payload).expect("failed to create outgoing message");
write_message(self.stream.clone(), message)
pub fn write_message<T>(&self, message: T) -> WriteAll<SharedTcpStream, T> where T: AsRef<[u8]> {
write_all(self.stream.clone(), message)
}
pub fn read_message(&self) -> ReadAnyMessage<SharedTcpStream> {

View File

@ -9,6 +9,8 @@ use session::{SessionFactory};
use util::{Direction, PeerInfo};
use PeerId;
const SYNCHRONOUS_RESPONSES: bool = true;
#[derive(Default)]
pub struct Connections {
/// Incremental peer counter.
@ -51,7 +53,7 @@ impl Connections {
magic: connection.magic,
};
let session = T::new_session(context, peer_info.clone());
let session = T::new_session(context, peer_info.clone(), SYNCHRONOUS_RESPONSES);
let channel = Arc::new(Channel::new(connection.stream, peer_info, session));
self.channels.write().insert(id, channel.clone());
channel

View File

@ -5,6 +5,7 @@ mod connect;
mod connection;
mod connection_counter;
mod connections;
mod peer_context;
pub use self::accept_connection::{AcceptConnection, accept_connection};
pub use self::channel::Channel;
@ -13,3 +14,4 @@ pub use self::connect::{Connect, connect};
pub use self::connection::Connection;
pub use self::connection_counter::ConnectionCounter;
pub use self::connections::Connections;
pub use self::peer_context::PeerContext;

116
p2p/src/net/peer_context.rs Normal file
View File

@ -0,0 +1,116 @@
use std::sync::Arc;
use parking_lot::Mutex;
use message::{Payload, Message};
use p2p::Context;
use util::{PeerInfo, ConfigurableSynchronizer, ResponseQueue, Synchronizer, Responses};
pub struct PeerContext {
context: Arc<Context>,
info: PeerInfo,
synchronizer: Mutex<ConfigurableSynchronizer>,
response_queue: Mutex<ResponseQueue>,
}
impl PeerContext {
pub fn new(context: Arc<Context>, info: PeerInfo, synchronous: bool) -> Self {
PeerContext {
context: context,
info: info,
synchronizer: Mutex::new(ConfigurableSynchronizer::new(synchronous)),
response_queue: Mutex::default(),
}
}
fn to_message<T>(&self, payload: &T) -> Message<T> where T: Payload {
Message::new(self.info.magic, self.info.version, payload).expect("failed to create outgoing message")
}
fn send_awaiting(&self, sync: &mut ConfigurableSynchronizer, queue: &mut ResponseQueue, start_id: u32) {
let mut next_id = start_id;
loop {
next_id = next_id.overflowing_add(1).0;
match queue.responses(next_id) {
Some(Responses::Finished(messages)) => {
assert!(sync.permission_for_response(next_id));
for message in messages {
let send = Context::send_message_to_peer(self.context.clone(), self.info.id, message);
self.context.spawn(send);
}
},
Some(Responses::Ignored) => {
assert!(sync.permission_for_response(next_id));
},
Some(Responses::Unfinished(messages)) => {
assert!(sync.is_permitted(next_id));
for message in messages {
let send = Context::send_message_to_peer(self.context.clone(), self.info.id, message);
self.context.spawn(send);
}
break;
},
None => {
break;
}
}
}
}
/// Request is always automatically send.
pub fn send_request<T>(&self, payload: &T) where T: Payload {
let send = Context::send_to_peer(self.context.clone(), self.info.id, payload);
self.context.spawn(send);
}
pub fn declare_response(&self) -> u32 {
let d = self.synchronizer.lock().declare_response();
trace!("declared response: {}", d);
d
}
pub fn send_response_inline<T>(&self, payload: &T) where T: Payload {
let id = self.declare_response();
self.send_response(payload, id, true);
}
/// Do not wait for response with given id.
pub fn ignore_response(&self, id: u32) {
let mut sync = self.synchronizer.lock();
let mut queue = self.response_queue.lock();
if sync.permission_for_response(id) {
self.send_awaiting(&mut sync, &mut queue, id);
} else {
queue.push_ignored_response(id);
}
}
/// Responses are sent in order defined by synchronizer.
pub fn send_response<T>(&self, payload: &T, id: u32, is_final: bool) where T: Payload {
trace!("response ready: {}, id: {}, final: {}", T::command(), id, is_final);
let mut sync = self.synchronizer.lock();
let mut queue = self.response_queue.lock();
if is_final {
if sync.permission_for_response(id) {
let send = Context::send_to_peer(self.context.clone(), self.info.id, payload);
self.context.spawn(send);
self.send_awaiting(&mut sync, &mut queue, id);
} else {
queue.push_finished_response(id, self.to_message(payload).into());
}
} else {
if sync.is_permitted(id) {
let send = Context::send_to_peer(self.context.clone(), self.info.id, payload);
self.context.spawn(send);
} else {
queue.push_unfinished_response(id, self.to_message(payload).into());
}
}
}
pub fn info(&self) -> &PeerInfo {
&self.info
}
pub fn global(&self) -> &Arc<Context> {
&self.context
}
}

View File

@ -9,7 +9,7 @@ use tokio_core::net::{TcpListener, TcpStream};
use tokio_core::reactor::{Handle, Remote, Timeout, Interval};
use abstract_ns::Resolver;
use ns_dns_tokio::DnsResolver;
use message::{Payload, MessageResult};
use message::{Payload, MessageResult, Message};
use message::common::Services;
use net::{connect, Connections, Channel, Config as NetConfig, accept_connection, ConnectionCounter};
use util::{NodeTable, Node, Direction};
@ -293,7 +293,22 @@ impl Context {
/// Send message to a channel with given peer id.
pub fn send_to_peer<T>(context: Arc<Context>, peer: PeerId, payload: &T) -> IoFuture<()> where T: Payload {
match context.connections.channel(peer) {
Some(channel) => Context::send(context, channel, payload),
Some(channel) => {
let info = channel.peer_info();
let message = Message::new(info.magic, info.version, payload).expect("failed to create outgoing message");
Context::send(context, channel, message)
},
None => {
// peer no longer exists.
// TODO: should we return error here?
finished(()).boxed()
}
}
}
pub fn send_message_to_peer<T>(context: Arc<Context>, peer: PeerId, message: T) -> IoFuture<()> where T: AsRef<[u8]> + Send + 'static {
match context.connections.channel(peer) {
Some(channel) => Context::send(context, channel, message),
None => {
// peer no longer exists.
// TODO: should we return error here?
@ -303,13 +318,13 @@ impl Context {
}
/// Send message using given channel.
pub fn send<T>(_context: Arc<Context>, channel: Arc<Channel>, payload: &T) -> IoFuture<()> where T: Payload {
trace!("Sending {} message to {}", T::command(), channel.peer_info().address);
channel.write_message(payload).then(move |result| {
pub fn send<T>(_context: Arc<Context>, channel: Arc<Channel>, message: T) -> IoFuture<()> where T: AsRef<[u8]> + Send + 'static {
//trace!("Sending {} message to {}", T::command(), channel.peer_info().address);
channel.write_message(message).then(move |result| {
match result {
Ok(_) => {
// successful send
trace!("Sent {} message to {}", T::command(), channel.peer_info().address);
//trace!("Sent {} message to {}", T::command(), channel.peer_info().address);
finished(()).boxed()
},
Err(err) => {

View File

@ -4,30 +4,26 @@ use bytes::Bytes;
use message::{Error, Command, deserialize_payload, Payload};
use message::types::{GetAddr, Addr};
use protocol::Protocol;
use p2p::Context;
use util::{Direction, PeerInfo};
use net::PeerContext;
use util::Direction;
pub struct AddrProtocol {
/// Context
context: Arc<Context>,
/// Connected peer info.
info: PeerInfo,
context: Arc<PeerContext>,
}
impl AddrProtocol {
pub fn new(context: Arc<Context>, info: PeerInfo) -> Self {
pub fn new(context: Arc<PeerContext>) -> Self {
AddrProtocol {
context: context,
info: info,
}
}
}
impl Protocol for AddrProtocol {
fn initialize(&mut self) {
if let Direction::Outbound = self.info.direction {
let send = Context::send_to_peer(self.context.clone(), self.info.id, &GetAddr);
self.context.spawn(send);
if let Direction::Outbound = self.context.info().direction {
self.context.send_request(&GetAddr);
}
}
@ -35,20 +31,19 @@ impl Protocol for AddrProtocol {
// normal nodes send addr message only after they receive getaddr message
// meanwhile seednodes, surprisingly, send addr message even before they are asked for it
if command == &GetAddr::command() {
let _: GetAddr = try!(deserialize_payload(payload, self.info.version));
let entries = self.context.node_table_entries().into_iter().map(Into::into).collect();
let _: GetAddr = try!(deserialize_payload(payload, self.context.info().version));
let entries = self.context.global().node_table_entries().into_iter().map(Into::into).collect();
let addr = Addr::new(entries);
let send = Context::send_to_peer(self.context.clone(), self.info.id, &addr);
self.context.spawn(send);
self.context.send_response_inline(&addr);
} else if command == &Addr::command() {
let addr: Addr = try!(deserialize_payload(payload, self.info.version));
let addr: Addr = try!(deserialize_payload(payload, self.context.info().version));
match addr {
Addr::V0(_) => {
unreachable!("This version of protocol is not supported!");
},
Addr::V31402(addr) => {
let nodes = addr.addresses.into_iter().map(Into::into).collect();
self.context.update_node_table(nodes);
self.context.global().update_node_table(nodes);
},
}
}
@ -58,18 +53,15 @@ impl Protocol for AddrProtocol {
pub struct SeednodeProtocol {
/// Context
context: Arc<Context>,
/// Connected peer info,
info: PeerInfo,
context: Arc<PeerContext>,
/// Indicates if disconnecting has been scheduled.
disconnecting: bool,
}
impl SeednodeProtocol {
pub fn new(context: Arc<Context>, info: PeerInfo) -> Self {
pub fn new(context: Arc<PeerContext>) -> Self {
SeednodeProtocol {
context: context,
info: info,
disconnecting: false,
}
}
@ -81,9 +73,9 @@ impl Protocol for SeednodeProtocol {
// We can't disconenct after first read. Let's delay it by 60 seconds.
if !self.disconnecting && command == &Addr::command() {
self.disconnecting = true;
let context = self.context.clone();
let peer = self.info.id;
self.context.execute_after(Duration::new(60, 0), move || {
let context = self.context.global().clone();
let peer = self.context.info().id;
self.context.global().execute_after(Duration::new(60, 0), move || {
context.close_channel(peer);
});
}

View File

@ -4,26 +4,12 @@ use message::{Error, Payload, deserialize_payload};
use message::types::{Ping, Pong};
use message::common::Command;
use protocol::Protocol;
use util::{PeerId, PeerInfo};
use net::PeerContext;
use util::nonce::{NonceGenerator, RandomNonce};
use p2p::Context;
pub trait PingContext: Send + Sync {
fn send_to_peer<T>(context: Arc<Self>, peer: PeerId, payload: &T) where Self: Sized, T: Payload;
}
impl PingContext for Context {
fn send_to_peer<T>(context: Arc<Self>, peer: PeerId, payload: &T) where T: Payload {
let send = Context::send_to_peer(context.clone(), peer, payload);
context.spawn(send);
}
}
pub struct PingProtocol<T = RandomNonce, C = Context> {
pub struct PingProtocol<T = RandomNonce, C = PeerContext> {
/// Context
context: Arc<C>,
/// Connected peer info.
info: PeerInfo,
/// Nonce generator.
nonce_generator: T,
/// Last nonce sent in the ping message.
@ -31,32 +17,31 @@ pub struct PingProtocol<T = RandomNonce, C = Context> {
}
impl PingProtocol {
pub fn new(context: Arc<Context>, info: PeerInfo) -> Self {
pub fn new(context: Arc<PeerContext>) -> Self {
PingProtocol {
context: context,
info: info,
nonce_generator: RandomNonce::default(),
last_ping_nonce: None,
}
}
}
impl<T, C> Protocol for PingProtocol<T, C> where T: NonceGenerator + Send, C: PingContext {
impl Protocol for PingProtocol {
fn initialize(&mut self) {
// bitcoind always sends ping, let's do the same
let nonce = self.nonce_generator.get();
self.last_ping_nonce = Some(nonce);
let ping = Ping::new(nonce);
PingContext::send_to_peer(self.context.clone(), self.info.id, &ping);
self.context.send_request(&ping);
}
fn on_message(&mut self, command: &Command, payload: &Bytes) -> Result<(), Error> {
if command == &Ping::command() {
let ping: Ping = try!(deserialize_payload(payload, self.info.version));
let ping: Ping = try!(deserialize_payload(payload, self.context.info().version));
let pong = Pong::new(ping.nonce);
PingContext::send_to_peer(self.context.clone(), self.info.id, &pong);
self.context.send_response_inline(&pong);
} else if command == &Pong::command() {
let pong: Pong = try!(deserialize_payload(payload, self.info.version));
let pong: Pong = try!(deserialize_payload(payload, self.context.info().version));
if Some(pong.nonce) != self.last_ping_nonce.take() {
return Err(Error::InvalidCommand)
}
@ -65,111 +50,3 @@ impl<T, C> Protocol for PingProtocol<T, C> where T: NonceGenerator + Send, C: Pi
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use parking_lot::Mutex;
use bytes::Bytes;
use message::{Payload, serialize_payload, Magic};
use message::types::{Ping, Pong};
use util::{PeerId, PeerInfo, Direction};
use util::nonce::StaticNonce;
use protocol::Protocol;
use super::{PingProtocol, PingContext};
#[derive(Default)]
struct TestPingContext {
version: u32,
messages: Mutex<Vec<(PeerId, Bytes)>>,
}
impl PingContext for TestPingContext {
fn send_to_peer<T>(context: Arc<Self>, peer: PeerId, payload: &T) where T: Payload {
let value = (peer, serialize_payload(payload, context.version).unwrap());
context.messages.lock().push(value);
}
}
#[test]
fn test_ping_init() {
let ping_context = Arc::new(TestPingContext::default());
let peer = 99;
let nonce = 1000;
let expected_message = serialize_payload(&Ping::new(nonce), 0).unwrap();
let mut ping_protocol = PingProtocol {
context: ping_context.clone(),
info: PeerInfo {
id: peer,
address: "0.0.0.0:8080".parse().unwrap(),
direction: Direction::Inbound,
version: 0,
magic: Magic::Testnet,
},
nonce_generator: StaticNonce::new(nonce),
last_ping_nonce: None,
};
ping_protocol.initialize();
let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].0, peer);
assert_eq!(messages[0].1, expected_message);
assert_eq!(ping_protocol.last_ping_nonce, Some(nonce));
}
#[test]
fn test_ping_on_message_ping() {
let ping_context = Arc::new(TestPingContext::default());
let peer = 99;
let nonce = 1000;
let command = "ping".into();
let message = serialize_payload(&Ping::new(nonce), 0).unwrap();
let expected_message = serialize_payload(&Pong::new(nonce), 0).unwrap();
let mut ping_protocol = PingProtocol {
context: ping_context.clone(),
info: PeerInfo {
id: peer,
address: "0.0.0.0:8080".parse().unwrap(),
direction: Direction::Inbound,
version: 0,
magic: Magic::Testnet,
},
nonce_generator: StaticNonce::new(nonce),
last_ping_nonce: None,
};
assert!(ping_protocol.on_message(&command, &message).is_ok());
let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].0, peer);
assert_eq!(messages[0].1, expected_message);
assert_eq!(ping_protocol.last_ping_nonce, None);
}
#[test]
fn test_ping_on_message_pong() {
let ping_context = Arc::new(TestPingContext::default());
let peer = 99;
let nonce = 1000;
let command = "pong".into();
let message = serialize_payload(&Pong::new(nonce), 0).unwrap();
let mut ping_protocol = PingProtocol {
context: ping_context.clone(),
info: PeerInfo {
id: peer,
address: "0.0.0.0:8080".parse().unwrap(),
direction: Direction::Inbound,
version: 0,
magic: Magic::Testnet,
},
nonce_generator: StaticNonce::new(nonce),
last_ping_nonce: Some(nonce),
};
assert!(ping_protocol.on_message(&command, &message).is_ok());
let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone();
assert_eq!(messages.len(), 0);
assert_eq!(ping_protocol.last_ping_nonce, None);
}
}

View File

@ -2,8 +2,7 @@ use std::sync::Arc;
use bytes::Bytes;
use message::{Command, Error, Payload, types, deserialize_payload};
use protocol::Protocol;
use util::{PeerInfo, PeerId};
use p2p::Context;
use net::PeerContext;
pub type InboundSyncConnectionRef = Box<InboundSyncConnection>;
pub type OutboundSyncConnectionRef = Box<OutboundSyncConnection>;
@ -19,13 +18,13 @@ pub trait InboundSyncConnection : Send + Sync {
fn start_sync_session(&self, version: u32);
fn close_session(&self);
fn on_inventory(&self, message: types::Inv);
fn on_getdata(&self, message: types::GetData);
fn on_getblocks(&self, message: types::GetBlocks);
fn on_getheaders(&self, message: types::GetHeaders);
fn on_getdata(&self, message: types::GetData, id: u32);
fn on_getblocks(&self, message: types::GetBlocks, id: u32);
fn on_getheaders(&self, message: types::GetHeaders, id: u32);
fn on_transaction(&self, message: types::Tx);
fn on_block(&self, message: types::Block);
fn on_headers(&self, message: types::Headers);
fn on_mempool(&self, message: types::MemPool);
fn on_mempool(&self, message: types::MemPool, id: u32);
fn on_filterload(&self, message: types::FilterLoad);
fn on_filteradd(&self, message: types::FilterAdd);
fn on_filterclear(&self, message: types::FilterClear);
@ -40,14 +39,14 @@ pub trait InboundSyncConnection : Send + Sync {
}
pub trait OutboundSyncConnection : Send + Sync {
fn send_inventory(&self, message: &types::Inv);
fn send_inventory(&self, message: &types::Inv, id: u32, is_final: bool);
fn send_getdata(&self, message: &types::GetData);
fn send_getblocks(&self, message: &types::GetBlocks);
fn send_getheaders(&self, message: &types::GetHeaders);
fn send_transaction(&self, message: &types::Tx);
fn send_block(&self, message: &types::Block);
fn send_headers(&self, message: &types::Headers);
fn send_mempool(&self, message: &types::MemPool);
fn send_block(&self, message: &types::Block, id: u32, is_final: bool);
fn send_headers(&self, message: &types::Headers, id: u32, is_final: bool);
fn send_mempool(&self, message: &types::MemPool, id: u32, is_final: bool);
fn send_filterload(&self, message: &types::FilterLoad);
fn send_filteradd(&self, message: &types::FilterAdd);
fn send_filterclear(&self, message: &types::FilterClear);
@ -58,206 +57,213 @@ pub trait OutboundSyncConnection : Send + Sync {
fn send_compact_block(&self, message: &types::CompactBlock);
fn send_get_block_txn(&self, message: &types::GetBlockTxn);
fn send_block_txn(&self, message: &types::BlockTxn);
fn send_notfound(&self, message: &types::NotFound);
fn send_notfound(&self, message: &types::NotFound, id: u32, is_final: bool);
fn ignored(&self, id: u32);
}
struct OutboundSync {
context: Arc<Context>,
peer: PeerId,
context: Arc<PeerContext>,
}
impl OutboundSync {
pub fn new(context: Arc<Context>, peer: PeerId) -> OutboundSync {
pub fn new(context: Arc<PeerContext>) -> OutboundSync {
OutboundSync {
context: context,
peer: peer,
}
}
pub fn send_message<T>(&self, message: &T) where T: Payload {
let send = Context::send_to_peer(self.context.clone(), self.peer, message);
self.context.spawn(send);
}
pub fn boxed(self) -> Box<OutboundSyncConnection> {
Box::new(self)
}
}
impl OutboundSyncConnection for OutboundSync {
fn send_inventory(&self, message: &types::Inv) {
self.send_message(message);
fn send_inventory(&self, message: &types::Inv, id: u32, is_final: bool) {
self.context.send_response(message, id, is_final);
}
fn send_getdata(&self, message: &types::GetData) {
self.send_message(message);
self.context.send_request(message);
}
fn send_getblocks(&self, message: &types::GetBlocks) {
self.send_message(message);
self.context.send_request(message);
}
fn send_getheaders(&self, message: &types::GetHeaders) {
self.send_message(message);
self.context.send_request(message);
}
fn send_transaction(&self, message: &types::Tx) {
self.send_message(message);
self.context.send_request(message);
}
fn send_block(&self, message: &types::Block) {
self.send_message(message);
fn send_block(&self, message: &types::Block, id: u32, is_final: bool) {
self.context.send_response(message, id, is_final);
}
fn send_headers(&self, message: &types::Headers) {
self.send_message(message);
fn send_headers(&self, message: &types::Headers, id: u32, is_final: bool) {
self.context.send_response(message, id, is_final);
}
fn send_mempool(&self, message: &types::MemPool) {
self.send_message(message);
fn send_mempool(&self, message: &types::MemPool, id: u32, is_final: bool) {
self.context.send_response(message, id, is_final);
}
fn send_filterload(&self, message: &types::FilterLoad) {
self.send_message(message);
self.context.send_request(message);
}
fn send_filteradd(&self, message: &types::FilterAdd) {
self.send_message(message);
self.context.send_request(message);
}
fn send_filterclear(&self, message: &types::FilterClear) {
self.send_message(message);
self.context.send_request(message);
}
fn send_merkleblock(&self, message: &types::MerkleBlock) {
self.send_message(message);
self.context.send_request(message);
}
fn send_sendheaders(&self, message: &types::SendHeaders) {
self.send_message(message);
self.context.send_request(message);
}
fn send_feefilter(&self, message: &types::FeeFilter) {
self.send_message(message);
self.context.send_request(message);
}
fn send_send_compact(&self, message: &types::SendCompact) {
self.send_message(message);
self.context.send_request(message);
}
fn send_compact_block(&self, message: &types::CompactBlock) {
self.send_message(message);
self.context.send_request(message);
}
fn send_get_block_txn(&self, message: &types::GetBlockTxn) {
self.send_message(message);
self.context.send_request(message);
}
fn send_block_txn(&self, message: &types::BlockTxn) {
self.send_message(message);
self.context.send_request(message);
}
fn send_notfound(&self, message: &types::NotFound) {
self.send_message(message);
fn send_notfound(&self, message: &types::NotFound, id: u32, is_final: bool) {
self.context.send_response(message, id, is_final);
}
fn ignored(&self, id: u32) {
self.context.ignore_response(id);
}
}
pub struct SyncProtocol {
inbound_connection: InboundSyncConnectionRef,
info: PeerInfo,
context: Arc<PeerContext>,
}
impl SyncProtocol {
pub fn new(context: Arc<Context>, info: PeerInfo) -> Self {
let outbound_connection = OutboundSync::new(context.clone(), info.id).boxed();
let inbound_connection = context.create_sync_session(0, outbound_connection);
pub fn new(context: Arc<PeerContext>) -> Self {
let outbound_connection = OutboundSync::new(context.clone()).boxed();
let inbound_connection = context.global().create_sync_session(0, outbound_connection);
SyncProtocol {
inbound_connection: inbound_connection,
info: info,
context: context,
}
}
}
impl Protocol for SyncProtocol {
fn initialize(&mut self) {
self.inbound_connection.start_sync_session(self.info.version);
self.inbound_connection.start_sync_session(self.context.info().version);
}
fn on_message(&mut self, command: &Command, payload: &Bytes) -> Result<(), Error> {
let version = self.context.info().version;
if command == &types::Inv::command() {
let message: types::Inv = try!(deserialize_payload(payload, self.info.version));
let message: types::Inv = try!(deserialize_payload(payload, version));
self.inbound_connection.on_inventory(message);
}
else if command == &types::GetData::command() {
let message: types::GetData = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_getdata(message);
let message: types::GetData = try!(deserialize_payload(payload, version));
let id = self.context.declare_response();
trace!("declared response {} for request: {}", id, types::GetData::command());
self.inbound_connection.on_getdata(message, id);
}
else if command == &types::GetBlocks::command() {
let message: types::GetBlocks = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_getblocks(message);
let message: types::GetBlocks = try!(deserialize_payload(payload, version));
let id = self.context.declare_response();
trace!("declared response {} for request: {}", id, types::GetBlocks::command());
self.inbound_connection.on_getblocks(message, id);
}
else if command == &types::GetHeaders::command() {
let message: types::GetHeaders = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_getheaders(message);
let message: types::GetHeaders = try!(deserialize_payload(payload, version));
let id = self.context.declare_response();
trace!("declared response {} for request: {}", id, types::GetHeaders::command());
self.inbound_connection.on_getheaders(message, id);
}
else if command == &types::Tx::command() {
let message: types::Tx = try!(deserialize_payload(payload, self.info.version));
let message: types::Tx = try!(deserialize_payload(payload, version));
self.inbound_connection.on_transaction(message);
}
else if command == &types::Block::command() {
let message: types::Block = try!(deserialize_payload(payload, self.info.version));
let message: types::Block = try!(deserialize_payload(payload, version));
self.inbound_connection.on_block(message);
}
else if command == &types::MemPool::command() {
let message: types::MemPool = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_mempool(message);
let message: types::MemPool = try!(deserialize_payload(payload, version));
let id = self.context.declare_response();
trace!("declared response {} for request: {}", id, types::MemPool::command());
self.inbound_connection.on_mempool(message, id);
}
else if command == &types::Headers::command() {
let message: types::Headers = try!(deserialize_payload(payload, self.info.version));
let message: types::Headers = try!(deserialize_payload(payload, version));
self.inbound_connection.on_headers(message);
}
else if command == &types::FilterLoad::command() {
let message: types::FilterLoad = try!(deserialize_payload(payload, self.info.version));
let message: types::FilterLoad = try!(deserialize_payload(payload, version));
self.inbound_connection.on_filterload(message);
}
else if command == &types::FilterAdd::command() {
let message: types::FilterAdd = try!(deserialize_payload(payload, self.info.version));
let message: types::FilterAdd = try!(deserialize_payload(payload, version));
self.inbound_connection.on_filteradd(message);
}
else if command == &types::FilterClear::command() {
let message: types::FilterClear = try!(deserialize_payload(payload, self.info.version));
let message: types::FilterClear = try!(deserialize_payload(payload, version));
self.inbound_connection.on_filterclear(message);
}
else if command == &types::MerkleBlock::command() {
let message: types::MerkleBlock = try!(deserialize_payload(payload, self.info.version));
let message: types::MerkleBlock = try!(deserialize_payload(payload, version));
self.inbound_connection.on_merkleblock(message);
}
else if command == &types::SendHeaders::command() {
let message: types::SendHeaders = try!(deserialize_payload(payload, self.info.version));
let message: types::SendHeaders = try!(deserialize_payload(payload, version));
self.inbound_connection.on_sendheaders(message);
}
else if command == &types::FeeFilter::command() {
let message: types::FeeFilter = try!(deserialize_payload(payload, self.info.version));
let message: types::FeeFilter = try!(deserialize_payload(payload, version));
self.inbound_connection.on_feefilter(message);
}
else if command == &types::SendCompact::command() {
let message: types::SendCompact = try!(deserialize_payload(payload, self.info.version));
let message: types::SendCompact = try!(deserialize_payload(payload, version));
self.inbound_connection.on_send_compact(message);
}
else if command == &types::CompactBlock::command() {
let message: types::CompactBlock = try!(deserialize_payload(payload, self.info.version));
let message: types::CompactBlock = try!(deserialize_payload(payload, version));
self.inbound_connection.on_compact_block(message);
}
else if command == &types::GetBlockTxn::command() {
let message: types::GetBlockTxn = try!(deserialize_payload(payload, self.info.version));
let message: types::GetBlockTxn = try!(deserialize_payload(payload, version));
self.inbound_connection.on_get_block_txn(message);
}
else if command == &types::BlockTxn::command() {
let message: types::BlockTxn = try!(deserialize_payload(payload, self.info.version));
let message: types::BlockTxn = try!(deserialize_payload(payload, version));
self.inbound_connection.on_block_txn(message);
}
else if command == &types::NotFound::command() {
let message: types::NotFound = try!(deserialize_payload(payload, self.info.version));
let message: types::NotFound = try!(deserialize_payload(payload, version));
self.inbound_connection.on_notfound(message);
}
Ok(())

View File

@ -3,42 +3,47 @@ use parking_lot::Mutex;
use bytes::Bytes;
use message::{Command, Error};
use p2p::Context;
use net::PeerContext;
use protocol::{Protocol, PingProtocol, SyncProtocol, AddrProtocol, SeednodeProtocol};
use util::{PeerInfo};
use util::PeerInfo;
pub trait SessionFactory {
fn new_session(context: Arc<Context>, info: PeerInfo) -> Session;
fn new_session(context: Arc<Context>, info: PeerInfo, synchronous: bool) -> Session;
}
pub struct SeednodeSessionFactory;
impl SessionFactory for SeednodeSessionFactory {
fn new_session(context: Arc<Context>, info: PeerInfo) -> Session {
let ping = PingProtocol::new(context.clone(), info.clone()).boxed();
let addr = AddrProtocol::new(context.clone(), info.clone()).boxed();
let seed = SeednodeProtocol::new(context.clone(), info).boxed();
Session::new(vec![ping, addr, seed])
fn new_session(context: Arc<Context>, info: PeerInfo, synchronous: bool) -> Session {
let peer_context = Arc::new(PeerContext::new(context, info, synchronous));
let ping = PingProtocol::new(peer_context.clone()).boxed();
let addr = AddrProtocol::new(peer_context.clone()).boxed();
let seed = SeednodeProtocol::new(peer_context.clone()).boxed();
Session::new(peer_context, vec![ping, addr, seed])
}
}
pub struct NormalSessionFactory;
impl SessionFactory for NormalSessionFactory {
fn new_session(context: Arc<Context>, info: PeerInfo) -> Session {
let ping = PingProtocol::new(context.clone(), info.clone()).boxed();
let addr = AddrProtocol::new(context.clone(), info.clone()).boxed();
let sync = SyncProtocol::new(context, info).boxed();
Session::new(vec![ping, addr, sync])
fn new_session(context: Arc<Context>, info: PeerInfo, synchronous: bool) -> Session {
let peer_context = Arc::new(PeerContext::new(context, info, synchronous));
let ping = PingProtocol::new(peer_context.clone()).boxed();
let addr = AddrProtocol::new(peer_context.clone()).boxed();
let sync = SyncProtocol::new(peer_context.clone()).boxed();
Session::new(peer_context, vec![ping, addr, sync])
}
}
pub struct Session {
_peer_context: Arc<PeerContext>,
protocols: Mutex<Vec<Box<Protocol>>>,
}
impl Session {
pub fn new(protocols: Vec<Box<Protocol>>) -> Self {
pub fn new(peer_context: Arc<PeerContext>, protocols: Vec<Box<Protocol>>) -> Self {
Session {
_peer_context: peer_context,
protocols: Mutex::new(protocols),
}
}

View File

@ -2,8 +2,10 @@ pub mod nonce;
pub mod time;
mod node_table;
mod peer;
mod response_queue;
mod synchronizer;
pub use self::node_table::{NodeTable, Node};
pub use self::peer::{PeerId, PeerInfo, Direction};
pub use self::response_queue::{ResponseQueue, Responses};
pub use self::synchronizer::{Synchronizer, ConfigurableSynchronizer};

View File

@ -0,0 +1,45 @@
use std::collections::{HashMap, HashSet};
use bytes::Bytes;
/// Queue of out-of-order responses. Each peer has it's own queue.
#[derive(Debug, Default)]
pub struct ResponseQueue {
unfinished: HashMap<u32, Vec<Bytes>>,
finished: HashMap<u32, Vec<Bytes>>,
ignored: HashSet<u32>,
}
pub enum Responses {
Unfinished(Vec<Bytes>),
Finished(Vec<Bytes>),
Ignored,
}
impl ResponseQueue {
pub fn push_unfinished_response(&mut self, id: u32, response: Bytes) {
self.unfinished.entry(id).or_insert_with(Vec::new).push(response)
}
pub fn push_finished_response(&mut self, id: u32, response: Bytes) {
let mut responses = self.unfinished.remove(&id).unwrap_or_default();
responses.push(response);
let previous = self.finished.insert(id, responses);
assert!(previous.is_none(), "logic error; same finished response should never be pushed twice");
}
pub fn push_ignored_response(&mut self, id: u32) {
assert!(self.ignored.insert(id), "logic error; same response should never be ignored twice");
}
pub fn responses(&mut self, id: u32) -> Option<Responses> {
self.unfinished.remove(&id).map(Responses::Unfinished)
.or_else(|| self.finished.remove(&id).map(Responses::Finished))
.or_else(|| {
if self.ignored.remove(&id) {
Some(Responses::Ignored)
} else {
None
}
})
}
}

View File

@ -11,6 +11,9 @@ pub trait Synchronizer: Send {
/// Declare sending response in future.
fn declare_response(&mut self) -> u32;
/// Returns true if permission for response is granted, but without marking response as sent.
fn is_permitted(&self, id: u32) -> bool;
/// Returns true if permission for sending response is granted.
fn permission_for_response(&mut self, id: u32) -> bool;
}
@ -29,6 +32,10 @@ impl Synchronizer for FifoSynchronizer {
result
}
fn is_permitted(&self, id: u32) -> bool {
id == self.next_to_grant
}
fn permission_for_response(&mut self, id: u32) -> bool {
// there should be an assertion here, assert!(id < self.declared_responses),
// but it's impossible to write an assertion if the value may overflow
@ -54,6 +61,10 @@ impl Synchronizer for NoopSynchronizer {
result
}
fn is_permitted(&self, _id: u32) -> bool {
true
}
fn permission_for_response(&mut self, _id: u32) -> bool {
true
}
@ -81,18 +92,8 @@ impl ThresholdSynchronizer {
to_grant_max: declared,
}
}
}
impl Synchronizer for ThresholdSynchronizer {
fn declare_response(&mut self) -> u32 {
self.inner.declare_response()
}
fn permission_for_response(&mut self, id: u32) -> bool {
if self.inner.permission_for_response(id) {
return true;
}
fn within_threshold(&self, id: u32) -> bool {
if self.to_grant_min <= self.to_grant_max {
// if max is bigger then min, id must be in range [min, max)
self.to_grant_min <= id && id < self.to_grant_max
@ -104,6 +105,20 @@ impl Synchronizer for ThresholdSynchronizer {
}
}
impl Synchronizer for ThresholdSynchronizer {
fn declare_response(&mut self) -> u32 {
self.inner.declare_response()
}
fn is_permitted(&self, id: u32) -> bool {
self.inner.is_permitted(id) || self.within_threshold(id)
}
fn permission_for_response(&mut self, id: u32) -> bool {
self.inner.permission_for_response(id) || self.within_threshold(id)
}
}
#[derive(Debug)]
enum InnerSynchronizer {
Noop(NoopSynchronizer),
@ -170,6 +185,13 @@ impl Synchronizer for ConfigurableSynchronizer {
}
}
fn is_permitted(&self, id: u32) -> bool {
match self.inner {
InnerSynchronizer::Noop(ref s) => s.is_permitted(id),
InnerSynchronizer::Threshold(ref s) => s.is_permitted(id),
}
}
fn permission_for_response(&mut self, id: u32) -> bool {
match self.inner {
InnerSynchronizer::Threshold(ref mut s) => s.permission_for_response(id),

View File

@ -29,16 +29,16 @@ impl InboundSyncConnection for InboundConnection {
self.local_node.on_peer_inventory(self.peer_index, message);
}
fn on_getdata(&self, message: types::GetData) {
self.local_node.on_peer_getdata(self.peer_index, message);
fn on_getdata(&self, message: types::GetData, id: u32) {
self.local_node.on_peer_getdata(self.peer_index, message, id);
}
fn on_getblocks(&self, message: types::GetBlocks) {
self.local_node.on_peer_getblocks(self.peer_index, message);
fn on_getblocks(&self, message: types::GetBlocks, id: u32) {
self.local_node.on_peer_getblocks(self.peer_index, message, id);
}
fn on_getheaders(&self, message: types::GetHeaders) {
self.local_node.on_peer_getheaders(self.peer_index, message);
fn on_getheaders(&self, message: types::GetHeaders, id: u32) {
self.local_node.on_peer_getheaders(self.peer_index, message, id);
}
fn on_transaction(&self, message: types::Tx) {
@ -53,8 +53,8 @@ impl InboundSyncConnection for InboundConnection {
self.local_node.on_peer_headers(self.peer_index, message);
}
fn on_mempool(&self, message: types::MemPool) {
self.local_node.on_peer_mempool(self.peer_index, message);
fn on_mempool(&self, message: types::MemPool, id: u32) {
self.local_node.on_peer_mempool(self.peer_index, message, id);
}
fn on_filterload(&self, message: types::FilterLoad) {

View File

@ -93,23 +93,24 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
// TODO: process unknown transactions, etc...
}
pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData) {
pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData, id: u32) {
trace!(target: "sync", "Got `getdata` message from peer#{}", peer_index);
self.server.serve_getdata(peer_index, message);
self.server.serve_getdata(peer_index, message, id);
}
pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32) {
trace!(target: "sync", "Got `getblocks` message from peer#{}", peer_index);
self.server.serve_getblocks(peer_index, message);
self.server.serve_getblocks(peer_index, message, id);
}
pub fn on_peer_getheaders(&self, peer_index: usize, message: types::GetHeaders) {
pub fn on_peer_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32) {
trace!(target: "sync", "Got `getheaders` message from peer#{}", peer_index);
// do not serve getheaders requests until we are synchronized
if self.client.lock().state().is_synchronizing() {
self.executor.lock().execute(SynchronizationTask::Ignore(peer_index, id));
return;
}
@ -124,7 +125,7 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
need_wait
};
self.server.serve_getheaders(peer_index, message);
self.server.serve_getheaders(peer_index, message, id);
if need_wait {
self.server.wait_peer_requests_completed(peer_index);
}
@ -149,10 +150,10 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
}
}
pub fn on_peer_mempool(&self, peer_index: usize, _message: types::MemPool) {
pub fn on_peer_mempool(&self, peer_index: usize, _message: types::MemPool, id: u32) {
trace!(target: "sync", "Got `mempool` message from peer#{}", peer_index);
self.server.serve_mempool(peer_index);
self.server.serve_mempool(peer_index, id);
}
pub fn on_peer_filterload(&self, peer_index: usize, _message: types::FilterLoad) {
@ -238,14 +239,14 @@ mod tests {
}
impl OutboundSyncConnection for DummyOutboundSyncConnection {
fn send_inventory(&self, _message: &types::Inv) {}
fn send_inventory(&self, _message: &types::Inv, _id: u32, _is_final: bool) {}
fn send_getdata(&self, _message: &types::GetData) {}
fn send_getblocks(&self, _message: &types::GetBlocks) {}
fn send_getheaders(&self, _message: &types::GetHeaders) {}
fn send_transaction(&self, _message: &types::Tx) {}
fn send_block(&self, _message: &types::Block) {}
fn send_headers(&self, _message: &types::Headers) {}
fn send_mempool(&self, _message: &types::MemPool) {}
fn send_block(&self, _message: &types::Block, _id: u32, _is_final: bool) {}
fn send_headers(&self, _message: &types::Headers, _id: u32, _is_final: bool) {}
fn send_mempool(&self, _message: &types::MemPool, _id: u32, _is_final: bool) {}
fn send_filterload(&self, _message: &types::FilterLoad) {}
fn send_filteradd(&self, _message: &types::FilterAdd) {}
fn send_filterclear(&self, _message: &types::FilterClear) {}
@ -256,7 +257,8 @@ mod tests {
fn send_compact_block(&self, _message: &types::CompactBlock) {}
fn send_get_block_txn(&self, _message: &types::GetBlockTxn) {}
fn send_block_txn(&self, _message: &types::BlockTxn) {}
fn send_notfound(&self, _message: &types::NotFound) {}
fn send_notfound(&self, _message: &types::NotFound, _id: u32, _is_final: bool) {}
fn ignored(&self, _id: u32) {}
}
fn create_local_node() -> (Core, Handle, Arc<Mutex<DummyTaskExecutor>>, Arc<DummyServer>, LocalNode<DummyTaskExecutor, DummyServer, SynchronizationClient<DummyTaskExecutor>>) {
@ -294,9 +296,10 @@ mod tests {
hash: genesis_block_hash.clone(),
}
];
let dummy_id = 0;
local_node.on_peer_getdata(peer_index, types::GetData {
inventory: inventory.clone()
});
}, dummy_id);
// => `getdata` is served
let tasks = server.take_tasks();
assert_eq!(tasks, vec![(peer_index, ServerTask::ServeGetData(inventory))]);

View File

@ -7,6 +7,7 @@ use message::types;
use primitives::hash::H256;
use p2p::OutboundSyncConnectionRef;
use synchronization_chain::ChainRef;
use synchronization_server::ServerTaskIndex;
use local_node::PeersConnections;
pub type LocalSynchronizationTaskExecutorRef = Arc<Mutex<LocalSynchronizationTaskExecutor>>;
@ -24,13 +25,15 @@ pub enum Task {
/// Request blocks headers using full getheaders.block_locator_hashes.
RequestBlocksHeaders(usize),
/// Send block.
SendBlock(usize, Block),
SendBlock(usize, Block, ServerTaskIndex),
/// Send notfound
SendNotFound(usize, Vec<InventoryVector>),
SendNotFound(usize, Vec<InventoryVector>, ServerTaskIndex),
/// Send inventory
SendInventory(usize, Vec<InventoryVector>),
SendInventory(usize, Vec<InventoryVector>, ServerTaskIndex),
/// Send headers
SendHeaders(usize, Vec<BlockHeader>),
SendHeaders(usize, Vec<BlockHeader>, ServerTaskIndex),
/// Notify io about ignored request
Ignore(usize, u32),
}
/// Synchronization tasks executor
@ -75,7 +78,6 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
let connection = &mut *connection;
trace!(target: "sync", "Querying {} unknown blocks from peer#{}", getdata.inventory.len(), peer_index);
connection.send_getdata(&getdata);
}
@ -89,53 +91,54 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
let connection = &mut *connection;
trace!(target: "sync", "Request blocks hashes from peer#{} using getheaders", peer_index);
connection.send_getheaders(&getheaders);
}
},
Task::SendBlock(peer_index, block) => {
Task::SendBlock(peer_index, block, id) => {
let block_message = types::Block {
block: block,
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
let connection = &mut *connection;
trace!(target: "sync", "Sending block {:?} to peer#{}", block_message.block.hash(), peer_index);
connection.send_block(&block_message);
connection.send_block(&block_message, id.raw(), id.is_final());
}
},
Task::SendNotFound(peer_index, unknown_inventory) => {
Task::SendNotFound(peer_index, unknown_inventory, id) => {
let notfound = types::NotFound {
inventory: unknown_inventory,
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
let connection = &mut *connection;
trace!(target: "sync", "Sending notfound to peer#{} with {} items", peer_index, notfound.inventory.len());
connection.send_notfound(&notfound);
connection.send_notfound(&notfound, id.raw(), id.is_final());
}
},
Task::SendInventory(peer_index, inventory) => {
Task::SendInventory(peer_index, inventory, id) => {
let inventory = types::Inv {
inventory: inventory,
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
let connection = &mut *connection;
trace!(target: "sync", "Sending inventory to peer#{} with {} items", peer_index, inventory.inventory.len());
connection.send_inventory(&inventory);
connection.send_inventory(&inventory, id.raw(), id.is_final());
}
},
Task::SendHeaders(peer_index, headers) => {
Task::SendHeaders(peer_index, headers, id) => {
let headers = types::Headers {
headers: headers,
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
let connection = &mut *connection;
trace!(target: "sync", "Sending headers to peer#{} with {} items", peer_index, headers.headers.len());
connection.send_headers(&headers);
connection.send_headers(&headers, id.raw(), id.is_final());
}
},
Task::Ignore(peer_index, id) => {
if let Some(connection) = self.peers.get_mut(&peer_index) {
trace!(target: "sync", "Ignoring request from peer#{} with id {}", peer_index, id);
connection.ignored(id);
}
},
}
@ -194,4 +197,4 @@ pub mod tests {
self.waiter.notify_one();
}
}
}
}

View File

@ -14,10 +14,10 @@ use message::types;
/// Synchronization requests server trait
pub trait Server : Send + 'static {
fn serve_getdata(&self, peer_index: usize, message: types::GetData);
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks);
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders);
fn serve_mempool(&self, peer_index: usize);
fn serve_getdata(&self, peer_index: usize, message: types::GetData, id: u32);
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32);
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32);
fn serve_mempool(&self, peer_index: usize, id: u32);
fn wait_peer_requests_completed(&self, peer_index: usize);
}
@ -42,10 +42,59 @@ struct ServerQueue {
is_stopping: AtomicBool,
queue_ready: Arc<Condvar>,
peers_queue: VecDeque<usize>,
tasks_queue: HashMap<usize, VecDeque<ServerTask>>,
tasks_queue: HashMap<usize, VecDeque<IndexedServerTask>>,
peer_waiters: HashMap<usize, Arc<PeerRequestsWaiter>>,
}
/// `ServerTask` index.
#[derive(Debug, PartialEq)]
pub enum ServerTaskIndex {
/// `Partial` is used when server needs to send more than one response for request.
Partial(u32),
/// `Final` task task can be preceded by many `Partial` tasks with the same id.
Final(u32),
}
impl ServerTaskIndex {
pub fn raw(&self) -> u32 {
match *self {
ServerTaskIndex::Partial(id) => id,
ServerTaskIndex::Final(id) => id,
}
}
pub fn is_final(&self) -> bool {
match *self {
ServerTaskIndex::Partial(_) => false,
ServerTaskIndex::Final(_) => true,
}
}
}
/// Server tests together with unique id assigned to it
#[derive(Debug, PartialEq)]
pub struct IndexedServerTask {
/// Task itself.
task: ServerTask,
/// Task id.
id: ServerTaskIndex,
}
impl IndexedServerTask {
fn new(task: ServerTask, id: ServerTaskIndex) -> Self {
IndexedServerTask {
task: task,
id: id,
}
}
}
impl IndexedServerTask {
fn ignore(id: u32) -> Self {
IndexedServerTask::new(ServerTask::Ignore, ServerTaskIndex::Final(id))
}
}
#[derive(Debug, PartialEq)]
pub enum ServerTask {
ServeGetData(Vec<InventoryVector>),
@ -54,6 +103,7 @@ pub enum ServerTask {
ServeMempool,
ReturnNotFound(Vec<InventoryVector>),
ReturnBlock(H256),
Ignore,
}
impl SynchronizationServer {
@ -97,11 +147,18 @@ impl SynchronizationServer {
})
};
match server_task {
// `getdata` => `notfound` + `block` + ...
Some((peer_index, ServerTask::ServeGetData(inventory))) => {
let (peer_index, indexed_task) = match server_task {
Some((peer_index, indexed_task)) => (peer_index, indexed_task),
// no tasks after wake-up => stopping or pausing
_ => continue,
};
match indexed_task.task {
// `getdata` => `notfound` + `block` + ...
ServerTask::ServeGetData(inventory) => {
let mut unknown_items: Vec<InventoryVector> = Vec::new();
let mut new_tasks: Vec<ServerTask> = Vec::new();
let mut new_tasks: Vec<IndexedServerTask> = Vec::new();
let task_id = indexed_task.id.raw();
{
let chain = chain.read();
let storage = chain.storage();
@ -109,7 +166,10 @@ impl SynchronizationServer {
match item.inv_type {
InventoryType::MessageBlock => {
match storage.block_number(&item.hash) {
Some(_) => new_tasks.push(ServerTask::ReturnBlock(item.hash.clone())),
Some(_) => {
let task = IndexedServerTask::new(ServerTask::ReturnBlock(item.hash.clone()), ServerTaskIndex::Partial(task_id));
new_tasks.push(task);
},
None => unknown_items.push(item),
}
},
@ -120,18 +180,25 @@ impl SynchronizationServer {
// respond with `notfound` message for unknown data
if !unknown_items.is_empty() {
trace!(target: "sync", "Going to respond with notfound with {} items to peer#{}", unknown_items.len(), peer_index);
new_tasks.push(ServerTask::ReturnNotFound(unknown_items));
let task = IndexedServerTask::new(ServerTask::ReturnNotFound(unknown_items), ServerTaskIndex::Partial(task_id));
new_tasks.push(task);
}
// schedule data responses
if !new_tasks.is_empty() {
trace!(target: "sync", "Going to respond with data with {} items to peer#{}", new_tasks.len(), peer_index);
// mark last task as the final one
if let Some(task) = new_tasks.last_mut() {
task.id = ServerTaskIndex::Final(task_id);
}
queue.lock().add_tasks(peer_index, new_tasks);
} else {
executor.lock().execute(Task::Ignore(peer_index, task_id));
}
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
// `getblocks` => `inventory`
Some((peer_index, ServerTask::ServeGetBlocks(best_block, hash_stop))) => {
ServerTask::ServeGetBlocks(best_block, hash_stop) => {
let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_block, &hash_stop, 500);
if !blocks_hashes.is_empty() {
trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", blocks_hashes.len(), peer_index);
@ -139,25 +206,29 @@ impl SynchronizationServer {
inv_type: InventoryType::MessageBlock,
hash: hash,
}).collect();
executor.lock().execute(Task::SendInventory(peer_index, inventory));
executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id));
} else {
executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw()));
}
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
// `getheaders` => `headers`
Some((peer_index, ServerTask::ServeGetHeaders(best_block, hash_stop))) => {
ServerTask::ServeGetHeaders(best_block, hash_stop) => {
// What if we have no common blocks with peer at all? Maybe drop connection or penalize peer?
// https://github.com/ethcore/parity-bitcoin/pull/91#discussion_r86734568
let blocks_headers = SynchronizationServer::blocks_headers_after(&chain, &best_block, &hash_stop, 2000);
if !blocks_headers.is_empty() {
trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_headers.len(), peer_index);
executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers));
executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers, indexed_task.id));
} else {
executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw()));
}
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
// `mempool` => `inventory`
Some((peer_index, ServerTask::ServeMempool)) => {
ServerTask::ServeMempool => {
let inventory: Vec<_> = chain.read()
.memory_pool()
.get_transactions_ids()
@ -169,27 +240,32 @@ impl SynchronizationServer {
.collect();
if !inventory.is_empty() {
trace!(target: "sync", "Going to respond with {} memory-pool transactions ids to peer#{}", inventory.len(), peer_index);
executor.lock().execute(Task::SendInventory(peer_index, inventory));
executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id));
} else {
executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw()));
}
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
// `notfound`
Some((peer_index, ServerTask::ReturnNotFound(inventory))) => {
executor.lock().execute(Task::SendNotFound(peer_index, inventory));
ServerTask::ReturnNotFound(inventory) => {
executor.lock().execute(Task::SendNotFound(peer_index, inventory, indexed_task.id));
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
// `block`
Some((peer_index, ServerTask::ReturnBlock(block_hash))) => {
ServerTask::ReturnBlock(block_hash) => {
let block = chain.read().storage().block(db::BlockRef::Hash(block_hash))
.expect("we have checked that block exists in ServeGetData; db is append-only; qed");
executor.lock().execute(Task::SendBlock(peer_index, block));
executor.lock().execute(Task::SendBlock(peer_index, block, indexed_task.id));
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
// no tasks after wake-up => stopping or pausing
None => (),
// ignore
ServerTask::Ignore => {
executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw()));
queue.lock().task_processed(peer_index);
},
}
}
}
@ -273,32 +349,38 @@ impl Drop for SynchronizationServer {
}
impl Server for SynchronizationServer {
fn serve_getdata(&self, peer_index: usize, message: types::GetData) {
self.queue.lock().add_task(peer_index, ServerTask::ServeGetData(message.inventory));
fn serve_getdata(&self, peer_index: usize, message: types::GetData, id: u32) {
let task = IndexedServerTask::new(ServerTask::ServeGetData(message.inventory), ServerTaskIndex::Final(id));
self.queue.lock().add_task(peer_index, task);
}
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32) {
if let Some(best_common_block) = self.locate_known_block_hash(message.block_locator_hashes) {
trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
self.queue.lock().add_task(peer_index, ServerTask::ServeGetBlocks(best_common_block, message.hash_stop));
let task = IndexedServerTask::new(ServerTask::ServeGetBlocks(best_common_block, message.hash_stop), ServerTaskIndex::Final(id));
self.queue.lock().add_task(peer_index, task);
}
else {
trace!(target: "sync", "No common blocks with peer#{}", peer_index);
self.queue.lock().add_task(peer_index, IndexedServerTask::ignore(id));
}
}
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders) {
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32) {
if let Some(best_common_block) = self.locate_known_block_header(message.block_locator_hashes) {
trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
self.queue.lock().add_task(peer_index, ServerTask::ServeGetHeaders(best_common_block, message.hash_stop));
let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(best_common_block, message.hash_stop), ServerTaskIndex::Final(id));
self.queue.lock().add_task(peer_index, task);
}
else {
trace!(target: "sync", "No common blocks headers with peer#{}", peer_index);
self.queue.lock().add_task(peer_index, IndexedServerTask::ignore(id));
}
}
fn serve_mempool(&self, peer_index: usize) {
self.queue.lock().add_task(peer_index, ServerTask::ServeMempool);
fn serve_mempool(&self, peer_index: usize, id: u32) {
let task = IndexedServerTask::new(ServerTask::ServeMempool, ServerTaskIndex::Final(id));
self.queue.lock().add_task(peer_index, task);
}
fn wait_peer_requests_completed(&self, peer_index: usize) {
@ -322,7 +404,7 @@ impl ServerQueue {
}
}
pub fn next_task(&mut self) -> Option<(usize, ServerTask)> {
pub fn next_task(&mut self) -> Option<(usize, IndexedServerTask)> {
self.peers_queue.pop_front()
.map(|peer| {
let (peer_task, no_tasks_left) = {
@ -353,7 +435,7 @@ impl ServerQueue {
}
}
pub fn add_task(&mut self, peer_index: usize, task: ServerTask) {
pub fn add_task(&mut self, peer_index: usize, task: IndexedServerTask) {
match self.tasks_queue.entry(peer_index) {
Entry::Occupied(mut entry) => {
let add_to_peers_queue = entry.get().is_empty();
@ -372,7 +454,7 @@ impl ServerQueue {
self.queue_ready.notify_one();
}
pub fn add_tasks(&mut self, peer_index: usize, tasks: Vec<ServerTask>) {
pub fn add_tasks(&mut self, peer_index: usize, tasks: Vec<IndexedServerTask>) {
match self.tasks_queue.entry(peer_index) {
Entry::Occupied(mut entry) => {
let add_to_peers_queue = entry.get().is_empty();
@ -442,7 +524,7 @@ pub mod tests {
use synchronization_executor::Task;
use synchronization_executor::tests::DummyTaskExecutor;
use synchronization_chain::Chain;
use super::{Server, ServerTask, SynchronizationServer};
use super::{Server, ServerTask, SynchronizationServer, ServerTaskIndex};
pub struct DummyServer {
tasks: Mutex<Vec<(usize, ServerTask)>>,
@ -461,25 +543,25 @@ pub mod tests {
}
impl Server for DummyServer {
fn serve_getdata(&self, peer_index: usize, message: types::GetData) {
fn serve_getdata(&self, peer_index: usize, message: types::GetData, _id: u32) {
self.tasks.lock().push((peer_index, ServerTask::ServeGetData(message.inventory)));
}
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, _id: u32) {
self.tasks.lock().push((peer_index, ServerTask::ServeGetBlocks(db::BestBlock {
number: 0,
hash: message.block_locator_hashes[0].clone(),
}, message.hash_stop)));
}
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders) {
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, _id: u32) {
self.tasks.lock().push((peer_index, ServerTask::ServeGetHeaders(db::BestBlock {
number: 0,
hash: message.block_locator_hashes[0].clone(),
}, message.hash_stop)));
}
fn serve_mempool(&self, peer_index: usize) {
fn serve_mempool(&self, peer_index: usize, _id: u32) {
self.tasks.lock().push((peer_index, ServerTask::ServeMempool));
}
@ -504,12 +586,13 @@ pub mod tests {
hash: H256::default(),
}
];
let dummy_id = 0;
server.serve_getdata(0, types::GetData {
inventory: inventory.clone(),
});
}, dummy_id);
// => respond with notfound
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory)]);
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::Final(dummy_id))]);
}
#[test]
@ -522,12 +605,13 @@ pub mod tests {
hash: test_data::genesis().hash(),
}
];
let dummy_id = 0;
server.serve_getdata(0, types::GetData {
inventory: inventory.clone(),
});
}, dummy_id);
// => respond with block
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis())]);
assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis(), ServerTaskIndex::Final(dummy_id))]);
}
#[test]
@ -535,14 +619,15 @@ pub mod tests {
let (_, executor, server) = create_synchronization_server();
// when asking for blocks hashes
let genesis_block_hash = test_data::genesis().hash();
let dummy_id = 5;
server.serve_getblocks(0, types::GetBlocks {
version: 0,
block_locator_hashes: vec![genesis_block_hash.clone()],
hash_stop: H256::default(),
});
}, dummy_id);
// => no response
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
assert_eq!(tasks, vec![]);
assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]);
}
#[test]
@ -550,18 +635,19 @@ pub mod tests {
let (chain, executor, server) = create_synchronization_server();
chain.write().insert_best_block(test_data::block_h1().hash(), test_data::block_h1()).expect("Db write error");
// when asking for blocks hashes
let dummy_id = 0;
server.serve_getblocks(0, types::GetBlocks {
version: 0,
block_locator_hashes: vec![test_data::genesis().hash()],
hash_stop: H256::default(),
});
}, dummy_id);
// => responds with inventory
let inventory = vec![InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: test_data::block_h1().hash(),
}];
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::Final(dummy_id))]);
}
#[test]
@ -569,14 +655,15 @@ pub mod tests {
let (_, executor, server) = create_synchronization_server();
// when asking for blocks hashes
let genesis_block_hash = test_data::genesis().hash();
let dummy_id = 6;
server.serve_getheaders(0, types::GetHeaders {
version: 0,
block_locator_hashes: vec![genesis_block_hash.clone()],
hash_stop: H256::default(),
});
}, dummy_id);
// => no response
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
assert_eq!(tasks, vec![]);
assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]);
}
#[test]
@ -584,27 +671,29 @@ pub mod tests {
let (chain, executor, server) = create_synchronization_server();
chain.write().insert_best_block(test_data::block_h1().hash(), test_data::block_h1()).expect("Db write error");
// when asking for blocks hashes
let dummy_id = 0;
server.serve_getheaders(0, types::GetHeaders {
version: 0,
block_locator_hashes: vec![test_data::genesis().hash()],
hash_stop: H256::default(),
});
}, dummy_id);
// => responds with headers
let headers = vec![
test_data::block_h1().block_header,
];
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendHeaders(0, headers)]);
assert_eq!(tasks, vec![Task::SendHeaders(0, headers, ServerTaskIndex::Final(dummy_id))]);
}
#[test]
fn server_mempool_do_not_responds_inventory_when_empty_memory_pool() {
let (_, executor, server) = create_synchronization_server();
// when asking for memory pool transactions ids
server.serve_mempool(0);
let dummy_id = 9;
server.serve_mempool(0, dummy_id);
// => no response
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
assert_eq!(tasks, vec![]);
assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]);
}
#[test]
@ -615,13 +704,14 @@ pub mod tests {
let transaction_hash = transaction.hash();
chain.write().memory_pool_mut().insert_verified(transaction);
// when asking for memory pool transactions ids
server.serve_mempool(0);
let dummy_id = 0;
server.serve_mempool(0, dummy_id);
// => respond with inventory
let inventory = vec![InventoryVector {
inv_type: InventoryType::MessageTx,
hash: transaction_hash,
}];
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::Final(dummy_id))]);
}
}