From ffc300f85a4d9f40aa959438b61dcff328986db5 Mon Sep 17 00:00:00 2001 From: debris Date: Mon, 10 Oct 2016 18:08:22 +0200 Subject: [PATCH] message cleanup in progress --- message/src/common/ip.rs | 2 +- message/src/common/port.rs | 2 +- message/src/lib.rs | 3 +- message/src/message/message.rs | 65 ++++------- message/src/message/message_header.rs | 12 ++ message/src/message/mod.rs | 2 - message/src/message/payload.rs | 153 -------------------------- message/src/serialization/mod.rs | 17 +-- message/src/serialization/reader.rs | 4 + message/src/serialization/stream.rs | 27 +++-- message/src/types/addr.rs | 90 ++++++++++++--- message/src/types/blocktxn.rs | 22 ++-- message/src/types/compactblock.rs | 22 ++-- message/src/types/feefilter.rs | 22 ++-- message/src/types/filteradd.rs | 24 ++-- message/src/types/filterclear.rs | 23 ++++ message/src/types/filterload.rs | 30 +++-- message/src/types/getaddr.rs | 23 ++++ message/src/types/getblocks.rs | 30 +++-- message/src/types/getblocktxn.rs | 22 ++-- message/src/types/getdata.rs | 31 ++++++ message/src/types/getheaders.rs | 39 +++++++ message/src/types/headers.rs | 23 ++-- message/src/types/inv.rs | 24 ++-- message/src/types/mempool.rs | 23 ++++ message/src/types/merkle_block.rs | 30 +++-- message/src/types/mod.rs | 22 +++- message/src/types/notfound.rs | 31 ++++++ message/src/types/ping.rs | 33 +++--- message/src/types/pong.rs | 33 +++--- message/src/types/reject.rs | 41 ++++--- message/src/types/sendcompact.rs | 26 +++-- message/src/types/sendheaders.rs | 23 ++++ message/src/types/verack.rs | 22 ++-- message/src/types/version.rs | 89 ++++++++------- p2p/src/io/handshake.rs | 30 +++-- p2p/src/io/mod.rs | 8 +- p2p/src/io/read_specific_message.rs | 5 +- p2p/src/io/read_specific_payload.rs | 5 +- p2p/src/io/write_message.rs | 20 ++-- p2p/src/lib.rs | 10 +- primitives/src/bytes.rs | 47 +++++++- serialization/src/reader.rs | 17 ++- serialization/src/stream.rs | 8 ++ 44 files changed, 740 insertions(+), 495 deletions(-) delete mode 100644 message/src/message/payload.rs create mode 100644 message/src/types/filterclear.rs create mode 100644 message/src/types/getaddr.rs create mode 100644 message/src/types/getdata.rs create mode 100644 message/src/types/getheaders.rs create mode 100644 message/src/types/mempool.rs create mode 100644 message/src/types/notfound.rs create mode 100644 message/src/types/sendheaders.rs diff --git a/message/src/common/ip.rs b/message/src/common/ip.rs index 5bde7779..fc0bdf7f 100644 --- a/message/src/common/ip.rs +++ b/message/src/common/ip.rs @@ -2,7 +2,7 @@ use std::{str, net}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError}; -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone, Copy)] pub struct IpAddress(net::IpAddr); impl From for IpAddress { diff --git a/message/src/common/port.rs b/message/src/common/port.rs index 2d86e504..fa794fa4 100644 --- a/message/src/common/port.rs +++ b/message/src/common/port.rs @@ -1,7 +1,7 @@ use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError}; -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone, Copy)] pub struct Port(u16); impl From for Port { diff --git a/message/src/lib.rs b/message/src/lib.rs index f039fb28..66317652 100644 --- a/message/src/lib.rs +++ b/message/src/lib.rs @@ -12,6 +12,7 @@ mod error; pub use primitives::{hash, bytes}; -pub use message::{Message, MessageHeader, Payload}; +pub use message::{Message, MessageHeader}; pub use error::Error; +pub use serialization::PayloadType; pub type MessageResult = Result; diff --git a/message/src/message/message.rs b/message/src/message/message.rs index 9afa1302..860bcd91 100644 --- a/message/src/message/message.rs +++ b/message/src/message/message.rs @@ -1,52 +1,31 @@ -use crypto::checksum; -use ser::{Serializable, Stream, serialize}; +use ser::Stream; +use bytes::TaggedBytes; use common::Magic; -use message::{MessageHeader, Payload}; +use serialization::serialize_payload; +use {PayloadType, MessageResult, MessageHeader}; -#[derive(Debug, PartialEq)] -pub struct Message { - pub header: MessageHeader, - pub payload: Payload, +pub struct Message { + bytes: TaggedBytes, } -impl Message { - pub fn new(magic: Magic, payload: Payload) -> Message { - let serialized = serialize(&payload); - Message { - header: MessageHeader { - magic: magic, - command: payload.command(), - len: serialized.len() as u32, - checksum: checksum(&serialized), - }, - payload: payload, - } +impl Message where T: PayloadType { + pub fn new(magic: Magic, version: u32, payload: &T) -> MessageResult { + let serialized = try!(serialize_payload(payload, version)); + let header = MessageHeader::for_data(magic, T::command().into(), &serialized); + let mut stream = Stream::default(); + stream.append(&header); + stream.append_slice(&serialized); + + let message = Message { + bytes: TaggedBytes::new(stream.out()), + }; + + Ok(message) } } -impl Serializable for Message { - fn serialize(&self, stream: &mut Stream) { - stream - .append(&self.header) - .append(&self.payload); - } -} - -#[cfg(test)] -mod tests { - use bytes::Bytes; - use ser::serialize; - use common::Magic; - use types::Version; - use super::Message; - use Payload; - - #[test] - fn test_message_serialization() { - let expected: Bytes = "f9beb4d976657273696f6e000000000064000000358d493262ea0000010000000000000011b2d05000000000010000000000000000000000000000000000ffff000000000000000000000000000000000000000000000000ffff0000000000003b2eb35d8ce617650f2f5361746f7368693a302e372e322fc03e0300".into(); - let version: Version = "62ea0000010000000000000011b2d05000000000010000000000000000000000000000000000ffff000000000000000000000000000000000000000000000000ffff0000000000003b2eb35d8ce617650f2f5361746f7368693a302e372e322fc03e0300".into(); - let magic = Magic::Mainnet; - let message = Message::new(magic, Payload::Version(version)); - assert_eq!(serialize(&message), expected); +impl AsRef<[u8]> for Message { + fn as_ref(&self) -> &[u8] { + self.bytes.as_ref() } } diff --git a/message/src/message/message_header.rs b/message/src/message/message_header.rs index e8b3e7f3..b1725d16 100644 --- a/message/src/message/message_header.rs +++ b/message/src/message/message_header.rs @@ -1,5 +1,6 @@ use hash::H32; use ser::{Serializable, Stream, Reader}; +use crypto::checksum; use common::{Command, Magic}; use Error; @@ -11,6 +12,17 @@ pub struct MessageHeader { pub checksum: H32, } +impl MessageHeader { + pub fn for_data(magic: Magic, command: Command, data: &[u8]) -> Self { + MessageHeader { + magic: magic, + command: command, + len: data.len() as u32, + checksum: checksum(data), + } + } +} + impl MessageHeader { pub fn deserialize(data: &[u8], expected: Magic) -> Result { if data.len() != 24 { diff --git a/message/src/message/mod.rs b/message/src/message/mod.rs index 01676b08..4bf62b8f 100644 --- a/message/src/message/mod.rs +++ b/message/src/message/mod.rs @@ -1,7 +1,5 @@ mod message; mod message_header; -mod payload; pub use self::message::Message; pub use self::message_header::MessageHeader; -pub use self::payload::Payload; diff --git a/message/src/message/payload.rs b/message/src/message/payload.rs deleted file mode 100644 index 5b02ce23..00000000 --- a/message/src/message/payload.rs +++ /dev/null @@ -1,153 +0,0 @@ -use hash::H32; -use ser::{Serializable, Stream, deserialize}; -use chain::{Transaction, Block}; -use crypto::checksum; -use common::Command; -use types::{ - Version, Addr, AddrBelow31402, Inv, - GetData, NotFound, GetBlocks, GetHeaders, Headers, - Ping, Pong, Reject, FilterLoad, FilterAdd, FeeFilter, - MerkleBlock, SendCompact, CompactBlock, GetBlockTxn, BlockTxn, -}; -use Error; - -#[derive(Debug, PartialEq)] -pub enum Payload { - Version(Version), - Verack, - Addr(Addr), - AddrBelow31402(AddrBelow31402), - Inv(Inv), - GetData(GetData), - NotFound(NotFound), - GetBlocks(GetBlocks), - GetHeaders(GetHeaders), - Tx(Transaction), - Block(Block), - Headers(Headers), - GetAddr, - MemPool, - Ping(Ping), - Pong(Pong), - Reject(Reject), - FilterLoad(FilterLoad), - FilterAdd(FilterAdd), - FilterClear, - MerkleBlock(MerkleBlock), - SendHeaders, - FeeFilter(FeeFilter), - SendCompact(SendCompact), - CompactBlock(CompactBlock), - GetBlockTxn(GetBlockTxn), - BlockTxn(BlockTxn), -} - -impl Payload { - pub fn command(&self) -> Command { - let cmd = match *self { - Payload::Version(_) => "version", - Payload::Verack => "verack", - Payload::Addr(_) | Payload::AddrBelow31402(_) => "addr", - Payload::Inv(_) => "inv", - Payload::GetData(_) => "getdata", - Payload::NotFound(_) => "notfound", - Payload::GetBlocks(_) => "getblocks", - Payload::GetHeaders(_) => "getheaders", - Payload::Tx(_) => "tx", - Payload::Block(_) => "block", - Payload::Headers(_) => "headers", - Payload::GetAddr => "getaddr", - Payload::MemPool=> "mempool", - Payload::Ping(_) => "ping", - Payload::Pong(_) => "pong", - Payload::Reject(_) => "reject", - Payload::FilterLoad(_) => "filterload", - Payload::FilterAdd(_) => "filteradd", - Payload::FilterClear => "filterclear", - Payload::MerkleBlock(_) => "merkleblock", - Payload::SendHeaders => "sendheaders", - Payload::FeeFilter(_) => "feefilter", - Payload::SendCompact(_) => "sendcmpct", - Payload::CompactBlock(_) => "compactblock", - Payload::GetBlockTxn(_) => "getblocktxn", - Payload::BlockTxn(_) => "blocktxn", - }; - - cmd.into() - } - - pub fn deserialize(data: &[u8], check: &H32, version: u32, command: &Command) -> Result { - if &checksum(data) != check { - return Err(Error::InvalidChecksum); - } - - let result = match &command.to_string() as &str { - "version" => deserialize(data).map(Payload::Version), - "verack" if data.is_empty() => Ok(Payload::Verack), - "addr" => match version >= 31402 { - true => deserialize(data).map(Payload::Addr), - false => deserialize(data).map(Payload::AddrBelow31402), - }, - "inv" => deserialize(data).map(Payload::Inv), - "getdata" => deserialize(data).map(Payload::GetData), - "notfound" => deserialize(data).map(Payload::NotFound), - "getblocks" => deserialize(data).map(Payload::GetBlocks), - "getheaders" => deserialize(data).map(Payload::GetHeaders), - "tx" => deserialize(data).map(Payload::Tx), - "block" => deserialize(data).map(Payload::Block), - "headers" => deserialize(data).map(Payload::Headers), - "getaddr" if data.is_empty() => Ok(Payload::GetAddr), - "mempool" if data.is_empty() => Ok(Payload::MemPool), - "ping" => deserialize(data).map(Payload::Ping), - "pong" => deserialize(data).map(Payload::Pong), - "reject" => deserialize(data).map(Payload::Reject), - "filterload" => deserialize(data).map(Payload::FilterLoad), - "filteradd" => deserialize(data).map(Payload::FilterAdd), - "filterclear" if data.is_empty() => Ok(Payload::FilterClear), - "merkleblock" => deserialize(data).map(Payload::MerkleBlock), - "sendheaders" if data.is_empty() => Ok(Payload::SendHeaders), - "feefilter" => deserialize(data).map(Payload::FeeFilter), - "sendcmpct" => deserialize(data).map(Payload::SendCompact), - "cmpctblock" => deserialize(data).map(Payload::CompactBlock), - "getblocktxn" => deserialize(data).map(Payload::GetBlockTxn), - "blocktxn" => deserialize(data).map(Payload::BlockTxn), - _ => return Err(Error::InvalidCommand), - }; - - result.map_err(Into::into) - } -} - -impl Serializable for Payload { - fn serialize(&self, stream: &mut Stream) { - match *self { - Payload::Version(ref p) => { stream.append(p); }, - Payload::Verack => {}, - Payload::Addr(ref p) => { stream.append(p); }, - Payload::AddrBelow31402(ref p) => { stream.append(p); }, - Payload::Inv(ref p) => { stream.append(p); }, - Payload::GetData(ref p) => { stream.append(p); }, - Payload::NotFound(ref p) => { stream.append(p); }, - Payload::GetBlocks(ref p) => { stream.append(p); }, - Payload::GetHeaders(ref p) => { stream.append(p); }, - Payload::Tx(ref p) => { stream.append(p); }, - Payload::Block(ref p) => { stream.append(p); }, - Payload::Headers(ref p) => { stream.append(p); }, - Payload::GetAddr => {}, - Payload::MemPool => {}, - Payload::Ping(ref p) => { stream.append(p); }, - Payload::Pong(ref p) => { stream.append(p); }, - Payload::Reject(ref p) => { stream.append(p); }, - Payload::FilterLoad(ref p) => { stream.append(p); }, - Payload::FilterAdd(ref p) => { stream.append(p); }, - Payload::FilterClear => {}, - Payload::MerkleBlock(ref p) => { stream.append(p); }, - Payload::SendHeaders => {}, - Payload::FeeFilter(ref p) => { stream.append(p); }, - Payload::SendCompact(ref p) => { stream.append(p); }, - Payload::CompactBlock(ref p) => { stream.append(p); }, - Payload::GetBlockTxn(ref p) => { stream.append(p); }, - Payload::BlockTxn(ref p) => { stream.append(p); }, - } - } -} diff --git a/message/src/serialization/mod.rs b/message/src/serialization/mod.rs index df8c9d57..21515e06 100644 --- a/message/src/serialization/mod.rs +++ b/message/src/serialization/mod.rs @@ -1,19 +1,14 @@ mod stream; mod reader; -use ser::{Reader, Deserializable}; -use {MessageResult, Error}; -pub use self::stream::PayloadStream; +pub use self::stream::{PayloadStream, serialize_payload}; pub use self::reader::{PayloadReader, deserialize_payload}; +use ser::{Reader, Stream}; +use MessageResult; -pub trait PayloadType: Deserializable { +pub trait PayloadType { fn version() -> u32; fn command() -> &'static str; - fn deserialize_payload(reader: &mut Reader, version: u32) -> MessageResult where Self: Sized { - if version < Self::version() { - return Err(Error::InvalidVersion); - } - - Self::deserialize(reader).map_err(Into::into) - } + fn deserialize_payload(reader: &mut Reader, version: u32) -> MessageResult where Self: Sized; + fn serialize_payload(&self, stream: &mut Stream, version: u32) -> MessageResult<()>; } diff --git a/message/src/serialization/reader.rs b/message/src/serialization/reader.rs index f0fa63e8..c25c8c4e 100644 --- a/message/src/serialization/reader.rs +++ b/message/src/serialization/reader.rs @@ -26,6 +26,10 @@ impl<'a> PayloadReader<'a> { } pub fn read(&mut self) -> Result where T: PayloadType { + if T::version() > self.version { + return Err(Error::InvalidVersion); + } + T::deserialize_payload(&mut self.reader, self.version) } diff --git a/message/src/serialization/stream.rs b/message/src/serialization/stream.rs index fabd6315..0c88d378 100644 --- a/message/src/serialization/stream.rs +++ b/message/src/serialization/stream.rs @@ -1,6 +1,12 @@ -use ser::{Stream, Serializable}; -use serialization::PayloadType; -use Error; +use bytes::Bytes; +use ser::Stream; +use {PayloadType, Error, MessageResult}; + +pub fn serialize_payload(t: &T, version: u32) -> MessageResult where T: PayloadType { + let mut stream = PayloadStream::new(version); + try!(stream.append(t)); + Ok(stream.out()) +} pub struct PayloadStream { stream: Stream, @@ -15,12 +21,19 @@ impl PayloadStream { } } - pub fn append(&mut self, t: &T) -> Result<(), Error> where T: PayloadType + Serializable { - if self.version < T::version() { + pub fn append(&mut self, t: &T) -> MessageResult<()> where T: PayloadType { + if T::version() > self.version { return Err(Error::InvalidVersion); } - t.serialize(&mut self.stream); - Ok(()) + t.serialize_payload(&mut self.stream, self.version) + } + + pub fn raw_stream(&mut self) -> &mut Stream { + &mut self.stream + } + + pub fn out(self) -> Bytes { + self.stream.out() } } diff --git a/message/src/types/addr.rs b/message/src/types/addr.rs index db95433b..8af5829a 100644 --- a/message/src/types/addr.rs +++ b/message/src/types/addr.rs @@ -2,8 +2,49 @@ use ser::{ Serializable, Stream, Deserializable, Reader, Error as ReaderError, }; -use serialization::PayloadType; use common::NetAddress; +use {PayloadType, MessageResult}; + +#[derive(Debug, PartialEq)] +pub enum Addr { + V0(V0), + V31402(V31402), +} + +impl PayloadType for Addr { + fn version() -> u32 { + 0 + } + + fn command() -> &'static str { + "addr" + } + + fn deserialize_payload(reader: &mut Reader, version: u32) -> MessageResult where Self: Sized { + let result = if version < 31402 { + reader.read().map(Addr::V0) + } else { + reader.read().map(Addr::V31402) + }; + + result.map_err(Into::into) + } + + fn serialize_payload(&self, stream: &mut Stream, version: u32) -> MessageResult<()> { + match *self { + Addr::V0(ref addr) => addr.serialize(stream), + Addr::V31402(ref addr) => { + if version < 31402 { + let view = V31402AsV0::new(addr); + view.serialize(stream); + } else { + addr.serialize(stream); + } + } + } + Ok(()) + } +} #[derive(Debug, PartialEq)] pub struct AddressEntry { @@ -31,21 +72,20 @@ impl Deserializable for AddressEntry { } #[derive(Debug, PartialEq)] -pub struct Addr { +pub struct V31402 { pub addresses: Vec, } -impl Serializable for Addr { +impl Serializable for V31402 { fn serialize(&self, stream: &mut Stream) { stream.append_list(&self.addresses); } } -impl Deserializable for Addr { +impl Deserializable for V31402 { fn deserialize(reader: &mut Reader) -> Result where Self: Sized { - // TODO: limit to 1000 - let result = Addr { - addresses: try!(reader.read_list()), + let result = V31402 { + addresses: try!(reader.read_list_max(1000)), }; Ok(result) @@ -53,37 +93,55 @@ impl Deserializable for Addr { } #[derive(Debug, PartialEq)] -pub struct AddrBelow31402 { +pub struct V0 { pub addresses: Vec, } -impl Serializable for AddrBelow31402 { +impl Serializable for V0 { fn serialize(&self, stream: &mut Stream) { stream.append_list(&self.addresses); } } -impl Deserializable for AddrBelow31402 { +impl Deserializable for V0 { fn deserialize(reader: &mut Reader) -> Result where Self: Sized { - // TODO: limit to 1000 - let result = AddrBelow31402 { - addresses: try!(reader.read_list()), + let result = V0 { + addresses: try!(reader.read_list_max(1000)), }; Ok(result) } } +struct V31402AsV0<'a> { + v: &'a V31402, +} + +impl<'a> V31402AsV0<'a> { + fn new(v: &'a V31402) -> Self { + V31402AsV0 { + v: v, + } + } +} + +impl<'a> Serializable for V31402AsV0<'a> { + fn serialize(&self, stream: &mut Stream) { + let vec_ref: Vec<&'a NetAddress> = self.v.addresses.iter().map(|x| &x.address).collect(); + stream.append_list_ref(&vec_ref); + } +} + #[cfg(test)] mod tests { use bytes::Bytes; use ser::{serialize, deserialize}; - use super::{Addr, AddressEntry}; + use super::{V31402, AddressEntry}; #[test] fn test_addr_serialize() { let expected: Bytes = "01e215104d010000000000000000000000000000000000ffff0a000001208d".into(); - let addr = Addr { + let addr = V31402 { addresses: vec![ AddressEntry { timestamp: 0x4d1015e2, @@ -98,7 +156,7 @@ mod tests { #[test] fn test_addr_deserialize() { let raw: Bytes = "01e215104d010000000000000000000000000000000000ffff0a000001208d".into(); - let expected = Addr { + let expected = V31402 { addresses: vec![ AddressEntry { timestamp: 0x4d1015e2, diff --git a/message/src/types/blocktxn.rs b/message/src/types/blocktxn.rs index 9479ec43..138a9db2 100644 --- a/message/src/types/blocktxn.rs +++ b/message/src/types/blocktxn.rs @@ -1,23 +1,31 @@ -use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError}; +use ser::{Stream, Reader}; use common::BlockTransactions; +use {MessageResult, PayloadType}; #[derive(Debug, PartialEq)] pub struct BlockTxn { request: BlockTransactions, } -impl Serializable for BlockTxn { - fn serialize(&self, stream: &mut Stream) { - stream.append(&self.request); +impl PayloadType for BlockTxn { + fn version() -> u32 { + 70014 } -} -impl Deserializable for BlockTxn { - fn deserialize(reader: &mut Reader) -> Result where Self: Sized { + fn command() -> &'static str { + "blocktxn" + } + + fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { let block = BlockTxn { request: try!(reader.read()), }; Ok(block) } + + fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { + stream.append(&self.request); + Ok(()) + } } diff --git a/message/src/types/compactblock.rs b/message/src/types/compactblock.rs index be2669a0..87bff7d1 100644 --- a/message/src/types/compactblock.rs +++ b/message/src/types/compactblock.rs @@ -1,23 +1,31 @@ -use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError}; +use ser::{Stream, Reader}; use common::BlockHeaderAndIDs; +use {PayloadType, MessageResult}; #[derive(Debug, PartialEq)] pub struct CompactBlock { header: BlockHeaderAndIDs, } -impl Serializable for CompactBlock { - fn serialize(&self, stream: &mut Stream) { - stream.append(&self.header); +impl PayloadType for CompactBlock { + fn version() -> u32 { + 70014 } -} -impl Deserializable for CompactBlock { - fn deserialize(reader: &mut Reader) -> Result where Self: Sized { + fn command() -> &'static str { + "cmpctblock" + } + + fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { let block = CompactBlock { header: try!(reader.read()), }; Ok(block) } + + fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { + stream.append(&self.header); + Ok(()) + } } diff --git a/message/src/types/feefilter.rs b/message/src/types/feefilter.rs index 47263fe5..76bcbe45 100644 --- a/message/src/types/feefilter.rs +++ b/message/src/types/feefilter.rs @@ -1,22 +1,30 @@ -use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError}; +use ser::{Stream, Reader}; +use {PayloadType, MessageResult}; #[derive(Debug, PartialEq)] pub struct FeeFilter { fee_rate: u64, } -impl Serializable for FeeFilter { - fn serialize(&self, stream: &mut Stream) { - stream.append(&self.fee_rate); +impl PayloadType for FeeFilter { + fn version() -> u32 { + 70013 } -} -impl Deserializable for FeeFilter { - fn deserialize(reader: &mut Reader) -> Result where Self: Sized { + fn command() -> &'static str { + "cmpctblock" + } + + fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { let fee_filter = FeeFilter { fee_rate: try!(reader.read()), }; Ok(fee_filter) } + + fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { + stream.append(&self.fee_rate); + Ok(()) + } } diff --git a/message/src/types/filteradd.rs b/message/src/types/filteradd.rs index c41573dc..79f03366 100644 --- a/message/src/types/filteradd.rs +++ b/message/src/types/filteradd.rs @@ -1,5 +1,6 @@ use bytes::Bytes; -use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError}; +use ser::{Stream, Reader}; +use {PayloadType, MessageResult}; #[derive(Debug, PartialEq)] pub struct FilterAdd { @@ -7,18 +8,25 @@ pub struct FilterAdd { data: Bytes, } -impl Serializable for FilterAdd { - fn serialize(&self, stream: &mut Stream) { - stream.append(&self.data); +impl PayloadType for FilterAdd { + fn version() -> u32 { + 70001 } -} -impl Deserializable for FilterAdd { - fn deserialize(reader: &mut Reader) -> Result where Self: Sized { - let filteradd= FilterAdd { + fn command() -> &'static str { + "filteradd" + } + + fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { + let filteradd = FilterAdd { data: try!(reader.read()), }; Ok(filteradd) } + + fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { + stream.append(&self.data); + Ok(()) + } } diff --git a/message/src/types/filterclear.rs b/message/src/types/filterclear.rs new file mode 100644 index 00000000..98157f73 --- /dev/null +++ b/message/src/types/filterclear.rs @@ -0,0 +1,23 @@ +use ser::{Stream, Reader}; +use {PayloadType, MessageResult}; + +#[derive(Debug, PartialEq)] +pub struct FilterClear; + +impl PayloadType for FilterClear { + fn version() -> u32 { + 70001 + } + + fn command() -> &'static str { + "filterclear" + } + + fn deserialize_payload(_reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { + Ok(FilterClear) + } + + fn serialize_payload(&self, _stream: &mut Stream, _version: u32) -> MessageResult<()> { + Ok(()) + } +} diff --git a/message/src/types/filterload.rs b/message/src/types/filterload.rs index b37201eb..e5b5a75d 100644 --- a/message/src/types/filterload.rs +++ b/message/src/types/filterload.rs @@ -1,5 +1,6 @@ use bytes::Bytes; -use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError}; +use ser::{Stream, Reader}; +use {PayloadType, MessageResult}; #[derive(Debug, PartialEq)] pub struct FilterLoad { @@ -10,18 +11,16 @@ pub struct FilterLoad { flags: u8, } -impl Serializable for FilterLoad { - fn serialize(&self, stream: &mut Stream) { - stream - .append(&self.filter) - .append(&self.hash_functions) - .append(&self.tweak) - .append(&self.flags); +impl PayloadType for FilterLoad { + fn version() -> u32 { + 70001 } -} -impl Deserializable for FilterLoad { - fn deserialize(reader: &mut Reader) -> Result where Self: Sized { + fn command() -> &'static str { + "filterload" + } + + fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { let filterload = FilterLoad { filter: try!(reader.read()), hash_functions: try!(reader.read()), @@ -31,4 +30,13 @@ impl Deserializable for FilterLoad { Ok(filterload) } + + fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { + stream + .append(&self.filter) + .append(&self.hash_functions) + .append(&self.tweak) + .append(&self.flags); + Ok(()) + } } diff --git a/message/src/types/getaddr.rs b/message/src/types/getaddr.rs new file mode 100644 index 00000000..e61065d5 --- /dev/null +++ b/message/src/types/getaddr.rs @@ -0,0 +1,23 @@ +use ser::{Stream, Reader}; +use {PayloadType, MessageResult}; + +#[derive(Debug, PartialEq)] +pub struct GetAddr; + +impl PayloadType for GetAddr { + fn version() -> u32 { + 60002 + } + + fn command() -> &'static str { + "getaddr" + } + + fn deserialize_payload(_reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { + Ok(GetAddr) + } + + fn serialize_payload(&self, _stream: &mut Stream, _version: u32) -> MessageResult<()> { + Ok(()) + } +} diff --git a/message/src/types/getblocks.rs b/message/src/types/getblocks.rs index d4ee90df..faaa7f57 100644 --- a/message/src/types/getblocks.rs +++ b/message/src/types/getblocks.rs @@ -1,5 +1,6 @@ use hash::H256; -use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError}; +use ser::{Stream, Reader}; +use {PayloadType, MessageResult}; #[derive(Debug, PartialEq)] pub struct GetBlocks { @@ -8,24 +9,31 @@ pub struct GetBlocks { hash_stop: H256, } -impl Serializable for GetBlocks { - fn serialize(&self, stream: &mut Stream) { - stream - .append(&self.version) - .append_list(&self.block_locator_hashes) - .append(&self.hash_stop); +impl PayloadType for GetBlocks { + fn version() -> u32 { + 0 } -} -impl Deserializable for GetBlocks { - fn deserialize(reader: &mut Reader) -> Result where Self: Sized { + fn command() -> &'static str { + "getblocks" + } + + fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { let get_blocks = GetBlocks { version: try!(reader.read()), - block_locator_hashes: try!(reader.read_list()), + block_locator_hashes: try!(reader.read_list_max(500)), hash_stop: try!(reader.read()), }; Ok(get_blocks) } + + fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { + stream + .append(&self.version) + .append_list(&self.block_locator_hashes) + .append(&self.hash_stop); + Ok(()) + } } diff --git a/message/src/types/getblocktxn.rs b/message/src/types/getblocktxn.rs index c03dafcb..33884e1d 100644 --- a/message/src/types/getblocktxn.rs +++ b/message/src/types/getblocktxn.rs @@ -1,23 +1,31 @@ -use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError}; +use ser::{Stream, Reader}; use common::BlockTransactionsRequest; +use {PayloadType, MessageResult}; #[derive(Debug, PartialEq)] pub struct GetBlockTxn { request: BlockTransactionsRequest, } -impl Serializable for GetBlockTxn { - fn serialize(&self, stream: &mut Stream) { - stream.append(&self.request); +impl PayloadType for GetBlockTxn { + fn version() -> u32 { + 70014 } -} -impl Deserializable for GetBlockTxn { - fn deserialize(reader: &mut Reader) -> Result where Self: Sized { + fn command() -> &'static str { + "getblocktxn" + } + + fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { let get_block = GetBlockTxn { request: try!(reader.read()), }; Ok(get_block) } + + fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { + stream.append(&self.request); + Ok(()) + } } diff --git a/message/src/types/getdata.rs b/message/src/types/getdata.rs new file mode 100644 index 00000000..e771fd51 --- /dev/null +++ b/message/src/types/getdata.rs @@ -0,0 +1,31 @@ +use ser::{Stream, Reader}; +use common::InventoryVector; +use {PayloadType, MessageResult}; + +#[derive(Debug, PartialEq)] +pub struct GetData { + pub inventory: Vec, +} + +impl PayloadType for GetData { + fn version() -> u32 { + 0 + } + + fn command() -> &'static str { + "getdata" + } + + fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { + let inv = GetData { + inventory: try!(reader.read_list_max(50_000)), + }; + + Ok(inv) + } + + fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { + stream.append_list(&self.inventory); + Ok(()) + } +} diff --git a/message/src/types/getheaders.rs b/message/src/types/getheaders.rs new file mode 100644 index 00000000..88e0a6b2 --- /dev/null +++ b/message/src/types/getheaders.rs @@ -0,0 +1,39 @@ +use hash::H256; +use ser::{Stream, Reader}; +use {PayloadType, MessageResult}; + +#[derive(Debug, PartialEq)] +pub struct GetHeaders { + version: u32, + block_locator_hashes: Vec, + hash_stop: H256, +} + +impl PayloadType for GetHeaders { + fn version() -> u32 { + 0 + } + + fn command() -> &'static str { + "getheaders" + } + + fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { + let get_blocks = GetHeaders { + version: try!(reader.read()), + block_locator_hashes: try!(reader.read_list_max(2000)), + hash_stop: try!(reader.read()), + }; + + Ok(get_blocks) + } + + fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { + stream + .append(&self.version) + .append_list(&self.block_locator_hashes) + .append(&self.hash_stop); + Ok(()) + } +} + diff --git a/message/src/types/headers.rs b/message/src/types/headers.rs index d3519970..bbd5d4bf 100644 --- a/message/src/types/headers.rs +++ b/message/src/types/headers.rs @@ -1,23 +1,32 @@ use chain::BlockHeader; -use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError}; +use ser::{Stream, Reader}; +use {PayloadType, MessageResult}; #[derive(Debug, PartialEq)] pub struct Headers { + // TODO: Block headers need to have txn_count field headers: Vec, } -impl Serializable for Headers { - fn serialize(&self, stream: &mut Stream) { - stream.append_list(&self.headers); +impl PayloadType for Headers { + fn version() -> u32 { + 0 } -} -impl Deserializable for Headers { - fn deserialize(reader: &mut Reader) -> Result where Self: Sized { + fn command() -> &'static str { + "headers" + } + + fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { let headers = Headers { headers: try!(reader.read_list()), }; Ok(headers) } + + fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { + stream.append_list(&self.headers); + Ok(()) + } } diff --git a/message/src/types/inv.rs b/message/src/types/inv.rs index 28c93fc9..1978481e 100644 --- a/message/src/types/inv.rs +++ b/message/src/types/inv.rs @@ -1,23 +1,31 @@ -use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError}; +use ser::{Stream, Reader}; use common::InventoryVector; +use {PayloadType, MessageResult}; #[derive(Debug, PartialEq)] pub struct Inv { pub inventory: Vec, } -impl Serializable for Inv { - fn serialize(&self, stream: &mut Stream) { - stream.append_list(&self.inventory); +impl PayloadType for Inv { + fn version() -> u32 { + 0 } -} -impl Deserializable for Inv { - fn deserialize(reader: &mut Reader) -> Result where Self: Sized { + fn command() -> &'static str { + "inv" + } + + fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { let inv = Inv { - inventory: try!(reader.read_list()), + inventory: try!(reader.read_list_max(50_000)), }; Ok(inv) } + + fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { + stream.append_list(&self.inventory); + Ok(()) + } } diff --git a/message/src/types/mempool.rs b/message/src/types/mempool.rs new file mode 100644 index 00000000..6bef1893 --- /dev/null +++ b/message/src/types/mempool.rs @@ -0,0 +1,23 @@ +use ser::{Stream, Reader}; +use {PayloadType, MessageResult}; + +#[derive(Debug, PartialEq)] +pub struct MemPool; + +impl PayloadType for MemPool { + fn version() -> u32 { + 60002 + } + + fn command() -> &'static str { + "mempool" + } + + fn deserialize_payload(_reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { + Ok(MemPool) + } + + fn serialize_payload(&self, _stream: &mut Stream, _version: u32) -> MessageResult<()> { + Ok(()) + } +} diff --git a/message/src/types/merkle_block.rs b/message/src/types/merkle_block.rs index 696ab903..516a8a06 100644 --- a/message/src/types/merkle_block.rs +++ b/message/src/types/merkle_block.rs @@ -1,7 +1,8 @@ use hash::H256; use bytes::Bytes; -use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError}; +use ser::{Stream, Reader}; use chain::BlockHeader; +use {PayloadType, MessageResult}; #[derive(Debug, PartialEq)] pub struct MerkleBlock { @@ -11,18 +12,16 @@ pub struct MerkleBlock { flags: Bytes, } -impl Serializable for MerkleBlock { - fn serialize(&self, stream: &mut Stream) { - stream - .append(&self.block_header) - .append(&self.total_transactions) - .append_list(&self.hashes) - .append(&self.flags); +impl PayloadType for MerkleBlock { + fn version() -> u32 { + 70014 } -} -impl Deserializable for MerkleBlock { - fn deserialize(reader: &mut Reader) -> Result where Self: Sized { + fn command() -> &'static str { + "merkleblock" + } + + fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { let merkle_block = MerkleBlock { block_header: try!(reader.read()), total_transactions: try!(reader.read()), @@ -32,4 +31,13 @@ impl Deserializable for MerkleBlock { Ok(merkle_block) } + + fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { + stream + .append(&self.block_header) + .append(&self.total_transactions) + .append_list(&self.hashes) + .append(&self.flags); + Ok(()) + } } diff --git a/message/src/types/mod.rs b/message/src/types/mod.rs index 3a6d5631..1a9bb8ad 100644 --- a/message/src/types/mod.rs +++ b/message/src/types/mod.rs @@ -2,38 +2,48 @@ pub mod addr; mod blocktxn; mod compactblock; mod feefilter; -mod filterload; mod filteradd; +mod filterclear; +mod filterload; +mod getaddr; mod getblocks; mod getblocktxn; +mod getdata; +mod getheaders; mod headers; mod inv; +mod mempool; mod merkle_block; +mod notfound; mod ping; mod pong; pub mod reject; mod sendcompact; +mod sendheaders; mod verack; pub mod version; -pub use self::addr::{Addr, AddrBelow31402}; +pub use self::addr::Addr; pub use self::blocktxn::BlockTxn; pub use self::compactblock::CompactBlock; pub use self::feefilter::FeeFilter; pub use self::filterload::FilterLoad; +pub use self::filterclear::FilterClear; pub use self::filteradd::FilterAdd; +pub use self::getaddr::GetAddr; pub use self::getblocks::GetBlocks; pub use self::getblocktxn::GetBlockTxn; +pub use self::getdata::GetData; +pub use self::getheaders::GetHeaders; pub use self::headers::Headers; pub use self::inv::Inv; +pub use self::mempool::MemPool; pub use self::merkle_block::MerkleBlock; +pub use self::notfound::NotFound; pub use self::ping::Ping; pub use self::pong::Pong; pub use self::reject::Reject; pub use self::sendcompact::SendCompact; +pub use self::sendheaders::SendHeaders; pub use self::verack::Verack; pub use self::version::Version; - -pub type GetData = Inv; -pub type NotFound = Inv; -pub type GetHeaders = GetBlocks; diff --git a/message/src/types/notfound.rs b/message/src/types/notfound.rs new file mode 100644 index 00000000..03e420c1 --- /dev/null +++ b/message/src/types/notfound.rs @@ -0,0 +1,31 @@ +use ser::{Stream, Reader}; +use common::InventoryVector; +use {PayloadType, MessageResult}; + +#[derive(Debug, PartialEq)] +pub struct NotFound { + pub inventory: Vec, +} + +impl PayloadType for NotFound { + fn version() -> u32 { + 0 + } + + fn command() -> &'static str { + "notfound" + } + + fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { + let inv = NotFound { + inventory: try!(reader.read_list_max(50_000)), + }; + + Ok(inv) + } + + fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { + stream.append_list(&self.inventory); + Ok(()) + } +} diff --git a/message/src/types/ping.rs b/message/src/types/ping.rs index 08842e9d..c6134db4 100644 --- a/message/src/types/ping.rs +++ b/message/src/types/ping.rs @@ -1,27 +1,11 @@ -use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError}; -use serialization::PayloadType; +use ser::{Stream, Reader}; +use {MessageResult, PayloadType}; #[derive(Debug, PartialEq)] pub struct Ping { pub nonce: u64, } -impl Serializable for Ping { - fn serialize(&self, stream: &mut Stream) { - stream.append(&self.nonce); - } -} - -impl Deserializable for Ping { - fn deserialize(reader: &mut Reader) -> Result where Self: Sized { - let ping = Ping { - nonce: try!(reader.read()), - }; - - Ok(ping) - } -} - impl PayloadType for Ping { fn version() -> u32 { 0 @@ -30,4 +14,17 @@ impl PayloadType for Ping { fn command() -> &'static str { "ping" } + + fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { + let ping = Ping { + nonce: try!(reader.read()), + }; + + Ok(ping) + } + + fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { + stream.append(&self.nonce); + Ok(()) + } } diff --git a/message/src/types/pong.rs b/message/src/types/pong.rs index 2a6d8ffe..a1768a07 100644 --- a/message/src/types/pong.rs +++ b/message/src/types/pong.rs @@ -1,27 +1,11 @@ -use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError}; -use serialization::PayloadType; +use ser::{Stream, Reader}; +use {PayloadType, MessageResult}; #[derive(Debug, PartialEq)] pub struct Pong { pub nonce: u64, } -impl Serializable for Pong { - fn serialize(&self, stream: &mut Stream) { - stream.append(&self.nonce); - } -} - -impl Deserializable for Pong { - fn deserialize(reader: &mut Reader) -> Result where Self: Sized { - let ping = Pong { - nonce: try!(reader.read()), - }; - - Ok(ping) - } -} - impl PayloadType for Pong { fn version() -> u32 { 0 @@ -30,4 +14,17 @@ impl PayloadType for Pong { fn command() -> &'static str { "pong" } + + fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { + let pong = Pong { + nonce: try!(reader.read()), + }; + + Ok(pong) + } + + fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { + stream.append(&self.nonce); + Ok(()) + } } diff --git a/message/src/types/reject.rs b/message/src/types/reject.rs index 307a1b43..74c887a3 100644 --- a/message/src/types/reject.rs +++ b/message/src/types/reject.rs @@ -1,5 +1,5 @@ use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError}; -use serialization::PayloadType; +use {PayloadType, MessageResult}; #[derive(Debug, PartialEq, Clone, Copy)] #[repr(u8)] @@ -59,27 +59,6 @@ pub struct Reject { // TODO: data } -impl Serializable for Reject { - fn serialize(&self, stream: &mut Stream) { - stream - .append(&self.message) - .append(&self.code) - .append(&self.reason); - } -} - -impl Deserializable for Reject { - fn deserialize(reader: &mut Reader) -> Result where Self: Sized { - let reject = Reject { - message: try!(reader.read()), - code: try!(reader.read()), - reason: try!(reader.read()), - }; - - Ok(reject) - } -} - impl PayloadType for Reject { fn version() -> u32 { 0 @@ -88,4 +67,22 @@ impl PayloadType for Reject { fn command() -> &'static str { "reject" } + + fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { + let reject = Reject { + message: try!(reader.read()), + code: try!(reader.read()), + reason: try!(reader.read()), + }; + + Ok(reject) + } + + fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { + stream + .append(&self.message) + .append(&self.code) + .append(&self.reason); + Ok(()) + } } diff --git a/message/src/types/sendcompact.rs b/message/src/types/sendcompact.rs index 4987775f..7477ecc2 100644 --- a/message/src/types/sendcompact.rs +++ b/message/src/types/sendcompact.rs @@ -1,4 +1,5 @@ -use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError}; +use ser::{Stream, Reader}; +use {PayloadType, MessageResult}; #[derive(Debug, PartialEq)] pub struct SendCompact { @@ -6,16 +7,16 @@ pub struct SendCompact { second: u64, } -impl Serializable for SendCompact { - fn serialize(&self, stream: &mut Stream) { - stream - .append(&self.first) - .append(&self.second); +impl PayloadType for SendCompact { + fn version() -> u32 { + 70014 } -} -impl Deserializable for SendCompact { - fn deserialize(reader: &mut Reader) -> Result where Self: Sized { + fn command() -> &'static str { + "sendcmpct" + } + + fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { let send_compact = SendCompact { first: try!(reader.read()), second: try!(reader.read()), @@ -23,4 +24,11 @@ impl Deserializable for SendCompact { Ok(send_compact) } + + fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { + stream + .append(&self.first) + .append(&self.second); + Ok(()) + } } diff --git a/message/src/types/sendheaders.rs b/message/src/types/sendheaders.rs new file mode 100644 index 00000000..8ec2807d --- /dev/null +++ b/message/src/types/sendheaders.rs @@ -0,0 +1,23 @@ +use ser::{Stream, Reader}; +use {PayloadType, MessageResult}; + +#[derive(Debug, PartialEq)] +pub struct SendHeaders; + +impl PayloadType for SendHeaders { + fn version() -> u32 { + 70012 + } + + fn command() -> &'static str { + "sendheaders" + } + + fn deserialize_payload(_reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { + Ok(SendHeaders) + } + + fn serialize_payload(&self, _stream: &mut Stream, _version: u32) -> MessageResult<()> { + Ok(()) + } +} diff --git a/message/src/types/verack.rs b/message/src/types/verack.rs index 2aab3db7..65a4e546 100644 --- a/message/src/types/verack.rs +++ b/message/src/types/verack.rs @@ -1,19 +1,9 @@ -use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError}; -use serialization::PayloadType; +use ser::{Stream, Reader}; +use {PayloadType, MessageResult}; #[derive(Debug, PartialEq)] pub struct Verack; -impl Serializable for Verack { - fn serialize(&self, _stream: &mut Stream) {} -} - -impl Deserializable for Verack { - fn deserialize(_reader: &mut Reader) -> Result where Self: Sized { - Ok(Verack) - } -} - impl PayloadType for Verack { fn version() -> u32 { 0 @@ -22,4 +12,12 @@ impl PayloadType for Verack { fn command() -> &'static str { "verack" } + + fn deserialize_payload(_reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { + Ok(Verack) + } + + fn serialize_payload(&self, _stream: &mut Stream, _version: u32) -> MessageResult<()> { + Ok(()) + } } diff --git a/message/src/types/version.rs b/message/src/types/version.rs index f0624ba9..b7f860de 100644 --- a/message/src/types/version.rs +++ b/message/src/types/version.rs @@ -1,10 +1,11 @@ use bytes::Bytes; use ser::{ Serializable, Stream, - Deserializable, Reader, Error as ReaderError, deserialize + Deserializable, Reader, Error as ReaderError, }; use common::{NetAddress, ServiceFlags}; -use serialization::PayloadType; +use {PayloadType, MessageResult}; +use serialization::deserialize_payload; #[derive(Debug, PartialEq)] pub enum Version { @@ -21,6 +22,43 @@ impl PayloadType for Version { fn command() -> &'static str { "version" } + + // version package is an serialization excpetion + fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { + let simple: V0 = try!(reader.read()); + + if simple.version < 106 { + return Ok(Version::V0(simple)); + } + + let v106: V106 = try!(reader.read()); + if simple.version < 70001 { + Ok(Version::V106(simple, v106)) + } else { + let v70001: V70001 = try!(reader.read()); + Ok(Version::V70001(simple, v106, v70001)) + } + } + + fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { + match *self { + Version::V0(ref simple) => { + stream.append(simple); + }, + Version::V106(ref simple, ref v106) => { + stream + .append(simple) + .append(v106); + }, + Version::V70001(ref simple, ref v106, ref v70001) => { + stream + .append(simple) + .append(v106) + .append(v70001); + }, + } + Ok(()) + } } impl Version { @@ -54,45 +92,6 @@ pub struct V70001 { pub relay: bool, } -impl Serializable for Version { - fn serialize(&self, stream: &mut Stream) { - match *self { - Version::V0(ref simple) => { - stream.append(simple); - }, - Version::V106(ref simple, ref v106) => { - stream - .append(simple) - .append(v106); - }, - Version::V70001(ref simple, ref v106, ref v70001) => { - stream - .append(simple) - .append(v106) - .append(v70001); - }, - } - } -} - -impl Deserializable for Version { - fn deserialize(reader: &mut Reader) -> Result where Self: Sized { - let simple: V0 = try!(reader.read()); - - if simple.version < 106 { - return Ok(Version::V0(simple)); - } - - let v106: V106 = try!(reader.read()); - if simple.version < 70001 { - Ok(Version::V106(simple, v106)) - } else { - let v70001: V70001 = try!(reader.read()); - Ok(Version::V70001(simple, v106, v70001)) - } - } -} - impl Serializable for V0 { fn serialize(&self, stream: &mut Stream) { stream @@ -158,14 +157,14 @@ impl Deserializable for V70001 { impl From<&'static str> for Version { fn from(s: &'static str) -> Self { let bytes: Bytes = s.into(); - deserialize(&bytes).unwrap() + deserialize_payload(&bytes, 0).unwrap() } } #[cfg(test)] mod test { use bytes::Bytes; - use ser::{serialize, deserialize}; + use serialization::{serialize_payload, deserialize_payload}; use super::{Version, V0, V106}; #[test] @@ -184,7 +183,7 @@ mod test { start_height: 98645, }); - assert_eq!(serialize(&version), expected); + assert_eq!(serialize_payload(&version, 0), Ok(expected)); } #[test] @@ -203,6 +202,6 @@ mod test { start_height: 98645, }); - assert_eq!(expected, deserialize(&raw).unwrap()); + assert_eq!(expected, deserialize_payload(&raw, 0).unwrap()); } } diff --git a/p2p/src/io/handshake.rs b/p2p/src/io/handshake.rs index 7189e6dc..1ce813b2 100644 --- a/p2p/src/io/handshake.rs +++ b/p2p/src/io/handshake.rs @@ -1,9 +1,9 @@ use std::{io, cmp}; use futures::{Future, Poll, Async}; -use message::{Message, Payload}; +use message::Message; use message::types::{Version, Verack}; use message::common::Magic; -use io::{write_message, read_message, ReadMessage, WriteMessage, ReadSpecificMessage, read_specific_message}; +use io::{write_message, WriteMessage, ReadSpecificMessage, read_specific_message}; use Error; pub fn handshake(a: A, magic: Magic, version: Version) -> Handshake where A: io::Write + io::Read { @@ -25,7 +25,6 @@ pub fn accept_handshake(a: A, magic: Magic, version: Version) -> AcceptHandsh } } -/// TODO: return Err if other version is not supported pub fn negotiate_version(local: u32, other: u32) -> u32 { cmp::min(local, other) } @@ -36,17 +35,17 @@ pub struct HandshakeResult { pub negotiated_version: u32, } -fn version_message(magic: Magic, version: Version) -> Message { - Message::new(magic, Payload::Version(version)) +fn version_message(magic: Magic, version: Version) -> Message { + Message::new(magic, version.version(), &version).expect("version message should always be serialized correctly") } -fn verack_message(magic: Magic) -> Message { - Message::new(magic, Payload::Verack) +fn verack_message(magic: Magic) -> Message { + Message::new(magic, 0, &Verack).expect("verack message should always be serialized correctly") } enum HandshakeState { - SendVersion(WriteMessage), - ReceiveVersion(ReadMessage), + SendVersion(WriteMessage), + ReceiveVersion(ReadSpecificMessage), ReceiveVerack { version: Option, future: ReadSpecificMessage, @@ -61,11 +60,11 @@ enum AcceptHandshakeState { }, SendVersion { version: Option, - future: WriteMessage, + future: WriteMessage, }, SendVerack { version: Option, - future: WriteMessage, + future: WriteMessage, }, Finished, } @@ -90,13 +89,12 @@ impl Future for Handshake where A: io::Read + io::Write { let (next, result) = match self.state { HandshakeState::SendVersion(ref mut future) => { let (stream, _) = try_ready!(future.poll()); - (HandshakeState::ReceiveVersion(read_message(stream, self.magic, 0)), Async::NotReady) + (HandshakeState::ReceiveVersion(read_specific_message(stream, self.magic, 0)), Async::NotReady) }, HandshakeState::ReceiveVersion(ref mut future) => { - let (stream, payload) = try_ready!(future.poll()); - let version = match payload { - Ok(Payload::Version(version)) => version, - Ok(_) => return Ok((stream, Err(Error::Handshake)).into()), + let (stream, version) = try_ready!(future.poll()); + let version = match version { + Ok(version) => version, Err(err) => return Ok((stream, Err(err.into())).into()), }; diff --git a/p2p/src/io/mod.rs b/p2p/src/io/mod.rs index 9133191e..d2e9136a 100644 --- a/p2p/src/io/mod.rs +++ b/p2p/src/io/mod.rs @@ -1,7 +1,7 @@ mod handshake; mod read_header; -mod read_message; -mod read_payload; +//mod read_message; +//mod read_payload; mod read_specific_message; mod read_specific_payload; mod readrc; @@ -11,8 +11,8 @@ pub use self::handshake::{ handshake, accept_handshake, Handshake, AcceptHandshake, HandshakeResult }; pub use self::read_header::{read_header, ReadHeader}; -pub use self::read_message::{read_message, ReadMessage, read_message_stream, ReadMessageStream}; -pub use self::read_payload::{read_payload, ReadPayload}; +//pub use self::read_message::{read_message, ReadMessage, read_message_stream, ReadMessageStream}; +//pub use self::read_payload::{read_payload, ReadPayload}; pub use self::read_specific_payload::{read_specific_payload, ReadSpecificPayload}; pub use self::read_specific_message::{read_specific_message, ReadSpecificMessage}; pub use self::readrc::ReadRc; diff --git a/p2p/src/io/read_specific_message.rs b/p2p/src/io/read_specific_message.rs index c67c2145..d19a312a 100644 --- a/p2p/src/io/read_specific_message.rs +++ b/p2p/src/io/read_specific_message.rs @@ -1,14 +1,13 @@ use std::io; use std::marker::PhantomData; use futures::{Poll, Future, Async}; -use ser::Deserializable; use message::{MessageResult, Error}; use message::common::Magic; use message::serialization::PayloadType; use io::{read_header, ReadHeader, read_specific_payload, ReadSpecificPayload}; pub fn read_specific_message(a: A, magic: Magic, version: u32) -> ReadSpecificMessage - where A: io::Read, M: PayloadType + Deserializable { + where A: io::Read, M: PayloadType { ReadSpecificMessage { state: ReadMessageState::ReadHeader { version: version, @@ -34,7 +33,7 @@ pub struct ReadSpecificMessage { message_type: PhantomData, } -impl Future for ReadSpecificMessage where A: io::Read, M: PayloadType + Deserializable { +impl Future for ReadSpecificMessage where A: io::Read, M: PayloadType { type Item = (A, MessageResult); type Error = io::Error; diff --git a/p2p/src/io/read_specific_payload.rs b/p2p/src/io/read_specific_payload.rs index bfbb3247..ae264fc8 100644 --- a/p2p/src/io/read_specific_payload.rs +++ b/p2p/src/io/read_specific_payload.rs @@ -4,12 +4,11 @@ use futures::{Poll, Future}; use tokio_core::io::{read_exact, ReadExact}; use bytes::Bytes; use hash::H32; -use ser::Deserializable; use message::MessageResult; use message::serialization::{PayloadType, deserialize_payload}; pub fn read_specific_payload(a: A, version: u32, len: usize, checksum: H32) -> ReadSpecificPayload - where A: io::Read, M: PayloadType + Deserializable { + where A: io::Read, M: PayloadType { ReadSpecificPayload { reader: read_exact(a, Bytes::new_with_len(len)), version: version, @@ -26,7 +25,7 @@ pub struct ReadSpecificPayload { } /// TODO: check checksum -impl Future for ReadSpecificPayload where A: io::Read, M: PayloadType + Deserializable { +impl Future for ReadSpecificPayload where A: io::Read, M: PayloadType { type Item = (A, MessageResult); type Error = io::Error; diff --git a/p2p/src/io/write_message.rs b/p2p/src/io/write_message.rs index 6ee50e6d..a4bc2123 100644 --- a/p2p/src/io/write_message.rs +++ b/p2p/src/io/write_message.rs @@ -1,29 +1,23 @@ use std::io; use futures::{Future, Poll}; use tokio_core::io::{WriteAll, write_all}; -use bytes::Bytes; -use ser::serialize; use message::Message; -pub fn write_message(a: A, message: Message) -> WriteMessage where A: io::Write { +pub fn write_message(a: A, message: Message) -> WriteMessage where A: io::Write { WriteMessage { - future: write_all(a, serialize(&message)), - message: Some(message), + future: write_all(a, message), } } -pub struct WriteMessage { - future: WriteAll, - message: Option, +pub struct WriteMessage { + future: WriteAll>, } -impl Future for WriteMessage where A: io::Write { - type Item = (A, Message); +impl Future for WriteMessage where A: io::Write { + type Item = (A, Message); type Error = io::Error; fn poll(&mut self) -> Poll { - let (stream, _) = try_ready!(self.future.poll()); - let message = self.message.take().expect("write message must be initialized with message"); - Ok((stream, message).into()) + self.future.poll() } } diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 75c679e6..4f10fb97 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -10,21 +10,21 @@ extern crate primitives; extern crate serialization as ser; pub mod io; -pub mod net; +//pub mod net; pub mod util; -mod config; +//mod config; mod error; mod event_loop; -mod run; +//mod run; pub const VERSION: u32 = 70_001; pub const USER_AGENT: &'static str = "pbtc"; pub use primitives::{hash, bytes}; -pub use config::Config; +//pub use config::Config; pub use error::Error; pub use event_loop::event_loop; -pub use run::run; +//pub use run::run; pub type P2PResult = Result; diff --git a/primitives/src/bytes.rs b/primitives/src/bytes.rs index d489e3be..55831f6b 100644 --- a/primitives/src/bytes.rs +++ b/primitives/src/bytes.rs @@ -1,4 +1,4 @@ -use std::{ops, str, fmt, io}; +use std::{ops, str, fmt, io, marker}; use hex::{ToHex, FromHex, FromHexError}; #[derive(Default, PartialEq, Clone)] @@ -78,6 +78,51 @@ impl AsMut<[u8]> for Bytes { } } +#[derive(Default, PartialEq, Clone)] +pub struct TaggedBytes { + bytes: Bytes, + label: marker::PhantomData, +} + +impl TaggedBytes { + pub fn new(bytes: Bytes) -> Self { + TaggedBytes { + bytes: bytes, + label: marker::PhantomData, + } + } + + pub fn into_raw(self) -> Bytes { + self.bytes + } +} + +impl ops::Deref for TaggedBytes { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.bytes.0 + } +} + +impl ops::DerefMut for TaggedBytes { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.bytes.0 + } +} + +impl AsRef<[u8]> for TaggedBytes { + fn as_ref(&self) -> &[u8] { + &self.bytes.0 + } +} + +impl AsMut<[u8]> for TaggedBytes { + fn as_mut(&mut self) -> &mut [u8] { + &mut self.bytes.0 + } +} + #[cfg(test)] mod tests { use super::Bytes; diff --git a/serialization/src/reader.rs b/serialization/src/reader.rs index 649eb400..e820cc72 100644 --- a/serialization/src/reader.rs +++ b/serialization/src/reader.rs @@ -67,7 +67,22 @@ impl<'a> Reader<'a> { pub fn read_list(&mut self) -> Result, Error> where T: Deserializable { let len: usize = try!(self.read::()).into(); - let mut result = vec![]; + let mut result = Vec::with_capacity(len); + + for _ in 0..len { + result.push(try!(self.read())); + } + + Ok(result) + } + + pub fn read_list_max(&mut self, max: usize) -> Result, Error> where T: Deserializable { + let len: usize = try!(self.read::()).into(); + if len > max { + return Err(Error::MalformedData); + } + + let mut result = Vec::with_capacity(len); for _ in 0..len { result.push(try!(self.read())); diff --git a/serialization/src/stream.rs b/serialization/src/stream.rs index 28928c8a..9b3508d7 100644 --- a/serialization/src/stream.rs +++ b/serialization/src/stream.rs @@ -43,6 +43,14 @@ impl Stream { self } + pub fn append_list_ref(&mut self, t: &[&T]) -> &mut Self where T: Serializable { + CompactInteger::from(t.len()).serialize(self); + for i in t { + i.serialize(self); + } + self + } + /// Full stream. pub fn out(self) -> Bytes { self.buffer.into()