p2p listen
This commit is contained in:
parent
25c7d20c1e
commit
850ad58e5d
|
@ -1,5 +1,10 @@
|
|||
use futures::{empty, Empty};
|
||||
use tokio_core::reactor::Core;
|
||||
|
||||
pub fn event_loop() -> Core {
|
||||
Core::new().unwrap()
|
||||
}
|
||||
|
||||
pub fn forever() -> Empty<(), ()> {
|
||||
empty()
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ pub const USER_AGENT: &'static str = "pbtc";
|
|||
pub use primitives::{hash, bytes};
|
||||
|
||||
pub use config::Config;
|
||||
pub use event_loop::event_loop;
|
||||
pub use event_loop::{event_loop, forever};
|
||||
pub use run::run;
|
||||
pub use p2p::P2P;
|
||||
|
||||
|
|
|
@ -40,6 +40,7 @@ impl Future for Connect {
|
|||
type Error = io::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
println!("first poll");
|
||||
let (next, result) = match self.state {
|
||||
ConnectState::TcpConnect { ref mut future, ref mut version } => {
|
||||
let stream = try_ready!(future.poll());
|
||||
|
|
|
@ -2,7 +2,7 @@ use std::sync::Arc;
|
|||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::collections::HashMap;
|
||||
use parking_lot::RwLock;
|
||||
use futures::{finished, Oneshot, Future};
|
||||
use futures::{finished, Future};
|
||||
use futures_cpupool::CpuPool;
|
||||
use tokio_core::reactor::Handle;
|
||||
use message::PayloadType;
|
||||
|
@ -60,6 +60,7 @@ impl Connections {
|
|||
|
||||
/// Stores new channel.
|
||||
pub fn store(&self, connection: Connection) {
|
||||
println!("new connection!");
|
||||
let id = self.peer_counter.fetch_add(1, Ordering::AcqRel);
|
||||
self.channels.write().insert(id, Arc::new(connection));
|
||||
}
|
||||
|
|
|
@ -46,6 +46,7 @@ impl P2P {
|
|||
let connections = self.connections.clone();
|
||||
let connection = connect(&socket, &self.event_loop_handle, &self.config.connection);
|
||||
let pool_work = self.pool.spawn(connection).then(move |x| {
|
||||
println!("trie to connect");
|
||||
if let Ok(Ok(con)) = x {
|
||||
connections.store(con);
|
||||
}
|
||||
|
@ -55,14 +56,17 @@ impl P2P {
|
|||
}
|
||||
|
||||
fn listen(&self) -> Result<(), io::Error> {
|
||||
unimplemented!();
|
||||
//let listen = try!(listen(&self.event_loop_handle, self.config.connection));
|
||||
//let server = listen.then(|x| {
|
||||
//if let Ok(Ok(con)) = x {
|
||||
//self.connections.store(con)
|
||||
//}
|
||||
//finished(())
|
||||
//});
|
||||
//self.event_loop_handle.spawn(server);
|
||||
let listen = try!(listen(&self.event_loop_handle, self.config.connection.clone()));
|
||||
let connections = self.connections.clone();
|
||||
let server = listen.for_each(move |x| {
|
||||
if let Ok(con) = x {
|
||||
connections.store(con)
|
||||
}
|
||||
Ok(())
|
||||
}).then(|_| {
|
||||
finished(())
|
||||
});
|
||||
self.event_loop_handle.spawn(server);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ extern crate p2p;
|
|||
mod config;
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use p2p::{net, event_loop};
|
||||
use p2p::{P2P, event_loop, forever, net};
|
||||
|
||||
fn main() {
|
||||
match run() {
|
||||
|
@ -41,9 +41,9 @@ fn run() -> Result<(), String> {
|
|||
limited_connect: cfg.connect.map_or(None, |x| Some(vec![x])),
|
||||
};
|
||||
|
||||
let handle = el.handle();
|
||||
let server = try!(p2p::run(p2p_cfg, &handle).map_err(|_| "Failed to start p2p module"));
|
||||
el.run(server).unwrap();
|
||||
let p2p = P2P::new(p2p_cfg, el.handle());
|
||||
try!(p2p.run().map_err(|_| "Failed to start p2p module"));
|
||||
el.run(forever()).unwrap();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue