use tokio_io::AsyncWrite in p2p lib

This commit is contained in:
debris 2017-03-25 16:20:45 +07:00
parent acb1cbdfd2
commit 35c74d8948
7 changed files with 33 additions and 16 deletions

View File

@ -1,12 +1,12 @@
use std::{io, cmp};
use futures::{Future, Poll, Async};
use tokio_io::AsyncRead;
use tokio_io::{AsyncRead, AsyncWrite};
use message::{Message, MessageResult, Error};
use message::types::{Version, Verack};
use network::Magic;
use io::{write_message, WriteMessage, ReadMessage, read_message};
pub fn handshake<A>(a: A, magic: Magic, version: Version, min_version: u32) -> Handshake<A> where A: io::Write + AsyncRead {
pub fn handshake<A>(a: A, magic: Magic, version: Version, min_version: u32) -> Handshake<A> where A: AsyncWrite + AsyncRead {
Handshake {
version: version.version(),
state: HandshakeState::SendVersion(write_message(a, version_message(magic, version))),
@ -15,7 +15,7 @@ pub fn handshake<A>(a: A, magic: Magic, version: Version, min_version: u32) -> H
}
}
pub fn accept_handshake<A>(a: A, magic: Magic, version: Version, min_version: u32) -> AcceptHandshake<A> where A: io::Write + AsyncRead {
pub fn accept_handshake<A>(a: A, magic: Magic, version: Version, min_version: u32) -> AcceptHandshake<A> where A: AsyncWrite + AsyncRead {
AcceptHandshake {
version: version.version(),
state: AcceptHandshakeState::ReceiveVersion {
@ -85,7 +85,7 @@ pub struct AcceptHandshake<A> {
min_version: u32,
}
impl<A> Future for Handshake<A> where A: AsyncRead + io::Write {
impl<A> Future for Handshake<A> where A: AsyncRead + AsyncWrite {
type Item = (A, MessageResult<HandshakeResult>);
type Error = io::Error;
@ -136,7 +136,7 @@ impl<A> Future for Handshake<A> where A: AsyncRead + io::Write {
}
}
impl<A> Future for AcceptHandshake<A> where A: AsyncRead + io::Write {
impl<A> Future for AcceptHandshake<A> where A: AsyncRead + AsyncWrite {
type Item = (A, MessageResult<HandshakeResult>);
type Error = io::Error;
@ -197,7 +197,8 @@ impl<A> Future for AcceptHandshake<A> where A: AsyncRead + io::Write {
#[cfg(test)]
mod tests {
use std::io;
use futures::Future;
use futures::{Future, Poll};
use tokio_io::{AsyncRead, AsyncWrite};
use bytes::Bytes;
use ser::Stream;
use network::Magic;
@ -217,6 +218,8 @@ mod tests {
}
}
impl AsyncRead for TestIo {}
impl io::Write for TestIo {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
io::Write::write(&mut self.write, buf)
@ -227,6 +230,12 @@ mod tests {
}
}
impl AsyncWrite for TestIo {
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
}
fn local_version() -> Version {
Version::V70001(V0 {
version: 70001,

View File

@ -1,6 +1,5 @@
use std::io;
use futures::{Future, Poll, Async};
//use tokio_core::io::{ReadExact, read_exact};
use tokio_io::AsyncRead;
use tokio_io::io::{ReadExact, read_exact};
use message::{MessageHeader, MessageResult};

View File

@ -1,14 +1,15 @@
use std::io;
use std::marker::PhantomData;
use futures::{Poll, Future};
use tokio_core::io::{read_exact, ReadExact};
use tokio_io::AsyncRead;
use tokio_io::io::{read_exact, ReadExact};
use bytes::Bytes;
use hash::H32;
use crypto::checksum;
use message::{Error, MessageResult, Payload, deserialize_payload};
pub fn read_payload<M, A>(a: A, version: u32, len: usize, checksum: H32) -> ReadPayload<M, A>
where A: io::Read, M: Payload {
where A: AsyncRead, M: Payload {
ReadPayload {
reader: read_exact(a, Bytes::new_with_len(len)),
version: version,
@ -24,7 +25,7 @@ pub struct ReadPayload<M, A> {
payload_type: PhantomData<M>,
}
impl<M, A> Future for ReadPayload<M, A> where A: io::Read, M: Payload {
impl<M, A> Future for ReadPayload<M, A> where A: AsyncRead, M: Payload {
type Item = (A, MessageResult<M>);
type Error = io::Error;

View File

@ -1,7 +1,8 @@
use std::sync::Arc;
use std::net::Shutdown;
use std::io::{Read, Write, Error};
use tokio_io::AsyncRead;
use futures::Poll;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::net::TcpStream;
pub struct SharedTcpStream {
@ -35,6 +36,12 @@ impl Read for SharedTcpStream {
impl AsyncRead for SharedTcpStream {}
impl AsyncWrite for SharedTcpStream {
fn shutdown(&mut self) -> Poll<(), Error> {
self.io.shutdown(Shutdown::Both).map(Into::into)
}
}
impl Write for SharedTcpStream {
fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
Write::write(&mut (&*self.io as &TcpStream), buf)

View File

@ -1,9 +1,10 @@
use std::io;
use futures::{Future, Poll};
use tokio_core::io::{WriteAll, write_all};
use tokio_io::AsyncWrite;
use tokio_io::io::{WriteAll, write_all};
use message::Message;
pub fn write_message<M, A>(a: A, message: Message<M>) -> WriteMessage<M, A> where A: io::Write {
pub fn write_message<M, A>(a: A, message: Message<M>) -> WriteMessage<M, A> where A: AsyncWrite {
WriteMessage {
future: write_all(a, message),
}
@ -13,7 +14,7 @@ pub struct WriteMessage<M, A> {
future: WriteAll<A, Message<M>>,
}
impl<M, A> Future for WriteMessage<M, A> where A: io::Write {
impl<M, A> Future for WriteMessage<M, A> where A: AsyncWrite {
type Item = (A, Message<M>);
type Error = io::Error;

View File

@ -1,4 +1,4 @@
use tokio_core::io::{write_all, WriteAll};
use tokio_io::io::{write_all, WriteAll};
use session::Session;
use io::{SharedTcpStream, read_any_message, ReadAnyMessage};
use util::PeerInfo;

View File

@ -5,7 +5,7 @@ use parking_lot::RwLock;
use futures::{Future, finished, failed, BoxFuture};
use futures::stream::Stream;
use futures_cpupool::CpuPool;
use tokio_core::io::IoFuture;
use tokio_io::IoFuture;
use tokio_core::net::{TcpListener, TcpStream};
use tokio_core::reactor::{Handle, Remote, Timeout, Interval};
use abstract_ns::Resolver;