From 2ea031019089766880254c181a92f1feb305f4d0 Mon Sep 17 00:00:00 2001 From: debris Date: Thu, 13 Oct 2016 09:59:09 +0200 Subject: [PATCH] connections are accepted using pool --- p2p/src/net/connections.rs | 12 ++++-------- p2p/src/p2p.rs | 14 ++++++++++---- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/p2p/src/net/connections.rs b/p2p/src/net/connections.rs index c0f2b245..110d1e4d 100644 --- a/p2p/src/net/connections.rs +++ b/p2p/src/net/connections.rs @@ -10,17 +10,13 @@ use net::Connection; use PeerId; pub struct Connections { - event_loop_handle: Handle, - pool: CpuPool, peer_counter: AtomicUsize, channels: RwLock>>, } impl Connections { - pub fn new(pool: CpuPool, handle: Handle) -> Self { + pub fn new() -> Self { Connections { - event_loop_handle: handle, - pool: pool, peer_counter: AtomicUsize::default(), channels: RwLock::default(), } @@ -28,12 +24,12 @@ impl Connections { /// Broadcast messages to the network. /// Returned future completes of first confirmed receive. - pub fn broadcast(connections: &Arc, payload: T) where T: PayloadType { + pub fn broadcast(connections: &Arc, handle: &Handle, pool: &CpuPool, payload: T) where T: PayloadType { let channels = connections.channels(); for (id, channel) in channels.into_iter() { let write = channel.write_message(&payload); let cs = connections.clone(); - let pool_work = connections.pool.spawn(write).then(move |x| { + let pool_work = pool.spawn(write).then(move |x| { match x { Ok(_) => { // successfully sent message @@ -44,7 +40,7 @@ impl Connections { } finished(()) }); - connections.event_loop_handle.spawn(pool_work); + handle.spawn(pool_work); } } diff --git a/p2p/src/p2p.rs b/p2p/src/p2p.rs index 17164afb..8b650c42 100644 --- a/p2p/src/p2p.rs +++ b/p2p/src/p2p.rs @@ -4,6 +4,7 @@ use futures::{Future, finished}; use futures::stream::Stream; use futures_cpupool::CpuPool; use tokio_core::reactor::Handle; +use message::PayloadType; use net::{connect, listen, Connections, Subscriber}; use Config; @@ -28,7 +29,7 @@ impl P2P { event_loop_handle: handle.clone(), pool: pool.clone(), config: config, - connections: Arc::new(Connections::new(pool, handle)), + connections: Arc::new(Connections::new()), subscriber: Subscriber::default(), } } @@ -41,7 +42,7 @@ impl P2P { self.listen() } - fn connect(&self, ip: net::IpAddr) { + pub fn connect(&self, ip: net::IpAddr) { let socket = net::SocketAddr::new(ip, self.config.connection.magic.port()); let connections = self.connections.clone(); let connection = connect(&socket, &self.event_loop_handle, &self.config.connection); @@ -59,13 +60,18 @@ impl P2P { let connections = self.connections.clone(); let server = listen.for_each(move |x| { if let Ok(con) = x { - connections.store(con) + connections.store(con); } Ok(()) }).then(|_| { finished(()) }); - self.event_loop_handle.spawn(server); + let pool_work = self.pool.spawn(server); + self.event_loop_handle.spawn(pool_work); Ok(()) } + + pub fn broadcast(&self, payload: T) where T: PayloadType { + Connections::broadcast(&self.connections, &self.event_loop_handle, &self.pool, payload) + } }