listen stream

This commit is contained in:
debris 2016-10-02 17:45:25 +02:00
parent 5aacf6ea38
commit b896bb2985
3 changed files with 83 additions and 10 deletions

View File

@ -1,19 +1,23 @@
use std::{net, io};
use futures::{Future, Poll, Async};
use futures::stream::Stream;
use tokio_core::reactor::Handle;
use net::common::{Magic, ServiceFlags};
use net::messages::Version;
use tokio_core::net::TcpListener;
use tokio_core::io::IoStream;
use net::common::{Magic, ServiceFlags, NetAddress};
use net::messages::{Version, Simple};
use stream::{TcpStream, TcpStreamNew};
use io::{handshake, Handshake, HandshakeResult, Error};
use util::nonce::{NonceGenerator, RandomNonce};
use io::{handshake, Handshake, HandshakeResult, Error, accept_handshake, AcceptHandshake, VERSION};
use util::time::{Time, RealTime};
pub struct Connection<A> where A: io::Read + io::Write {
pub stream: A,
pub handshake_result: HandshakeResult,
pub magic: Magic,
pub address: net::SocketAddr,
}
#[derive(Debug, Clone)]
pub struct Config {
magic: Magic,
port: u16,
@ -23,20 +27,41 @@ pub struct Config {
relay: bool,
}
fn version(config: &Config) -> Version {
unimplemented!();
/// TODO: must be VERSION 70_001
fn version(config: &Config, address: &net::SocketAddr) -> Version {
Version::Simple(Simple {
version: VERSION,
services: config.services,
timestamp: RealTime.get().sec,
receiver: NetAddress {
services: config.services,
address: address.ip().into(),
port: address.port().into(),
},
})
}
pub fn connect(addr: &net::SocketAddr, handle: &Handle, config: &Config) -> Connect {
pub fn connect(address: &net::SocketAddr, handle: &Handle, config: &Config) -> Connect {
Connect {
state: ConnectState::TcpConnect {
future: TcpStream::connect(addr, handle),
version: Some(version(config)),
future: TcpStream::connect(address, handle),
version: Some(version(config, address)),
},
magic: config.magic,
address: *address,
}
}
pub fn listen(address: &net::SocketAddr, handle: &Handle, config: Config) -> Result<Listen, Error> {
let listener = try!(TcpListener::bind(address, handle));
let listen = Listen {
incoming: listener.incoming()
.map(move |(stream, address)| accept_connection(stream.into(), &config, address))
.boxed(),
};
Ok(listen)
}
enum ConnectState {
TcpConnect {
future: TcpStreamNew,
@ -49,6 +74,17 @@ enum ConnectState {
pub struct Connect {
state: ConnectState,
magic: Magic,
address: net::SocketAddr,
}
pub struct Listen {
incoming: IoStream<AcceptConnection<TcpStream>>,
}
impl Listen {
pub fn incoming(self) -> IoStream<AcceptConnection<TcpStream>> {
self.incoming
}
}
impl Future for Connect {
@ -69,6 +105,7 @@ impl Future for Connect {
stream: stream,
handshake_result: result,
magic: self.magic,
address: self.address,
};
(ConnectState::Connected, Async::Ready(connection))
},
@ -83,3 +120,33 @@ impl Future for Connect {
}
}
}
fn accept_connection<A>(stream: A, config: &Config, address: net::SocketAddr) -> AcceptConnection<A> where A: io::Read + io::Write {
AcceptConnection {
handshake: accept_handshake(stream, config.magic, version(config, &address)),
magic: config.magic,
address: address,
}
}
pub struct AcceptConnection<A> where A: io::Read + io::Write {
handshake: AcceptHandshake<A>,
magic: Magic,
address: net::SocketAddr,
}
impl<A> Future for AcceptConnection<A> where A: io::Read + io::Write {
type Item = Connection<A>;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (stream, handshake_result) = try_ready!(self.handshake.poll());
let connection = Connection {
stream: stream,
handshake_result: handshake_result,
magic: self.magic,
address: self.address,
};
Ok(connection.into())
}
}

View File

@ -5,7 +5,7 @@ mod read_message;
mod read_payload;
mod write_message;
pub const VERSION: u32 = 70_000;
pub const VERSION: u32 = 70_001;
pub const USER_AGENT: &'static str = "pbtc";
pub use self::error::Error;

View File

@ -18,6 +18,12 @@ impl Future for TcpStreamNew {
pub struct TcpStream(tnet::TcpStream);
impl From<tnet::TcpStream> for TcpStream {
fn from(s: tnet::TcpStream) -> Self {
TcpStream(s)
}
}
impl ops::Deref for TcpStream {
type Target = tnet::TcpStream;