diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs index bf76126f5..70668c0aa 100644 --- a/zebra-network/src/peer/connector.rs +++ b/zebra-network/src/peer/connector.rs @@ -58,7 +58,7 @@ where } } -impl Service for PeerConnector +impl Service<(TcpStream, SocketAddr)> for PeerConnector where S: Service + Clone + Send + 'static, S::Future: Send, @@ -74,7 +74,9 @@ where Poll::Ready(Ok(())) } - fn call(&mut self, addr: SocketAddr) -> Self::Future { + fn call(&mut self, req: (TcpStream, SocketAddr)) -> Self::Future { + let (tcp_stream, addr) = req; + let connector_span = span!(Level::INFO, "connector", addr = ?addr); let connection_span = span!(Level::INFO, "peer", addr = ?addr); @@ -86,12 +88,9 @@ where let fut = async move { info!("connecting to remote peer"); - debug!("opening tcp stream"); - let mut stream = Framed::new( - TcpStream::connect(addr).await.expect("PeerError does not contain an io::Error variant, but this code will be removed in the next PR, so there's no need to handle this error"), - Codec::builder().for_network(network).finish(), - ); + let mut stream = + Framed::new(tcp_stream, Codec::builder().for_network(network).finish()); let version = Message::Version { version: constants::CURRENT_VERSION, diff --git a/zebrad/src/commands.rs b/zebrad/src/commands.rs index c599496ed..09c9b79ec 100644 --- a/zebrad/src/commands.rs +++ b/zebrad/src/commands.rs @@ -11,10 +11,11 @@ //! application's configuration file. mod connect; +mod listen; mod start; mod version; -use self::{connect::ConnectCmd, start::StartCmd, version::VersionCmd}; +use self::{connect::ConnectCmd, listen::ListenCmd, start::StartCmd, version::VersionCmd}; use crate::config::ZebradConfig; use abscissa_core::{ config::Override, Command, Configurable, FrameworkError, Help, Options, Runnable, @@ -42,6 +43,10 @@ pub enum ZebradCmd { /// The `connect` subcommand #[options(help = "testing stub for dumping network messages")] Connect(ConnectCmd), + + /// The `listen` subcommand + #[options(help = "testing stub for dumping network messages")] + Listen(ListenCmd), } /// This trait allows you to define how application configuration is loaded. diff --git a/zebrad/src/commands/connect.rs b/zebrad/src/commands/connect.rs index b5fb7c461..df8b9b8a7 100644 --- a/zebrad/src/commands/connect.rs +++ b/zebrad/src/commands/connect.rs @@ -73,11 +73,15 @@ impl ConnectCmd { 1, ); + use tokio::net::TcpStream; + let config = app_config().network.clone(); let collector = TimestampCollector::new(); let mut pc = PeerConnector::new(config, Network::Mainnet, node, &collector); - // no need to call ready because pc is always ready - let mut client = pc.call(self.addr.clone()).await?; + + let tcp_stream = TcpStream::connect(self.addr).await?; + pc.ready().await?; + let mut client = pc.call((tcp_stream, self.addr)).await?; client.ready().await?; @@ -90,6 +94,7 @@ impl ConnectCmd { "got addresses from first connected peer" ); +/* use failure::Error; use futures::{ future, @@ -148,6 +153,7 @@ impl ConnectCmd { // empty loop ensures we don't exit the application, // and this is throwaway code } + */ Ok(()) } diff --git a/zebrad/src/commands/listen.rs b/zebrad/src/commands/listen.rs new file mode 100644 index 000000000..8c60162d3 --- /dev/null +++ b/zebrad/src/commands/listen.rs @@ -0,0 +1,98 @@ +//! `listen` subcommand - test stub for talking to zcashd + +use crate::prelude::*; + +use abscissa_core::{Command, Options, Runnable}; + +/// `listen` subcommand +#[derive(Command, Debug, Options)] +pub struct ListenCmd { + /// The address of the node to connect to. + #[options(help = "The address to listen on.", default = "127.0.0.1:28233")] + addr: std::net::SocketAddr, +} + +impl Runnable for ListenCmd { + /// Start the application. + fn run(&self) { + info!(connect.addr = ?self.addr); + + use crate::components::tokio::TokioComponent; + + let wait = tokio::future::pending::<()>(); + // Combine the connect future with an infinite wait + // so that the program has to be explicitly killed and + // won't die before all tracing messages are written. + let fut = futures::future::join( + async { + match self.listen().await { + Ok(()) => {} + Err(e) => { + // Print any error that occurs. + error!(?e); + } + } + }, + wait, + ); + + let _ = app_reader() + .state() + .components + .get_downcast_ref::() + .expect("TokioComponent should be available") + .rt + .block_on(fut); + } +} + +impl ListenCmd { + async fn listen(&self) -> Result<(), failure::Error> { + use zebra_network::{ + peer::PeerConnector, + peer_set::PeerSet, + protocol::internal::{Request, Response}, + timestamp_collector::TimestampCollector, + Network, + }; + + info!("begin tower-based peer handling test stub"); + + use tower::{buffer::Buffer, service_fn, Service, ServiceExt}; + + let node = Buffer::new( + service_fn(|req| { + async move { + info!(?req); + Ok::(Response::Ok) + } + }), + 1, + ); + + use tokio::net::{TcpListener, TcpStream}; + + let config = app_config().network.clone(); + let collector = TimestampCollector::new(); + let mut pc = PeerConnector::new(config, Network::Mainnet, node, &collector); + + let mut listener = TcpListener::bind(self.addr).await?; + + loop { + let (tcp_stream, addr) = listener.accept().await?; + + pc.ready().await?; + let mut client = pc.call((tcp_stream, addr)).await?; + + let addrs = match client.call(Request::GetPeers).await? { + Response::Peers(addrs) => addrs, + _ => bail!("Got wrong response type"), + }; + info!( + addrs.len = addrs.len(), + "asked for addresses from remote peer" + ); + } + Ok(()) + } +}