2016-10-12 10:39:50 -07:00
|
|
|
use std::{io, net};
|
|
|
|
use std::sync::Arc;
|
|
|
|
use futures::{Future, finished};
|
|
|
|
use futures::stream::Stream;
|
|
|
|
use futures_cpupool::CpuPool;
|
|
|
|
use tokio_core::reactor::Handle;
|
2016-10-13 00:59:09 -07:00
|
|
|
use message::PayloadType;
|
2016-10-13 06:24:37 -07:00
|
|
|
use net::{connect, listen, Connections, Subscriber, MessagesHandler};
|
2016-10-12 10:39:50 -07:00
|
|
|
use Config;
|
2016-10-12 05:30:50 -07:00
|
|
|
|
|
|
|
pub struct P2P {
|
2016-10-12 10:39:50 -07:00
|
|
|
/// Global event loop handle.
|
|
|
|
event_loop_handle: Handle,
|
|
|
|
/// Worker thread pool.
|
|
|
|
pool: CpuPool,
|
|
|
|
/// P2P config.
|
|
|
|
config: Config,
|
|
|
|
/// Connections.
|
|
|
|
connections: Arc<Connections>,
|
|
|
|
/// Message subscriber.
|
2016-10-13 06:24:37 -07:00
|
|
|
subscriber: Arc<Subscriber>,
|
2016-10-12 05:30:50 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
impl P2P {
|
2016-10-12 10:39:50 -07:00
|
|
|
pub fn new(config: Config, handle: Handle) -> Self {
|
|
|
|
let pool = CpuPool::new(4);
|
|
|
|
|
2016-10-12 05:30:50 -07:00
|
|
|
P2P {
|
2016-10-12 10:39:50 -07:00
|
|
|
event_loop_handle: handle.clone(),
|
|
|
|
pool: pool.clone(),
|
|
|
|
config: config,
|
2016-10-13 00:59:09 -07:00
|
|
|
connections: Arc::new(Connections::new()),
|
2016-10-13 06:24:37 -07:00
|
|
|
subscriber: Arc::new(Subscriber::default()),
|
2016-10-12 05:30:50 -07:00
|
|
|
}
|
|
|
|
}
|
2016-10-12 10:39:50 -07:00
|
|
|
|
|
|
|
pub fn run(&self) -> Result<(), io::Error> {
|
|
|
|
for seednode in self.config.seednodes.iter() {
|
|
|
|
self.connect(*seednode)
|
|
|
|
}
|
|
|
|
|
2016-10-13 06:24:37 -07:00
|
|
|
try!(self.listen());
|
|
|
|
self.handle_messages();
|
|
|
|
Ok(())
|
2016-10-12 10:39:50 -07:00
|
|
|
}
|
|
|
|
|
2016-10-13 00:59:09 -07:00
|
|
|
pub fn connect(&self, ip: net::IpAddr) {
|
2016-10-12 10:39:50 -07:00
|
|
|
let socket = net::SocketAddr::new(ip, self.config.connection.magic.port());
|
|
|
|
let connections = self.connections.clone();
|
|
|
|
let connection = connect(&socket, &self.event_loop_handle, &self.config.connection);
|
|
|
|
let pool_work = self.pool.spawn(connection).then(move |x| {
|
|
|
|
if let Ok(Ok(con)) = x {
|
|
|
|
connections.store(con);
|
|
|
|
}
|
|
|
|
finished(())
|
|
|
|
});
|
|
|
|
self.event_loop_handle.spawn(pool_work);
|
|
|
|
}
|
|
|
|
|
|
|
|
fn listen(&self) -> Result<(), io::Error> {
|
2016-10-13 00:17:29 -07:00
|
|
|
let listen = try!(listen(&self.event_loop_handle, self.config.connection.clone()));
|
|
|
|
let connections = self.connections.clone();
|
|
|
|
let server = listen.for_each(move |x| {
|
|
|
|
if let Ok(con) = x {
|
2016-10-13 00:59:09 -07:00
|
|
|
connections.store(con);
|
2016-10-13 00:17:29 -07:00
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}).then(|_| {
|
|
|
|
finished(())
|
|
|
|
});
|
2016-10-13 00:59:09 -07:00
|
|
|
let pool_work = self.pool.spawn(server);
|
|
|
|
self.event_loop_handle.spawn(pool_work);
|
2016-10-13 00:17:29 -07:00
|
|
|
Ok(())
|
2016-10-12 10:39:50 -07:00
|
|
|
}
|
2016-10-13 00:59:09 -07:00
|
|
|
|
2016-10-13 06:24:37 -07:00
|
|
|
fn handle_messages(&self) {
|
|
|
|
let incoming = MessagesHandler::new(Arc::downgrade(&self.connections));
|
|
|
|
let subscriber = self.subscriber.clone();
|
|
|
|
let connections = self.connections.clone();
|
|
|
|
let incoming_future = incoming.for_each(move |result| {
|
|
|
|
match result {
|
|
|
|
(Ok((command, payload)), version, peerid) => {
|
|
|
|
let handled = subscriber.try_handle(&payload, version, command, peerid);
|
|
|
|
if let Err(err) = handled {
|
|
|
|
connections.remove(peerid);
|
|
|
|
}
|
|
|
|
},
|
|
|
|
(Err(err), version, peerid) => {
|
|
|
|
connections.remove(peerid);
|
|
|
|
},
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}).then(|_| {
|
|
|
|
finished(())
|
|
|
|
});
|
|
|
|
let pool_work = self.pool.spawn(incoming_future);
|
|
|
|
self.event_loop_handle.spawn(pool_work);
|
|
|
|
}
|
|
|
|
|
2016-10-13 00:59:09 -07:00
|
|
|
pub fn broadcast<T>(&self, payload: T) where T: PayloadType {
|
|
|
|
Connections::broadcast(&self.connections, &self.event_loop_handle, &self.pool, payload)
|
|
|
|
}
|
2016-10-12 05:30:50 -07:00
|
|
|
}
|