Merge branch 'master' into h256-logging
This commit is contained in:
commit
af687bbbf9
|
@ -33,3 +33,9 @@ impl<T> AsRef<[u8]> for Message<T> {
|
|||
self.bytes.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<Message<T>> for Bytes {
|
||||
fn from(m: Message<T>) -> Self {
|
||||
m.bytes.into_raw()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use message::{Payload, Message};
|
||||
use tokio_core::io::{write_all, WriteAll};
|
||||
use session::Session;
|
||||
use io::{SharedTcpStream, WriteMessage, write_message, read_any_message, ReadAnyMessage};
|
||||
use io::{SharedTcpStream, read_any_message, ReadAnyMessage};
|
||||
use util::PeerInfo;
|
||||
|
||||
pub struct Channel {
|
||||
|
@ -18,10 +18,8 @@ impl Channel {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn write_message<T>(&self, payload: &T) -> WriteMessage<T, SharedTcpStream> where T: Payload {
|
||||
// TODO: some tracing here
|
||||
let message = Message::new(self.peer_info.magic, self.peer_info.version, payload).expect("failed to create outgoing message");
|
||||
write_message(self.stream.clone(), message)
|
||||
pub fn write_message<T>(&self, message: T) -> WriteAll<SharedTcpStream, T> where T: AsRef<[u8]> {
|
||||
write_all(self.stream.clone(), message)
|
||||
}
|
||||
|
||||
pub fn read_message(&self) -> ReadAnyMessage<SharedTcpStream> {
|
||||
|
|
|
@ -9,6 +9,8 @@ use session::{SessionFactory};
|
|||
use util::{Direction, PeerInfo};
|
||||
use PeerId;
|
||||
|
||||
const SYNCHRONOUS_RESPONSES: bool = true;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Connections {
|
||||
/// Incremental peer counter.
|
||||
|
@ -51,7 +53,7 @@ impl Connections {
|
|||
magic: connection.magic,
|
||||
};
|
||||
|
||||
let session = T::new_session(context, peer_info.clone());
|
||||
let session = T::new_session(context, peer_info.clone(), SYNCHRONOUS_RESPONSES);
|
||||
let channel = Arc::new(Channel::new(connection.stream, peer_info, session));
|
||||
self.channels.write().insert(id, channel.clone());
|
||||
channel
|
||||
|
|
|
@ -5,6 +5,7 @@ mod connect;
|
|||
mod connection;
|
||||
mod connection_counter;
|
||||
mod connections;
|
||||
mod peer_context;
|
||||
|
||||
pub use self::accept_connection::{AcceptConnection, accept_connection};
|
||||
pub use self::channel::Channel;
|
||||
|
@ -13,3 +14,4 @@ pub use self::connect::{Connect, connect};
|
|||
pub use self::connection::Connection;
|
||||
pub use self::connection_counter::ConnectionCounter;
|
||||
pub use self::connections::Connections;
|
||||
pub use self::peer_context::PeerContext;
|
||||
|
|
|
@ -0,0 +1,116 @@
|
|||
use std::sync::Arc;
|
||||
use parking_lot::Mutex;
|
||||
use message::{Payload, Message};
|
||||
use p2p::Context;
|
||||
use util::{PeerInfo, ConfigurableSynchronizer, ResponseQueue, Synchronizer, Responses};
|
||||
|
||||
pub struct PeerContext {
|
||||
context: Arc<Context>,
|
||||
info: PeerInfo,
|
||||
synchronizer: Mutex<ConfigurableSynchronizer>,
|
||||
response_queue: Mutex<ResponseQueue>,
|
||||
}
|
||||
|
||||
impl PeerContext {
|
||||
pub fn new(context: Arc<Context>, info: PeerInfo, synchronous: bool) -> Self {
|
||||
PeerContext {
|
||||
context: context,
|
||||
info: info,
|
||||
synchronizer: Mutex::new(ConfigurableSynchronizer::new(synchronous)),
|
||||
response_queue: Mutex::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn to_message<T>(&self, payload: &T) -> Message<T> where T: Payload {
|
||||
Message::new(self.info.magic, self.info.version, payload).expect("failed to create outgoing message")
|
||||
}
|
||||
|
||||
fn send_awaiting(&self, sync: &mut ConfigurableSynchronizer, queue: &mut ResponseQueue, start_id: u32) {
|
||||
let mut next_id = start_id;
|
||||
loop {
|
||||
next_id = next_id.overflowing_add(1).0;
|
||||
match queue.responses(next_id) {
|
||||
Some(Responses::Finished(messages)) => {
|
||||
assert!(sync.permission_for_response(next_id));
|
||||
for message in messages {
|
||||
let send = Context::send_message_to_peer(self.context.clone(), self.info.id, message);
|
||||
self.context.spawn(send);
|
||||
}
|
||||
},
|
||||
Some(Responses::Ignored) => {
|
||||
assert!(sync.permission_for_response(next_id));
|
||||
},
|
||||
Some(Responses::Unfinished(messages)) => {
|
||||
assert!(sync.is_permitted(next_id));
|
||||
for message in messages {
|
||||
let send = Context::send_message_to_peer(self.context.clone(), self.info.id, message);
|
||||
self.context.spawn(send);
|
||||
}
|
||||
break;
|
||||
},
|
||||
None => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Request is always automatically send.
|
||||
pub fn send_request<T>(&self, payload: &T) where T: Payload {
|
||||
let send = Context::send_to_peer(self.context.clone(), self.info.id, payload);
|
||||
self.context.spawn(send);
|
||||
}
|
||||
|
||||
pub fn declare_response(&self) -> u32 {
|
||||
let d = self.synchronizer.lock().declare_response();
|
||||
trace!("declared response: {}", d);
|
||||
d
|
||||
}
|
||||
|
||||
pub fn send_response_inline<T>(&self, payload: &T) where T: Payload {
|
||||
let id = self.declare_response();
|
||||
self.send_response(payload, id, true);
|
||||
}
|
||||
|
||||
/// Do not wait for response with given id.
|
||||
pub fn ignore_response(&self, id: u32) {
|
||||
let mut sync = self.synchronizer.lock();
|
||||
let mut queue = self.response_queue.lock();
|
||||
if sync.permission_for_response(id) {
|
||||
self.send_awaiting(&mut sync, &mut queue, id);
|
||||
} else {
|
||||
queue.push_ignored_response(id);
|
||||
}
|
||||
}
|
||||
|
||||
/// Responses are sent in order defined by synchronizer.
|
||||
pub fn send_response<T>(&self, payload: &T, id: u32, is_final: bool) where T: Payload {
|
||||
trace!("response ready: {}, id: {}, final: {}", T::command(), id, is_final);
|
||||
let mut sync = self.synchronizer.lock();
|
||||
let mut queue = self.response_queue.lock();
|
||||
if is_final {
|
||||
if sync.permission_for_response(id) {
|
||||
let send = Context::send_to_peer(self.context.clone(), self.info.id, payload);
|
||||
self.context.spawn(send);
|
||||
self.send_awaiting(&mut sync, &mut queue, id);
|
||||
} else {
|
||||
queue.push_finished_response(id, self.to_message(payload).into());
|
||||
}
|
||||
} else {
|
||||
if sync.is_permitted(id) {
|
||||
let send = Context::send_to_peer(self.context.clone(), self.info.id, payload);
|
||||
self.context.spawn(send);
|
||||
} else {
|
||||
queue.push_unfinished_response(id, self.to_message(payload).into());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn info(&self) -> &PeerInfo {
|
||||
&self.info
|
||||
}
|
||||
|
||||
pub fn global(&self) -> &Arc<Context> {
|
||||
&self.context
|
||||
}
|
||||
}
|
|
@ -9,7 +9,7 @@ use tokio_core::net::{TcpListener, TcpStream};
|
|||
use tokio_core::reactor::{Handle, Remote, Timeout, Interval};
|
||||
use abstract_ns::Resolver;
|
||||
use ns_dns_tokio::DnsResolver;
|
||||
use message::{Payload, MessageResult};
|
||||
use message::{Payload, MessageResult, Message};
|
||||
use message::common::Services;
|
||||
use net::{connect, Connections, Channel, Config as NetConfig, accept_connection, ConnectionCounter};
|
||||
use util::{NodeTable, Node, Direction};
|
||||
|
@ -293,7 +293,22 @@ impl Context {
|
|||
/// Send message to a channel with given peer id.
|
||||
pub fn send_to_peer<T>(context: Arc<Context>, peer: PeerId, payload: &T) -> IoFuture<()> where T: Payload {
|
||||
match context.connections.channel(peer) {
|
||||
Some(channel) => Context::send(context, channel, payload),
|
||||
Some(channel) => {
|
||||
let info = channel.peer_info();
|
||||
let message = Message::new(info.magic, info.version, payload).expect("failed to create outgoing message");
|
||||
Context::send(context, channel, message)
|
||||
},
|
||||
None => {
|
||||
// peer no longer exists.
|
||||
// TODO: should we return error here?
|
||||
finished(()).boxed()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send_message_to_peer<T>(context: Arc<Context>, peer: PeerId, message: T) -> IoFuture<()> where T: AsRef<[u8]> + Send + 'static {
|
||||
match context.connections.channel(peer) {
|
||||
Some(channel) => Context::send(context, channel, message),
|
||||
None => {
|
||||
// peer no longer exists.
|
||||
// TODO: should we return error here?
|
||||
|
@ -303,13 +318,13 @@ impl Context {
|
|||
}
|
||||
|
||||
/// Send message using given channel.
|
||||
pub fn send<T>(_context: Arc<Context>, channel: Arc<Channel>, payload: &T) -> IoFuture<()> where T: Payload {
|
||||
trace!("Sending {} message to {}", T::command(), channel.peer_info().address);
|
||||
channel.write_message(payload).then(move |result| {
|
||||
pub fn send<T>(_context: Arc<Context>, channel: Arc<Channel>, message: T) -> IoFuture<()> where T: AsRef<[u8]> + Send + 'static {
|
||||
//trace!("Sending {} message to {}", T::command(), channel.peer_info().address);
|
||||
channel.write_message(message).then(move |result| {
|
||||
match result {
|
||||
Ok(_) => {
|
||||
// successful send
|
||||
trace!("Sent {} message to {}", T::command(), channel.peer_info().address);
|
||||
//trace!("Sent {} message to {}", T::command(), channel.peer_info().address);
|
||||
finished(()).boxed()
|
||||
},
|
||||
Err(err) => {
|
||||
|
|
|
@ -4,30 +4,26 @@ use bytes::Bytes;
|
|||
use message::{Error, Command, deserialize_payload, Payload};
|
||||
use message::types::{GetAddr, Addr};
|
||||
use protocol::Protocol;
|
||||
use p2p::Context;
|
||||
use util::{Direction, PeerInfo};
|
||||
use net::PeerContext;
|
||||
use util::Direction;
|
||||
|
||||
pub struct AddrProtocol {
|
||||
/// Context
|
||||
context: Arc<Context>,
|
||||
/// Connected peer info.
|
||||
info: PeerInfo,
|
||||
context: Arc<PeerContext>,
|
||||
}
|
||||
|
||||
impl AddrProtocol {
|
||||
pub fn new(context: Arc<Context>, info: PeerInfo) -> Self {
|
||||
pub fn new(context: Arc<PeerContext>) -> Self {
|
||||
AddrProtocol {
|
||||
context: context,
|
||||
info: info,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Protocol for AddrProtocol {
|
||||
fn initialize(&mut self) {
|
||||
if let Direction::Outbound = self.info.direction {
|
||||
let send = Context::send_to_peer(self.context.clone(), self.info.id, &GetAddr);
|
||||
self.context.spawn(send);
|
||||
if let Direction::Outbound = self.context.info().direction {
|
||||
self.context.send_request(&GetAddr);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -35,20 +31,19 @@ impl Protocol for AddrProtocol {
|
|||
// normal nodes send addr message only after they receive getaddr message
|
||||
// meanwhile seednodes, surprisingly, send addr message even before they are asked for it
|
||||
if command == &GetAddr::command() {
|
||||
let _: GetAddr = try!(deserialize_payload(payload, self.info.version));
|
||||
let entries = self.context.node_table_entries().into_iter().map(Into::into).collect();
|
||||
let _: GetAddr = try!(deserialize_payload(payload, self.context.info().version));
|
||||
let entries = self.context.global().node_table_entries().into_iter().map(Into::into).collect();
|
||||
let addr = Addr::new(entries);
|
||||
let send = Context::send_to_peer(self.context.clone(), self.info.id, &addr);
|
||||
self.context.spawn(send);
|
||||
self.context.send_response_inline(&addr);
|
||||
} else if command == &Addr::command() {
|
||||
let addr: Addr = try!(deserialize_payload(payload, self.info.version));
|
||||
let addr: Addr = try!(deserialize_payload(payload, self.context.info().version));
|
||||
match addr {
|
||||
Addr::V0(_) => {
|
||||
unreachable!("This version of protocol is not supported!");
|
||||
},
|
||||
Addr::V31402(addr) => {
|
||||
let nodes = addr.addresses.into_iter().map(Into::into).collect();
|
||||
self.context.update_node_table(nodes);
|
||||
self.context.global().update_node_table(nodes);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -58,18 +53,15 @@ impl Protocol for AddrProtocol {
|
|||
|
||||
pub struct SeednodeProtocol {
|
||||
/// Context
|
||||
context: Arc<Context>,
|
||||
/// Connected peer info,
|
||||
info: PeerInfo,
|
||||
context: Arc<PeerContext>,
|
||||
/// Indicates if disconnecting has been scheduled.
|
||||
disconnecting: bool,
|
||||
}
|
||||
|
||||
impl SeednodeProtocol {
|
||||
pub fn new(context: Arc<Context>, info: PeerInfo) -> Self {
|
||||
pub fn new(context: Arc<PeerContext>) -> Self {
|
||||
SeednodeProtocol {
|
||||
context: context,
|
||||
info: info,
|
||||
disconnecting: false,
|
||||
}
|
||||
}
|
||||
|
@ -81,9 +73,9 @@ impl Protocol for SeednodeProtocol {
|
|||
// We can't disconenct after first read. Let's delay it by 60 seconds.
|
||||
if !self.disconnecting && command == &Addr::command() {
|
||||
self.disconnecting = true;
|
||||
let context = self.context.clone();
|
||||
let peer = self.info.id;
|
||||
self.context.execute_after(Duration::new(60, 0), move || {
|
||||
let context = self.context.global().clone();
|
||||
let peer = self.context.info().id;
|
||||
self.context.global().execute_after(Duration::new(60, 0), move || {
|
||||
context.close_channel(peer);
|
||||
});
|
||||
}
|
||||
|
|
|
@ -4,26 +4,12 @@ use message::{Error, Payload, deserialize_payload};
|
|||
use message::types::{Ping, Pong};
|
||||
use message::common::Command;
|
||||
use protocol::Protocol;
|
||||
use util::{PeerId, PeerInfo};
|
||||
use net::PeerContext;
|
||||
use util::nonce::{NonceGenerator, RandomNonce};
|
||||
use p2p::Context;
|
||||
|
||||
pub trait PingContext: Send + Sync {
|
||||
fn send_to_peer<T>(context: Arc<Self>, peer: PeerId, payload: &T) where Self: Sized, T: Payload;
|
||||
}
|
||||
|
||||
impl PingContext for Context {
|
||||
fn send_to_peer<T>(context: Arc<Self>, peer: PeerId, payload: &T) where T: Payload {
|
||||
let send = Context::send_to_peer(context.clone(), peer, payload);
|
||||
context.spawn(send);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PingProtocol<T = RandomNonce, C = Context> {
|
||||
pub struct PingProtocol<T = RandomNonce, C = PeerContext> {
|
||||
/// Context
|
||||
context: Arc<C>,
|
||||
/// Connected peer info.
|
||||
info: PeerInfo,
|
||||
/// Nonce generator.
|
||||
nonce_generator: T,
|
||||
/// Last nonce sent in the ping message.
|
||||
|
@ -31,32 +17,31 @@ pub struct PingProtocol<T = RandomNonce, C = Context> {
|
|||
}
|
||||
|
||||
impl PingProtocol {
|
||||
pub fn new(context: Arc<Context>, info: PeerInfo) -> Self {
|
||||
pub fn new(context: Arc<PeerContext>) -> Self {
|
||||
PingProtocol {
|
||||
context: context,
|
||||
info: info,
|
||||
nonce_generator: RandomNonce::default(),
|
||||
last_ping_nonce: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, C> Protocol for PingProtocol<T, C> where T: NonceGenerator + Send, C: PingContext {
|
||||
impl Protocol for PingProtocol {
|
||||
fn initialize(&mut self) {
|
||||
// bitcoind always sends ping, let's do the same
|
||||
let nonce = self.nonce_generator.get();
|
||||
self.last_ping_nonce = Some(nonce);
|
||||
let ping = Ping::new(nonce);
|
||||
PingContext::send_to_peer(self.context.clone(), self.info.id, &ping);
|
||||
self.context.send_request(&ping);
|
||||
}
|
||||
|
||||
fn on_message(&mut self, command: &Command, payload: &Bytes) -> Result<(), Error> {
|
||||
if command == &Ping::command() {
|
||||
let ping: Ping = try!(deserialize_payload(payload, self.info.version));
|
||||
let ping: Ping = try!(deserialize_payload(payload, self.context.info().version));
|
||||
let pong = Pong::new(ping.nonce);
|
||||
PingContext::send_to_peer(self.context.clone(), self.info.id, &pong);
|
||||
self.context.send_response_inline(&pong);
|
||||
} else if command == &Pong::command() {
|
||||
let pong: Pong = try!(deserialize_payload(payload, self.info.version));
|
||||
let pong: Pong = try!(deserialize_payload(payload, self.context.info().version));
|
||||
if Some(pong.nonce) != self.last_ping_nonce.take() {
|
||||
return Err(Error::InvalidCommand)
|
||||
}
|
||||
|
@ -65,111 +50,3 @@ impl<T, C> Protocol for PingProtocol<T, C> where T: NonceGenerator + Send, C: Pi
|
|||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
use parking_lot::Mutex;
|
||||
use bytes::Bytes;
|
||||
use message::{Payload, serialize_payload, Magic};
|
||||
use message::types::{Ping, Pong};
|
||||
use util::{PeerId, PeerInfo, Direction};
|
||||
use util::nonce::StaticNonce;
|
||||
use protocol::Protocol;
|
||||
use super::{PingProtocol, PingContext};
|
||||
|
||||
#[derive(Default)]
|
||||
struct TestPingContext {
|
||||
version: u32,
|
||||
messages: Mutex<Vec<(PeerId, Bytes)>>,
|
||||
}
|
||||
|
||||
impl PingContext for TestPingContext {
|
||||
fn send_to_peer<T>(context: Arc<Self>, peer: PeerId, payload: &T) where T: Payload {
|
||||
let value = (peer, serialize_payload(payload, context.version).unwrap());
|
||||
context.messages.lock().push(value);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ping_init() {
|
||||
let ping_context = Arc::new(TestPingContext::default());
|
||||
let peer = 99;
|
||||
let nonce = 1000;
|
||||
let expected_message = serialize_payload(&Ping::new(nonce), 0).unwrap();
|
||||
let mut ping_protocol = PingProtocol {
|
||||
context: ping_context.clone(),
|
||||
info: PeerInfo {
|
||||
id: peer,
|
||||
address: "0.0.0.0:8080".parse().unwrap(),
|
||||
direction: Direction::Inbound,
|
||||
version: 0,
|
||||
magic: Magic::Testnet,
|
||||
},
|
||||
nonce_generator: StaticNonce::new(nonce),
|
||||
last_ping_nonce: None,
|
||||
};
|
||||
|
||||
ping_protocol.initialize();
|
||||
let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone();
|
||||
assert_eq!(messages.len(), 1);
|
||||
assert_eq!(messages[0].0, peer);
|
||||
assert_eq!(messages[0].1, expected_message);
|
||||
assert_eq!(ping_protocol.last_ping_nonce, Some(nonce));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ping_on_message_ping() {
|
||||
let ping_context = Arc::new(TestPingContext::default());
|
||||
let peer = 99;
|
||||
let nonce = 1000;
|
||||
let command = "ping".into();
|
||||
let message = serialize_payload(&Ping::new(nonce), 0).unwrap();
|
||||
let expected_message = serialize_payload(&Pong::new(nonce), 0).unwrap();
|
||||
let mut ping_protocol = PingProtocol {
|
||||
context: ping_context.clone(),
|
||||
info: PeerInfo {
|
||||
id: peer,
|
||||
address: "0.0.0.0:8080".parse().unwrap(),
|
||||
direction: Direction::Inbound,
|
||||
version: 0,
|
||||
magic: Magic::Testnet,
|
||||
},
|
||||
nonce_generator: StaticNonce::new(nonce),
|
||||
last_ping_nonce: None,
|
||||
};
|
||||
|
||||
assert!(ping_protocol.on_message(&command, &message).is_ok());
|
||||
let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone();
|
||||
assert_eq!(messages.len(), 1);
|
||||
assert_eq!(messages[0].0, peer);
|
||||
assert_eq!(messages[0].1, expected_message);
|
||||
assert_eq!(ping_protocol.last_ping_nonce, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ping_on_message_pong() {
|
||||
let ping_context = Arc::new(TestPingContext::default());
|
||||
let peer = 99;
|
||||
let nonce = 1000;
|
||||
let command = "pong".into();
|
||||
let message = serialize_payload(&Pong::new(nonce), 0).unwrap();
|
||||
let mut ping_protocol = PingProtocol {
|
||||
context: ping_context.clone(),
|
||||
info: PeerInfo {
|
||||
id: peer,
|
||||
address: "0.0.0.0:8080".parse().unwrap(),
|
||||
direction: Direction::Inbound,
|
||||
version: 0,
|
||||
magic: Magic::Testnet,
|
||||
},
|
||||
nonce_generator: StaticNonce::new(nonce),
|
||||
last_ping_nonce: Some(nonce),
|
||||
};
|
||||
|
||||
assert!(ping_protocol.on_message(&command, &message).is_ok());
|
||||
let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone();
|
||||
assert_eq!(messages.len(), 0);
|
||||
assert_eq!(ping_protocol.last_ping_nonce, None);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,8 +2,7 @@ use std::sync::Arc;
|
|||
use bytes::Bytes;
|
||||
use message::{Command, Error, Payload, types, deserialize_payload};
|
||||
use protocol::Protocol;
|
||||
use util::{PeerInfo, PeerId};
|
||||
use p2p::Context;
|
||||
use net::PeerContext;
|
||||
|
||||
pub type InboundSyncConnectionRef = Box<InboundSyncConnection>;
|
||||
pub type OutboundSyncConnectionRef = Box<OutboundSyncConnection>;
|
||||
|
@ -19,13 +18,13 @@ pub trait InboundSyncConnection : Send + Sync {
|
|||
fn start_sync_session(&self, version: u32);
|
||||
fn close_session(&self);
|
||||
fn on_inventory(&self, message: types::Inv);
|
||||
fn on_getdata(&self, message: types::GetData);
|
||||
fn on_getblocks(&self, message: types::GetBlocks);
|
||||
fn on_getheaders(&self, message: types::GetHeaders);
|
||||
fn on_getdata(&self, message: types::GetData, id: u32);
|
||||
fn on_getblocks(&self, message: types::GetBlocks, id: u32);
|
||||
fn on_getheaders(&self, message: types::GetHeaders, id: u32);
|
||||
fn on_transaction(&self, message: types::Tx);
|
||||
fn on_block(&self, message: types::Block);
|
||||
fn on_headers(&self, message: types::Headers);
|
||||
fn on_mempool(&self, message: types::MemPool);
|
||||
fn on_mempool(&self, message: types::MemPool, id: u32);
|
||||
fn on_filterload(&self, message: types::FilterLoad);
|
||||
fn on_filteradd(&self, message: types::FilterAdd);
|
||||
fn on_filterclear(&self, message: types::FilterClear);
|
||||
|
@ -40,14 +39,14 @@ pub trait InboundSyncConnection : Send + Sync {
|
|||
}
|
||||
|
||||
pub trait OutboundSyncConnection : Send + Sync {
|
||||
fn send_inventory(&self, message: &types::Inv);
|
||||
fn send_inventory(&self, message: &types::Inv, id: u32, is_final: bool);
|
||||
fn send_getdata(&self, message: &types::GetData);
|
||||
fn send_getblocks(&self, message: &types::GetBlocks);
|
||||
fn send_getheaders(&self, message: &types::GetHeaders);
|
||||
fn send_transaction(&self, message: &types::Tx);
|
||||
fn send_block(&self, message: &types::Block);
|
||||
fn send_headers(&self, message: &types::Headers);
|
||||
fn send_mempool(&self, message: &types::MemPool);
|
||||
fn send_block(&self, message: &types::Block, id: u32, is_final: bool);
|
||||
fn send_headers(&self, message: &types::Headers, id: u32, is_final: bool);
|
||||
fn send_mempool(&self, message: &types::MemPool, id: u32, is_final: bool);
|
||||
fn send_filterload(&self, message: &types::FilterLoad);
|
||||
fn send_filteradd(&self, message: &types::FilterAdd);
|
||||
fn send_filterclear(&self, message: &types::FilterClear);
|
||||
|
@ -58,206 +57,213 @@ pub trait OutboundSyncConnection : Send + Sync {
|
|||
fn send_compact_block(&self, message: &types::CompactBlock);
|
||||
fn send_get_block_txn(&self, message: &types::GetBlockTxn);
|
||||
fn send_block_txn(&self, message: &types::BlockTxn);
|
||||
fn send_notfound(&self, message: &types::NotFound);
|
||||
fn send_notfound(&self, message: &types::NotFound, id: u32, is_final: bool);
|
||||
fn ignored(&self, id: u32);
|
||||
}
|
||||
|
||||
struct OutboundSync {
|
||||
context: Arc<Context>,
|
||||
peer: PeerId,
|
||||
context: Arc<PeerContext>,
|
||||
}
|
||||
|
||||
impl OutboundSync {
|
||||
pub fn new(context: Arc<Context>, peer: PeerId) -> OutboundSync {
|
||||
pub fn new(context: Arc<PeerContext>) -> OutboundSync {
|
||||
OutboundSync {
|
||||
context: context,
|
||||
peer: peer,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send_message<T>(&self, message: &T) where T: Payload {
|
||||
let send = Context::send_to_peer(self.context.clone(), self.peer, message);
|
||||
self.context.spawn(send);
|
||||
}
|
||||
|
||||
pub fn boxed(self) -> Box<OutboundSyncConnection> {
|
||||
Box::new(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl OutboundSyncConnection for OutboundSync {
|
||||
fn send_inventory(&self, message: &types::Inv) {
|
||||
self.send_message(message);
|
||||
fn send_inventory(&self, message: &types::Inv, id: u32, is_final: bool) {
|
||||
self.context.send_response(message, id, is_final);
|
||||
}
|
||||
|
||||
fn send_getdata(&self, message: &types::GetData) {
|
||||
self.send_message(message);
|
||||
self.context.send_request(message);
|
||||
}
|
||||
|
||||
fn send_getblocks(&self, message: &types::GetBlocks) {
|
||||
self.send_message(message);
|
||||
self.context.send_request(message);
|
||||
}
|
||||
|
||||
fn send_getheaders(&self, message: &types::GetHeaders) {
|
||||
self.send_message(message);
|
||||
self.context.send_request(message);
|
||||
}
|
||||
|
||||
fn send_transaction(&self, message: &types::Tx) {
|
||||
self.send_message(message);
|
||||
self.context.send_request(message);
|
||||
}
|
||||
|
||||
fn send_block(&self, message: &types::Block) {
|
||||
self.send_message(message);
|
||||
fn send_block(&self, message: &types::Block, id: u32, is_final: bool) {
|
||||
self.context.send_response(message, id, is_final);
|
||||
}
|
||||
|
||||
fn send_headers(&self, message: &types::Headers) {
|
||||
self.send_message(message);
|
||||
fn send_headers(&self, message: &types::Headers, id: u32, is_final: bool) {
|
||||
self.context.send_response(message, id, is_final);
|
||||
}
|
||||
|
||||
fn send_mempool(&self, message: &types::MemPool) {
|
||||
self.send_message(message);
|
||||
fn send_mempool(&self, message: &types::MemPool, id: u32, is_final: bool) {
|
||||
self.context.send_response(message, id, is_final);
|
||||
}
|
||||
|
||||
fn send_filterload(&self, message: &types::FilterLoad) {
|
||||
self.send_message(message);
|
||||
self.context.send_request(message);
|
||||
}
|
||||
|
||||
fn send_filteradd(&self, message: &types::FilterAdd) {
|
||||
self.send_message(message);
|
||||
self.context.send_request(message);
|
||||
}
|
||||
|
||||
fn send_filterclear(&self, message: &types::FilterClear) {
|
||||
self.send_message(message);
|
||||
self.context.send_request(message);
|
||||
}
|
||||
|
||||
fn send_merkleblock(&self, message: &types::MerkleBlock) {
|
||||
self.send_message(message);
|
||||
self.context.send_request(message);
|
||||
}
|
||||
|
||||
fn send_sendheaders(&self, message: &types::SendHeaders) {
|
||||
self.send_message(message);
|
||||
self.context.send_request(message);
|
||||
}
|
||||
|
||||
fn send_feefilter(&self, message: &types::FeeFilter) {
|
||||
self.send_message(message);
|
||||
self.context.send_request(message);
|
||||
}
|
||||
|
||||
fn send_send_compact(&self, message: &types::SendCompact) {
|
||||
self.send_message(message);
|
||||
self.context.send_request(message);
|
||||
}
|
||||
|
||||
fn send_compact_block(&self, message: &types::CompactBlock) {
|
||||
self.send_message(message);
|
||||
self.context.send_request(message);
|
||||
}
|
||||
|
||||
fn send_get_block_txn(&self, message: &types::GetBlockTxn) {
|
||||
self.send_message(message);
|
||||
self.context.send_request(message);
|
||||
}
|
||||
|
||||
fn send_block_txn(&self, message: &types::BlockTxn) {
|
||||
self.send_message(message);
|
||||
self.context.send_request(message);
|
||||
}
|
||||
|
||||
fn send_notfound(&self, message: &types::NotFound) {
|
||||
self.send_message(message);
|
||||
fn send_notfound(&self, message: &types::NotFound, id: u32, is_final: bool) {
|
||||
self.context.send_response(message, id, is_final);
|
||||
}
|
||||
|
||||
fn ignored(&self, id: u32) {
|
||||
self.context.ignore_response(id);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SyncProtocol {
|
||||
inbound_connection: InboundSyncConnectionRef,
|
||||
info: PeerInfo,
|
||||
context: Arc<PeerContext>,
|
||||
}
|
||||
|
||||
impl SyncProtocol {
|
||||
pub fn new(context: Arc<Context>, info: PeerInfo) -> Self {
|
||||
let outbound_connection = OutboundSync::new(context.clone(), info.id).boxed();
|
||||
let inbound_connection = context.create_sync_session(0, outbound_connection);
|
||||
pub fn new(context: Arc<PeerContext>) -> Self {
|
||||
let outbound_connection = OutboundSync::new(context.clone()).boxed();
|
||||
let inbound_connection = context.global().create_sync_session(0, outbound_connection);
|
||||
SyncProtocol {
|
||||
inbound_connection: inbound_connection,
|
||||
info: info,
|
||||
context: context,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Protocol for SyncProtocol {
|
||||
fn initialize(&mut self) {
|
||||
self.inbound_connection.start_sync_session(self.info.version);
|
||||
self.inbound_connection.start_sync_session(self.context.info().version);
|
||||
}
|
||||
|
||||
fn on_message(&mut self, command: &Command, payload: &Bytes) -> Result<(), Error> {
|
||||
let version = self.context.info().version;
|
||||
if command == &types::Inv::command() {
|
||||
let message: types::Inv = try!(deserialize_payload(payload, self.info.version));
|
||||
let message: types::Inv = try!(deserialize_payload(payload, version));
|
||||
self.inbound_connection.on_inventory(message);
|
||||
}
|
||||
else if command == &types::GetData::command() {
|
||||
let message: types::GetData = try!(deserialize_payload(payload, self.info.version));
|
||||
self.inbound_connection.on_getdata(message);
|
||||
let message: types::GetData = try!(deserialize_payload(payload, version));
|
||||
let id = self.context.declare_response();
|
||||
trace!("declared response {} for request: {}", id, types::GetData::command());
|
||||
self.inbound_connection.on_getdata(message, id);
|
||||
}
|
||||
else if command == &types::GetBlocks::command() {
|
||||
let message: types::GetBlocks = try!(deserialize_payload(payload, self.info.version));
|
||||
self.inbound_connection.on_getblocks(message);
|
||||
let message: types::GetBlocks = try!(deserialize_payload(payload, version));
|
||||
let id = self.context.declare_response();
|
||||
trace!("declared response {} for request: {}", id, types::GetBlocks::command());
|
||||
self.inbound_connection.on_getblocks(message, id);
|
||||
}
|
||||
else if command == &types::GetHeaders::command() {
|
||||
let message: types::GetHeaders = try!(deserialize_payload(payload, self.info.version));
|
||||
self.inbound_connection.on_getheaders(message);
|
||||
let message: types::GetHeaders = try!(deserialize_payload(payload, version));
|
||||
let id = self.context.declare_response();
|
||||
trace!("declared response {} for request: {}", id, types::GetHeaders::command());
|
||||
self.inbound_connection.on_getheaders(message, id);
|
||||
}
|
||||
else if command == &types::Tx::command() {
|
||||
let message: types::Tx = try!(deserialize_payload(payload, self.info.version));
|
||||
let message: types::Tx = try!(deserialize_payload(payload, version));
|
||||
self.inbound_connection.on_transaction(message);
|
||||
}
|
||||
else if command == &types::Block::command() {
|
||||
let message: types::Block = try!(deserialize_payload(payload, self.info.version));
|
||||
let message: types::Block = try!(deserialize_payload(payload, version));
|
||||
self.inbound_connection.on_block(message);
|
||||
}
|
||||
else if command == &types::MemPool::command() {
|
||||
let message: types::MemPool = try!(deserialize_payload(payload, self.info.version));
|
||||
self.inbound_connection.on_mempool(message);
|
||||
let message: types::MemPool = try!(deserialize_payload(payload, version));
|
||||
let id = self.context.declare_response();
|
||||
trace!("declared response {} for request: {}", id, types::MemPool::command());
|
||||
self.inbound_connection.on_mempool(message, id);
|
||||
}
|
||||
else if command == &types::Headers::command() {
|
||||
let message: types::Headers = try!(deserialize_payload(payload, self.info.version));
|
||||
let message: types::Headers = try!(deserialize_payload(payload, version));
|
||||
self.inbound_connection.on_headers(message);
|
||||
}
|
||||
else if command == &types::FilterLoad::command() {
|
||||
let message: types::FilterLoad = try!(deserialize_payload(payload, self.info.version));
|
||||
let message: types::FilterLoad = try!(deserialize_payload(payload, version));
|
||||
self.inbound_connection.on_filterload(message);
|
||||
}
|
||||
else if command == &types::FilterAdd::command() {
|
||||
let message: types::FilterAdd = try!(deserialize_payload(payload, self.info.version));
|
||||
let message: types::FilterAdd = try!(deserialize_payload(payload, version));
|
||||
self.inbound_connection.on_filteradd(message);
|
||||
}
|
||||
else if command == &types::FilterClear::command() {
|
||||
let message: types::FilterClear = try!(deserialize_payload(payload, self.info.version));
|
||||
let message: types::FilterClear = try!(deserialize_payload(payload, version));
|
||||
self.inbound_connection.on_filterclear(message);
|
||||
}
|
||||
else if command == &types::MerkleBlock::command() {
|
||||
let message: types::MerkleBlock = try!(deserialize_payload(payload, self.info.version));
|
||||
let message: types::MerkleBlock = try!(deserialize_payload(payload, version));
|
||||
self.inbound_connection.on_merkleblock(message);
|
||||
}
|
||||
else if command == &types::SendHeaders::command() {
|
||||
let message: types::SendHeaders = try!(deserialize_payload(payload, self.info.version));
|
||||
let message: types::SendHeaders = try!(deserialize_payload(payload, version));
|
||||
self.inbound_connection.on_sendheaders(message);
|
||||
}
|
||||
else if command == &types::FeeFilter::command() {
|
||||
let message: types::FeeFilter = try!(deserialize_payload(payload, self.info.version));
|
||||
let message: types::FeeFilter = try!(deserialize_payload(payload, version));
|
||||
self.inbound_connection.on_feefilter(message);
|
||||
}
|
||||
else if command == &types::SendCompact::command() {
|
||||
let message: types::SendCompact = try!(deserialize_payload(payload, self.info.version));
|
||||
let message: types::SendCompact = try!(deserialize_payload(payload, version));
|
||||
self.inbound_connection.on_send_compact(message);
|
||||
}
|
||||
else if command == &types::CompactBlock::command() {
|
||||
let message: types::CompactBlock = try!(deserialize_payload(payload, self.info.version));
|
||||
let message: types::CompactBlock = try!(deserialize_payload(payload, version));
|
||||
self.inbound_connection.on_compact_block(message);
|
||||
}
|
||||
else if command == &types::GetBlockTxn::command() {
|
||||
let message: types::GetBlockTxn = try!(deserialize_payload(payload, self.info.version));
|
||||
let message: types::GetBlockTxn = try!(deserialize_payload(payload, version));
|
||||
self.inbound_connection.on_get_block_txn(message);
|
||||
}
|
||||
else if command == &types::BlockTxn::command() {
|
||||
let message: types::BlockTxn = try!(deserialize_payload(payload, self.info.version));
|
||||
let message: types::BlockTxn = try!(deserialize_payload(payload, version));
|
||||
self.inbound_connection.on_block_txn(message);
|
||||
}
|
||||
else if command == &types::NotFound::command() {
|
||||
let message: types::NotFound = try!(deserialize_payload(payload, self.info.version));
|
||||
let message: types::NotFound = try!(deserialize_payload(payload, version));
|
||||
self.inbound_connection.on_notfound(message);
|
||||
}
|
||||
Ok(())
|
||||
|
|
|
@ -3,42 +3,47 @@ use parking_lot::Mutex;
|
|||
use bytes::Bytes;
|
||||
use message::{Command, Error};
|
||||
use p2p::Context;
|
||||
use net::PeerContext;
|
||||
use protocol::{Protocol, PingProtocol, SyncProtocol, AddrProtocol, SeednodeProtocol};
|
||||
use util::{PeerInfo};
|
||||
use util::PeerInfo;
|
||||
|
||||
pub trait SessionFactory {
|
||||
fn new_session(context: Arc<Context>, info: PeerInfo) -> Session;
|
||||
fn new_session(context: Arc<Context>, info: PeerInfo, synchronous: bool) -> Session;
|
||||
}
|
||||
|
||||
pub struct SeednodeSessionFactory;
|
||||
|
||||
impl SessionFactory for SeednodeSessionFactory {
|
||||
fn new_session(context: Arc<Context>, info: PeerInfo) -> Session {
|
||||
let ping = PingProtocol::new(context.clone(), info.clone()).boxed();
|
||||
let addr = AddrProtocol::new(context.clone(), info.clone()).boxed();
|
||||
let seed = SeednodeProtocol::new(context.clone(), info).boxed();
|
||||
Session::new(vec![ping, addr, seed])
|
||||
fn new_session(context: Arc<Context>, info: PeerInfo, synchronous: bool) -> Session {
|
||||
let peer_context = Arc::new(PeerContext::new(context, info, synchronous));
|
||||
let ping = PingProtocol::new(peer_context.clone()).boxed();
|
||||
let addr = AddrProtocol::new(peer_context.clone()).boxed();
|
||||
let seed = SeednodeProtocol::new(peer_context.clone()).boxed();
|
||||
Session::new(peer_context, vec![ping, addr, seed])
|
||||
}
|
||||
}
|
||||
|
||||
pub struct NormalSessionFactory;
|
||||
|
||||
impl SessionFactory for NormalSessionFactory {
|
||||
fn new_session(context: Arc<Context>, info: PeerInfo) -> Session {
|
||||
let ping = PingProtocol::new(context.clone(), info.clone()).boxed();
|
||||
let addr = AddrProtocol::new(context.clone(), info.clone()).boxed();
|
||||
let sync = SyncProtocol::new(context, info).boxed();
|
||||
Session::new(vec![ping, addr, sync])
|
||||
fn new_session(context: Arc<Context>, info: PeerInfo, synchronous: bool) -> Session {
|
||||
let peer_context = Arc::new(PeerContext::new(context, info, synchronous));
|
||||
let ping = PingProtocol::new(peer_context.clone()).boxed();
|
||||
let addr = AddrProtocol::new(peer_context.clone()).boxed();
|
||||
let sync = SyncProtocol::new(peer_context.clone()).boxed();
|
||||
Session::new(peer_context, vec![ping, addr, sync])
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Session {
|
||||
_peer_context: Arc<PeerContext>,
|
||||
protocols: Mutex<Vec<Box<Protocol>>>,
|
||||
}
|
||||
|
||||
impl Session {
|
||||
pub fn new(protocols: Vec<Box<Protocol>>) -> Self {
|
||||
pub fn new(peer_context: Arc<PeerContext>, protocols: Vec<Box<Protocol>>) -> Self {
|
||||
Session {
|
||||
_peer_context: peer_context,
|
||||
protocols: Mutex::new(protocols),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,8 +2,10 @@ pub mod nonce;
|
|||
pub mod time;
|
||||
mod node_table;
|
||||
mod peer;
|
||||
mod response_queue;
|
||||
mod synchronizer;
|
||||
|
||||
pub use self::node_table::{NodeTable, Node};
|
||||
pub use self::peer::{PeerId, PeerInfo, Direction};
|
||||
pub use self::response_queue::{ResponseQueue, Responses};
|
||||
pub use self::synchronizer::{Synchronizer, ConfigurableSynchronizer};
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
use std::collections::{HashMap, HashSet};
|
||||
use bytes::Bytes;
|
||||
|
||||
/// Queue of out-of-order responses. Each peer has it's own queue.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ResponseQueue {
|
||||
unfinished: HashMap<u32, Vec<Bytes>>,
|
||||
finished: HashMap<u32, Vec<Bytes>>,
|
||||
ignored: HashSet<u32>,
|
||||
}
|
||||
|
||||
pub enum Responses {
|
||||
Unfinished(Vec<Bytes>),
|
||||
Finished(Vec<Bytes>),
|
||||
Ignored,
|
||||
}
|
||||
|
||||
impl ResponseQueue {
|
||||
pub fn push_unfinished_response(&mut self, id: u32, response: Bytes) {
|
||||
self.unfinished.entry(id).or_insert_with(Vec::new).push(response)
|
||||
}
|
||||
|
||||
pub fn push_finished_response(&mut self, id: u32, response: Bytes) {
|
||||
let mut responses = self.unfinished.remove(&id).unwrap_or_default();
|
||||
responses.push(response);
|
||||
let previous = self.finished.insert(id, responses);
|
||||
assert!(previous.is_none(), "logic error; same finished response should never be pushed twice");
|
||||
}
|
||||
|
||||
pub fn push_ignored_response(&mut self, id: u32) {
|
||||
assert!(self.ignored.insert(id), "logic error; same response should never be ignored twice");
|
||||
}
|
||||
|
||||
pub fn responses(&mut self, id: u32) -> Option<Responses> {
|
||||
self.unfinished.remove(&id).map(Responses::Unfinished)
|
||||
.or_else(|| self.finished.remove(&id).map(Responses::Finished))
|
||||
.or_else(|| {
|
||||
if self.ignored.remove(&id) {
|
||||
Some(Responses::Ignored)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -11,6 +11,9 @@ pub trait Synchronizer: Send {
|
|||
/// Declare sending response in future.
|
||||
fn declare_response(&mut self) -> u32;
|
||||
|
||||
/// Returns true if permission for response is granted, but without marking response as sent.
|
||||
fn is_permitted(&self, id: u32) -> bool;
|
||||
|
||||
/// Returns true if permission for sending response is granted.
|
||||
fn permission_for_response(&mut self, id: u32) -> bool;
|
||||
}
|
||||
|
@ -29,6 +32,10 @@ impl Synchronizer for FifoSynchronizer {
|
|||
result
|
||||
}
|
||||
|
||||
fn is_permitted(&self, id: u32) -> bool {
|
||||
id == self.next_to_grant
|
||||
}
|
||||
|
||||
fn permission_for_response(&mut self, id: u32) -> bool {
|
||||
// there should be an assertion here, assert!(id < self.declared_responses),
|
||||
// but it's impossible to write an assertion if the value may overflow
|
||||
|
@ -54,6 +61,10 @@ impl Synchronizer for NoopSynchronizer {
|
|||
result
|
||||
}
|
||||
|
||||
fn is_permitted(&self, _id: u32) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn permission_for_response(&mut self, _id: u32) -> bool {
|
||||
true
|
||||
}
|
||||
|
@ -81,18 +92,8 @@ impl ThresholdSynchronizer {
|
|||
to_grant_max: declared,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Synchronizer for ThresholdSynchronizer {
|
||||
fn declare_response(&mut self) -> u32 {
|
||||
self.inner.declare_response()
|
||||
}
|
||||
|
||||
fn permission_for_response(&mut self, id: u32) -> bool {
|
||||
if self.inner.permission_for_response(id) {
|
||||
return true;
|
||||
}
|
||||
|
||||
fn within_threshold(&self, id: u32) -> bool {
|
||||
if self.to_grant_min <= self.to_grant_max {
|
||||
// if max is bigger then min, id must be in range [min, max)
|
||||
self.to_grant_min <= id && id < self.to_grant_max
|
||||
|
@ -104,6 +105,20 @@ impl Synchronizer for ThresholdSynchronizer {
|
|||
}
|
||||
}
|
||||
|
||||
impl Synchronizer for ThresholdSynchronizer {
|
||||
fn declare_response(&mut self) -> u32 {
|
||||
self.inner.declare_response()
|
||||
}
|
||||
|
||||
fn is_permitted(&self, id: u32) -> bool {
|
||||
self.inner.is_permitted(id) || self.within_threshold(id)
|
||||
}
|
||||
|
||||
fn permission_for_response(&mut self, id: u32) -> bool {
|
||||
self.inner.permission_for_response(id) || self.within_threshold(id)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum InnerSynchronizer {
|
||||
Noop(NoopSynchronizer),
|
||||
|
@ -170,6 +185,13 @@ impl Synchronizer for ConfigurableSynchronizer {
|
|||
}
|
||||
}
|
||||
|
||||
fn is_permitted(&self, id: u32) -> bool {
|
||||
match self.inner {
|
||||
InnerSynchronizer::Noop(ref s) => s.is_permitted(id),
|
||||
InnerSynchronizer::Threshold(ref s) => s.is_permitted(id),
|
||||
}
|
||||
}
|
||||
|
||||
fn permission_for_response(&mut self, id: u32) -> bool {
|
||||
match self.inner {
|
||||
InnerSynchronizer::Threshold(ref mut s) => s.permission_for_response(id),
|
||||
|
|
|
@ -29,16 +29,16 @@ impl InboundSyncConnection for InboundConnection {
|
|||
self.local_node.on_peer_inventory(self.peer_index, message);
|
||||
}
|
||||
|
||||
fn on_getdata(&self, message: types::GetData) {
|
||||
self.local_node.on_peer_getdata(self.peer_index, message);
|
||||
fn on_getdata(&self, message: types::GetData, id: u32) {
|
||||
self.local_node.on_peer_getdata(self.peer_index, message, id);
|
||||
}
|
||||
|
||||
fn on_getblocks(&self, message: types::GetBlocks) {
|
||||
self.local_node.on_peer_getblocks(self.peer_index, message);
|
||||
fn on_getblocks(&self, message: types::GetBlocks, id: u32) {
|
||||
self.local_node.on_peer_getblocks(self.peer_index, message, id);
|
||||
}
|
||||
|
||||
fn on_getheaders(&self, message: types::GetHeaders) {
|
||||
self.local_node.on_peer_getheaders(self.peer_index, message);
|
||||
fn on_getheaders(&self, message: types::GetHeaders, id: u32) {
|
||||
self.local_node.on_peer_getheaders(self.peer_index, message, id);
|
||||
}
|
||||
|
||||
fn on_transaction(&self, message: types::Tx) {
|
||||
|
@ -53,8 +53,8 @@ impl InboundSyncConnection for InboundConnection {
|
|||
self.local_node.on_peer_headers(self.peer_index, message);
|
||||
}
|
||||
|
||||
fn on_mempool(&self, message: types::MemPool) {
|
||||
self.local_node.on_peer_mempool(self.peer_index, message);
|
||||
fn on_mempool(&self, message: types::MemPool, id: u32) {
|
||||
self.local_node.on_peer_mempool(self.peer_index, message, id);
|
||||
}
|
||||
|
||||
fn on_filterload(&self, message: types::FilterLoad) {
|
||||
|
|
|
@ -93,23 +93,24 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
|
|||
// TODO: process unknown transactions, etc...
|
||||
}
|
||||
|
||||
pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData) {
|
||||
pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData, id: u32) {
|
||||
trace!(target: "sync", "Got `getdata` message from peer#{}", peer_index);
|
||||
|
||||
self.server.serve_getdata(peer_index, message);
|
||||
self.server.serve_getdata(peer_index, message, id);
|
||||
}
|
||||
|
||||
pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
|
||||
pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32) {
|
||||
trace!(target: "sync", "Got `getblocks` message from peer#{}", peer_index);
|
||||
|
||||
self.server.serve_getblocks(peer_index, message);
|
||||
self.server.serve_getblocks(peer_index, message, id);
|
||||
}
|
||||
|
||||
pub fn on_peer_getheaders(&self, peer_index: usize, message: types::GetHeaders) {
|
||||
pub fn on_peer_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32) {
|
||||
trace!(target: "sync", "Got `getheaders` message from peer#{}", peer_index);
|
||||
|
||||
// do not serve getheaders requests until we are synchronized
|
||||
if self.client.lock().state().is_synchronizing() {
|
||||
self.executor.lock().execute(SynchronizationTask::Ignore(peer_index, id));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -124,7 +125,7 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
|
|||
need_wait
|
||||
};
|
||||
|
||||
self.server.serve_getheaders(peer_index, message);
|
||||
self.server.serve_getheaders(peer_index, message, id);
|
||||
if need_wait {
|
||||
self.server.wait_peer_requests_completed(peer_index);
|
||||
}
|
||||
|
@ -149,10 +150,10 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
|
|||
}
|
||||
}
|
||||
|
||||
pub fn on_peer_mempool(&self, peer_index: usize, _message: types::MemPool) {
|
||||
pub fn on_peer_mempool(&self, peer_index: usize, _message: types::MemPool, id: u32) {
|
||||
trace!(target: "sync", "Got `mempool` message from peer#{}", peer_index);
|
||||
|
||||
self.server.serve_mempool(peer_index);
|
||||
self.server.serve_mempool(peer_index, id);
|
||||
}
|
||||
|
||||
pub fn on_peer_filterload(&self, peer_index: usize, _message: types::FilterLoad) {
|
||||
|
@ -238,14 +239,14 @@ mod tests {
|
|||
}
|
||||
|
||||
impl OutboundSyncConnection for DummyOutboundSyncConnection {
|
||||
fn send_inventory(&self, _message: &types::Inv) {}
|
||||
fn send_inventory(&self, _message: &types::Inv, _id: u32, _is_final: bool) {}
|
||||
fn send_getdata(&self, _message: &types::GetData) {}
|
||||
fn send_getblocks(&self, _message: &types::GetBlocks) {}
|
||||
fn send_getheaders(&self, _message: &types::GetHeaders) {}
|
||||
fn send_transaction(&self, _message: &types::Tx) {}
|
||||
fn send_block(&self, _message: &types::Block) {}
|
||||
fn send_headers(&self, _message: &types::Headers) {}
|
||||
fn send_mempool(&self, _message: &types::MemPool) {}
|
||||
fn send_block(&self, _message: &types::Block, _id: u32, _is_final: bool) {}
|
||||
fn send_headers(&self, _message: &types::Headers, _id: u32, _is_final: bool) {}
|
||||
fn send_mempool(&self, _message: &types::MemPool, _id: u32, _is_final: bool) {}
|
||||
fn send_filterload(&self, _message: &types::FilterLoad) {}
|
||||
fn send_filteradd(&self, _message: &types::FilterAdd) {}
|
||||
fn send_filterclear(&self, _message: &types::FilterClear) {}
|
||||
|
@ -256,7 +257,8 @@ mod tests {
|
|||
fn send_compact_block(&self, _message: &types::CompactBlock) {}
|
||||
fn send_get_block_txn(&self, _message: &types::GetBlockTxn) {}
|
||||
fn send_block_txn(&self, _message: &types::BlockTxn) {}
|
||||
fn send_notfound(&self, _message: &types::NotFound) {}
|
||||
fn send_notfound(&self, _message: &types::NotFound, _id: u32, _is_final: bool) {}
|
||||
fn ignored(&self, _id: u32) {}
|
||||
}
|
||||
|
||||
fn create_local_node() -> (Core, Handle, Arc<Mutex<DummyTaskExecutor>>, Arc<DummyServer>, LocalNode<DummyTaskExecutor, DummyServer, SynchronizationClient<DummyTaskExecutor>>) {
|
||||
|
@ -294,9 +296,10 @@ mod tests {
|
|||
hash: genesis_block_hash.clone(),
|
||||
}
|
||||
];
|
||||
let dummy_id = 0;
|
||||
local_node.on_peer_getdata(peer_index, types::GetData {
|
||||
inventory: inventory.clone()
|
||||
});
|
||||
}, dummy_id);
|
||||
// => `getdata` is served
|
||||
let tasks = server.take_tasks();
|
||||
assert_eq!(tasks, vec![(peer_index, ServerTask::ServeGetData(inventory))]);
|
||||
|
|
|
@ -7,6 +7,7 @@ use message::types;
|
|||
use primitives::hash::H256;
|
||||
use p2p::OutboundSyncConnectionRef;
|
||||
use synchronization_chain::ChainRef;
|
||||
use synchronization_server::ServerTaskIndex;
|
||||
use local_node::PeersConnections;
|
||||
|
||||
pub type LocalSynchronizationTaskExecutorRef = Arc<Mutex<LocalSynchronizationTaskExecutor>>;
|
||||
|
@ -24,13 +25,15 @@ pub enum Task {
|
|||
/// Request blocks headers using full getheaders.block_locator_hashes.
|
||||
RequestBlocksHeaders(usize),
|
||||
/// Send block.
|
||||
SendBlock(usize, Block),
|
||||
SendBlock(usize, Block, ServerTaskIndex),
|
||||
/// Send notfound
|
||||
SendNotFound(usize, Vec<InventoryVector>),
|
||||
SendNotFound(usize, Vec<InventoryVector>, ServerTaskIndex),
|
||||
/// Send inventory
|
||||
SendInventory(usize, Vec<InventoryVector>),
|
||||
SendInventory(usize, Vec<InventoryVector>, ServerTaskIndex),
|
||||
/// Send headers
|
||||
SendHeaders(usize, Vec<BlockHeader>),
|
||||
SendHeaders(usize, Vec<BlockHeader>, ServerTaskIndex),
|
||||
/// Notify io about ignored request
|
||||
Ignore(usize, u32),
|
||||
}
|
||||
|
||||
/// Synchronization tasks executor
|
||||
|
@ -75,7 +78,6 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
|
|||
};
|
||||
|
||||
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
||||
let connection = &mut *connection;
|
||||
trace!(target: "sync", "Querying {} unknown blocks from peer#{}", getdata.inventory.len(), peer_index);
|
||||
connection.send_getdata(&getdata);
|
||||
}
|
||||
|
@ -89,53 +91,54 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
|
|||
};
|
||||
|
||||
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
||||
let connection = &mut *connection;
|
||||
trace!(target: "sync", "Request blocks hashes from peer#{} using getheaders", peer_index);
|
||||
connection.send_getheaders(&getheaders);
|
||||
}
|
||||
},
|
||||
Task::SendBlock(peer_index, block) => {
|
||||
Task::SendBlock(peer_index, block, id) => {
|
||||
let block_message = types::Block {
|
||||
block: block,
|
||||
};
|
||||
|
||||
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
||||
let connection = &mut *connection;
|
||||
trace!(target: "sync", "Sending block {:?} to peer#{}", block_message.block.hash(), peer_index);
|
||||
connection.send_block(&block_message);
|
||||
connection.send_block(&block_message, id.raw(), id.is_final());
|
||||
}
|
||||
},
|
||||
Task::SendNotFound(peer_index, unknown_inventory) => {
|
||||
Task::SendNotFound(peer_index, unknown_inventory, id) => {
|
||||
let notfound = types::NotFound {
|
||||
inventory: unknown_inventory,
|
||||
};
|
||||
|
||||
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
||||
let connection = &mut *connection;
|
||||
trace!(target: "sync", "Sending notfound to peer#{} with {} items", peer_index, notfound.inventory.len());
|
||||
connection.send_notfound(¬found);
|
||||
connection.send_notfound(¬found, id.raw(), id.is_final());
|
||||
}
|
||||
},
|
||||
Task::SendInventory(peer_index, inventory) => {
|
||||
Task::SendInventory(peer_index, inventory, id) => {
|
||||
let inventory = types::Inv {
|
||||
inventory: inventory,
|
||||
};
|
||||
|
||||
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
||||
let connection = &mut *connection;
|
||||
trace!(target: "sync", "Sending inventory to peer#{} with {} items", peer_index, inventory.inventory.len());
|
||||
connection.send_inventory(&inventory);
|
||||
connection.send_inventory(&inventory, id.raw(), id.is_final());
|
||||
}
|
||||
},
|
||||
Task::SendHeaders(peer_index, headers) => {
|
||||
Task::SendHeaders(peer_index, headers, id) => {
|
||||
let headers = types::Headers {
|
||||
headers: headers,
|
||||
};
|
||||
|
||||
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
||||
let connection = &mut *connection;
|
||||
trace!(target: "sync", "Sending headers to peer#{} with {} items", peer_index, headers.headers.len());
|
||||
connection.send_headers(&headers);
|
||||
connection.send_headers(&headers, id.raw(), id.is_final());
|
||||
}
|
||||
},
|
||||
Task::Ignore(peer_index, id) => {
|
||||
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
||||
trace!(target: "sync", "Ignoring request from peer#{} with id {}", peer_index, id);
|
||||
connection.ignored(id);
|
||||
}
|
||||
},
|
||||
}
|
||||
|
@ -194,4 +197,4 @@ pub mod tests {
|
|||
self.waiter.notify_one();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,10 +14,10 @@ use message::types;
|
|||
|
||||
/// Synchronization requests server trait
|
||||
pub trait Server : Send + 'static {
|
||||
fn serve_getdata(&self, peer_index: usize, message: types::GetData);
|
||||
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks);
|
||||
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders);
|
||||
fn serve_mempool(&self, peer_index: usize);
|
||||
fn serve_getdata(&self, peer_index: usize, message: types::GetData, id: u32);
|
||||
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32);
|
||||
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32);
|
||||
fn serve_mempool(&self, peer_index: usize, id: u32);
|
||||
fn wait_peer_requests_completed(&self, peer_index: usize);
|
||||
}
|
||||
|
||||
|
@ -42,10 +42,59 @@ struct ServerQueue {
|
|||
is_stopping: AtomicBool,
|
||||
queue_ready: Arc<Condvar>,
|
||||
peers_queue: VecDeque<usize>,
|
||||
tasks_queue: HashMap<usize, VecDeque<ServerTask>>,
|
||||
tasks_queue: HashMap<usize, VecDeque<IndexedServerTask>>,
|
||||
peer_waiters: HashMap<usize, Arc<PeerRequestsWaiter>>,
|
||||
}
|
||||
|
||||
/// `ServerTask` index.
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum ServerTaskIndex {
|
||||
/// `Partial` is used when server needs to send more than one response for request.
|
||||
Partial(u32),
|
||||
/// `Final` task task can be preceded by many `Partial` tasks with the same id.
|
||||
Final(u32),
|
||||
}
|
||||
|
||||
impl ServerTaskIndex {
|
||||
pub fn raw(&self) -> u32 {
|
||||
match *self {
|
||||
ServerTaskIndex::Partial(id) => id,
|
||||
ServerTaskIndex::Final(id) => id,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_final(&self) -> bool {
|
||||
match *self {
|
||||
ServerTaskIndex::Partial(_) => false,
|
||||
ServerTaskIndex::Final(_) => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Server tests together with unique id assigned to it
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct IndexedServerTask {
|
||||
/// Task itself.
|
||||
task: ServerTask,
|
||||
/// Task id.
|
||||
id: ServerTaskIndex,
|
||||
}
|
||||
|
||||
impl IndexedServerTask {
|
||||
fn new(task: ServerTask, id: ServerTaskIndex) -> Self {
|
||||
IndexedServerTask {
|
||||
task: task,
|
||||
id: id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl IndexedServerTask {
|
||||
fn ignore(id: u32) -> Self {
|
||||
IndexedServerTask::new(ServerTask::Ignore, ServerTaskIndex::Final(id))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum ServerTask {
|
||||
ServeGetData(Vec<InventoryVector>),
|
||||
|
@ -54,6 +103,7 @@ pub enum ServerTask {
|
|||
ServeMempool,
|
||||
ReturnNotFound(Vec<InventoryVector>),
|
||||
ReturnBlock(H256),
|
||||
Ignore,
|
||||
}
|
||||
|
||||
impl SynchronizationServer {
|
||||
|
@ -97,11 +147,18 @@ impl SynchronizationServer {
|
|||
})
|
||||
};
|
||||
|
||||
match server_task {
|
||||
let (peer_index, indexed_task) = match server_task {
|
||||
Some((peer_index, indexed_task)) => (peer_index, indexed_task),
|
||||
// no tasks after wake-up => stopping or pausing
|
||||
_ => continue,
|
||||
};
|
||||
|
||||
match indexed_task.task {
|
||||
// `getdata` => `notfound` + `block` + ...
|
||||
Some((peer_index, ServerTask::ServeGetData(inventory))) => {
|
||||
ServerTask::ServeGetData(inventory) => {
|
||||
let mut unknown_items: Vec<InventoryVector> = Vec::new();
|
||||
let mut new_tasks: Vec<ServerTask> = Vec::new();
|
||||
let mut new_tasks: Vec<IndexedServerTask> = Vec::new();
|
||||
let task_id = indexed_task.id.raw();
|
||||
{
|
||||
let chain = chain.read();
|
||||
let storage = chain.storage();
|
||||
|
@ -109,7 +166,10 @@ impl SynchronizationServer {
|
|||
match item.inv_type {
|
||||
InventoryType::MessageBlock => {
|
||||
match storage.block_number(&item.hash) {
|
||||
Some(_) => new_tasks.push(ServerTask::ReturnBlock(item.hash.clone())),
|
||||
Some(_) => {
|
||||
let task = IndexedServerTask::new(ServerTask::ReturnBlock(item.hash.clone()), ServerTaskIndex::Partial(task_id));
|
||||
new_tasks.push(task);
|
||||
},
|
||||
None => unknown_items.push(item),
|
||||
}
|
||||
},
|
||||
|
@ -120,18 +180,25 @@ impl SynchronizationServer {
|
|||
// respond with `notfound` message for unknown data
|
||||
if !unknown_items.is_empty() {
|
||||
trace!(target: "sync", "Going to respond with notfound with {} items to peer#{}", unknown_items.len(), peer_index);
|
||||
new_tasks.push(ServerTask::ReturnNotFound(unknown_items));
|
||||
let task = IndexedServerTask::new(ServerTask::ReturnNotFound(unknown_items), ServerTaskIndex::Partial(task_id));
|
||||
new_tasks.push(task);
|
||||
}
|
||||
// schedule data responses
|
||||
if !new_tasks.is_empty() {
|
||||
trace!(target: "sync", "Going to respond with data with {} items to peer#{}", new_tasks.len(), peer_index);
|
||||
// mark last task as the final one
|
||||
if let Some(task) = new_tasks.last_mut() {
|
||||
task.id = ServerTaskIndex::Final(task_id);
|
||||
}
|
||||
queue.lock().add_tasks(peer_index, new_tasks);
|
||||
} else {
|
||||
executor.lock().execute(Task::Ignore(peer_index, task_id));
|
||||
}
|
||||
// inform that we have processed task for peer
|
||||
queue.lock().task_processed(peer_index);
|
||||
},
|
||||
// `getblocks` => `inventory`
|
||||
Some((peer_index, ServerTask::ServeGetBlocks(best_block, hash_stop))) => {
|
||||
ServerTask::ServeGetBlocks(best_block, hash_stop) => {
|
||||
let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_block, &hash_stop, 500);
|
||||
if !blocks_hashes.is_empty() {
|
||||
trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", blocks_hashes.len(), peer_index);
|
||||
|
@ -139,25 +206,29 @@ impl SynchronizationServer {
|
|||
inv_type: InventoryType::MessageBlock,
|
||||
hash: hash,
|
||||
}).collect();
|
||||
executor.lock().execute(Task::SendInventory(peer_index, inventory));
|
||||
executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id));
|
||||
} else {
|
||||
executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw()));
|
||||
}
|
||||
// inform that we have processed task for peer
|
||||
queue.lock().task_processed(peer_index);
|
||||
},
|
||||
// `getheaders` => `headers`
|
||||
Some((peer_index, ServerTask::ServeGetHeaders(best_block, hash_stop))) => {
|
||||
ServerTask::ServeGetHeaders(best_block, hash_stop) => {
|
||||
// What if we have no common blocks with peer at all? Maybe drop connection or penalize peer?
|
||||
// https://github.com/ethcore/parity-bitcoin/pull/91#discussion_r86734568
|
||||
let blocks_headers = SynchronizationServer::blocks_headers_after(&chain, &best_block, &hash_stop, 2000);
|
||||
if !blocks_headers.is_empty() {
|
||||
trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_headers.len(), peer_index);
|
||||
executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers));
|
||||
executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers, indexed_task.id));
|
||||
} else {
|
||||
executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw()));
|
||||
}
|
||||
// inform that we have processed task for peer
|
||||
queue.lock().task_processed(peer_index);
|
||||
},
|
||||
// `mempool` => `inventory`
|
||||
Some((peer_index, ServerTask::ServeMempool)) => {
|
||||
ServerTask::ServeMempool => {
|
||||
let inventory: Vec<_> = chain.read()
|
||||
.memory_pool()
|
||||
.get_transactions_ids()
|
||||
|
@ -169,27 +240,32 @@ impl SynchronizationServer {
|
|||
.collect();
|
||||
if !inventory.is_empty() {
|
||||
trace!(target: "sync", "Going to respond with {} memory-pool transactions ids to peer#{}", inventory.len(), peer_index);
|
||||
executor.lock().execute(Task::SendInventory(peer_index, inventory));
|
||||
executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id));
|
||||
} else {
|
||||
executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw()));
|
||||
}
|
||||
// inform that we have processed task for peer
|
||||
queue.lock().task_processed(peer_index);
|
||||
},
|
||||
// `notfound`
|
||||
Some((peer_index, ServerTask::ReturnNotFound(inventory))) => {
|
||||
executor.lock().execute(Task::SendNotFound(peer_index, inventory));
|
||||
ServerTask::ReturnNotFound(inventory) => {
|
||||
executor.lock().execute(Task::SendNotFound(peer_index, inventory, indexed_task.id));
|
||||
// inform that we have processed task for peer
|
||||
queue.lock().task_processed(peer_index);
|
||||
},
|
||||
// `block`
|
||||
Some((peer_index, ServerTask::ReturnBlock(block_hash))) => {
|
||||
ServerTask::ReturnBlock(block_hash) => {
|
||||
let block = chain.read().storage().block(db::BlockRef::Hash(block_hash))
|
||||
.expect("we have checked that block exists in ServeGetData; db is append-only; qed");
|
||||
executor.lock().execute(Task::SendBlock(peer_index, block));
|
||||
executor.lock().execute(Task::SendBlock(peer_index, block, indexed_task.id));
|
||||
// inform that we have processed task for peer
|
||||
queue.lock().task_processed(peer_index);
|
||||
},
|
||||
// no tasks after wake-up => stopping or pausing
|
||||
None => (),
|
||||
// ignore
|
||||
ServerTask::Ignore => {
|
||||
executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw()));
|
||||
queue.lock().task_processed(peer_index);
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -273,38 +349,38 @@ impl Drop for SynchronizationServer {
|
|||
}
|
||||
|
||||
impl Server for SynchronizationServer {
|
||||
fn serve_getdata(&self, peer_index: usize, message: types::GetData) {
|
||||
self.queue.lock().add_task(peer_index, ServerTask::ServeGetData(message.inventory));
|
||||
fn serve_getdata(&self, peer_index: usize, message: types::GetData, id: u32) {
|
||||
let task = IndexedServerTask::new(ServerTask::ServeGetData(message.inventory), ServerTaskIndex::Final(id));
|
||||
self.queue.lock().add_task(peer_index, task);
|
||||
}
|
||||
|
||||
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
|
||||
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32) {
|
||||
if let Some(best_common_block) = self.locate_known_block_hash(message.block_locator_hashes) {
|
||||
trace!(
|
||||
target: "sync",
|
||||
"Best common block with peer#{} is block#{}: {:?}",
|
||||
peer_index,
|
||||
best_common_block.number,
|
||||
best_common_block.hash.to_reversed_str(),
|
||||
);
|
||||
self.queue.lock().add_task(peer_index, ServerTask::ServeGetBlocks(best_common_block, message.hash_stop));
|
||||
trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
|
||||
let task = IndexedServerTask::new(ServerTask::ServeGetBlocks(best_common_block, message.hash_stop), ServerTaskIndex::Final(id));
|
||||
self.queue.lock().add_task(peer_index, task);
|
||||
}
|
||||
else {
|
||||
trace!(target: "sync", "No common blocks with peer#{}", peer_index);
|
||||
self.queue.lock().add_task(peer_index, IndexedServerTask::ignore(id));
|
||||
}
|
||||
}
|
||||
|
||||
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders) {
|
||||
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32) {
|
||||
if let Some(best_common_block) = self.locate_known_block_header(message.block_locator_hashes) {
|
||||
trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
|
||||
self.queue.lock().add_task(peer_index, ServerTask::ServeGetHeaders(best_common_block, message.hash_stop));
|
||||
trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash.to_reversed_str());
|
||||
let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(best_common_block, message.hash_stop), ServerTaskIndex::Final(id));
|
||||
self.queue.lock().add_task(peer_index, task);
|
||||
}
|
||||
else {
|
||||
trace!(target: "sync", "No common blocks headers with peer#{}", peer_index);
|
||||
self.queue.lock().add_task(peer_index, IndexedServerTask::ignore(id));
|
||||
}
|
||||
}
|
||||
|
||||
fn serve_mempool(&self, peer_index: usize) {
|
||||
self.queue.lock().add_task(peer_index, ServerTask::ServeMempool);
|
||||
fn serve_mempool(&self, peer_index: usize, id: u32) {
|
||||
let task = IndexedServerTask::new(ServerTask::ServeMempool, ServerTaskIndex::Final(id));
|
||||
self.queue.lock().add_task(peer_index, task);
|
||||
}
|
||||
|
||||
fn wait_peer_requests_completed(&self, peer_index: usize) {
|
||||
|
@ -328,7 +404,7 @@ impl ServerQueue {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn next_task(&mut self) -> Option<(usize, ServerTask)> {
|
||||
pub fn next_task(&mut self) -> Option<(usize, IndexedServerTask)> {
|
||||
self.peers_queue.pop_front()
|
||||
.map(|peer| {
|
||||
let (peer_task, no_tasks_left) = {
|
||||
|
@ -359,7 +435,7 @@ impl ServerQueue {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn add_task(&mut self, peer_index: usize, task: ServerTask) {
|
||||
pub fn add_task(&mut self, peer_index: usize, task: IndexedServerTask) {
|
||||
match self.tasks_queue.entry(peer_index) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
let add_to_peers_queue = entry.get().is_empty();
|
||||
|
@ -378,7 +454,7 @@ impl ServerQueue {
|
|||
self.queue_ready.notify_one();
|
||||
}
|
||||
|
||||
pub fn add_tasks(&mut self, peer_index: usize, tasks: Vec<ServerTask>) {
|
||||
pub fn add_tasks(&mut self, peer_index: usize, tasks: Vec<IndexedServerTask>) {
|
||||
match self.tasks_queue.entry(peer_index) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
let add_to_peers_queue = entry.get().is_empty();
|
||||
|
@ -448,7 +524,7 @@ pub mod tests {
|
|||
use synchronization_executor::Task;
|
||||
use synchronization_executor::tests::DummyTaskExecutor;
|
||||
use synchronization_chain::Chain;
|
||||
use super::{Server, ServerTask, SynchronizationServer};
|
||||
use super::{Server, ServerTask, SynchronizationServer, ServerTaskIndex};
|
||||
|
||||
pub struct DummyServer {
|
||||
tasks: Mutex<Vec<(usize, ServerTask)>>,
|
||||
|
@ -467,25 +543,25 @@ pub mod tests {
|
|||
}
|
||||
|
||||
impl Server for DummyServer {
|
||||
fn serve_getdata(&self, peer_index: usize, message: types::GetData) {
|
||||
fn serve_getdata(&self, peer_index: usize, message: types::GetData, _id: u32) {
|
||||
self.tasks.lock().push((peer_index, ServerTask::ServeGetData(message.inventory)));
|
||||
}
|
||||
|
||||
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
|
||||
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, _id: u32) {
|
||||
self.tasks.lock().push((peer_index, ServerTask::ServeGetBlocks(db::BestBlock {
|
||||
number: 0,
|
||||
hash: message.block_locator_hashes[0].clone(),
|
||||
}, message.hash_stop)));
|
||||
}
|
||||
|
||||
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders) {
|
||||
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, _id: u32) {
|
||||
self.tasks.lock().push((peer_index, ServerTask::ServeGetHeaders(db::BestBlock {
|
||||
number: 0,
|
||||
hash: message.block_locator_hashes[0].clone(),
|
||||
}, message.hash_stop)));
|
||||
}
|
||||
|
||||
fn serve_mempool(&self, peer_index: usize) {
|
||||
fn serve_mempool(&self, peer_index: usize, _id: u32) {
|
||||
self.tasks.lock().push((peer_index, ServerTask::ServeMempool));
|
||||
}
|
||||
|
||||
|
@ -510,12 +586,13 @@ pub mod tests {
|
|||
hash: H256::default(),
|
||||
}
|
||||
];
|
||||
let dummy_id = 0;
|
||||
server.serve_getdata(0, types::GetData {
|
||||
inventory: inventory.clone(),
|
||||
});
|
||||
}, dummy_id);
|
||||
// => respond with notfound
|
||||
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
||||
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory)]);
|
||||
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::Final(dummy_id))]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -528,12 +605,13 @@ pub mod tests {
|
|||
hash: test_data::genesis().hash(),
|
||||
}
|
||||
];
|
||||
let dummy_id = 0;
|
||||
server.serve_getdata(0, types::GetData {
|
||||
inventory: inventory.clone(),
|
||||
});
|
||||
}, dummy_id);
|
||||
// => respond with block
|
||||
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
||||
assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis())]);
|
||||
assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis(), ServerTaskIndex::Final(dummy_id))]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -541,14 +619,15 @@ pub mod tests {
|
|||
let (_, executor, server) = create_synchronization_server();
|
||||
// when asking for blocks hashes
|
||||
let genesis_block_hash = test_data::genesis().hash();
|
||||
let dummy_id = 5;
|
||||
server.serve_getblocks(0, types::GetBlocks {
|
||||
version: 0,
|
||||
block_locator_hashes: vec![genesis_block_hash.clone()],
|
||||
hash_stop: H256::default(),
|
||||
});
|
||||
}, dummy_id);
|
||||
// => no response
|
||||
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
|
||||
assert_eq!(tasks, vec![]);
|
||||
assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -556,18 +635,19 @@ pub mod tests {
|
|||
let (chain, executor, server) = create_synchronization_server();
|
||||
chain.write().insert_best_block(test_data::block_h1().hash(), test_data::block_h1()).expect("Db write error");
|
||||
// when asking for blocks hashes
|
||||
let dummy_id = 0;
|
||||
server.serve_getblocks(0, types::GetBlocks {
|
||||
version: 0,
|
||||
block_locator_hashes: vec![test_data::genesis().hash()],
|
||||
hash_stop: H256::default(),
|
||||
});
|
||||
}, dummy_id);
|
||||
// => responds with inventory
|
||||
let inventory = vec![InventoryVector {
|
||||
inv_type: InventoryType::MessageBlock,
|
||||
hash: test_data::block_h1().hash(),
|
||||
}];
|
||||
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
||||
assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]);
|
||||
assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::Final(dummy_id))]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -575,14 +655,15 @@ pub mod tests {
|
|||
let (_, executor, server) = create_synchronization_server();
|
||||
// when asking for blocks hashes
|
||||
let genesis_block_hash = test_data::genesis().hash();
|
||||
let dummy_id = 6;
|
||||
server.serve_getheaders(0, types::GetHeaders {
|
||||
version: 0,
|
||||
block_locator_hashes: vec![genesis_block_hash.clone()],
|
||||
hash_stop: H256::default(),
|
||||
});
|
||||
}, dummy_id);
|
||||
// => no response
|
||||
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
|
||||
assert_eq!(tasks, vec![]);
|
||||
assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -590,27 +671,29 @@ pub mod tests {
|
|||
let (chain, executor, server) = create_synchronization_server();
|
||||
chain.write().insert_best_block(test_data::block_h1().hash(), test_data::block_h1()).expect("Db write error");
|
||||
// when asking for blocks hashes
|
||||
let dummy_id = 0;
|
||||
server.serve_getheaders(0, types::GetHeaders {
|
||||
version: 0,
|
||||
block_locator_hashes: vec![test_data::genesis().hash()],
|
||||
hash_stop: H256::default(),
|
||||
});
|
||||
}, dummy_id);
|
||||
// => responds with headers
|
||||
let headers = vec![
|
||||
test_data::block_h1().block_header,
|
||||
];
|
||||
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
||||
assert_eq!(tasks, vec![Task::SendHeaders(0, headers)]);
|
||||
assert_eq!(tasks, vec![Task::SendHeaders(0, headers, ServerTaskIndex::Final(dummy_id))]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn server_mempool_do_not_responds_inventory_when_empty_memory_pool() {
|
||||
let (_, executor, server) = create_synchronization_server();
|
||||
// when asking for memory pool transactions ids
|
||||
server.serve_mempool(0);
|
||||
let dummy_id = 9;
|
||||
server.serve_mempool(0, dummy_id);
|
||||
// => no response
|
||||
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
|
||||
assert_eq!(tasks, vec![]);
|
||||
assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -621,13 +704,14 @@ pub mod tests {
|
|||
let transaction_hash = transaction.hash();
|
||||
chain.write().memory_pool_mut().insert_verified(transaction);
|
||||
// when asking for memory pool transactions ids
|
||||
server.serve_mempool(0);
|
||||
let dummy_id = 0;
|
||||
server.serve_mempool(0, dummy_id);
|
||||
// => respond with inventory
|
||||
let inventory = vec![InventoryVector {
|
||||
inv_type: InventoryType::MessageTx,
|
||||
hash: transaction_hash,
|
||||
}];
|
||||
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
||||
assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]);
|
||||
assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::Final(dummy_id))]);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue