zebrad: move seed command into inbound component

Remove the seed command entirely, and make the behavior it provided
(responding to `Request::Peers`) part of the ordinary functioning of the
start command.

The new `Inbound` service should be expanded to handle all request
types.
This commit is contained in:
Henry de Valence 2020-09-18 12:18:22 -07:00
parent 1d3892e1dc
commit 1d0ebf89c6
7 changed files with 127 additions and 220 deletions

View File

@ -10,8 +10,6 @@ structure, and documentation for all of the config options can be found
[here](https://doc.zebra.zfnd.org/zebrad/config/struct.ZebradConfig.html).
* `zebrad start` starts a full node.
* `zebrad seed` starts a crawler that can power a DNS seeder, but does not
attempt to sync the chain state.
## Return Codes

View File

@ -2,14 +2,11 @@
mod generate;
mod revhex;
mod seed;
mod start;
mod version;
use self::ZebradCmd::*;
use self::{
generate::GenerateCmd, revhex::RevhexCmd, seed::SeedCmd, start::StartCmd, version::VersionCmd,
};
use self::{generate::GenerateCmd, revhex::RevhexCmd, start::StartCmd, version::VersionCmd};
use crate::config::ZebradConfig;
@ -36,10 +33,6 @@ pub enum ZebradCmd {
#[options(help = "reverses the endianness of a hex string, like a block or transaction hash")]
Revhex(RevhexCmd),
/// The `seed` subcommand
#[options(help = "dns seeder")]
Seed(SeedCmd),
/// The `start` subcommand
#[options(help = "start the application")]
Start(StartCmd),
@ -56,7 +49,7 @@ impl ZebradCmd {
pub(crate) fn is_server(&self) -> bool {
match self {
// List all the commands, so new commands have to make a choice here
Seed(_) | Start(_) => true,
Start(_) => true,
Generate(_) | Help(_) | Revhex(_) | Version(_) => false,
}
}

View File

@ -1,144 +0,0 @@
//! `seed` subcommand - runs a dns seeder
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
};
use abscissa_core::{Command, Options, Runnable};
use futures::{channel::oneshot, prelude::*};
use tower::{buffer::Buffer, Service, ServiceExt};
use zebra_network::{AddressBook, BoxError, Request, Response};
use crate::components::tokio::RuntimeRun;
use crate::prelude::*;
use color_eyre::eyre::{eyre, Report};
/// Whether our `SeedService` is poll_ready or not.
///
/// The service is constructed before the network stack, so it starts out
/// waiting for the crawler's `AddressBook` to arrive over a oneshot channel
/// and only becomes ready once it has been received.
#[derive(Debug)]
enum SeederState {
/// Waiting for the address book to be shared with us via the oneshot channel.
AwaitingAddressBook(oneshot::Receiver<Arc<Mutex<AddressBook>>>),
/// Address book received, ready to service requests.
Ready(Arc<Mutex<AddressBook>>),
}
/// A `tower::Service` that answers `Request::Peers` from the crawler's
/// shared address book, so the node can power a DNS seeder.
#[derive(Debug)]
struct SeedService {
// Readiness state machine; see `SeederState`.
state: SeederState,
}
impl Service<Request> for SeedService {
type Response = Response;
type Error = BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
/// Polls the oneshot channel for the shared address book; the service is
/// only ready once it has been received.
#[instrument(skip(self, _cx))]
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.state {
SeederState::Ready(_) => Poll::Ready(Ok(())),
SeederState::AwaitingAddressBook(ref mut rx) => match rx.try_recv() {
// The sender was dropped, so the address book can never arrive:
// fail the service.
Err(e) => {
error!("oneshot sender dropped, failing service: {:?}", e);
Poll::Ready(Err(e.into()))
}
// Channel still empty (`Ok(None)` from the futures oneshot):
// remain unready.
// NOTE(review): `Poll::Pending` is returned without registering
// a waker — confirm the wrapping `Buffer` re-polls on its own.
Ok(None) => {
trace!("awaiting address book, service is unready");
Poll::Pending
}
// Address book received: transition to `Ready` permanently.
Ok(Some(address_book)) => {
debug!("received address_book via oneshot, service becomes ready");
self.state = SeederState::Ready(address_book);
Poll::Ready(Ok(()))
}
},
}
}
// Note: the generated span applies only to this function, not
// to the future, but this is OK because the current implementation
// is not actually async.
#[instrument]
fn call(&mut self, req: Request) -> Self::Future {
// By the `Service` contract, `call` only runs after `poll_ready`
// returned `Ready(Ok(()))`, so the state must be `Ready` here.
let address_book = if let SeederState::Ready(address_book) = &self.state {
address_book
} else {
panic!("SeedService::call without SeedService::poll_ready");
};
let response = match req {
Request::Peers => {
debug!("selecting peers to gossip");
// `sanitized` is defined in zebra-network; presumably it strips
// peer data we don't want to leak — confirm there.
let mut peers = address_book.lock().unwrap().sanitized();
// truncate the list so that we do not trivially reveal our entire peer set.
peers.truncate(50);
Ok(Response::Peers(peers))
}
// Seed mode only serves peer addresses; everything else is a no-op.
_ => {
debug!("ignoring request");
Ok(Response::Nil)
}
};
// The response was computed synchronously above, so return an
// already-completed future.
Box::pin(futures::future::ready(response))
}
}
/// `seed` subcommand
///
/// A DNS seeder command to spider and collect as many valid peer
/// addresses as we can.
///
/// Takes no options of its own: behavior is driven entirely by the
/// application config.
// This is not a unit-like struct because it makes Command and Options sad.
#[derive(Command, Debug, Default, Options)]
pub struct SeedCmd {}
impl Runnable for SeedCmd {
/// Start the application.
fn run(&self) {
info!("Starting zebrad in seed mode");
use crate::components::tokio::TokioComponent;
// Take ownership of the Tokio runtime out of the component registry so
// this thread can block on the async `seed` future.
let rt = app_writer()
.state_mut()
.components
.get_downcast_mut::<TokioComponent>()
.expect("TokioComponent should be available")
.rt
.take();
// `take()` left `None` behind, so the runtime can only be claimed once
// per process; a second claim is a bug, hence `expect`.
rt.expect("runtime should not already be taken")
.run(self.seed());
}
}
impl SeedCmd {
/// Runs the seeder: wires a `SeedService` into the network stack, then
/// idles forever while the background network tasks crawl for peers.
async fn seed(&self) -> Result<(), Report> {
info!("begin tower-based peer handling test stub");
// The address book doesn't exist until `zebra_network::init` returns,
// but the service must be constructed first — so give the service the
// receiving end of a oneshot and send the address book afterwards.
let (addressbook_tx, addressbook_rx) = oneshot::channel();
let seed_service = SeedService {
state: SeederState::AwaitingAddressBook(addressbook_rx),
};
// Buffer makes the service cloneable so the network stack can share it.
let buffered_svc = Buffer::new(seed_service, 1);
let config = app_config().network.clone();
let (mut peer_set, address_book) = zebra_network::init(config, buffered_svc).await;
// A send error means the receiver (and thus the service) is already
// gone; nothing useful to do, so ignore it.
let _ = addressbook_tx.send(address_book);
info!("waiting for peer_set ready");
peer_set.ready_and().await.map_err(|e| eyre!(e))?;
info!("peer_set became ready");
// Park this future forever; the tasks spawned by `init` do the work.
let eternity = future::pending::<()>();
eternity.await;
Ok(())
}
}

View File

@ -19,17 +19,18 @@
//! * This task runs in the background and continuously queries the network for
//! new blocks to be verified and added to the local state
use crate::components::tokio::RuntimeRun;
use abscissa_core::{config, Command, FrameworkError, Options, Runnable};
use color_eyre::eyre::{eyre, Report};
use tokio::sync::oneshot;
use tower::builder::ServiceBuilder;
use crate::components::{tokio::RuntimeRun, Inbound};
use crate::config::ZebradConfig;
use crate::{
components::{tokio::TokioComponent, ChainSync},
prelude::*,
};
use abscissa_core::{config, Command, FrameworkError, Options, Runnable};
use color_eyre::eyre::Report;
use tower::{buffer::Buffer, service_fn};
/// `start` subcommand
#[derive(Command, Debug, Options)]
pub struct StartCmd {
@ -40,10 +41,13 @@ pub struct StartCmd {
impl StartCmd {
async fn start(&self) -> Result<(), Report> {
info!(?self, "starting to connect to the network");
let config = app_config().clone();
info!(?config);
let config = app_config();
info!("initializing node state");
let state = zebra_state::init(config.state.clone(), config.network.network);
info!("initializing chain verifier");
let verifier = zebra_consensus::chain::init(
config.consensus.clone(),
config.network.network,
@ -51,16 +55,21 @@ impl StartCmd {
)
.await;
// The service that our node uses to respond to requests by peers
let node = Buffer::new(
service_fn(|req| async move {
debug!(?req, "inbound peer request");
Ok::<zebra_network::Response, Report>(zebra_network::Response::Nil)
}),
1,
);
let (peer_set, _address_book) = zebra_network::init(config.network.clone(), node).await;
info!("initializing network");
// The service that our node uses to respond to requests by peers
let (setup_tx, setup_rx) = oneshot::channel();
let inbound = ServiceBuilder::new()
.load_shed()
.buffer(20)
.service(Inbound::new(setup_rx));
let (peer_set, address_book) = zebra_network::init(config.network.clone(), inbound).await;
setup_tx
.send((peer_set.clone(), address_book))
.map_err(|_| eyre!("could not send setup data to inbound service"))?;
info!("initializing syncer");
let mut syncer = ChainSync::new(config.network.network, peer_set, state, verifier);
syncer.sync().await

View File

@ -1,6 +1,8 @@
// Node components and the services they expose.
mod inbound;
pub mod metrics;
mod sync;
pub mod tokio;
pub mod tracing;
// Re-export the types other parts of zebrad construct directly.
pub use inbound::Inbound;
pub use sync::ChainSync;

View File

@ -0,0 +1,98 @@
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
};
use futures::future::FutureExt;
use tokio::sync::oneshot;
use tower::{buffer::Buffer, util::BoxService, Service};
use zebra_network as zn;
use zebra_network::AddressBook;
/// A buffered, boxed service for making outbound requests to peers.
type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
/// The data `Inbound` needs before it can answer requests, delivered via a
/// oneshot channel after network initialization completes.
pub type SetupData = (Outbound, Arc<Mutex<AddressBook>>);
/// A service handling requests made to this node by remote peers.
///
/// Constructed before the network is initialized, so it receives its
/// dependencies later via a oneshot channel (see `SetupData`).
pub struct Inbound {
// invariant: outbound, address_book are Some if network_setup is None
//
// why not use an enum for the inbound state? because it would mean
// match-wrapping the body of Service::call rather than just expect()ing
// some Options.
network_setup: Option<oneshot::Receiver<SetupData>>,
// Handle for making outbound peer requests; stored but not yet used by
// any visible request handler.
outbound: Option<Outbound>,
// Shared view of known peers, used to answer `Request::Peers`.
address_book: Option<Arc<Mutex<zn::AddressBook>>>,
}
impl Inbound {
pub fn new(network_setup: oneshot::Receiver<SetupData>) -> Self {
Self {
network_setup: Some(network_setup),
outbound: None,
address_book: None,
}
}
}
impl Service<zn::Request> for Inbound {
type Response = zn::Response;
type Error = zn::BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
/// Ready only after the setup data has arrived on the oneshot channel;
/// once received, the service is always ready.
#[instrument(skip(self, _cx))]
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
use oneshot::error::TryRecvError;
match self.network_setup.take() {
Some(mut rx) => match rx.try_recv() {
// Setup data arrived: populate the fields and leave
// `network_setup` as `None`, upholding the struct invariant.
Ok((outbound, address_book)) => {
self.outbound = Some(outbound);
self.address_book = Some(address_book);
self.network_setup = None;
Poll::Ready(Ok(()))
}
// Sender dropped: setup can never complete.
Err(e @ TryRecvError::Closed) => {
// returning poll_ready(err) means that poll_ready should
// never be called again, but put the oneshot back so we
// error again in case someone does.
self.network_setup = Some(rx);
Poll::Ready(Err(e.into()))
}
// Nothing sent yet: put the receiver back and stay unready.
// NOTE(review): `Poll::Pending` is returned without arranging a
// wakeup — confirm the wrapping `Buffer`/caller re-polls.
Err(TryRecvError::Empty) => {
self.network_setup = Some(rx);
Poll::Pending
}
},
// Setup already completed; by the invariant, `outbound` and
// `address_book` are `Some`.
None => Poll::Ready(Ok(())),
}
}
/// Answers a request made by a remote peer.
///
/// Only `Request::Peers` is implemented so far; all other request types
/// get `Response::Nil`.
#[instrument(skip(self))]
fn call(&mut self, req: zn::Request) -> Self::Future {
match req {
zn::Request::Peers => {
// We could truncate the list to try to not reveal our entire
// peer set. But because we don't monitor repeated requests,
// this wouldn't actually achieve anything, because a crawler
// could just repeatedly query it.
// `unwrap` is safe: `poll_ready` sets `address_book` before
// `call` can run (see the struct invariant).
let mut peers = self
.address_book
.as_ref()
.unwrap()
.lock()
.unwrap()
.sanitized();
const MAX_ADDR: usize = 1000; // bitcoin protocol constant
peers.truncate(MAX_ADDR);
async { Ok(zn::Response::Peers(peers)) }.boxed()
}
_ => {
debug!("ignoring unimplemented request");
async { Ok(zn::Response::Nil) }.boxed()
}
}
}
}

View File

@ -218,52 +218,6 @@ fn revhex_args() -> Result<()> {
Ok(())
}
#[test]
// Checks that `zebrad -v seed` starts in seed mode and keeps running until
// killed (seed mode has no normal exit).
fn seed_no_args() -> Result<()> {
zebra_test::init();
let testdir = testdir()?.with_config(default_test_config()?)?;
let mut child = testdir.spawn_child(&["-v", "seed"])?;
// Run the program and kill it at 1 second
std::thread::sleep(Duration::from_secs(1));
child.kill()?;
let output = child.wait_with_output()?;
// Being killed makes the child exit unsuccessfully, so "failure" is the
// expected outcome here.
let output = output.assert_failure()?;
output.stdout_contains(r"Starting zebrad in seed mode")?;
// Make sure the command was killed
output.assert_was_killed()?;
Ok(())
}
#[test]
// Checks that the `seed` subcommand rejects unexpected arguments and flags.
fn seed_args() -> Result<()> {
zebra_test::init();
let testdir = testdir()?.with_config(default_test_config()?)?;
let testdir = &testdir;
// unexpected free argument `argument`
let child = testdir.spawn_child(&["seed", "argument"])?;
let output = child.wait_with_output()?;
output.assert_failure()?;
// unrecognized option `-f`
let child = testdir.spawn_child(&["seed", "-f"])?;
let output = child.wait_with_output()?;
output.assert_failure()?;
// unexpected free argument `start` — subcommands cannot be chained
let child = testdir.spawn_child(&["seed", "start"])?;
let output = child.wait_with_output()?;
output.assert_failure()?;
Ok(())
}
#[test]
fn start_no_args() -> Result<()> {
zebra_test::init();
@ -279,8 +233,6 @@ fn start_no_args() -> Result<()> {
let output = child.wait_with_output()?;
let output = output.assert_failure()?;
// start is the default mode, so we check for end of line, to distinguish it
// from seed
output.stdout_contains(r"Starting zebrad$")?;
// Make sure the command was killed
@ -455,7 +407,6 @@ fn valid_generated_config_test() -> Result<()> {
// they use the generated config. So parallel execution can cause port and
// cache conflicts.
valid_generated_config("start", r"Starting zebrad$")?;
valid_generated_config("seed", r"Starting zebrad in seed mode")?;
Ok(())
}