2016-11-30 07:01:11 -08:00
|
|
|
use std::{io, net, error, time};
|
2016-10-12 10:39:50 -07:00
|
|
|
use std::sync::Arc;
|
2016-12-12 06:18:05 -08:00
|
|
|
use std::net::SocketAddr;
|
2016-10-18 03:14:54 -07:00
|
|
|
use parking_lot::RwLock;
|
2016-10-19 17:51:20 -07:00
|
|
|
use futures::{Future, finished, failed, BoxFuture};
|
2016-10-12 10:39:50 -07:00
|
|
|
use futures::stream::Stream;
|
|
|
|
use futures_cpupool::CpuPool;
|
2017-03-25 02:20:45 -07:00
|
|
|
use tokio_io::IoFuture;
|
2016-10-27 05:57:14 -07:00
|
|
|
use tokio_core::net::{TcpListener, TcpStream};
|
2016-11-02 16:22:00 -07:00
|
|
|
use tokio_core::reactor::{Handle, Remote, Timeout, Interval};
|
2016-10-25 00:55:43 -07:00
|
|
|
use abstract_ns::Resolver;
|
|
|
|
use ns_dns_tokio::DnsResolver;
|
2016-11-15 03:48:01 -08:00
|
|
|
use message::{Payload, MessageResult, Message};
|
2016-11-02 16:22:00 -07:00
|
|
|
use message::common::Services;
|
|
|
|
use net::{connect, Connections, Channel, Config as NetConfig, accept_connection, ConnectionCounter};
|
2016-12-12 06:18:05 -08:00
|
|
|
use util::{NodeTable, Node, NodeTableError, Direction};
|
2016-10-24 07:39:20 -07:00
|
|
|
use session::{SessionFactory, SeednodeSessionFactory, NormalSessionFactory};
|
2016-10-26 02:45:51 -07:00
|
|
|
use {Config, PeerId};
|
2016-10-24 07:38:33 -07:00
|
|
|
use protocol::{LocalSyncNodeRef, InboundSyncConnectionRef, OutboundSyncConnectionRef};
|
2016-10-27 05:57:14 -07:00
|
|
|
use io::DeadlineStatus;
|
2016-10-19 17:51:20 -07:00
|
|
|
|
2016-10-19 18:14:42 -07:00
|
|
|
/// Boxed future whose item and error types are both `()`.
pub type BoxedEmptyFuture = BoxFuture<(), ()>;
|
2016-10-19 17:51:20 -07:00
|
|
|
|
|
|
|
/// Network context.
|
|
|
|
pub struct Context {
|
|
|
|
/// Connections.
|
|
|
|
connections: Connections,
|
2016-11-02 16:22:00 -07:00
|
|
|
/// Connection counter.
|
|
|
|
connection_counter: ConnectionCounter,
|
2016-10-19 17:51:20 -07:00
|
|
|
/// Node Table.
|
|
|
|
node_table: RwLock<NodeTable>,
|
2016-10-24 00:42:11 -07:00
|
|
|
/// Thread pool handle.
|
|
|
|
pool: CpuPool,
|
|
|
|
/// Remote event loop handle.
|
|
|
|
remote: Remote,
|
2016-10-24 07:38:33 -07:00
|
|
|
/// Local synchronization node.
|
|
|
|
local_sync_node: LocalSyncNodeRef,
|
2016-11-04 04:08:58 -07:00
|
|
|
/// Node table path.
|
2016-11-30 07:01:11 -08:00
|
|
|
config: Config,
|
2016-10-19 17:51:20 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Context {
|
2016-10-25 02:02:37 -07:00
|
|
|
/// Creates new context with reference to local sync node, thread pool and event loop.
|
2016-11-30 07:01:11 -08:00
|
|
|
pub fn new(local_sync_node: LocalSyncNodeRef, pool_handle: CpuPool, remote: Remote, config: Config) -> Result<Self, Box<error::Error>> {
|
2016-11-04 04:08:58 -07:00
|
|
|
let context = Context {
|
2016-10-24 00:42:11 -07:00
|
|
|
connections: Default::default(),
|
2016-11-02 16:22:00 -07:00
|
|
|
connection_counter: ConnectionCounter::new(config.inbound_connections, config.outbound_connections),
|
2016-11-04 04:08:58 -07:00
|
|
|
node_table: RwLock::new(try!(NodeTable::from_file(&config.node_table_path))),
|
2016-10-24 00:42:11 -07:00
|
|
|
pool: pool_handle,
|
|
|
|
remote: remote,
|
2016-10-24 07:38:33 -07:00
|
|
|
local_sync_node: local_sync_node,
|
2016-11-30 07:01:11 -08:00
|
|
|
config: config,
|
2016-11-04 04:08:58 -07:00
|
|
|
};
|
|
|
|
|
|
|
|
Ok(context)
|
2016-10-24 00:42:11 -07:00
|
|
|
}
|
|
|
|
|
2016-10-25 02:02:37 -07:00
|
|
|
/// Spawns a future using thread pool and schedules execution of it with event loop handle.
|
2016-10-24 00:42:11 -07:00
|
|
|
pub fn spawn<F>(&self, f: F) where F: Future + Send + 'static, F::Item: Send + 'static, F::Error: Send + 'static {
|
|
|
|
let pool_work = self.pool.spawn(f);
|
2016-10-25 02:02:37 -07:00
|
|
|
self.remote.spawn(move |_handle| {
|
|
|
|
pool_work.then(|_| finished(()))
|
2016-10-24 00:42:11 -07:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2016-10-26 02:45:51 -07:00
|
|
|
/// Schedules execution of function in future.
|
|
|
|
/// Use wisely, it keeps used objects in memory until after it is resolved.
|
|
|
|
pub fn execute_after<F>(&self, duration: time::Duration, f: F) where F: FnOnce() + 'static + Send {
|
|
|
|
let pool = self.pool.clone();
|
|
|
|
self.remote.spawn(move |handle| {
|
|
|
|
let timeout = Timeout::new(duration, handle)
|
|
|
|
.expect("Expected to schedule timeout")
|
|
|
|
.then(move |_| {
|
|
|
|
f();
|
|
|
|
finished(())
|
|
|
|
});
|
|
|
|
pool.spawn(timeout)
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2016-10-25 02:02:37 -07:00
|
|
|
/// Returns addresses of recently active nodes. Sorted and limited to 1000.
|
2016-10-24 00:42:11 -07:00
|
|
|
pub fn node_table_entries(&self) -> Vec<Node> {
|
2016-11-30 07:01:11 -08:00
|
|
|
self.node_table.read().recently_active_nodes(self.config.internet_protocol)
|
2016-10-24 00:42:11 -07:00
|
|
|
}
|
|
|
|
|
2016-10-25 02:02:37 -07:00
|
|
|
/// Updates node table.
|
2016-10-24 00:42:11 -07:00
|
|
|
pub fn update_node_table(&self, nodes: Vec<Node>) {
|
|
|
|
trace!("Updating node table with {} entries", nodes.len());
|
|
|
|
self.node_table.write().insert_many(nodes);
|
|
|
|
}
|
|
|
|
|
2016-12-12 06:18:05 -08:00
|
|
|
/// Adds node to table.
|
2016-12-12 09:28:39 -08:00
|
|
|
pub fn add_node(&self, addr: SocketAddr) -> Result<(), NodeTableError> {
|
2016-12-12 06:18:05 -08:00
|
|
|
trace!("Adding node {} to node table", &addr);
|
2016-12-12 09:28:39 -08:00
|
|
|
self.node_table.write().add(addr, self.config.connection.services)
|
2016-12-12 06:18:05 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Removes node from table.
|
|
|
|
pub fn remove_node(&self, addr: SocketAddr) -> Result<(), NodeTableError> {
|
|
|
|
trace!("Removing node {} from node table", &addr);
|
|
|
|
self.node_table.write().remove(&addr)
|
|
|
|
}
|
|
|
|
|
2016-11-02 16:22:00 -07:00
|
|
|
/// Every 10 seconds check if we have reached maximum number of outbound connections.
|
|
|
|
/// If not, connect to best peers.
|
2016-12-12 09:28:39 -08:00
|
|
|
pub fn autoconnect(context: Arc<Context>, handle: &Handle) {
|
2016-11-02 16:22:00 -07:00
|
|
|
let c = context.clone();
|
|
|
|
// every 10 seconds connect to new peers (if needed)
|
2017-04-13 23:28:41 -07:00
|
|
|
let interval: BoxedEmptyFuture = Interval::new_at(time::Instant::now(), time::Duration::new(10, 0), handle).expect("Failed to create interval")
|
2016-11-02 16:22:00 -07:00
|
|
|
.and_then(move |_| {
|
2016-11-02 17:13:43 -07:00
|
|
|
// print traces
|
|
|
|
let ic = context.connection_counter.inbound_connections();
|
|
|
|
let oc = context.connection_counter.outbound_connections();
|
|
|
|
info!("Inbound connections: ({}/{})", ic.0, ic.1);
|
|
|
|
info!("Outbound connections: ({}/{})", oc.0, oc.1);
|
|
|
|
|
2017-01-11 00:48:40 -08:00
|
|
|
for channel in context.connections.channels().values() {
|
|
|
|
channel.session().maintain();
|
|
|
|
}
|
|
|
|
|
2016-11-02 16:22:00 -07:00
|
|
|
let needed = context.connection_counter.outbound_connections_needed() as usize;
|
2017-04-13 23:28:41 -07:00
|
|
|
if needed != 0 {
|
|
|
|
let used_addresses = context.connections.addresses();
|
|
|
|
let peers = context.node_table.read().nodes_with_services(&Services::default(), context.config.internet_protocol, &used_addresses, needed);
|
|
|
|
let addresses = peers.into_iter()
|
|
|
|
.map(|peer| peer.address())
|
|
|
|
.collect::<Vec<_>>();
|
2016-11-02 16:22:00 -07:00
|
|
|
|
2017-04-13 23:28:41 -07:00
|
|
|
trace!("Creating {} more outbound connections", addresses.len());
|
|
|
|
for address in addresses {
|
|
|
|
Context::connect::<NormalSessionFactory>(context.clone(), address);
|
|
|
|
}
|
2016-11-02 16:22:00 -07:00
|
|
|
}
|
2016-11-02 17:13:43 -07:00
|
|
|
|
2016-11-30 07:01:11 -08:00
|
|
|
if let Err(_err) = context.node_table.read().save_to_file(&context.config.node_table_path) {
|
2016-11-04 04:08:58 -07:00
|
|
|
error!("Saving node table to disk failed");
|
|
|
|
}
|
|
|
|
|
2016-11-02 16:22:00 -07:00
|
|
|
Ok(())
|
|
|
|
})
|
|
|
|
.for_each(|_| Ok(()))
|
|
|
|
.then(|_| finished(()))
|
|
|
|
.boxed();
|
|
|
|
c.spawn(interval);
|
|
|
|
}
|
|
|
|
|
2016-10-25 02:02:37 -07:00
|
|
|
/// Connect to socket using given context and handle.
|
|
|
|
fn connect_future<T>(context: Arc<Context>, socket: net::SocketAddr, handle: &Handle, config: &NetConfig) -> BoxedEmptyFuture where T: SessionFactory {
|
2016-10-19 17:51:20 -07:00
|
|
|
trace!("Trying to connect to: {}", socket);
|
|
|
|
let connection = connect(&socket, handle, config);
|
|
|
|
connection.then(move |result| {
|
|
|
|
match result {
|
2016-10-27 05:57:14 -07:00
|
|
|
Ok(DeadlineStatus::Meet(Ok(connection))) => {
|
2016-10-19 17:51:20 -07:00
|
|
|
// successfull hanshake
|
|
|
|
trace!("Connected to {}", connection.address);
|
|
|
|
context.node_table.write().insert(connection.address, connection.services);
|
2016-11-02 16:22:00 -07:00
|
|
|
let channel = context.connections.store::<T>(context.clone(), connection, Direction::Outbound);
|
2016-10-20 04:19:19 -07:00
|
|
|
|
|
|
|
// initialize session and then start reading messages
|
2016-11-09 13:36:29 -08:00
|
|
|
channel.session().initialize();
|
2016-10-24 07:39:20 -07:00
|
|
|
Context::on_message(context, channel)
|
2016-10-19 17:51:20 -07:00
|
|
|
},
|
2016-11-02 16:22:00 -07:00
|
|
|
Ok(DeadlineStatus::Meet(Err(_))) => {
|
2016-10-19 17:51:20 -07:00
|
|
|
// protocol error
|
2016-10-19 18:14:42 -07:00
|
|
|
trace!("Handshake with {} failed", socket);
|
|
|
|
// TODO: close socket
|
2016-11-02 16:22:00 -07:00
|
|
|
context.node_table.write().note_failure(&socket);
|
2016-11-02 17:13:43 -07:00
|
|
|
context.connection_counter.note_close_outbound_connection();
|
2016-11-02 16:22:00 -07:00
|
|
|
finished(Ok(())).boxed()
|
2016-10-19 17:51:20 -07:00
|
|
|
},
|
2016-10-27 05:57:14 -07:00
|
|
|
Ok(DeadlineStatus::Timeout) => {
|
|
|
|
// connection time out
|
2016-11-02 16:22:00 -07:00
|
|
|
trace!("Handshake with {} timed out", socket);
|
2016-10-27 05:57:14 -07:00
|
|
|
// TODO: close socket
|
2016-11-02 16:22:00 -07:00
|
|
|
context.node_table.write().note_failure(&socket);
|
2016-11-02 17:13:43 -07:00
|
|
|
context.connection_counter.note_close_outbound_connection();
|
2016-10-27 05:57:14 -07:00
|
|
|
finished(Ok(())).boxed()
|
|
|
|
},
|
2016-11-02 16:22:00 -07:00
|
|
|
Err(_) => {
|
2016-10-19 17:51:20 -07:00
|
|
|
// network error
|
2016-10-19 18:14:42 -07:00
|
|
|
trace!("Unable to connect to {}", socket);
|
2016-11-02 16:22:00 -07:00
|
|
|
context.node_table.write().note_failure(&socket);
|
2016-11-02 17:13:43 -07:00
|
|
|
context.connection_counter.note_close_outbound_connection();
|
2016-11-02 16:22:00 -07:00
|
|
|
finished(Ok(())).boxed()
|
2016-10-19 17:51:20 -07:00
|
|
|
}
|
|
|
|
}
|
2016-10-19 18:14:42 -07:00
|
|
|
})
|
|
|
|
.then(|_| finished(()))
|
|
|
|
.boxed()
|
2016-10-19 17:51:20 -07:00
|
|
|
}
|
|
|
|
|
2016-10-25 02:02:37 -07:00
|
|
|
/// Connect to socket using given context.
|
2016-12-12 09:28:39 -08:00
|
|
|
pub fn connect<T>(context: Arc<Context>, socket: net::SocketAddr) where T: SessionFactory {
|
2016-11-02 16:22:00 -07:00
|
|
|
context.connection_counter.note_new_outbound_connection();
|
2016-10-25 02:02:37 -07:00
|
|
|
context.remote.clone().spawn(move |handle| {
|
2016-12-12 09:28:39 -08:00
|
|
|
let config = context.config.clone();
|
|
|
|
context.pool.clone().spawn(Context::connect_future::<T>(context, socket, handle, &config.connection))
|
2016-10-25 02:02:37 -07:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2016-12-12 09:28:39 -08:00
|
|
|
pub fn connect_normal(context: Arc<Context>, socket: net::SocketAddr) {
|
|
|
|
Self::connect::<NormalSessionFactory>(context, socket)
|
|
|
|
}
|
|
|
|
|
2016-10-27 05:57:14 -07:00
|
|
|
pub fn accept_connection_future(context: Arc<Context>, stream: TcpStream, socket: net::SocketAddr, handle: &Handle, config: NetConfig) -> BoxedEmptyFuture {
|
|
|
|
accept_connection(stream, handle, &config, socket).then(move |result| {
|
2016-10-19 17:51:20 -07:00
|
|
|
match result {
|
2016-10-27 05:57:14 -07:00
|
|
|
Ok(DeadlineStatus::Meet(Ok(connection))) => {
|
2016-10-19 17:51:20 -07:00
|
|
|
// successfull hanshake
|
|
|
|
trace!("Accepted connection from {}", connection.address);
|
|
|
|
context.node_table.write().insert(connection.address, connection.services);
|
2016-11-02 16:22:00 -07:00
|
|
|
let channel = context.connections.store::<NormalSessionFactory>(context.clone(), connection, Direction::Inbound);
|
2016-10-20 04:19:19 -07:00
|
|
|
|
|
|
|
// initialize session and then start reading messages
|
2016-11-09 13:36:29 -08:00
|
|
|
channel.session().initialize();
|
2016-10-24 07:39:20 -07:00
|
|
|
Context::on_message(context.clone(), channel)
|
2016-10-19 17:51:20 -07:00
|
|
|
},
|
2016-11-05 07:32:57 -07:00
|
|
|
Ok(DeadlineStatus::Meet(Err(err))) => {
|
2016-10-19 17:51:20 -07:00
|
|
|
// protocol error
|
2016-11-05 07:32:57 -07:00
|
|
|
trace!("Accepting handshake from {} failed with error: {}", socket, err);
|
2016-10-19 18:14:42 -07:00
|
|
|
// TODO: close socket
|
2016-11-02 16:22:00 -07:00
|
|
|
context.node_table.write().note_failure(&socket);
|
2016-11-02 17:13:43 -07:00
|
|
|
context.connection_counter.note_close_inbound_connection();
|
2016-11-02 16:22:00 -07:00
|
|
|
finished(Ok(())).boxed()
|
2016-10-19 17:51:20 -07:00
|
|
|
},
|
2016-10-27 05:57:14 -07:00
|
|
|
Ok(DeadlineStatus::Timeout) => {
|
|
|
|
// connection time out
|
2016-11-05 07:32:57 -07:00
|
|
|
trace!("Accepting handshake from {} timed out", socket);
|
2016-10-27 05:57:14 -07:00
|
|
|
// TODO: close socket
|
2016-11-02 16:22:00 -07:00
|
|
|
context.node_table.write().note_failure(&socket);
|
2016-11-02 17:13:43 -07:00
|
|
|
context.connection_counter.note_close_inbound_connection();
|
2016-10-27 05:57:14 -07:00
|
|
|
finished(Ok(())).boxed()
|
|
|
|
},
|
2016-11-02 16:22:00 -07:00
|
|
|
Err(_) => {
|
2016-10-19 17:51:20 -07:00
|
|
|
// network error
|
2016-11-05 07:32:57 -07:00
|
|
|
trace!("Accepting handshake from {} failed with network error", socket);
|
2016-11-02 16:22:00 -07:00
|
|
|
context.node_table.write().note_failure(&socket);
|
2016-11-02 17:13:43 -07:00
|
|
|
context.connection_counter.note_close_inbound_connection();
|
2016-11-02 16:22:00 -07:00
|
|
|
finished(Ok(())).boxed()
|
2016-10-19 17:51:20 -07:00
|
|
|
}
|
|
|
|
}
|
2016-10-19 18:14:42 -07:00
|
|
|
})
|
|
|
|
.then(|_| finished(()))
|
2016-10-27 05:57:14 -07:00
|
|
|
.boxed()
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn accept_connection(context: Arc<Context>, stream: TcpStream, socket: net::SocketAddr, config: NetConfig) {
|
2016-11-02 16:22:00 -07:00
|
|
|
context.connection_counter.note_new_inbound_connection();
|
2016-10-27 05:57:14 -07:00
|
|
|
context.remote.clone().spawn(move |handle| {
|
|
|
|
context.pool.clone().spawn(Context::accept_connection_future(context, stream, socket, handle, config))
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Starts tcp server and listens for incomming connections.
|
|
|
|
pub fn listen(context: Arc<Context>, handle: &Handle, config: NetConfig) -> Result<BoxedEmptyFuture, io::Error> {
|
|
|
|
trace!("Starting tcp server");
|
|
|
|
let server = try!(TcpListener::bind(&config.local_address, handle));
|
|
|
|
let server = server.incoming()
|
|
|
|
.and_then(move |(stream, socket)| {
|
2016-11-02 16:22:00 -07:00
|
|
|
// because we acquire atomic value twice,
|
|
|
|
// it may happen that accept slightly more connections than we need
|
|
|
|
// we don't mind
|
|
|
|
if context.connection_counter.inbound_connections_needed() > 0 {
|
|
|
|
Context::accept_connection(context.clone(), stream, socket, config.clone());
|
|
|
|
} else {
|
|
|
|
// ignore result
|
|
|
|
let _ = stream.shutdown(net::Shutdown::Both);
|
|
|
|
}
|
2016-10-27 05:57:14 -07:00
|
|
|
Ok(())
|
|
|
|
})
|
|
|
|
.for_each(|_| Ok(()))
|
|
|
|
.then(|_| finished(()))
|
|
|
|
.boxed();
|
2016-10-19 17:51:20 -07:00
|
|
|
Ok(server)
|
|
|
|
}
|
|
|
|
|
2016-10-25 02:02:37 -07:00
|
|
|
/// Called on incomming mesage.
|
2016-10-24 05:55:08 -07:00
|
|
|
pub fn on_message(context: Arc<Context>, channel: Arc<Channel>) -> IoFuture<MessageResult<()>> {
|
2016-10-19 17:51:20 -07:00
|
|
|
channel.read_message().then(move |result| {
|
|
|
|
match result {
|
2016-10-20 04:19:19 -07:00
|
|
|
Ok(Ok((command, payload))) => {
|
2016-10-19 17:51:20 -07:00
|
|
|
// successful read
|
|
|
|
trace!("Received {} message from {}", command, channel.peer_info().address);
|
2016-10-20 04:19:19 -07:00
|
|
|
// handle message and read the next one
|
2016-11-09 13:36:29 -08:00
|
|
|
match channel.session().on_message(command, payload) {
|
2016-10-24 01:14:01 -07:00
|
|
|
Ok(_) => {
|
|
|
|
context.node_table.write().note_used(&channel.peer_info().address);
|
2016-10-24 05:55:08 -07:00
|
|
|
let on_message = Context::on_message(context.clone(), channel);
|
|
|
|
context.spawn(on_message);
|
|
|
|
finished(Ok(())).boxed()
|
2016-10-24 01:14:01 -07:00
|
|
|
},
|
2016-10-24 00:42:11 -07:00
|
|
|
Err(err) => {
|
|
|
|
// protocol error
|
2016-10-26 02:45:51 -07:00
|
|
|
context.close_channel_with_error(channel.peer_info().id, &err);
|
2016-10-24 00:42:11 -07:00
|
|
|
finished(Err(err)).boxed()
|
|
|
|
}
|
|
|
|
}
|
2016-10-19 17:51:20 -07:00
|
|
|
},
|
|
|
|
Ok(Err(err)) => {
|
|
|
|
// protocol error
|
2016-10-26 02:45:51 -07:00
|
|
|
context.close_channel_with_error(channel.peer_info().id, &err);
|
2016-10-19 17:51:20 -07:00
|
|
|
finished(Err(err)).boxed()
|
|
|
|
},
|
|
|
|
Err(err) => {
|
|
|
|
// network error
|
2016-11-02 16:22:00 -07:00
|
|
|
// TODO: remote node was just turned off. should we mark it as not reliable?
|
2016-10-26 02:45:51 -07:00
|
|
|
context.close_channel_with_error(channel.peer_info().id, &err);
|
2016-10-19 17:51:20 -07:00
|
|
|
failed(err).boxed()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}).boxed()
|
|
|
|
}
|
|
|
|
|
2016-10-25 02:02:37 -07:00
|
|
|
/// Send message to a channel with given peer id.
|
2016-10-24 00:42:11 -07:00
|
|
|
pub fn send_to_peer<T>(context: Arc<Context>, peer: PeerId, payload: &T) -> IoFuture<()> where T: Payload {
|
|
|
|
match context.connections.channel(peer) {
|
2016-11-15 03:48:01 -08:00
|
|
|
Some(channel) => {
|
|
|
|
let info = channel.peer_info();
|
|
|
|
let message = Message::new(info.magic, info.version, payload).expect("failed to create outgoing message");
|
2016-12-21 07:57:29 -08:00
|
|
|
channel.session().stats().lock().report_send(T::command().into(), message.len());
|
2016-11-15 03:48:01 -08:00
|
|
|
Context::send(context, channel, message)
|
|
|
|
},
|
|
|
|
None => {
|
|
|
|
// peer no longer exists.
|
|
|
|
// TODO: should we return error here?
|
|
|
|
finished(()).boxed()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn send_message_to_peer<T>(context: Arc<Context>, peer: PeerId, message: T) -> IoFuture<()> where T: AsRef<[u8]> + Send + 'static {
|
|
|
|
match context.connections.channel(peer) {
|
|
|
|
Some(channel) => Context::send(context, channel, message),
|
2016-10-24 00:42:11 -07:00
|
|
|
None => {
|
|
|
|
// peer no longer exists.
|
|
|
|
// TODO: should we return error here?
|
|
|
|
finished(()).boxed()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-10-25 02:02:37 -07:00
|
|
|
/// Send message using given channel.
|
2016-11-15 03:48:01 -08:00
|
|
|
pub fn send<T>(_context: Arc<Context>, channel: Arc<Channel>, message: T) -> IoFuture<()> where T: AsRef<[u8]> + Send + 'static {
|
|
|
|
//trace!("Sending {} message to {}", T::command(), channel.peer_info().address);
|
|
|
|
channel.write_message(message).then(move |result| {
|
2016-10-20 04:19:19 -07:00
|
|
|
match result {
|
|
|
|
Ok(_) => {
|
|
|
|
// successful send
|
2016-11-15 03:48:01 -08:00
|
|
|
//trace!("Sent {} message to {}", T::command(), channel.peer_info().address);
|
2016-10-20 04:19:19 -07:00
|
|
|
finished(()).boxed()
|
|
|
|
},
|
|
|
|
Err(err) => {
|
|
|
|
// network error
|
2016-10-21 01:55:37 -07:00
|
|
|
// closing connection is handled in on_message`
|
2016-10-20 04:19:19 -07:00
|
|
|
failed(err).boxed()
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}).boxed()
|
2016-10-19 17:51:20 -07:00
|
|
|
}
|
2016-10-19 18:14:42 -07:00
|
|
|
|
2016-10-25 02:02:37 -07:00
|
|
|
/// Close channel with given peer info.
|
2016-10-26 02:45:51 -07:00
|
|
|
pub fn close_channel(&self, id: PeerId) {
|
|
|
|
if let Some(channel) = self.connections.remove(id) {
|
2016-11-02 16:22:00 -07:00
|
|
|
let info = channel.peer_info();
|
2016-10-31 06:44:48 -07:00
|
|
|
channel.session().on_close();
|
2016-11-02 16:22:00 -07:00
|
|
|
trace!("Disconnecting from {}", info.address);
|
2016-10-26 02:45:51 -07:00
|
|
|
channel.shutdown();
|
2016-11-02 16:22:00 -07:00
|
|
|
match info.direction {
|
|
|
|
Direction::Inbound => self.connection_counter.note_close_inbound_connection(),
|
|
|
|
Direction::Outbound => self.connection_counter.note_close_outbound_connection(),
|
|
|
|
}
|
2016-10-26 02:45:51 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Close channel with given peer info.
|
|
|
|
pub fn close_channel_with_error(&self, id: PeerId, error: &error::Error) {
|
|
|
|
if let Some(channel) = self.connections.remove(id) {
|
2016-11-02 16:22:00 -07:00
|
|
|
let info = channel.peer_info();
|
2016-10-31 06:44:48 -07:00
|
|
|
channel.session().on_close();
|
2016-11-02 16:22:00 -07:00
|
|
|
trace!("Disconnecting from {} caused by {}", info.address, error.description());
|
2016-10-19 18:14:42 -07:00
|
|
|
channel.shutdown();
|
2016-11-02 16:22:00 -07:00
|
|
|
self.node_table.write().note_failure(&info.address);
|
|
|
|
match info.direction {
|
|
|
|
Direction::Inbound => self.connection_counter.note_close_inbound_connection(),
|
|
|
|
Direction::Outbound => self.connection_counter.note_close_outbound_connection(),
|
|
|
|
}
|
2016-10-19 18:14:42 -07:00
|
|
|
}
|
|
|
|
}
|
2016-10-24 07:38:33 -07:00
|
|
|
|
|
|
|
/// Asks the local sync node to create a sync session for the given start
/// height and outbound connection, and returns the resulting inbound
/// connection reference.
pub fn create_sync_session(&self, start_height: i32, outbound_connection: OutboundSyncConnectionRef) -> InboundSyncConnectionRef {
    let sync_node = &self.local_sync_node;
    sync_node.create_sync_session(start_height, outbound_connection)
}
|
2016-12-13 05:23:16 -08:00
|
|
|
|
|
|
|
/// Returns a reference to the stored connections.
pub fn connections(&self) -> &Connections {
    &self.connections
}
|
|
|
|
|
|
|
|
/// Returns the nodes reported by the node table.
pub fn nodes(&self) -> Vec<Node> {
    let table = self.node_table.read();
    table.nodes()
}
|
2016-10-19 17:51:20 -07:00
|
|
|
}
|
2016-10-12 05:30:50 -07:00
|
|
|
|
|
|
|
pub struct P2P {
|
2016-10-12 10:39:50 -07:00
|
|
|
/// Global event loop handle.
|
|
|
|
event_loop_handle: Handle,
|
|
|
|
/// Worker thread pool.
|
|
|
|
pool: CpuPool,
|
|
|
|
/// P2P config.
|
|
|
|
config: Config,
|
2016-10-19 17:51:20 -07:00
|
|
|
/// Network context.
|
|
|
|
context: Arc<Context>,
|
2016-10-12 05:30:50 -07:00
|
|
|
}
|
|
|
|
|
2016-10-24 00:42:11 -07:00
|
|
|
impl Drop for P2P {
|
|
|
|
fn drop(&mut self) {
|
|
|
|
// there are retain cycles
|
|
|
|
// context->connections->channel->session->protocol->context
|
|
|
|
// context->connections->channel->on_message closure->context
|
|
|
|
// first let's get rid of session retain cycle
|
|
|
|
for channel in &self.context.connections.remove_all() {
|
|
|
|
// done, now let's finish on_message
|
|
|
|
channel.shutdown();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-10-12 05:30:50 -07:00
|
|
|
impl P2P {
|
2016-11-04 04:08:58 -07:00
|
|
|
pub fn new(config: Config, local_sync_node: LocalSyncNodeRef, handle: Handle) -> Result<Self, Box<error::Error>> {
|
2016-10-17 16:44:52 -07:00
|
|
|
let pool = CpuPool::new(config.threads);
|
2016-10-12 10:39:50 -07:00
|
|
|
|
2016-11-30 07:01:11 -08:00
|
|
|
let context = try!(Context::new(local_sync_node, pool.clone(), handle.remote().clone(), config.clone()));
|
2016-11-04 04:08:58 -07:00
|
|
|
|
|
|
|
let p2p = P2P {
|
2016-10-12 10:39:50 -07:00
|
|
|
event_loop_handle: handle.clone(),
|
2016-11-04 04:08:58 -07:00
|
|
|
pool: pool,
|
|
|
|
context: Arc::new(context),
|
2016-10-12 10:39:50 -07:00
|
|
|
config: config,
|
2016-11-04 04:08:58 -07:00
|
|
|
};
|
|
|
|
|
|
|
|
Ok(p2p)
|
2016-10-12 05:30:50 -07:00
|
|
|
}
|
2016-10-12 10:39:50 -07:00
|
|
|
|
2016-10-25 00:55:43 -07:00
|
|
|
pub fn run(&self) -> Result<(), Box<error::Error>> {
|
2016-11-08 02:51:34 -08:00
|
|
|
for peer in &self.config.peers {
|
2016-10-24 07:39:20 -07:00
|
|
|
self.connect::<NormalSessionFactory>(*peer);
|
|
|
|
}
|
|
|
|
|
2016-10-25 00:55:43 -07:00
|
|
|
let resolver = try!(DnsResolver::system_config(&self.event_loop_handle));
|
2016-11-08 02:51:34 -08:00
|
|
|
for seed in &self.config.seeds {
|
2016-10-25 00:55:43 -07:00
|
|
|
self.connect_to_seednode(&resolver, seed);
|
2016-10-12 10:39:50 -07:00
|
|
|
}
|
|
|
|
|
2016-12-12 09:28:39 -08:00
|
|
|
Context::autoconnect(self.context.clone(), &self.event_loop_handle);
|
2016-10-13 06:24:37 -07:00
|
|
|
try!(self.listen());
|
|
|
|
Ok(())
|
2016-10-12 10:39:50 -07:00
|
|
|
}
|
|
|
|
|
2016-12-12 06:18:05 -08:00
|
|
|
/// Attempts to connect to the specified node
|
2016-11-02 00:25:09 -07:00
|
|
|
pub fn connect<T>(&self, addr: net::SocketAddr) where T: SessionFactory {
|
2016-12-12 09:28:39 -08:00
|
|
|
Context::connect::<T>(self.context.clone(), addr);
|
2016-12-12 06:18:05 -08:00
|
|
|
}
|
|
|
|
|
2016-10-25 00:55:43 -07:00
|
|
|
pub fn connect_to_seednode(&self, resolver: &Resolver, seednode: &str) {
|
|
|
|
let owned_seednode = seednode.to_owned();
|
|
|
|
let context = self.context.clone();
|
|
|
|
let dns_lookup = resolver.resolve(seednode).then(move |result| {
|
|
|
|
match result {
|
|
|
|
Ok(address) => match address.pick_one() {
|
|
|
|
Some(socket) => {
|
|
|
|
trace!("Dns lookup of seednode {} finished. Connecting to {}", owned_seednode, socket);
|
2016-12-12 09:28:39 -08:00
|
|
|
Context::connect::<SeednodeSessionFactory>(context, socket);
|
2016-10-25 00:55:43 -07:00
|
|
|
},
|
|
|
|
None => {
|
|
|
|
trace!("Dns lookup of seednode {} resolved with no results", owned_seednode);
|
|
|
|
}
|
|
|
|
},
|
|
|
|
Err(_err) => {
|
|
|
|
trace!("Dns lookup of seednode {} failed", owned_seednode);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
finished(())
|
|
|
|
});
|
|
|
|
let pool_work = self.pool.spawn(dns_lookup);
|
|
|
|
self.event_loop_handle.spawn(pool_work);
|
|
|
|
}
|
|
|
|
|
|
|
|
fn listen(&self) -> Result<(), Box<error::Error>> {
|
2016-10-19 18:14:42 -07:00
|
|
|
let server = try!(Context::listen(self.context.clone(), &self.event_loop_handle, self.config.connection.clone()));
|
2016-10-27 05:57:14 -07:00
|
|
|
self.event_loop_handle.spawn(server);
|
2016-10-13 00:17:29 -07:00
|
|
|
Ok(())
|
2016-10-12 10:39:50 -07:00
|
|
|
}
|
2016-12-12 10:18:43 -08:00
|
|
|
|
|
|
|
/// Returns a reference to the shared network context.
pub fn context(&self) -> &Arc<Context> {
    &self.context
}
|
2016-10-12 05:30:50 -07:00
|
|
|
}
|