Merge pull request #476 from paritytech/fix_future_warnings

Fixed BoxFuture-related warnings
This commit is contained in:
Robert Habermeier 2017-12-28 16:12:43 +01:00 committed by GitHub
commit 90e3767fcf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 39 additions and 43 deletions

View File

@ -1,14 +1,14 @@
use std::io;
use std::time::Duration;
use futures::{Future, Select, BoxFuture, Poll, Async};
use futures::{Future, Select, Poll, Async};
use tokio_core::reactor::{Handle, Timeout};
type DeadlineBox<F> = BoxFuture<DeadlineStatus<<F as Future>::Item>, <F as Future>::Error>;
type DeadlineBox<F> = Box<Future<Item = DeadlineStatus<<F as Future>::Item>, Error = <F as Future>::Error> + Send>;
pub fn deadline<F, T>(duration: Duration, handle: &Handle, future: F) -> Result<Deadline<F>, io::Error>
where F: Future<Item = T, Error = io::Error> + Send + 'static, T: 'static {
let timeout = try!(Timeout::new(duration, handle)).map(|_| DeadlineStatus::Timeout).boxed();
let future = future.map(DeadlineStatus::Meet).boxed();
let timeout: DeadlineBox<F> = Box::new(try!(Timeout::new(duration, handle)).map(|_| DeadlineStatus::Timeout));
let future: DeadlineBox<F> = Box::new(future.map(DeadlineStatus::Meet));
let deadline = Deadline {
future: timeout.select(future),
};
@ -20,11 +20,11 @@ pub enum DeadlineStatus<T> {
Timeout,
}
pub struct Deadline<F> where F: Future {
pub struct Deadline<F> where F: Future + Send {
future: Select<DeadlineBox<F>, DeadlineBox<F>>,
}
impl<F, T> Future for Deadline<F> where F: Future<Item = T, Error = io::Error> {
impl<F, T> Future for Deadline<F> where F: Future<Item = T, Error = io::Error> + Send {
type Item = DeadlineStatus<T>;
type Error = io::Error;

View File

@ -2,7 +2,7 @@ use std::{io, net, error, time};
use std::sync::Arc;
use std::net::SocketAddr;
use parking_lot::RwLock;
use futures::{Future, finished, failed, BoxFuture};
use futures::{Future, finished, failed};
use futures::stream::Stream;
use futures_cpupool::CpuPool;
use tokio_io::IoFuture;
@ -20,7 +20,7 @@ use {Config, PeerId};
use protocol::{LocalSyncNodeRef, InboundSyncConnectionRef, OutboundSyncConnectionRef};
use io::DeadlineStatus;
pub type BoxedEmptyFuture = BoxFuture<(), ()>;
pub type BoxedEmptyFuture = Box<Future<Item=(), Error=()> + Send>;
/// Network context.
pub struct Context {
@ -113,7 +113,7 @@ impl Context {
pub fn autoconnect(context: Arc<Context>, handle: &Handle) {
let c = context.clone();
// every 10 seconds connect to new peers (if needed)
let interval: BoxedEmptyFuture = Interval::new_at(time::Instant::now(), time::Duration::new(10, 0), handle).expect("Failed to create interval")
let interval: BoxedEmptyFuture = Box::new(Interval::new_at(time::Instant::now(), time::Duration::new(10, 0), handle).expect("Failed to create interval")
.and_then(move |_| {
// print traces
let ic = context.connection_counter.inbound_connections();
@ -147,8 +147,7 @@ impl Context {
Ok(())
})
.for_each(|_| Ok(()))
.then(|_| finished(()))
.boxed();
.then(|_| finished(())));
c.spawn(interval);
}
@ -156,7 +155,7 @@ impl Context {
fn connect_future<T>(context: Arc<Context>, socket: net::SocketAddr, handle: &Handle, config: &NetConfig) -> BoxedEmptyFuture where T: SessionFactory {
trace!("Trying to connect to: {}", socket);
let connection = connect(&socket, handle, config);
connection.then(move |result| {
Box::new(connection.then(move |result| {
match result {
Ok(DeadlineStatus::Meet(Ok(connection))) => {
// successfull hanshake
@ -174,7 +173,7 @@ impl Context {
// TODO: close socket
context.node_table.write().note_failure(&socket);
context.connection_counter.note_close_outbound_connection();
finished(Ok(())).boxed()
Box::new(finished(Ok(())))
},
Ok(DeadlineStatus::Timeout) => {
// connection time out
@ -182,19 +181,18 @@ impl Context {
// TODO: close socket
context.node_table.write().note_failure(&socket);
context.connection_counter.note_close_outbound_connection();
finished(Ok(())).boxed()
Box::new(finished(Ok(())))
},
Err(_) => {
// network error
trace!("Unable to connect to {}", socket);
context.node_table.write().note_failure(&socket);
context.connection_counter.note_close_outbound_connection();
finished(Ok(())).boxed()
Box::new(finished(Ok(())))
}
}
})
.then(|_| finished(()))
.boxed()
.then(|_| finished(())))
}
/// Connect to socket using given context.
@ -211,7 +209,7 @@ impl Context {
}
pub fn accept_connection_future(context: Arc<Context>, stream: TcpStream, socket: net::SocketAddr, handle: &Handle, config: NetConfig) -> BoxedEmptyFuture {
accept_connection(stream, handle, &config, socket).then(move |result| {
Box::new(accept_connection(stream, handle, &config, socket).then(move |result| {
match result {
Ok(DeadlineStatus::Meet(Ok(connection))) => {
// successfull hanshake
@ -229,7 +227,7 @@ impl Context {
// TODO: close socket
context.node_table.write().note_failure(&socket);
context.connection_counter.note_close_inbound_connection();
finished(Ok(())).boxed()
Box::new(finished(Ok(())))
},
Ok(DeadlineStatus::Timeout) => {
// connection time out
@ -237,19 +235,18 @@ impl Context {
// TODO: close socket
context.node_table.write().note_failure(&socket);
context.connection_counter.note_close_inbound_connection();
finished(Ok(())).boxed()
Box::new(finished(Ok(())))
},
Err(_) => {
// network error
trace!("Accepting handshake from {} failed with network error", socket);
context.node_table.write().note_failure(&socket);
context.connection_counter.note_close_inbound_connection();
finished(Ok(())).boxed()
Box::new(finished(Ok(())))
}
}
})
.then(|_| finished(()))
.boxed()
.then(|_| finished(())))
}
pub fn accept_connection(context: Arc<Context>, stream: TcpStream, socket: net::SocketAddr, config: NetConfig) {
@ -263,7 +260,7 @@ impl Context {
pub fn listen(context: Arc<Context>, handle: &Handle, config: NetConfig) -> Result<BoxedEmptyFuture, io::Error> {
trace!("Starting tcp server");
let server = try!(TcpListener::bind(&config.local_address, handle));
let server = server.incoming()
let server = Box::new(server.incoming()
.and_then(move |(stream, socket)| {
// because we acquire atomic value twice,
// it may happen that accept slightly more connections than we need
@ -277,14 +274,13 @@ impl Context {
Ok(())
})
.for_each(|_| Ok(()))
.then(|_| finished(()))
.boxed();
.then(|_| finished(())));
Ok(server)
}
/// Called on incomming mesage.
pub fn on_message(context: Arc<Context>, channel: Arc<Channel>) -> IoFuture<MessageResult<()>> {
channel.read_message().then(move |result| {
Box::new(channel.read_message().then(move |result| {
match result {
Ok(Ok((command, payload))) => {
// successful read
@ -295,28 +291,28 @@ impl Context {
context.node_table.write().note_used(&channel.peer_info().address);
let on_message = Context::on_message(context.clone(), channel);
context.spawn(on_message);
finished(Ok(())).boxed()
Box::new(finished(Ok(())))
},
Err(err) => {
// protocol error
context.close_channel_with_error(channel.peer_info().id, &err);
finished(Err(err)).boxed()
Box::new(finished(Err(err)))
}
}
},
Ok(Err(err)) => {
// protocol error
context.close_channel_with_error(channel.peer_info().id, &err);
finished(Err(err)).boxed()
Box::new(finished(Err(err)))
},
Err(err) => {
// network error
// TODO: remote node was just turned off. should we mark it as not reliable?
context.close_channel_with_error(channel.peer_info().id, &err);
failed(err).boxed()
Box::new(failed(err))
}
}
}).boxed()
}))
}
/// Send message to a channel with given peer id.
@ -331,7 +327,7 @@ impl Context {
None => {
// peer no longer exists.
// TODO: should we return error here?
finished(()).boxed()
Box::new(finished(()))
}
}
}
@ -342,7 +338,7 @@ impl Context {
None => {
// peer no longer exists.
// TODO: should we return error here?
finished(()).boxed()
Box::new(finished(()))
}
}
}
@ -350,20 +346,20 @@ impl Context {
/// Send message using given channel.
pub fn send<T>(_context: Arc<Context>, channel: Arc<Channel>, message: T) -> IoFuture<()> where T: AsRef<[u8]> + Send + 'static {
//trace!("Sending {} message to {}", T::command(), channel.peer_info().address);
channel.write_message(message).then(move |result| {
Box::new(channel.write_message(message).then(move |result| {
match result {
Ok(_) => {
// successful send
//trace!("Sent {} message to {}", T::command(), channel.peer_info().address);
finished(()).boxed()
Box::new(finished(()))
},
Err(err) => {
// network error
// closing connection is handled in on_message`
failed(err).boxed()
Box::new(failed(err))
},
}
}).boxed()
}))
}
/// Close channel with given peer info.

View File

@ -1,7 +1,7 @@
use std::sync::Arc;
use parking_lot::{Mutex, Condvar};
use time;
use futures::{Future, lazy, finished};
use futures::{lazy, finished};
use chain::{Transaction, IndexedTransaction, IndexedBlock};
use message::types;
use miner::BlockAssembler;
@ -163,8 +163,8 @@ impl<T, U, V> LocalNode<T, U, V> where T: TaskExecutor, U: Server, V: Client {
let lazy_server_task = lazy(move || {
server.upgrade().map(|s| s.execute(server_task));
finished::<(), ()>(())
}).boxed();
self.client.after_peer_nearly_blocks_verified(peer_index, lazy_server_task);
});
self.client.after_peer_nearly_blocks_verified(peer_index, Box::new(lazy_server_task));
}
/// When peer is requesting for memory pool contents

View File

@ -1,5 +1,5 @@
use std::sync::Arc;
use futures::BoxFuture;
use futures::Future;
use parking_lot::{Mutex, RwLock};
use db;
use local_node::LocalNode;
@ -21,7 +21,7 @@ pub type RequestId = u32;
pub type PeerIndex = usize;
// No-error, no-result future
pub type EmptyBoxFuture = BoxFuture<(), ()>;
pub type EmptyBoxFuture = Box<Future<Item=(), Error=()> + Send>;
/// Reference to storage
pub type StorageRef = db::SharedStore;