message cleanup in progress

This commit is contained in:
debris 2016-10-10 18:08:22 +02:00
parent 3873cf6b6b
commit ffc300f85a
44 changed files with 740 additions and 495 deletions

View File

@ -2,7 +2,7 @@ use std::{str, net};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone, Copy)]
pub struct IpAddress(net::IpAddr);
impl From<net::IpAddr> for IpAddress {

View File

@ -1,7 +1,7 @@
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone, Copy)]
pub struct Port(u16);
impl From<u16> for Port {

View File

@ -12,6 +12,7 @@ mod error;
pub use primitives::{hash, bytes};
pub use message::{Message, MessageHeader, Payload};
pub use message::{Message, MessageHeader};
pub use error::Error;
pub use serialization::PayloadType;
pub type MessageResult<T> = Result<T, Error>;

View File

@ -1,52 +1,31 @@
use crypto::checksum;
use ser::{Serializable, Stream, serialize};
use ser::Stream;
use bytes::TaggedBytes;
use common::Magic;
use message::{MessageHeader, Payload};
use serialization::serialize_payload;
use {PayloadType, MessageResult, MessageHeader};
#[derive(Debug, PartialEq)]
pub struct Message {
pub header: MessageHeader,
pub payload: Payload,
pub struct Message<T> {
bytes: TaggedBytes<T>,
}
impl Message {
pub fn new(magic: Magic, payload: Payload) -> Message {
let serialized = serialize(&payload);
Message {
header: MessageHeader {
magic: magic,
command: payload.command(),
len: serialized.len() as u32,
checksum: checksum(&serialized),
},
payload: payload,
}
impl<T> Message<T> where T: PayloadType {
pub fn new(magic: Magic, version: u32, payload: &T) -> MessageResult<Self> {
let serialized = try!(serialize_payload(payload, version));
let header = MessageHeader::for_data(magic, T::command().into(), &serialized);
let mut stream = Stream::default();
stream.append(&header);
stream.append_slice(&serialized);
let message = Message {
bytes: TaggedBytes::new(stream.out()),
};
Ok(message)
}
}
impl Serializable for Message {
fn serialize(&self, stream: &mut Stream) {
stream
.append(&self.header)
.append(&self.payload);
}
}
#[cfg(test)]
mod tests {
use bytes::Bytes;
use ser::serialize;
use common::Magic;
use types::Version;
use super::Message;
use Payload;
#[test]
fn test_message_serialization() {
let expected: Bytes = "f9beb4d976657273696f6e000000000064000000358d493262ea0000010000000000000011b2d05000000000010000000000000000000000000000000000ffff000000000000000000000000000000000000000000000000ffff0000000000003b2eb35d8ce617650f2f5361746f7368693a302e372e322fc03e0300".into();
let version: Version = "62ea0000010000000000000011b2d05000000000010000000000000000000000000000000000ffff000000000000000000000000000000000000000000000000ffff0000000000003b2eb35d8ce617650f2f5361746f7368693a302e372e322fc03e0300".into();
let magic = Magic::Mainnet;
let message = Message::new(magic, Payload::Version(version));
assert_eq!(serialize(&message), expected);
impl<T> AsRef<[u8]> for Message<T> {
fn as_ref(&self) -> &[u8] {
self.bytes.as_ref()
}
}

View File

@ -1,5 +1,6 @@
use hash::H32;
use ser::{Serializable, Stream, Reader};
use crypto::checksum;
use common::{Command, Magic};
use Error;
@ -11,6 +12,17 @@ pub struct MessageHeader {
pub checksum: H32,
}
impl MessageHeader {
pub fn for_data(magic: Magic, command: Command, data: &[u8]) -> Self {
MessageHeader {
magic: magic,
command: command,
len: data.len() as u32,
checksum: checksum(data),
}
}
}
impl MessageHeader {
pub fn deserialize(data: &[u8], expected: Magic) -> Result<Self, Error> {
if data.len() != 24 {

View File

@ -1,7 +1,5 @@
mod message;
mod message_header;
mod payload;
pub use self::message::Message;
pub use self::message_header::MessageHeader;
pub use self::payload::Payload;

View File

@ -1,153 +0,0 @@
use hash::H32;
use ser::{Serializable, Stream, deserialize};
use chain::{Transaction, Block};
use crypto::checksum;
use common::Command;
use types::{
Version, Addr, AddrBelow31402, Inv,
GetData, NotFound, GetBlocks, GetHeaders, Headers,
Ping, Pong, Reject, FilterLoad, FilterAdd, FeeFilter,
MerkleBlock, SendCompact, CompactBlock, GetBlockTxn, BlockTxn,
};
use Error;
#[derive(Debug, PartialEq)]
pub enum Payload {
Version(Version),
Verack,
Addr(Addr),
AddrBelow31402(AddrBelow31402),
Inv(Inv),
GetData(GetData),
NotFound(NotFound),
GetBlocks(GetBlocks),
GetHeaders(GetHeaders),
Tx(Transaction),
Block(Block),
Headers(Headers),
GetAddr,
MemPool,
Ping(Ping),
Pong(Pong),
Reject(Reject),
FilterLoad(FilterLoad),
FilterAdd(FilterAdd),
FilterClear,
MerkleBlock(MerkleBlock),
SendHeaders,
FeeFilter(FeeFilter),
SendCompact(SendCompact),
CompactBlock(CompactBlock),
GetBlockTxn(GetBlockTxn),
BlockTxn(BlockTxn),
}
impl Payload {
pub fn command(&self) -> Command {
let cmd = match *self {
Payload::Version(_) => "version",
Payload::Verack => "verack",
Payload::Addr(_) | Payload::AddrBelow31402(_) => "addr",
Payload::Inv(_) => "inv",
Payload::GetData(_) => "getdata",
Payload::NotFound(_) => "notfound",
Payload::GetBlocks(_) => "getblocks",
Payload::GetHeaders(_) => "getheaders",
Payload::Tx(_) => "tx",
Payload::Block(_) => "block",
Payload::Headers(_) => "headers",
Payload::GetAddr => "getaddr",
Payload::MemPool=> "mempool",
Payload::Ping(_) => "ping",
Payload::Pong(_) => "pong",
Payload::Reject(_) => "reject",
Payload::FilterLoad(_) => "filterload",
Payload::FilterAdd(_) => "filteradd",
Payload::FilterClear => "filterclear",
Payload::MerkleBlock(_) => "merkleblock",
Payload::SendHeaders => "sendheaders",
Payload::FeeFilter(_) => "feefilter",
Payload::SendCompact(_) => "sendcmpct",
Payload::CompactBlock(_) => "compactblock",
Payload::GetBlockTxn(_) => "getblocktxn",
Payload::BlockTxn(_) => "blocktxn",
};
cmd.into()
}
pub fn deserialize(data: &[u8], check: &H32, version: u32, command: &Command) -> Result<Self, Error> {
if &checksum(data) != check {
return Err(Error::InvalidChecksum);
}
let result = match &command.to_string() as &str {
"version" => deserialize(data).map(Payload::Version),
"verack" if data.is_empty() => Ok(Payload::Verack),
"addr" => match version >= 31402 {
true => deserialize(data).map(Payload::Addr),
false => deserialize(data).map(Payload::AddrBelow31402),
},
"inv" => deserialize(data).map(Payload::Inv),
"getdata" => deserialize(data).map(Payload::GetData),
"notfound" => deserialize(data).map(Payload::NotFound),
"getblocks" => deserialize(data).map(Payload::GetBlocks),
"getheaders" => deserialize(data).map(Payload::GetHeaders),
"tx" => deserialize(data).map(Payload::Tx),
"block" => deserialize(data).map(Payload::Block),
"headers" => deserialize(data).map(Payload::Headers),
"getaddr" if data.is_empty() => Ok(Payload::GetAddr),
"mempool" if data.is_empty() => Ok(Payload::MemPool),
"ping" => deserialize(data).map(Payload::Ping),
"pong" => deserialize(data).map(Payload::Pong),
"reject" => deserialize(data).map(Payload::Reject),
"filterload" => deserialize(data).map(Payload::FilterLoad),
"filteradd" => deserialize(data).map(Payload::FilterAdd),
"filterclear" if data.is_empty() => Ok(Payload::FilterClear),
"merkleblock" => deserialize(data).map(Payload::MerkleBlock),
"sendheaders" if data.is_empty() => Ok(Payload::SendHeaders),
"feefilter" => deserialize(data).map(Payload::FeeFilter),
"sendcmpct" => deserialize(data).map(Payload::SendCompact),
"cmpctblock" => deserialize(data).map(Payload::CompactBlock),
"getblocktxn" => deserialize(data).map(Payload::GetBlockTxn),
"blocktxn" => deserialize(data).map(Payload::BlockTxn),
_ => return Err(Error::InvalidCommand),
};
result.map_err(Into::into)
}
}
impl Serializable for Payload {
fn serialize(&self, stream: &mut Stream) {
match *self {
Payload::Version(ref p) => { stream.append(p); },
Payload::Verack => {},
Payload::Addr(ref p) => { stream.append(p); },
Payload::AddrBelow31402(ref p) => { stream.append(p); },
Payload::Inv(ref p) => { stream.append(p); },
Payload::GetData(ref p) => { stream.append(p); },
Payload::NotFound(ref p) => { stream.append(p); },
Payload::GetBlocks(ref p) => { stream.append(p); },
Payload::GetHeaders(ref p) => { stream.append(p); },
Payload::Tx(ref p) => { stream.append(p); },
Payload::Block(ref p) => { stream.append(p); },
Payload::Headers(ref p) => { stream.append(p); },
Payload::GetAddr => {},
Payload::MemPool => {},
Payload::Ping(ref p) => { stream.append(p); },
Payload::Pong(ref p) => { stream.append(p); },
Payload::Reject(ref p) => { stream.append(p); },
Payload::FilterLoad(ref p) => { stream.append(p); },
Payload::FilterAdd(ref p) => { stream.append(p); },
Payload::FilterClear => {},
Payload::MerkleBlock(ref p) => { stream.append(p); },
Payload::SendHeaders => {},
Payload::FeeFilter(ref p) => { stream.append(p); },
Payload::SendCompact(ref p) => { stream.append(p); },
Payload::CompactBlock(ref p) => { stream.append(p); },
Payload::GetBlockTxn(ref p) => { stream.append(p); },
Payload::BlockTxn(ref p) => { stream.append(p); },
}
}
}

View File

@ -1,19 +1,14 @@
mod stream;
mod reader;
use ser::{Reader, Deserializable};
use {MessageResult, Error};
pub use self::stream::PayloadStream;
pub use self::stream::{PayloadStream, serialize_payload};
pub use self::reader::{PayloadReader, deserialize_payload};
use ser::{Reader, Stream};
use MessageResult;
pub trait PayloadType: Deserializable {
pub trait PayloadType {
fn version() -> u32;
fn command() -> &'static str;
fn deserialize_payload(reader: &mut Reader, version: u32) -> MessageResult<Self> where Self: Sized {
if version < Self::version() {
return Err(Error::InvalidVersion);
}
Self::deserialize(reader).map_err(Into::into)
}
fn deserialize_payload(reader: &mut Reader, version: u32) -> MessageResult<Self> where Self: Sized;
fn serialize_payload(&self, stream: &mut Stream, version: u32) -> MessageResult<()>;
}

View File

@ -26,6 +26,10 @@ impl<'a> PayloadReader<'a> {
}
pub fn read<T>(&mut self) -> Result<T, Error> where T: PayloadType {
if T::version() > self.version {
return Err(Error::InvalidVersion);
}
T::deserialize_payload(&mut self.reader, self.version)
}

View File

@ -1,6 +1,12 @@
use ser::{Stream, Serializable};
use serialization::PayloadType;
use Error;
use bytes::Bytes;
use ser::Stream;
use {PayloadType, Error, MessageResult};
pub fn serialize_payload<T>(t: &T, version: u32) -> MessageResult<Bytes> where T: PayloadType {
let mut stream = PayloadStream::new(version);
try!(stream.append(t));
Ok(stream.out())
}
pub struct PayloadStream {
stream: Stream,
@ -15,12 +21,19 @@ impl PayloadStream {
}
}
pub fn append<T>(&mut self, t: &T) -> Result<(), Error> where T: PayloadType + Serializable {
if self.version < T::version() {
pub fn append<T>(&mut self, t: &T) -> MessageResult<()> where T: PayloadType {
if T::version() > self.version {
return Err(Error::InvalidVersion);
}
t.serialize(&mut self.stream);
Ok(())
t.serialize_payload(&mut self.stream, self.version)
}
pub fn raw_stream(&mut self) -> &mut Stream {
&mut self.stream
}
pub fn out(self) -> Bytes {
self.stream.out()
}
}

View File

@ -2,8 +2,49 @@ use ser::{
Serializable, Stream,
Deserializable, Reader, Error as ReaderError,
};
use serialization::PayloadType;
use common::NetAddress;
use {PayloadType, MessageResult};
#[derive(Debug, PartialEq)]
pub enum Addr {
V0(V0),
V31402(V31402),
}
impl PayloadType for Addr {
fn version() -> u32 {
0
}
fn command() -> &'static str {
"addr"
}
fn deserialize_payload(reader: &mut Reader, version: u32) -> MessageResult<Self> where Self: Sized {
let result = if version < 31402 {
reader.read().map(Addr::V0)
} else {
reader.read().map(Addr::V31402)
};
result.map_err(Into::into)
}
fn serialize_payload(&self, stream: &mut Stream, version: u32) -> MessageResult<()> {
match *self {
Addr::V0(ref addr) => addr.serialize(stream),
Addr::V31402(ref addr) => {
if version < 31402 {
let view = V31402AsV0::new(addr);
view.serialize(stream);
} else {
addr.serialize(stream);
}
}
}
Ok(())
}
}
#[derive(Debug, PartialEq)]
pub struct AddressEntry {
@ -31,21 +72,20 @@ impl Deserializable for AddressEntry {
}
#[derive(Debug, PartialEq)]
pub struct Addr {
pub struct V31402 {
pub addresses: Vec<AddressEntry>,
}
impl Serializable for Addr {
impl Serializable for V31402 {
fn serialize(&self, stream: &mut Stream) {
stream.append_list(&self.addresses);
}
}
impl Deserializable for Addr {
impl Deserializable for V31402 {
fn deserialize(reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
// TODO: limit to 1000
let result = Addr {
addresses: try!(reader.read_list()),
let result = V31402 {
addresses: try!(reader.read_list_max(1000)),
};
Ok(result)
@ -53,37 +93,55 @@ impl Deserializable for Addr {
}
#[derive(Debug, PartialEq)]
pub struct AddrBelow31402 {
pub struct V0 {
pub addresses: Vec<NetAddress>,
}
impl Serializable for AddrBelow31402 {
impl Serializable for V0 {
fn serialize(&self, stream: &mut Stream) {
stream.append_list(&self.addresses);
}
}
impl Deserializable for AddrBelow31402 {
impl Deserializable for V0 {
fn deserialize(reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
// TODO: limit to 1000
let result = AddrBelow31402 {
addresses: try!(reader.read_list()),
let result = V0 {
addresses: try!(reader.read_list_max(1000)),
};
Ok(result)
}
}
struct V31402AsV0<'a> {
v: &'a V31402,
}
impl<'a> V31402AsV0<'a> {
fn new(v: &'a V31402) -> Self {
V31402AsV0 {
v: v,
}
}
}
impl<'a> Serializable for V31402AsV0<'a> {
fn serialize(&self, stream: &mut Stream) {
let vec_ref: Vec<&'a NetAddress> = self.v.addresses.iter().map(|x| &x.address).collect();
stream.append_list_ref(&vec_ref);
}
}
#[cfg(test)]
mod tests {
use bytes::Bytes;
use ser::{serialize, deserialize};
use super::{Addr, AddressEntry};
use super::{V31402, AddressEntry};
#[test]
fn test_addr_serialize() {
let expected: Bytes = "01e215104d010000000000000000000000000000000000ffff0a000001208d".into();
let addr = Addr {
let addr = V31402 {
addresses: vec![
AddressEntry {
timestamp: 0x4d1015e2,
@ -98,7 +156,7 @@ mod tests {
#[test]
fn test_addr_deserialize() {
let raw: Bytes = "01e215104d010000000000000000000000000000000000ffff0a000001208d".into();
let expected = Addr {
let expected = V31402 {
addresses: vec![
AddressEntry {
timestamp: 0x4d1015e2,

View File

@ -1,23 +1,31 @@
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
use ser::{Stream, Reader};
use common::BlockTransactions;
use {MessageResult, PayloadType};
#[derive(Debug, PartialEq)]
pub struct BlockTxn {
request: BlockTransactions,
}
impl Serializable for BlockTxn {
fn serialize(&self, stream: &mut Stream) {
stream.append(&self.request);
impl PayloadType for BlockTxn {
fn version() -> u32 {
70014
}
}
impl Deserializable for BlockTxn {
fn deserialize(reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
fn command() -> &'static str {
"blocktxn"
}
fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
let block = BlockTxn {
request: try!(reader.read()),
};
Ok(block)
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
stream.append(&self.request);
Ok(())
}
}

View File

@ -1,23 +1,31 @@
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
use ser::{Stream, Reader};
use common::BlockHeaderAndIDs;
use {PayloadType, MessageResult};
#[derive(Debug, PartialEq)]
pub struct CompactBlock {
header: BlockHeaderAndIDs,
}
impl Serializable for CompactBlock {
fn serialize(&self, stream: &mut Stream) {
stream.append(&self.header);
impl PayloadType for CompactBlock {
fn version() -> u32 {
70014
}
}
impl Deserializable for CompactBlock {
fn deserialize(reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
fn command() -> &'static str {
"cmpctblock"
}
fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
let block = CompactBlock {
header: try!(reader.read()),
};
Ok(block)
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
stream.append(&self.header);
Ok(())
}
}

View File

@ -1,22 +1,30 @@
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
#[derive(Debug, PartialEq)]
pub struct FeeFilter {
fee_rate: u64,
}
impl Serializable for FeeFilter {
fn serialize(&self, stream: &mut Stream) {
stream.append(&self.fee_rate);
impl PayloadType for FeeFilter {
fn version() -> u32 {
70013
}
}
impl Deserializable for FeeFilter {
fn deserialize(reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
fn command() -> &'static str {
"cmpctblock"
}
fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
let fee_filter = FeeFilter {
fee_rate: try!(reader.read()),
};
Ok(fee_filter)
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
stream.append(&self.fee_rate);
Ok(())
}
}

View File

@ -1,5 +1,6 @@
use bytes::Bytes;
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
#[derive(Debug, PartialEq)]
pub struct FilterAdd {
@ -7,18 +8,25 @@ pub struct FilterAdd {
data: Bytes,
}
impl Serializable for FilterAdd {
fn serialize(&self, stream: &mut Stream) {
stream.append(&self.data);
impl PayloadType for FilterAdd {
fn version() -> u32 {
70001
}
}
impl Deserializable for FilterAdd {
fn deserialize(reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
let filteradd= FilterAdd {
fn command() -> &'static str {
"filteradd"
}
fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
let filteradd = FilterAdd {
data: try!(reader.read()),
};
Ok(filteradd)
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
stream.append(&self.data);
Ok(())
}
}

View File

@ -0,0 +1,23 @@
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
#[derive(Debug, PartialEq)]
pub struct FilterClear;
impl PayloadType for FilterClear {
fn version() -> u32 {
70001
}
fn command() -> &'static str {
"filterclear"
}
fn deserialize_payload(_reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
Ok(FilterClear)
}
fn serialize_payload(&self, _stream: &mut Stream, _version: u32) -> MessageResult<()> {
Ok(())
}
}

View File

@ -1,5 +1,6 @@
use bytes::Bytes;
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
#[derive(Debug, PartialEq)]
pub struct FilterLoad {
@ -10,18 +11,16 @@ pub struct FilterLoad {
flags: u8,
}
impl Serializable for FilterLoad {
fn serialize(&self, stream: &mut Stream) {
stream
.append(&self.filter)
.append(&self.hash_functions)
.append(&self.tweak)
.append(&self.flags);
impl PayloadType for FilterLoad {
fn version() -> u32 {
70001
}
}
impl Deserializable for FilterLoad {
fn deserialize(reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
fn command() -> &'static str {
"filterload"
}
fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
let filterload = FilterLoad {
filter: try!(reader.read()),
hash_functions: try!(reader.read()),
@ -31,4 +30,13 @@ impl Deserializable for FilterLoad {
Ok(filterload)
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
stream
.append(&self.filter)
.append(&self.hash_functions)
.append(&self.tweak)
.append(&self.flags);
Ok(())
}
}

View File

@ -0,0 +1,23 @@
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
#[derive(Debug, PartialEq)]
pub struct GetAddr;
impl PayloadType for GetAddr {
fn version() -> u32 {
60002
}
fn command() -> &'static str {
"getaddr"
}
fn deserialize_payload(_reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
Ok(GetAddr)
}
fn serialize_payload(&self, _stream: &mut Stream, _version: u32) -> MessageResult<()> {
Ok(())
}
}

View File

@ -1,5 +1,6 @@
use hash::H256;
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
#[derive(Debug, PartialEq)]
pub struct GetBlocks {
@ -8,24 +9,31 @@ pub struct GetBlocks {
hash_stop: H256,
}
impl Serializable for GetBlocks {
fn serialize(&self, stream: &mut Stream) {
stream
.append(&self.version)
.append_list(&self.block_locator_hashes)
.append(&self.hash_stop);
impl PayloadType for GetBlocks {
fn version() -> u32 {
0
}
}
impl Deserializable for GetBlocks {
fn deserialize(reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
fn command() -> &'static str {
"getblocks"
}
fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
let get_blocks = GetBlocks {
version: try!(reader.read()),
block_locator_hashes: try!(reader.read_list()),
block_locator_hashes: try!(reader.read_list_max(500)),
hash_stop: try!(reader.read()),
};
Ok(get_blocks)
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
stream
.append(&self.version)
.append_list(&self.block_locator_hashes)
.append(&self.hash_stop);
Ok(())
}
}

View File

@ -1,23 +1,31 @@
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
use ser::{Stream, Reader};
use common::BlockTransactionsRequest;
use {PayloadType, MessageResult};
#[derive(Debug, PartialEq)]
pub struct GetBlockTxn {
request: BlockTransactionsRequest,
}
impl Serializable for GetBlockTxn {
fn serialize(&self, stream: &mut Stream) {
stream.append(&self.request);
impl PayloadType for GetBlockTxn {
fn version() -> u32 {
70014
}
}
impl Deserializable for GetBlockTxn {
fn deserialize(reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
fn command() -> &'static str {
"getblocktxn"
}
fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
let get_block = GetBlockTxn {
request: try!(reader.read()),
};
Ok(get_block)
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
stream.append(&self.request);
Ok(())
}
}

View File

@ -0,0 +1,31 @@
use ser::{Stream, Reader};
use common::InventoryVector;
use {PayloadType, MessageResult};
#[derive(Debug, PartialEq)]
pub struct GetData {
pub inventory: Vec<InventoryVector>,
}
impl PayloadType for GetData {
fn version() -> u32 {
0
}
fn command() -> &'static str {
"getdata"
}
fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
let inv = GetData {
inventory: try!(reader.read_list_max(50_000)),
};
Ok(inv)
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
stream.append_list(&self.inventory);
Ok(())
}
}

View File

@ -0,0 +1,39 @@
use hash::H256;
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
#[derive(Debug, PartialEq)]
pub struct GetHeaders {
version: u32,
block_locator_hashes: Vec<H256>,
hash_stop: H256,
}
impl PayloadType for GetHeaders {
fn version() -> u32 {
0
}
fn command() -> &'static str {
"getheaders"
}
fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
let get_blocks = GetHeaders {
version: try!(reader.read()),
block_locator_hashes: try!(reader.read_list_max(2000)),
hash_stop: try!(reader.read()),
};
Ok(get_blocks)
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
stream
.append(&self.version)
.append_list(&self.block_locator_hashes)
.append(&self.hash_stop);
Ok(())
}
}

View File

@ -1,23 +1,32 @@
use chain::BlockHeader;
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
#[derive(Debug, PartialEq)]
pub struct Headers {
// TODO: Block headers need to have txn_count field
headers: Vec<BlockHeader>,
}
impl Serializable for Headers {
fn serialize(&self, stream: &mut Stream) {
stream.append_list(&self.headers);
impl PayloadType for Headers {
fn version() -> u32 {
0
}
}
impl Deserializable for Headers {
fn deserialize(reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
fn command() -> &'static str {
"headers"
}
fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
let headers = Headers {
headers: try!(reader.read_list()),
};
Ok(headers)
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
stream.append_list(&self.headers);
Ok(())
}
}

View File

@ -1,23 +1,31 @@
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
use ser::{Stream, Reader};
use common::InventoryVector;
use {PayloadType, MessageResult};
#[derive(Debug, PartialEq)]
pub struct Inv {
pub inventory: Vec<InventoryVector>,
}
impl Serializable for Inv {
fn serialize(&self, stream: &mut Stream) {
stream.append_list(&self.inventory);
impl PayloadType for Inv {
fn version() -> u32 {
0
}
}
impl Deserializable for Inv {
fn deserialize(reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
fn command() -> &'static str {
"inv"
}
fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
let inv = Inv {
inventory: try!(reader.read_list()),
inventory: try!(reader.read_list_max(50_000)),
};
Ok(inv)
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
stream.append_list(&self.inventory);
Ok(())
}
}

View File

@ -0,0 +1,23 @@
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
#[derive(Debug, PartialEq)]
pub struct MemPool;
impl PayloadType for MemPool {
fn version() -> u32 {
60002
}
fn command() -> &'static str {
"mempool"
}
fn deserialize_payload(_reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
Ok(MemPool)
}
fn serialize_payload(&self, _stream: &mut Stream, _version: u32) -> MessageResult<()> {
Ok(())
}
}

View File

@ -1,7 +1,8 @@
use hash::H256;
use bytes::Bytes;
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
use ser::{Stream, Reader};
use chain::BlockHeader;
use {PayloadType, MessageResult};
#[derive(Debug, PartialEq)]
pub struct MerkleBlock {
@ -11,18 +12,16 @@ pub struct MerkleBlock {
flags: Bytes,
}
impl Serializable for MerkleBlock {
fn serialize(&self, stream: &mut Stream) {
stream
.append(&self.block_header)
.append(&self.total_transactions)
.append_list(&self.hashes)
.append(&self.flags);
impl PayloadType for MerkleBlock {
fn version() -> u32 {
70014
}
}
impl Deserializable for MerkleBlock {
fn deserialize(reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
fn command() -> &'static str {
"merkleblock"
}
fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
let merkle_block = MerkleBlock {
block_header: try!(reader.read()),
total_transactions: try!(reader.read()),
@ -32,4 +31,13 @@ impl Deserializable for MerkleBlock {
Ok(merkle_block)
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
stream
.append(&self.block_header)
.append(&self.total_transactions)
.append_list(&self.hashes)
.append(&self.flags);
Ok(())
}
}

View File

@ -2,38 +2,48 @@ pub mod addr;
mod blocktxn;
mod compactblock;
mod feefilter;
mod filterload;
mod filteradd;
mod filterclear;
mod filterload;
mod getaddr;
mod getblocks;
mod getblocktxn;
mod getdata;
mod getheaders;
mod headers;
mod inv;
mod mempool;
mod merkle_block;
mod notfound;
mod ping;
mod pong;
pub mod reject;
mod sendcompact;
mod sendheaders;
mod verack;
pub mod version;
pub use self::addr::{Addr, AddrBelow31402};
pub use self::addr::Addr;
pub use self::blocktxn::BlockTxn;
pub use self::compactblock::CompactBlock;
pub use self::feefilter::FeeFilter;
pub use self::filterload::FilterLoad;
pub use self::filterclear::FilterClear;
pub use self::filteradd::FilterAdd;
pub use self::getaddr::GetAddr;
pub use self::getblocks::GetBlocks;
pub use self::getblocktxn::GetBlockTxn;
pub use self::getdata::GetData;
pub use self::getheaders::GetHeaders;
pub use self::headers::Headers;
pub use self::inv::Inv;
pub use self::mempool::MemPool;
pub use self::merkle_block::MerkleBlock;
pub use self::notfound::NotFound;
pub use self::ping::Ping;
pub use self::pong::Pong;
pub use self::reject::Reject;
pub use self::sendcompact::SendCompact;
pub use self::sendheaders::SendHeaders;
pub use self::verack::Verack;
pub use self::version::Version;
pub type GetData = Inv;
pub type NotFound = Inv;
pub type GetHeaders = GetBlocks;

View File

@ -0,0 +1,31 @@
use ser::{Stream, Reader};
use common::InventoryVector;
use {PayloadType, MessageResult};
#[derive(Debug, PartialEq)]
pub struct NotFound {
pub inventory: Vec<InventoryVector>,
}
impl PayloadType for NotFound {
fn version() -> u32 {
0
}
fn command() -> &'static str {
"notfound"
}
fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
let inv = NotFound {
inventory: try!(reader.read_list_max(50_000)),
};
Ok(inv)
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
stream.append_list(&self.inventory);
Ok(())
}
}

View File

@ -1,27 +1,11 @@
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
use serialization::PayloadType;
use ser::{Stream, Reader};
use {MessageResult, PayloadType};
#[derive(Debug, PartialEq)]
pub struct Ping {
pub nonce: u64,
}
impl Serializable for Ping {
fn serialize(&self, stream: &mut Stream) {
stream.append(&self.nonce);
}
}
impl Deserializable for Ping {
fn deserialize(reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
let ping = Ping {
nonce: try!(reader.read()),
};
Ok(ping)
}
}
impl PayloadType for Ping {
fn version() -> u32 {
0
@ -30,4 +14,17 @@ impl PayloadType for Ping {
fn command() -> &'static str {
"ping"
}
fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
let ping = Ping {
nonce: try!(reader.read()),
};
Ok(ping)
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
stream.append(&self.nonce);
Ok(())
}
}

View File

@ -1,27 +1,11 @@
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
use serialization::PayloadType;
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
#[derive(Debug, PartialEq)]
pub struct Pong {
pub nonce: u64,
}
impl Serializable for Pong {
fn serialize(&self, stream: &mut Stream) {
stream.append(&self.nonce);
}
}
impl Deserializable for Pong {
fn deserialize(reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
let ping = Pong {
nonce: try!(reader.read()),
};
Ok(ping)
}
}
impl PayloadType for Pong {
fn version() -> u32 {
0
@ -30,4 +14,17 @@ impl PayloadType for Pong {
fn command() -> &'static str {
"pong"
}
fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
let pong = Pong {
nonce: try!(reader.read()),
};
Ok(pong)
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
stream.append(&self.nonce);
Ok(())
}
}

View File

@ -1,5 +1,5 @@
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
use serialization::PayloadType;
use {PayloadType, MessageResult};
#[derive(Debug, PartialEq, Clone, Copy)]
#[repr(u8)]
@ -59,27 +59,6 @@ pub struct Reject {
// TODO: data
}
impl Serializable for Reject {
fn serialize(&self, stream: &mut Stream) {
stream
.append(&self.message)
.append(&self.code)
.append(&self.reason);
}
}
impl Deserializable for Reject {
fn deserialize(reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
let reject = Reject {
message: try!(reader.read()),
code: try!(reader.read()),
reason: try!(reader.read()),
};
Ok(reject)
}
}
impl PayloadType for Reject {
fn version() -> u32 {
0
@ -88,4 +67,22 @@ impl PayloadType for Reject {
fn command() -> &'static str {
"reject"
}
fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
let reject = Reject {
message: try!(reader.read()),
code: try!(reader.read()),
reason: try!(reader.read()),
};
Ok(reject)
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
stream
.append(&self.message)
.append(&self.code)
.append(&self.reason);
Ok(())
}
}

View File

@ -1,4 +1,5 @@
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
#[derive(Debug, PartialEq)]
pub struct SendCompact {
@ -6,16 +7,16 @@ pub struct SendCompact {
second: u64,
}
impl Serializable for SendCompact {
fn serialize(&self, stream: &mut Stream) {
stream
.append(&self.first)
.append(&self.second);
impl PayloadType for SendCompact {
fn version() -> u32 {
70014
}
}
impl Deserializable for SendCompact {
fn deserialize(reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
fn command() -> &'static str {
"sendcmpct"
}
fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
let send_compact = SendCompact {
first: try!(reader.read()),
second: try!(reader.read()),
@ -23,4 +24,11 @@ impl Deserializable for SendCompact {
Ok(send_compact)
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
stream
.append(&self.first)
.append(&self.second);
Ok(())
}
}

View File

@ -0,0 +1,23 @@
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
#[derive(Debug, PartialEq)]
pub struct SendHeaders;
impl PayloadType for SendHeaders {
fn version() -> u32 {
70012
}
fn command() -> &'static str {
"sendheaders"
}
fn deserialize_payload(_reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
Ok(SendHeaders)
}
fn serialize_payload(&self, _stream: &mut Stream, _version: u32) -> MessageResult<()> {
Ok(())
}
}

View File

@ -1,19 +1,9 @@
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
use serialization::PayloadType;
use ser::{Stream, Reader};
use {PayloadType, MessageResult};
#[derive(Debug, PartialEq)]
pub struct Verack;
impl Serializable for Verack {
fn serialize(&self, _stream: &mut Stream) {}
}
impl Deserializable for Verack {
fn deserialize(_reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
Ok(Verack)
}
}
impl PayloadType for Verack {
fn version() -> u32 {
0
@ -22,4 +12,12 @@ impl PayloadType for Verack {
fn command() -> &'static str {
"verack"
}
fn deserialize_payload(_reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
Ok(Verack)
}
fn serialize_payload(&self, _stream: &mut Stream, _version: u32) -> MessageResult<()> {
Ok(())
}
}

View File

@ -1,10 +1,11 @@
use bytes::Bytes;
use ser::{
Serializable, Stream,
Deserializable, Reader, Error as ReaderError, deserialize
Deserializable, Reader, Error as ReaderError,
};
use common::{NetAddress, ServiceFlags};
use serialization::PayloadType;
use {PayloadType, MessageResult};
use serialization::deserialize_payload;
#[derive(Debug, PartialEq)]
pub enum Version {
@ -21,6 +22,43 @@ impl PayloadType for Version {
fn command() -> &'static str {
"version"
}
// version package is an serialization excpetion
fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
let simple: V0 = try!(reader.read());
if simple.version < 106 {
return Ok(Version::V0(simple));
}
let v106: V106 = try!(reader.read());
if simple.version < 70001 {
Ok(Version::V106(simple, v106))
} else {
let v70001: V70001 = try!(reader.read());
Ok(Version::V70001(simple, v106, v70001))
}
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
match *self {
Version::V0(ref simple) => {
stream.append(simple);
},
Version::V106(ref simple, ref v106) => {
stream
.append(simple)
.append(v106);
},
Version::V70001(ref simple, ref v106, ref v70001) => {
stream
.append(simple)
.append(v106)
.append(v70001);
},
}
Ok(())
}
}
impl Version {
@ -54,45 +92,6 @@ pub struct V70001 {
pub relay: bool,
}
impl Serializable for Version {
fn serialize(&self, stream: &mut Stream) {
match *self {
Version::V0(ref simple) => {
stream.append(simple);
},
Version::V106(ref simple, ref v106) => {
stream
.append(simple)
.append(v106);
},
Version::V70001(ref simple, ref v106, ref v70001) => {
stream
.append(simple)
.append(v106)
.append(v70001);
},
}
}
}
impl Deserializable for Version {
fn deserialize(reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
let simple: V0 = try!(reader.read());
if simple.version < 106 {
return Ok(Version::V0(simple));
}
let v106: V106 = try!(reader.read());
if simple.version < 70001 {
Ok(Version::V106(simple, v106))
} else {
let v70001: V70001 = try!(reader.read());
Ok(Version::V70001(simple, v106, v70001))
}
}
}
impl Serializable for V0 {
fn serialize(&self, stream: &mut Stream) {
stream
@ -158,14 +157,14 @@ impl Deserializable for V70001 {
impl From<&'static str> for Version {
fn from(s: &'static str) -> Self {
let bytes: Bytes = s.into();
deserialize(&bytes).unwrap()
deserialize_payload(&bytes, 0).unwrap()
}
}
#[cfg(test)]
mod test {
use bytes::Bytes;
use ser::{serialize, deserialize};
use serialization::{serialize_payload, deserialize_payload};
use super::{Version, V0, V106};
#[test]
@ -184,7 +183,7 @@ mod test {
start_height: 98645,
});
assert_eq!(serialize(&version), expected);
assert_eq!(serialize_payload(&version, 0), Ok(expected));
}
#[test]
@ -203,6 +202,6 @@ mod test {
start_height: 98645,
});
assert_eq!(expected, deserialize(&raw).unwrap());
assert_eq!(expected, deserialize_payload(&raw, 0).unwrap());
}
}

View File

@ -1,9 +1,9 @@
use std::{io, cmp};
use futures::{Future, Poll, Async};
use message::{Message, Payload};
use message::Message;
use message::types::{Version, Verack};
use message::common::Magic;
use io::{write_message, read_message, ReadMessage, WriteMessage, ReadSpecificMessage, read_specific_message};
use io::{write_message, WriteMessage, ReadSpecificMessage, read_specific_message};
use Error;
pub fn handshake<A>(a: A, magic: Magic, version: Version) -> Handshake<A> where A: io::Write + io::Read {
@ -25,7 +25,6 @@ pub fn accept_handshake<A>(a: A, magic: Magic, version: Version) -> AcceptHandsh
}
}
/// TODO: return Err if other version is not supported
pub fn negotiate_version(local: u32, other: u32) -> u32 {
cmp::min(local, other)
}
@ -36,17 +35,17 @@ pub struct HandshakeResult {
pub negotiated_version: u32,
}
fn version_message(magic: Magic, version: Version) -> Message {
Message::new(magic, Payload::Version(version))
fn version_message(magic: Magic, version: Version) -> Message<Version> {
Message::new(magic, version.version(), &version).expect("version message should always be serialized correctly")
}
fn verack_message(magic: Magic) -> Message {
Message::new(magic, Payload::Verack)
fn verack_message(magic: Magic) -> Message<Verack> {
Message::new(magic, 0, &Verack).expect("verack message should always be serialized correctly")
}
enum HandshakeState<A> {
SendVersion(WriteMessage<A>),
ReceiveVersion(ReadMessage<A>),
SendVersion(WriteMessage<Version, A>),
ReceiveVersion(ReadSpecificMessage<Version, A>),
ReceiveVerack {
version: Option<Version>,
future: ReadSpecificMessage<Verack, A>,
@ -61,11 +60,11 @@ enum AcceptHandshakeState<A> {
},
SendVersion {
version: Option<Version>,
future: WriteMessage<A>,
future: WriteMessage<Version, A>,
},
SendVerack {
version: Option<Version>,
future: WriteMessage<A>,
future: WriteMessage<Verack, A>,
},
Finished,
}
@ -90,13 +89,12 @@ impl<A> Future for Handshake<A> where A: io::Read + io::Write {
let (next, result) = match self.state {
HandshakeState::SendVersion(ref mut future) => {
let (stream, _) = try_ready!(future.poll());
(HandshakeState::ReceiveVersion(read_message(stream, self.magic, 0)), Async::NotReady)
(HandshakeState::ReceiveVersion(read_specific_message(stream, self.magic, 0)), Async::NotReady)
},
HandshakeState::ReceiveVersion(ref mut future) => {
let (stream, payload) = try_ready!(future.poll());
let version = match payload {
Ok(Payload::Version(version)) => version,
Ok(_) => return Ok((stream, Err(Error::Handshake)).into()),
let (stream, version) = try_ready!(future.poll());
let version = match version {
Ok(version) => version,
Err(err) => return Ok((stream, Err(err.into())).into()),
};

View File

@ -1,7 +1,7 @@
mod handshake;
mod read_header;
mod read_message;
mod read_payload;
//mod read_message;
//mod read_payload;
mod read_specific_message;
mod read_specific_payload;
mod readrc;
@ -11,8 +11,8 @@ pub use self::handshake::{
handshake, accept_handshake, Handshake, AcceptHandshake, HandshakeResult
};
pub use self::read_header::{read_header, ReadHeader};
pub use self::read_message::{read_message, ReadMessage, read_message_stream, ReadMessageStream};
pub use self::read_payload::{read_payload, ReadPayload};
//pub use self::read_message::{read_message, ReadMessage, read_message_stream, ReadMessageStream};
//pub use self::read_payload::{read_payload, ReadPayload};
pub use self::read_specific_payload::{read_specific_payload, ReadSpecificPayload};
pub use self::read_specific_message::{read_specific_message, ReadSpecificMessage};
pub use self::readrc::ReadRc;

View File

@ -1,14 +1,13 @@
use std::io;
use std::marker::PhantomData;
use futures::{Poll, Future, Async};
use ser::Deserializable;
use message::{MessageResult, Error};
use message::common::Magic;
use message::serialization::PayloadType;
use io::{read_header, ReadHeader, read_specific_payload, ReadSpecificPayload};
pub fn read_specific_message<M, A>(a: A, magic: Magic, version: u32) -> ReadSpecificMessage<M, A>
where A: io::Read, M: PayloadType + Deserializable {
where A: io::Read, M: PayloadType {
ReadSpecificMessage {
state: ReadMessageState::ReadHeader {
version: version,
@ -34,7 +33,7 @@ pub struct ReadSpecificMessage<M, A> {
message_type: PhantomData<M>,
}
impl<M, A> Future for ReadSpecificMessage<M, A> where A: io::Read, M: PayloadType + Deserializable {
impl<M, A> Future for ReadSpecificMessage<M, A> where A: io::Read, M: PayloadType {
type Item = (A, MessageResult<M>);
type Error = io::Error;

View File

@ -4,12 +4,11 @@ use futures::{Poll, Future};
use tokio_core::io::{read_exact, ReadExact};
use bytes::Bytes;
use hash::H32;
use ser::Deserializable;
use message::MessageResult;
use message::serialization::{PayloadType, deserialize_payload};
pub fn read_specific_payload<M, A>(a: A, version: u32, len: usize, checksum: H32) -> ReadSpecificPayload<M, A>
where A: io::Read, M: PayloadType + Deserializable {
where A: io::Read, M: PayloadType {
ReadSpecificPayload {
reader: read_exact(a, Bytes::new_with_len(len)),
version: version,
@ -26,7 +25,7 @@ pub struct ReadSpecificPayload<M, A> {
}
/// TODO: check checksum
impl<M, A> Future for ReadSpecificPayload<M, A> where A: io::Read, M: PayloadType + Deserializable {
impl<M, A> Future for ReadSpecificPayload<M, A> where A: io::Read, M: PayloadType {
type Item = (A, MessageResult<M>);
type Error = io::Error;

View File

@ -1,29 +1,23 @@
use std::io;
use futures::{Future, Poll};
use tokio_core::io::{WriteAll, write_all};
use bytes::Bytes;
use ser::serialize;
use message::Message;
pub fn write_message<A>(a: A, message: Message) -> WriteMessage<A> where A: io::Write {
pub fn write_message<M, A>(a: A, message: Message<M>) -> WriteMessage<M, A> where A: io::Write {
WriteMessage {
future: write_all(a, serialize(&message)),
message: Some(message),
future: write_all(a, message),
}
}
pub struct WriteMessage<A> {
future: WriteAll<A, Bytes>,
message: Option<Message>,
pub struct WriteMessage<M, A> {
future: WriteAll<A, Message<M>>,
}
impl<A> Future for WriteMessage<A> where A: io::Write {
type Item = (A, Message);
impl<M, A> Future for WriteMessage<M, A> where A: io::Write {
type Item = (A, Message<M>);
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (stream, _) = try_ready!(self.future.poll());
let message = self.message.take().expect("write message must be initialized with message");
Ok((stream, message).into())
self.future.poll()
}
}

View File

@ -10,21 +10,21 @@ extern crate primitives;
extern crate serialization as ser;
pub mod io;
pub mod net;
//pub mod net;
pub mod util;
mod config;
//mod config;
mod error;
mod event_loop;
mod run;
//mod run;
pub const VERSION: u32 = 70_001;
pub const USER_AGENT: &'static str = "pbtc";
pub use primitives::{hash, bytes};
pub use config::Config;
//pub use config::Config;
pub use error::Error;
pub use event_loop::event_loop;
pub use run::run;
//pub use run::run;
pub type P2PResult<T> = Result<T, Error>;

View File

@ -1,4 +1,4 @@
use std::{ops, str, fmt, io};
use std::{ops, str, fmt, io, marker};
use hex::{ToHex, FromHex, FromHexError};
#[derive(Default, PartialEq, Clone)]
@ -78,6 +78,51 @@ impl AsMut<[u8]> for Bytes {
}
}
#[derive(Default, PartialEq, Clone)]
pub struct TaggedBytes<T> {
bytes: Bytes,
label: marker::PhantomData<T>,
}
impl<T> TaggedBytes<T> {
pub fn new(bytes: Bytes) -> Self {
TaggedBytes {
bytes: bytes,
label: marker::PhantomData,
}
}
pub fn into_raw(self) -> Bytes {
self.bytes
}
}
impl<T> ops::Deref for TaggedBytes<T> {
type Target = Vec<u8>;
fn deref(&self) -> &Self::Target {
&self.bytes.0
}
}
impl<T> ops::DerefMut for TaggedBytes<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.bytes.0
}
}
impl<T> AsRef<[u8]> for TaggedBytes<T> {
fn as_ref(&self) -> &[u8] {
&self.bytes.0
}
}
impl<T> AsMut<[u8]> for TaggedBytes<T> {
fn as_mut(&mut self) -> &mut [u8] {
&mut self.bytes.0
}
}
#[cfg(test)]
mod tests {
use super::Bytes;

View File

@ -67,7 +67,22 @@ impl<'a> Reader<'a> {
pub fn read_list<T>(&mut self) -> Result<Vec<T>, Error> where T: Deserializable {
let len: usize = try!(self.read::<CompactInteger>()).into();
let mut result = vec![];
let mut result = Vec::with_capacity(len);
for _ in 0..len {
result.push(try!(self.read()));
}
Ok(result)
}
pub fn read_list_max<T>(&mut self, max: usize) -> Result<Vec<T>, Error> where T: Deserializable {
let len: usize = try!(self.read::<CompactInteger>()).into();
if len > max {
return Err(Error::MalformedData);
}
let mut result = Vec::with_capacity(len);
for _ in 0..len {
result.push(try!(self.read()));

View File

@ -43,6 +43,14 @@ impl Stream {
self
}
pub fn append_list_ref<T>(&mut self, t: &[&T]) -> &mut Self where T: Serializable {
CompactInteger::from(t.len()).serialize(self);
for i in t {
i.serialize(self);
}
self
}
/// Full stream.
pub fn out(self) -> Bytes {
self.buffer.into()