From 045b607216bd725a7ffc2c87218544b0e3441a36 Mon Sep 17 00:00:00 2001 From: debris Date: Thu, 6 Oct 2016 14:48:29 +0200 Subject: [PATCH] read_message does return only message --- p2p/src/io/handshake.rs | 12 +++---- p2p/src/io/read_message.rs | 27 ++++++---------- p2p/src/net/connection.rs | 64 +++----------------------------------- 3 files changed, 21 insertions(+), 82 deletions(-) diff --git a/p2p/src/io/handshake.rs b/p2p/src/io/handshake.rs index cb9b6548..038d3944 100644 --- a/p2p/src/io/handshake.rs +++ b/p2p/src/io/handshake.rs @@ -93,8 +93,8 @@ impl Future for Handshake where A: io::Read + io::Write { (HandshakeState::ReceiveVersion(read_message(stream, self.magic, 0)), Async::NotReady) }, HandshakeState::ReceiveVersion(ref mut future) => { - let (stream, message) = try_ready!(future.poll()); - let version = match message.payload { + let (stream, payload) = try_ready!(future.poll()); + let version = match payload { Payload::Version(version) => version, _ => return Err(Error::Handshake), }; @@ -107,8 +107,8 @@ impl Future for Handshake where A: io::Read + io::Write { (next, Async::NotReady) }, HandshakeState::ReceiveVerack { ref mut version, ref mut future } => { - let (stream, message) = try_ready!(future.poll()); - if message.payload != Payload::Verack { + let (stream, payload) = try_ready!(future.poll()); + if payload != Payload::Verack { return Err(Error::Handshake); } @@ -140,8 +140,8 @@ impl Future for AcceptHandshake where A: io::Read + io::Write { fn poll(&mut self) -> Poll { let (next, result) = match self.state { AcceptHandshakeState::ReceiveVersion { ref mut local_version, ref mut future } => { - let (stream, message) = try_ready!(future.poll()); - let version = match message.payload { + let (stream, payload) = try_ready!(future.poll()); + let version = match payload { Payload::Version(version) => version, _ => return Err(Error::Handshake), }; diff --git a/p2p/src/io/read_message.rs b/p2p/src/io/read_message.rs index 87a15424..33902141 100644 --- a/p2p/src/io/read_message.rs +++ b/p2p/src/io/read_message.rs @@ -1,7 +1,7 @@ use std::io::{self, Read}; use futures::{Future, Poll, Async}; use futures::stream::Stream; -use message::{Message, MessageHeader}; +use message::Payload; use message::common::Magic; use io::{read_header, read_payload, ReadHeader, ReadPayload, ReadRc}; use Error; @@ -12,7 +12,6 @@ enum ReadMessageState { future: ReadHeader, }, ReadPayload { - header: Option, future: ReadPayload }, Finished, @@ -49,7 +48,7 @@ pub struct ReadMessageStream { } impl Future for ReadMessage where A: io::Read { - type Item = (A, Message); + type Item = (A, Payload); type Error = Error; fn poll(&mut self) -> Poll { @@ -58,22 +57,16 @@ impl Future for ReadMessage where A: io::Read { let (read, header) = try_ready!(future.poll()); let future = read_payload( read, version, header.len as usize, - header.command.clone(), header.checksum.clone() + header.command, header.checksum ); let next = ReadMessageState::ReadPayload { future: future, - header: Some(header), }; (next, Async::NotReady) }, - ReadMessageState::ReadPayload { ref mut header, ref mut future } => { + ReadMessageState::ReadPayload { ref mut future } => { let (read, payload) = try_ready!(future.poll()); - let message = Message { - header: header.take().expect("payload must be preceded by header"), - payload: payload, - }; - - (ReadMessageState::Finished, Async::Ready((read, message))) + (ReadMessageState::Finished, Async::Ready((read, payload))) }, ReadMessageState::Finished => panic!("poll AcceptHandshake after it's done"), }; @@ -88,7 +81,7 @@ impl Future for ReadMessage where A: io::Read { } impl Stream for ReadMessageStream where A: io::Read { - type Item = Message; + type Item = Payload; type Error = Error; fn poll(&mut self) -> Poll, Self::Error> { @@ -113,22 +106,22 @@ mod tests { use futures::Future; use futures::stream::Stream; use bytes::Bytes; - use message::{Message, Payload}; + use message::Payload; use message::common::Magic; use super::{read_message, read_message_stream}; #[test] fn test_read_message() { let raw: Bytes = "f9beb4d976657261636b000000000000000000005df6e0e2".into(); - let expected = Message::new(Magic::Mainnet, Payload::Verack); + let expected = Payload::Verack; assert_eq!(read_message(raw.as_ref(), Magic::Mainnet, 0).wait().unwrap().1, expected); } #[test] fn test_read_message_stream() { let raw: Bytes = "f9beb4d976657261636b000000000000000000005df6e0e2f9beb4d9676574616464720000000000000000005df6e0e2".into(); - let expected0 = Message::new(Magic::Mainnet, Payload::Verack); - let expected1 = Message::new(Magic::Mainnet, Payload::GetAddr); + let expected0 = Payload::Verack; + let expected1 = Payload::GetAddr; let mut stream = read_message_stream(Cursor::new(raw), Magic::Mainnet, 0); assert_eq!(stream.poll().unwrap(), Some(expected0).into()); diff --git a/p2p/src/net/connection.rs b/p2p/src/net/connection.rs index 9de2b290..6fa18aef 100644 --- a/p2p/src/net/connection.rs +++ b/p2p/src/net/connection.rs @@ -1,8 +1,7 @@ use std::{net, io}; use tokio_core::io::{Io, ReadHalf, WriteHalf}; use message::{Message, Payload}; -use message::common::{Magic, InventoryVector}; -use message::types::{Ping, Pong, addr, Inv, GetData, NotFound}; +use message::common::Magic; use io::{write_message, WriteMessage, read_message_stream, ReadMessageStream}; pub struct Connection { @@ -65,63 +64,10 @@ impl ConnectionReader where A: io::Read { } impl ConnectionWriter where A: io::Write { - fn write_message(self, payload: Payload) -> WriteMessage> { + pub fn write_message(self, payload: Payload) -> WriteMessage> { let message = Message::new(self.magic, payload); write_message(self, message) } - - pub fn ping(self) -> WriteMessage> { - let payload = Payload::Ping(Ping { - nonce: 0, - }); - self.write_message(payload) - } - - pub fn pong(self, nonce: u64) -> WriteMessage> { - let payload = Payload::Pong(Pong { - nonce: nonce, - }); - self.write_message(payload) - } - - pub fn getaddr(self) -> WriteMessage> { - let payload = Payload::GetAddr; - self.write_message(payload) - } - - pub fn addr(self, addresses: Vec) -> WriteMessage> { - let payload = if self.version < 31402 { - Payload::AddrBelow31402(addr::AddrBelow31402 { - addresses: addresses.into_iter().map(|x| x.address).collect(), - }) - } else { - Payload::Addr(addr::Addr { - addresses: addresses, - }) - }; - self.write_message(payload) - } - - pub fn inv(self, inventory: Vec) -> WriteMessage> { - let payload = Payload::Inv(Inv { - inventory: inventory, - }); - self.write_message(payload) - } - - pub fn getdata(self, inventory: Vec) -> WriteMessage> { - let payload = Payload::GetData(GetData { - inventory: inventory, - }); - self.write_message(payload) - } - - pub fn notfound(self, inventory: Vec) -> WriteMessage> { - let payload = Payload::NotFound(NotFound { - inventory: inventory, - }); - self.write_message(payload) - } } #[cfg(test)] @@ -129,15 +75,15 @@ mod test { use std::io::Cursor; use futures::stream::Stream; use bytes::Bytes; - use message::{Message, Payload}; + use message::Payload; use message::common::Magic; use super::ConnectionReader; #[test] fn test_connection_reader_stream() { let raw: Bytes = "f9beb4d976657261636b000000000000000000005df6e0e2f9beb4d9676574616464720000000000000000005df6e0e2".into(); - let expected0 = Message::new(Magic::Mainnet, Payload::Verack); - let expected1 = Message::new(Magic::Mainnet, Payload::GetAddr); + let expected0 = Payload::Verack; + let expected1 = Payload::GetAddr; let reader = ConnectionReader { stream: Cursor::new(raw),