p2p streams...

This commit is contained in:
debris 2016-10-05 17:23:50 +02:00
parent 55c6584bcf
commit c0448f3110
9 changed files with 200 additions and 37 deletions

28
Cargo.lock generated
View File

@ -3,7 +3,7 @@ name = "pbtc"
version = "0.1.0"
dependencies = [
"bitcrypto 0.1.0",
"clap 2.13.0 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
"keys 0.1.0",
"message 0.1.0",
"p2p 0.1.0",
@ -17,11 +17,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "arrayvec"
version = "0.3.19"
version = "0.3.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"nodrop 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"odds 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)",
"odds 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -69,7 +69,7 @@ dependencies = [
[[package]]
name = "clap"
version = "2.13.0"
version = "2.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -88,7 +88,7 @@ name = "eth-secp256k1"
version = "0.5.6"
source = "git+https://github.com/ethcore/rust-secp256k1#f998f9a8c18227af200f0f7fdadf8a6560d391ff"
dependencies = [
"arrayvec 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
"arrayvec 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)",
"gcc 0.3.35 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
@ -97,7 +97,7 @@ dependencies = [
[[package]]
name = "futures"
version = "0.1.1"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
@ -218,12 +218,12 @@ name = "nodrop"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"odds 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)",
"odds 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "odds"
version = "0.2.18"
version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
@ -231,7 +231,7 @@ name = "p2p"
version = "0.1.0"
dependencies = [
"bitcrypto 0.1.0",
"futures 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"message 0.1.0",
"primitives 0.1.0",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
@ -344,7 +344,7 @@ name = "tokio-core"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
"scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -397,15 +397,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[metadata]
"checksum ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "23ac7c30002a5accbf7e8987d0632fa6de155b7c3d39d0067317a391e00a2ef6"
"checksum arrayvec 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)" = "295abbcf8112693d74fa426186926b50d67105c88d4d0d94dd8222e51da9755a"
"checksum arrayvec 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d89f1b0e242270b5b797778af0c8d182a1a2ccac5d8d6fadf414223cc0fab096"
"checksum base58 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5024ee8015f02155eee35c711107ddd9a9bf3cb689cf2a9089c97e79b6e1ae83"
"checksum bitflags 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8dead7461c1127cf637931a1e50934eb6eee8bff2f74433ac7909e9afcee04a3"
"checksum bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "aad18937a628ec6abcd26d1489012cc0e18c21798210f491af69ded9b881106d"
"checksum byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855"
"checksum cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de1e760d7b6535af4241fca8bd8adf68e2e7edacc6b29f5d399050c5e48cf88c"
"checksum clap 2.13.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2887ae5b606c1fa314b9238e25a8be3fa673378415c32efc5749464f3365ee9d"
"checksum clap 2.14.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5fa304b03c49ccbb005784fc26e985b5d2310b1d37f2c311ce90dbcd18ea5fde"
"checksum eth-secp256k1 0.5.6 (git+https://github.com/ethcore/rust-secp256k1)" = "<none>"
"checksum futures 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "62af3ebbb8916ecf7ebcc4c130aacc33cc7f48d8b6e74fc9ed010bfa4f359794"
"checksum futures 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0bd34f72c0fffc9d2f6c570fd392bf99b9c5cd1481d79809e1cc2320befc0af0"
"checksum gcc 0.3.35 (registry+https://github.com/rust-lang/crates.io-index)" = "91ecd03771effb0c968fd6950b37e89476a578aaf1c70297d8e92b6516ec3312"
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
"checksum lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "49247ec2a285bb3dcb23cbd9c35193c025e7251bfce77c1d5da97e6362dffe7f"
@ -417,7 +417,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum net2 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)" = "5edf9cb6be97212423aed9413dd4729d62b370b5e1c571750e882cebbbc1e3e2"
"checksum nix 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a7bb1da2be7da3cbffda73fc681d509ffd9e665af478d2bee1907cee0bc64b2"
"checksum nodrop 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "0dbbadd3f4c98dea0bd3d9b4be4c0cdaf1ab57035cb2e41fce3983db5add7cc5"
"checksum odds 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)" = "95509b8be274a669ac94a94cc12e59f2ae0b0625ffe576103153a5c8fd6d5c62"
"checksum odds 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)" = "b20d7892efe1faecee7b0ab6492cd0484f8516d54f6aae87d00d5936dd374aed"
"checksum rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "2791d88c6defac799c3f20d74f094ca33b9332612d9aef9078519c82e4fe04a5"
"checksum rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)" = "f76d05d3993fd5f4af9434e8e436db163a12a9d40e1a58a726f27a01dfd12a2a"
"checksum rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)" = "6159e4e6e559c81bd706afe9c8fd68f547d3e851ce12e76b1de7914bab61691b"

View File

@ -3,13 +3,15 @@ mod read_header;
mod read_message;
mod read_payload;
mod stream;
mod readrc;
mod write_message;
pub use self::handshake::{
handshake, accept_handshake, Handshake, AcceptHandshake, HandshakeResult
};
pub use self::read_header::{read_header, ReadHeader};
pub use self::read_message::{read_message, ReadMessage};
pub use self::read_message::{read_message, ReadMessage, read_message_stream, ReadMessageStream};
pub use self::read_payload::{read_payload, ReadPayload};
pub use self::stream::IoStream;
pub use self::readrc::ReadRc;
pub use self::write_message::{write_message, WriteMessage};

View File

@ -1,8 +1,9 @@
use std::io;
use std::io::{self, Read};
use futures::{Future, Poll, Async};
use futures::stream::Stream;
use message::{Message, MessageHeader};
use message::common::Magic;
use io::{read_header, read_payload, ReadHeader, ReadPayload};
use io::{read_header, read_payload, ReadHeader, ReadPayload, ReadRc};
use Error;
enum ReadMessageState<A> {
@ -26,10 +27,27 @@ pub fn read_message<A>(a: A, magic: Magic, version: u32) -> ReadMessage<A> where
}
}
pub fn read_message_stream<A>(a: A, magic: Magic, version: u32) -> ReadMessageStream<A> where A: io::Read {
let stream: ReadRc<A> = a.into();
ReadMessageStream {
future: read_message(stream.clone(), magic, version),
magic: magic,
version: version,
stream: stream,
}
}
pub struct ReadMessage<A> {
state: ReadMessageState<A>,
}
pub struct ReadMessageStream<A> {
future: ReadMessage<ReadRc<A>>,
magic: Magic,
version: u32,
stream: ReadRc<A>,
}
impl<A> Future for ReadMessage<A> where A: io::Read {
type Item = (A, Message);
type Error = Error;
@ -69,13 +87,37 @@ impl<A> Future for ReadMessage<A> where A: io::Read {
}
}
impl<A> Stream for ReadMessageStream<A> where A: io::Read {
type Item = Message;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let result = match self.future.poll() {
Ok(Async::Ready((_, result))) => {
Ok(Some(result).into())
},
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(_) => {
// TODO: error should be somehow handled and passed upstream
try!(self.stream.read_to_end(&mut Vec::new()));
Ok(Async::NotReady)
}
};
self.future = read_message(self.stream.clone(), self.magic, self.version);
result
}
}
#[cfg(test)]
mod tests {
use futures::Future;
use std::io::Cursor;
use futures::{Future, Async};
use futures::stream::Stream;
use bytes::Bytes;
use message::{Message, Payload};
use message::common::Magic;
use super::read_message;
use super::{read_message, read_message_stream};
#[test]
fn test_read_message() {
@ -83,4 +125,16 @@ mod tests {
let expected = Message::new(Magic::Mainnet, Payload::Verack);
assert_eq!(read_message(raw.as_ref(), Magic::Mainnet, 0).wait().unwrap().1, expected);
}
#[test]
fn test_read_message_stream() {
let raw: Bytes = "f9beb4d976657261636b000000000000000000005df6e0e2f9beb4d9676574616464720000000000000000005df6e0e2".into();
let expected0 = Message::new(Magic::Mainnet, Payload::Verack);
let expected1 = Message::new(Magic::Mainnet, Payload::GetAddr);
let mut stream = read_message_stream(Cursor::new(raw), Magic::Mainnet, 0);
assert_eq!(stream.poll().unwrap(), Some(expected0).into());
assert_eq!(stream.poll().unwrap(), Some(expected1).into());
assert_eq!(stream.poll().unwrap(), Async::NotReady);
}
}

33
p2p/src/io/readrc.rs Normal file
View File

@ -0,0 +1,33 @@
use std::rc::Rc;
use std::cell::RefCell;
use std::io::{Read, Error};
pub struct ReadRc<A> {
read: Rc<RefCell<A>>
}
impl<A> ReadRc<A> {
pub fn new(a: Rc<RefCell<A>>) -> Self {
ReadRc {
read: a,
}
}
}
impl<A> From<A> for ReadRc<A> {
fn from(a: A) -> Self {
ReadRc::new(Rc::new(RefCell::new(a)))
}
}
impl<A> Read for ReadRc<A> where A: Read {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
self.read.borrow_mut().read(buf)
}
}
impl<A> Clone for ReadRc<A> {
fn clone(&self) -> Self {
ReadRc::new(self.read.clone())
}
}

View File

@ -50,7 +50,7 @@ impl Future for Connect {
let (stream, result) = try_ready!(future.poll());
let connection = Connection {
stream: stream,
handshake_result: result,
version: result.negotiated_version,
magic: self.magic,
address: self.address,
};

View File

@ -1,23 +1,54 @@
use std::{net, io};
use tokio_core::io::{Io, ReadHalf, WriteHalf};
use message::{Message, Payload};
use message::common::{Magic, InventoryVector};
use message::types::{Ping, Pong, addr, Inv, GetData, NotFound};
use io::{HandshakeResult, write_message, WriteMessage};
use io::{write_message, WriteMessage, read_message_stream, ReadMessageStream};
pub struct Connection<A> {
pub stream: A,
pub handshake_result: HandshakeResult,
pub version: u32,
pub magic: Magic,
pub address: net::SocketAddr,
}
impl<A> io::Read for Connection<A> where A: io::Read {
pub struct ConnectionReader<A> {
stream: A,
version: u32,
magic: Magic,
}
pub struct ConnectionWriter<A> {
stream: A,
version: u32,
magic: Magic,
}
impl<A> Connection<A> where A: Io {
/// This function will panic if a task is not currently running.
pub fn split(self) -> (ConnectionReader<ReadHalf<A>>, ConnectionWriter<WriteHalf<A>>) {
let (r, w) = self.stream.split();
let reader = ConnectionReader {
stream: r,
version: self.version,
magic: self.magic,
};
let writer = ConnectionWriter {
stream: w,
version: self.version,
magic: self.magic,
};
(reader, writer)
}
}
impl<A> io::Read for ConnectionReader<A> where A: io::Read {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
self.stream.read(buf)
}
}
impl<A> io::Write for Connection<A> where A: io::Write {
impl<A> io::Write for ConnectionWriter<A> where A: io::Write {
fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
self.stream.write(buf)
}
@ -27,33 +58,39 @@ impl<A> io::Write for Connection<A> where A: io::Write {
}
}
impl<A> Connection<A> where A: io::Read + io::Write {
fn write_message(self, payload: Payload) -> WriteMessage<Connection<A>> {
impl<A> ConnectionReader<A> where A: io::Read {
pub fn incoming(self) -> ReadMessageStream<A> {
read_message_stream(self.stream, self.magic, self.version)
}
}
impl<A> ConnectionWriter<A> where A: io::Write {
fn write_message(self, payload: Payload) -> WriteMessage<ConnectionWriter<A>> {
let message = Message::new(self.magic, payload);
write_message(self, message)
}
pub fn ping(self) -> WriteMessage<Connection<A>> {
pub fn ping(self) -> WriteMessage<ConnectionWriter<A>> {
let payload = Payload::Ping(Ping {
nonce: 0,
});
self.write_message(payload)
}
pub fn pong(self, nonce: u64) -> WriteMessage<Connection<A>> {
pub fn pong(self, nonce: u64) -> WriteMessage<ConnectionWriter<A>> {
let payload = Payload::Pong(Pong {
nonce: nonce,
});
self.write_message(payload)
}
pub fn getaddr(self) -> WriteMessage<Connection<A>> {
pub fn getaddr(self) -> WriteMessage<ConnectionWriter<A>> {
let payload = Payload::GetAddr;
self.write_message(payload)
}
pub fn addr(self, addresses: Vec<addr::AddressEntry>) -> WriteMessage<Connection<A>> {
let payload = if self.handshake_result.negotiated_version < 31402 {
pub fn addr(self, addresses: Vec<addr::AddressEntry>) -> WriteMessage<ConnectionWriter<A>> {
let payload = if self.version < 31402 {
Payload::AddrBelow31402(addr::AddrBelow31402 {
addresses: addresses.into_iter().map(|x| x.address).collect(),
})
@ -65,24 +102,51 @@ impl<A> Connection<A> where A: io::Read + io::Write {
self.write_message(payload)
}
pub fn inv(self, inventory: Vec<InventoryVector>) -> WriteMessage<Connection<A>> {
pub fn inv(self, inventory: Vec<InventoryVector>) -> WriteMessage<ConnectionWriter<A>> {
let payload = Payload::Inv(Inv {
inventory: inventory,
});
self.write_message(payload)
}
pub fn getdata(self, inventory: Vec<InventoryVector>) -> WriteMessage<Connection<A>> {
pub fn getdata(self, inventory: Vec<InventoryVector>) -> WriteMessage<ConnectionWriter<A>> {
let payload = Payload::GetData(GetData {
inventory: inventory,
});
self.write_message(payload)
}
pub fn notfound(self, inventory: Vec<InventoryVector>) -> WriteMessage<Connection<A>> {
pub fn notfound(self, inventory: Vec<InventoryVector>) -> WriteMessage<ConnectionWriter<A>> {
let payload = Payload::NotFound(NotFound {
inventory: inventory,
});
self.write_message(payload)
}
}
#[cfg(test)]
mod test {
use std::io::Cursor;
use futures::stream::Stream;
use bytes::Bytes;
use message::{Message, Payload};
use message::common::Magic;
use super::ConnectionReader;
#[test]
fn test_connection_reader_stream() {
let raw: Bytes = "f9beb4d976657261636b000000000000000000005df6e0e2f9beb4d9676574616464720000000000000000005df6e0e2".into();
let expected0 = Message::new(Magic::Mainnet, Payload::Verack);
let expected1 = Message::new(Magic::Mainnet, Payload::GetAddr);
let reader = ConnectionReader {
stream: Cursor::new(raw),
version: 0,
magic: Magic::Mainnet,
};
let mut incoming = reader.incoming();
assert_eq!(incoming.poll().unwrap(), Some(expected0).into());
assert_eq!(incoming.poll().unwrap(), Some(expected1).into());
}
}

View File

@ -45,7 +45,7 @@ impl<A> Future for AcceptConnection<A> where A: io::Read + io::Write {
let (stream, handshake_result) = try_ready!(self.handshake.poll());
let connection = Connection {
stream: stream,
handshake_result: handshake_result,
version: handshake_result.negotiated_version,
magic: self.magic,
address: self.address,
};

View File

@ -24,7 +24,7 @@ pub fn run(config: Config, handle: &Handle) -> Result<BoxFuture<(), Error>, Erro
let listen = try!(listen(&handle, config.connection));
let server = listen.for_each(|connection| {
println!("new connection: {:?}", connection.handshake_result);
println!("new connection: {:?}", connection.version);
Ok(())
}).boxed();

View File

@ -1,4 +1,4 @@
use std::{ops, str, fmt};
use std::{ops, str, fmt, io};
use hex::{ToHex, FromHex, FromHexError};
#[derive(Default, PartialEq, Clone)]
@ -36,6 +36,16 @@ impl str::FromStr for Bytes {
}
}
impl io::Write for Bytes {
fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
self.0.write(buf)
}
fn flush(&mut self) -> Result<(), io::Error> {
self.0.flush()
}
}
impl fmt::Debug for Bytes {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str(&self.0.to_hex())