connections are accepted using pool

This commit is contained in:
debris 2016-10-13 09:59:09 +02:00
parent 67309678c0
commit 2ea0310190
2 changed files with 14 additions and 12 deletions

View File

@ -10,17 +10,13 @@ use net::Connection;
use PeerId;
pub struct Connections {
event_loop_handle: Handle,
pool: CpuPool,
peer_counter: AtomicUsize,
channels: RwLock<HashMap<PeerId, Arc<Connection>>>,
}
impl Connections {
pub fn new(pool: CpuPool, handle: Handle) -> Self {
pub fn new() -> Self {
Connections {
event_loop_handle: handle,
pool: pool,
peer_counter: AtomicUsize::default(),
channels: RwLock::default(),
}
@ -28,12 +24,12 @@ impl Connections {
/// Broadcast messages to the network.
/// Returned future completes of first confirmed receive.
pub fn broadcast<T>(connections: &Arc<Connections>, payload: T) where T: PayloadType {
pub fn broadcast<T>(connections: &Arc<Connections>, handle: &Handle, pool: &CpuPool, payload: T) where T: PayloadType {
let channels = connections.channels();
for (id, channel) in channels.into_iter() {
let write = channel.write_message(&payload);
let cs = connections.clone();
let pool_work = connections.pool.spawn(write).then(move |x| {
let pool_work = pool.spawn(write).then(move |x| {
match x {
Ok(_) => {
// successfully sent message
@ -44,7 +40,7 @@ impl Connections {
}
finished(())
});
connections.event_loop_handle.spawn(pool_work);
handle.spawn(pool_work);
}
}

View File

@ -4,6 +4,7 @@ use futures::{Future, finished};
use futures::stream::Stream;
use futures_cpupool::CpuPool;
use tokio_core::reactor::Handle;
use message::PayloadType;
use net::{connect, listen, Connections, Subscriber};
use Config;
@ -28,7 +29,7 @@ impl P2P {
event_loop_handle: handle.clone(),
pool: pool.clone(),
config: config,
connections: Arc::new(Connections::new(pool, handle)),
connections: Arc::new(Connections::new()),
subscriber: Subscriber::default(),
}
}
@ -41,7 +42,7 @@ impl P2P {
self.listen()
}
fn connect(&self, ip: net::IpAddr) {
pub fn connect(&self, ip: net::IpAddr) {
let socket = net::SocketAddr::new(ip, self.config.connection.magic.port());
let connections = self.connections.clone();
let connection = connect(&socket, &self.event_loop_handle, &self.config.connection);
@ -59,13 +60,18 @@ impl P2P {
let connections = self.connections.clone();
let server = listen.for_each(move |x| {
if let Ok(con) = x {
connections.store(con)
connections.store(con);
}
Ok(())
}).then(|_| {
finished(())
});
self.event_loop_handle.spawn(server);
let pool_work = self.pool.spawn(server);
self.event_loop_handle.spawn(pool_work);
Ok(())
}
pub fn broadcast<T>(&self, payload: T) where T: PayloadType {
Connections::broadcast(&self.connections, &self.event_loop_handle, &self.pool, payload)
}
}