Modify PeerConnector to also handle inbound conns.
Because the Bitcoin handshake is symmetric, we can reuse the same logic for both incoming and outgoing connections.
This commit is contained in:
parent
5b801400c1
commit
8a1aa71736
|
@ -58,7 +58,7 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S> Service<SocketAddr> for PeerConnector<S>
|
impl<S> Service<(TcpStream, SocketAddr)> for PeerConnector<S>
|
||||||
where
|
where
|
||||||
S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static,
|
S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static,
|
||||||
S::Future: Send,
|
S::Future: Send,
|
||||||
|
@ -74,7 +74,9 @@ where
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn call(&mut self, addr: SocketAddr) -> Self::Future {
|
fn call(&mut self, req: (TcpStream, SocketAddr)) -> Self::Future {
|
||||||
|
let (tcp_stream, addr) = req;
|
||||||
|
|
||||||
let connector_span = span!(Level::INFO, "connector", addr = ?addr);
|
let connector_span = span!(Level::INFO, "connector", addr = ?addr);
|
||||||
let connection_span = span!(Level::INFO, "peer", addr = ?addr);
|
let connection_span = span!(Level::INFO, "peer", addr = ?addr);
|
||||||
|
|
||||||
|
@ -86,12 +88,9 @@ where
|
||||||
|
|
||||||
let fut = async move {
|
let fut = async move {
|
||||||
info!("connecting to remote peer");
|
info!("connecting to remote peer");
|
||||||
debug!("opening tcp stream");
|
|
||||||
|
|
||||||
let mut stream = Framed::new(
|
let mut stream =
|
||||||
TcpStream::connect(addr).await.expect("PeerError does not contain an io::Error variant, but this code will be removed in the next PR, so there's no need to handle this error"),
|
Framed::new(tcp_stream, Codec::builder().for_network(network).finish());
|
||||||
Codec::builder().for_network(network).finish(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let version = Message::Version {
|
let version = Message::Version {
|
||||||
version: constants::CURRENT_VERSION,
|
version: constants::CURRENT_VERSION,
|
||||||
|
|
|
@ -11,10 +11,11 @@
|
||||||
//! application's configuration file.
|
//! application's configuration file.
|
||||||
|
|
||||||
mod connect;
|
mod connect;
|
||||||
|
mod listen;
|
||||||
mod start;
|
mod start;
|
||||||
mod version;
|
mod version;
|
||||||
|
|
||||||
use self::{connect::ConnectCmd, start::StartCmd, version::VersionCmd};
|
use self::{connect::ConnectCmd, listen::ListenCmd, start::StartCmd, version::VersionCmd};
|
||||||
use crate::config::ZebradConfig;
|
use crate::config::ZebradConfig;
|
||||||
use abscissa_core::{
|
use abscissa_core::{
|
||||||
config::Override, Command, Configurable, FrameworkError, Help, Options, Runnable,
|
config::Override, Command, Configurable, FrameworkError, Help, Options, Runnable,
|
||||||
|
@ -42,6 +43,10 @@ pub enum ZebradCmd {
|
||||||
/// The `connect` subcommand
|
/// The `connect` subcommand
|
||||||
#[options(help = "testing stub for dumping network messages")]
|
#[options(help = "testing stub for dumping network messages")]
|
||||||
Connect(ConnectCmd),
|
Connect(ConnectCmd),
|
||||||
|
|
||||||
|
/// The `listen` subcommand
|
||||||
|
#[options(help = "testing stub for dumping network messages")]
|
||||||
|
Listen(ListenCmd),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This trait allows you to define how application configuration is loaded.
|
/// This trait allows you to define how application configuration is loaded.
|
||||||
|
|
|
@ -73,11 +73,15 @@ impl ConnectCmd {
|
||||||
1,
|
1,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
use tokio::net::TcpStream;
|
||||||
|
|
||||||
let config = app_config().network.clone();
|
let config = app_config().network.clone();
|
||||||
let collector = TimestampCollector::new();
|
let collector = TimestampCollector::new();
|
||||||
let mut pc = PeerConnector::new(config, Network::Mainnet, node, &collector);
|
let mut pc = PeerConnector::new(config, Network::Mainnet, node, &collector);
|
||||||
// no need to call ready because pc is always ready
|
|
||||||
let mut client = pc.call(self.addr.clone()).await?;
|
let tcp_stream = TcpStream::connect(self.addr).await?;
|
||||||
|
pc.ready().await?;
|
||||||
|
let mut client = pc.call((tcp_stream, self.addr)).await?;
|
||||||
|
|
||||||
client.ready().await?;
|
client.ready().await?;
|
||||||
|
|
||||||
|
@ -90,6 +94,7 @@ impl ConnectCmd {
|
||||||
"got addresses from first connected peer"
|
"got addresses from first connected peer"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
/*
|
||||||
use failure::Error;
|
use failure::Error;
|
||||||
use futures::{
|
use futures::{
|
||||||
future,
|
future,
|
||||||
|
@ -148,6 +153,7 @@ impl ConnectCmd {
|
||||||
// empty loop ensures we don't exit the application,
|
// empty loop ensures we don't exit the application,
|
||||||
// and this is throwaway code
|
// and this is throwaway code
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,98 @@
|
||||||
|
//! `listen` subcommand - test stub for talking to zcashd
|
||||||
|
|
||||||
|
use crate::prelude::*;
|
||||||
|
|
||||||
|
use abscissa_core::{Command, Options, Runnable};
|
||||||
|
|
||||||
|
/// `listen` subcommand
|
||||||
|
#[derive(Command, Debug, Options)]
|
||||||
|
pub struct ListenCmd {
|
||||||
|
/// The address of the node to connect to.
|
||||||
|
#[options(help = "The address to listen on.", default = "127.0.0.1:28233")]
|
||||||
|
addr: std::net::SocketAddr,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Runnable for ListenCmd {
|
||||||
|
/// Start the application.
|
||||||
|
fn run(&self) {
|
||||||
|
info!(connect.addr = ?self.addr);
|
||||||
|
|
||||||
|
use crate::components::tokio::TokioComponent;
|
||||||
|
|
||||||
|
let wait = tokio::future::pending::<()>();
|
||||||
|
// Combine the connect future with an infinite wait
|
||||||
|
// so that the program has to be explicitly killed and
|
||||||
|
// won't die before all tracing messages are written.
|
||||||
|
let fut = futures::future::join(
|
||||||
|
async {
|
||||||
|
match self.listen().await {
|
||||||
|
Ok(()) => {}
|
||||||
|
Err(e) => {
|
||||||
|
// Print any error that occurs.
|
||||||
|
error!(?e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
wait,
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = app_reader()
|
||||||
|
.state()
|
||||||
|
.components
|
||||||
|
.get_downcast_ref::<TokioComponent>()
|
||||||
|
.expect("TokioComponent should be available")
|
||||||
|
.rt
|
||||||
|
.block_on(fut);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ListenCmd {
|
||||||
|
async fn listen(&self) -> Result<(), failure::Error> {
|
||||||
|
use zebra_network::{
|
||||||
|
peer::PeerConnector,
|
||||||
|
peer_set::PeerSet,
|
||||||
|
protocol::internal::{Request, Response},
|
||||||
|
timestamp_collector::TimestampCollector,
|
||||||
|
Network,
|
||||||
|
};
|
||||||
|
|
||||||
|
info!("begin tower-based peer handling test stub");
|
||||||
|
|
||||||
|
use tower::{buffer::Buffer, service_fn, Service, ServiceExt};
|
||||||
|
|
||||||
|
let node = Buffer::new(
|
||||||
|
service_fn(|req| {
|
||||||
|
async move {
|
||||||
|
info!(?req);
|
||||||
|
Ok::<Response, failure::Error>(Response::Ok)
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
1,
|
||||||
|
);
|
||||||
|
|
||||||
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
|
|
||||||
|
let config = app_config().network.clone();
|
||||||
|
let collector = TimestampCollector::new();
|
||||||
|
let mut pc = PeerConnector::new(config, Network::Mainnet, node, &collector);
|
||||||
|
|
||||||
|
let mut listener = TcpListener::bind(self.addr).await?;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let (tcp_stream, addr) = listener.accept().await?;
|
||||||
|
|
||||||
|
pc.ready().await?;
|
||||||
|
let mut client = pc.call((tcp_stream, addr)).await?;
|
||||||
|
|
||||||
|
let addrs = match client.call(Request::GetPeers).await? {
|
||||||
|
Response::Peers(addrs) => addrs,
|
||||||
|
_ => bail!("Got wrong response type"),
|
||||||
|
};
|
||||||
|
info!(
|
||||||
|
addrs.len = addrs.len(),
|
||||||
|
"asked for addresses from remote peer"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue