Reorganize `connect` subcommand for readability (#450)
parent d8b3db5679
commit df656a8bf0
@@ -1,12 +1,22 @@
//! `connect` subcommand - test stub for talking to zcashd

use crate::prelude::*;

use crate::{components::tokio::TokioComponent, prelude::*};
use abscissa_core::{Command, Options, Runnable};

use color_eyre::Report;
use eyre::eyre;
use futures::prelude::*;
use eyre::{eyre, WrapErr};
use futures::{
    prelude::*,
    stream::{FuturesUnordered, StreamExt},
};
use std::collections::BTreeSet;
use tower::{buffer::Buffer, service_fn, Service, ServiceExt};
use zebra_chain::{block::BlockHeaderHash, types::BlockHeight};

// genesis
static GENESIS: BlockHeaderHash = BlockHeaderHash([
    8, 206, 61, 151, 49, 176, 0, 192, 131, 56, 69, 92, 138, 74, 107, 208, 93, 161, 110, 38, 177,
    29, 170, 27, 145, 113, 132, 236, 232, 15, 4, 0,
]);

/// `connect` subcommand
#[derive(Command, Debug, Options)]
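For reference (not part of the diff): block hashes are displayed byte-reversed relative to their in-memory order, so the `GENESIS` bytes above appear to spell out the familiar Zcash mainnet genesis hash once reversed. A minimal standalone check of that reading:

    // Standalone sketch, not Zebra code: reverse the GENESIS bytes and hex-encode them.
    fn main() {
        let genesis_bytes: [u8; 32] = [
            8, 206, 61, 151, 49, 176, 0, 192, 131, 56, 69, 92, 138, 74, 107, 208, 93, 161, 110, 38,
            177, 29, 170, 27, 145, 113, 132, 236, 232, 15, 4, 0,
        ];
        let displayed: String = genesis_bytes.iter().rev().map(|b| format!("{:02x}", b)).collect();
        // Expected to match the mainnet genesis block hash as it is usually written.
        assert_eq!(
            displayed,
            "00040fe8ec8471911baa1db1266ea15dd06b4a8a5c453883c000b031973dce08"
        );
    }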
@@ -24,7 +34,6 @@ impl Runnable for ConnectCmd {
    fn run(&self) {
        info!(connect.addr = ?self.addr);

        use crate::components::tokio::TokioComponent;
        let rt = app_writer()
            .state_mut()
            .components
@@ -49,8 +58,7 @@
impl ConnectCmd {
    async fn connect(&self) -> Result<(), Report> {
        info!("begin tower-based peer handling test stub");
        use tower::{buffer::Buffer, service_fn, Service, ServiceExt};
        info!(?self, "begin tower-based peer handling test stub");

        // The service that our node uses to respond to requests by peers
        let node = Buffer::new(
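Throughout this file the calls follow tower's two-step pattern: `ready_and().await?` to wait for capacity, then `call(..)` to dispatch exactly one request. A standalone sketch of that pattern with a closure-backed service (illustrative names, not Zebra code; assumes a tower version of this era where `ready_and` exists, plus tokio for the runtime):

    use tower::{service_fn, Service, ServiceExt};

    type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

    #[tokio::main]
    async fn main() -> Result<(), BoxError> {
        // A trivial echo service, standing in for the buffered stub `node` service above.
        let mut svc = service_fn(|req: String| async move { Ok::<_, BoxError>(req) });

        // Wait until the service reports readiness, then send a single request.
        let rsp = svc.ready_and().await?.call("ping".to_string()).await?;
        assert_eq!(rsp, "ping");
        Ok(())
    }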
@@ -67,111 +75,143 @@ impl ConnectCmd {
        // Connect only to the specified peer.
        config.initial_mainnet_peers.insert(self.addr.to_string());

        let mut state = zebra_state::in_memory::init();
        let (mut peer_set, _address_book) = zebra_network::init(config, node).await;
        let mut retry_peer_set =
            tower::retry::Retry::new(zebra_network::RetryErrors, peer_set.clone());
        let state = zebra_state::in_memory::init();
        let (peer_set, _address_book) = zebra_network::init(config, node).await;
        let retry_peer_set = tower::retry::Retry::new(zebra_network::RetryErrors, peer_set.clone());

        info!("waiting for peer_set ready");
        peer_set.ready_and().await.map_err(|e| eyre!(e))?;

        info!("peer_set became ready");

        use futures::stream::{FuturesUnordered, StreamExt};
        use std::collections::BTreeSet;
        use zebra_chain::block::BlockHeaderHash;
        use zebra_chain::types::BlockHeight;

        // genesis
        let mut tip = BlockHeaderHash([
            8, 206, 61, 151, 49, 176, 0, 192, 131, 56, 69, 92, 138, 74, 107, 208, 93, 161, 110, 38,
            177, 29, 170, 27, 145, 113, 132, 236, 232, 15, 4, 0,
        ]);

        // TODO(jlusby): Replace with real state service
        let mut downloaded_block_heights = BTreeSet::<BlockHeight>::new();
        downloaded_block_heights.insert(BlockHeight(0));

        let mut block_requests = FuturesUnordered::new();
        let mut requested_block_heights = 0;
        let mut connect = Connect {
            retry_peer_set,
            peer_set,
            state,
            tip: GENESIS,
            block_requests: FuturesUnordered::new(),
            requested_block_heights: 0,
            downloaded_block_heights,
        };

        while requested_block_heights < 700_000 {
            // Request the next 500 hashes.
            let hashes = if let Ok(zebra_network::Response::BlockHeaderHashes(hashes)) =
                retry_peer_set
                    .ready_and()
                    .await
                    .map_err(|e| eyre!(e))?
                    .call(zebra_network::Request::FindBlocks {
                        known_blocks: vec![tip],
                        stop: None,
                    })
                    .await
            {
                info!(
                    new_hashes = hashes.len(),
                    requested = requested_block_heights,
                    in_flight = block_requests.len(),
                    downloaded = downloaded_block_heights.len(),
                    highest = downloaded_block_heights.iter().next_back().unwrap().0,
                    "requested more hashes"
                );
                requested_block_heights += hashes.len();
                hashes
            } else {
                panic!("request failed, TODO implement retry");
            };
        connect.connect().await
    }
}

            tip = *hashes.last().unwrap();
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

struct Connect<ZN, ZS>
where
    ZN: Service<zebra_network::Request>,
{
    retry_peer_set: tower::retry::Retry<zebra_network::RetryErrors, ZN>,
    peer_set: ZN,
    state: ZS,
    tip: BlockHeaderHash,
    block_requests: FuturesUnordered<ZN::Future>,
    requested_block_heights: usize,
    downloaded_block_heights: BTreeSet<BlockHeight>,
}

impl<ZN, ZS> Connect<ZN, ZS>
where
    ZN: Service<zebra_network::Request, Response = zebra_network::Response, Error = Error>
        + Send
        + Clone
        + 'static,
    ZN::Future: Send,
    ZS: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error>
        + Send
        + Clone
        + 'static,
    ZS::Future: Send,
{
    async fn connect(&mut self) -> Result<(), Report> {
        // TODO(jlusby): Replace with real state service

        while self.requested_block_heights < 700_000 {
            let hashes = self.next_hashes().await?;
            self.tip = *hashes.last().unwrap();

            // Request the corresponding blocks in chunks
            for chunk in hashes.chunks(10usize) {
                let request = peer_set.ready_and().await.map_err(|e| eyre!(e))?.call(
                    zebra_network::Request::BlocksByHash(chunk.iter().cloned().collect()),
                );

                block_requests.push(request);
            }
            self.request_blocks(hashes).await?;

            // Allow at most 300 block requests in flight.
            while block_requests.len() > 300 {
                match block_requests.next().await {
                    Some(Ok(zebra_network::Response::Blocks(blocks))) => {
                        for block in blocks {
                            downloaded_block_heights.insert(block.coinbase_height().unwrap());
                            state
                                .ready_and()
                                .await
                                .map_err(|e| eyre!(e))?
                                .call(zebra_state::Request::AddBlock { block })
                                .await
                                .map_err(|e| eyre!(e))?;
                        }
                    }
                    Some(Err(e)) => {
                        error!(%e);
                    }
                    _ => continue,
                }
            }
            self.drain_requests(300).await?;
        }

        while let Some(Ok(zebra_network::Response::Blocks(blocks))) = block_requests.next().await {
            for block in blocks {
                downloaded_block_heights.insert(block.coinbase_height().unwrap());
                state
                    .ready_and()
                    .await
                    .map_err(|e| eyre!(e))?
                    .call(zebra_state::Request::AddBlock { block })
                    .await
                    .map_err(|e| eyre!(e))?;
            }
        }
        self.drain_requests(0).await?;

        let eternity = future::pending::<()>();
        eternity.await;

        Ok(())
    }

    async fn next_hashes(&mut self) -> Result<Vec<BlockHeaderHash>, Report> {
        // Request the next 500 hashes.
        self.retry_peer_set
            .ready_and()
            .await
            .map_err(|e| eyre!(e))?
            .call(zebra_network::Request::FindBlocks {
                known_blocks: vec![self.tip],
                stop: None,
            })
            .await
            .map_err(|e| eyre!(e))
            .wrap_err("request failed, TODO implement retry")
            .map(|response| match response {
                zebra_network::Response::BlockHeaderHashes(hashes) => hashes,
                _ => unreachable!("FindBlocks always gets a BlockHeaderHashes response"),
            })
            .map(|hashes| {
                info!(
                    new_hashes = hashes.len(),
                    requested = self.requested_block_heights,
                    in_flight = self.block_requests.len(),
                    downloaded = self.downloaded_block_heights.len(),
                    highest = self.downloaded_block_heights.iter().next_back().unwrap().0,
                    "requested more hashes"
                );
                self.requested_block_heights += hashes.len();
                hashes
            })
    }

    async fn request_blocks(&mut self, hashes: Vec<BlockHeaderHash>) -> Result<(), Report> {
        for chunk in hashes.chunks(10usize) {
            let request = self.peer_set.ready_and().await.map_err(|e| eyre!(e))?.call(
                zebra_network::Request::BlocksByHash(chunk.iter().cloned().collect()),
            );

            self.block_requests.push(request);
        }

        Ok(())
    }

    async fn drain_requests(&mut self, request_goal: usize) -> Result<(), Report> {
        while self.block_requests.len() > request_goal {
            match self.block_requests.next().await {
                Some(Ok(zebra_network::Response::Blocks(blocks))) => {
                    for block in blocks {
                        self.downloaded_block_heights
                            .insert(block.coinbase_height().unwrap());
                        self.state
                            .ready_and()
                            .await
                            .map_err(|e| eyre!(e))?
                            .call(zebra_state::Request::AddBlock { block })
                            .await
                            .map_err(|e| eyre!(e))?;
                    }
                }
                Some(Err(e)) => {
                    error!(%e);
                }
                _ => continue,
            }
        }

        Ok(())
    }
}
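The new `request_blocks`/`drain_requests` pair keeps essentially the same behaviour as the old inline loops: push block requests into a `FuturesUnordered`, drain whenever more than 300 are in flight, then drain to zero after the hash loop ends. A standalone sketch of that bounding pattern (illustrative names, not Zebra code; assumes futures and tokio):

    use futures::stream::{FuturesUnordered, StreamExt};

    async fn bounded_in_flight(limit: usize) {
        let mut in_flight = FuturesUnordered::new();

        for i in 0..1_000u32 {
            // Stand-in for peer_set.call(BlocksByHash(..)); any future works here.
            in_flight.push(async move { i * 2 });

            // Like drain_requests(300): wait until we are back under the limit.
            while in_flight.len() > limit {
                if let Some(_response) = in_flight.next().await {
                    // A real caller would hand the response to the state service here.
                }
            }
        }

        // Like drain_requests(0): wait for everything still outstanding.
        while let Some(_response) = in_flight.next().await {}
    }

    #[tokio::main]
    async fn main() {
        bounded_in_flight(300).await;
    }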