fix(network): increase state concurrency and syncer lookahead (#3455)

* fix(state): set state concurrency based on other services' concurrency

* fix(sync): increase the sync downloader lookahead limit

It seems like the recent tokio upgrade made this code even more efficient,
so on testnet we can have around 6000 blocks in flight.

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
This commit is contained in:
teor 2022-02-03 08:44:15 +10:00 committed by GitHub
parent 4d2b3768c7
commit fa071562fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 35 additions and 16 deletions

View File

@@ -52,6 +52,8 @@
//! * runs in the background and gossips newly added mempool transactions
//! to peers
use std::cmp::max;
use abscissa_core::{config, Command, FrameworkError, Options, Runnable};
use color_eyre::eyre::{eyre, Report};
use futures::FutureExt;
@@ -61,7 +63,7 @@ use tracing_futures::Instrument;
use crate::{
components::{
inbound::InboundSetupData,
inbound::{self, InboundSetupData},
mempool::{self, Mempool},
sync,
tokio::{RuntimeRun, TokioComponent},
@@ -87,7 +89,9 @@ impl StartCmd {
info!("initializing node state");
let (state_service, latest_chain_tip, chain_tip_change) =
zebra_state::init(config.state.clone(), config.network.network);
let state = ServiceBuilder::new().buffer(20).service(state_service);
let state = ServiceBuilder::new()
.buffer(Self::state_buffer_bound())
.service(state_service);
info!("initializing network");
// The service that our node uses to respond to requests by peers. The
@@ -96,7 +100,7 @@ impl StartCmd {
let (setup_tx, setup_rx) = oneshot::channel();
let inbound = ServiceBuilder::new()
.load_shed()
.buffer(20)
.buffer(inbound::downloads::MAX_INBOUND_CONCURRENCY)
.service(Inbound::new(setup_rx));
let (peer_set, address_book) =
@@ -132,7 +136,9 @@ impl StartCmd {
chain_tip_change.clone(),
);
let mempool = BoxService::new(mempool);
let mempool = ServiceBuilder::new().buffer(20).service(mempool);
let mempool = ServiceBuilder::new()
.buffer(mempool::downloads::MAX_INBOUND_CONCURRENCY)
.service(mempool);
let setup_data = InboundSetupData {
address_book,
@@ -253,6 +259,22 @@ impl StartCmd {
exit_status
}
/// Returns the bound for the state service buffer,
/// based on the configurations of the services that use the state concurrently.
fn state_buffer_bound() -> usize {
let config = app_config().clone();
// TODO: do we also need to account for concurrent use across services?
// we could multiply the maximum by 3/2, or add a fixed constant
max(
config.sync.max_concurrent_block_requests,
max(
inbound::downloads::MAX_INBOUND_CONCURRENCY,
mempool::downloads::MAX_INBOUND_CONCURRENCY,
),
)
}
}
impl Runnable for StartCmd {

View File

@@ -36,7 +36,8 @@ use super::{
sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT},
};
mod downloads;
pub(crate) mod downloads;
#[cfg(test)]
mod tests;

View File

@@ -1,3 +1,5 @@
//! A download stream that handles gossiped blocks from peers.
use std::{
collections::HashMap,
convert::TryFrom,
@@ -47,7 +49,7 @@ type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Since Zebra keeps an `inv` index, inbound downloads for malicious blocks
/// will be directed to the malicious node that originally gossiped the hash.
/// Therefore, this attack can be carried out by a single malicious node.
const MAX_INBOUND_CONCURRENCY: usize = 20;
pub const MAX_INBOUND_CONCURRENCY: usize = 20;
/// The action taken in response to a peer's gossiped block hash.
pub enum DownloadAction {

View File

@@ -25,18 +25,13 @@ use zebra_chain::{
use zebra_network as zn;
use zebra_state as zs;
use super::{DEFAULT_LOOKAHEAD_LIMIT, MAX_TIPS_RESPONSE_HASH_COUNT};
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// A divisor used to calculate the extra number of blocks we allow in the
/// A multiplier used to calculate the extra number of blocks we allow in the
/// verifier and state pipelines, on top of the lookahead limit.
///
/// The extra number of blocks is calculated using
/// `lookahead_limit / VERIFICATION_PIPELINE_SCALING_DIVISOR`.
///
/// For the default lookahead limit, the extra number of blocks is
/// `4 * MAX_TIPS_RESPONSE_HASH_COUNT`.
/// `lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER`.
///
/// This allows the verifier and state queues to hold a few extra tips responses worth of blocks,
/// even if the syncer queue is full. Any unused capacity is shared between both queues.
@@ -48,8 +43,7 @@ type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// the rest of the capacity is reserved for the other queues.
/// There is no reserved capacity for the syncer queue:
/// if the other queues stay full, the syncer will eventually time out and reset.
const VERIFICATION_PIPELINE_SCALING_DIVISOR: usize =
DEFAULT_LOOKAHEAD_LIMIT / (4 * MAX_TIPS_RESPONSE_HASH_COUNT);
const VERIFICATION_PIPELINE_SCALING_MULTIPLIER: usize = 2;
#[derive(Copy, Clone, Debug)]
pub(super) struct AlwaysHedge;
@@ -282,7 +276,7 @@ where
// Scale the height limit with the lookahead limit,
// so users with low capacity or under DoS can reduce them both.
let lookahead = i32::try_from(
lookahead_limit + lookahead_limit / VERIFICATION_PIPELINE_SCALING_DIVISOR,
lookahead_limit + lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER,
)
.expect("fits in i32");
(tip_height + lookahead).expect("tip is much lower than Height::MAX")