From c0448f3110814760bfc0f0095afe65cc4f78adcc Mon Sep 17 00:00:00 2001 From: debris Date: Wed, 5 Oct 2016 17:23:50 +0200 Subject: [PATCH] p2p streams... --- Cargo.lock | 28 ++++++------ p2p/src/io/mod.rs | 4 +- p2p/src/io/read_message.rs | 62 +++++++++++++++++++++++-- p2p/src/io/readrc.rs | 33 ++++++++++++++ p2p/src/net/connect.rs | 2 +- p2p/src/net/connection.rs | 92 ++++++++++++++++++++++++++++++++------ p2p/src/net/listen.rs | 2 +- p2p/src/run.rs | 2 +- primitives/src/bytes.rs | 12 ++++- 9 files changed, 200 insertions(+), 37 deletions(-) create mode 100644 p2p/src/io/readrc.rs diff --git a/Cargo.lock b/Cargo.lock index 38f7cc68..3b9ad33c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3,7 +3,7 @@ name = "pbtc" version = "0.1.0" dependencies = [ "bitcrypto 0.1.0", - "clap 2.13.0 (registry+https://github.com/rust-lang/crates.io-index)", + "clap 2.14.0 (registry+https://github.com/rust-lang/crates.io-index)", "keys 0.1.0", "message 0.1.0", "p2p 0.1.0", @@ -17,11 +17,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "arrayvec" -version = "0.3.19" +version = "0.3.20" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "nodrop 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", - "odds 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)", + "odds 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -69,7 +69,7 @@ dependencies = [ [[package]] name = "clap" -version = "2.13.0" +version = "2.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -88,7 +88,7 @@ name = "eth-secp256k1" version = "0.5.6" source = "git+https://github.com/ethcore/rust-secp256k1#f998f9a8c18227af200f0f7fdadf8a6560d391ff" dependencies = [ - "arrayvec 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", + "arrayvec 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)", "gcc 0.3.35 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", @@ -97,7 +97,7 @@ dependencies = [ [[package]] name = "futures" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -218,12 +218,12 @@ name = "nodrop" version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "odds 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)", + "odds 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "odds" -version = "0.2.18" +version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -231,7 +231,7 @@ name = "p2p" version = "0.1.0" dependencies = [ "bitcrypto 0.1.0", - "futures 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "message 0.1.0", "primitives 0.1.0", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", @@ -344,7 +344,7 @@ name = "tokio-core" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "futures 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -397,15 +397,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [metadata] "checksum ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "23ac7c30002a5accbf7e8987d0632fa6de155b7c3d39d0067317a391e00a2ef6" -"checksum arrayvec 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)" = "295abbcf8112693d74fa426186926b50d67105c88d4d0d94dd8222e51da9755a" +"checksum arrayvec 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d89f1b0e242270b5b797778af0c8d182a1a2ccac5d8d6fadf414223cc0fab096" "checksum base58 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5024ee8015f02155eee35c711107ddd9a9bf3cb689cf2a9089c97e79b6e1ae83" "checksum bitflags 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8dead7461c1127cf637931a1e50934eb6eee8bff2f74433ac7909e9afcee04a3" "checksum bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "aad18937a628ec6abcd26d1489012cc0e18c21798210f491af69ded9b881106d" "checksum byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855" "checksum cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de1e760d7b6535af4241fca8bd8adf68e2e7edacc6b29f5d399050c5e48cf88c" -"checksum clap 2.13.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2887ae5b606c1fa314b9238e25a8be3fa673378415c32efc5749464f3365ee9d" +"checksum clap 2.14.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5fa304b03c49ccbb005784fc26e985b5d2310b1d37f2c311ce90dbcd18ea5fde" "checksum eth-secp256k1 0.5.6 (git+https://github.com/ethcore/rust-secp256k1)" = "" -"checksum futures 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "62af3ebbb8916ecf7ebcc4c130aacc33cc7f48d8b6e74fc9ed010bfa4f359794" +"checksum futures 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0bd34f72c0fffc9d2f6c570fd392bf99b9c5cd1481d79809e1cc2320befc0af0" "checksum gcc 0.3.35 (registry+https://github.com/rust-lang/crates.io-index)" = "91ecd03771effb0c968fd6950b37e89476a578aaf1c70297d8e92b6516ec3312" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" "checksum lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "49247ec2a285bb3dcb23cbd9c35193c025e7251bfce77c1d5da97e6362dffe7f" @@ -417,7 +417,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum net2 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)" = "5edf9cb6be97212423aed9413dd4729d62b370b5e1c571750e882cebbbc1e3e2" "checksum nix 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a7bb1da2be7da3cbffda73fc681d509ffd9e665af478d2bee1907cee0bc64b2" "checksum nodrop 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "0dbbadd3f4c98dea0bd3d9b4be4c0cdaf1ab57035cb2e41fce3983db5add7cc5" -"checksum odds 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)" = "95509b8be274a669ac94a94cc12e59f2ae0b0625ffe576103153a5c8fd6d5c62" +"checksum odds 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)" = "b20d7892efe1faecee7b0ab6492cd0484f8516d54f6aae87d00d5936dd374aed" "checksum rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "2791d88c6defac799c3f20d74f094ca33b9332612d9aef9078519c82e4fe04a5" "checksum rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)" = "f76d05d3993fd5f4af9434e8e436db163a12a9d40e1a58a726f27a01dfd12a2a" "checksum rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)" = "6159e4e6e559c81bd706afe9c8fd68f547d3e851ce12e76b1de7914bab61691b" diff --git a/p2p/src/io/mod.rs b/p2p/src/io/mod.rs index ae1805ff..501ed847 100644 --- a/p2p/src/io/mod.rs +++ b/p2p/src/io/mod.rs @@ -3,13 +3,15 @@ mod read_header; mod read_message; mod read_payload; mod stream; +mod readrc; mod write_message; pub use self::handshake::{ handshake, accept_handshake, Handshake, AcceptHandshake, HandshakeResult }; pub use self::read_header::{read_header, ReadHeader}; -pub use self::read_message::{read_message, ReadMessage}; +pub use self::read_message::{read_message, ReadMessage, read_message_stream, ReadMessageStream}; pub use self::read_payload::{read_payload, ReadPayload}; pub use self::stream::IoStream; +pub use self::readrc::ReadRc; pub use self::write_message::{write_message, WriteMessage}; diff --git a/p2p/src/io/read_message.rs b/p2p/src/io/read_message.rs index 30954e95..233f816e 100644 --- a/p2p/src/io/read_message.rs +++ b/p2p/src/io/read_message.rs @@ -1,8 +1,9 @@ -use std::io; +use std::io::{self, Read}; use futures::{Future, Poll, Async}; +use futures::stream::Stream; use message::{Message, MessageHeader}; use message::common::Magic; -use io::{read_header, read_payload, ReadHeader, ReadPayload}; +use io::{read_header, read_payload, ReadHeader, ReadPayload, ReadRc}; use Error; enum ReadMessageState { @@ -26,10 +27,27 @@ pub fn read_message(a: A, magic: Magic, version: u32) -> ReadMessage where } } +pub fn read_message_stream(a: A, magic: Magic, version: u32) -> ReadMessageStream where A: io::Read { + let stream: ReadRc = a.into(); + ReadMessageStream { + future: read_message(stream.clone(), magic, version), + magic: magic, + version: version, + stream: stream, + } +} + pub struct ReadMessage { state: ReadMessageState, } +pub struct ReadMessageStream { + future: ReadMessage>, + magic: Magic, + version: u32, + stream: ReadRc, +} + impl Future for ReadMessage where A: io::Read { type Item = (A, Message); type Error = Error; @@ -69,13 +87,37 @@ impl Future for ReadMessage where A: io::Read { } } +impl Stream for ReadMessageStream where A: io::Read { + type Item = Message; + type Error = Error; + + fn poll(&mut self) -> Poll, Self::Error> { + let result = match self.future.poll() { + Ok(Async::Ready((_, result))) => { + Ok(Some(result).into()) + }, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(_) => { + // TODO: error should be somehow handled and passed upstream + try!(self.stream.read_to_end(&mut Vec::new())); + Ok(Async::NotReady) + } + }; + + self.future = read_message(self.stream.clone(), self.magic, self.version); + result + } +} + #[cfg(test)] mod tests { - use futures::Future; + use std::io::Cursor; + use futures::{Future, Async}; + use futures::stream::Stream; use bytes::Bytes; use message::{Message, Payload}; use message::common::Magic; - use super::read_message; + use super::{read_message, read_message_stream}; #[test] fn test_read_message() { @@ -83,4 +125,16 @@ mod tests { let expected = Message::new(Magic::Mainnet, Payload::Verack); assert_eq!(read_message(raw.as_ref(), Magic::Mainnet, 0).wait().unwrap().1, expected); } + + #[test] + fn test_read_message_stream() { + let raw: Bytes = "f9beb4d976657261636b000000000000000000005df6e0e2f9beb4d9676574616464720000000000000000005df6e0e2".into(); + let expected0 = Message::new(Magic::Mainnet, Payload::Verack); + let expected1 = Message::new(Magic::Mainnet, Payload::GetAddr); + + let mut stream = read_message_stream(Cursor::new(raw), Magic::Mainnet, 0); + assert_eq!(stream.poll().unwrap(), Some(expected0).into()); + assert_eq!(stream.poll().unwrap(), Some(expected1).into()); + assert_eq!(stream.poll().unwrap(), Async::NotReady); + } } diff --git a/p2p/src/io/readrc.rs b/p2p/src/io/readrc.rs new file mode 100644 index 00000000..6719a8be --- /dev/null +++ b/p2p/src/io/readrc.rs @@ -0,0 +1,33 @@ +use std::rc::Rc; +use std::cell::RefCell; +use std::io::{Read, Error}; + +pub struct ReadRc { + read: Rc> +} + +impl ReadRc { + pub fn new(a: Rc>) -> Self { + ReadRc { + read: a, + } + } +} + +impl From for ReadRc { + fn from(a: A) -> Self { + ReadRc::new(Rc::new(RefCell::new(a))) + } +} + +impl Read for ReadRc where A: Read { + fn read(&mut self, buf: &mut [u8]) -> Result { + self.read.borrow_mut().read(buf) + } +} + +impl Clone for ReadRc { + fn clone(&self) -> Self { + ReadRc::new(self.read.clone()) + } +} diff --git a/p2p/src/net/connect.rs b/p2p/src/net/connect.rs index 3d680193..c8e421d8 100644 --- a/p2p/src/net/connect.rs +++ b/p2p/src/net/connect.rs @@ -50,7 +50,7 @@ impl Future for Connect { let (stream, result) = try_ready!(future.poll()); let connection = Connection { stream: stream, - handshake_result: result, + version: result.negotiated_version, magic: self.magic, address: self.address, }; diff --git a/p2p/src/net/connection.rs b/p2p/src/net/connection.rs index 72490a07..9de2b290 100644 --- a/p2p/src/net/connection.rs +++ b/p2p/src/net/connection.rs @@ -1,23 +1,54 @@ use std::{net, io}; +use tokio_core::io::{Io, ReadHalf, WriteHalf}; use message::{Message, Payload}; use message::common::{Magic, InventoryVector}; use message::types::{Ping, Pong, addr, Inv, GetData, NotFound}; -use io::{HandshakeResult, write_message, WriteMessage}; +use io::{write_message, WriteMessage, read_message_stream, ReadMessageStream}; pub struct Connection { pub stream: A, - pub handshake_result: HandshakeResult, + pub version: u32, pub magic: Magic, pub address: net::SocketAddr, } -impl io::Read for Connection where A: io::Read { +pub struct ConnectionReader { + stream: A, + version: u32, + magic: Magic, +} + +pub struct ConnectionWriter { + stream: A, + version: u32, + magic: Magic, +} + +impl Connection where A: Io { + /// This function will panic if a task is not currently running. + pub fn split(self) -> (ConnectionReader>, ConnectionWriter>) { + let (r, w) = self.stream.split(); + let reader = ConnectionReader { + stream: r, + version: self.version, + magic: self.magic, + }; + let writer = ConnectionWriter { + stream: w, + version: self.version, + magic: self.magic, + }; + (reader, writer) + } +} + +impl io::Read for ConnectionReader where A: io::Read { fn read(&mut self, buf: &mut [u8]) -> Result { self.stream.read(buf) } } -impl io::Write for Connection where A: io::Write { +impl io::Write for ConnectionWriter where A: io::Write { fn write(&mut self, buf: &[u8]) -> Result { self.stream.write(buf) } @@ -27,33 +58,39 @@ impl io::Write for Connection where A: io::Write { } } -impl Connection where A: io::Read + io::Write { - fn write_message(self, payload: Payload) -> WriteMessage> { +impl ConnectionReader where A: io::Read { + pub fn incoming(self) -> ReadMessageStream { + read_message_stream(self.stream, self.magic, self.version) + } +} + +impl ConnectionWriter where A: io::Write { + fn write_message(self, payload: Payload) -> WriteMessage> { let message = Message::new(self.magic, payload); write_message(self, message) } - pub fn ping(self) -> WriteMessage> { + pub fn ping(self) -> WriteMessage> { let payload = Payload::Ping(Ping { nonce: 0, }); self.write_message(payload) } - pub fn pong(self, nonce: u64) -> WriteMessage> { + pub fn pong(self, nonce: u64) -> WriteMessage> { let payload = Payload::Pong(Pong { nonce: nonce, }); self.write_message(payload) } - pub fn getaddr(self) -> WriteMessage> { + pub fn getaddr(self) -> WriteMessage> { let payload = Payload::GetAddr; self.write_message(payload) } - pub fn addr(self, addresses: Vec) -> WriteMessage> { - let payload = if self.handshake_result.negotiated_version < 31402 { + pub fn addr(self, addresses: Vec) -> WriteMessage> { + let payload = if self.version < 31402 { Payload::AddrBelow31402(addr::AddrBelow31402 { addresses: addresses.into_iter().map(|x| x.address).collect(), }) @@ -65,24 +102,51 @@ impl Connection where A: io::Read + io::Write { self.write_message(payload) } - pub fn inv(self, inventory: Vec) -> WriteMessage> { + pub fn inv(self, inventory: Vec) -> WriteMessage> { let payload = Payload::Inv(Inv { inventory: inventory, }); self.write_message(payload) } - pub fn getdata(self, inventory: Vec) -> WriteMessage> { + pub fn getdata(self, inventory: Vec) -> WriteMessage> { let payload = Payload::GetData(GetData { inventory: inventory, }); self.write_message(payload) } - pub fn notfound(self, inventory: Vec) -> WriteMessage> { + pub fn notfound(self, inventory: Vec) -> WriteMessage> { let payload = Payload::NotFound(NotFound { inventory: inventory, }); self.write_message(payload) } } + +#[cfg(test)] +mod test { + use std::io::Cursor; + use futures::stream::Stream; + use bytes::Bytes; + use message::{Message, Payload}; + use message::common::Magic; + use super::ConnectionReader; + + #[test] + fn test_connection_reader_stream() { + let raw: Bytes = "f9beb4d976657261636b000000000000000000005df6e0e2f9beb4d9676574616464720000000000000000005df6e0e2".into(); + let expected0 = Message::new(Magic::Mainnet, Payload::Verack); + let expected1 = Message::new(Magic::Mainnet, Payload::GetAddr); + + let reader = ConnectionReader { + stream: Cursor::new(raw), + version: 0, + magic: Magic::Mainnet, + }; + + let mut incoming = reader.incoming(); + assert_eq!(incoming.poll().unwrap(), Some(expected0).into()); + assert_eq!(incoming.poll().unwrap(), Some(expected1).into()); + } +} diff --git a/p2p/src/net/listen.rs b/p2p/src/net/listen.rs index b4c4ed57..7327c90f 100644 --- a/p2p/src/net/listen.rs +++ b/p2p/src/net/listen.rs @@ -45,7 +45,7 @@ impl Future for AcceptConnection where A: io::Read + io::Write { let (stream, handshake_result) = try_ready!(self.handshake.poll()); let connection = Connection { stream: stream, - handshake_result: handshake_result, + version: handshake_result.negotiated_version, magic: self.magic, address: self.address, }; diff --git a/p2p/src/run.rs b/p2p/src/run.rs index 08bc6568..7a02c033 100644 --- a/p2p/src/run.rs +++ b/p2p/src/run.rs @@ -24,7 +24,7 @@ pub fn run(config: Config, handle: &Handle) -> Result, Erro let listen = try!(listen(&handle, config.connection)); let server = listen.for_each(|connection| { - println!("new connection: {:?}", connection.handshake_result); + println!("new connection: {:?}", connection.version); Ok(()) }).boxed(); diff --git a/primitives/src/bytes.rs b/primitives/src/bytes.rs index 8df393d4..d489e3be 100644 --- a/primitives/src/bytes.rs +++ b/primitives/src/bytes.rs @@ -1,4 +1,4 @@ -use std::{ops, str, fmt}; +use std::{ops, str, fmt, io}; use hex::{ToHex, FromHex, FromHexError}; #[derive(Default, PartialEq, Clone)] @@ -36,6 +36,16 @@ impl str::FromStr for Bytes { } } +impl io::Write for Bytes { + fn write(&mut self, buf: &[u8]) -> Result { + self.0.write(buf) + } + + fn flush(&mut self) -> Result<(), io::Error> { + self.0.flush() + } +} + impl fmt::Debug for Bytes { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.write_str(&self.0.to_hex())