polishing p2p synchronous syncing

This commit is contained in:
debris 2016-11-15 13:11:05 +01:00
parent abdf6d38ca
commit dc6acc6eee
6 changed files with 18 additions and 142 deletions

View File

@ -1,7 +1,6 @@
use tokio_core::io::{write_all, WriteAll};
use message::{Payload, Message};
use session::Session;
use io::{SharedTcpStream, WriteMessage, write_message, read_any_message, ReadAnyMessage};
use io::{SharedTcpStream, read_any_message, ReadAnyMessage};
use util::PeerInfo;
pub struct Channel {

View File

@ -1,6 +1,6 @@
use std::sync::Arc;
use parking_lot::Mutex;
use message::{Payload, serialize_payload, Message};
use message::{Payload, Message};
use p2p::Context;
use util::{PeerInfo, ConfigurableSynchronizer, ResponseQueue, Synchronizer, Responses};
@ -59,7 +59,9 @@ impl PeerContext {
}
pub fn declare_response(&self) -> u32 {
self.synchronizer.lock().declare_response()
let d = self.synchronizer.lock().declare_response();
trace!("declared response: {}", d);
d
}
pub fn send_response_inline<T>(&self, payload: &T) where T: Payload {
@ -69,6 +71,7 @@ impl PeerContext {
/// Responses are sent in order defined by synchronizer.
pub fn send_response<T>(&self, payload: &T, id: u32, is_final: bool) where T: Payload {
trace!("response ready: {}, id: {}, final: {}", T::command(), id, is_final);
let mut sync = self.synchronizer.lock();
let mut queue = self.response_queue.lock();
if is_final {

View File

@ -5,7 +5,7 @@ use message::{Error, Command, deserialize_payload, Payload};
use message::types::{GetAddr, Addr};
use protocol::Protocol;
use net::PeerContext;
use util::{Direction, PeerInfo};
use util::Direction;
pub struct AddrProtocol {
/// Context

View File

@ -5,20 +5,8 @@ use message::types::{Ping, Pong};
use message::common::Command;
use protocol::Protocol;
use net::PeerContext;
use util::{PeerId, PeerInfo};
use util::nonce::{NonceGenerator, RandomNonce};
//pub trait PingContext: Send + Sync {
//fn send_to_peer<T>(context: Arc<Self>, peer: PeerId, payload: &T) where Self: Sized, T: Payload;
//}
//impl PingContext for Context {
//fn send_to_peer<T>(context: Arc<Self>, peer: PeerId, payload: &T) where T: Payload {
//let send = Context::send_to_peer(context.clone(), peer, payload);
//context.spawn(send);
//}
//}
pub struct PingProtocol<T = RandomNonce, C = PeerContext> {
/// Context
context: Arc<C>,
@ -39,7 +27,6 @@ impl PingProtocol {
}
impl Protocol for PingProtocol {
//impl<T, PeerContext> Protocol for PingProtocol<T, PeerContext> where T: NonceGenerator + Send, C: PingContext {
fn initialize(&mut self) {
// bitcoind always sends ping, let's do the same
let nonce = self.nonce_generator.get();
@ -63,111 +50,3 @@ impl Protocol for PingProtocol {
Ok(())
}
}
//#[cfg(test)]
//mod tests {
//use std::sync::Arc;
//use parking_lot::Mutex;
//use bytes::Bytes;
//use message::{Payload, serialize_payload, Magic};
//use message::types::{Ping, Pong};
//use util::{PeerId, PeerInfo, Direction};
//use util::nonce::StaticNonce;
//use protocol::Protocol;
//use super::{PingProtocol, PingContext};
//#[derive(Default)]
//struct TestPingContext {
//version: u32,
//messages: Mutex<Vec<(PeerId, Bytes)>>,
//}
//impl PingContext for TestPingContext {
//fn send_to_peer<T>(context: Arc<Self>, peer: PeerId, payload: &T) where T: Payload {
//let value = (peer, serialize_payload(payload, context.version).unwrap());
//context.messages.lock().push(value);
//}
//}
//#[test]
//fn test_ping_init() {
//let ping_context = Arc::new(TestPingContext::default());
//let peer = 99;
//let nonce = 1000;
//let expected_message = serialize_payload(&Ping::new(nonce), 0).unwrap();
//let mut ping_protocol = PingProtocol {
//context: ping_context.clone(),
//info: PeerInfo {
//id: peer,
//address: "0.0.0.0:8080".parse().unwrap(),
//direction: Direction::Inbound,
//version: 0,
//magic: Magic::Testnet,
//},
//nonce_generator: StaticNonce::new(nonce),
//last_ping_nonce: None,
//};
//ping_protocol.initialize();
//let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone();
//assert_eq!(messages.len(), 1);
//assert_eq!(messages[0].0, peer);
//assert_eq!(messages[0].1, expected_message);
//assert_eq!(ping_protocol.last_ping_nonce, Some(nonce));
//}
//#[test]
//fn test_ping_on_message_ping() {
//let ping_context = Arc::new(TestPingContext::default());
//let peer = 99;
//let nonce = 1000;
//let command = "ping".into();
//let message = serialize_payload(&Ping::new(nonce), 0).unwrap();
//let expected_message = serialize_payload(&Pong::new(nonce), 0).unwrap();
//let mut ping_protocol = PingProtocol {
//context: ping_context.clone(),
//info: PeerInfo {
//id: peer,
//address: "0.0.0.0:8080".parse().unwrap(),
//direction: Direction::Inbound,
//version: 0,
//magic: Magic::Testnet,
//},
//nonce_generator: StaticNonce::new(nonce),
//last_ping_nonce: None,
//};
//assert!(ping_protocol.on_message(&command, &message).is_ok());
//let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone();
//assert_eq!(messages.len(), 1);
//assert_eq!(messages[0].0, peer);
//assert_eq!(messages[0].1, expected_message);
//assert_eq!(ping_protocol.last_ping_nonce, None);
//}
//#[test]
//fn test_ping_on_message_pong() {
//let ping_context = Arc::new(TestPingContext::default());
//let peer = 99;
//let nonce = 1000;
//let command = "pong".into();
//let message = serialize_payload(&Pong::new(nonce), 0).unwrap();
//let mut ping_protocol = PingProtocol {
//context: ping_context.clone(),
//info: PeerInfo {
//id: peer,
//address: "0.0.0.0:8080".parse().unwrap(),
//direction: Direction::Inbound,
//version: 0,
//magic: Magic::Testnet,
//},
//nonce_generator: StaticNonce::new(nonce),
//last_ping_nonce: Some(nonce),
//};
//assert!(ping_protocol.on_message(&command, &message).is_ok());
//let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone();
//assert_eq!(messages.len(), 0);
//assert_eq!(ping_protocol.last_ping_nonce, None);
//}
//}

View File

@ -2,11 +2,8 @@ use std::sync::Arc;
use bytes::Bytes;
use message::{Command, Error, Payload, types, deserialize_payload};
use protocol::Protocol;
use util::{PeerInfo, PeerId};
use net::PeerContext;
const UNIMPLEMENTED_TASK_ID: u32 = 0;
pub type InboundSyncConnectionRef = Box<InboundSyncConnection>;
pub type OutboundSyncConnectionRef = Box<OutboundSyncConnection>;
pub type LocalSyncNodeRef = Box<LocalSyncNode>;
@ -159,7 +156,7 @@ impl OutboundSyncConnection for OutboundSync {
pub struct SyncProtocol {
inbound_connection: InboundSyncConnectionRef,
info: PeerInfo,
context: Arc<PeerContext>,
}
impl SyncProtocol {
@ -168,33 +165,33 @@ impl SyncProtocol {
let inbound_connection = context.global().create_sync_session(0, outbound_connection);
SyncProtocol {
inbound_connection: inbound_connection,
info: context.info().clone(),
context: context,
}
}
}
impl Protocol for SyncProtocol {
fn initialize(&mut self) {
self.inbound_connection.start_sync_session(self.info.version);
self.inbound_connection.start_sync_session(self.context.info().version);
}
fn on_message(&mut self, command: &Command, payload: &Bytes) -> Result<(), Error> {
let version = self.info.version;
let version = self.context.info().version;
if command == &types::Inv::command() {
let message: types::Inv = try!(deserialize_payload(payload, version));
self.inbound_connection.on_inventory(message);
}
else if command == &types::GetData::command() {
let message: types::GetData = try!(deserialize_payload(payload, version));
self.inbound_connection.on_getdata(message, UNIMPLEMENTED_TASK_ID);
self.inbound_connection.on_getdata(message, self.context.declare_response());
}
else if command == &types::GetBlocks::command() {
let message: types::GetBlocks = try!(deserialize_payload(payload, version));
self.inbound_connection.on_getblocks(message, UNIMPLEMENTED_TASK_ID);
self.inbound_connection.on_getblocks(message, self.context.declare_response());
}
else if command == &types::GetHeaders::command() {
let message: types::GetHeaders = try!(deserialize_payload(payload, version));
self.inbound_connection.on_getheaders(message, UNIMPLEMENTED_TASK_ID);
self.inbound_connection.on_getheaders(message, self.context.declare_response());
}
else if command == &types::Tx::command() {
let message: types::Tx = try!(deserialize_payload(payload, version));
@ -206,7 +203,7 @@ impl Protocol for SyncProtocol {
}
else if command == &types::MemPool::command() {
let message: types::MemPool = try!(deserialize_payload(payload, version));
self.inbound_connection.on_mempool(message, UNIMPLEMENTED_TASK_ID);
self.inbound_connection.on_mempool(message, self.context.declare_response());
}
else if command == &types::Headers::command() {
let message: types::Headers = try!(deserialize_payload(payload, version));

View File

@ -5,7 +5,7 @@ use message::{Command, Error};
use p2p::Context;
use net::PeerContext;
use protocol::{Protocol, PingProtocol, SyncProtocol, AddrProtocol, SeednodeProtocol};
use util::{ConfigurableSynchronizer, PeerInfo};
use util::PeerInfo;
pub trait SessionFactory {
fn new_session(context: Arc<Context>, info: PeerInfo, synchronous: bool) -> Session;
@ -36,17 +36,15 @@ impl SessionFactory for NormalSessionFactory {
}
pub struct Session {
peer_context: Arc<PeerContext>,
_peer_context: Arc<PeerContext>,
protocols: Mutex<Vec<Box<Protocol>>>,
synchronizer: Mutex<ConfigurableSynchronizer>,
}
impl Session {
pub fn new(peer_context: Arc<PeerContext>, protocols: Vec<Box<Protocol>>) -> Self {
Session {
peer_context: peer_context,
_peer_context: peer_context,
protocols: Mutex::new(protocols),
synchronizer: Mutex::new(ConfigurableSynchronizer::new(false)),
}
}