// zebra/zebrad/src/commands/start/sync.rs

use std::{collections::HashSet, iter, pin::Pin, sync::Arc, time::Duration};
use color_eyre::eyre::{eyre, Report};
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::{task::JoinHandle, time::delay_for};
use tower::{retry::Retry, Service, ServiceExt};
use tracing_futures::{Instrument, Instrumented};
use zebra_chain::{
block::{Block, BlockHeaderHash},
Network,
};
use zebra_consensus::checkpoint;
use zebra_consensus::parameters;
use zebra_network::{self as zn, RetryLimit};
use zebra_state as zs;
/// Number of `FindBlocks` requests sent for each tip query (the fanout).
///
/// Failover between peers is handled by sending the same request this many
/// times, rather than by retrying a single request.
// XXX in the future, we may not be able to access the checkpoint module.
const FANOUT: usize = checkpoint::MAX_QUEUED_BLOCKS_PER_HEIGHT;

/// Controls how far ahead of the chain tip the syncer tries to download before
/// waiting for queued verifications to complete.
///
/// Defined as twice the maximum checkpoint distance.
pub const LOOKAHEAD_LIMIT: usize = 2 * checkpoint::MAX_CHECKPOINT_HEIGHT_GAP;
#[derive(Debug)]
pub struct Syncer<ZN, ZS, ZV>
where
ZN: Service<zn::Request, Response = zn::Response, Error = Error> + Send + Clone + 'static,
ZN::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = Error> + Send + Clone + 'static,
ZS::Future: Send,
ZV: Service<Arc<Block>, Response = BlockHeaderHash, Error = Error> + Send + Clone + 'static,
ZV::Future: Send,
{
/// Used to perform extendtips requests, with no retry logic (failover is handled using fanout).
tip_network: ZN,
/// Used to download blocks, with retry logic.
block_network: Retry<RetryLimit, ZN>,
state: ZS,
verifier: ZV,
prospective_tips: HashSet<BlockHeaderHash>,
pending_blocks:
Pin<Box<FuturesUnordered<Instrumented<JoinHandle<Result<BlockHeaderHash, Error>>>>>>,
genesis_hash: BlockHeaderHash,
}
impl<ZN, ZS, ZV> Syncer<ZN, ZS, ZV>
where
ZN: Service<zn::Request, Response = zn::Response, Error = Error> + Send + Clone + 'static,
ZN::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = Error> + Send + Clone + 'static,
ZS::Future: Send,
ZV: Service<Arc<Block>, Response = BlockHeaderHash, Error = Error> + Send + Clone + 'static,
ZV::Future: Send,
{
/// Returns a new syncer instance, using:
/// - chain: the zebra-chain `Network` to download (Mainnet or Testnet)
/// - peers: the zebra-network peers to contact for downloads
/// - state: the zebra-state that stores the chain
/// - verifier: the zebra-consensus verifier that checks the chain
pub fn new(chain: Network, peers: ZN, state: ZS, verifier: ZV) -> Self {
let retry_peers = Retry::new(RetryLimit::new(10), peers.clone());
Self {
tip_network: peers,
block_network: retry_peers,
state,
verifier,
prospective_tips: HashSet::new(),
pending_blocks: Box::pin(FuturesUnordered::new()),
genesis_hash: parameters::genesis_hash(chain),
}
}
2020-07-08 13:33:39 -07:00
#[instrument(skip(self))]
pub async fn sync(&mut self) -> Result<(), Report> {
// We can't download the genesis block using our normal algorithm,
// due to protocol limitations
self.request_genesis().await?;
loop {
self.obtain_tips().await?;
2020-08-07 01:04:33 -07:00
self.update_metrics();
// ObtainTips Step 6
//
// If there are any prospective tips, call ExtendTips.
// Continue this step until there are no more prospective tips.
while !self.prospective_tips.is_empty() {
tracing::debug!("extending prospective tips");
2020-07-22 14:46:21 -07:00
self.extend_tips().await?;
2020-08-07 01:04:33 -07:00
self.update_metrics();
// Check whether we need to wait for existing block download tasks to finish
while self.pending_blocks.len() > LOOKAHEAD_LIMIT {
match self
.pending_blocks
.next()
.await
.expect("already checked there's at least one pending block task")
.expect("block download tasks should not panic")
{
Ok(hash) => tracing::debug!(?hash, "verified and committed block to state"),
// This is a non-transient error indicating either that
// we've repeatedly missed a block we need or that we've
// repeatedly missed a bad block suggested by a peer
// feeding us bad hashes.
//
// TODO(hdevalence): handle interruptions in the chain
// sync process. this should detect when we've stopped
// making progress (probably using a timeout), then
// continue the loop with a new invocation of
// obtain_tips(), which will restart block downloads.
Err(e) => tracing::error!(?e, "potentially transient error"),
};
}
2020-08-07 01:04:33 -07:00
// We just added a bunch of failures, update the metrics now,
// because we might be about to reset or delay.
self.update_metrics();
}
delay_for(Duration::from_secs(15)).await;
}
}
/// Given a block_locator list fan out request for subsequent hashes to
/// multiple peers
2020-07-08 13:33:39 -07:00
#[instrument(skip(self))]
async fn obtain_tips(&mut self) -> Result<(), Report> {
// ObtainTips Step 1
//
// Query the current state to construct the sequence of hashes: handled by
// the caller
let block_locator = self
.state
.ready_and()
.await
.map_err(|e| eyre!(e))?
.call(zebra_state::Request::GetBlockLocator {
genesis: self.genesis_hash,
})
.await
.map(|response| match response {
zebra_state::Response::BlockLocator { block_locator } => block_locator,
_ => unreachable!(
"GetBlockLocator request can only result in Response::BlockLocator"
),
})
.map_err(|e| eyre!(e))?;
2020-08-07 01:04:33 -07:00
tracing::debug!(?block_locator, "trying to obtain new chain tips");
// ObtainTips Step 2
//
// Make a FindBlocksByHash request to the network F times, where F is a
// fanout parameter, to get resp1, ..., respF
let mut requests = FuturesUnordered::new();
for _ in 0..FANOUT {
requests.push(
self.tip_network
.ready_and()
.await
.map_err(|e| eyre!(e))?
.call(zn::Request::FindBlocks {
known_blocks: block_locator.clone(),
stop: None,
}),
);
}
let mut download_set = HashSet::new();
while let Some(res) = requests.next().await {
match res.map_err::<Report, _>(|e| eyre!(e)) {
Ok(zn::Response::BlockHeaderHashes(hashes)) => {
if hashes.is_empty() {
tracing::debug!("skipping empty response");
continue;
2020-07-08 13:33:39 -07:00
} else {
tracing::debug!(hashes.len = hashes.len(), "processing response");
}
// ObtainTips Step 3
//
// For each response, starting from the beginning of the
// list, prune any block hashes already included in the
// state, stopping at the first unknown hash to get resp1',
// ..., respF'. (These lists may be empty).
let mut first_unknown = None;
for (i, &hash) in hashes.iter().enumerate() {
if !self.state_contains(hash).await? {
first_unknown = Some(i);
break;
}
}
// Hashes will be empty if we know about all the blocks in the response.
if first_unknown.is_none() {
tracing::debug!("ObtainTips: all hashes are known");
continue;
}
let first_unknown = first_unknown.expect("already checked for None");
2020-07-08 13:33:39 -07:00
tracing::debug!(
first_unknown,
"found index of first unknown hash in response"
);
2020-07-08 13:33:39 -07:00
let unknown_hashes = &hashes[first_unknown..];
let new_tip = *unknown_hashes
.last()
.expect("already checked that unknown hashes isn't empty");
2020-07-08 13:33:39 -07:00
// ObtainTips Step 4:
// Combine the last elements of each list into a set; this is the
// set of prospective tips.
if !download_set.contains(&new_tip) {
2020-08-07 01:04:33 -07:00
tracing::debug!(hashes.len = ?hashes.len(), ?new_tip, "adding new prospective tip");
2020-07-08 13:33:39 -07:00
self.prospective_tips.insert(new_tip);
} else {
tracing::debug!(?new_tip, "discarding tip already queued for download");
}
2020-08-07 01:04:33 -07:00
// ObtainTips Step 5.1
//
// Combine all elements of each list into a set
2020-07-08 13:33:39 -07:00
let prev_download_len = download_set.len();
download_set.extend(unknown_hashes);
let new_download_len = download_set.len();
tracing::debug!(
prev_download_len,
new_download_len,
new_hashes = new_download_len - prev_download_len,
"added hashes to download set"
);
}
Ok(_) => unreachable!("network returned wrong response"),
// We ignore this error because we made multiple fanout requests.
Err(e) => tracing::debug!(?e),
}
}
2020-08-07 01:04:33 -07:00
tracing::debug!(?self.prospective_tips, "ObtainTips: downloading blocks for tips");
// ObtainTips Step 5.2
//
2020-08-07 01:04:33 -07:00
// queue download and verification of those blocks.
self.request_blocks(download_set.into_iter().collect())
.await?;
Ok(())
}
2020-07-08 13:33:39 -07:00
#[instrument(skip(self))]
async fn extend_tips(&mut self) -> Result<(), Report> {
// Extend Tips 1
//
// remove all prospective tips and iterate over them individually
let tips = std::mem::take(&mut self.prospective_tips);
2020-07-08 13:33:39 -07:00
tracing::debug!(?tips, "extending tip set");
let mut download_set = HashSet::new();
for tip in tips {
// ExtendTips Step 2
//
// Create a FindBlocksByHash request consisting of just the
// prospective tip. Send this request to the network F times
let mut responses = FuturesUnordered::new();
for _ in 0..FANOUT {
responses.push(
self.tip_network
.ready_and()
.await
.map_err(|e| eyre!(e))?
.call(zn::Request::FindBlocks {
known_blocks: vec![tip],
stop: None,
}),
);
2020-07-08 13:33:39 -07:00
}
while let Some(res) = responses.next().await {
match res.map_err::<Report, _>(|e| eyre!(e)) {
Ok(zn::Response::BlockHeaderHashes(mut hashes)) => {
// ExtendTips Step 3
//
// For each response, check whether the first hash in the
// response is the genesis block; if so, discard the response.
// It indicates that the remote peer does not have any blocks
// following the prospective tip.
match (hashes.first(), hashes.len()) {
(None, _) => {
tracing::debug!("ExtendTips: skipping empty response");
continue;
}
(_, 1) => {
tracing::debug!("ExtendTips: skipping length-1 response, in case it's an unsolicited inv message");
continue;
}
(Some(hash), _) if (hash == &self.genesis_hash) => {
tracing::debug!(
"ExtendTips: skipping response, peer could not extend the tip"
);
continue;
}
(Some(&hash), _) => {
// Check for hashes we've already seen.
// This happens a lot near the end of the chain.
// This check reduces the number of duplicate
// blocks, but it is not required for
// correctness.
if self.state_contains(hash).await? {
tracing::debug!(
?hash,
"ExtendTips: skipping response, peer returned a duplicate hash: already in state"
);
continue;
}
}
}
let new_tip = hashes.pop().expect("expected: hashes must have len > 0");
// Check for tips we've already seen
// TODO: remove this check once the sync service is more reliable
if self.state_contains(new_tip).await? {
tracing::debug!(
?new_tip,
"ExtendTips: Unexpected duplicate tip from peer: already in state"
);
continue;
}
// ExtendTips Step 4
//
// Combine the last elements of the remaining responses into
// a set, and add this set to the set of prospective tips.
2020-08-07 01:04:33 -07:00
tracing::debug!(hashes.len = ?hashes.len(), ?new_tip, "ExtendTips: extending to new tip");
let _ = self.prospective_tips.insert(new_tip);
2020-08-07 01:04:33 -07:00
let prev_download_len = download_set.len();
download_set.extend(hashes);
2020-08-07 01:04:33 -07:00
let new_download_len = download_set.len();
tracing::debug!(
prev_download_len,
new_download_len,
new_hashes = new_download_len - prev_download_len,
"ExtendTips: added hashes to download set"
);
}
Ok(_) => unreachable!("network returned wrong response"),
// We ignore this error because we made multiple fanout requests.
Err(e) => tracing::debug!("{:?}", e),
}
}
}
// ExtendTips Step ??
//
// Remove tips that are already included behind one of the other
// returned tips
self.prospective_tips
.retain(|tip| !download_set.contains(tip));
2020-08-07 01:04:33 -07:00
tracing::debug!(?self.prospective_tips, "ExtendTips: downloading blocks for tips");
// ExtendTips Step 5
//
// Combine all elements of the remaining responses into a
// set, and queue download and verification of those blocks
self.request_blocks(
download_set
.into_iter()
.chain(self.prospective_tips.iter().cloned())
.collect(),
)
.await?;
Ok(())
}
/// Queue a download for the genesis block, if it isn't currently known to
/// our node.
2020-08-07 01:04:33 -07:00
#[instrument(skip(self))]
async fn request_genesis(&mut self) -> Result<(), Report> {
// Due to Bitcoin protocol limitations, we can't request the genesis
// block using our standard tip-following algorithm:
// - getblocks requires at least one hash
// - responses start with the block *after* the requested block, and
// - the genesis hash is used as a placeholder for "no matches".
//
// So we just queue the genesis block here.
if !self.state_contains(self.genesis_hash).await? {
self.request_blocks(vec![self.genesis_hash]).await?;
}
Ok(())
}
/// Queue downloads for each block that isn't currently known to our node
async fn request_blocks(&mut self, hashes: Vec<BlockHeaderHash>) -> Result<(), Report> {
2020-07-08 13:33:39 -07:00
tracing::debug!(hashes.len = hashes.len(), "requesting blocks");
for hash in hashes.into_iter() {
// TODO: remove this check once the sync service is more reliable
if self.state_contains(hash).await? {
tracing::debug!(
?hash,
"request_blocks: Unexpected duplicate hash: already in state"
);
continue;
}
// We construct the block download requests sequentially, waiting
// for the peer set to be ready to process each request. This
// ensures that we start block downloads in the order we want them
// (though they may resolve out of order), and it means that we
// respect backpressure. Otherwise, if we waited for readiness and
// did the service call in the spawned tasks, all of the spawned
// tasks would race each other waiting for the network to become
// ready.
let block_req = self
.block_network
.ready_and()
.await
.map_err(|e| eyre!(e))?
.call(zn::Request::BlocksByHash(iter::once(hash).collect()));
let span = tracing::info_span!("block_fetch_verify", ?hash);
let mut verifier = self.verifier.clone();
2020-07-21 15:40:00 -07:00
let task = tokio::spawn(async move {
let block = match block_req.await {
2020-07-21 15:40:00 -07:00
Ok(zn::Response::Blocks(blocks)) => blocks
.into_iter()
.next()
.expect("successful response has the block in it"),
Ok(_) => unreachable!("wrong response to block request"),
Err(e) => return Err(e),
};
2020-07-22 14:46:21 -07:00
metrics::counter!("sync.downloaded_blocks", 1);
let result = verifier.ready_and().await?.call(block).await;
metrics::counter!("sync.verified_blocks", 1);
result
2020-07-21 15:40:00 -07:00
})
.instrument(span);
self.pending_blocks.push(task);
}
Ok(())
}
2020-08-07 01:04:33 -07:00
/// Returns `Ok(true)` if the hash is present in the state, and `Ok(false)`
/// if the hash is not present in the state.
///
/// Returns `Err(_)` if an error occurs.
///
/// TODO: handle multiple tips in the state.
#[instrument(skip(self))]
async fn state_contains(&mut self, hash: BlockHeaderHash) -> Result<bool, Report> {
match self
.state
.ready_and()
.await
.map_err(|e| eyre!(e))?
.call(zebra_state::Request::GetDepth { hash })
.await
.map_err(|e| eyre!(e))?
{
zs::Response::Depth(Some(_)) => Ok(true),
zs::Response::Depth(None) => Ok(false),
_ => unreachable!("wrong response to depth request"),
}
}
2020-08-07 01:04:33 -07:00
/// Update metrics gauges, and create a trace containing metrics.
fn update_metrics(&self) {
metrics::gauge!(
"sync.prospective_tips.len",
self.prospective_tips.len() as i64
);
metrics::gauge!("sync.pending_blocks.len", self.pending_blocks.len() as i64);
tracing::info!(
tips.len = self.prospective_tips.len(),
pending.len = self.pending_blocks.len(),
pending.limit = LOOKAHEAD_LIMIT,
);
}
}
/// Boxed, thread-safe error type shared by the network, state, and verifier
/// services used by the syncer.
///
/// `Box<dyn Trait>` already defaults to a `'static` object lifetime, so no
/// explicit lifetime bound is needed.
type Error = Box<dyn std::error::Error + Send + Sync>;