message cleanup

This commit is contained in:
debris 2016-10-10 11:56:01 +02:00
parent 4417a5d9c7
commit 3873cf6b6b
21 changed files with 348 additions and 35 deletions

1
.gitignore vendored
View File

@ -5,3 +5,4 @@
*.swp
*.swo
*.swn
*.DS_Store

View File

@ -12,6 +12,8 @@ pub enum Error {
WrongMagic,
/// Invalid checksum.
InvalidChecksum,
/// Invalid version.
InvalidVersion,
}
impl From<ReaderError> for Error {

View File

@ -6,6 +6,7 @@ extern crate serialization as ser;
pub mod common;
mod message;
pub mod serialization;
pub mod types;
mod error;

View File

@ -0,0 +1,19 @@
mod stream;
mod reader;
use ser::{Reader, Deserializable};
use {MessageResult, Error};
pub use self::stream::PayloadStream;
pub use self::reader::{PayloadReader, deserialize_payload};
pub trait PayloadType: Deserializable {
fn version() -> u32;
fn command() -> &'static str;
fn deserialize_payload(reader: &mut Reader, version: u32) -> MessageResult<Self> where Self: Sized {
if version < Self::version() {
return Err(Error::InvalidVersion);
}
Self::deserialize(reader).map_err(Into::into)
}
}

View File

@ -0,0 +1,35 @@
use ser::Reader;
use serialization::PayloadType;
use Error;
pub fn deserialize_payload<T>(buffer: &[u8], version: u32) -> Result<T, Error> where T: PayloadType {
let mut reader = PayloadReader::new(buffer, version);
let result = try!(reader.read());
if !reader.is_finished() {
return Err(Error::Deserialize);
}
Ok(result)
}
pub struct PayloadReader<'a> {
reader: Reader<'a>,
version: u32,
}
impl<'a> PayloadReader<'a> {
pub fn new(buffer: &'a [u8], version: u32) -> Self {
PayloadReader {
reader: Reader::new(buffer),
version: version,
}
}
pub fn read<T>(&mut self) -> Result<T, Error> where T: PayloadType {
T::deserialize_payload(&mut self.reader, self.version)
}
pub fn is_finished(&self) -> bool {
self.reader.is_finished()
}
}

View File

@ -0,0 +1,26 @@
use ser::{Stream, Serializable};
use serialization::PayloadType;
use Error;
pub struct PayloadStream {
stream: Stream,
version: u32,
}
impl PayloadStream {
pub fn new(version: u32) -> Self {
PayloadStream {
stream: Stream::default(),
version: version,
}
}
pub fn append<T>(&mut self, t: &T) -> Result<(), Error> where T: PayloadType + Serializable {
if self.version < T::version() {
return Err(Error::InvalidVersion);
}
t.serialize(&mut self.stream);
Ok(())
}
}

View File

@ -2,6 +2,7 @@ use ser::{
Serializable, Stream,
Deserializable, Reader, Error as ReaderError,
};
use serialization::PayloadType;
use common::NetAddress;
#[derive(Debug, PartialEq)]

View File

@ -10,8 +10,10 @@ mod headers;
mod inv;
mod merkle_block;
mod ping;
mod pong;
pub mod reject;
mod sendcompact;
mod verack;
pub mod version;
pub use self::addr::{Addr, AddrBelow31402};
@ -26,11 +28,12 @@ pub use self::headers::Headers;
pub use self::inv::Inv;
pub use self::merkle_block::MerkleBlock;
pub use self::ping::Ping;
pub use self::pong::Pong;
pub use self::reject::Reject;
pub use self::sendcompact::SendCompact;
pub use self::verack::Verack;
pub use self::version::Version;
pub type GetData = Inv;
pub type NotFound = Inv;
pub type GetHeaders = GetBlocks;
pub type Pong = Ping;

View File

@ -1,4 +1,5 @@
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
use serialization::PayloadType;
#[derive(Debug, PartialEq)]
pub struct Ping {
@ -20,3 +21,13 @@ impl Deserializable for Ping {
Ok(ping)
}
}
impl PayloadType for Ping {
fn version() -> u32 {
0
}
fn command() -> &'static str {
"ping"
}
}

33
message/src/types/pong.rs Normal file
View File

@ -0,0 +1,33 @@
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
use serialization::PayloadType;
#[derive(Debug, PartialEq)]
pub struct Pong {
pub nonce: u64,
}
impl Serializable for Pong {
fn serialize(&self, stream: &mut Stream) {
stream.append(&self.nonce);
}
}
impl Deserializable for Pong {
fn deserialize(reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
let ping = Pong {
nonce: try!(reader.read()),
};
Ok(ping)
}
}
impl PayloadType for Pong {
fn version() -> u32 {
0
}
fn command() -> &'static str {
"pong"
}
}

View File

@ -1,4 +1,5 @@
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
use serialization::PayloadType;
#[derive(Debug, PartialEq, Clone, Copy)]
#[repr(u8)]
@ -78,3 +79,13 @@ impl Deserializable for Reject {
Ok(reject)
}
}
impl PayloadType for Reject {
fn version() -> u32 {
0
}
fn command() -> &'static str {
"reject"
}
}

View File

@ -0,0 +1,25 @@
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
use serialization::PayloadType;
#[derive(Debug, PartialEq)]
pub struct Verack;
impl Serializable for Verack {
fn serialize(&self, _stream: &mut Stream) {}
}
impl Deserializable for Verack {
fn deserialize(_reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
Ok(Verack)
}
}
impl PayloadType for Verack {
fn version() -> u32 {
0
}
fn command() -> &'static str {
"verack"
}
}

View File

@ -4,18 +4,29 @@ use ser::{
Deserializable, Reader, Error as ReaderError, deserialize
};
use common::{NetAddress, ServiceFlags};
use serialization::PayloadType;
#[derive(Debug, PartialEq)]
pub enum Version {
Simple(Simple),
V106(Simple, V106),
V70001(Simple, V106, V70001),
V0(V0),
V106(V0, V106),
V70001(V0, V106, V70001),
}
impl PayloadType for Version {
fn version() -> u32 {
0
}
fn command() -> &'static str {
"version"
}
}
impl Version {
pub fn version(&self) -> u32 {
match *self {
Version::Simple(ref s) => s.version,
Version::V0(ref s) => s.version,
Version::V106(ref s, _) => s.version,
Version::V70001(ref s, _, _) => s.version,
}
@ -23,7 +34,7 @@ impl Version {
}
#[derive(Debug, PartialEq)]
pub struct Simple {
pub struct V0 {
pub version: u32,
pub services: ServiceFlags,
pub timestamp: i64,
@ -46,7 +57,7 @@ pub struct V70001 {
impl Serializable for Version {
fn serialize(&self, stream: &mut Stream) {
match *self {
Version::Simple(ref simple) => {
Version::V0(ref simple) => {
stream.append(simple);
},
Version::V106(ref simple, ref v106) => {
@ -66,10 +77,10 @@ impl Serializable for Version {
impl Deserializable for Version {
fn deserialize(reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
let simple: Simple = try!(reader.read());
let simple: V0 = try!(reader.read());
if simple.version < 106 {
return Ok(Version::Simple(simple));
return Ok(Version::V0(simple));
}
let v106: V106 = try!(reader.read());
@ -82,7 +93,7 @@ impl Deserializable for Version {
}
}
impl Serializable for Simple {
impl Serializable for V0 {
fn serialize(&self, stream: &mut Stream) {
stream
.append(&self.version)
@ -92,9 +103,9 @@ impl Serializable for Simple {
}
}
impl Deserializable for Simple {
impl Deserializable for V0 {
fn deserialize(reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
let result = Simple {
let result = V0 {
version: try!(reader.read()),
services: try!(reader.read()),
timestamp: try!(reader.read()),
@ -155,13 +166,13 @@ impl From<&'static str> for Version {
mod test {
use bytes::Bytes;
use ser::{serialize, deserialize};
use super::{Version, Simple, V106};
use super::{Version, V0, V106};
#[test]
fn test_version_serialize() {
let expected: Bytes = "9c7c00000100000000000000e615104d00000000010000000000000000000000000000000000ffff0a000001208d010000000000000000000000000000000000ffff0a000002208ddd9d202c3ab457130055810100".into();
let version = Version::V106(Simple {
let version = Version::V106(V0 {
version: 31900,
services: 1u64.into(),
timestamp: 0x4d1015e6,
@ -180,7 +191,7 @@ mod test {
fn test_version_deserialize() {
let raw: Bytes = "9c7c00000100000000000000e615104d00000000010000000000000000000000000000000000ffff0a000001208d010000000000000000000000000000000000ffff0a000002208ddd9d202c3ab457130055810100".into();
let expected = Version::V106(Simple {
let expected = Version::V106(V0 {
version: 31900,
services: 1u64.into(),
timestamp: 0x4d1015e6,

View File

@ -1,9 +1,9 @@
use std::{io, cmp};
use futures::{Future, Poll, Async};
use message::{Message, Payload};
use message::types::Version;
use message::types::{Version, Verack};
use message::common::Magic;
use io::{write_message, read_message, ReadMessage, WriteMessage};
use io::{write_message, read_message, ReadMessage, WriteMessage, ReadSpecificMessage, read_specific_message};
use Error;
pub fn handshake<A>(a: A, magic: Magic, version: Version) -> Handshake<A> where A: io::Write + io::Read {
@ -19,7 +19,7 @@ pub fn accept_handshake<A>(a: A, magic: Magic, version: Version) -> AcceptHandsh
version: version.version(),
state: AcceptHandshakeState::ReceiveVersion {
local_version: Some(version),
future: read_message(a, magic, 0),
future: read_specific_message(a, magic, 0),
},
magic: magic,
}
@ -49,7 +49,7 @@ enum HandshakeState<A> {
ReceiveVersion(ReadMessage<A>),
ReceiveVerack {
version: Option<Version>,
future: ReadMessage<A>,
future: ReadSpecificMessage<Verack, A>,
},
Finished,
}
@ -57,7 +57,7 @@ enum HandshakeState<A> {
enum AcceptHandshakeState<A> {
ReceiveVersion {
local_version: Option<Version>,
future: ReadMessage<A>
future: ReadSpecificMessage<Version, A>
},
SendVersion {
version: Option<Version>,
@ -102,19 +102,13 @@ impl<A> Future for Handshake<A> where A: io::Read + io::Write {
let next = HandshakeState::ReceiveVerack {
version: Some(version),
future: read_message(stream, self.magic, 0),
future: read_specific_message(stream, self.magic, 0),
};
(next, Async::NotReady)
},
HandshakeState::ReceiveVerack { ref mut version, ref mut future } => {
let (stream, payload) = try_ready!(future.poll());
match payload {
Ok(Payload::Verack) => (),
Ok(_) => return Ok((stream, Err(Error::Handshake)).into()),
Err(err) => return Ok((stream, Err(err.into())).into()),
}
let (stream, _verack) = try_ready!(future.poll());
let version = version.take().expect("verack must be preceded by version");
let result = HandshakeResult {
@ -143,10 +137,9 @@ impl<A> Future for AcceptHandshake<A> where A: io::Read + io::Write {
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (next, result) = match self.state {
AcceptHandshakeState::ReceiveVersion { ref mut local_version, ref mut future } => {
let (stream, payload) = try_ready!(future.poll());
let version = match payload {
Ok(Payload::Version(version)) => version,
Ok(_) => return Ok((stream, Err(Error::Handshake)).into()),
let (stream, version) = try_ready!(future.poll());
let version = match version {
Ok(version) => version,
Err(err) => return Ok((stream, Err(err.into())).into()),
};

View File

@ -2,6 +2,8 @@ mod handshake;
mod read_header;
mod read_message;
mod read_payload;
mod read_specific_message;
mod read_specific_payload;
mod readrc;
mod write_message;
@ -11,5 +13,7 @@ pub use self::handshake::{
pub use self::read_header::{read_header, ReadHeader};
pub use self::read_message::{read_message, ReadMessage, read_message_stream, ReadMessageStream};
pub use self::read_payload::{read_payload, ReadPayload};
pub use self::read_specific_payload::{read_specific_payload, ReadSpecificPayload};
pub use self::read_specific_message::{read_specific_message, ReadSpecificMessage};
pub use self::readrc::ReadRc;
pub use self::write_message::{write_message, WriteMessage};

View File

@ -73,7 +73,7 @@ impl<A> Future for ReadMessage<A> where A: io::Read {
let (read, payload) = try_ready!(future.poll());
(ReadMessageState::Finished, Async::Ready((read, payload)))
},
ReadMessageState::Finished => panic!("poll AcceptHandshake after it's done"),
ReadMessageState::Finished => panic!("poll ReadMessage after it's done"),
};
self.state = next;

View File

@ -0,0 +1,76 @@
use std::io;
use std::marker::PhantomData;
use futures::{Poll, Future, Async};
use ser::Deserializable;
use message::{MessageResult, Error};
use message::common::Magic;
use message::serialization::PayloadType;
use io::{read_header, ReadHeader, read_specific_payload, ReadSpecificPayload};
pub fn read_specific_message<M, A>(a: A, magic: Magic, version: u32) -> ReadSpecificMessage<M, A>
where A: io::Read, M: PayloadType + Deserializable {
ReadSpecificMessage {
state: ReadMessageState::ReadHeader {
version: version,
future: read_header(a, magic),
},
message_type: PhantomData
}
}
enum ReadMessageState<M, A> {
ReadHeader {
version: u32,
future: ReadHeader<A>,
},
ReadPayload {
future: ReadSpecificPayload<M, A>,
},
Finished,
}
pub struct ReadSpecificMessage<M, A> {
state: ReadMessageState<M, A>,
message_type: PhantomData<M>,
}
impl<M, A> Future for ReadSpecificMessage<M, A> where A: io::Read, M: PayloadType + Deserializable {
type Item = (A, MessageResult<M>);
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (next, result) = match self.state {
ReadMessageState::ReadHeader { version, ref mut future } => {
let (read, header) = try_ready!(future.poll());
let header = match header {
Ok(header) => header,
Err(err) => {
return Ok((read, Err(err)).into());
}
};
if header.command != M::command().into() {
return Ok((read, Err(Error::InvalidCommand)).into());
}
let future = read_specific_payload(
read, version, header.len as usize, header.checksum,
);
let next = ReadMessageState::ReadPayload {
future: future,
};
(next, Async::NotReady)
},
ReadMessageState::ReadPayload { ref mut future } => {
let (read, payload) = try_ready!(future.poll());
(ReadMessageState::Finished, Async::Ready((read, payload)))
},
ReadMessageState::Finished => panic!("poll ReadSpecificMessage after it's done"),
};
self.state = next;
match result {
// by polling again, we register new future
Async::NotReady => self.poll(),
result => Ok(result)
}
}
}

View File

@ -0,0 +1,38 @@
use std::io;
use std::marker::PhantomData;
use futures::{Poll, Future};
use tokio_core::io::{read_exact, ReadExact};
use bytes::Bytes;
use hash::H32;
use ser::Deserializable;
use message::MessageResult;
use message::serialization::{PayloadType, deserialize_payload};
pub fn read_specific_payload<M, A>(a: A, version: u32, len: usize, checksum: H32) -> ReadSpecificPayload<M, A>
where A: io::Read, M: PayloadType + Deserializable {
ReadSpecificPayload {
reader: read_exact(a, Bytes::new_with_len(len)),
version: version,
checksum: checksum,
payload_type: PhantomData,
}
}
pub struct ReadSpecificPayload<M, A> {
reader: ReadExact<A, Bytes>,
version: u32,
checksum: H32,
payload_type: PhantomData<M>,
}
/// TODO: check checksum
impl<M, A> Future for ReadSpecificPayload<M, A> where A: io::Read, M: PayloadType + Deserializable {
type Item = (A, MessageResult<M>);
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (read, data) = try_ready!(self.reader.poll());
let payload = deserialize_payload(&data, self.version);
Ok((read, payload).into())
}
}

View File

@ -1,6 +1,6 @@
use std::net::SocketAddr;
use message::common::{Magic, ServiceFlags, NetAddress};
use message::types::version::{Version, Simple, V106, V70001};
use message::types::version::{Version, V0, V106, V70001};
use util::time::{Time, RealTime};
use util::nonce::{NonceGenerator, RandomNonce};
use VERSION;
@ -17,7 +17,7 @@ pub struct Config {
impl Config {
pub fn version(&self, to: &SocketAddr) -> Version {
Version::V70001(Simple {
Version::V70001(V0 {
version: VERSION,
services: self.services,
timestamp: RealTime.get().sec,

View File

@ -0,0 +1,22 @@
use futures::{oneshot, Oneshot};
use message::Payload;
use net::Connection;
pub struct Connections {
channels: Vec<Connection>,
}
impl Connections {
/// Broadcast messages to the network.
/// Returned future completes of first confirmed receive.
pub fn broadcast(&self, payload: Payload) -> Oneshot<Payload> {
let (complete, os) = oneshot::<Payload>();
let mut complete = Some(complete);
for channel in &self.channels {
//channel.write_message(
}
os
}
}

View File

@ -1,6 +1,7 @@
mod config;
mod connect;
mod connection;
mod connections;
mod listen;
pub use self::config::Config;