diff --git a/.gitignore b/.gitignore index 917fc514..6fe8be80 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ /target/ *.swp +*.swo +*.swn diff --git a/message/.Cargo.toml.swo b/message/.Cargo.toml.swo deleted file mode 100644 index 5623b650..00000000 Binary files a/message/.Cargo.toml.swo and /dev/null differ diff --git a/message/src/.lib.rs.swo b/message/src/.lib.rs.swo deleted file mode 100644 index 1c9c8cc0..00000000 Binary files a/message/src/.lib.rs.swo and /dev/null differ diff --git a/message/src/message/.message.rs.swn b/message/src/message/.message.rs.swn deleted file mode 100644 index 3ac64fa7..00000000 Binary files a/message/src/message/.message.rs.swn and /dev/null differ diff --git a/message/src/message/.message.rs.swo b/message/src/message/.message.rs.swo deleted file mode 100644 index c0ccd225..00000000 Binary files a/message/src/message/.message.rs.swo and /dev/null differ diff --git a/message/src/message/.message_header.rs.swn b/message/src/message/.message_header.rs.swn deleted file mode 100644 index c975974f..00000000 Binary files a/message/src/message/.message_header.rs.swn and /dev/null differ diff --git a/message/src/message/.message_header.rs.swo b/message/src/message/.message_header.rs.swo deleted file mode 100644 index e6a5cb8d..00000000 Binary files a/message/src/message/.message_header.rs.swo and /dev/null differ diff --git a/message/src/message/.mod.rs.swo b/message/src/message/.mod.rs.swo deleted file mode 100644 index 3c0376fd..00000000 Binary files a/message/src/message/.mod.rs.swo and /dev/null differ diff --git a/message/src/message/.payload.rs.swn b/message/src/message/.payload.rs.swn deleted file mode 100644 index a4dcf3be..00000000 Binary files a/message/src/message/.payload.rs.swn and /dev/null differ diff --git a/message/src/message/.payload.rs.swo b/message/src/message/.payload.rs.swo deleted file mode 100644 index 542d3816..00000000 Binary files a/message/src/message/.payload.rs.swo and /dev/null differ diff --git a/message/src/types/mod.rs b/message/src/types/mod.rs index 813e10bd..15a1f7ab 100644 --- a/message/src/types/mod.rs +++ b/message/src/types/mod.rs @@ -1,5 +1,5 @@ mod addr; -mod version; +pub mod version; pub use self::addr::{Addr, AddrBelow31402}; -pub use self::version::{Version, Simple, V106, V70001}; +pub use self::version::Version; diff --git a/p2p/src/connect.rs b/p2p/src/connect.rs deleted file mode 100644 index 099e8785..00000000 --- a/p2p/src/connect.rs +++ /dev/null @@ -1,165 +0,0 @@ -use std::{net, io}; -use futures::{Future, Poll, Async}; -use futures::stream::Stream; -use tokio_core::reactor::Handle; -use message::common::{Magic, ServiceFlags, NetAddress}; -use message::types::{Version, Simple, V106, V70001}; -use stream::{TcpStream, TcpStreamNew, TcpListener}; -use io::{handshake, Handshake, HandshakeResult, accept_handshake, AcceptHandshake, IoStream}; -use util::time::{Time, RealTime}; -use util::nonce::{NonceGenerator, RandomNonce}; -use {VERSION, Error}; - -pub struct Connection where A: io::Read + io::Write { - pub stream: A, - pub handshake_result: HandshakeResult, - pub magic: Magic, - pub address: net::SocketAddr, -} - -#[derive(Debug, Clone)] -pub struct Config { - pub magic: Magic, - pub local_address: net::SocketAddr, - pub services: ServiceFlags, - pub user_agent: String, - pub start_height: i32, - pub relay: bool, -} - -fn version(config: &Config, address: &net::SocketAddr) -> Version { - Version::V70001(Simple { - version: VERSION, - services: config.services, - timestamp: RealTime.get().sec, - receiver: NetAddress { - services: config.services, - address: address.ip().into(), - port: address.port().into(), - }, - }, V106 { - from: NetAddress { - services: config.services, - address: config.local_address.ip().into(), - port: config.local_address.port().into(), - }, - nonce: RandomNonce.get(), - user_agent: config.user_agent.clone(), - start_height: config.start_height, - }, V70001 { - relay: config.relay, - }) -} - -pub fn connect(address: &net::SocketAddr, handle: &Handle, config: &Config) -> Connect { - Connect { - state: ConnectState::TcpConnect { - future: TcpStream::connect(address, handle), - version: Some(version(config, address)), - }, - magic: config.magic, - address: *address, - } -} - -pub fn listen(handle: &Handle, config: Config) -> Result { - let listener = try!(TcpListener::bind(&config.local_address, handle)); - let listen = Listen { - inner: listener.incoming() - .and_then(move |(stream, address)| accept_connection(stream.into(), &config, address)) - .boxed(), - }; - Ok(listen) -} - -enum ConnectState { - TcpConnect { - future: TcpStreamNew, - version: Option, - }, - Handshake(Handshake), - Connected, -} - -pub struct Connect { - state: ConnectState, - magic: Magic, - address: net::SocketAddr, -} - -pub struct Listen { - inner: IoStream>, -} - -impl Future for Connect { - type Item = Connection; - type Error = Error; - - fn poll(&mut self) -> Poll { - let (next, result) = match self.state { - ConnectState::TcpConnect { ref mut future, ref mut version } => { - let stream = try_ready!(future.poll()); - let version = version.take().expect("state TcpConnect must have version"); - let handshake = handshake(stream, self.magic, version); - (ConnectState::Handshake(handshake), Async::NotReady) - }, - ConnectState::Handshake(ref mut future) => { - let (stream, result) = try_ready!(future.poll()); - let connection = Connection { - stream: stream, - handshake_result: result, - magic: self.magic, - address: self.address, - }; - (ConnectState::Connected, Async::Ready(connection)) - }, - ConnectState::Connected => panic!("poll Connect after it's done"), - }; - - self.state = next; - match result { - // by polling again, we register new future - Async::NotReady => self.poll(), - result => Ok(result) - } - } -} - -fn accept_connection(stream: A, config: &Config, address: net::SocketAddr) -> AcceptConnection where A: io::Read + io::Write { - AcceptConnection { - handshake: accept_handshake(stream, config.magic, version(config, &address)), - magic: config.magic, - address: address, - } -} - -pub struct AcceptConnection where A: io::Read + io::Write { - handshake: AcceptHandshake, - magic: Magic, - address: net::SocketAddr, -} - -impl Future for AcceptConnection where A: io::Read + io::Write { - type Item = Connection; - type Error = Error; - - fn poll(&mut self) -> Poll { - let (stream, handshake_result) = try_ready!(self.handshake.poll()); - let connection = Connection { - stream: stream, - handshake_result: handshake_result, - magic: self.magic, - address: self.address, - }; - Ok(connection.into()) - } -} - -impl Stream for Listen { - type Item = Connection; - type Error = Error; - - fn poll(&mut self) -> Poll, Self::Error> { - self.inner.poll() - } -} diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 23acd5a4..c4d3222f 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -9,11 +9,11 @@ extern crate message; extern crate primitives; extern crate serialization as ser; -pub mod connect; mod error; pub mod io; +pub mod net; +pub mod tcp; pub mod util; -pub mod stream; pub const VERSION: u32 = 70_001; pub const USER_AGENT: &'static str = "pbtc"; diff --git a/p2p/src/net/config.rs b/p2p/src/net/config.rs new file mode 100644 index 00000000..4d8668ff --- /dev/null +++ b/p2p/src/net/config.rs @@ -0,0 +1,42 @@ +use std::net::SocketAddr; +use message::common::{Magic, ServiceFlags, NetAddress}; +use message::types::version::{Version, Simple, V106, V70001}; +use util::time::{Time, RealTime}; +use util::nonce::{NonceGenerator, RandomNonce}; +use VERSION; + +#[derive(Debug, Clone)] +pub struct Config { + pub magic: Magic, + pub local_address: SocketAddr, + pub services: ServiceFlags, + pub user_agent: String, + pub start_height: i32, + pub relay: bool, +} + +impl Config { + pub fn version(&self, to: &SocketAddr) -> Version { + Version::V70001(Simple { + version: VERSION, + services: self.services, + timestamp: RealTime.get().sec, + receiver: NetAddress { + services: self.services, + address: to.ip().into(), + port: to.port().into(), + }, + }, V106 { + from: NetAddress { + services: self.services, + address: self.local_address.ip().into(), + port: self.local_address.port().into(), + }, + nonce: RandomNonce.get(), + user_agent: self.user_agent.clone(), + start_height: self.start_height, + }, V70001 { + relay: self.relay, + }) + } +} diff --git a/p2p/src/net/connect.rs b/p2p/src/net/connect.rs new file mode 100644 index 00000000..3d680193 --- /dev/null +++ b/p2p/src/net/connect.rs @@ -0,0 +1,69 @@ +use std::net::SocketAddr; +use futures::{Future, Poll, Async}; +use tokio_core::reactor::Handle; +use message::common::Magic; +use message::types::Version; +use io::{handshake, Handshake}; +use tcp::{TcpStream, TcpStreamNew}; +use net::{Config, Connection}; +use Error; + +pub fn connect(address: &SocketAddr, handle: &Handle, config: &Config) -> Connect { + Connect { + state: ConnectState::TcpConnect { + future: TcpStream::connect(address, handle), + version: Some(config.version(address)), + }, + magic: config.magic, + address: *address, + } +} + +enum ConnectState { + TcpConnect { + future: TcpStreamNew, + version: Option, + }, + Handshake(Handshake), + Connected, +} + +pub struct Connect { + state: ConnectState, + magic: Magic, + address: SocketAddr, +} + +impl Future for Connect { + type Item = Connection; + type Error = Error; + + fn poll(&mut self) -> Poll { + let (next, result) = match self.state { + ConnectState::TcpConnect { ref mut future, ref mut version } => { + let stream = try_ready!(future.poll()); + let version = version.take().expect("state TcpConnect must have version"); + let handshake = handshake(stream, self.magic, version); + (ConnectState::Handshake(handshake), Async::NotReady) + }, + ConnectState::Handshake(ref mut future) => { + let (stream, result) = try_ready!(future.poll()); + let connection = Connection { + stream: stream, + handshake_result: result, + magic: self.magic, + address: self.address, + }; + (ConnectState::Connected, Async::Ready(connection)) + }, + ConnectState::Connected => panic!("poll Connect after it's done"), + }; + + self.state = next; + match result { + // by polling again, we register new future + Async::NotReady => self.poll(), + result => Ok(result) + } + } +} diff --git a/p2p/src/net/connection.rs b/p2p/src/net/connection.rs new file mode 100644 index 00000000..565f1fe7 --- /dev/null +++ b/p2p/src/net/connection.rs @@ -0,0 +1,11 @@ +use std::{net, io}; +use message::common::Magic; +use io::HandshakeResult; + +pub struct Connection where A: io::Read + io::Write { + pub stream: A, + pub handshake_result: HandshakeResult, + pub magic: Magic, + pub address: net::SocketAddr, +} + diff --git a/p2p/src/net/listen.rs b/p2p/src/net/listen.rs new file mode 100644 index 00000000..b4c4ed57 --- /dev/null +++ b/p2p/src/net/listen.rs @@ -0,0 +1,63 @@ +use std::{net, io}; +use futures::{Future, Poll}; +use futures::stream::Stream; +use tokio_core::reactor::Handle; +use message::common::Magic; +use io::{accept_handshake, AcceptHandshake, IoStream}; +use net::{Config, Connection}; +use tcp::{TcpStream, TcpListener}; +use Error; + +pub fn listen(handle: &Handle, config: Config) -> Result { + let listener = try!(TcpListener::bind(&config.local_address, handle)); + let listen = Listen { + inner: listener.incoming() + .and_then(move |(stream, address)| accept_connection(stream.into(), &config, address)) + .boxed(), + }; + Ok(listen) +} + + +pub struct Listen { + inner: IoStream>, +} + +fn accept_connection(stream: A, config: &Config, address: net::SocketAddr) -> AcceptConnection where A: io::Read + io::Write { + AcceptConnection { + handshake: accept_handshake(stream, config.magic, config.version(&address)), + magic: config.magic, + address: address, + } +} + +struct AcceptConnection where A: io::Read + io::Write { + handshake: AcceptHandshake, + magic: Magic, + address: net::SocketAddr, +} + +impl Future for AcceptConnection where A: io::Read + io::Write { + type Item = Connection; + type Error = Error; + + fn poll(&mut self) -> Poll { + let (stream, handshake_result) = try_ready!(self.handshake.poll()); + let connection = Connection { + stream: stream, + handshake_result: handshake_result, + magic: self.magic, + address: self.address, + }; + Ok(connection.into()) + } +} + +impl Stream for Listen { + type Item = Connection; + type Error = Error; + + fn poll(&mut self) -> Poll, Self::Error> { + self.inner.poll() + } +} diff --git a/p2p/src/net/mod.rs b/p2p/src/net/mod.rs new file mode 100644 index 00000000..90ef7610 --- /dev/null +++ b/p2p/src/net/mod.rs @@ -0,0 +1,9 @@ +mod config; +mod connect; +mod connection; +mod listen; + +pub use self::config::Config; +pub use self::connect::{Connect, connect}; +pub use self::connection::Connection; +pub use self::listen::{Listen, listen}; diff --git a/p2p/src/tcp/listener.rs b/p2p/src/tcp/listener.rs new file mode 100644 index 00000000..907fedd2 --- /dev/null +++ b/p2p/src/tcp/listener.rs @@ -0,0 +1,46 @@ +use std::{ops, net}; +use futures::Poll; +use futures::stream::Stream; +use tokio_core::{net as tnet}; +use tokio_core::reactor::Handle; +use tcp::TcpStream; +use Error; + +pub struct Incoming(tnet::Incoming); + +impl Stream for Incoming { + type Item = (TcpStream, net::SocketAddr); + type Error = Error; + + fn poll(&mut self) -> Poll, Self::Error> { + match try_ready!(self.0.poll()) { + Some((stream, addr)) => { + let stream: TcpStream = stream.into(); + Ok(Some((stream, addr)).into()) + }, + None => Ok(None.into()), + } + } +} + +pub struct TcpListener(tnet::TcpListener); + +impl ops::Deref for TcpListener { + type Target = tnet::TcpListener; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl TcpListener { + pub fn bind(addr: &net::SocketAddr, handle: &Handle) -> Result { + let listener = try!(tnet::TcpListener::bind(addr, handle)); + Ok(TcpListener(listener)) + } + + pub fn incoming(self) -> Incoming { + Incoming(self.0.incoming()) + } +} + diff --git a/p2p/src/tcp/mod.rs b/p2p/src/tcp/mod.rs new file mode 100644 index 00000000..b68152a2 --- /dev/null +++ b/p2p/src/tcp/mod.rs @@ -0,0 +1,5 @@ +mod stream; +mod listener; + +pub use self::stream::{TcpStream, TcpStreamNew}; +pub use self::listener::{TcpListener, Incoming}; diff --git a/p2p/src/stream.rs b/p2p/src/tcp/stream.rs similarity index 50% rename from p2p/src/stream.rs rename to p2p/src/tcp/stream.rs index 81a93ba0..fc8930e6 100644 --- a/p2p/src/stream.rs +++ b/p2p/src/tcp/stream.rs @@ -1,7 +1,6 @@ -use std::{ops, net, io}; -use futures::{Future, Poll, Async}; -use futures::stream::Stream; -use tokio_core::{net as tnet}; +use std::{io, net, ops}; +use futures::{Poll, Future}; +use tokio_core::net as tnet; use tokio_core::reactor::Handle; use Error; @@ -13,7 +12,7 @@ impl Future for TcpStreamNew { fn poll(&mut self) -> Poll { let stream = try_ready!(self.0.poll()); - Ok(Async::Ready(TcpStream(stream))) + Ok(TcpStream(stream).into()) } } @@ -54,42 +53,3 @@ impl TcpStream { TcpStreamNew(tnet::TcpStream::connect(addr, handle)) } } - -pub struct Incoming(tnet::Incoming); - -impl Stream for Incoming { - type Item = (TcpStream, net::SocketAddr); - type Error = Error; - - fn poll(&mut self) -> Poll, Self::Error> { - match try_ready!(self.0.poll()) { - Some((stream, addr)) => { - let stream: TcpStream = stream.into(); - Ok(Some((stream, addr)).into()) - }, - None => Ok(None.into()), - } - } -} - -pub struct TcpListener(tnet::TcpListener); - -impl ops::Deref for TcpListener { - type Target = tnet::TcpListener; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl TcpListener { - pub fn bind(addr: &net::SocketAddr, handle: &Handle) -> Result { - let listener = try!(tnet::TcpListener::bind(addr, handle)); - Ok(TcpListener(listener)) - } - - pub fn incoming(self) -> Incoming { - Incoming(self.0.incoming()) - } -} -