From 047307be4694c1940ba11be694457375d59ce4be Mon Sep 17 00:00:00 2001 From: Andreas Fackler Date: Wed, 9 May 2018 14:18:12 +0200 Subject: [PATCH] Revert to earlier ProtoIo implementation. This fixes the issue where sometimes messages were skipped. --- src/proto_io.rs | 68 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 57 insertions(+), 11 deletions(-) diff --git a/src/proto_io.rs b/src/proto_io.rs index 5628e40..3c67d21 100644 --- a/src/proto_io.rs +++ b/src/proto_io.rs @@ -3,9 +3,9 @@ use proto::*; use protobuf; use protobuf::Message as ProtobufMessage; -use std::io; use std::io::{Read, Write}; use std::net::TcpStream; +use std::{cmp, io}; /// A magic key to put right before each message. An atavism of primitive serial /// protocols. @@ -35,14 +35,42 @@ impl From for Error { } } +fn encode_u32_to_be(value: u32, buffer: &mut [u8]) -> Result<(), Error> { + if buffer.len() < 4 { + return Err(Error::EncodeError); + } + let value = value.to_le(); + buffer[0] = ((value & 0xFF00_0000) >> 24) as u8; + buffer[1] = ((value & 0x00FF_0000) >> 16) as u8; + buffer[2] = ((value & 0x0000_FF00) >> 8) as u8; + buffer[3] = (value & 0x0000_00FF) as u8; + Ok(()) +} + +fn decode_u32_from_be(buffer: &[u8]) -> Result { + if buffer.len() < 4 { + return Err(Error::DecodeError); + } + let mut result = u32::from(buffer[0]); + result <<= 8; + result += u32::from(buffer[1]); + result <<= 8; + result += u32::from(buffer[2]); + result <<= 8; + result += u32::from(buffer[3]); + Ok(result) +} + pub struct ProtoIo { stream: S, + buffer: [u8; 1024 * 4], } impl ProtoIo { pub fn try_clone(&self) -> Result, ::std::io::Error> { Ok(ProtoIo { stream: self.stream.try_clone()?, + buffer: [0; 1024 * 4], }) } } @@ -52,31 +80,52 @@ impl ProtoIo //where T: Clone + Send + Sync + From> + Into> { pub fn from_stream(stream: S) -> Self { - ProtoIo { stream } + ProtoIo { + stream, + buffer: [0; 1024 * 4], + } } pub fn recv(&mut self) -> Result, Error> where T: Clone + Send + Sync + AsRef<[u8]> + From>, { - let mut stream = protobuf::CodedInputStream::new(&mut self.stream); - // Read magic number - if stream.read_raw_varint32()? != FRAME_START { + self.stream.read_exact(&mut self.buffer[0..4])?; + let frame_start = decode_u32_from_be(&self.buffer[0..4])?; + if frame_start != FRAME_START { return Err(Error::FrameStartMismatch); + }; + self.stream.read_exact(&mut self.buffer[0..4])?; + let size = decode_u32_from_be(&self.buffer[0..4])? as usize; + + let mut message_v: Vec = Vec::new(); + message_v.reserve(size); + while message_v.len() < size { + let num_to_read = cmp::min(self.buffer.len(), size - message_v.len()); + let (slice, _) = self.buffer.split_at_mut(num_to_read); + self.stream.read_exact(slice)?; + message_v.extend_from_slice(slice); } - Message::from_proto(stream.read_message()?).ok_or(Error::DecodeError) + + Message::parse_from_bytes(&message_v).map_err(Error::ProtobufError) } pub fn send(&mut self, message: Message) -> Result<(), Error> where T: Clone + Send + Sync + AsRef<[u8]> + From>, { + let mut buffer: [u8; 4] = [0; 4]; + // Wrap stream let mut stream = protobuf::CodedOutputStream::new(&mut self.stream); // Write magic number - stream.write_raw_varint32(FRAME_START)?; + encode_u32_to_be(FRAME_START, &mut buffer[0..4])?; + stream.write_raw_bytes(&buffer)?; let message_p = message.into_proto(); + // Write message size + encode_u32_to_be(message_p.compute_size(), &mut buffer[0..4])?; + stream.write_raw_bytes(&buffer)?; // Write message - message_p.write_length_delimited_to(&mut stream)?; + message_p.write_to(&mut stream)?; // Flush stream.flush()?; Ok(()) @@ -101,9 +150,6 @@ mod tests { println!("{:?}", pio.stream.get_ref()); pio.stream.set_position(0); assert_eq!(msg0, pio.recv().expect("recv msg0")); - // TODO: Figure out why the cursor is wrong here. - let len = pio.stream.get_ref().len() as u64; - pio.stream.set_position(len / 2); assert_eq!(msg1, pio.recv().expect("recv msg1")); } }