diff --git a/.gitignore b/.gitignore index 6fe8be80..1f1c8bf0 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ *.swp *.swo *.swn +*.DS_Store diff --git a/message/src/error.rs b/message/src/error.rs index cae9cc68..69ba2948 100644 --- a/message/src/error.rs +++ b/message/src/error.rs @@ -12,6 +12,8 @@ pub enum Error { WrongMagic, /// Invalid checksum. InvalidChecksum, + /// Invalid version. + InvalidVersion, } impl From for Error { diff --git a/message/src/lib.rs b/message/src/lib.rs index 250e8e6b..f039fb28 100644 --- a/message/src/lib.rs +++ b/message/src/lib.rs @@ -6,6 +6,7 @@ extern crate serialization as ser; pub mod common; mod message; +pub mod serialization; pub mod types; mod error; diff --git a/message/src/serialization/mod.rs b/message/src/serialization/mod.rs new file mode 100644 index 00000000..df8c9d57 --- /dev/null +++ b/message/src/serialization/mod.rs @@ -0,0 +1,19 @@ +mod stream; +mod reader; + +use ser::{Reader, Deserializable}; +use {MessageResult, Error}; +pub use self::stream::PayloadStream; +pub use self::reader::{PayloadReader, deserialize_payload}; + +pub trait PayloadType: Deserializable { + fn version() -> u32; + fn command() -> &'static str; + fn deserialize_payload(reader: &mut Reader, version: u32) -> MessageResult where Self: Sized { + if version < Self::version() { + return Err(Error::InvalidVersion); + } + + Self::deserialize(reader).map_err(Into::into) + } +} diff --git a/message/src/serialization/reader.rs b/message/src/serialization/reader.rs new file mode 100644 index 00000000..f0fa63e8 --- /dev/null +++ b/message/src/serialization/reader.rs @@ -0,0 +1,35 @@ +use ser::Reader; +use serialization::PayloadType; +use Error; + +pub fn deserialize_payload(buffer: &[u8], version: u32) -> Result where T: PayloadType { + let mut reader = PayloadReader::new(buffer, version); + let result = try!(reader.read()); + if !reader.is_finished() { + return Err(Error::Deserialize); + } + + Ok(result) +} + +pub struct PayloadReader<'a> { + reader: Reader<'a>, + version: u32, +} + +impl<'a> PayloadReader<'a> { + pub fn new(buffer: &'a [u8], version: u32) -> Self { + PayloadReader { + reader: Reader::new(buffer), + version: version, + } + } + + pub fn read(&mut self) -> Result where T: PayloadType { + T::deserialize_payload(&mut self.reader, self.version) + } + + pub fn is_finished(&self) -> bool { + self.reader.is_finished() + } +} diff --git a/message/src/serialization/stream.rs b/message/src/serialization/stream.rs new file mode 100644 index 00000000..fabd6315 --- /dev/null +++ b/message/src/serialization/stream.rs @@ -0,0 +1,26 @@ +use ser::{Stream, Serializable}; +use serialization::PayloadType; +use Error; + +pub struct PayloadStream { + stream: Stream, + version: u32, +} + +impl PayloadStream { + pub fn new(version: u32) -> Self { + PayloadStream { + stream: Stream::default(), + version: version, + } + } + + pub fn append(&mut self, t: &T) -> Result<(), Error> where T: PayloadType + Serializable { + if self.version < T::version() { + return Err(Error::InvalidVersion); + } + + t.serialize(&mut self.stream); + Ok(()) + } +} diff --git a/message/src/types/addr.rs b/message/src/types/addr.rs index 93993cc5..db95433b 100644 --- a/message/src/types/addr.rs +++ b/message/src/types/addr.rs @@ -2,6 +2,7 @@ use ser::{ Serializable, Stream, Deserializable, Reader, Error as ReaderError, }; +use serialization::PayloadType; use common::NetAddress; #[derive(Debug, PartialEq)] diff --git a/message/src/types/mod.rs b/message/src/types/mod.rs index 95c061e4..3a6d5631 100644 --- a/message/src/types/mod.rs +++ b/message/src/types/mod.rs @@ -10,8 +10,10 @@ mod headers; mod inv; mod merkle_block; mod ping; +mod pong; pub mod reject; mod sendcompact; +mod verack; pub mod version; pub use self::addr::{Addr, AddrBelow31402}; @@ -26,11 +28,12 @@ pub use self::headers::Headers; pub use self::inv::Inv; pub use self::merkle_block::MerkleBlock; pub use self::ping::Ping; +pub use self::pong::Pong; pub use self::reject::Reject; pub use self::sendcompact::SendCompact; +pub use self::verack::Verack; pub use self::version::Version; pub type GetData = Inv; pub type NotFound = Inv; pub type GetHeaders = GetBlocks; -pub type Pong = Ping; diff --git a/message/src/types/ping.rs b/message/src/types/ping.rs index d65d5223..08842e9d 100644 --- a/message/src/types/ping.rs +++ b/message/src/types/ping.rs @@ -1,4 +1,5 @@ use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError}; +use serialization::PayloadType; #[derive(Debug, PartialEq)] pub struct Ping { @@ -20,3 +21,13 @@ impl Deserializable for Ping { Ok(ping) } } + +impl PayloadType for Ping { + fn version() -> u32 { + 0 + } + + fn command() -> &'static str { + "ping" + } +} diff --git a/message/src/types/pong.rs b/message/src/types/pong.rs new file mode 100644 index 00000000..2a6d8ffe --- /dev/null +++ b/message/src/types/pong.rs @@ -0,0 +1,33 @@ +use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError}; +use serialization::PayloadType; + +#[derive(Debug, PartialEq)] +pub struct Pong { + pub nonce: u64, +} + +impl Serializable for Pong { + fn serialize(&self, stream: &mut Stream) { + stream.append(&self.nonce); + } +} + +impl Deserializable for Pong { + fn deserialize(reader: &mut Reader) -> Result where Self: Sized { + let ping = Pong { + nonce: try!(reader.read()), + }; + + Ok(ping) + } +} + +impl PayloadType for Pong { + fn version() -> u32 { + 0 + } + + fn command() -> &'static str { + "pong" + } +} diff --git a/message/src/types/reject.rs b/message/src/types/reject.rs index 8f1a1161..307a1b43 100644 --- a/message/src/types/reject.rs +++ b/message/src/types/reject.rs @@ -1,4 +1,5 @@ use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError}; +use serialization::PayloadType; #[derive(Debug, PartialEq, Clone, Copy)] #[repr(u8)] @@ -78,3 +79,13 @@ impl Deserializable for Reject { Ok(reject) } } + +impl PayloadType for Reject { + fn version() -> u32 { + 0 + } + + fn command() -> &'static str { + "reject" + } +} diff --git a/message/src/types/verack.rs b/message/src/types/verack.rs new file mode 100644 index 00000000..2aab3db7 --- /dev/null +++ b/message/src/types/verack.rs @@ -0,0 +1,25 @@ +use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError}; +use serialization::PayloadType; + +#[derive(Debug, PartialEq)] +pub struct Verack; + +impl Serializable for Verack { + fn serialize(&self, _stream: &mut Stream) {} +} + +impl Deserializable for Verack { + fn deserialize(_reader: &mut Reader) -> Result where Self: Sized { + Ok(Verack) + } +} + +impl PayloadType for Verack { + fn version() -> u32 { + 0 + } + + fn command() -> &'static str { + "verack" + } +} diff --git a/message/src/types/version.rs b/message/src/types/version.rs index 325f73b5..f0624ba9 100644 --- a/message/src/types/version.rs +++ b/message/src/types/version.rs @@ -4,18 +4,29 @@ use ser::{ Deserializable, Reader, Error as ReaderError, deserialize }; use common::{NetAddress, ServiceFlags}; +use serialization::PayloadType; #[derive(Debug, PartialEq)] pub enum Version { - Simple(Simple), - V106(Simple, V106), - V70001(Simple, V106, V70001), + V0(V0), + V106(V0, V106), + V70001(V0, V106, V70001), +} + +impl PayloadType for Version { + fn version() -> u32 { + 0 + } + + fn command() -> &'static str { + "version" + } } impl Version { pub fn version(&self) -> u32 { match *self { - Version::Simple(ref s) => s.version, + Version::V0(ref s) => s.version, Version::V106(ref s, _) => s.version, Version::V70001(ref s, _, _) => s.version, } @@ -23,7 +34,7 @@ impl Version { } #[derive(Debug, PartialEq)] -pub struct Simple { +pub struct V0 { pub version: u32, pub services: ServiceFlags, pub timestamp: i64, @@ -46,7 +57,7 @@ pub struct V70001 { impl Serializable for Version { fn serialize(&self, stream: &mut Stream) { match *self { - Version::Simple(ref simple) => { + Version::V0(ref simple) => { stream.append(simple); }, Version::V106(ref simple, ref v106) => { @@ -66,10 +77,10 @@ impl Serializable for Version { impl Deserializable for Version { fn deserialize(reader: &mut Reader) -> Result where Self: Sized { - let simple: Simple = try!(reader.read()); + let simple: V0 = try!(reader.read()); if simple.version < 106 { - return Ok(Version::Simple(simple)); + return Ok(Version::V0(simple)); } let v106: V106 = try!(reader.read()); @@ -82,7 +93,7 @@ impl Deserializable for Version { } } -impl Serializable for Simple { +impl Serializable for V0 { fn serialize(&self, stream: &mut Stream) { stream .append(&self.version) @@ -92,9 +103,9 @@ impl Serializable for Simple { } } -impl Deserializable for Simple { +impl Deserializable for V0 { fn deserialize(reader: &mut Reader) -> Result where Self: Sized { - let result = Simple { + let result = V0 { version: try!(reader.read()), services: try!(reader.read()), timestamp: try!(reader.read()), @@ -155,13 +166,13 @@ impl From<&'static str> for Version { mod test { use bytes::Bytes; use ser::{serialize, deserialize}; - use super::{Version, Simple, V106}; + use super::{Version, V0, V106}; #[test] fn test_version_serialize() { let expected: Bytes = "9c7c00000100000000000000e615104d00000000010000000000000000000000000000000000ffff0a000001208d010000000000000000000000000000000000ffff0a000002208ddd9d202c3ab457130055810100".into(); - let version = Version::V106(Simple { + let version = Version::V106(V0 { version: 31900, services: 1u64.into(), timestamp: 0x4d1015e6, @@ -180,7 +191,7 @@ mod test { fn test_version_deserialize() { let raw: Bytes = "9c7c00000100000000000000e615104d00000000010000000000000000000000000000000000ffff0a000001208d010000000000000000000000000000000000ffff0a000002208ddd9d202c3ab457130055810100".into(); - let expected = Version::V106(Simple { + let expected = Version::V106(V0 { version: 31900, services: 1u64.into(), timestamp: 0x4d1015e6, diff --git a/p2p/src/io/handshake.rs b/p2p/src/io/handshake.rs index 34d8d2f9..7189e6dc 100644 --- a/p2p/src/io/handshake.rs +++ b/p2p/src/io/handshake.rs @@ -1,9 +1,9 @@ use std::{io, cmp}; use futures::{Future, Poll, Async}; use message::{Message, Payload}; -use message::types::Version; +use message::types::{Version, Verack}; use message::common::Magic; -use io::{write_message, read_message, ReadMessage, WriteMessage}; +use io::{write_message, read_message, ReadMessage, WriteMessage, ReadSpecificMessage, read_specific_message}; use Error; pub fn handshake(a: A, magic: Magic, version: Version) -> Handshake where A: io::Write + io::Read { @@ -19,7 +19,7 @@ pub fn accept_handshake(a: A, magic: Magic, version: Version) -> AcceptHandsh version: version.version(), state: AcceptHandshakeState::ReceiveVersion { local_version: Some(version), - future: read_message(a, magic, 0), + future: read_specific_message(a, magic, 0), }, magic: magic, } @@ -49,7 +49,7 @@ enum HandshakeState { ReceiveVersion(ReadMessage), ReceiveVerack { version: Option, - future: ReadMessage, + future: ReadSpecificMessage, }, Finished, } @@ -57,7 +57,7 @@ enum HandshakeState { enum AcceptHandshakeState { ReceiveVersion { local_version: Option, - future: ReadMessage + future: ReadSpecificMessage }, SendVersion { version: Option, @@ -102,19 +102,13 @@ impl Future for Handshake where A: io::Read + io::Write { let next = HandshakeState::ReceiveVerack { version: Some(version), - future: read_message(stream, self.magic, 0), + future: read_specific_message(stream, self.magic, 0), }; (next, Async::NotReady) }, HandshakeState::ReceiveVerack { ref mut version, ref mut future } => { - let (stream, payload) = try_ready!(future.poll()); - match payload { - Ok(Payload::Verack) => (), - Ok(_) => return Ok((stream, Err(Error::Handshake)).into()), - Err(err) => return Ok((stream, Err(err.into())).into()), - } - + let (stream, _verack) = try_ready!(future.poll()); let version = version.take().expect("verack must be preceded by version"); let result = HandshakeResult { @@ -143,10 +137,9 @@ impl Future for AcceptHandshake where A: io::Read + io::Write { fn poll(&mut self) -> Poll { let (next, result) = match self.state { AcceptHandshakeState::ReceiveVersion { ref mut local_version, ref mut future } => { - let (stream, payload) = try_ready!(future.poll()); - let version = match payload { - Ok(Payload::Version(version)) => version, - Ok(_) => return Ok((stream, Err(Error::Handshake)).into()), + let (stream, version) = try_ready!(future.poll()); + let version = match version { + Ok(version) => version, Err(err) => return Ok((stream, Err(err.into())).into()), }; diff --git a/p2p/src/io/mod.rs b/p2p/src/io/mod.rs index d22c0d60..9133191e 100644 --- a/p2p/src/io/mod.rs +++ b/p2p/src/io/mod.rs @@ -2,6 +2,8 @@ mod handshake; mod read_header; mod read_message; mod read_payload; +mod read_specific_message; +mod read_specific_payload; mod readrc; mod write_message; @@ -11,5 +13,7 @@ pub use self::handshake::{ pub use self::read_header::{read_header, ReadHeader}; pub use self::read_message::{read_message, ReadMessage, read_message_stream, ReadMessageStream}; pub use self::read_payload::{read_payload, ReadPayload}; +pub use self::read_specific_payload::{read_specific_payload, ReadSpecificPayload}; +pub use self::read_specific_message::{read_specific_message, ReadSpecificMessage}; pub use self::readrc::ReadRc; pub use self::write_message::{write_message, WriteMessage}; diff --git a/p2p/src/io/read_message.rs b/p2p/src/io/read_message.rs index c42dff68..f148cd26 100644 --- a/p2p/src/io/read_message.rs +++ b/p2p/src/io/read_message.rs @@ -73,7 +73,7 @@ impl Future for ReadMessage where A: io::Read { let (read, payload) = try_ready!(future.poll()); (ReadMessageState::Finished, Async::Ready((read, payload))) }, - ReadMessageState::Finished => panic!("poll AcceptHandshake after it's done"), + ReadMessageState::Finished => panic!("poll ReadMessage after it's done"), }; self.state = next; diff --git a/p2p/src/io/read_specific_message.rs b/p2p/src/io/read_specific_message.rs new file mode 100644 index 00000000..c67c2145 --- /dev/null +++ b/p2p/src/io/read_specific_message.rs @@ -0,0 +1,76 @@ +use std::io; +use std::marker::PhantomData; +use futures::{Poll, Future, Async}; +use ser::Deserializable; +use message::{MessageResult, Error}; +use message::common::Magic; +use message::serialization::PayloadType; +use io::{read_header, ReadHeader, read_specific_payload, ReadSpecificPayload}; + +pub fn read_specific_message(a: A, magic: Magic, version: u32) -> ReadSpecificMessage + where A: io::Read, M: PayloadType + Deserializable { + ReadSpecificMessage { + state: ReadMessageState::ReadHeader { + version: version, + future: read_header(a, magic), + }, + message_type: PhantomData + } +} + +enum ReadMessageState { + ReadHeader { + version: u32, + future: ReadHeader, + }, + ReadPayload { + future: ReadSpecificPayload, + }, + Finished, +} + +pub struct ReadSpecificMessage { + state: ReadMessageState, + message_type: PhantomData, +} + +impl Future for ReadSpecificMessage where A: io::Read, M: PayloadType + Deserializable { + type Item = (A, MessageResult); + type Error = io::Error; + + fn poll(&mut self) -> Poll { + let (next, result) = match self.state { + ReadMessageState::ReadHeader { version, ref mut future } => { + let (read, header) = try_ready!(future.poll()); + let header = match header { + Ok(header) => header, + Err(err) => { + return Ok((read, Err(err)).into()); + } + }; + if header.command != M::command().into() { + return Ok((read, Err(Error::InvalidCommand)).into()); + } + let future = read_specific_payload( + read, version, header.len as usize, header.checksum, + ); + let next = ReadMessageState::ReadPayload { + future: future, + }; + (next, Async::NotReady) + }, + ReadMessageState::ReadPayload { ref mut future } => { + let (read, payload) = try_ready!(future.poll()); + (ReadMessageState::Finished, Async::Ready((read, payload))) + }, + ReadMessageState::Finished => panic!("poll ReadSpecificMessage after it's done"), + }; + + self.state = next; + match result { + // by polling again, we register new future + Async::NotReady => self.poll(), + result => Ok(result) + } + } +} diff --git a/p2p/src/io/read_specific_payload.rs b/p2p/src/io/read_specific_payload.rs new file mode 100644 index 00000000..bfbb3247 --- /dev/null +++ b/p2p/src/io/read_specific_payload.rs @@ -0,0 +1,38 @@ +use std::io; +use std::marker::PhantomData; +use futures::{Poll, Future}; +use tokio_core::io::{read_exact, ReadExact}; +use bytes::Bytes; +use hash::H32; +use ser::Deserializable; +use message::MessageResult; +use message::serialization::{PayloadType, deserialize_payload}; + +pub fn read_specific_payload(a: A, version: u32, len: usize, checksum: H32) -> ReadSpecificPayload + where A: io::Read, M: PayloadType + Deserializable { + ReadSpecificPayload { + reader: read_exact(a, Bytes::new_with_len(len)), + version: version, + checksum: checksum, + payload_type: PhantomData, + } +} + +pub struct ReadSpecificPayload { + reader: ReadExact, + version: u32, + checksum: H32, + payload_type: PhantomData, +} + +/// TODO: check checksum +impl Future for ReadSpecificPayload where A: io::Read, M: PayloadType + Deserializable { + type Item = (A, MessageResult); + type Error = io::Error; + + fn poll(&mut self) -> Poll { + let (read, data) = try_ready!(self.reader.poll()); + let payload = deserialize_payload(&data, self.version); + Ok((read, payload).into()) + } +} diff --git a/p2p/src/net/config.rs b/p2p/src/net/config.rs index 4d8668ff..6bef7b66 100644 --- a/p2p/src/net/config.rs +++ b/p2p/src/net/config.rs @@ -1,6 +1,6 @@ use std::net::SocketAddr; use message::common::{Magic, ServiceFlags, NetAddress}; -use message::types::version::{Version, Simple, V106, V70001}; +use message::types::version::{Version, V0, V106, V70001}; use util::time::{Time, RealTime}; use util::nonce::{NonceGenerator, RandomNonce}; use VERSION; @@ -17,7 +17,7 @@ pub struct Config { impl Config { pub fn version(&self, to: &SocketAddr) -> Version { - Version::V70001(Simple { + Version::V70001(V0 { version: VERSION, services: self.services, timestamp: RealTime.get().sec, diff --git a/p2p/src/net/connections.rs b/p2p/src/net/connections.rs new file mode 100644 index 00000000..4b841111 --- /dev/null +++ b/p2p/src/net/connections.rs @@ -0,0 +1,22 @@ +use futures::{oneshot, Oneshot}; +use message::Payload; +use net::Connection; + +pub struct Connections { + channels: Vec, +} + +impl Connections { + /// Broadcast messages to the network. + /// Returned future completes of first confirmed receive. + pub fn broadcast(&self, payload: Payload) -> Oneshot { + let (complete, os) = oneshot::(); + let mut complete = Some(complete); + + for channel in &self.channels { + //channel.write_message( + } + + os + } +} diff --git a/p2p/src/net/mod.rs b/p2p/src/net/mod.rs index 90ef7610..4baed9d1 100644 --- a/p2p/src/net/mod.rs +++ b/p2p/src/net/mod.rs @@ -1,6 +1,7 @@ mod config; mod connect; mod connection; +mod connections; mod listen; pub use self::config::Config;