improved p2p::io

This commit is contained in:
debris 2016-09-28 19:45:22 +02:00
parent 4039a1b0e7
commit c2b1af74a3
4 changed files with 19 additions and 24 deletions

View File

@ -5,7 +5,7 @@ use ser::{
}; };
use common::Command; use common::Command;
#[derive(Debug, PartialEq, Clone)] #[derive(Debug, PartialEq)]
pub struct MessageHeader { pub struct MessageHeader {
pub magic: u32, pub magic: u32,
pub command: Command, pub command: Command,

View File

@ -20,13 +20,9 @@ impl<A> Future for ReadHeader<A> where A: io::Read {
type Error = Error; type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match try_nb!(self.reader.poll()) { let (read, data) = try_async!(self.reader.poll());
Async::Ready((read, data)) => { let header = try!(deserialize(&data));
let header: MessageHeader = try!(deserialize(&data)); Ok(Async::Ready((read, header)))
Ok(Async::Ready((read, header)))
},
Async::NotReady => Ok(Async::NotReady),
}
} }
} }

View File

@ -9,9 +9,10 @@ enum ReadMessageState<A> {
future: ReadHeader<A>, future: ReadHeader<A>,
}, },
ReadPayload { ReadPayload {
header: MessageHeader, header: Option<MessageHeader>,
future: ReadPayload<A> future: ReadPayload<A>
}, },
Finished,
} }
pub fn read_message<A>(a: A, version: u32) -> ReadMessage<A> where A: io::Read { pub fn read_message<A>(a: A, version: u32) -> ReadMessage<A> where A: io::Read {
@ -32,27 +33,29 @@ impl<A> Future for ReadMessage<A> where A: io::Read {
type Error = Error; type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let next = match self.state { let (next, result) = match self.state {
ReadMessageState::ReadHeader { version, ref mut future } => { ReadMessageState::ReadHeader { version, ref mut future } => {
let (read, header) = try_async!(future.poll()); let (read, header) = try_async!(future.poll());
ReadMessageState::ReadPayload { let next = ReadMessageState::ReadPayload {
future: read_payload(read, version, header.len as usize, header.command.clone()), future: read_payload(read, version, header.len as usize, header.command.clone()),
header: header, header: Some(header),
} };
(next, Async::NotReady)
}, },
ReadMessageState::ReadPayload { ref header, ref mut future } => { ReadMessageState::ReadPayload { ref mut header, ref mut future } => {
let (read, payload) = try_async!(future.poll()); let (read, payload) = try_async!(future.poll());
let message = Message { let message = Message {
header: header.clone(), header: header.take().expect("payload must be preceded by header"),
payload: payload, payload: payload,
}; };
return Ok(Async::Ready((read, message))) (ReadMessageState::Finished, Async::Ready((read, message)))
}, },
ReadMessageState::Finished => panic!("poll AcceptHandshake after it's done"),
}; };
self.state = next; self.state = next;
Ok(Async::NotReady) Ok(result)
} }
} }

View File

@ -25,12 +25,8 @@ impl<A> Future for ReadPayload<A> where A: io::Read {
type Error = Error; type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match try_nb!(self.reader.poll()) { let (read, data) = try_async!(self.reader.poll());
Async::Ready((read, data)) => { let payload = try!(deserialize_payload(&data, self.version, &self.command));
let payload = try!(deserialize_payload(&data, self.version, &self.command)); Ok(Async::Ready((read, payload)))
Ok(Async::Ready((read, payload)))
},
Async::NotReady => Ok(Async::NotReady),
}
} }
} }