read_message stream propagates errors up

This commit is contained in:
debris 2016-10-06 01:44:26 +02:00
parent b09708ca5b
commit 338e198ac5
1 changed files with 6 additions and 8 deletions

View File

@ -93,14 +93,12 @@ impl<A> Stream for ReadMessageStream<A> where A: io::Read {
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let result = match self.future.poll() {
Ok(Async::Ready((_, result))) => {
Ok(Some(result).into())
},
Ok(Async::Ready((_, result))) => Ok(Some(result).into()),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(_) => {
// TODO: error should be somehow handled and passed upstream
Err(Error::Io(err)) => return Err(Error::Io(err)),
Err(err) => {
try!(self.stream.read_to_end(&mut Vec::new()));
Ok(Async::NotReady)
Err(err)
}
};
@ -112,7 +110,7 @@ impl<A> Stream for ReadMessageStream<A> where A: io::Read {
#[cfg(test)]
mod tests {
use std::io::Cursor;
use futures::{Future, Async};
use futures::Future;
use futures::stream::Stream;
use bytes::Bytes;
use message::{Message, Payload};
@ -135,6 +133,6 @@ mod tests {
let mut stream = read_message_stream(Cursor::new(raw), Magic::Mainnet, 0);
assert_eq!(stream.poll().unwrap(), Some(expected0).into());
assert_eq!(stream.poll().unwrap(), Some(expected1).into());
assert_eq!(stream.poll().unwrap(), Async::NotReady);
assert!(stream.poll().is_err());
}
}