2016-10-19 17:51:20 -07:00
|
|
|
use std::io;
|
|
|
|
use futures::{Future, Poll, Async};
|
2017-03-25 02:05:49 -07:00
|
|
|
use tokio_io::io::{read_exact, ReadExact};
|
|
|
|
use tokio_io::AsyncRead;
|
2016-10-19 17:51:20 -07:00
|
|
|
use crypto::checksum;
|
2016-11-25 09:38:21 -08:00
|
|
|
use network::Magic;
|
|
|
|
use message::{Error, MessageHeader, MessageResult, Command};
|
2016-10-19 17:51:20 -07:00
|
|
|
use bytes::Bytes;
|
|
|
|
use io::{read_header, ReadHeader};
|
|
|
|
|
2017-03-25 02:05:49 -07:00
|
|
|
pub fn read_any_message<A>(a: A, magic: Magic) -> ReadAnyMessage<A> where A: AsyncRead {
|
2016-10-19 17:51:20 -07:00
|
|
|
ReadAnyMessage {
|
|
|
|
state: ReadAnyMessageState::ReadHeader(read_header(a, magic)),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub enum ReadAnyMessageState<A> {
|
|
|
|
ReadHeader(ReadHeader<A>),
|
|
|
|
ReadPayload {
|
|
|
|
header: MessageHeader,
|
|
|
|
future: ReadExact<A, Bytes>
|
|
|
|
},
|
|
|
|
Finished,
|
|
|
|
}
|
|
|
|
|
|
|
|
pub struct ReadAnyMessage<A> {
|
|
|
|
state: ReadAnyMessageState<A>,
|
|
|
|
}
|
|
|
|
|
2017-03-25 02:05:49 -07:00
|
|
|
impl<A> Future for ReadAnyMessage<A> where A: AsyncRead {
|
2016-10-19 17:51:20 -07:00
|
|
|
type Item = MessageResult<(Command, Bytes)>;
|
|
|
|
type Error = io::Error;
|
|
|
|
|
|
|
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
|
|
|
let (next, result) = match self.state {
|
|
|
|
ReadAnyMessageState::ReadHeader(ref mut header) => {
|
|
|
|
let (stream, header) = try_ready!(header.poll());
|
|
|
|
let header = match header {
|
|
|
|
Ok(header) => header,
|
|
|
|
Err(err) => return Ok(Err(err).into()),
|
|
|
|
};
|
|
|
|
let future = read_exact(stream, Bytes::new_with_len(header.len as usize));
|
|
|
|
let next = ReadAnyMessageState::ReadPayload {
|
|
|
|
header: header,
|
|
|
|
future: future,
|
|
|
|
};
|
|
|
|
(next, Async::NotReady)
|
|
|
|
},
|
|
|
|
ReadAnyMessageState::ReadPayload { ref mut header, ref mut future } => {
|
|
|
|
let (_stream, bytes) = try_ready!(future.poll());
|
|
|
|
if checksum(&bytes) != header.checksum {
|
|
|
|
return Ok(Err(Error::InvalidChecksum).into());
|
|
|
|
}
|
|
|
|
let next = ReadAnyMessageState::Finished;
|
|
|
|
(next, Ok((header.command.clone(), bytes)).into())
|
|
|
|
},
|
|
|
|
ReadAnyMessageState::Finished => panic!("poll ReadAnyMessage after it's done"),
|
|
|
|
};
|
|
|
|
|
|
|
|
self.state = next;
|
|
|
|
match result {
|
|
|
|
// by polling again, we register new future
|
|
|
|
Async::NotReady => self.poll(),
|
|
|
|
result => Ok(result)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2016-10-28 05:59:58 -07:00
|
|
|
|
|
|
|
#[cfg(test)]
mod tests {
	use futures::Future;
	use bytes::Bytes;
	use network::Magic;
	use message::Error;
	use super::read_any_message;

	/// A well-formed message is returned as (command, payload) on the
	/// matching network and rejected with `InvalidMagic` on another one.
	#[test]
	fn test_read_any_message() {
		let raw: Bytes = "f9beb4d970696e6700000000000000000800000083c00c765845303b6da97786".into();
		let expected = ("ping".into(), "5845303b6da97786".into());

		let on_mainnet = read_any_message(raw.as_ref(), Magic::Mainnet).wait().unwrap();
		assert_eq!(on_mainnet, Ok(expected));

		let on_testnet = read_any_message(raw.as_ref(), Magic::Testnet).wait().unwrap();
		assert_eq!(on_testnet, Err(Error::InvalidMagic));
	}

	/// Same message as above with the last payload byte missing: the future
	/// itself must fail.
	#[test]
	fn test_read_too_short_any_message() {
		let raw: Bytes = "f9beb4d970696e6700000000000000000800000083c00c765845303b6da977".into();
		let outcome = read_any_message(raw.as_ref(), Magic::Mainnet).wait();
		assert!(outcome.is_err());
	}

	/// Same message as above with an altered checksum field (`83c01c76`
	/// instead of `83c00c76`): resolves to `InvalidChecksum`.
	#[test]
	fn test_read_any_message_with_invalid_checksum() {
		let raw: Bytes = "f9beb4d970696e6700000000000000000800000083c01c765845303b6da97786".into();
		let outcome = read_any_message(raw.as_ref(), Magic::Mainnet).wait().unwrap();
		assert_eq!(outcome, Err(Error::InvalidChecksum));
	}
}
|