From 9c15b14f86d2a2e349d12b7f21cb03501e876119 Mon Sep 17 00:00:00 2001 From: teor Date: Sun, 23 Apr 2023 23:41:38 +1000 Subject: [PATCH] fix(ci): Avoid inbound service overloads in tests (#6537) * Silence an extremely verbose error in zebra-consensus tests This disables around 10,000 logs like: 2023-04-18T02:46:28.441662Z WARN init{config=Config { checkpoint_sync: true, debug_skip_parameter_preload: false } network=Mainnet debug_skip_parameter_preload=true}: unexpected error: Closed in state request while verifying previous state checkpoints * Increase the outbound connection interval to 100ms * Start the inbound service as soon as possible, and the syncer last * Increase acceptance test time limits to get more debug info * Add more debug info to inbound service overload tracing messages --- zebra-consensus/src/chain.rs | 14 +++++- zebra-network/src/constants.rs | 6 +-- zebra-network/src/peer/connection.rs | 26 +++++++--- zebrad/src/commands/start.rs | 75 +++++++++++++++++----------- zebrad/tests/common/sync.rs | 10 +++- 5 files changed, 87 insertions(+), 44 deletions(-) diff --git a/zebra-consensus/src/chain.rs b/zebra-consensus/src/chain.rs index 7e7c16a8e..28f490cea 100644 --- a/zebra-consensus/src/chain.rs +++ b/zebra-consensus/src/chain.rs @@ -322,10 +322,20 @@ where unreachable!("unexpected response type: {response:?} from state request") } Err(e) => { + #[cfg(not(test))] tracing::warn!( "unexpected error: {e:?} in state request while verifying previous \ - state checkpoints" - ) + state checkpoints. Is Zebra shutting down?" + ); + // This error happens a lot in some tests. + // + // TODO: fix the tests so they don't cause this error, + // or change the tracing filter + #[cfg(test)] + tracing::debug!( + "unexpected error: {e:?} in state request while verifying previous \ + state checkpoints. Is Zebra shutting down?" 
+ ); } } } diff --git a/zebra-network/src/constants.rs b/zebra-network/src/constants.rs index f7e7a6b0f..927f7cc2c 100644 --- a/zebra-network/src/constants.rs +++ b/zebra-network/src/constants.rs @@ -158,7 +158,7 @@ pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(59); /// connections are only initiated after this minimum time has elapsed. /// /// It also enforces a minimum per-peer reconnection interval, and filters failed outbound peers. -pub const MIN_OUTBOUND_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(50); +pub const MIN_OUTBOUND_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(100); /// The minimum time between _successful_ inbound peer connections, implemented by /// `peer_set::initialize::accept_inbound_connections`. @@ -398,8 +398,8 @@ mod tests { / (u32::try_from(MAX_ADDRS_IN_ADDRESS_BOOK).expect("fits in u32") * MIN_OUTBOUND_PEER_CONNECTION_INTERVAL) .as_secs() as f32 - >= 0.5, - "most peers should get a connection attempt in each connection interval", + >= 0.2, + "some peers should get a connection attempt in each connection interval", ); assert!( diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index d03b0acf2..066116a2f 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -388,7 +388,7 @@ impl Handler { pub(super) enum State { /// Awaiting a client request or a peer message. AwaitingRequest, - /// Awaiting a peer message we can interpret as a client request. + /// Awaiting a peer message we can interpret as a response to a client request. AwaitingResponse { handler: Handler, tx: MustUseClientResponseSender, @@ -451,7 +451,6 @@ pub struct Connection { /// The metadata for the connected peer `service`. /// /// This field is used for debugging. - #[allow(dead_code)] pub connection_info: Arc, /// The state of this connection's current request or response. 
@@ -1242,7 +1241,18 @@ where let rsp = match self.svc.call(req.clone()).await { Err(e) => { if e.is::() { - tracing::info!("inbound service is overloaded, closing connection"); + tracing::info!( + remote_user_agent = ?self.connection_info.remote.user_agent, + negotiated_version = ?self.connection_info.negotiated_version, + peer = ?self.metrics_label, + last_peer_state = ?self.last_metrics_state, + // TODO: remove this detailed debug info once #6506 is fixed + remote_height = ?self.connection_info.remote.start_height, + cached_addrs = ?self.cached_addrs.len(), + connection_state = ?self.state, + "inbound service is overloaded, closing connection", + ); + metrics::counter!("pool.closed.loadshed", 1); self.fail_with(PeerError::Overloaded); } else { @@ -1250,10 +1260,12 @@ where // them to disconnect, and we might be using them to sync blocks. // For similar reasons, we don't want to fail_with() here - we // only close the connection if the peer is doing something wrong. - info!(%e, - connection_state = ?self.state, - client_receiver = ?self.client_rx, - "error processing peer request"); + info!( + %e, + connection_state = ?self.state, + client_receiver = ?self.client_rx, + "error processing peer request", + ); } return; } diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 89f40f50c..7a1f95999 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -176,6 +176,22 @@ impl StartCmd { .buffer(mempool::downloads::MAX_INBOUND_CONCURRENCY) .service(mempool); + info!("fully initializing inbound peer request handler"); + // Fully start the inbound service as soon as possible + let setup_data = InboundSetupData { + address_book: address_book.clone(), + block_download_peer_set: peer_set.clone(), + block_verifier: chain_verifier.clone(), + mempool: mempool.clone(), + state, + latest_chain_tip: latest_chain_tip.clone(), + }; + setup_tx + .send(setup_data) + .map_err(|_| eyre!("could not send setup data to inbound service"))?; + 
// And give it time to clear its queue + tokio::task::yield_now().await; + // Launch RPC server let (rpc_task_handle, rpc_tx_queue_task_handle, rpc_server) = RpcServer::spawn( config.rpc, @@ -186,27 +202,14 @@ impl StartCmd { app_version(), mempool.clone(), read_only_state_service, - chain_verifier.clone(), + chain_verifier, sync_status.clone(), - address_book.clone(), + address_book, latest_chain_tip.clone(), config.network.network, ); - let setup_data = InboundSetupData { - address_book, - block_download_peer_set: peer_set.clone(), - block_verifier: chain_verifier, - mempool: mempool.clone(), - state, - latest_chain_tip: latest_chain_tip.clone(), - }; - setup_tx - .send(setup_data) - .map_err(|_| eyre!("could not send setup data to inbound service"))?; - - let syncer_task_handle = tokio::spawn(syncer.sync().in_current_span()); - + // Start concurrent tasks which don't add load to other tasks let block_gossip_task_handle = tokio::spawn( sync::gossip_best_tip_block_hashes( sync_status.clone(), @@ -216,29 +219,41 @@ impl StartCmd { .in_current_span(), ); - let mempool_crawler_task_handle = mempool::Crawler::spawn( - &config.mempool, - peer_set.clone(), - mempool.clone(), - sync_status.clone(), - chain_tip_change, - ); - let mempool_queue_checker_task_handle = mempool::QueueChecker::spawn(mempool.clone()); let tx_gossip_task_handle = tokio::spawn( - mempool::gossip_mempool_transaction_id(mempool_transaction_receiver, peer_set) - .in_current_span(), - ); - - let progress_task_handle = tokio::spawn( - show_block_chain_progress(config.network.network, latest_chain_tip, sync_status) + mempool::gossip_mempool_transaction_id(mempool_transaction_receiver, peer_set.clone()) .in_current_span(), ); let mut old_databases_task_handle = zebra_state::check_and_delete_old_databases(config.state.clone()); + let progress_task_handle = tokio::spawn( + show_block_chain_progress( + config.network.network, + latest_chain_tip, + sync_status.clone(), + ) + .in_current_span(), + ); + + // 
Give the inbound service more time to clear its queue, // then start concurrent tasks that can add load to the inbound service // (by opening more peer connections, so those peers send us requests) tokio::task::yield_now().await; + + // The crawler only activates immediately in tests that use mempool debug mode + let mempool_crawler_task_handle = mempool::Crawler::spawn( + &config.mempool, + peer_set, + mempool.clone(), + sync_status, + chain_tip_change, + ); + + let syncer_task_handle = tokio::spawn(syncer.sync().in_current_span()); + info!("spawned initial Zebra tasks"); // TODO: put tasks into an ongoing FuturesUnordered and a startup FuturesUnordered? diff --git a/zebrad/tests/common/sync.rs b/zebrad/tests/common/sync.rs index 1d570196a..3d5920619 100644 --- a/zebrad/tests/common/sync.rs +++ b/zebrad/tests/common/sync.rs @@ -59,10 +59,16 @@ pub const STOP_ON_LOAD_TIMEOUT: Duration = Duration::from_secs(10); /// The maximum amount of time Zebra should take to sync a few hundred blocks. /// /// Usually the small checkpoint is much shorter than this. -pub const TINY_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(120); +// +// Temporarily increased to 4 minutes to get more diagnostic info in failed tests. +// TODO: reduce to 120 when #6506 is fixed +pub const TINY_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(240); /// The maximum amount of time Zebra should take to sync a thousand blocks. -pub const LARGE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(180); +// +// Temporarily increased to 4 minutes to get more diagnostic info in failed tests. +// TODO: reduce to 180 when #6506 is fixed +pub const LARGE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(240); /// The maximum time to wait for Zebrad to synchronize up to the chain tip starting from a /// partially synchronized state.