connect to multiple nodes automatically

This commit is contained in:
debris 2016-11-03 00:22:00 +01:00
parent ab6e4e78fc
commit 448259328d
16 changed files with 269 additions and 62 deletions

24
Cargo.lock generated
View File

@ -145,12 +145,14 @@ dependencies = [
[[package]]
name = "domain"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
source = "git+https://github.com/debris/domain#3754429fefb19b7c1c78cf34fcea32272266a8c0"
dependencies = [
"byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.0 (git+https://github.com/debris/tokio-core)",
"void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -369,12 +371,12 @@ dependencies = [
[[package]]
name = "ns-dns-tokio"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
source = "git+https://github.com/debris/abstract-ns#d4df407f94ae725c88aed3457aa8ebb44c123c5a"
dependencies = [
"abstract-ns 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"domain 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"domain 0.1.0 (git+https://github.com/debris/domain)",
"futures 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.0 (git+https://github.com/debris/tokio-core)",
]
[[package]]
@ -405,13 +407,13 @@ dependencies = [
"futures-cpupool 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"message 0.1.0",
"ns-dns-tokio 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ns-dns-tokio 0.1.0 (git+https://github.com/debris/abstract-ns)",
"parking_lot 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"primitives 0.1.0",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
"serialization 0.1.0",
"time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.0 (git+https://github.com/debris/tokio-core)",
]
[[package]]
@ -635,7 +637,7 @@ dependencies = [
[[package]]
name = "tokio-core"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
source = "git+https://github.com/debris/tokio-core#623ce443d89cd9ffa2c1adae8d2eb75538802d01"
dependencies = [
"futures 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
@ -724,7 +726,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum clap 2.16.2 (registry+https://github.com/rust-lang/crates.io-index)" = "08aac7b078ec0a58e1d4b43cfb11d47001f8eb7c6f6f2bda4f5eed43c82491f1"
"checksum crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "0c5ea215664ca264da8a9d9c3be80d2eaf30923c259d03e870388eb927508f97"
"checksum deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1614659040e711785ed8ea24219140654da1729f3ec8a47a9719d041112fe7bf"
"checksum domain 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "725459994103308a8476a95d8115280b1359dccc06ca14291df75f37459a9e30"
"checksum domain 0.1.0 (git+https://github.com/debris/domain)" = "<none>"
"checksum elastic-array 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4bc9250a632e7c001b741eb0ec6cee93c9a5b6d5f1879696a4b94d62b012210a"
"checksum env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "15abd780e45b3ea4f76b4e9a26ff4843258dd8a3eed2775a0e7368c2e7936c2f"
"checksum eth-secp256k1 0.5.6 (git+https://github.com/ethcore/rust-secp256k1)" = "<none>"
@ -744,7 +746,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum net2 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)" = "5edf9cb6be97212423aed9413dd4729d62b370b5e1c571750e882cebbbc1e3e2"
"checksum nix 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a7bb1da2be7da3cbffda73fc681d509ffd9e665af478d2bee1907cee0bc64b2"
"checksum nodrop 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "0dbbadd3f4c98dea0bd3d9b4be4c0cdaf1ab57035cb2e41fce3983db5add7cc5"
"checksum ns-dns-tokio 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "43330aab5077c311b390b62147feb44316cb5b754b97d28c92210e6c6b7baff7"
"checksum ns-dns-tokio 0.1.0 (git+https://github.com/debris/abstract-ns)" = "<none>"
"checksum num_cpus 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8890e6084723d57d0df8d2720b0d60c6ee67d6c93e7169630e4371e88765dcad"
"checksum odds 0.2.23 (registry+https://github.com/rust-lang/crates.io-index)" = "e04630a62b3f1cc8c58b4d8f2555a40136f02b420e158242936ef286a72d33a0"
"checksum owning_ref 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "8d91377085359426407a287ab16884a0111ba473aa6844ff01d4ec20ce3d75e7"
@ -769,7 +771,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum thread-id 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a9539db560102d1cef46b8b78ce737ff0bb64e7e18d35b2a5688f7d097d0ff03"
"checksum thread_local 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "8576dbbfcaef9641452d5cf0df9b0e7eeab7694956dd33bb61515fb8f18cfdd5"
"checksum time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)" = "3c7ec6d62a20df54e07ab3b78b9a3932972f4b7981de295563686849eb3989af"
"checksum tokio-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "659cbae6c954dee37352853816c6a52180e47feb70be73bbfeec6d58c4da4a71"
"checksum tokio-core 0.1.0 (git+https://github.com/debris/tokio-core)" = "<none>"
"checksum unicode-segmentation 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b905d0fc2a1f0befd86b0e72e31d1787944efef9d38b9358a9e92a69757f7e3b"
"checksum unicode-width 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2d6722facc10989f63ee0e20a83cd4e1714a9ae11529403ac7e0afd069abc39e"
"checksum utf8-ranges 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a1ca13c08c41c9c3e04224ed9ff80461d97e121589ff27c753a16cb10830ae0f"

View File

@ -4,7 +4,7 @@ version = "0.1.0"
authors = ["debris <marek.kotewicz@gmail.com>"]
[dependencies]
tokio-core = "0.1"
tokio-core = { git = "https://github.com/debris/tokio-core" }
parking_lot = "0.3"
futures = "0.1"
futures-cpupool = "0.1"
@ -12,7 +12,7 @@ time = "0.1"
rand = "0.3"
log = "0.3"
abstract-ns = "0.2.1"
ns-dns-tokio = "0.1.0"
ns-dns-tokio = { git = "https://github.com/debris/abstract-ns", path = "ns-dns-tokio" }
primitives = { path = "../primitives"}
bitcrypto = { path = "../crypto" }

View File

@ -10,9 +10,9 @@ pub struct Config {
/// Highest supported protocol version.
pub protocol_maximum: u32,
/// Number of inbound connections.
pub inbound_connections: usize,
pub inbound_connections: u32,
/// Number of outbound connections.
pub outbound_connections: usize,
pub outbound_connections: u32,
/// Configuration for every connection.
pub connection: NetConfig,
/// Connect only ot these nodes.

View File

@ -2,6 +2,7 @@ use message::{Payload, Magic, Message};
use net::Connection;
use session::Session;
use io::{SharedTcpStream, WriteMessage, write_message, read_any_message, ReadAnyMessage};
use util::Direction;
use {PeerId, PeerInfo};
pub struct Channel {
@ -13,13 +14,14 @@ pub struct Channel {
}
impl Channel {
pub fn new(connection: Connection, peer_id: PeerId, session: Session) -> Self {
pub fn new(connection: Connection, peer_id: PeerId, session: Session, direction: Direction) -> Self {
Channel {
version: connection.version,
magic: connection.magic,
peer_info: PeerInfo {
address: connection.address,
id: peer_id,
direction: direction,
},
session: session,
stream: connection.stream,

View File

@ -0,0 +1,117 @@
use std::cmp;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Counts number of open inbound and outbound connections.
pub struct ConnectionCounter {
/// Current number of inbound connections.
current_inbound_connections: AtomicUsize,
/// Current number of outbound connections.
current_outbound_connections: AtomicUsize,
/// Maximum number of inbound connections.
max_inbound_connections: u32,
/// Maximum number of outbound connections.
max_outbound_connections: u32,
}
impl ConnectionCounter {
pub fn new(max_inbound_connections: u32, max_outbound_connections: u32) -> Self {
ConnectionCounter {
current_inbound_connections: AtomicUsize::new(0),
current_outbound_connections: AtomicUsize::new(0),
max_inbound_connections: max_inbound_connections,
max_outbound_connections: max_outbound_connections,
}
}
/// Returns maxiumum number of outbound connections.
pub fn max_outbound_connections(&self) -> u32 {
self.max_outbound_connections
}
/// Increases inbound connections counter by 1.
pub fn note_new_inbound_connection(&self) {
self.current_inbound_connections.fetch_add(1, Ordering::AcqRel);
}
/// Decreases inbound connections counter by 1.
/// If it underflows, it means, that there is a logic error.
pub fn note_close_inbound_connection(&self) {
self.current_inbound_connections.fetch_sub(1, Ordering::AcqRel);
}
/// Increases outbound connections counter by 1.
pub fn note_new_outbound_connection(&self) {
self.current_outbound_connections.fetch_add(1, Ordering::AcqRel);
}
/// Decreases outbound connections counter by 1.
/// If it underflows, it means, that there is a logic error.
pub fn note_close_outbound_connection(&self) {
self.current_outbound_connections.fetch_sub(1, Ordering::AcqRel);
}
/// Returns number of inbound connections needed to reach the maximum
pub fn inbound_connections_needed(&self) -> u32 {
let ic = self.inbound_connections();
ic.1 - cmp::min(ic.0, ic.1)
}
/// Returns number of inbound connections needed to reach the maximum
pub fn outbound_connections_needed(&self) -> u32 {
let oc = self.outbound_connections();
oc.1 - cmp::min(oc.0, oc.1)
}
/// Returns a pair of unsigned integers where first element is current number of connections and the second is max.
pub fn inbound_connections(&self) -> (u32, u32) {
let current = self.current_inbound_connections.load(Ordering::Acquire) as u32;
(current, self.max_inbound_connections)
}
/// Returns a pair of unsigned integers where first element is current number of connections and the second is max.
pub fn outbound_connections(&self) -> (u32, u32) {
let current = self.current_outbound_connections.load(Ordering::Acquire) as u32;
(current, self.max_outbound_connections)
}
}
#[cfg(test)]
mod tests {
use super::ConnectionCounter;
#[test]
fn test_inbound_connection_counter() {
let cc = ConnectionCounter::new(5, 10);
assert_eq!(cc.inbound_connections_needed(), 5);
assert_eq!(cc.inbound_connections(), (0, 5));
cc.note_new_inbound_connection();
assert_eq!(cc.inbound_connections_needed(), 4);
assert_eq!(cc.inbound_connections(), (1, 5));
cc.note_new_inbound_connection();
cc.note_new_inbound_connection();
cc.note_new_inbound_connection();
cc.note_new_inbound_connection();
assert_eq!(cc.inbound_connections_needed(), 0);
// it may exceed max
cc.note_new_inbound_connection();
assert_eq!(cc.inbound_connections_needed(), 0);
assert_eq!(cc.inbound_connections(), (6, 5));
cc.note_close_inbound_connection();
assert_eq!(cc.inbound_connections_needed(), 0);
assert_eq!(cc.inbound_connections(), (5, 5));
}
#[test]
fn test_outbound_connection_counter() {
let cc = ConnectionCounter::new(0, 4);
assert_eq!(cc.outbound_connections_needed(), 4);
assert_eq!(cc.outbound_connections(), (0, 4));
cc.note_new_outbound_connection();
cc.note_new_outbound_connection();
assert_eq!(cc.outbound_connections_needed(), 2);
assert_eq!(cc.outbound_connections(), (2, 4));
cc.note_close_outbound_connection();
assert_eq!(cc.outbound_connections_needed(), 3);
assert_eq!(cc.outbound_connections(), (1, 4));
}
}

View File

@ -1,11 +1,12 @@
use std::mem;
use std::{mem, net};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use parking_lot::RwLock;
use net::{Connection, Channel};
use p2p::Context;
use session::{SessionFactory};
use util::Direction;
use PeerId;
#[derive(Default)]
@ -23,10 +24,15 @@ impl Connections {
}
/// Returns safe (nonblocking) copy of channels.
pub fn _channels(&self) -> HashMap<PeerId, Arc<Channel>> {
pub fn channels(&self) -> HashMap<PeerId, Arc<Channel>> {
self.channels.read().clone()
}
/// Returns addresses of all active channels (nonblocking).
pub fn addresses(&self) -> HashSet<net::SocketAddr> {
self.channels().values().map(|channel| channel.peer_info().address).collect()
}
/// Returns number of connections.
pub fn _count(&self) -> usize {
self.channels.read().len()
@ -34,10 +40,10 @@ impl Connections {
/// Stores new channel.
/// Returnes a shared pointer to it.
pub fn store<T>(&self, context: Arc<Context>, connection: Connection) -> Arc<Channel> where T: SessionFactory {
pub fn store<T>(&self, context: Arc<Context>, connection: Connection, direction: Direction) -> Arc<Channel> where T: SessionFactory {
let id = self.peer_counter.fetch_add(1, Ordering::AcqRel);
let session = T::new_session(context, id);
let channel = Arc::new(Channel::new(connection, id, session));
let channel = Arc::new(Channel::new(connection, id, session, direction));
self.channels.write().insert(id, channel.clone());
channel
}

View File

@ -3,6 +3,7 @@ mod channel;
mod config;
mod connect;
mod connection;
mod connection_counter;
mod connections;
pub use self::accept_connection::{AcceptConnection, accept_connection};
@ -10,4 +11,5 @@ pub use self::channel::Channel;
pub use self::config::Config;
pub use self::connect::{Connect, connect};
pub use self::connection::Connection;
pub use self::connection_counter::ConnectionCounter;
pub use self::connections::Connections;

View File

@ -6,13 +6,13 @@ use futures::stream::Stream;
use futures_cpupool::CpuPool;
use tokio_core::io::IoFuture;
use tokio_core::net::{TcpListener, TcpStream};
use tokio_core::reactor::{Handle, Remote, Timeout};
use tokio_core::reactor::{Handle, Remote, Timeout, Interval};
use abstract_ns::Resolver;
use ns_dns_tokio::DnsResolver;
use message::{Payload, MessageResult};
use protocol::Direction;
use net::{connect, Connections, Channel, Config as NetConfig, accept_connection};
use util::{NodeTable, Node};
use message::common::Services;
use net::{connect, Connections, Channel, Config as NetConfig, accept_connection, ConnectionCounter};
use util::{NodeTable, Node, Direction};
use session::{SessionFactory, SeednodeSessionFactory, NormalSessionFactory};
use {Config, PeerId};
use protocol::{LocalSyncNodeRef, InboundSyncConnectionRef, OutboundSyncConnectionRef};
@ -24,6 +24,8 @@ pub type BoxedEmptyFuture = BoxFuture<(), ()>;
pub struct Context {
/// Connections.
connections: Connections,
/// Connection counter.
connection_counter: ConnectionCounter,
/// Node Table.
node_table: RwLock<NodeTable>,
/// Thread pool handle.
@ -36,9 +38,10 @@ pub struct Context {
impl Context {
/// Creates new context with reference to local sync node, thread pool and event loop.
pub fn new(local_sync_node: LocalSyncNodeRef, pool_handle: CpuPool, remote: Remote) -> Self {
pub fn new(local_sync_node: LocalSyncNodeRef, pool_handle: CpuPool, remote: Remote, config: &Config) -> Self {
Context {
connections: Default::default(),
connection_counter: ConnectionCounter::new(config.inbound_connections, config.outbound_connections),
node_table: Default::default(),
pool: pool_handle,
remote: remote,
@ -80,6 +83,36 @@ impl Context {
self.node_table.write().insert_many(nodes);
}
/// Every 10 seconds check if we have reached maximum number of outbound connections.
/// If not, connect to best peers.
pub fn autoconnect(context: Arc<Context>, handle: &Handle, config: NetConfig) {
let c = context.clone();
// every 10 seconds connect to new peers (if needed)
let interval: BoxedEmptyFuture = Interval::new(time::Duration::new(10, 0), handle).expect("Failed to create interval")
.and_then(move |_| {
let used_addresses = context.connections.addresses();
let max = context.connection_counter.max_outbound_connections() as usize;
let needed = context.connection_counter.outbound_connections_needed() as usize;
let peers = context.node_table.read().nodes_with_services(&Services::default(), max);
let addresses = peers.into_iter()
.map(|peer| peer.address())
.filter(|address| !used_addresses.contains(&address))
.take(needed)
.collect::<Vec<_>>();
trace!("connected to addresses: {:?}", used_addresses);
trace!("connecting to addresses: {:?}", addresses);
for address in addresses {
Context::connect::<NormalSessionFactory>(context.clone(), address, config.clone());
}
Ok(())
})
.for_each(|_| Ok(()))
.then(|_| finished(()))
.boxed();
c.spawn(interval);
}
/// Connect to socket using given context and handle.
fn connect_future<T>(context: Arc<Context>, socket: net::SocketAddr, handle: &Handle, config: &NetConfig) -> BoxedEmptyFuture where T: SessionFactory {
trace!("Trying to connect to: {}", socket);
@ -90,28 +123,34 @@ impl Context {
// successfull hanshake
trace!("Connected to {}", connection.address);
context.node_table.write().insert(connection.address, connection.services);
let channel = context.connections.store::<T>(context.clone(), connection);
let channel = context.connections.store::<T>(context.clone(), connection, Direction::Outbound);
// initialize session and then start reading messages
channel.session().initialize(channel.clone(), Direction::Outbound);
channel.session().initialize(channel.clone());
Context::on_message(context, channel)
},
Ok(DeadlineStatus::Meet(Err(err))) => {
Ok(DeadlineStatus::Meet(Err(_))) => {
// protocol error
trace!("Handshake with {} failed", socket);
// TODO: close socket
finished(Err(err)).boxed()
context.node_table.write().note_failure(&socket);
context.connection_counter.note_close_inbound_connection();
finished(Ok(())).boxed()
},
Ok(DeadlineStatus::Timeout) => {
// connection time out
trace!("Handshake with {} timedout", socket);
trace!("Handshake with {} timed out", socket);
// TODO: close socket
context.node_table.write().note_failure(&socket);
context.connection_counter.note_close_inbound_connection();
finished(Ok(())).boxed()
},
Err(err) => {
Err(_) => {
// network error
trace!("Unable to connect to {}", socket);
failed(err).boxed()
context.node_table.write().note_failure(&socket);
context.connection_counter.note_close_inbound_connection();
finished(Ok(())).boxed()
}
}
})
@ -121,6 +160,7 @@ impl Context {
/// Connect to socket using given context.
pub fn connect<T>(context: Arc<Context>, socket: net::SocketAddr, config: NetConfig) where T: SessionFactory {
context.connection_counter.note_new_outbound_connection();
context.remote.clone().spawn(move |handle| {
context.pool.clone().spawn(Context::connect_future::<T>(context, socket, handle, &config))
})
@ -133,26 +173,32 @@ impl Context {
// successfull hanshake
trace!("Accepted connection from {}", connection.address);
context.node_table.write().insert(connection.address, connection.services);
let channel = context.connections.store::<NormalSessionFactory>(context.clone(), connection);
let channel = context.connections.store::<NormalSessionFactory>(context.clone(), connection, Direction::Inbound);
// initialize session and then start reading messages
channel.session().initialize(channel.clone(), Direction::Inbound);
channel.session().initialize(channel.clone());
Context::on_message(context.clone(), channel)
},
Ok(DeadlineStatus::Meet(Err(err))) => {
Ok(DeadlineStatus::Meet(Err(_))) => {
// protocol error
// TODO: close socket
finished(Err(err)).boxed()
context.node_table.write().note_failure(&socket);
context.connection_counter.note_close_outbound_connection();
finished(Ok(())).boxed()
},
Ok(DeadlineStatus::Timeout) => {
// connection time out
trace!("Handshake with {} timedout", socket);
// TODO: close socket
context.node_table.write().note_failure(&socket);
context.connection_counter.note_close_outbound_connection();
finished(Ok(())).boxed()
},
Err(err) => {
Err(_) => {
// network error
failed(err).boxed()
context.node_table.write().note_failure(&socket);
context.connection_counter.note_close_outbound_connection();
finished(Ok(())).boxed()
}
}
})
@ -161,6 +207,7 @@ impl Context {
}
pub fn accept_connection(context: Arc<Context>, stream: TcpStream, socket: net::SocketAddr, config: NetConfig) {
context.connection_counter.note_new_inbound_connection();
context.remote.clone().spawn(move |handle| {
context.pool.clone().spawn(Context::accept_connection_future(context, stream, socket, handle, config))
})
@ -172,7 +219,15 @@ impl Context {
let server = try!(TcpListener::bind(&config.local_address, handle));
let server = server.incoming()
.and_then(move |(stream, socket)| {
Context::accept_connection(context.clone(), stream, socket, config.clone());
// because we acquire atomic value twice,
// it may happen that accept slightly more connections than we need
// we don't mind
if context.connection_counter.inbound_connections_needed() > 0 {
Context::accept_connection(context.clone(), stream, socket, config.clone());
} else {
// ignore result
let _ = stream.shutdown(net::Shutdown::Both);
}
Ok(())
})
.for_each(|_| Ok(()))
@ -210,6 +265,7 @@ impl Context {
},
Err(err) => {
// network error
// TODO: remote node was just turned off. should we mark it as not reliable?
context.close_channel_with_error(channel.peer_info().id, &err);
failed(err).boxed()
}
@ -251,20 +307,29 @@ impl Context {
/// Close channel with given peer info.
pub fn close_channel(&self, id: PeerId) {
if let Some(channel) = self.connections.remove(id) {
let info = channel.peer_info();
channel.session().on_close();
trace!("Disconnecting from {}", channel.peer_info().address);
trace!("Disconnecting from {}", info.address);
channel.shutdown();
match info.direction {
Direction::Inbound => self.connection_counter.note_close_inbound_connection(),
Direction::Outbound => self.connection_counter.note_close_outbound_connection(),
}
}
}
/// Close channel with given peer info.
pub fn close_channel_with_error(&self, id: PeerId, error: &error::Error) {
if let Some(channel) = self.connections.remove(id) {
let info = channel.peer_info();
channel.session().on_close();
let address = channel.peer_info().address;
trace!("Disconnecting from {} caused by {}", address, error.description());
trace!("Disconnecting from {} caused by {}", info.address, error.description());
channel.shutdown();
self.node_table.write().note_failure(&address);
self.node_table.write().note_failure(&info.address);
match info.direction {
Direction::Inbound => self.connection_counter.note_close_inbound_connection(),
Direction::Outbound => self.connection_counter.note_close_outbound_connection(),
}
}
}
@ -304,8 +369,8 @@ impl P2P {
P2P {
event_loop_handle: handle.clone(),
pool: pool.clone(),
context: Arc::new(Context::new(local_sync_node, pool, handle.remote().clone(), &config)),
config: config,
context: Arc::new(Context::new(local_sync_node, pool, handle.remote().clone())),
}
}
@ -319,6 +384,7 @@ impl P2P {
self.connect_to_seednode(&resolver, seed);
}
Context::autoconnect(self.context.clone(), &self.event_loop_handle, self.config.connection.clone());
try!(self.listen());
Ok(())
}

View File

@ -3,8 +3,9 @@ use std::time::Duration;
use bytes::Bytes;
use message::{Error, Command, deserialize_payload, Payload};
use message::types::{GetAddr, Addr};
use protocol::{Protocol, Direction};
use protocol::Protocol;
use p2p::Context;
use util::Direction;
use PeerId;
pub struct AddrProtocol {

View File

@ -6,16 +6,11 @@ use bytes::Bytes;
use message::Error;
use message::common::Command;
use util::Direction;
pub use self::addr::{AddrProtocol, SeednodeProtocol};
pub use self::ping::PingProtocol;
pub use self::sync::{SyncProtocol, InboundSyncConnection, InboundSyncConnectionRef, OutboundSyncConnection, OutboundSyncConnectionRef, LocalSyncNode, LocalSyncNodeRef};
#[derive(PartialEq, Clone, Copy)]
pub enum Direction {
Inbound,
Outbound,
}
pub trait Protocol: Send {
/// Initialize the protocol.
fn initialize(&mut self, _direction: Direction, _version: u32) {}

View File

@ -3,7 +3,8 @@ use bytes::Bytes;
use message::{Error, Payload, deserialize_payload};
use message::types::{Ping, Pong};
use message::common::Command;
use protocol::{Protocol, Direction};
use protocol::Protocol;
use util::Direction;
use util::nonce::{NonceGenerator, RandomNonce};
use p2p::Context;
use PeerId;
@ -73,8 +74,9 @@ mod tests {
use bytes::Bytes;
use message::{Payload, serialize_payload};
use message::types::{Ping, Pong};
use util::Direction;
use util::nonce::StaticNonce;
use protocol::{Protocol, Direction};
use protocol::Protocol;
use PeerId;
use super::{PingProtocol, PingContext};

View File

@ -1,7 +1,8 @@
use std::sync::Arc;
use bytes::Bytes;
use message::{Command, Error, Payload, types, deserialize_payload};
use protocol::{Protocol, Direction};
use protocol::Protocol;
use util::Direction;
use p2p::Context;
use PeerId;

View File

@ -4,7 +4,7 @@ use bytes::Bytes;
use message::{Command, Error};
use p2p::Context;
use net::Channel;
use protocol::{Protocol, PingProtocol, SyncProtocol, AddrProtocol, SeednodeProtocol, Direction};
use protocol::{Protocol, PingProtocol, SyncProtocol, AddrProtocol, SeednodeProtocol};
use PeerId;
pub trait SessionFactory {
@ -44,9 +44,9 @@ impl Session {
}
}
pub fn initialize(&self, channel: Arc<Channel>, direction: Direction) {
pub fn initialize(&self, channel: Arc<Channel>) {
for protocol in self.protocols.lock().iter_mut() {
protocol.initialize(direction, channel.version());
protocol.initialize(channel.peer_info().direction, channel.version());
}
}

View File

@ -4,4 +4,4 @@ mod node_table;
mod peer;
pub use self::node_table::{NodeTable, Node};
pub use self::peer::{PeerId, PeerInfo};
pub use self::peer::{PeerId, PeerInfo, Direction};

View File

@ -18,6 +18,12 @@ pub struct Node {
failures: u32,
}
impl Node {
pub fn address(&self) -> SocketAddr {
self.addr
}
}
impl From<AddressEntry> for Node {
fn from(entry: AddressEntry) -> Self {
Node {
@ -168,7 +174,7 @@ impl<T> NodeTable<T> where T: Time {
}
/// Returnes most reliable nodes with desired services.
pub fn _nodes_with_services(&self, services: &Services, limit: usize) -> Vec<Node> {
pub fn nodes_with_services(&self, services: &Services, limit: usize) -> Vec<Node> {
self.by_score.iter()
.filter(|node| node.0.services.includes(services))
.map(|node| node.0.clone())
@ -230,7 +236,7 @@ mod tests {
table.insert(s0, Services::default());
table.insert(s1, Services::default());
table.insert(s2, Services::default());
let nodes = table._nodes_with_services(&Services::default(), 2);
let nodes = table.nodes_with_services(&Services::default(), 2);
assert_eq!(nodes.len(), 2);
assert_eq!(nodes[0].addr, s2);
assert_eq!(nodes[0].time, 2);
@ -258,7 +264,7 @@ mod tests {
table.note_used(&s1);
table.note_failure(&s2);
table.note_failure(&s3);
let nodes = table._nodes_with_services(&Services::default(), 10);
let nodes = table.nodes_with_services(&Services::default(), 10);
assert_eq!(nodes.len(), 5);
assert_eq!(nodes[0].addr, s1);

View File

@ -2,9 +2,16 @@ use std::net::SocketAddr;
pub type PeerId = usize;
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Direction {
Inbound,
Outbound,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct PeerInfo {
pub id: PeerId,
pub address: SocketAddr,
pub direction: Direction,
}