diff --git a/p2p/src/io/deadline.rs b/p2p/src/io/deadline.rs index d35f11e7..f8a5a5a0 100644 --- a/p2p/src/io/deadline.rs +++ b/p2p/src/io/deadline.rs @@ -1,14 +1,14 @@ use std::io; use std::time::Duration; -use futures::{Future, Select, BoxFuture, Poll, Async}; +use futures::{Future, Select, Poll, Async}; use tokio_core::reactor::{Handle, Timeout}; -type DeadlineBox = BoxFuture::Item>, ::Error>; +type DeadlineBox = Box::Item>, Error = ::Error> + Send>; pub fn deadline(duration: Duration, handle: &Handle, future: F) -> Result, io::Error> where F: Future + Send + 'static, T: 'static { - let timeout = try!(Timeout::new(duration, handle)).map(|_| DeadlineStatus::Timeout).boxed(); - let future = future.map(DeadlineStatus::Meet).boxed(); + let timeout: DeadlineBox = Box::new(try!(Timeout::new(duration, handle)).map(|_| DeadlineStatus::Timeout)); + let future: DeadlineBox = Box::new(future.map(DeadlineStatus::Meet)); let deadline = Deadline { future: timeout.select(future), }; @@ -20,11 +20,11 @@ pub enum DeadlineStatus { Timeout, } -pub struct Deadline where F: Future { +pub struct Deadline where F: Future + Send { future: Select, DeadlineBox>, } -impl Future for Deadline where F: Future { +impl Future for Deadline where F: Future + Send { type Item = DeadlineStatus; type Error = io::Error; diff --git a/p2p/src/p2p.rs b/p2p/src/p2p.rs index d9dd7c0b..1c4bc5d4 100644 --- a/p2p/src/p2p.rs +++ b/p2p/src/p2p.rs @@ -2,7 +2,7 @@ use std::{io, net, error, time}; use std::sync::Arc; use std::net::SocketAddr; use parking_lot::RwLock; -use futures::{Future, finished, failed, BoxFuture}; +use futures::{Future, finished, failed}; use futures::stream::Stream; use futures_cpupool::CpuPool; use tokio_io::IoFuture; @@ -20,7 +20,7 @@ use {Config, PeerId}; use protocol::{LocalSyncNodeRef, InboundSyncConnectionRef, OutboundSyncConnectionRef}; use io::DeadlineStatus; -pub type BoxedEmptyFuture = BoxFuture<(), ()>; +pub type BoxedEmptyFuture = Box + Send>; /// Network context. pub struct Context { @@ -113,7 +113,7 @@ impl Context { pub fn autoconnect(context: Arc, handle: &Handle) { let c = context.clone(); // every 10 seconds connect to new peers (if needed) - let interval: BoxedEmptyFuture = Interval::new_at(time::Instant::now(), time::Duration::new(10, 0), handle).expect("Failed to create interval") + let interval: BoxedEmptyFuture = Box::new(Interval::new_at(time::Instant::now(), time::Duration::new(10, 0), handle).expect("Failed to create interval") .and_then(move |_| { // print traces let ic = context.connection_counter.inbound_connections(); @@ -147,8 +147,7 @@ impl Context { Ok(()) }) .for_each(|_| Ok(())) - .then(|_| finished(())) - .boxed(); + .then(|_| finished(()))); c.spawn(interval); } @@ -156,7 +155,7 @@ impl Context { fn connect_future(context: Arc, socket: net::SocketAddr, handle: &Handle, config: &NetConfig) -> BoxedEmptyFuture where T: SessionFactory { trace!("Trying to connect to: {}", socket); let connection = connect(&socket, handle, config); - connection.then(move |result| { + Box::new(connection.then(move |result| { match result { Ok(DeadlineStatus::Meet(Ok(connection))) => { // successfull hanshake @@ -174,7 +173,7 @@ impl Context { // TODO: close socket context.node_table.write().note_failure(&socket); context.connection_counter.note_close_outbound_connection(); - finished(Ok(())).boxed() + Box::new(finished(Ok(()))) }, Ok(DeadlineStatus::Timeout) => { // connection time out @@ -182,19 +181,18 @@ impl Context { // TODO: close socket context.node_table.write().note_failure(&socket); context.connection_counter.note_close_outbound_connection(); - finished(Ok(())).boxed() + Box::new(finished(Ok(()))) }, Err(_) => { // network error trace!("Unable to connect to {}", socket); context.node_table.write().note_failure(&socket); context.connection_counter.note_close_outbound_connection(); - finished(Ok(())).boxed() + Box::new(finished(Ok(()))) } } }) - .then(|_| finished(())) - .boxed() + .then(|_| finished(()))) } /// Connect to socket using given context. @@ -211,7 +209,7 @@ impl Context { } pub fn accept_connection_future(context: Arc, stream: TcpStream, socket: net::SocketAddr, handle: &Handle, config: NetConfig) -> BoxedEmptyFuture { - accept_connection(stream, handle, &config, socket).then(move |result| { + Box::new(accept_connection(stream, handle, &config, socket).then(move |result| { match result { Ok(DeadlineStatus::Meet(Ok(connection))) => { // successfull hanshake @@ -229,7 +227,7 @@ impl Context { // TODO: close socket context.node_table.write().note_failure(&socket); context.connection_counter.note_close_inbound_connection(); - finished(Ok(())).boxed() + Box::new(finished(Ok(()))) }, Ok(DeadlineStatus::Timeout) => { // connection time out @@ -237,19 +235,18 @@ impl Context { // TODO: close socket context.node_table.write().note_failure(&socket); context.connection_counter.note_close_inbound_connection(); - finished(Ok(())).boxed() + Box::new(finished(Ok(()))) }, Err(_) => { // network error trace!("Accepting handshake from {} failed with network error", socket); context.node_table.write().note_failure(&socket); context.connection_counter.note_close_inbound_connection(); - finished(Ok(())).boxed() + Box::new(finished(Ok(()))) } } }) - .then(|_| finished(())) - .boxed() + .then(|_| finished(()))) } pub fn accept_connection(context: Arc, stream: TcpStream, socket: net::SocketAddr, config: NetConfig) { @@ -263,7 +260,7 @@ impl Context { pub fn listen(context: Arc, handle: &Handle, config: NetConfig) -> Result { trace!("Starting tcp server"); let server = try!(TcpListener::bind(&config.local_address, handle)); - let server = server.incoming() + let server = Box::new(server.incoming() .and_then(move |(stream, socket)| { // because we acquire atomic value twice, // it may happen that accept slightly more connections than we need @@ -277,14 +274,13 @@ impl Context { Ok(()) }) .for_each(|_| Ok(())) - .then(|_| finished(())) - .boxed(); + .then(|_| finished(()))); Ok(server) } /// Called on incomming mesage. pub fn on_message(context: Arc, channel: Arc) -> IoFuture> { - channel.read_message().then(move |result| { + Box::new(channel.read_message().then(move |result| { match result { Ok(Ok((command, payload))) => { // successful read @@ -295,28 +291,28 @@ impl Context { context.node_table.write().note_used(&channel.peer_info().address); let on_message = Context::on_message(context.clone(), channel); context.spawn(on_message); - finished(Ok(())).boxed() + Box::new(finished(Ok(()))) }, Err(err) => { // protocol error context.close_channel_with_error(channel.peer_info().id, &err); - finished(Err(err)).boxed() + Box::new(finished(Err(err))) } } }, Ok(Err(err)) => { // protocol error context.close_channel_with_error(channel.peer_info().id, &err); - finished(Err(err)).boxed() + Box::new(finished(Err(err))) }, Err(err) => { // network error // TODO: remote node was just turned off. should we mark it as not reliable? context.close_channel_with_error(channel.peer_info().id, &err); - failed(err).boxed() + Box::new(failed(err)) } } - }).boxed() + })) } /// Send message to a channel with given peer id. @@ -331,7 +327,7 @@ impl Context { None => { // peer no longer exists. // TODO: should we return error here? - finished(()).boxed() + Box::new(finished(())) } } } @@ -342,7 +338,7 @@ impl Context { None => { // peer no longer exists. // TODO: should we return error here? - finished(()).boxed() + Box::new(finished(())) } } } @@ -350,20 +346,20 @@ impl Context { /// Send message using given channel. pub fn send(_context: Arc, channel: Arc, message: T) -> IoFuture<()> where T: AsRef<[u8]> + Send + 'static { //trace!("Sending {} message to {}", T::command(), channel.peer_info().address); - channel.write_message(message).then(move |result| { + Box::new(channel.write_message(message).then(move |result| { match result { Ok(_) => { // successful send //trace!("Sent {} message to {}", T::command(), channel.peer_info().address); - finished(()).boxed() + Box::new(finished(())) }, Err(err) => { // network error // closing connection is handled in on_message` - failed(err).boxed() + Box::new(failed(err)) }, } - }).boxed() + })) } /// Close channel with given peer info. diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index 03c4772a..a367fbbf 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use parking_lot::{Mutex, Condvar}; use time; -use futures::{Future, lazy, finished}; +use futures::{lazy, finished}; use chain::{Transaction, IndexedTransaction, IndexedBlock}; use message::types; use miner::BlockAssembler; @@ -163,8 +163,8 @@ impl LocalNode where T: TaskExecutor, U: Server, V: Client { let lazy_server_task = lazy(move || { server.upgrade().map(|s| s.execute(server_task)); finished::<(), ()>(()) - }).boxed(); - self.client.after_peer_nearly_blocks_verified(peer_index, lazy_server_task); + }); + self.client.after_peer_nearly_blocks_verified(peer_index, Box::new(lazy_server_task)); } /// When peer is requesting for memory pool contents diff --git a/sync/src/types.rs b/sync/src/types.rs index 611473a4..0a426011 100644 --- a/sync/src/types.rs +++ b/sync/src/types.rs @@ -1,5 +1,5 @@ use std::sync::Arc; -use futures::BoxFuture; +use futures::Future; use parking_lot::{Mutex, RwLock}; use db; use local_node::LocalNode; @@ -21,7 +21,7 @@ pub type RequestId = u32; pub type PeerIndex = usize; // No-error, no-result future -pub type EmptyBoxFuture = BoxFuture<(), ()>; +pub type EmptyBoxFuture = Box + Send>; /// Reference to storage pub type StorageRef = db::SharedStore;