Merge pull request #177 from poanetwork/afck-no-protobuf

Remove protobuf support.
This commit is contained in:
Vladimir Komendantskiy 2018-07-31 12:12:26 +01:00 committed by GitHub
commit 0e7055f8eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 24 additions and 504 deletions

View File

@ -26,7 +26,6 @@ itertools = "0.7"
log = "0.4.1" log = "0.4.1"
merkle = { git = "https://github.com/afck/merkle.rs", branch = "public-proof", features = [ "serialization-serde" ] } merkle = { git = "https://github.com/afck/merkle.rs", branch = "public-proof", features = [ "serialization-serde" ] }
pairing = { version = "0.14.2", features = ["u128-support"] } pairing = { version = "0.14.2", features = ["u128-support"] }
protobuf = { version = "2.0.0", optional = true }
rand = "0.4.2" rand = "0.4.2"
rand_derive = "0.3.1" rand_derive = "0.3.1"
reed-solomon-erasure = "3.1.0" reed-solomon-erasure = "3.1.0"
@ -34,12 +33,6 @@ ring = "^0.12"
serde = "1.0.55" serde = "1.0.55"
serde_derive = "1.0.55" serde_derive = "1.0.55"
[features]
serialization-protobuf = [ "protobuf", "protobuf-codegen-pure" ]
[build-dependencies]
protobuf-codegen-pure = { version = "2.0.0", optional = true }
[dev-dependencies] [dev-dependencies]
colored = "1.6" colored = "1.6"
crossbeam = "0.3.2" crossbeam = "0.3.2"
@ -50,7 +43,6 @@ signifix = "0.9"
[[example]] [[example]]
name = "consensus-node" name = "consensus-node"
required-features = [ "serialization-protobuf" ]
[[example]] [[example]]
name = "simulation" name = "simulation"

View File

@ -1,22 +0,0 @@
#[cfg(feature = "serialization-protobuf")]
mod feature_protobuf {
extern crate protobuf_codegen_pure;
pub fn main() {
println!("cargo:rerun-if-changed=proto/message.proto");
protobuf_codegen_pure::run(protobuf_codegen_pure::Args {
out_dir: "src/proto",
input: &["proto/message.proto"],
includes: &["proto"],
customize: Default::default(),
}).expect("protoc");
}
}
#[cfg(feature = "serialization-protobuf")]
fn main() {
feature_protobuf::main()
}
#[cfg(not(feature = "serialization-protobuf"))]
fn main() {}

View File

@ -1,5 +1,6 @@
//! Example of a consensus node that uses the `hbbft::node::Node` struct for //! Example of a consensus node that uses the `hbbft::node::Node` struct for
//! running the distributed consensus state machine. //! running the distributed consensus state machine.
extern crate bincode;
extern crate crossbeam; extern crate crossbeam;
#[macro_use] #[macro_use]
extern crate crossbeam_channel; extern crate crossbeam_channel;
@ -9,7 +10,7 @@ extern crate hbbft;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
extern crate pairing; extern crate pairing;
extern crate protobuf; extern crate serde;
mod network; mod network;

View File

@ -1,14 +1,14 @@
//! Comms task structure. A comms task communicates with a remote node through a //! Comms task structure. A comms task communicates with a remote node through a
//! socket. Local communication with coordinating threads is made via //! socket. Local communication with coordinating threads is made via
//! `crossbeam_channel::unbounded()`. //! `crossbeam_channel::unbounded()`.
use bincode;
use crossbeam; use crossbeam;
use crossbeam_channel::{Receiver, Sender}; use crossbeam_channel::{Receiver, Sender};
use serde::{Deserialize, Serialize};
use std::io; use std::io;
use std::net::TcpStream; use std::net::TcpStream;
use hbbft::messaging::SourcedMessage; use hbbft::messaging::SourcedMessage;
use hbbft::proto_io::{ErrorKind, ProtoIo};
use protobuf::Message;
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
@ -23,18 +23,18 @@ impl From<io::Error> for Error {
/// A communication task connects a remote node to the thread that manages the /// A communication task connects a remote node to the thread that manages the
/// consensus algorithm. /// consensus algorithm.
pub struct CommsTask<'a, P: 'a, M: 'a> { pub struct CommsTask<'a, M: 'a> {
/// The transmit side of the multiple producer channel from comms threads. /// The transmit side of the multiple producer channel from comms threads.
tx: &'a Sender<SourcedMessage<M, usize>>, tx: &'a Sender<SourcedMessage<M, usize>>,
/// The receive side of the channel to the comms thread. /// The receive side of the channel to the comms thread.
rx: &'a Receiver<M>, rx: &'a Receiver<M>,
/// The socket IO task. /// The socket IO task.
io: ProtoIo<TcpStream, P>, stream: TcpStream,
/// The index of this comms task for identification against its remote node. /// The index of this comms task for identification against its remote node.
pub node_index: usize, pub node_index: usize,
} }
impl<'a, P: Message + 'a, M: Into<P> + From<P> + Send + 'a> CommsTask<'a, P, M> { impl<'a, M: Serialize + for<'de> Deserialize<'de> + Send + 'a> CommsTask<'a, M> {
pub fn new( pub fn new(
tx: &'a Sender<SourcedMessage<M, usize>>, tx: &'a Sender<SourcedMessage<M, usize>>,
rx: &'a Receiver<M>, rx: &'a Receiver<M>,
@ -50,7 +50,7 @@ impl<'a, P: Message + 'a, M: Into<P> + From<P> + Send + 'a> CommsTask<'a, P, M>
CommsTask { CommsTask {
tx, tx,
rx, rx,
io: ProtoIo::from_stream(stream), stream,
node_index, node_index,
} }
} }
@ -61,7 +61,7 @@ impl<'a, P: Message + 'a, M: Into<P> + From<P> + Send + 'a> CommsTask<'a, P, M>
// Borrow parts of `self` before entering the thread binding scope. // Borrow parts of `self` before entering the thread binding scope.
let tx = self.tx; let tx = self.tx;
let rx = self.rx; let rx = self.rx;
let mut io1 = self.io.try_clone()?; let mut stream1 = self.stream.try_clone()?;
let node_index = self.node_index; let node_index = self.node_index;
crossbeam::scope(move |scope| { crossbeam::scope(move |scope| {
@ -71,26 +71,30 @@ impl<'a, P: Message + 'a, M: Into<P> + From<P> + Send + 'a> CommsTask<'a, P, M>
// Receive a multicast message from the manager thread. // Receive a multicast message from the manager thread.
let message = rx.recv().unwrap(); let message = rx.recv().unwrap();
// Forward the message to the remote node. // Forward the message to the remote node.
io1.send(&message.into()).unwrap(); bincode::serialize_into(&mut stream1, &message)
.expect("message serialization failed");
} }
}); });
// Remote comms receive loop. // Remote comms receive loop.
debug!("Starting remote RX loop for node {}", node_index); debug!("Starting remote RX loop for node {}", node_index);
loop { loop {
match self.io.recv() { match bincode::deserialize_from(&mut self.stream) {
Ok(message) => { Ok(message) => {
tx.send(SourcedMessage { tx.send(SourcedMessage {
source: node_index, source: node_index,
message: message.into(), message,
}).unwrap(); }).unwrap();
} }
Err(err) => match err.kind() { Err(err) => {
ErrorKind::Protobuf(pe) => { if let bincode::ErrorKind::Io(ref io_err) = *err {
warn!("Node {} - Protobuf error {}", node_index, pe) if io_err.kind() == io::ErrorKind::UnexpectedEof {
info!("Node {} disconnected.", node_index);
break;
}
}
panic!("Node {} - Deserialization error {:?}", node_index, err);
} }
_ => warn!("Node {} - Critical error {:?}", node_index, err),
},
} }
} }
}); });

View File

@ -1,24 +1,17 @@
//! Connection data and initiation routines. //! Connection data and initiation routines.
use std::collections::{BTreeMap, HashSet}; use std::collections::{BTreeMap, HashSet};
use std::io::BufReader;
use std::net::{SocketAddr, TcpListener, TcpStream}; use std::net::{SocketAddr, TcpListener, TcpStream};
#[derive(Debug)] #[derive(Debug)]
pub struct Connection { pub struct Connection {
pub stream: TcpStream, pub stream: TcpStream,
pub reader: BufReader<TcpStream>,
pub node_str: String, pub node_str: String,
} }
impl Connection { impl Connection {
pub fn new(stream: TcpStream, node_str: String) -> Self { pub fn new(stream: TcpStream, node_str: String) -> Self {
Connection { Connection { stream, node_str }
// Create a read buffer of 1K bytes.
reader: BufReader::with_capacity(1024, stream.try_clone().unwrap()),
stream,
node_str,
}
} }
} }

View File

@ -46,7 +46,6 @@ use hbbft::broadcast::{Broadcast, BroadcastMessage};
use hbbft::crypto::poly::Poly; use hbbft::crypto::poly::Poly;
use hbbft::crypto::{SecretKey, SecretKeySet}; use hbbft::crypto::{SecretKey, SecretKeySet};
use hbbft::messaging::{DistAlgorithm, NetworkInfo, SourcedMessage}; use hbbft::messaging::{DistAlgorithm, NetworkInfo, SourcedMessage};
use hbbft::proto::message::BroadcastProto;
use network::commst; use network::commst;
use network::connection; use network::connection;
use network::messaging::Messaging; use network::messaging::Messaging;
@ -182,7 +181,7 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
let rx_to_comms = &rxs_to_comms[node_index]; let rx_to_comms = &rxs_to_comms[node_index];
scope.spawn(move || { scope.spawn(move || {
match commst::CommsTask::<BroadcastProto, BroadcastMessage>::new( match commst::CommsTask::<BroadcastMessage>::new(
tx_from_comms, tx_from_comms,
rx_to_comms, rx_to_comms,
// FIXME: handle error // FIXME: handle error

View File

@ -2,7 +2,7 @@
export RUST_LOG=hbbft=debug,consensus_node=debug export RUST_LOG=hbbft=debug,consensus_node=debug
cargo build --features=serialization-protobuf --example consensus-node cargo build --example consensus-node
target/debug/examples/consensus-node --bind-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 --value=Foo & target/debug/examples/consensus-node --bind-address=127.0.0.1:5000 --remote-address=127.0.0.1:5001 --remote-address=127.0.0.1:5002 --remote-address=127.0.0.1:5003 --remote-address=127.0.0.1:5004 --value=Foo &
sleep 1 sleep 1

View File

@ -5,8 +5,6 @@
pub mod error; pub mod error;
mod into_fr; mod into_fr;
pub mod poly; pub mod poly;
#[cfg(feature = "serialization-protobuf")]
pub mod protobuf_impl;
pub mod serde_impl; pub mod serde_impl;
use std::fmt; use std::fmt;

View File

@ -97,9 +97,6 @@
//! `hbbft` supports [serde](https://serde.rs/): All message types implement the `Serialize` and //! `hbbft` supports [serde](https://serde.rs/): All message types implement the `Serialize` and
//! `Deserialize` traits so they can be easily serialized or included as part of other serializable //! `Deserialize` traits so they can be easily serialized or included as part of other serializable
//! types. //! types.
//!
//! If `serialization-protobuf` is enabled, the message types support serialization with [Google
//! protocol buffers](https://developers.google.com/protocol-buffers/docs/overview).
// TODO: Remove this once https://github.com/rust-lang-nursery/error-chain/issues/245 is resolved. // TODO: Remove this once https://github.com/rust-lang-nursery/error-chain/issues/245 is resolved.
#![allow(renamed_and_removed_lints)] #![allow(renamed_and_removed_lints)]
@ -114,8 +111,6 @@ extern crate log;
extern crate itertools; extern crate itertools;
extern crate merkle; extern crate merkle;
extern crate pairing; extern crate pairing;
#[cfg(feature = "serialization-protobuf")]
extern crate protobuf;
extern crate rand; extern crate rand;
#[macro_use] #[macro_use]
extern crate rand_derive; extern crate rand_derive;
@ -135,10 +130,6 @@ pub mod fault_log;
mod fmt; mod fmt;
pub mod honey_badger; pub mod honey_badger;
pub mod messaging; pub mod messaging;
#[cfg(feature = "serialization-protobuf")]
pub mod proto;
#[cfg(feature = "serialization-protobuf")]
pub mod proto_io;
pub mod queueing_honey_badger; pub mod queueing_honey_badger;
pub mod sync_key_gen; pub mod sync_key_gen;
pub mod transaction_queue; pub mod transaction_queue;

View File

@ -1,234 +0,0 @@
//! Construction of messages from protobuf buffers.
pub mod message;
use merkle::proof::{Lemma, Positioned, Proof};
use ring::digest::Algorithm;
use agreement::bool_set;
use agreement::{AgreementContent, AgreementMessage};
use broadcast::BroadcastMessage;
use common_coin::CommonCoinMessage;
use crypto::{Signature, SignatureShare};
use proto::message::*;
impl From<message::BroadcastProto> for BroadcastMessage {
fn from(proto: message::BroadcastProto) -> BroadcastMessage {
BroadcastMessage::from_proto(proto, &::ring::digest::SHA256)
.expect("invalid broadcast message")
}
}
impl From<BroadcastMessage> for message::BroadcastProto {
fn from(msg: BroadcastMessage) -> message::BroadcastProto {
msg.into_proto()
}
}
impl BroadcastMessage {
pub fn into_proto(self) -> BroadcastProto {
let mut b = BroadcastProto::new();
match self {
BroadcastMessage::Value(p) => {
let mut v = ValueProto::new();
v.set_proof(ProofProto::from_proof(p));
b.set_value(v);
}
BroadcastMessage::Echo(p) => {
let mut e = EchoProto::new();
e.set_proof(ProofProto::from_proof(p));
b.set_echo(e);
}
BroadcastMessage::Ready(h) => {
let mut r = ReadyProto::new();
r.set_root_hash(h);
b.set_ready(r);
}
}
b
}
pub fn from_proto(mut mp: BroadcastProto, algorithm: &'static Algorithm) -> Option<Self> {
if mp.has_value() {
mp.take_value()
.take_proof()
.into_proof(algorithm)
.map(BroadcastMessage::Value)
} else if mp.has_echo() {
mp.take_echo()
.take_proof()
.into_proof(algorithm)
.map(BroadcastMessage::Echo)
} else if mp.has_ready() {
let h = mp.take_ready().take_root_hash();
Some(BroadcastMessage::Ready(h))
} else {
None
}
}
}
impl AgreementMessage {
pub fn into_proto(self) -> message::AgreementProto {
let mut p = message::AgreementProto::new();
p.set_epoch(self.epoch);
match self.content {
AgreementContent::BVal(b) => {
p.set_bval(b);
}
AgreementContent::Aux(b) => {
p.set_aux(b);
}
AgreementContent::Conf(v) => {
let bool_set = match v {
bool_set::NONE => 0,
bool_set::FALSE => 1,
bool_set::TRUE => 2,
_ => 3,
};
p.set_conf(bool_set);
}
AgreementContent::Term(b) => {
p.set_term(b);
}
AgreementContent::Coin(ccm) => {
let v = ccm.to_sig().0.to_vec();
p.set_coin(v);
}
}
p
}
// TODO: Re-enable lint once implemented.
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
pub fn from_proto(mp: message::AgreementProto) -> Option<Self> {
let epoch = mp.get_epoch();
if mp.has_bval() {
Some(AgreementContent::BVal(mp.get_bval()).with_epoch(epoch))
} else if mp.has_aux() {
Some(AgreementContent::Aux(mp.get_aux()).with_epoch(epoch))
} else if mp.has_conf() {
match mp.get_conf() {
0 => Some(bool_set::NONE),
1 => Some(bool_set::FALSE),
2 => Some(bool_set::TRUE),
3 => Some(bool_set::BOTH),
_ => None,
}.map(|bool_set| AgreementContent::Conf(bool_set).with_epoch(epoch))
} else if mp.has_term() {
Some(AgreementContent::Term(mp.get_term()).with_epoch(epoch))
} else if mp.has_coin() {
Signature::from_bytes(mp.get_coin())
.map(SignatureShare)
.map(|sig| {
AgreementContent::Coin(Box::new(CommonCoinMessage::new(sig))).with_epoch(epoch)
})
} else {
None
}
}
}
/// Serialisation of `Proof` defined against its protobuf interface to work
/// around the restriction of not being allowed to extend the implementation of
/// `Proof` outside its crate.
impl ProofProto {
pub fn from_proof<T: AsRef<[u8]>>(proof: Proof<T>) -> Self {
let mut proto = Self::new();
match proof {
Proof {
root_hash,
lemma,
value,
..
// algorithm, // TODO: use
} => {
proto.set_root_hash(root_hash);
proto.set_lemma(LemmaProto::from_lemma(lemma));
proto.set_value(value.as_ref().to_vec());
}
}
proto
}
pub fn into_proof<T: From<Vec<u8>>>(
mut self,
algorithm: &'static Algorithm,
) -> Option<Proof<T>> {
if !self.has_lemma() {
return None;
}
self.take_lemma().into_lemma().map(|lemma| {
Proof::new(
algorithm,
self.take_root_hash(),
lemma,
self.take_value().into(),
)
})
}
}
impl LemmaProto {
pub fn from_lemma(lemma: Lemma) -> Self {
let mut proto = Self::new();
match lemma {
Lemma {
node_hash,
sibling_hash,
sub_lemma,
} => {
proto.set_node_hash(node_hash);
if let Some(sub_proto) = sub_lemma.map(|l| Self::from_lemma(*l)) {
proto.set_sub_lemma(sub_proto);
}
match sibling_hash {
Some(Positioned::Left(hash)) => proto.set_left_sibling_hash(hash),
Some(Positioned::Right(hash)) => proto.set_right_sibling_hash(hash),
None => {}
}
}
}
proto
}
pub fn into_lemma(mut self) -> Option<Lemma> {
let node_hash = self.take_node_hash();
let sibling_hash = if self.has_left_sibling_hash() {
Some(Positioned::Left(self.take_left_sibling_hash()))
} else if self.has_right_sibling_hash() {
Some(Positioned::Right(self.take_right_sibling_hash()))
} else {
None
};
if self.has_sub_lemma() {
// If a `sub_lemma` is present is the Protobuf,
// then we expect it to unserialize to a valid `Lemma`,
// otherwise we return `None`
self.take_sub_lemma().into_lemma().map(|sub_lemma| Lemma {
node_hash,
sibling_hash,
sub_lemma: Some(Box::new(sub_lemma)),
})
} else {
// We might very well not have a sub_lemma,
// in which case we just set it to `None`,
// but still return a potentially valid `Lemma`.
Some(Lemma {
node_hash,
sibling_hash,
sub_lemma: None,
})
}
}
}

View File

@ -1,202 +0,0 @@
//! Protobuf message IO task structure.
use failure::{Backtrace, Context, Fail};
use protobuf::{self, Message, ProtobufError};
use std::io::{self, Read, Write};
use std::marker::PhantomData;
use std::net::TcpStream;
use std::{
cmp,
fmt::{self, Display},
};
/// A magic key to put right before each message. An atavism of primitive serial
/// protocols.
///
/// TODO: Replace it with a proper handshake at connection initiation.
const FRAME_START: u32 = 0x2C0F_FEE5;
/// IO/Messaging error variants.
#[derive(Debug, Fail)]
pub enum ErrorKind {
#[fail(display = "Io error: {}", _0)]
Io(#[cause] io::Error),
#[fail(display = "Protobuf error: {}", _0)]
Protobuf(#[cause] ProtobufError),
#[fail(display = "Decode error")]
Decode,
#[fail(display = "Encode error")]
Encode,
#[fail(display = "Frame start mismatch error")]
FrameStartMismatch,
}
/// An IO/Messaging error.
#[derive(Debug)]
pub struct Error {
inner: Context<ErrorKind>,
}
impl Fail for Error {
fn cause(&self) -> Option<&Fail> {
self.inner.cause()
}
fn backtrace(&self) -> Option<&Backtrace> {
self.inner.backtrace()
}
}
impl Error {
pub fn kind(&self) -> &ErrorKind {
self.inner.get_context()
}
}
impl From<ErrorKind> for Error {
fn from(kind: ErrorKind) -> Error {
Error {
inner: Context::new(kind),
}
}
}
impl From<Context<ErrorKind>> for Error {
fn from(inner: Context<ErrorKind>) -> Error {
Error { inner }
}
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Error {
ErrorKind::Io(err).into()
}
}
impl From<ProtobufError> for Error {
fn from(err: ProtobufError) -> Error {
ErrorKind::Protobuf(err).into()
}
}
impl Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Display::fmt(&self.inner, f)
}
}
pub type ProtoIoResult<T> = ::std::result::Result<T, Error>;
fn encode_u32_to_be(value: u32, buffer: &mut [u8]) -> ProtoIoResult<()> {
if buffer.len() < 4 {
return Err(ErrorKind::Encode.into());
}
let value = value.to_le();
buffer[0] = ((value & 0xFF00_0000) >> 24) as u8;
buffer[1] = ((value & 0x00FF_0000) >> 16) as u8;
buffer[2] = ((value & 0x0000_FF00) >> 8) as u8;
buffer[3] = (value & 0x0000_00FF) as u8;
Ok(())
}
fn decode_u32_from_be(buffer: &[u8]) -> ProtoIoResult<u32> {
if buffer.len() < 4 {
return Err(ErrorKind::Decode.into());
}
let mut result = u32::from(buffer[0]);
result <<= 8;
result += u32::from(buffer[1]);
result <<= 8;
result += u32::from(buffer[2]);
result <<= 8;
result += u32::from(buffer[3]);
Ok(result)
}
pub struct ProtoIo<S: Read + Write, M> {
stream: S,
buffer: [u8; 1024 * 4],
_phantom: PhantomData<M>,
}
impl<M> ProtoIo<TcpStream, M> {
pub fn try_clone(&self) -> Result<Self, ::std::io::Error> {
Ok(ProtoIo {
stream: self.stream.try_clone()?,
buffer: [0; 1024 * 4],
_phantom: PhantomData,
})
}
}
/// A message handling task.
impl<S: Read + Write, M: Message> ProtoIo<S, M>
//where T: Clone + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>
{
pub fn from_stream(stream: S) -> Self {
ProtoIo {
stream,
buffer: [0; 1024 * 4],
_phantom: PhantomData,
}
}
pub fn recv(&mut self) -> ProtoIoResult<M> {
self.stream.read_exact(&mut self.buffer[0..4])?;
let frame_start = decode_u32_from_be(&self.buffer[0..4])?;
if frame_start != FRAME_START {
return Err(ErrorKind::FrameStartMismatch.into());
};
self.stream.read_exact(&mut self.buffer[0..4])?;
let size = decode_u32_from_be(&self.buffer[0..4])? as usize;
let mut message_v: Vec<u8> = Vec::new();
message_v.reserve(size);
while message_v.len() < size {
let num_to_read = cmp::min(self.buffer.len(), size - message_v.len());
let (slice, _) = self.buffer.split_at_mut(num_to_read);
self.stream.read_exact(slice)?;
message_v.extend_from_slice(slice);
}
protobuf::parse_from_bytes(&message_v).map_err(|e| e.into())
}
pub fn send(&mut self, message: &M) -> ProtoIoResult<()> {
let mut buffer: [u8; 4] = [0; 4];
// Wrap stream
let mut stream = protobuf::CodedOutputStream::new(&mut self.stream);
// Write magic number
encode_u32_to_be(FRAME_START, &mut buffer[0..4])?;
stream.write_raw_bytes(&buffer)?;
// Write message size
encode_u32_to_be(message.compute_size(), &mut buffer[0..4])?;
stream.write_raw_bytes(&buffer)?;
// Write message
message.write_to(&mut stream)?;
// Flush
stream.flush()?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use broadcast::BroadcastMessage;
use proto::message::BroadcastProto;
use proto_io::*;
use std::io::Cursor;
#[test]
fn encode_decode_message() {
let msg0 = BroadcastMessage::Ready(b"Test 0".to_vec());
let msg1 = BroadcastMessage::Ready(b"Test 1".to_vec());
let mut pio = ProtoIo::<_, BroadcastProto>::from_stream(Cursor::new(Vec::new()));
pio.send(&msg0.clone().into()).expect("send msg0");
pio.send(&msg1.clone().into()).expect("send msg1");
println!("{:?}", pio.stream.get_ref());
pio.stream.set_position(0);
assert_eq!(msg0, pio.recv().expect("recv msg0").into());
assert_eq!(msg1, pio.recv().expect("recv msg1").into());
}
}