From c2b1af74a3e236d300f888f70f35cedf932baf5e Mon Sep 17 00:00:00 2001 From: debris Date: Wed, 28 Sep 2016 19:45:22 +0200 Subject: [PATCH] improved p2p::io --- net/src/messages/message_header.rs | 2 +- p2p/src/io/read_header.rs | 10 +++------- p2p/src/io/read_message.rs | 21 ++++++++++++--------- p2p/src/io/read_payload.rs | 10 +++------- 4 files changed, 19 insertions(+), 24 deletions(-) diff --git a/net/src/messages/message_header.rs b/net/src/messages/message_header.rs index 4aa1287a..4ea4f837 100644 --- a/net/src/messages/message_header.rs +++ b/net/src/messages/message_header.rs @@ -5,7 +5,7 @@ use ser::{ }; use common::Command; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq)] pub struct MessageHeader { pub magic: u32, pub command: Command, diff --git a/p2p/src/io/read_header.rs b/p2p/src/io/read_header.rs index 6b62f999..c0a960c0 100644 --- a/p2p/src/io/read_header.rs +++ b/p2p/src/io/read_header.rs @@ -20,13 +20,9 @@ impl Future for ReadHeader where A: io::Read { type Error = Error; fn poll(&mut self) -> Poll { - match try_nb!(self.reader.poll()) { - Async::Ready((read, data)) => { - let header: MessageHeader = try!(deserialize(&data)); - Ok(Async::Ready((read, header))) - }, - Async::NotReady => Ok(Async::NotReady), - } + let (read, data) = try_async!(self.reader.poll()); + let header = try!(deserialize(&data)); + Ok(Async::Ready((read, header))) } } diff --git a/p2p/src/io/read_message.rs b/p2p/src/io/read_message.rs index 2bba2301..19d24afd 100644 --- a/p2p/src/io/read_message.rs +++ b/p2p/src/io/read_message.rs @@ -9,9 +9,10 @@ enum ReadMessageState { future: ReadHeader, }, ReadPayload { - header: MessageHeader, + header: Option, future: ReadPayload }, + Finished, } pub fn read_message(a: A, version: u32) -> ReadMessage where A: io::Read { @@ -32,27 +33,29 @@ impl Future for ReadMessage where A: io::Read { type Error = Error; fn poll(&mut self) -> Poll { - let next = match self.state { + let (next, result) = match self.state { ReadMessageState::ReadHeader { version, ref mut future } => { let (read, header) = try_async!(future.poll()); - ReadMessageState::ReadPayload { + let next = ReadMessageState::ReadPayload { future: read_payload(read, version, header.len as usize, header.command.clone()), - header: header, - } + header: Some(header), + }; + (next, Async::NotReady) }, - ReadMessageState::ReadPayload { ref header, ref mut future } => { + ReadMessageState::ReadPayload { ref mut header, ref mut future } => { let (read, payload) = try_async!(future.poll()); let message = Message { - header: header.clone(), + header: header.take().expect("payload must be preceded by header"), payload: payload, }; - return Ok(Async::Ready((read, message))) + (ReadMessageState::Finished, Async::Ready((read, message))) }, + ReadMessageState::Finished => panic!("poll AcceptHandshake after it's done"), }; self.state = next; - Ok(Async::NotReady) + Ok(result) } } diff --git a/p2p/src/io/read_payload.rs b/p2p/src/io/read_payload.rs index 594d8d62..0ccd666e 100644 --- a/p2p/src/io/read_payload.rs +++ b/p2p/src/io/read_payload.rs @@ -25,12 +25,8 @@ impl Future for ReadPayload where A: io::Read { type Error = Error; fn poll(&mut self) -> Poll { - match try_nb!(self.reader.poll()) { - Async::Ready((read, data)) => { - let payload = try!(deserialize_payload(&data, self.version, &self.command)); - Ok(Async::Ready((read, payload))) - }, - Async::NotReady => Ok(Async::NotReady), - } + let (read, data) = try_async!(self.reader.poll()); + let payload = try!(deserialize_payload(&data, self.version, &self.command)); + Ok(Async::Ready((read, payload))) } }