parity-zcash/p2p/src/p2p.rs

97 lines
2.5 KiB
Rust
Raw Normal View History

2016-10-12 10:39:50 -07:00
use std::{io, net};
use std::sync::Arc;
use futures::{Future, finished};
use futures::stream::Stream;
use futures_cpupool::CpuPool;
use tokio_core::reactor::Handle;
use message::Payload;
2016-10-13 06:24:37 -07:00
use net::{connect, listen, Connections, Subscriber, MessagesHandler};
2016-10-12 10:39:50 -07:00
use Config;
2016-10-12 05:30:50 -07:00
pub struct P2P {
2016-10-12 10:39:50 -07:00
/// Global event loop handle.
event_loop_handle: Handle,
/// Worker thread pool.
pool: CpuPool,
/// P2P config.
config: Config,
/// Connections.
connections: Arc<Connections>,
/// Message subscriber.
2016-10-13 06:24:37 -07:00
subscriber: Arc<Subscriber>,
2016-10-12 05:30:50 -07:00
}
impl P2P {
2016-10-12 10:39:50 -07:00
pub fn new(config: Config, handle: Handle) -> Self {
let pool = CpuPool::new(4);
2016-10-12 05:30:50 -07:00
P2P {
2016-10-12 10:39:50 -07:00
event_loop_handle: handle.clone(),
pool: pool.clone(),
config: config,
2016-10-13 00:59:09 -07:00
connections: Arc::new(Connections::new()),
2016-10-13 06:24:37 -07:00
subscriber: Arc::new(Subscriber::default()),
2016-10-12 05:30:50 -07:00
}
}
2016-10-12 10:39:50 -07:00
pub fn run(&self) -> Result<(), io::Error> {
for seednode in self.config.seednodes.iter() {
self.connect(*seednode)
}
2016-10-13 06:24:37 -07:00
try!(self.listen());
self.handle_messages();
Ok(())
2016-10-12 10:39:50 -07:00
}
2016-10-13 00:59:09 -07:00
pub fn connect(&self, ip: net::IpAddr) {
2016-10-12 10:39:50 -07:00
let socket = net::SocketAddr::new(ip, self.config.connection.magic.port());
let connections = self.connections.clone();
let connection = connect(&socket, &self.event_loop_handle, &self.config.connection);
let pool_work = self.pool.spawn(connection).then(move |x| {
if let Ok(Ok(con)) = x {
connections.store(con);
}
finished(())
});
self.event_loop_handle.spawn(pool_work);
}
fn listen(&self) -> Result<(), io::Error> {
2016-10-13 00:17:29 -07:00
let listen = try!(listen(&self.event_loop_handle, self.config.connection.clone()));
let connections = self.connections.clone();
let server = listen.for_each(move |x| {
if let Ok(con) = x {
2016-10-13 00:59:09 -07:00
connections.store(con);
2016-10-13 00:17:29 -07:00
}
Ok(())
}).then(|_| {
finished(())
});
2016-10-13 00:59:09 -07:00
let pool_work = self.pool.spawn(server);
self.event_loop_handle.spawn(pool_work);
2016-10-13 00:17:29 -07:00
Ok(())
2016-10-12 10:39:50 -07:00
}
2016-10-13 00:59:09 -07:00
2016-10-13 06:24:37 -07:00
fn handle_messages(&self) {
let incoming = MessagesHandler::new(Arc::downgrade(&self.connections));
let subscriber = self.subscriber.clone();
let connections = self.connections.clone();
let incoming_future = incoming.for_each(move |result| {
2016-10-14 01:55:28 -07:00
let (command, payload, version, peerid) = result;
if let Err(_err) = subscriber.try_handle(&payload, version, command, peerid) {
connections.remove(peerid);
2016-10-13 06:24:37 -07:00
}
Ok(())
}).then(|_| {
finished(())
});
let pool_work = self.pool.spawn(incoming_future);
self.event_loop_handle.spawn(pool_work);
}
pub fn broadcast<T>(&self, payload: T) where T: Payload {
2016-10-13 00:59:09 -07:00
Connections::broadcast(&self.connections, &self.event_loop_handle, &self.pool, payload)
}
2016-10-12 05:30:50 -07:00
}