remove redundant recursion in p2p::io

This commit is contained in:
debris 2017-08-04 14:05:58 +02:00
parent e9f217a6de
commit 5b8bf322b0
3 changed files with 127 additions and 161 deletions

View File

@ -58,7 +58,6 @@ enum HandshakeState<A> {
version: Option<Version>, version: Option<Version>,
future: ReadMessage<Verack, A>, future: ReadMessage<Verack, A>,
}, },
Finished,
} }
enum AcceptHandshakeState<A> { enum AcceptHandshakeState<A> {
@ -74,7 +73,6 @@ enum AcceptHandshakeState<A> {
version: Option<Version>, version: Option<Version>,
future: WriteMessage<Verack, A>, future: WriteMessage<Verack, A>,
}, },
Finished,
} }
pub struct Handshake<A> { pub struct Handshake<A> {
@ -98,10 +96,11 @@ impl<A> Future for Handshake<A> where A: AsyncRead + AsyncWrite {
type Error = io::Error; type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (next, result) = match self.state { loop {
let next_state = match self.state {
HandshakeState::SendVersion(ref mut future) => { HandshakeState::SendVersion(ref mut future) => {
let (stream, _) = try_ready!(future.poll()); let (stream, _) = try_ready!(future.poll());
(HandshakeState::ReceiveVersion(read_message(stream, self.magic, 0)), Async::NotReady) HandshakeState::ReceiveVersion(read_message(stream, self.magic, 0))
}, },
HandshakeState::ReceiveVersion(ref mut future) => { HandshakeState::ReceiveVersion(ref mut future) => {
let (stream, version) = try_ready!(future.poll()); let (stream, version) = try_ready!(future.poll());
@ -119,24 +118,20 @@ impl<A> Future for Handshake<A> where A: AsyncRead + AsyncWrite {
} }
} }
let next = HandshakeState::SendVerack { HandshakeState::SendVerack {
version: Some(version), version: Some(version),
future: write_message(stream, verack_message(self.magic)), future: write_message(stream, verack_message(self.magic)),
}; }
(next, Async::NotReady)
}, },
HandshakeState::SendVerack { ref mut version, ref mut future } => { HandshakeState::SendVerack { ref mut version, ref mut future } => {
let (stream, _) = try_ready!(future.poll()); let (stream, _) = try_ready!(future.poll());
let version = version.take().expect("verack must be preceded by version"); let version = version.take().expect("verack must be preceded by version");
let next = HandshakeState::ReceiveVerack { HandshakeState::ReceiveVerack {
version: Some(version), version: Some(version),
future: read_message(stream, self.magic, 0), future: read_message(stream, self.magic, 0),
}; }
(next, Async::NotReady)
}, },
HandshakeState::ReceiveVerack { ref mut version, ref mut future } => { HandshakeState::ReceiveVerack { ref mut version, ref mut future } => {
let (stream, _verack) = try_ready!(future.poll()); let (stream, _verack) = try_ready!(future.poll());
@ -147,16 +142,10 @@ impl<A> Future for Handshake<A> where A: AsyncRead + AsyncWrite {
version: version, version: version,
}; };
(HandshakeState::Finished, Async::Ready((stream, Ok(result)))) return Ok(Async::Ready((stream, Ok(result))));
}, },
HandshakeState::Finished => panic!("poll Handshake after it's done"),
}; };
self.state = next_state;
self.state = next;
match result {
// by polling again, we register new future
Async::NotReady => self.poll(),
result => Ok(result)
} }
} }
} }
@ -166,7 +155,8 @@ impl<A> Future for AcceptHandshake<A> where A: AsyncRead + AsyncWrite {
type Error = io::Error; type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (next, result) = match self.state { loop {
let next_state = match self.state {
AcceptHandshakeState::ReceiveVersion { ref mut local_version, ref mut future } => { AcceptHandshakeState::ReceiveVersion { ref mut local_version, ref mut future } => {
let (stream, version) = try_ready!(future.poll()); let (stream, version) = try_ready!(future.poll());
let version = match version { let version = match version {
@ -184,21 +174,17 @@ impl<A> Future for AcceptHandshake<A> where A: AsyncRead + AsyncWrite {
} }
let local_version = local_version.take().expect("local version must be set"); let local_version = local_version.take().expect("local version must be set");
let next = AcceptHandshakeState::SendVersion { AcceptHandshakeState::SendVersion {
version: Some(version), version: Some(version),
future: write_message(stream, version_message(self.magic, local_version)), future: write_message(stream, version_message(self.magic, local_version)),
}; }
(next, Async::NotReady)
}, },
AcceptHandshakeState::SendVersion { ref mut version, ref mut future } => { AcceptHandshakeState::SendVersion { ref mut version, ref mut future } => {
let (stream, _) = try_ready!(future.poll()); let (stream, _) = try_ready!(future.poll());
let next = AcceptHandshakeState::SendVerack { AcceptHandshakeState::SendVerack {
version: version.take(), version: version.take(),
future: write_message(stream, verack_message(self.magic)), future: write_message(stream, verack_message(self.magic)),
}; }
(next, Async::NotReady)
}, },
AcceptHandshakeState::SendVerack { ref mut version, ref mut future } => { AcceptHandshakeState::SendVerack { ref mut version, ref mut future } => {
let (stream, _) = try_ready!(future.poll()); let (stream, _) = try_ready!(future.poll());
@ -210,16 +196,10 @@ impl<A> Future for AcceptHandshake<A> where A: AsyncRead + AsyncWrite {
version: version, version: version,
}; };
(AcceptHandshakeState::Finished, Async::Ready((stream, Ok(result)))) return Ok(Async::Ready((stream, Ok(result))));
}, },
AcceptHandshakeState::Finished => panic!("poll AcceptHandshake after it's done"),
}; };
self.state = next_state;
self.state = next;
match result {
// by polling again, we register new future
Async::NotReady => self.poll(),
result => Ok(result)
} }
} }
} }

View File

@ -20,7 +20,6 @@ pub enum ReadAnyMessageState<A> {
header: MessageHeader, header: MessageHeader,
future: ReadExact<A, Bytes> future: ReadExact<A, Bytes>
}, },
Finished,
} }
pub struct ReadAnyMessage<A> { pub struct ReadAnyMessage<A> {
@ -32,36 +31,30 @@ impl<A> Future for ReadAnyMessage<A> where A: AsyncRead {
type Error = io::Error; type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (next, result) = match self.state { loop {
let next_state = match self.state {
ReadAnyMessageState::ReadHeader(ref mut header) => { ReadAnyMessageState::ReadHeader(ref mut header) => {
let (stream, header) = try_ready!(header.poll()); let (stream, header) = try_ready!(header.poll());
let header = match header { let header = match header {
Ok(header) => header, Ok(header) => header,
Err(err) => return Ok(Err(err).into()), Err(err) => return Ok(Err(err).into()),
}; };
let future = read_exact(stream, Bytes::new_with_len(header.len as usize)); ReadAnyMessageState::ReadPayload {
let next = ReadAnyMessageState::ReadPayload { future: read_exact(stream, Bytes::new_with_len(header.len as usize)),
header: header, header: header,
future: future, }
};
(next, Async::NotReady)
}, },
ReadAnyMessageState::ReadPayload { ref mut header, ref mut future } => { ReadAnyMessageState::ReadPayload { ref mut header, ref mut future } => {
let (_stream, bytes) = try_ready!(future.poll()); let (_stream, bytes) = try_ready!(future.poll());
if checksum(&bytes) != header.checksum { if checksum(&bytes) != header.checksum {
return Ok(Err(Error::InvalidChecksum).into()); return Ok(Err(Error::InvalidChecksum).into());
} }
let next = ReadAnyMessageState::Finished;
(next, Ok((header.command.clone(), bytes)).into()) return Ok(Async::Ready(Ok((header.command.clone(), bytes))));
}, },
ReadAnyMessageState::Finished => panic!("poll ReadAnyMessage after it's done"),
}; };
self.state = next; self.state = next_state;
match result {
// by polling again, we register new future
Async::NotReady => self.poll(),
result => Ok(result)
} }
} }
} }

View File

@ -25,7 +25,6 @@ enum ReadMessageState<M, A> {
ReadPayload { ReadPayload {
future: ReadPayload<M, A>, future: ReadPayload<M, A>,
}, },
Finished,
} }
pub struct ReadMessage<M, A> { pub struct ReadMessage<M, A> {
@ -38,7 +37,8 @@ impl<M, A> Future for ReadMessage<M, A> where A: AsyncRead, M: Payload {
type Error = io::Error; type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (next, result) = match self.state { loop {
let next_state = match self.state {
ReadMessageState::ReadHeader { version, ref mut future } => { ReadMessageState::ReadHeader { version, ref mut future } => {
let (read, header) = try_ready!(future.poll()); let (read, header) = try_ready!(future.poll());
let header = match header { let header = match header {
@ -51,23 +51,16 @@ impl<M, A> Future for ReadMessage<M, A> where A: AsyncRead, M: Payload {
let future = read_payload( let future = read_payload(
read, version, header.len as usize, header.checksum, read, version, header.len as usize, header.checksum,
); );
let next = ReadMessageState::ReadPayload { ReadMessageState::ReadPayload {
future: future, future: future,
}; }
(next, Async::NotReady)
}, },
ReadMessageState::ReadPayload { ref mut future } => { ReadMessageState::ReadPayload { ref mut future } => {
let (read, payload) = try_ready!(future.poll()); let (read, payload) = try_ready!(future.poll());
(ReadMessageState::Finished, Async::Ready((read, payload))) return Ok(Async::Ready((read, payload)));
}, },
ReadMessageState::Finished => panic!("poll ReadMessage after it's done"),
}; };
self.state = next_state;
self.state = next;
match result {
// by polling again, we register new future
Async::NotReady => self.poll(),
result => Ok(result)
} }
} }
} }