Security: Drop blocks that are a long way ahead of the tip (#3167)
* Document the chain verifier * Drop gossiped blocks that are too far ahead of the tip * Add extra gossiped block metrics * Allow extra gossiped blocks, now we have a stricter limit * Fix a comment * Check the exact number of blocks in a downloaded block response * Drop synced blocks that are too far ahead of the tip * Add extra synced block metrics * Test dropping gossiped blocks that are too far ahead of the tip * Allow an extra checkpoint's worth of blocks in the verifier queues * Actually let's try two extra checkpoints * Scale extra height limit with lookahead limit * Also drop blocks that are behind the finalized tip * Downgrade a noisy log * Use a debug log for already verified gossiped blocks * Use debug logs for already verified synced blocks
This commit is contained in:
parent
852c5d63bb
commit
a4d1a1801c
|
@ -76,9 +76,18 @@ where
|
||||||
+ 'static,
|
+ 'static,
|
||||||
V::Future: Send + 'static,
|
V::Future: Send + 'static,
|
||||||
{
|
{
|
||||||
block: BlockVerifier<S, V>,
|
/// The checkpointing block verifier.
|
||||||
|
///
|
||||||
|
/// Always used for blocks before `Canopy`, optionally used for the entire checkpoint list.
|
||||||
checkpoint: CheckpointVerifier<S>,
|
checkpoint: CheckpointVerifier<S>,
|
||||||
|
|
||||||
|
/// The highest permitted checkpoint block.
|
||||||
|
///
|
||||||
|
/// This height must be in the `checkpoint` verifier's checkpoint list.
|
||||||
max_checkpoint_height: block::Height,
|
max_checkpoint_height: block::Height,
|
||||||
|
|
||||||
|
/// The full block verifier, used for blocks after `max_checkpoint_height`.
|
||||||
|
block: BlockVerifier<S, V>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An error while semantically verifying a block.
|
/// An error while semantically verifying a block.
|
||||||
|
@ -248,9 +257,9 @@ where
|
||||||
let block = BlockVerifier::new(network, state_service.clone(), transaction.clone());
|
let block = BlockVerifier::new(network, state_service.clone(), transaction.clone());
|
||||||
let checkpoint = CheckpointVerifier::from_checkpoint_list(list, network, tip, state_service);
|
let checkpoint = CheckpointVerifier::from_checkpoint_list(list, network, tip, state_service);
|
||||||
let chain = ChainVerifier {
|
let chain = ChainVerifier {
|
||||||
block,
|
|
||||||
checkpoint,
|
checkpoint,
|
||||||
max_checkpoint_height,
|
max_checkpoint_height,
|
||||||
|
block,
|
||||||
};
|
};
|
||||||
|
|
||||||
let chain = Buffer::new(BoxService::new(chain), VERIFIER_BUFFER_BOUND);
|
let chain = Buffer::new(BoxService::new(chain), VERIFIER_BUFFER_BOUND);
|
||||||
|
|
|
@ -114,8 +114,9 @@ impl StartCmd {
|
||||||
let (syncer, sync_status) = ChainSync::new(
|
let (syncer, sync_status) = ChainSync::new(
|
||||||
&config,
|
&config,
|
||||||
peer_set.clone(),
|
peer_set.clone(),
|
||||||
state.clone(),
|
|
||||||
chain_verifier.clone(),
|
chain_verifier.clone(),
|
||||||
|
state.clone(),
|
||||||
|
latest_chain_tip.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
info!("initializing mempool");
|
info!("initializing mempool");
|
||||||
|
@ -125,7 +126,7 @@ impl StartCmd {
|
||||||
state.clone(),
|
state.clone(),
|
||||||
tx_verifier,
|
tx_verifier,
|
||||||
sync_status.clone(),
|
sync_status.clone(),
|
||||||
latest_chain_tip,
|
latest_chain_tip.clone(),
|
||||||
chain_tip_change.clone(),
|
chain_tip_change.clone(),
|
||||||
);
|
);
|
||||||
let mempool = BoxService::new(mempool);
|
let mempool = BoxService::new(mempool);
|
||||||
|
@ -137,6 +138,7 @@ impl StartCmd {
|
||||||
block_verifier: chain_verifier,
|
block_verifier: chain_verifier,
|
||||||
mempool: mempool.clone(),
|
mempool: mempool.clone(),
|
||||||
state,
|
state,
|
||||||
|
latest_chain_tip,
|
||||||
};
|
};
|
||||||
setup_tx
|
setup_tx
|
||||||
.send(setup_data)
|
.send(setup_data)
|
||||||
|
|
|
@ -68,6 +68,9 @@ pub struct InboundSetupData {
|
||||||
|
|
||||||
/// A service that manages cached blockchain state.
|
/// A service that manages cached blockchain state.
|
||||||
pub state: State,
|
pub state: State,
|
||||||
|
|
||||||
|
/// Allows efficient access to the best tip of the blockchain.
|
||||||
|
pub latest_chain_tip: zs::LatestChainTip,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Tracks the internal state of the [`Inbound`] service during setup.
|
/// Tracks the internal state of the [`Inbound`] service during setup.
|
||||||
|
@ -199,12 +202,14 @@ impl Service<zn::Request> for Inbound {
|
||||||
block_verifier,
|
block_verifier,
|
||||||
mempool,
|
mempool,
|
||||||
state,
|
state,
|
||||||
|
latest_chain_tip,
|
||||||
} = setup_data;
|
} = setup_data;
|
||||||
|
|
||||||
let block_downloads = Box::pin(BlockDownloads::new(
|
let block_downloads = Box::pin(BlockDownloads::new(
|
||||||
Timeout::new(block_download_peer_set.clone(), BLOCK_DOWNLOAD_TIMEOUT),
|
Timeout::new(block_download_peer_set.clone(), BLOCK_DOWNLOAD_TIMEOUT),
|
||||||
Timeout::new(block_verifier, BLOCK_VERIFY_TIMEOUT),
|
Timeout::new(block_verifier, BLOCK_VERIFY_TIMEOUT),
|
||||||
state.clone(),
|
state.clone(),
|
||||||
|
latest_chain_tip,
|
||||||
));
|
));
|
||||||
|
|
||||||
result = Ok(());
|
result = Ok(());
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
|
convert::TryFrom,
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
|
@ -15,13 +16,17 @@ use tokio::{sync::oneshot, task::JoinHandle};
|
||||||
use tower::{Service, ServiceExt};
|
use tower::{Service, ServiceExt};
|
||||||
use tracing_futures::Instrument;
|
use tracing_futures::Instrument;
|
||||||
|
|
||||||
use zebra_chain::block::{self, Block};
|
use zebra_chain::{
|
||||||
|
block::{self, Block},
|
||||||
|
chain_tip::ChainTip,
|
||||||
|
};
|
||||||
use zebra_network as zn;
|
use zebra_network as zn;
|
||||||
use zebra_state as zs;
|
use zebra_state as zs;
|
||||||
|
|
||||||
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||||
|
|
||||||
/// The maximum number of concurrent inbound download and verify tasks.
|
/// The maximum number of concurrent inbound download and verify tasks.
|
||||||
|
/// Also used as the maximum lookahead limit, before block verification.
|
||||||
///
|
///
|
||||||
/// We expect the syncer to download and verify checkpoints, so this bound
|
/// We expect the syncer to download and verify checkpoints, so this bound
|
||||||
/// can be small.
|
/// can be small.
|
||||||
|
@ -33,6 +38,7 @@ type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||||
///
|
///
|
||||||
/// The maximum block size is 2 million bytes. A deserialized malicious
|
/// The maximum block size is 2 million bytes. A deserialized malicious
|
||||||
/// block with ~225_000 transparent outputs can take up 9MB of RAM.
|
/// block with ~225_000 transparent outputs can take up 9MB of RAM.
|
||||||
|
/// So the maximum inbound queue usage is `MAX_INBOUND_CONCURRENCY * 9 MB`.
|
||||||
/// (See #1880 for more details.)
|
/// (See #1880 for more details.)
|
||||||
///
|
///
|
||||||
/// Malicious blocks will eventually timeout or fail contextual validation.
|
/// Malicious blocks will eventually timeout or fail contextual validation.
|
||||||
|
@ -41,7 +47,7 @@ type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||||
/// Since Zebra keeps an `inv` index, inbound downloads for malicious blocks
|
/// Since Zebra keeps an `inv` index, inbound downloads for malicious blocks
|
||||||
/// will be directed to the malicious node that originally gossiped the hash.
|
/// will be directed to the malicious node that originally gossiped the hash.
|
||||||
/// Therefore, this attack can be carried out by a single malicious node.
|
/// Therefore, this attack can be carried out by a single malicious node.
|
||||||
const MAX_INBOUND_CONCURRENCY: usize = 10;
|
const MAX_INBOUND_CONCURRENCY: usize = 20;
|
||||||
|
|
||||||
/// The action taken in response to a peer's gossiped block hash.
|
/// The action taken in response to a peer's gossiped block hash.
|
||||||
pub enum DownloadAction {
|
pub enum DownloadAction {
|
||||||
|
@ -83,6 +89,9 @@ where
|
||||||
/// A service that manages cached blockchain state.
|
/// A service that manages cached blockchain state.
|
||||||
state: ZS,
|
state: ZS,
|
||||||
|
|
||||||
|
/// Allows efficient access to the best tip of the blockchain.
|
||||||
|
latest_chain_tip: zs::LatestChainTip,
|
||||||
|
|
||||||
// Internal downloads state
|
// Internal downloads state
|
||||||
/// A list of pending block download and verify tasks.
|
/// A list of pending block download and verify tasks.
|
||||||
#[pin]
|
#[pin]
|
||||||
|
@ -145,17 +154,18 @@ where
|
||||||
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
|
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
|
||||||
ZS::Future: Send,
|
ZS::Future: Send,
|
||||||
{
|
{
|
||||||
/// Initialize a new download stream with the provided `network` and
|
/// Initialize a new download stream with the provided `network`, `verifier`, and `state` services.
|
||||||
/// `verifier` services.
|
/// The `latest_chain_tip` must be linked to the provided `state` service.
|
||||||
///
|
///
|
||||||
/// The [`Downloads`] stream is agnostic to the network policy, so retry and
|
/// The [`Downloads`] stream is agnostic to the network policy, so retry and
|
||||||
/// timeout limits should be applied to the `network` service passed into
|
/// timeout limits should be applied to the `network` service passed into
|
||||||
/// this constructor.
|
/// this constructor.
|
||||||
pub fn new(network: ZN, verifier: ZV, state: ZS) -> Self {
|
pub fn new(network: ZN, verifier: ZV, state: ZS, latest_chain_tip: zs::LatestChainTip) -> Self {
|
||||||
Self {
|
Self {
|
||||||
network,
|
network,
|
||||||
verifier,
|
verifier,
|
||||||
state,
|
state,
|
||||||
|
latest_chain_tip,
|
||||||
pending: FuturesUnordered::new(),
|
pending: FuturesUnordered::new(),
|
||||||
cancel_handles: HashMap::new(),
|
cancel_handles: HashMap::new(),
|
||||||
}
|
}
|
||||||
|
@ -173,19 +183,23 @@ where
|
||||||
?MAX_INBOUND_CONCURRENCY,
|
?MAX_INBOUND_CONCURRENCY,
|
||||||
"block hash already queued for inbound download: ignored block"
|
"block hash already queued for inbound download: ignored block"
|
||||||
);
|
);
|
||||||
|
|
||||||
metrics::gauge!("gossip.queued.block.count", self.pending.len() as _);
|
metrics::gauge!("gossip.queued.block.count", self.pending.len() as _);
|
||||||
|
metrics::counter!("gossip.already.queued.dropped.block.hash.count", 1);
|
||||||
|
|
||||||
return DownloadAction::AlreadyQueued;
|
return DownloadAction::AlreadyQueued;
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.pending.len() >= MAX_INBOUND_CONCURRENCY {
|
if self.pending.len() >= MAX_INBOUND_CONCURRENCY {
|
||||||
tracing::info!(
|
tracing::debug!(
|
||||||
?hash,
|
?hash,
|
||||||
queue_len = self.pending.len(),
|
queue_len = self.pending.len(),
|
||||||
?MAX_INBOUND_CONCURRENCY,
|
?MAX_INBOUND_CONCURRENCY,
|
||||||
"too many blocks queued for inbound download: ignored block"
|
"too many blocks queued for inbound download: ignored block"
|
||||||
);
|
);
|
||||||
|
|
||||||
metrics::gauge!("gossip.queued.block.count", self.pending.len() as _);
|
metrics::gauge!("gossip.queued.block.count", self.pending.len() as _);
|
||||||
|
metrics::counter!("gossip.full.queue.dropped.block.hash.count", 1);
|
||||||
|
|
||||||
return DownloadAction::FullQueue;
|
return DownloadAction::FullQueue;
|
||||||
}
|
}
|
||||||
|
@ -196,6 +210,7 @@ where
|
||||||
let state = self.state.clone();
|
let state = self.state.clone();
|
||||||
let network = self.network.clone();
|
let network = self.network.clone();
|
||||||
let verifier = self.verifier.clone();
|
let verifier = self.verifier.clone();
|
||||||
|
let latest_chain_tip = self.latest_chain_tip.clone();
|
||||||
|
|
||||||
let fut = async move {
|
let fut = async move {
|
||||||
// Check if the block is already in the state.
|
// Check if the block is already in the state.
|
||||||
|
@ -212,6 +227,12 @@ where
|
||||||
.oneshot(zn::Request::BlocksByHash(std::iter::once(hash).collect()))
|
.oneshot(zn::Request::BlocksByHash(std::iter::once(hash).collect()))
|
||||||
.await?
|
.await?
|
||||||
{
|
{
|
||||||
|
assert_eq!(
|
||||||
|
blocks.len(),
|
||||||
|
1,
|
||||||
|
"wrong number of blocks in response to a single hash"
|
||||||
|
);
|
||||||
|
|
||||||
blocks
|
blocks
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.next()
|
.next()
|
||||||
|
@ -221,6 +242,75 @@ where
|
||||||
};
|
};
|
||||||
metrics::counter!("gossip.downloaded.block.count", 1);
|
metrics::counter!("gossip.downloaded.block.count", 1);
|
||||||
|
|
||||||
|
// # Security & Performance
|
||||||
|
//
|
||||||
|
// Reject blocks that are too far ahead of our tip,
|
||||||
|
// and blocks that are behind the finalized tip.
|
||||||
|
//
|
||||||
|
// Avoids denial of service attacks. Also reduces wasted work on high blocks
|
||||||
|
// that will timeout before being verified, and low blocks that can never be finalized.
|
||||||
|
let tip_height = latest_chain_tip.best_tip_height();
|
||||||
|
|
||||||
|
let max_lookahead_height = if let Some(tip_height) = tip_height {
|
||||||
|
let lookahead = i32::try_from(MAX_INBOUND_CONCURRENCY).expect("fits in i32");
|
||||||
|
(tip_height + lookahead).expect("tip is much lower than Height::MAX")
|
||||||
|
} else {
|
||||||
|
let genesis_lookahead =
|
||||||
|
u32::try_from(MAX_INBOUND_CONCURRENCY - 1).expect("fits in u32");
|
||||||
|
block::Height(genesis_lookahead)
|
||||||
|
};
|
||||||
|
|
||||||
|
// Get the finalized tip height, assuming we're using the non-finalized state.
|
||||||
|
//
|
||||||
|
// It doesn't matter if we're a few blocks off here, because blocks this low
|
||||||
|
// are part of a fork with much less work. So they would be rejected anyway.
|
||||||
|
//
|
||||||
|
// And if we're still checkpointing, the checkpointer will reject blocks behind
|
||||||
|
// the finalized tip anyway.
|
||||||
|
//
|
||||||
|
// TODO: get the actual finalized tip height
|
||||||
|
let min_accepted_height = tip_height
|
||||||
|
.map(|tip_height| {
|
||||||
|
block::Height(tip_height.0.saturating_sub(zs::MAX_BLOCK_REORG_HEIGHT))
|
||||||
|
})
|
||||||
|
.unwrap_or(block::Height(0));
|
||||||
|
|
||||||
|
if let Some(block_height) = block.coinbase_height() {
|
||||||
|
if block_height > max_lookahead_height {
|
||||||
|
tracing::info!(
|
||||||
|
?hash,
|
||||||
|
?block_height,
|
||||||
|
?tip_height,
|
||||||
|
?max_lookahead_height,
|
||||||
|
lookahead_limit = ?MAX_INBOUND_CONCURRENCY,
|
||||||
|
"gossiped block height too far ahead of the tip: dropped downloaded block"
|
||||||
|
);
|
||||||
|
metrics::counter!("gossip.max.height.limit.dropped.block.count", 1);
|
||||||
|
|
||||||
|
Err("gossiped block height too far ahead")?;
|
||||||
|
} else if block_height < min_accepted_height {
|
||||||
|
tracing::debug!(
|
||||||
|
?hash,
|
||||||
|
?block_height,
|
||||||
|
?tip_height,
|
||||||
|
?min_accepted_height,
|
||||||
|
behind_tip_limit = ?zs::MAX_BLOCK_REORG_HEIGHT,
|
||||||
|
"gossiped block height behind the finalized tip: dropped downloaded block"
|
||||||
|
);
|
||||||
|
metrics::counter!("gossip.min.height.limit.dropped.block.count", 1);
|
||||||
|
|
||||||
|
Err("gossiped block height behind the finalized tip")?;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tracing::info!(
|
||||||
|
?hash,
|
||||||
|
"gossiped block with no height: dropped downloaded block"
|
||||||
|
);
|
||||||
|
metrics::counter!("gossip.no.height.dropped.block.count", 1);
|
||||||
|
|
||||||
|
Err("gossiped block with no height")?;
|
||||||
|
}
|
||||||
|
|
||||||
verifier.oneshot(block).await
|
verifier.oneshot(block).await
|
||||||
}
|
}
|
||||||
.map_ok(|hash| {
|
.map_ok(|hash| {
|
||||||
|
|
|
@ -1,7 +1,11 @@
|
||||||
//! Inbound service tests.
|
//! Inbound service tests.
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashSet, iter::FromIterator, net::SocketAddr, str::FromStr, sync::Arc,
|
collections::HashSet,
|
||||||
|
iter::{self, FromIterator},
|
||||||
|
net::SocketAddr,
|
||||||
|
str::FromStr,
|
||||||
|
sync::Arc,
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -562,6 +566,92 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Test that the inbound downloader rejects blocks above the lookahead limit.
|
||||||
|
///
|
||||||
|
/// TODO: also test that it rejects blocks behind the tip limit. (Needs ~100 fake blocks.)
|
||||||
|
#[tokio::test]
|
||||||
|
async fn inbound_block_height_lookahead_limit() -> Result<(), crate::BoxError> {
|
||||||
|
// Get services
|
||||||
|
let (
|
||||||
|
inbound_service,
|
||||||
|
_mempool,
|
||||||
|
_committed_blocks,
|
||||||
|
_added_transactions,
|
||||||
|
mut tx_verifier,
|
||||||
|
mut peer_set,
|
||||||
|
state_service,
|
||||||
|
sync_gossip_task_handle,
|
||||||
|
tx_gossip_task_handle,
|
||||||
|
) = setup(false).await;
|
||||||
|
|
||||||
|
// Get the next block
|
||||||
|
let block: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_2_BYTES.zcash_deserialize_into()?;
|
||||||
|
let block_hash = block.hash();
|
||||||
|
|
||||||
|
// Push test block hash
|
||||||
|
let _request = inbound_service
|
||||||
|
.clone()
|
||||||
|
.oneshot(Request::AdvertiseBlock(block_hash))
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Block is fetched, and committed to the state
|
||||||
|
peer_set
|
||||||
|
.expect_request(Request::BlocksByHash(iter::once(block_hash).collect()))
|
||||||
|
.await
|
||||||
|
.respond(Response::Blocks(vec![block]));
|
||||||
|
|
||||||
|
// TODO: check that the block is queued in the checkpoint verifier
|
||||||
|
|
||||||
|
// check that nothing unexpected happened
|
||||||
|
peer_set.expect_no_requests().await;
|
||||||
|
tx_verifier.expect_no_requests().await;
|
||||||
|
|
||||||
|
// Get a block that is a long way away from genesis
|
||||||
|
let block: Arc<Block> =
|
||||||
|
zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?;
|
||||||
|
let block_hash = block.hash();
|
||||||
|
|
||||||
|
// Push test block hash
|
||||||
|
let _request = inbound_service
|
||||||
|
.clone()
|
||||||
|
.oneshot(Request::AdvertiseBlock(block_hash))
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Block is fetched, but the downloader drops it because it is too high
|
||||||
|
peer_set
|
||||||
|
.expect_request(Request::BlocksByHash(iter::once(block_hash).collect()))
|
||||||
|
.await
|
||||||
|
.respond(Response::Blocks(vec![block]));
|
||||||
|
|
||||||
|
let response = state_service
|
||||||
|
.clone()
|
||||||
|
.oneshot(zebra_state::Request::Depth(block_hash))
|
||||||
|
.await?;
|
||||||
|
assert_eq!(response, zebra_state::Response::Depth(None));
|
||||||
|
|
||||||
|
// TODO: check that the block is not queued in the checkpoint verifier or non-finalized state
|
||||||
|
|
||||||
|
// check that nothing unexpected happened
|
||||||
|
peer_set.expect_no_requests().await;
|
||||||
|
tx_verifier.expect_no_requests().await;
|
||||||
|
|
||||||
|
let sync_gossip_result = sync_gossip_task_handle.now_or_never();
|
||||||
|
assert!(
|
||||||
|
matches!(sync_gossip_result, None),
|
||||||
|
"unexpected error or panic in sync gossip task: {:?}",
|
||||||
|
sync_gossip_result,
|
||||||
|
);
|
||||||
|
|
||||||
|
let tx_gossip_result = tx_gossip_task_handle.now_or_never();
|
||||||
|
assert!(
|
||||||
|
matches!(tx_gossip_result, None),
|
||||||
|
"unexpected error or panic in transaction gossip task: {:?}",
|
||||||
|
tx_gossip_result,
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn setup(
|
async fn setup(
|
||||||
add_transactions: bool,
|
add_transactions: bool,
|
||||||
) -> (
|
) -> (
|
||||||
|
@ -647,7 +737,7 @@ async fn setup(
|
||||||
state_service.clone(),
|
state_service.clone(),
|
||||||
buffered_tx_verifier.clone(),
|
buffered_tx_verifier.clone(),
|
||||||
sync_status.clone(),
|
sync_status.clone(),
|
||||||
latest_chain_tip,
|
latest_chain_tip.clone(),
|
||||||
chain_tip_change.clone(),
|
chain_tip_change.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -677,6 +767,7 @@ async fn setup(
|
||||||
block_verifier,
|
block_verifier,
|
||||||
mempool: mempool_service.clone(),
|
mempool: mempool_service.clone(),
|
||||||
state: state_service.clone(),
|
state: state_service.clone(),
|
||||||
|
latest_chain_tip,
|
||||||
};
|
};
|
||||||
let r = setup_tx.send(setup_data);
|
let r = setup_tx.send(setup_data);
|
||||||
// We can't expect or unwrap because the returned Result does not implement Debug
|
// We can't expect or unwrap because the returned Result does not implement Debug
|
||||||
|
|
|
@ -219,7 +219,7 @@ pub struct Mempool {
|
||||||
/// If the state's best chain tip has reached this height, always enable the mempool.
|
/// If the state's best chain tip has reached this height, always enable the mempool.
|
||||||
debug_enable_at_height: Option<Height>,
|
debug_enable_at_height: Option<Height>,
|
||||||
|
|
||||||
/// Allow efficient access to the best tip of the blockchain.
|
/// Allows efficient access to the best tip of the blockchain.
|
||||||
latest_chain_tip: zs::LatestChainTip,
|
latest_chain_tip: zs::LatestChainTip,
|
||||||
|
|
||||||
/// Allows the detection of newly added chain tip blocks,
|
/// Allows the detection of newly added chain tip blocks,
|
||||||
|
|
|
@ -21,6 +21,7 @@ use zebra_consensus::{
|
||||||
};
|
};
|
||||||
use zebra_network as zn;
|
use zebra_network as zn;
|
||||||
use zebra_state as zs;
|
use zebra_state as zs;
|
||||||
|
use zs::LatestChainTip;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
async_ext::NowOrLater, components::sync::downloads::BlockDownloadVerifyError,
|
async_ext::NowOrLater, components::sync::downloads::BlockDownloadVerifyError,
|
||||||
|
@ -80,6 +81,15 @@ pub const MIN_LOOKAHEAD_LIMIT: usize = zebra_consensus::MAX_CHECKPOINT_HEIGHT_GA
|
||||||
/// See [`MIN_LOOKAHEAD_LIMIT`] for details.
|
/// See [`MIN_LOOKAHEAD_LIMIT`] for details.
|
||||||
pub const DEFAULT_LOOKAHEAD_LIMIT: usize = zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP * 5;
|
pub const DEFAULT_LOOKAHEAD_LIMIT: usize = zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP * 5;
|
||||||
|
|
||||||
|
/// The expected maximum number of hashes in an ObtainTips or ExtendTips response.
|
||||||
|
///
|
||||||
|
/// This is used to allow block heights that are slightly beyond the lookahead limit,
|
||||||
|
/// but still limit the number of blocks in the pipeline between the downloader and
|
||||||
|
/// the state.
|
||||||
|
///
|
||||||
|
/// See [`MIN_LOOKAHEAD_LIMIT`] for details.
|
||||||
|
pub const MAX_TIPS_RESPONSE_HASH_COUNT: usize = 500;
|
||||||
|
|
||||||
/// Controls how long we wait for a tips response to return.
|
/// Controls how long we wait for a tips response to return.
|
||||||
///
|
///
|
||||||
/// ## Correctness
|
/// ## Correctness
|
||||||
|
@ -236,11 +246,18 @@ where
|
||||||
/// Returns a new syncer instance, using:
|
/// Returns a new syncer instance, using:
|
||||||
/// - chain: the zebra-chain `Network` to download (Mainnet or Testnet)
|
/// - chain: the zebra-chain `Network` to download (Mainnet or Testnet)
|
||||||
/// - peers: the zebra-network peers to contact for downloads
|
/// - peers: the zebra-network peers to contact for downloads
|
||||||
/// - state: the zebra-state that stores the chain
|
|
||||||
/// - verifier: the zebra-consensus verifier that checks the chain
|
/// - verifier: the zebra-consensus verifier that checks the chain
|
||||||
|
/// - state: the zebra-state that stores the chain
|
||||||
|
/// - latest_chain_tip: the latest chain tip from `state`
|
||||||
///
|
///
|
||||||
/// Also returns a [`SyncStatus`] to check if the syncer has likely reached the chain tip.
|
/// Also returns a [`SyncStatus`] to check if the syncer has likely reached the chain tip.
|
||||||
pub fn new(config: &ZebradConfig, peers: ZN, state: ZS, verifier: ZV) -> (Self, SyncStatus) {
|
pub fn new(
|
||||||
|
config: &ZebradConfig,
|
||||||
|
peers: ZN,
|
||||||
|
verifier: ZV,
|
||||||
|
state: ZS,
|
||||||
|
latest_chain_tip: LatestChainTip,
|
||||||
|
) -> (Self, SyncStatus) {
|
||||||
let tip_network = Timeout::new(peers.clone(), TIPS_RESPONSE_TIMEOUT);
|
let tip_network = Timeout::new(peers.clone(), TIPS_RESPONSE_TIMEOUT);
|
||||||
// The Hedge middleware is the outermost layer, hedging requests
|
// The Hedge middleware is the outermost layer, hedging requests
|
||||||
// between two retry-wrapped networks. The innermost timeout
|
// between two retry-wrapped networks. The innermost timeout
|
||||||
|
@ -282,7 +299,12 @@ where
|
||||||
genesis_hash: genesis_hash(config.network.network),
|
genesis_hash: genesis_hash(config.network.network),
|
||||||
lookahead_limit: config.sync.lookahead_limit,
|
lookahead_limit: config.sync.lookahead_limit,
|
||||||
tip_network,
|
tip_network,
|
||||||
downloads: Box::pin(Downloads::new(block_network, verifier)),
|
downloads: Box::pin(Downloads::new(
|
||||||
|
block_network,
|
||||||
|
verifier,
|
||||||
|
latest_chain_tip,
|
||||||
|
config.sync.lookahead_limit,
|
||||||
|
)),
|
||||||
state,
|
state,
|
||||||
prospective_tips: HashSet::new(),
|
prospective_tips: HashSet::new(),
|
||||||
recent_syncs,
|
recent_syncs,
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
|
convert::TryFrom,
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
|
@ -17,11 +18,36 @@ use tokio::{sync::oneshot, task::JoinHandle};
|
||||||
use tower::{hedge, Service, ServiceExt};
|
use tower::{hedge, Service, ServiceExt};
|
||||||
use tracing_futures::Instrument;
|
use tracing_futures::Instrument;
|
||||||
|
|
||||||
use zebra_chain::block::{self, Block};
|
use zebra_chain::{
|
||||||
|
block::{self, Block},
|
||||||
|
chain_tip::ChainTip,
|
||||||
|
};
|
||||||
use zebra_network as zn;
|
use zebra_network as zn;
|
||||||
|
use zebra_state as zs;
|
||||||
|
|
||||||
|
use super::{DEFAULT_LOOKAHEAD_LIMIT, MAX_TIPS_RESPONSE_HASH_COUNT};
|
||||||
|
|
||||||
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||||
|
|
||||||
|
/// A divisor used to calculate the extra number of blocks we allow in the
|
||||||
|
/// verifier and state pipelines, on top of the lookahead limit.
|
||||||
|
///
|
||||||
|
/// The extra number of blocks is calculated using
|
||||||
|
/// `lookahead_limit / VERIFICATION_PIPELINE_SCALING_DIVISOR`.
|
||||||
|
///
|
||||||
|
/// For the default lookahead limit, the extra number of blocks is
|
||||||
|
/// `2 * MAX_TIPS_RESPONSE_HASH_COUNT`.
|
||||||
|
///
|
||||||
|
/// This allows the verifier and state queues to hold an extra two tips responses worth of blocks,
|
||||||
|
/// even if the syncer queue is full. Any unused capacity is shared between both queues.
|
||||||
|
///
|
||||||
|
/// Since the syncer queue is limited to the `lookahead_limit`,
|
||||||
|
/// the rest of the capacity is reserved for the other queues.
|
||||||
|
/// There is no reserved capacity for the syncer queue:
|
||||||
|
/// if the other queues stay full, the syncer will eventually time out and reset.
|
||||||
|
const VERIFICATION_PIPELINE_SCALING_DIVISOR: usize =
|
||||||
|
DEFAULT_LOOKAHEAD_LIMIT / (2 * MAX_TIPS_RESPONSE_HASH_COUNT);
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug)]
|
#[derive(Copy, Clone, Debug)]
|
||||||
pub(super) struct AlwaysHedge;
|
pub(super) struct AlwaysHedge;
|
||||||
|
|
||||||
|
@ -47,6 +73,15 @@ pub enum BlockDownloadVerifyError {
|
||||||
#[error("block did not pass consensus validation")]
|
#[error("block did not pass consensus validation")]
|
||||||
Invalid(#[from] zebra_consensus::chain::VerifyChainError),
|
Invalid(#[from] zebra_consensus::chain::VerifyChainError),
|
||||||
|
|
||||||
|
#[error("downloaded block was too far ahead of the chain tip")]
|
||||||
|
AboveLookaheadHeightLimit,
|
||||||
|
|
||||||
|
#[error("downloaded block was too far behind the chain tip")]
|
||||||
|
BehindTipHeightLimit,
|
||||||
|
|
||||||
|
#[error("downloaded block had an invalid height")]
|
||||||
|
InvalidHeight,
|
||||||
|
|
||||||
#[error("block download / verification was cancelled during download")]
|
#[error("block download / verification was cancelled during download")]
|
||||||
CancelledDuringDownload,
|
CancelledDuringDownload,
|
||||||
|
|
||||||
|
@ -72,6 +107,13 @@ where
|
||||||
/// A service that verifies downloaded blocks.
|
/// A service that verifies downloaded blocks.
|
||||||
verifier: ZV,
|
verifier: ZV,
|
||||||
|
|
||||||
|
/// Allows efficient access to the best tip of the blockchain.
|
||||||
|
latest_chain_tip: zs::LatestChainTip,
|
||||||
|
|
||||||
|
// Configuration
|
||||||
|
/// The configured lookahead limit, after applying the minimum limit.
|
||||||
|
lookahead_limit: usize,
|
||||||
|
|
||||||
// Internal downloads state
|
// Internal downloads state
|
||||||
/// A list of pending block download and verify tasks.
|
/// A list of pending block download and verify tasks.
|
||||||
#[pin]
|
#[pin]
|
||||||
|
@ -132,15 +174,23 @@ where
|
||||||
ZV::Future: Send,
|
ZV::Future: Send,
|
||||||
{
|
{
|
||||||
/// Initialize a new download stream with the provided `network` and
|
/// Initialize a new download stream with the provided `network` and
|
||||||
/// `verifier` services.
|
/// `verifier` services. Uses the `latest_chain_tip` and `lookahead_limit`
|
||||||
|
/// to drop blocks that are too far ahead of the current state tip.
|
||||||
///
|
///
|
||||||
/// The [`Downloads`] stream is agnostic to the network policy, so retry and
|
/// The [`Downloads`] stream is agnostic to the network policy, so retry and
|
||||||
/// timeout limits should be applied to the `network` service passed into
|
/// timeout limits should be applied to the `network` service passed into
|
||||||
/// this constructor.
|
/// this constructor.
|
||||||
pub fn new(network: ZN, verifier: ZV) -> Self {
|
pub fn new(
|
||||||
|
network: ZN,
|
||||||
|
verifier: ZV,
|
||||||
|
latest_chain_tip: zs::LatestChainTip,
|
||||||
|
lookahead_limit: usize,
|
||||||
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
network,
|
network,
|
||||||
verifier,
|
verifier,
|
||||||
|
latest_chain_tip,
|
||||||
|
lookahead_limit,
|
||||||
pending: FuturesUnordered::new(),
|
pending: FuturesUnordered::new(),
|
||||||
cancel_handles: HashMap::new(),
|
cancel_handles: HashMap::new(),
|
||||||
}
|
}
|
||||||
|
@ -154,6 +204,7 @@ where
|
||||||
#[instrument(level = "debug", skip(self), fields(%hash))]
|
#[instrument(level = "debug", skip(self), fields(%hash))]
|
||||||
pub async fn download_and_verify(&mut self, hash: block::Hash) -> Result<(), Report> {
|
pub async fn download_and_verify(&mut self, hash: block::Hash) -> Result<(), Report> {
|
||||||
if self.cancel_handles.contains_key(&hash) {
|
if self.cancel_handles.contains_key(&hash) {
|
||||||
|
metrics::counter!("sync.already.queued.dropped.block.hash.count", 1);
|
||||||
return Err(eyre!("duplicate hash queued for download: {:?}", hash));
|
return Err(eyre!("duplicate hash queued for download: {:?}", hash));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -177,6 +228,9 @@ where
|
||||||
let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();
|
let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();
|
||||||
|
|
||||||
let mut verifier = self.verifier.clone();
|
let mut verifier = self.verifier.clone();
|
||||||
|
let latest_chain_tip = self.latest_chain_tip.clone();
|
||||||
|
let lookahead_limit = self.lookahead_limit;
|
||||||
|
|
||||||
let task = tokio::spawn(
|
let task = tokio::spawn(
|
||||||
async move {
|
async move {
|
||||||
// Prefer the cancel handle if both are ready.
|
// Prefer the cancel handle if both are ready.
|
||||||
|
@ -191,6 +245,12 @@ where
|
||||||
};
|
};
|
||||||
|
|
||||||
let block = if let zn::Response::Blocks(blocks) = rsp {
|
let block = if let zn::Response::Blocks(blocks) = rsp {
|
||||||
|
assert_eq!(
|
||||||
|
blocks.len(),
|
||||||
|
1,
|
||||||
|
"wrong number of blocks in response to a single hash"
|
||||||
|
);
|
||||||
|
|
||||||
blocks
|
blocks
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.next()
|
.next()
|
||||||
|
@ -200,6 +260,85 @@ where
|
||||||
};
|
};
|
||||||
metrics::counter!("sync.downloaded.block.count", 1);
|
metrics::counter!("sync.downloaded.block.count", 1);
|
||||||
|
|
||||||
|
// Security & Performance: reject blocks that are too far ahead of our tip.
|
||||||
|
// Avoids denial of service attacks, and reduces wasted work on high blocks
|
||||||
|
// that will timeout before being verified.
|
||||||
|
let tip_height = latest_chain_tip.best_tip_height();
|
||||||
|
|
||||||
|
let max_lookahead_height = if let Some(tip_height) = tip_height {
|
||||||
|
// Scale the height limit with the lookahead limit,
|
||||||
|
// so users with low capacity or under DoS can reduce them both.
|
||||||
|
let lookahead = i32::try_from(
|
||||||
|
lookahead_limit + lookahead_limit / VERIFICATION_PIPELINE_SCALING_DIVISOR,
|
||||||
|
)
|
||||||
|
.expect("fits in i32");
|
||||||
|
(tip_height + lookahead).expect("tip is much lower than Height::MAX")
|
||||||
|
} else {
|
||||||
|
let genesis_lookahead =
|
||||||
|
u32::try_from(lookahead_limit - 1).expect("fits in u32");
|
||||||
|
block::Height(genesis_lookahead)
|
||||||
|
};
|
||||||
|
|
||||||
|
// Get the finalized tip height, assuming we're using the non-finalized state.
|
||||||
|
//
|
||||||
|
// It doesn't matter if we're a few blocks off here, because blocks this low
|
||||||
|
// are part of a fork with much less work. So they would be rejected anyway.
|
||||||
|
//
|
||||||
|
// And if we're still checkpointing, the checkpointer will reject blocks behind
|
||||||
|
// the finalized tip anyway.
|
||||||
|
//
|
||||||
|
// TODO: get the actual finalized tip height
|
||||||
|
let min_accepted_height = tip_height
|
||||||
|
.map(|tip_height| {
|
||||||
|
block::Height(tip_height.0.saturating_sub(zs::MAX_BLOCK_REORG_HEIGHT))
|
||||||
|
})
|
||||||
|
.unwrap_or(block::Height(0));
|
||||||
|
|
||||||
|
if let Some(block_height) = block.coinbase_height() {
|
||||||
|
if block_height > max_lookahead_height {
|
||||||
|
tracing::info!(
|
||||||
|
?hash,
|
||||||
|
?block_height,
|
||||||
|
?tip_height,
|
||||||
|
?max_lookahead_height,
|
||||||
|
lookahead_limit = ?lookahead_limit,
|
||||||
|
"synced block height too far ahead of the tip: dropped downloaded block"
|
||||||
|
);
|
||||||
|
metrics::counter!("sync.max.height.limit.dropped.block.count", 1);
|
||||||
|
|
||||||
|
// This error should be very rare during normal operation.
|
||||||
|
//
|
||||||
|
// We need to reset the syncer on this error,
|
||||||
|
// to allow the verifier and state to catch up,
|
||||||
|
// or prevent it following a bad chain.
|
||||||
|
//
|
||||||
|
// If we don't reset the syncer on this error,
|
||||||
|
// it will continue downloading blocks from a bad chain,
|
||||||
|
// (or blocks far ahead of the current state tip).
|
||||||
|
Err(BlockDownloadVerifyError::AboveLookaheadHeightLimit)?;
|
||||||
|
} else if block_height < min_accepted_height {
|
||||||
|
tracing::debug!(
|
||||||
|
?hash,
|
||||||
|
?block_height,
|
||||||
|
?tip_height,
|
||||||
|
?min_accepted_height,
|
||||||
|
behind_tip_limit = ?zs::MAX_BLOCK_REORG_HEIGHT,
|
||||||
|
"synced block height behind the finalized tip: dropped downloaded block"
|
||||||
|
);
|
||||||
|
metrics::counter!("gossip.min.height.limit.dropped.block.count", 1);
|
||||||
|
|
||||||
|
Err(BlockDownloadVerifyError::BehindTipHeightLimit)?;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tracing::info!(
|
||||||
|
?hash,
|
||||||
|
"synced block with no height: dropped downloaded block"
|
||||||
|
);
|
||||||
|
metrics::counter!("sync.no.height.dropped.block.count", 1);
|
||||||
|
|
||||||
|
Err(BlockDownloadVerifyError::InvalidHeight)?;
|
||||||
|
}
|
||||||
|
|
||||||
let rsp = verifier
|
let rsp = verifier
|
||||||
.ready()
|
.ready()
|
||||||
.await
|
.await
|
||||||
|
|
|
@ -1,10 +1,13 @@
|
||||||
use futures::future;
|
|
||||||
use std::sync::{
|
use std::sync::{
|
||||||
atomic::{AtomicU8, Ordering},
|
atomic::{AtomicU8, Ordering},
|
||||||
Arc,
|
Arc,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use futures::future;
|
||||||
use tokio::time::{timeout, Duration};
|
use tokio::time::{timeout, Duration};
|
||||||
|
|
||||||
|
use zebra_chain::parameters::Network;
|
||||||
|
|
||||||
use super::super::*;
|
use super::super::*;
|
||||||
use crate::config::ZebradConfig;
|
use crate::config::ZebradConfig;
|
||||||
|
|
||||||
|
@ -107,6 +110,9 @@ fn request_genesis_is_rate_limited() {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// create an empty latest chain tip
|
||||||
|
let (_sender, latest_chain_tip, _change) = zs::ChainTipSender::new(None, Network::Mainnet);
|
||||||
|
|
||||||
// create a verifier service that will always panic as it will never be called
|
// create a verifier service that will always panic as it will never be called
|
||||||
let verifier_service =
|
let verifier_service =
|
||||||
tower::service_fn(
|
tower::service_fn(
|
||||||
|
@ -117,8 +123,9 @@ fn request_genesis_is_rate_limited() {
|
||||||
let (mut chain_sync, _) = ChainSync::new(
|
let (mut chain_sync, _) = ChainSync::new(
|
||||||
&ZebradConfig::default(),
|
&ZebradConfig::default(),
|
||||||
peer_service,
|
peer_service,
|
||||||
state_service,
|
|
||||||
verifier_service,
|
verifier_service,
|
||||||
|
state_service,
|
||||||
|
latest_chain_tip,
|
||||||
);
|
);
|
||||||
|
|
||||||
// run `request_genesis()` with a timeout of 13 seconds
|
// run `request_genesis()` with a timeout of 13 seconds
|
||||||
|
|
Loading…
Reference in New Issue