addr protocol && start of sync protocol (#25)

* p2p <-> sync interfaces proposal

* updated with example

* send errors will be handled in p2p module => no need to return to the sync

* poc of outbound sync connection

* simplified send_to_peer

* context has cpu pool and enent loop handles

* on_message won't return ProtocolAction anymore

* session initialized sync protocol, remove retain cycles on P2P::drop

* removed ProtocolAction

* uncommented ping protocol

* node_table sorts nodes also by recently used time

* send getaddr on connect

* fixed node_table insert, added insert_many

* addr protocol

* added TODO: remove
This commit is contained in:
Marek Kotewicz 2016-10-24 09:42:11 +02:00 committed by Svyatoslav Nikolsky
parent 563f77c500
commit 02816aaa4e
14 changed files with 605 additions and 183 deletions

1
Cargo.lock generated
View File

@ -358,6 +358,7 @@ name = "p2p"
version = "0.1.0"
dependencies = [
"bitcrypto 0.1.0",
"chain 0.1.0",
"futures 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-cpupool 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -11,6 +11,12 @@ impl From<net::IpAddr> for IpAddress {
}
}
impl From<IpAddress> for net::IpAddr {
fn from(ip: IpAddress) -> Self {
ip.0
}
}
impl From<&'static str> for IpAddress {
fn from(s: &'static str) -> Self {
s.parse().unwrap()

View File

@ -11,6 +11,14 @@ pub enum Addr {
V31402(V31402),
}
impl Addr {
pub fn new(addresses: Vec<AddressEntry>) -> Self {
Addr::V31402(V31402 {
addresses: addresses,
})
}
}
impl Payload for Addr {
fn version() -> u32 {
0

View File

@ -6,6 +6,6 @@ extern crate test_data;
pub mod memory_pool;
pub use primitives::{hash};
pub use primitives::hash;
pub use self::memory_pool::{MemoryPool, Information as MemoryPoolInformation, OrderingStrategy as MemoryPoolOrderingStrategy};
pub use self::memory_pool::{MemoryPool, Information as MemoryPoolInformation, OrderingStrategy as MemoryPoolOrderingStrategy};

View File

@ -15,3 +15,4 @@ log = "0.3"
primitives = { path = "../primitives" }
bitcrypto = { path = "../crypto" }
message = { path = "../message" }
chain = { path = "../chain" }

View File

@ -8,6 +8,7 @@ extern crate parking_lot;
#[macro_use]
extern crate log;
extern crate chain;
extern crate bitcrypto as crypto;
extern crate message;
extern crate primitives;

View File

@ -1,8 +1,10 @@
use std::mem;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::collections::HashMap;
use parking_lot::RwLock;
use net::{Connection, Channel};
use p2p::Context;
use session::Session;
use PeerId;
@ -19,6 +21,11 @@ impl Connections {
Connections::default()
}
/// Returns channel with given peer id.
pub fn channel(&self, id: PeerId) -> Option<Arc<Channel>> {
self.channels.read().get(&id).cloned()
}
/// Returns safe (nonblocking) copy of channels.
pub fn channels(&self) -> HashMap<PeerId, Arc<Channel>> {
self.channels.read().clone()
@ -31,8 +38,9 @@ impl Connections {
/// Stores new channel.
/// Returnes a shared pointer to it.
pub fn store(&self, connection: Connection, session: Session) -> Arc<Channel> {
pub fn store(&self, context: Arc<Context>, connection: Connection) -> Arc<Channel> {
let id = self.peer_counter.fetch_add(1, Ordering::AcqRel);
let session = Session::new(context, id);
let channel = Arc::new(Channel::new(connection, id, session));
self.channels.write().insert(id, channel.clone());
channel
@ -42,4 +50,12 @@ impl Connections {
pub fn remove(&self, id: PeerId) -> Option<Arc<Channel>> {
self.channels.write().remove(&id)
}
/// Drop all channels.
pub fn remove_all(&self) -> Vec<Arc<Channel>> {
mem::replace(&mut *self.channels.write(), HashMap::new())
.into_iter()
.map(|(_, value)| value)
.collect()
}
}

View File

@ -5,29 +5,57 @@ use futures::{Future, finished, failed, BoxFuture};
use futures::stream::Stream;
use futures_cpupool::CpuPool;
use tokio_core::io::IoFuture;
use tokio_core::reactor::Handle;
use tokio_core::reactor::{Handle, Remote};
use bytes::Bytes;
use message::{Payload, Command};
use session::Session;
use protocol::Direction;
use io::{ReadAnyMessage, SharedTcpStream};
use net::{connect, listen, Connections, Channel, Config as NetConfig};
use util::NodeTable;
use {Config, PeerInfo};
use util::{NodeTable, Node};
use {Config, PeerInfo, PeerId};
pub type BoxedMessageFuture = BoxFuture<<ReadAnyMessage<SharedTcpStream> as Future>::Item, <ReadAnyMessage<SharedTcpStream> as Future>::Error>;
pub type BoxedEmptyFuture = BoxFuture<(), ()>;
/// Network context.
#[derive(Default)]
pub struct Context {
/// Connections.
connections: Connections,
/// Node Table.
node_table: RwLock<NodeTable>,
/// Thread pool handle.
pool: CpuPool,
/// Remote event loop handle.
remote: Remote,
}
impl Context {
pub fn new(pool_handle: CpuPool, remote: Remote) -> Self {
Context {
connections: Default::default(),
node_table: Default::default(),
pool: pool_handle,
remote: remote,
}
}
pub fn spawn<F>(&self, f: F) where F: Future + Send + 'static, F::Item: Send + 'static, F::Error: Send + 'static {
let pool_work = self.pool.spawn(f);
self.remote.spawn(move |handle| {
handle.spawn(pool_work.then(|_| finished(())));
Ok(())
})
}
pub fn node_table_entries(&self) -> Vec<Node> {
self.node_table.read().recently_active_nodes()
}
pub fn update_node_table(&self, nodes: Vec<Node>) {
trace!("Updating node table with {} entries", nodes.len());
self.node_table.write().insert_many(nodes);
}
pub fn connect(context: Arc<Context>, socket: net::SocketAddr, handle: &Handle, config: &NetConfig) -> BoxedEmptyFuture {
trace!("Trying to connect to: {}", socket);
let connection = connect(&socket, handle, config);
@ -37,13 +65,13 @@ impl Context {
// successfull hanshake
trace!("Connected to {}", connection.address);
context.node_table.write().insert(connection.address, connection.services);
let session = Session::new();
let channel = context.connections.store(connection, session);
let channel = context.connections.store(context.clone(), connection);
// initialize session and then start reading messages
channel.session().initialize(context.clone(), channel.clone(), Direction::Outbound)
.and_then(move |_| Context::on_message(context, channel))
.boxed()
match channel.session().initialize(channel.clone(), Direction::Outbound) {
Ok(_) => Context::on_message(context, channel),
Err(err) => finished(Err(err)).boxed()
}
},
Ok(Err(err)) => {
// protocol error
@ -71,14 +99,13 @@ impl Context {
// successfull hanshake
trace!("Accepted connection from {}", connection.address);
context.node_table.write().insert(connection.address, connection.services);
let session = Session::new();
let channel = context.connections.store(connection, session);
let channel = context.connections.store(context.clone(), connection);
// initialize session and then start reading messages
let cloned_context = context.clone();
channel.session().initialize(context.clone(), channel.clone(), Direction::Inbound)
.and_then(|_| Context::on_message(cloned_context, channel))
.boxed()
match channel.session().initialize(channel.clone(), Direction::Inbound) {
Ok(_) => Context::on_message(context.clone(), channel),
Err(err) => finished(Err(err)).boxed()
}
},
Ok(Err(err)) => {
// protocol error
@ -104,9 +131,14 @@ impl Context {
// successful read
trace!("Received {} message from {}", command, channel.peer_info().address);
// handle message and read the next one
channel.session().on_message(context.clone(), channel.clone(), command, payload)
.and_then(move |_| Context::on_message(context, channel))
.boxed()
match channel.session().on_message(channel.clone(), command, payload) {
Ok(_) => Context::on_message(context, channel),
Err(err) => {
// protocol error
context.close_connection(channel.peer_info());
finished(Err(err)).boxed()
}
}
},
Ok(Err(err)) => {
// protocol error
@ -140,6 +172,17 @@ impl Context {
}).boxed()
}
pub fn send_to_peer<T>(context: Arc<Context>, peer: PeerId, payload: &T) -> IoFuture<()> where T: Payload {
match context.connections.channel(peer) {
Some(channel) => Context::send(context, channel, payload),
None => {
// peer no longer exists.
// TODO: should we return error here?
finished(()).boxed()
}
}
}
pub fn send<T>(_context: Arc<Context>, channel: Arc<Channel>, payload: &T) -> IoFuture<()> where T: Payload {
trace!("Sending {} message to {}", T::command(), channel.peer_info().address);
channel.write_message(payload).then(move |result| {
@ -178,6 +221,19 @@ pub struct P2P {
context: Arc<Context>,
}
impl Drop for P2P {
fn drop(&mut self) {
// there are retain cycles
// context->connections->channel->session->protocol->context
// context->connections->channel->on_message closure->context
// first let's get rid of session retain cycle
for channel in &self.context.connections.remove_all() {
// done, now let's finish on_message
channel.shutdown();
}
}
}
impl P2P {
pub fn new(config: Config, handle: Handle) -> Self {
let pool = CpuPool::new(config.threads);
@ -186,7 +242,7 @@ impl P2P {
event_loop_handle: handle.clone(),
pool: pool.clone(),
config: config,
context: Arc::default(),
context: Arc::new(Context::new(pool, handle.remote().clone())),
}
}
@ -212,40 +268,4 @@ impl P2P {
self.event_loop_handle.spawn(pool_work);
Ok(())
}
/*
pub fn broadcast<T>(&self, payload: T) where T: Payload {
let channels = self.connections.channels();
for (_id, channel) in channels.into_iter() {
self.send_to_channel(&payload, &channel);
}
}
pub fn send<T>(&self, payload: T, peer: PeerId) where T: Payload {
let channels = self.connections.channels();
if let Some(channel) = channels.get(&peer) {
self.send_to_channel(&payload, channel);
}
}
fn send_to_channel<T>(&self, payload: &T, channel: &Arc<Channel>) where T: Payload {
let connections = self.connections.clone();
let node_table = self.node_table.clone();
let peer_info = channel.peer_info();
let write = channel.write_message(payload);
let pool_work = self.pool.spawn(write).then(move |result| {
match result {
Ok(_) => {
node_table.write().note_used(&peer_info.address);
},
Err(_err) => {
node_table.write().note_failure(&peer_info.address);
connections.remove(peer_info.id);
}
}
finished(())
});
self.event_loop_handle.spawn(pool_work);
}
*/
}

59
p2p/src/protocol/addr.rs Normal file
View File

@ -0,0 +1,59 @@
use std::sync::Arc;
use bytes::Bytes;
use message::{Error, Command, deserialize_payload, Payload};
use message::types::{GetAddr, Addr};
use protocol::{Protocol, Direction};
use p2p::Context;
use PeerId;
pub struct AddrProtocol {
/// Context
context: Arc<Context>,
/// Connected peer id.
peer: PeerId,
/// True if expect addr message.
expects_addr: bool,
}
impl AddrProtocol {
pub fn new(context: Arc<Context>, peer: PeerId) -> Self {
AddrProtocol {
context: context,
peer: peer,
expects_addr: false,
}
}
}
impl Protocol for AddrProtocol {
fn initialize(&mut self, direction: Direction, _version: u32) -> Result<(), Error> {
if let Direction::Outbound = direction {
self.expects_addr = true;
let send = Context::send_to_peer(self.context.clone(), self.peer, &GetAddr);
self.context.spawn(send);
}
Ok(())
}
fn on_message(&mut self, command: &Command, payload: &Bytes, version: u32) -> Result<(), Error> {
if command == &GetAddr::command().into() {
let _: GetAddr = try!(deserialize_payload(payload, version));
let entries = self.context.node_table_entries().into_iter().map(Into::into).collect();
let addr = Addr::new(entries);
let send = Context::send_to_peer(self.context.clone(), self.peer, &addr);
self.context.spawn(send);
} else if command == &Addr::command().into() {
let addr: Addr = try!(deserialize_payload(payload, version));
match addr {
Addr::V0(_) => {
unreachable!("This version of protocol is not supported!");
},
Addr::V31402(addr) => {
let nodes = addr.addresses.into_iter().map(Into::into).collect();
self.context.update_node_table(nodes);
},
}
}
Ok(())
}
}

View File

@ -1,10 +1,14 @@
mod addr;
mod ping;
mod sync;
use bytes::Bytes;
use message::Error;
use message::common::Command;
pub use self::addr::AddrProtocol;
pub use self::ping::PingProtocol;
pub use self::sync::SyncProtocol;
#[derive(PartialEq, Clone, Copy)]
pub enum Direction {
@ -12,18 +16,17 @@ pub enum Direction {
Outbound,
}
pub enum ProtocolAction {
Reply((Command, Bytes)),
None,
Disconnect,
}
pub trait Protocol: Send {
/// Initialize the protocol.
fn initialize(&mut self, _direction: Direction, _version: u32) -> Result<ProtocolAction, Error> {
Ok(ProtocolAction::None)
fn initialize(&mut self, _direction: Direction, _version: u32) -> Result<(), Error> {
Ok(())
}
/// Handle the message.
fn on_message(&self, command: &Command, payload: &Bytes, version: u32) -> Result<ProtocolAction, Error>;
fn on_message(&mut self, command: &Command, payload: &Bytes, version: u32) -> Result<(), Error>;
/// Boxes the protocol.
fn boxed(self) -> Box<Protocol> where Self: Sized + 'static {
Box::new(self)
}
}

View File

@ -1,56 +1,60 @@
use std::sync::Arc;
use bytes::Bytes;
use message::{Error, Payload, deserialize_payload, serialize_payload};
use message::{Error, Payload, deserialize_payload};
use message::types::{Ping, Pong};
use message::common::Command;
use protocol::{Protocol, ProtocolAction, Direction};
use protocol::{Protocol, Direction};
use util::nonce::{NonceGenerator, RandomNonce};
use p2p::Context;
use PeerId;
pub struct PingProtocol<T = RandomNonce> {
/// Nonce generator
/// Context
context: Arc<Context>,
/// Connected peer id.
peer: PeerId,
/// Nonce generator.
nonce_generator: T,
/// Last nonce sent in a ping message.
last_ping_nonce: u64,
/// Last nonce sent in the ping message.
last_ping_nonce: Option<u64>,
}
impl PingProtocol {
pub fn new() -> Self {
pub fn new(context: Arc<Context>, peer: PeerId) -> Self {
PingProtocol {
context: context,
peer: peer,
nonce_generator: RandomNonce::default(),
last_ping_nonce: 0,
last_ping_nonce: None,
}
}
}
impl<T> Protocol for PingProtocol<T> where T: NonceGenerator + Send {
fn initialize(&mut self, direction: Direction, version: u32) -> Result<ProtocolAction, Error> {
match direction {
Direction::Outbound => Ok(ProtocolAction::None),
Direction::Inbound => {
let nonce = self.nonce_generator.get();
self.last_ping_nonce = nonce;
let ping = Ping::new(nonce);
let serialized = try!(serialize_payload(&ping, version));
Ok(ProtocolAction::Reply((Ping::command().into(), serialized)))
},
}
fn initialize(&mut self, _direction: Direction, _version: u32) -> Result<(), Error> {
// bitcoind always sends ping, let's do the same
let nonce = self.nonce_generator.get();
self.last_ping_nonce = Some(nonce);
let ping = Ping::new(nonce);
let send = Context::send_to_peer(self.context.clone(), self.peer, &ping);
self.context.spawn(send);
Ok(())
}
fn on_message(&self, command: &Command, payload: &Bytes, version: u32) -> Result<ProtocolAction, Error> {
fn on_message(&mut self, command: &Command, payload: &Bytes, version: u32) -> Result<(), Error> {
if command == &Ping::command().into() {
let ping: Ping = try!(deserialize_payload(payload, version));
let pong = Pong::new(ping.nonce);
let serialized = try!(serialize_payload(&pong, version));
Ok(ProtocolAction::Reply((Pong::command().into(), serialized)))
let send = Context::send_to_peer(self.context.clone(), self.peer, &pong);
self.context.spawn(send);
} else if command == &Pong::command().into() {
let pong: Pong = try!(deserialize_payload(payload, version));
if pong.nonce != self.last_ping_nonce {
Err(Error::InvalidCommand)
} else {
Ok(ProtocolAction::None)
if Some(pong.nonce) != self.last_ping_nonce.take() {
return Err(Error::InvalidCommand)
}
} else {
Ok(ProtocolAction::None)
}
Ok(())
}
}

192
p2p/src/protocol/sync.rs Normal file
View File

@ -0,0 +1,192 @@
//TODO: remove!
#![allow(dead_code)]
#![allow(unused_variables)]
use std::sync::Arc;
use parking_lot::Mutex;
use chain::{Block, Transaction};
use bytes::Bytes;
use message::{Command, Error, Payload, types};
use protocol::Protocol;
use p2p::Context;
use PeerId;
pub type InboundSyncConnectionRef = Arc<Mutex<Box<InboundSyncConnection>>>;
pub type OutboundSyncConnectionRef = Arc<Mutex<Box<OutboundSyncConnection>>>;
// TODO: use this to respond to construct Version message (start_height field)
// TODO: use this to create new inbound sessions
pub trait LocalSyncNode : Send + Sync {
fn start_height(&self) -> i32;
fn start_sync_session(&mut self, outbound: OutboundSyncConnectionRef) -> InboundSyncConnectionRef;
}
pub trait InboundSyncConnection : Send + Sync {
fn on_iventory(&mut self, message: &types::Inv);
fn on_getdata(&mut self, message: &types::GetData);
fn on_getblocks(&mut self, message: &types::GetBlocks);
fn on_getheaders(&mut self, message: &types::GetHeaders);
fn on_transaction(&mut self, message: &Transaction);
fn on_block(&mut self, message: &Block);
fn on_headers(&mut self, message: &types::Headers);
fn on_mempool(&mut self, message: &types::MemPool);
fn on_filterload(&mut self, message: &types::FilterLoad);
fn on_filteradd(&mut self, message: &types::FilterAdd);
fn on_filterclear(&mut self, message: &types::FilterClear);
fn on_merkleblock(&mut self, message: &types::MerkleBlock);
fn on_sendheaders(&mut self, message: &types::SendHeaders);
fn on_feefilter(&mut self, message: &types::FeeFilter);
fn on_send_compact(&mut self, message: &types::SendCompact);
fn on_compact_block(&mut self, message: &types::CompactBlock);
fn on_get_block_txn(&mut self, message: &types::GetBlockTxn);
fn on_block_txn(&mut self, message: &types::BlockTxn);
}
pub trait OutboundSyncConnection : Send + Sync {
fn send_iventory(&mut self, message: &types::Inv);
fn send_getdata(&mut self, message: &types::GetData);
fn send_getblocks(&mut self, message: &types::GetBlocks);
fn send_getheaders(&mut self, message: &types::GetHeaders);
fn send_transaction(&mut self, message: &Transaction);
fn send_block(&mut self, message: &Block);
fn send_headers(&mut self, message: &types::Headers);
fn send_mempool(&mut self, message: &types::MemPool);
fn send_filterload(&mut self, message: &types::FilterLoad);
fn send_filteradd(&mut self, message: &types::FilterAdd);
fn send_filterclear(&mut self, message: &types::FilterClear);
fn send_merkleblock(&mut self, message: &types::MerkleBlock);
fn send_sendheaders(&mut self, message: &types::SendHeaders);
fn send_feefilter(&mut self, message: &types::FeeFilter);
fn send_send_compact(&mut self, message: &types::SendCompact);
fn send_compact_block(&mut self, message: &types::CompactBlock);
fn send_get_block_txn(&mut self, message: &types::GetBlockTxn);
fn send_block_txn(&mut self, message: &types::BlockTxn);
}
struct OutboundSync {
context: Arc<Context>,
peer: PeerId,
}
impl OutboundSync {
pub fn new(context: Arc<Context>, peer: PeerId) -> OutboundSync {
OutboundSync {
context: context,
peer: peer,
}
}
pub fn send_message<T>(&self, message: &T) where T: Payload {
let send = Context::send_to_peer(self.context.clone(), self.peer, message);
self.context.spawn(send);
}
pub fn boxed(self) -> Box<OutboundSyncConnection> {
Box::new(self)
}
}
impl OutboundSyncConnection for OutboundSync {
fn send_iventory(&mut self, message: &types::Inv) {
self.send_message(message);
}
fn send_getdata(&mut self, message: &types::GetData) {
self.send_message(message);
}
fn send_getblocks(&mut self, message: &types::GetBlocks) {
self.send_message(message);
}
fn send_getheaders(&mut self, message: &types::GetHeaders) {
self.send_message(message);
}
fn send_transaction(&mut self, message: &Transaction) {
unimplemented!();
}
fn send_block(&mut self, message: &Block) {
unimplemented!();
}
fn send_headers(&mut self, message: &types::Headers) {
self.send_message(message);
}
fn send_mempool(&mut self, message: &types::MemPool) {
self.send_message(message);
}
fn send_filterload(&mut self, message: &types::FilterLoad) {
self.send_message(message);
}
fn send_filteradd(&mut self, message: &types::FilterAdd) {
self.send_message(message);
}
fn send_filterclear(&mut self, message: &types::FilterClear) {
self.send_message(message);
}
fn send_merkleblock(&mut self, message: &types::MerkleBlock) {
self.send_message(message);
}
fn send_sendheaders(&mut self, message: &types::SendHeaders) {
self.send_message(message);
}
fn send_feefilter(&mut self, message: &types::FeeFilter) {
self.send_message(message);
}
fn send_send_compact(&mut self, message: &types::SendCompact) {
self.send_message(message);
}
fn send_compact_block(&mut self, message: &types::CompactBlock) {
self.send_message(message);
}
fn send_get_block_txn(&mut self, message: &types::GetBlockTxn) {
self.send_message(message);
}
fn send_block_txn(&mut self, message: &types::BlockTxn) {
self.send_message(message);
}
}
pub struct SyncProtocol {
//inbound_connection: InboundSyncConnectionRef,
outbound_connection: OutboundSyncConnectionRef,
}
impl SyncProtocol {
pub fn new(context: Arc<Context>, peer: PeerId) -> Self {
let outbound_connection = Arc::new(Mutex::new(OutboundSync::new(context, peer).boxed()));
// let inbound_connection = local_sync_node.start_sync_session(outbound_connection); // TODO: create inbound connection using LocalSyncNode::start_sync_session
SyncProtocol {
// inbound_connection: inbound_connection,
outbound_connection: outbound_connection,
}
}
}
impl Protocol for SyncProtocol {
fn on_message(&mut self, command: &Command, payload: &Bytes, version: u32) -> Result<(), Error> {
// TODO: pass message to inbound_connection + convert response to ProtocolAction/Error
/*
if command == &Inv::command().into() {
let inventory: Inv = try!(deserialize_payload(payload, version));
self.inbound_connection.on_iventory(&inventory);
} else {
Ok(ProtocolAction::None)
}
*/
Ok(())
}
}

View File

@ -1,26 +1,27 @@
use std::sync::Arc;
use parking_lot::Mutex;
use futures::{collect, finished, failed, Future};
use tokio_core::io::IoFuture;
use bytes::Bytes;
use message::Command;
use message::{Command, Error};
use p2p::Context;
use net::Channel;
use protocol::{Protocol, ProtocolAction, PingProtocol, Direction};
use protocol::{Protocol, PingProtocol, SyncProtocol, AddrProtocol, Direction};
use PeerId;
pub struct Session {
protocols: Mutex<Vec<Box<Protocol>>>,
}
impl Session {
pub fn new() -> Self {
let ping = PingProtocol::new();
Session::new_with_protocols(vec![Box::new(ping)])
pub fn new(context: Arc<Context>, peer: PeerId) -> Self {
let ping = PingProtocol::new(context.clone(), peer).boxed();
let addr = AddrProtocol::new(context.clone(), peer).boxed();
let sync = SyncProtocol::new(context, peer).boxed();
Session::new_with_protocols(vec![ping, addr, sync])
}
pub fn new_seednode() -> Self {
let ping = PingProtocol::new();
Session::new_with_protocols(vec![Box::new(ping)])
pub fn new_seednode(context: Arc<Context>, peer: PeerId) -> Self {
let ping = PingProtocol::new(context.clone(), peer).boxed();
Session::new_with_protocols(vec![ping])
}
pub fn new_with_protocols(protocols: Vec<Box<Protocol>>) -> Self {
@ -29,61 +30,24 @@ impl Session {
}
}
pub fn initialize(&self, context: Arc<Context>, channel: Arc<Channel>, direction: Direction) -> IoFuture<()> {
let futures = self.protocols.lock()
pub fn initialize(&self, channel: Arc<Channel>, direction: Direction) -> Result<(), Error> {
self.protocols.lock()
.iter_mut()
.map(|protocol| {
// TODO: use real direction and version
match protocol.initialize(direction, channel.version()) {
Ok(ProtocolAction::None) => {
finished(()).boxed()
},
Ok(ProtocolAction::Disconnect) => {
// no other protocols can use the channel after that
context.close_connection(channel.peer_info());
finished(()).boxed()
},
Ok(ProtocolAction::Reply((command, payload))) => {
Context::send_raw(context.clone(), channel.clone(), command, &payload)
},
Err(err) => {
// protocol error
unimplemented!();
}
}
protocol.initialize(direction, channel.version())
})
.collect::<Vec<_>>();
collect(futures)
.and_then(|_| finished(()))
.boxed()
.collect::<Result<Vec<_>, Error>>()
.map(|_| ())
}
pub fn on_message(&self, context: Arc<Context>, channel: Arc<Channel>, command: Command, payload: Bytes) -> IoFuture<()> {
let futures = self.protocols.lock()
.iter()
pub fn on_message(&self, channel: Arc<Channel>, command: Command, payload: Bytes) -> Result<(), Error> {
self.protocols.lock()
.iter_mut()
.map(|protocol| {
// TODO: use real version
match protocol.on_message(&command, &payload, channel.version()) {
Ok(ProtocolAction::None) => {
finished(()).boxed()
},
Ok(ProtocolAction::Disconnect) => {
context.close_connection(channel.peer_info());
finished(()).boxed()
},
Ok(ProtocolAction::Reply((command, payload))) => {
Context::send_raw(context.clone(), channel.clone(), command, &payload)
},
Err(err) => {
// protocol error
unimplemented!();
},
}
protocol.on_message(&command, &payload, channel.version())
})
.collect::<Vec<_>>();
collect(futures)
.and_then(|_| finished(()))
.boxed()
.collect::<Result<Vec<_>, Error>>()
.map(|_| ())
}
}

View File

@ -1,7 +1,9 @@
use std::collections::{HashMap, BTreeSet};
use std::collections::hash_map::Entry;
use std::net::SocketAddr;
use std::cmp::{PartialOrd, Ord, Ordering};
use message::common::Services;
use message::common::{Services, NetAddress};
use message::types::addr::AddressEntry;
use util::time::{Time, RealTime};
#[derive(PartialEq, Eq, Clone)]
@ -16,26 +18,80 @@ pub struct Node {
failures: u32,
}
impl PartialOrd for Node {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
if self.failures == other.failures {
self.time.partial_cmp(&other.time)
} else {
other.failures.partial_cmp(&self.failures)
impl From<AddressEntry> for Node {
fn from(entry: AddressEntry) -> Self {
Node {
addr: SocketAddr::new(entry.address.address.into(), entry.address.port.into()),
time: entry.timestamp as i64,
services: entry.address.services,
failures: 0,
}
}
}
impl Ord for Node {
fn cmp(&self, other: &Self) -> Ordering {
if self.failures == other.failures {
self.time.cmp(&other.time)
} else {
other.failures.cmp(&self.failures)
impl From<Node> for AddressEntry {
fn from(node: Node) -> Self {
AddressEntry {
timestamp: node.time as u32,
address: NetAddress {
services: node.services,
address: node.addr.ip().into(),
port: node.addr.port().into(),
}
}
}
}
#[derive(PartialEq, Eq, Clone)]
struct NodeByScore(Node);
impl From<Node> for NodeByScore {
fn from(node: Node) -> Self {
NodeByScore(node)
}
}
impl PartialOrd for NodeByScore {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
if self.0.failures == other.0.failures {
other.0.time.partial_cmp(&self.0.time)
} else {
self.0.failures.partial_cmp(&other.0.failures)
}
}
}
impl Ord for NodeByScore {
fn cmp(&self, other: &Self) -> Ordering {
if self.0.failures == other.0.failures {
other.0.time.cmp(&self.0.time)
} else {
self.0.failures.cmp(&other.0.failures)
}
}
}
#[derive(PartialEq, Eq, Clone)]
struct NodeByTime(Node);
impl From<Node> for NodeByTime {
fn from(node: Node) -> Self {
NodeByTime(node)
}
}
impl PartialOrd for NodeByTime {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
other.0.time.partial_cmp(&self.0.time)
}
}
impl Ord for NodeByTime {
fn cmp(&self, other: &Self) -> Ordering {
other.0.time.cmp(&self.0.time)
}
}
#[derive(Default)]
pub struct NodeTable<T = RealTime> where T: Time {
/// Time source.
@ -43,50 +99,117 @@ pub struct NodeTable<T = RealTime> where T: Time {
/// Nodes by socket address.
by_addr: HashMap<SocketAddr, Node>,
/// Nodes sorted by score.
by_score: BTreeSet<Node>,
by_score: BTreeSet<NodeByScore>,
/// Nodes sorted by time.
by_time: BTreeSet<NodeByTime>,
}
impl<T> NodeTable<T> where T: Time {
/// Inserts new address and services pair into NodeTable.
pub fn insert(&mut self, addr: SocketAddr, services: Services) {
let failures = self.by_addr.get(&addr).map_or(0, |ref node| node.failures);
let now = self.time.get().sec;
match self.by_addr.entry(addr) {
Entry::Occupied(mut entry) => {
let old = entry.get_mut();
assert!(self.by_score.remove(&old.clone().into()));
assert!(self.by_time.remove(&old.clone().into()));
old.time = now;
old.services = services;
self.by_score.insert(old.clone().into());
self.by_time.insert(old.clone().into());
},
Entry::Vacant(entry) => {
let node = Node {
addr: addr,
time: now,
services: services,
failures: 0,
};
self.by_score.insert(node.clone().into());
self.by_time.insert(node.clone().into());
entry.insert(node);
}
}
}
let node = Node {
addr: addr,
time: self.time.get().sec,
services: services,
failures: failures,
};
/// Inserts many new addresses into node table.
/// Used in `addr` request handler.
/// Discards all nodes with timestamp newer than current time.
pub fn insert_many(&mut self, nodes: Vec<Node>) {
// discard all nodes with timestamp newer than current time.
let now = self.time.get().sec;
let iter = nodes.into_iter()
.filter(|node| node.time <= now);
self.by_addr.insert(addr, node.clone());
self.by_score.insert(node);
// iterate over the rest
for node in iter {
match self.by_addr.entry(node.addr) {
Entry::Occupied(mut entry) => {
let old = entry.get_mut();
// we've already seen this node
if old.time < node.time {
assert!(self.by_score.remove(&old.clone().into()));
assert!(self.by_time.remove(&old.clone().into()));
// update node info
old.time = node.time;
old.services = node.services;
self.by_score.insert(old.clone().into());
self.by_time.insert(old.clone().into());
}
},
Entry::Vacant(entry)=> {
// it's first time we see this node
self.by_score.insert(node.clone().into());
self.by_time.insert(node.clone().into());
entry.insert(node);
}
}
}
}
/// Returnes most reliable nodes with desired services.
pub fn nodes_with_services(&self, services: &Services, limit: usize) -> Vec<Node> {
self.by_score.iter()
.rev()
.filter(|s| s.services.includes(services))
.map(Clone::clone)
.filter(|node| node.0.services.includes(services))
.map(|node| node.0.clone())
.take(limit)
.collect()
}
/// Returns most recently active nodes.
///
/// The documenation says:
/// "Non-advertised nodes should be forgotten after typically 3 hours"
/// but bitcoin client still advertises them even after a month.
/// Let's do the same.
///
/// https://en.bitcoin.it/wiki/Protocol_documentation#addr
pub fn recently_active_nodes(&self) -> Vec<Node> {
self.by_time.iter()
.map(|node| node.0.clone())
.take(1000)
.collect()
}
/// Marks address as recently used.
pub fn note_used(&mut self, addr: &SocketAddr) {
if let Some(ref mut node) = self.by_addr.get_mut(addr) {
assert!(self.by_score.remove(node));
assert!(self.by_score.remove(&node.clone().into()));
assert!(self.by_time.remove(&node.clone().into()));
node.time = self.time.get().sec;
self.by_score.insert(node.clone());
self.by_score.insert(node.clone().into());
self.by_time.insert(node.clone().into());
}
}
/// Notes failure.
pub fn note_failure(&mut self, addr: &SocketAddr) {
if let Some(ref mut node) = self.by_addr.get_mut(addr) {
assert!(self.by_score.remove(node));
assert!(self.by_score.remove(&node.clone().into()));
assert!(self.by_time.remove(&node.clone().into()));
node.failures += 1;
self.by_score.insert(node.clone());
self.by_score.insert(node.clone().into());
self.by_time.insert(node.clone().into());
}
}
}
@ -157,5 +280,29 @@ mod tests {
assert_eq!(nodes[4].addr, s3);
assert_eq!(nodes[4].time, 3);
assert_eq!(nodes[4].failures, 1);
let nodes = table.recently_active_nodes();
assert_eq!(nodes.len(), 5);
assert_eq!(nodes[0].addr, s1);
assert_eq!(nodes[0].time, 7);
assert_eq!(nodes[0].failures, 0);
assert_eq!(nodes[1].addr, s4);
assert_eq!(nodes[1].time, 6);
assert_eq!(nodes[1].failures, 0);
assert_eq!(nodes[2].addr, s2);
assert_eq!(nodes[2].time, 5);
assert_eq!(nodes[2].failures, 1);
assert_eq!(nodes[3].addr, s3);
assert_eq!(nodes[3].time, 3);
assert_eq!(nodes[3].failures, 1);
assert_eq!(nodes[4].addr, s0);
assert_eq!(nodes[4].time, 0);
assert_eq!(nodes[4].failures, 0);
}
}