diff --git a/p2p/src/net/channel.rs b/p2p/src/net/channel.rs index 72b98fed..fb255f55 100644 --- a/p2p/src/net/channel.rs +++ b/p2p/src/net/channel.rs @@ -1,7 +1,6 @@ use tokio_core::io::{write_all, WriteAll}; -use message::{Payload, Message}; use session::Session; -use io::{SharedTcpStream, WriteMessage, write_message, read_any_message, ReadAnyMessage}; +use io::{SharedTcpStream, read_any_message, ReadAnyMessage}; use util::PeerInfo; pub struct Channel { diff --git a/p2p/src/net/peer_context.rs b/p2p/src/net/peer_context.rs index 97668ea5..02ce45fd 100644 --- a/p2p/src/net/peer_context.rs +++ b/p2p/src/net/peer_context.rs @@ -1,6 +1,6 @@ use std::sync::Arc; use parking_lot::Mutex; -use message::{Payload, serialize_payload, Message}; +use message::{Payload, Message}; use p2p::Context; use util::{PeerInfo, ConfigurableSynchronizer, ResponseQueue, Synchronizer, Responses}; @@ -59,7 +59,9 @@ impl PeerContext { } pub fn declare_response(&self) -> u32 { - self.synchronizer.lock().declare_response() + let d = self.synchronizer.lock().declare_response(); + trace!("declared response: {}", d); + d } pub fn send_response_inline(&self, payload: &T) where T: Payload { @@ -69,6 +71,7 @@ impl PeerContext { /// Responses are sent in order defined by synchronizer. pub fn send_response(&self, payload: &T, id: u32, is_final: bool) where T: Payload { + trace!("response ready: {}, id: {}, final: {}", T::command(), id, is_final); let mut sync = self.synchronizer.lock(); let mut queue = self.response_queue.lock(); if is_final { diff --git a/p2p/src/protocol/addr.rs b/p2p/src/protocol/addr.rs index 1bd12f81..05f8ee79 100644 --- a/p2p/src/protocol/addr.rs +++ b/p2p/src/protocol/addr.rs @@ -5,7 +5,7 @@ use message::{Error, Command, deserialize_payload, Payload}; use message::types::{GetAddr, Addr}; use protocol::Protocol; use net::PeerContext; -use util::{Direction, PeerInfo}; +use util::Direction; pub struct AddrProtocol { /// Context diff --git a/p2p/src/protocol/ping.rs b/p2p/src/protocol/ping.rs index 78b377b1..76022bdb 100644 --- a/p2p/src/protocol/ping.rs +++ b/p2p/src/protocol/ping.rs @@ -5,20 +5,8 @@ use message::types::{Ping, Pong}; use message::common::Command; use protocol::Protocol; use net::PeerContext; -use util::{PeerId, PeerInfo}; use util::nonce::{NonceGenerator, RandomNonce}; -//pub trait PingContext: Send + Sync { - //fn send_to_peer(context: Arc, peer: PeerId, payload: &T) where Self: Sized, T: Payload; -//} - -//impl PingContext for Context { - //fn send_to_peer(context: Arc, peer: PeerId, payload: &T) where T: Payload { - //let send = Context::send_to_peer(context.clone(), peer, payload); - //context.spawn(send); - //} -//} - pub struct PingProtocol { /// Context context: Arc, @@ -39,7 +27,6 @@ impl PingProtocol { } impl Protocol for PingProtocol { -//impl Protocol for PingProtocol where T: NonceGenerator + Send, C: PingContext { fn initialize(&mut self) { // bitcoind always sends ping, let's do the same let nonce = self.nonce_generator.get(); @@ -63,111 +50,3 @@ impl Protocol for PingProtocol { Ok(()) } } - -//#[cfg(test)] -//mod tests { - //use std::sync::Arc; - //use parking_lot::Mutex; - //use bytes::Bytes; - //use message::{Payload, serialize_payload, Magic}; - //use message::types::{Ping, Pong}; - //use util::{PeerId, PeerInfo, Direction}; - //use util::nonce::StaticNonce; - //use protocol::Protocol; - //use super::{PingProtocol, PingContext}; - - //#[derive(Default)] - //struct TestPingContext { - //version: u32, - //messages: Mutex>, - //} - - //impl PingContext for TestPingContext { - //fn send_to_peer(context: Arc, peer: PeerId, payload: &T) where T: Payload { - //let value = (peer, serialize_payload(payload, context.version).unwrap()); - //context.messages.lock().push(value); - //} - //} - - //#[test] - //fn test_ping_init() { - //let ping_context = Arc::new(TestPingContext::default()); - //let peer = 99; - //let nonce = 1000; - //let expected_message = serialize_payload(&Ping::new(nonce), 0).unwrap(); - //let mut ping_protocol = PingProtocol { - //context: ping_context.clone(), - //info: PeerInfo { - //id: peer, - //address: "0.0.0.0:8080".parse().unwrap(), - //direction: Direction::Inbound, - //version: 0, - //magic: Magic::Testnet, - //}, - //nonce_generator: StaticNonce::new(nonce), - //last_ping_nonce: None, - //}; - - //ping_protocol.initialize(); - //let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone(); - //assert_eq!(messages.len(), 1); - //assert_eq!(messages[0].0, peer); - //assert_eq!(messages[0].1, expected_message); - //assert_eq!(ping_protocol.last_ping_nonce, Some(nonce)); - //} - - //#[test] - //fn test_ping_on_message_ping() { - //let ping_context = Arc::new(TestPingContext::default()); - //let peer = 99; - //let nonce = 1000; - //let command = "ping".into(); - //let message = serialize_payload(&Ping::new(nonce), 0).unwrap(); - //let expected_message = serialize_payload(&Pong::new(nonce), 0).unwrap(); - //let mut ping_protocol = PingProtocol { - //context: ping_context.clone(), - //info: PeerInfo { - //id: peer, - //address: "0.0.0.0:8080".parse().unwrap(), - //direction: Direction::Inbound, - //version: 0, - //magic: Magic::Testnet, - //}, - //nonce_generator: StaticNonce::new(nonce), - //last_ping_nonce: None, - //}; - - //assert!(ping_protocol.on_message(&command, &message).is_ok()); - //let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone(); - //assert_eq!(messages.len(), 1); - //assert_eq!(messages[0].0, peer); - //assert_eq!(messages[0].1, expected_message); - //assert_eq!(ping_protocol.last_ping_nonce, None); - //} - - //#[test] - //fn test_ping_on_message_pong() { - //let ping_context = Arc::new(TestPingContext::default()); - //let peer = 99; - //let nonce = 1000; - //let command = "pong".into(); - //let message = serialize_payload(&Pong::new(nonce), 0).unwrap(); - //let mut ping_protocol = PingProtocol { - //context: ping_context.clone(), - //info: PeerInfo { - //id: peer, - //address: "0.0.0.0:8080".parse().unwrap(), - //direction: Direction::Inbound, - //version: 0, - //magic: Magic::Testnet, - //}, - //nonce_generator: StaticNonce::new(nonce), - //last_ping_nonce: Some(nonce), - //}; - - //assert!(ping_protocol.on_message(&command, &message).is_ok()); - //let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone(); - //assert_eq!(messages.len(), 0); - //assert_eq!(ping_protocol.last_ping_nonce, None); - //} -//} diff --git a/p2p/src/protocol/sync.rs b/p2p/src/protocol/sync.rs index ba49957d..362a29bf 100644 --- a/p2p/src/protocol/sync.rs +++ b/p2p/src/protocol/sync.rs @@ -2,11 +2,8 @@ use std::sync::Arc; use bytes::Bytes; use message::{Command, Error, Payload, types, deserialize_payload}; use protocol::Protocol; -use util::{PeerInfo, PeerId}; use net::PeerContext; -const UNIMPLEMENTED_TASK_ID: u32 = 0; - pub type InboundSyncConnectionRef = Box; pub type OutboundSyncConnectionRef = Box; pub type LocalSyncNodeRef = Box; @@ -159,7 +156,7 @@ impl OutboundSyncConnection for OutboundSync { pub struct SyncProtocol { inbound_connection: InboundSyncConnectionRef, - info: PeerInfo, + context: Arc, } impl SyncProtocol { @@ -168,33 +165,33 @@ impl SyncProtocol { let inbound_connection = context.global().create_sync_session(0, outbound_connection); SyncProtocol { inbound_connection: inbound_connection, - info: context.info().clone(), + context: context, } } } impl Protocol for SyncProtocol { fn initialize(&mut self) { - self.inbound_connection.start_sync_session(self.info.version); + self.inbound_connection.start_sync_session(self.context.info().version); } fn on_message(&mut self, command: &Command, payload: &Bytes) -> Result<(), Error> { - let version = self.info.version; + let version = self.context.info().version; if command == &types::Inv::command() { let message: types::Inv = try!(deserialize_payload(payload, version)); self.inbound_connection.on_inventory(message); } else if command == &types::GetData::command() { let message: types::GetData = try!(deserialize_payload(payload, version)); - self.inbound_connection.on_getdata(message, UNIMPLEMENTED_TASK_ID); + self.inbound_connection.on_getdata(message, self.context.declare_response()); } else if command == &types::GetBlocks::command() { let message: types::GetBlocks = try!(deserialize_payload(payload, version)); - self.inbound_connection.on_getblocks(message, UNIMPLEMENTED_TASK_ID); + self.inbound_connection.on_getblocks(message, self.context.declare_response()); } else if command == &types::GetHeaders::command() { let message: types::GetHeaders = try!(deserialize_payload(payload, version)); - self.inbound_connection.on_getheaders(message, UNIMPLEMENTED_TASK_ID); + self.inbound_connection.on_getheaders(message, self.context.declare_response()); } else if command == &types::Tx::command() { let message: types::Tx = try!(deserialize_payload(payload, version)); @@ -206,7 +203,7 @@ impl Protocol for SyncProtocol { } else if command == &types::MemPool::command() { let message: types::MemPool = try!(deserialize_payload(payload, version)); - self.inbound_connection.on_mempool(message, UNIMPLEMENTED_TASK_ID); + self.inbound_connection.on_mempool(message, self.context.declare_response()); } else if command == &types::Headers::command() { let message: types::Headers = try!(deserialize_payload(payload, version)); diff --git a/p2p/src/session.rs b/p2p/src/session.rs index bceef284..81ac5117 100644 --- a/p2p/src/session.rs +++ b/p2p/src/session.rs @@ -5,7 +5,7 @@ use message::{Command, Error}; use p2p::Context; use net::PeerContext; use protocol::{Protocol, PingProtocol, SyncProtocol, AddrProtocol, SeednodeProtocol}; -use util::{ConfigurableSynchronizer, PeerInfo}; +use util::PeerInfo; pub trait SessionFactory { fn new_session(context: Arc, info: PeerInfo, synchronous: bool) -> Session; @@ -36,17 +36,15 @@ impl SessionFactory for NormalSessionFactory { } pub struct Session { - peer_context: Arc, + _peer_context: Arc, protocols: Mutex>>, - synchronizer: Mutex, } impl Session { pub fn new(peer_context: Arc, protocols: Vec>) -> Self { Session { - peer_context: peer_context, + _peer_context: peer_context, protocols: Mutex::new(protocols), - synchronizer: Mutex::new(ConfigurableSynchronizer::new(false)), } }