read_message does return only message

This commit is contained in:
debris 2016-10-06 14:48:29 +02:00
parent 338e198ac5
commit 045b607216
3 changed files with 21 additions and 82 deletions

View File

@ -93,8 +93,8 @@ impl<A> Future for Handshake<A> where A: io::Read + io::Write {
(HandshakeState::ReceiveVersion(read_message(stream, self.magic, 0)), Async::NotReady)
},
HandshakeState::ReceiveVersion(ref mut future) => {
let (stream, message) = try_ready!(future.poll());
let version = match message.payload {
let (stream, payload) = try_ready!(future.poll());
let version = match payload {
Payload::Version(version) => version,
_ => return Err(Error::Handshake),
};
@ -107,8 +107,8 @@ impl<A> Future for Handshake<A> where A: io::Read + io::Write {
(next, Async::NotReady)
},
HandshakeState::ReceiveVerack { ref mut version, ref mut future } => {
let (stream, message) = try_ready!(future.poll());
if message.payload != Payload::Verack {
let (stream, payload) = try_ready!(future.poll());
if payload != Payload::Verack {
return Err(Error::Handshake);
}
@ -140,8 +140,8 @@ impl<A> Future for AcceptHandshake<A> where A: io::Read + io::Write {
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (next, result) = match self.state {
AcceptHandshakeState::ReceiveVersion { ref mut local_version, ref mut future } => {
let (stream, message) = try_ready!(future.poll());
let version = match message.payload {
let (stream, payload) = try_ready!(future.poll());
let version = match payload {
Payload::Version(version) => version,
_ => return Err(Error::Handshake),
};

View File

@ -1,7 +1,7 @@
use std::io::{self, Read};
use futures::{Future, Poll, Async};
use futures::stream::Stream;
use message::{Message, MessageHeader};
use message::Payload;
use message::common::Magic;
use io::{read_header, read_payload, ReadHeader, ReadPayload, ReadRc};
use Error;
@ -12,7 +12,6 @@ enum ReadMessageState<A> {
future: ReadHeader<A>,
},
ReadPayload {
header: Option<MessageHeader>,
future: ReadPayload<A>
},
Finished,
@ -49,7 +48,7 @@ pub struct ReadMessageStream<A> {
}
impl<A> Future for ReadMessage<A> where A: io::Read {
type Item = (A, Message);
type Item = (A, Payload);
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -58,22 +57,16 @@ impl<A> Future for ReadMessage<A> where A: io::Read {
let (read, header) = try_ready!(future.poll());
let future = read_payload(
read, version, header.len as usize,
header.command.clone(), header.checksum.clone()
header.command, header.checksum
);
let next = ReadMessageState::ReadPayload {
future: future,
header: Some(header),
};
(next, Async::NotReady)
},
ReadMessageState::ReadPayload { ref mut header, ref mut future } => {
ReadMessageState::ReadPayload { ref mut future } => {
let (read, payload) = try_ready!(future.poll());
let message = Message {
header: header.take().expect("payload must be preceded by header"),
payload: payload,
};
(ReadMessageState::Finished, Async::Ready((read, message)))
(ReadMessageState::Finished, Async::Ready((read, payload)))
},
ReadMessageState::Finished => panic!("poll AcceptHandshake after it's done"),
};
@ -88,7 +81,7 @@ impl<A> Future for ReadMessage<A> where A: io::Read {
}
impl<A> Stream for ReadMessageStream<A> where A: io::Read {
type Item = Message;
type Item = Payload;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@ -113,22 +106,22 @@ mod tests {
use futures::Future;
use futures::stream::Stream;
use bytes::Bytes;
use message::{Message, Payload};
use message::Payload;
use message::common::Magic;
use super::{read_message, read_message_stream};
#[test]
fn test_read_message() {
let raw: Bytes = "f9beb4d976657261636b000000000000000000005df6e0e2".into();
let expected = Message::new(Magic::Mainnet, Payload::Verack);
let expected = Payload::Verack;
assert_eq!(read_message(raw.as_ref(), Magic::Mainnet, 0).wait().unwrap().1, expected);
}
#[test]
fn test_read_message_stream() {
let raw: Bytes = "f9beb4d976657261636b000000000000000000005df6e0e2f9beb4d9676574616464720000000000000000005df6e0e2".into();
let expected0 = Message::new(Magic::Mainnet, Payload::Verack);
let expected1 = Message::new(Magic::Mainnet, Payload::GetAddr);
let expected0 = Payload::Verack;
let expected1 = Payload::GetAddr;
let mut stream = read_message_stream(Cursor::new(raw), Magic::Mainnet, 0);
assert_eq!(stream.poll().unwrap(), Some(expected0).into());

View File

@ -1,8 +1,7 @@
use std::{net, io};
use tokio_core::io::{Io, ReadHalf, WriteHalf};
use message::{Message, Payload};
use message::common::{Magic, InventoryVector};
use message::types::{Ping, Pong, addr, Inv, GetData, NotFound};
use message::common::Magic;
use io::{write_message, WriteMessage, read_message_stream, ReadMessageStream};
pub struct Connection<A> {
@ -65,63 +64,10 @@ impl<A> ConnectionReader<A> where A: io::Read {
}
impl<A> ConnectionWriter<A> where A: io::Write {
fn write_message(self, payload: Payload) -> WriteMessage<ConnectionWriter<A>> {
pub fn write_message(self, payload: Payload) -> WriteMessage<ConnectionWriter<A>> {
let message = Message::new(self.magic, payload);
write_message(self, message)
}
pub fn ping(self) -> WriteMessage<ConnectionWriter<A>> {
let payload = Payload::Ping(Ping {
nonce: 0,
});
self.write_message(payload)
}
pub fn pong(self, nonce: u64) -> WriteMessage<ConnectionWriter<A>> {
let payload = Payload::Pong(Pong {
nonce: nonce,
});
self.write_message(payload)
}
pub fn getaddr(self) -> WriteMessage<ConnectionWriter<A>> {
let payload = Payload::GetAddr;
self.write_message(payload)
}
pub fn addr(self, addresses: Vec<addr::AddressEntry>) -> WriteMessage<ConnectionWriter<A>> {
let payload = if self.version < 31402 {
Payload::AddrBelow31402(addr::AddrBelow31402 {
addresses: addresses.into_iter().map(|x| x.address).collect(),
})
} else {
Payload::Addr(addr::Addr {
addresses: addresses,
})
};
self.write_message(payload)
}
pub fn inv(self, inventory: Vec<InventoryVector>) -> WriteMessage<ConnectionWriter<A>> {
let payload = Payload::Inv(Inv {
inventory: inventory,
});
self.write_message(payload)
}
pub fn getdata(self, inventory: Vec<InventoryVector>) -> WriteMessage<ConnectionWriter<A>> {
let payload = Payload::GetData(GetData {
inventory: inventory,
});
self.write_message(payload)
}
pub fn notfound(self, inventory: Vec<InventoryVector>) -> WriteMessage<ConnectionWriter<A>> {
let payload = Payload::NotFound(NotFound {
inventory: inventory,
});
self.write_message(payload)
}
}
#[cfg(test)]
@ -129,15 +75,15 @@ mod test {
use std::io::Cursor;
use futures::stream::Stream;
use bytes::Bytes;
use message::{Message, Payload};
use message::Payload;
use message::common::Magic;
use super::ConnectionReader;
#[test]
fn test_connection_reader_stream() {
let raw: Bytes = "f9beb4d976657261636b000000000000000000005df6e0e2f9beb4d9676574616464720000000000000000005df6e0e2".into();
let expected0 = Message::new(Magic::Mainnet, Payload::Verack);
let expected1 = Message::new(Magic::Mainnet, Payload::GetAddr);
let expected0 = Payload::Verack;
let expected1 = Payload::GetAddr;
let reader = ConnectionReader {
stream: Cursor::new(raw),