p2p overhaul

This commit is contained in:
debris 2016-10-03 17:17:34 +02:00
parent dc21039182
commit 5f2c7545b8
21 changed files with 255 additions and 213 deletions

2
.gitignore vendored
View File

@ -3,3 +3,5 @@
/target/
*.swp
*.swo
*.swn

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -1,5 +1,5 @@
mod addr;
mod version;
pub mod version;
pub use self::addr::{Addr, AddrBelow31402};
pub use self::version::{Version, Simple, V106, V70001};
pub use self::version::Version;

View File

@ -1,165 +0,0 @@
use std::{net, io};
use futures::{Future, Poll, Async};
use futures::stream::Stream;
use tokio_core::reactor::Handle;
use message::common::{Magic, ServiceFlags, NetAddress};
use message::types::{Version, Simple, V106, V70001};
use stream::{TcpStream, TcpStreamNew, TcpListener};
use io::{handshake, Handshake, HandshakeResult, accept_handshake, AcceptHandshake, IoStream};
use util::time::{Time, RealTime};
use util::nonce::{NonceGenerator, RandomNonce};
use {VERSION, Error};
pub struct Connection<A> where A: io::Read + io::Write {
pub stream: A,
pub handshake_result: HandshakeResult,
pub magic: Magic,
pub address: net::SocketAddr,
}
#[derive(Debug, Clone)]
pub struct Config {
pub magic: Magic,
pub local_address: net::SocketAddr,
pub services: ServiceFlags,
pub user_agent: String,
pub start_height: i32,
pub relay: bool,
}
fn version(config: &Config, address: &net::SocketAddr) -> Version {
Version::V70001(Simple {
version: VERSION,
services: config.services,
timestamp: RealTime.get().sec,
receiver: NetAddress {
services: config.services,
address: address.ip().into(),
port: address.port().into(),
},
}, V106 {
from: NetAddress {
services: config.services,
address: config.local_address.ip().into(),
port: config.local_address.port().into(),
},
nonce: RandomNonce.get(),
user_agent: config.user_agent.clone(),
start_height: config.start_height,
}, V70001 {
relay: config.relay,
})
}
pub fn connect(address: &net::SocketAddr, handle: &Handle, config: &Config) -> Connect {
Connect {
state: ConnectState::TcpConnect {
future: TcpStream::connect(address, handle),
version: Some(version(config, address)),
},
magic: config.magic,
address: *address,
}
}
pub fn listen(handle: &Handle, config: Config) -> Result<Listen, Error> {
let listener = try!(TcpListener::bind(&config.local_address, handle));
let listen = Listen {
inner: listener.incoming()
.and_then(move |(stream, address)| accept_connection(stream.into(), &config, address))
.boxed(),
};
Ok(listen)
}
enum ConnectState {
TcpConnect {
future: TcpStreamNew,
version: Option<Version>,
},
Handshake(Handshake<TcpStream>),
Connected,
}
pub struct Connect {
state: ConnectState,
magic: Magic,
address: net::SocketAddr,
}
pub struct Listen {
inner: IoStream<Connection<TcpStream>>,
}
impl Future for Connect {
type Item = Connection<TcpStream>;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (next, result) = match self.state {
ConnectState::TcpConnect { ref mut future, ref mut version } => {
let stream = try_ready!(future.poll());
let version = version.take().expect("state TcpConnect must have version");
let handshake = handshake(stream, self.magic, version);
(ConnectState::Handshake(handshake), Async::NotReady)
},
ConnectState::Handshake(ref mut future) => {
let (stream, result) = try_ready!(future.poll());
let connection = Connection {
stream: stream,
handshake_result: result,
magic: self.magic,
address: self.address,
};
(ConnectState::Connected, Async::Ready(connection))
},
ConnectState::Connected => panic!("poll Connect after it's done"),
};
self.state = next;
match result {
// by polling again, we register new future
Async::NotReady => self.poll(),
result => Ok(result)
}
}
}
fn accept_connection<A>(stream: A, config: &Config, address: net::SocketAddr) -> AcceptConnection<A> where A: io::Read + io::Write {
AcceptConnection {
handshake: accept_handshake(stream, config.magic, version(config, &address)),
magic: config.magic,
address: address,
}
}
pub struct AcceptConnection<A> where A: io::Read + io::Write {
handshake: AcceptHandshake<A>,
magic: Magic,
address: net::SocketAddr,
}
impl<A> Future for AcceptConnection<A> where A: io::Read + io::Write {
type Item = Connection<A>;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (stream, handshake_result) = try_ready!(self.handshake.poll());
let connection = Connection {
stream: stream,
handshake_result: handshake_result,
magic: self.magic,
address: self.address,
};
Ok(connection.into())
}
}
impl Stream for Listen {
type Item = Connection<TcpStream>;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.inner.poll()
}
}

View File

@ -9,11 +9,11 @@ extern crate message;
extern crate primitives;
extern crate serialization as ser;
pub mod connect;
mod error;
pub mod io;
pub mod net;
pub mod tcp;
pub mod util;
pub mod stream;
pub const VERSION: u32 = 70_001;
pub const USER_AGENT: &'static str = "pbtc";

42
p2p/src/net/config.rs Normal file
View File

@ -0,0 +1,42 @@
use std::net::SocketAddr;
use message::common::{Magic, ServiceFlags, NetAddress};
use message::types::version::{Version, Simple, V106, V70001};
use util::time::{Time, RealTime};
use util::nonce::{NonceGenerator, RandomNonce};
use VERSION;
#[derive(Debug, Clone)]
pub struct Config {
pub magic: Magic,
pub local_address: SocketAddr,
pub services: ServiceFlags,
pub user_agent: String,
pub start_height: i32,
pub relay: bool,
}
impl Config {
pub fn version(&self, to: &SocketAddr) -> Version {
Version::V70001(Simple {
version: VERSION,
services: self.services,
timestamp: RealTime.get().sec,
receiver: NetAddress {
services: self.services,
address: to.ip().into(),
port: to.port().into(),
},
}, V106 {
from: NetAddress {
services: self.services,
address: self.local_address.ip().into(),
port: self.local_address.port().into(),
},
nonce: RandomNonce.get(),
user_agent: self.user_agent.clone(),
start_height: self.start_height,
}, V70001 {
relay: self.relay,
})
}
}

69
p2p/src/net/connect.rs Normal file
View File

@ -0,0 +1,69 @@
use std::net::SocketAddr;
use futures::{Future, Poll, Async};
use tokio_core::reactor::Handle;
use message::common::Magic;
use message::types::Version;
use io::{handshake, Handshake};
use tcp::{TcpStream, TcpStreamNew};
use net::{Config, Connection};
use Error;
pub fn connect(address: &SocketAddr, handle: &Handle, config: &Config) -> Connect {
Connect {
state: ConnectState::TcpConnect {
future: TcpStream::connect(address, handle),
version: Some(config.version(address)),
},
magic: config.magic,
address: *address,
}
}
enum ConnectState {
TcpConnect {
future: TcpStreamNew,
version: Option<Version>,
},
Handshake(Handshake<TcpStream>),
Connected,
}
pub struct Connect {
state: ConnectState,
magic: Magic,
address: SocketAddr,
}
impl Future for Connect {
type Item = Connection<TcpStream>;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (next, result) = match self.state {
ConnectState::TcpConnect { ref mut future, ref mut version } => {
let stream = try_ready!(future.poll());
let version = version.take().expect("state TcpConnect must have version");
let handshake = handshake(stream, self.magic, version);
(ConnectState::Handshake(handshake), Async::NotReady)
},
ConnectState::Handshake(ref mut future) => {
let (stream, result) = try_ready!(future.poll());
let connection = Connection {
stream: stream,
handshake_result: result,
magic: self.magic,
address: self.address,
};
(ConnectState::Connected, Async::Ready(connection))
},
ConnectState::Connected => panic!("poll Connect after it's done"),
};
self.state = next;
match result {
// by polling again, we register new future
Async::NotReady => self.poll(),
result => Ok(result)
}
}
}

11
p2p/src/net/connection.rs Normal file
View File

@ -0,0 +1,11 @@
use std::{net, io};
use message::common::Magic;
use io::HandshakeResult;
pub struct Connection<A> where A: io::Read + io::Write {
pub stream: A,
pub handshake_result: HandshakeResult,
pub magic: Magic,
pub address: net::SocketAddr,
}

63
p2p/src/net/listen.rs Normal file
View File

@ -0,0 +1,63 @@
use std::{net, io};
use futures::{Future, Poll};
use futures::stream::Stream;
use tokio_core::reactor::Handle;
use message::common::Magic;
use io::{accept_handshake, AcceptHandshake, IoStream};
use net::{Config, Connection};
use tcp::{TcpStream, TcpListener};
use Error;
pub fn listen(handle: &Handle, config: Config) -> Result<Listen, Error> {
let listener = try!(TcpListener::bind(&config.local_address, handle));
let listen = Listen {
inner: listener.incoming()
.and_then(move |(stream, address)| accept_connection(stream.into(), &config, address))
.boxed(),
};
Ok(listen)
}
pub struct Listen {
inner: IoStream<Connection<TcpStream>>,
}
fn accept_connection<A>(stream: A, config: &Config, address: net::SocketAddr) -> AcceptConnection<A> where A: io::Read + io::Write {
AcceptConnection {
handshake: accept_handshake(stream, config.magic, config.version(&address)),
magic: config.magic,
address: address,
}
}
struct AcceptConnection<A> where A: io::Read + io::Write {
handshake: AcceptHandshake<A>,
magic: Magic,
address: net::SocketAddr,
}
impl<A> Future for AcceptConnection<A> where A: io::Read + io::Write {
type Item = Connection<A>;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (stream, handshake_result) = try_ready!(self.handshake.poll());
let connection = Connection {
stream: stream,
handshake_result: handshake_result,
magic: self.magic,
address: self.address,
};
Ok(connection.into())
}
}
impl Stream for Listen {
type Item = Connection<TcpStream>;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.inner.poll()
}
}

9
p2p/src/net/mod.rs Normal file
View File

@ -0,0 +1,9 @@
mod config;
mod connect;
mod connection;
mod listen;
pub use self::config::Config;
pub use self::connect::{Connect, connect};
pub use self::connection::Connection;
pub use self::listen::{Listen, listen};

46
p2p/src/tcp/listener.rs Normal file
View File

@ -0,0 +1,46 @@
use std::{ops, net};
use futures::Poll;
use futures::stream::Stream;
use tokio_core::{net as tnet};
use tokio_core::reactor::Handle;
use tcp::TcpStream;
use Error;
pub struct Incoming(tnet::Incoming);
impl Stream for Incoming {
type Item = (TcpStream, net::SocketAddr);
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match try_ready!(self.0.poll()) {
Some((stream, addr)) => {
let stream: TcpStream = stream.into();
Ok(Some((stream, addr)).into())
},
None => Ok(None.into()),
}
}
}
pub struct TcpListener(tnet::TcpListener);
impl ops::Deref for TcpListener {
type Target = tnet::TcpListener;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl TcpListener {
pub fn bind(addr: &net::SocketAddr, handle: &Handle) -> Result<TcpListener, Error> {
let listener = try!(tnet::TcpListener::bind(addr, handle));
Ok(TcpListener(listener))
}
pub fn incoming(self) -> Incoming {
Incoming(self.0.incoming())
}
}

5
p2p/src/tcp/mod.rs Normal file
View File

@ -0,0 +1,5 @@
mod stream;
mod listener;
pub use self::stream::{TcpStream, TcpStreamNew};
pub use self::listener::{TcpListener, Incoming};

View File

@ -1,7 +1,6 @@
use std::{ops, net, io};
use futures::{Future, Poll, Async};
use futures::stream::Stream;
use tokio_core::{net as tnet};
use std::{io, net, ops};
use futures::{Poll, Future};
use tokio_core::net as tnet;
use tokio_core::reactor::Handle;
use Error;
@ -13,7 +12,7 @@ impl Future for TcpStreamNew {
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let stream = try_ready!(self.0.poll());
Ok(Async::Ready(TcpStream(stream)))
Ok(TcpStream(stream).into())
}
}
@ -54,42 +53,3 @@ impl TcpStream {
TcpStreamNew(tnet::TcpStream::connect(addr, handle))
}
}
pub struct Incoming(tnet::Incoming);
impl Stream for Incoming {
type Item = (TcpStream, net::SocketAddr);
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match try_ready!(self.0.poll()) {
Some((stream, addr)) => {
let stream: TcpStream = stream.into();
Ok(Some((stream, addr)).into())
},
None => Ok(None.into()),
}
}
}
pub struct TcpListener(tnet::TcpListener);
impl ops::Deref for TcpListener {
type Target = tnet::TcpListener;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl TcpListener {
pub fn bind(addr: &net::SocketAddr, handle: &Handle) -> Result<TcpListener, Error> {
let listener = try!(tnet::TcpListener::bind(addr, handle));
Ok(TcpListener(listener))
}
pub fn incoming(self) -> Incoming {
Incoming(self.0.incoming())
}
}