fix(ci): Avoid inbound service overloads in tests (#6537)
* Silence an extremely verbose error in zebra-consensus tests This disables around 10,000 logs like: 2023-04-18T02:46:28.441662Z WARN init{config=Config { checkpoint_sync: true, debug_skip_parameter_preload: false } network=Mainnet debug_skip_parameter_preload=true}: unexpected error: Closed in state request while verifying previous state checkpoints * Increase the outbound connection interval to 100ms * Start the inbound service as soon as possible, and the syncer last * Increase acceptance test time limits to get more debug info * Add more debug info to inbound service overload tracing messages
This commit is contained in:
parent
1e1e2348bf
commit
9c15b14f86
|
@ -322,10 +322,20 @@ where
|
|||
unreachable!("unexpected response type: {response:?} from state request")
|
||||
}
|
||||
Err(e) => {
|
||||
#[cfg(not(test))]
|
||||
tracing::warn!(
|
||||
"unexpected error: {e:?} in state request while verifying previous \
|
||||
state checkpoints"
|
||||
)
|
||||
state checkpoints. Is Zebra shutting down?"
|
||||
);
|
||||
// This error happens a lot in some tests.
|
||||
//
|
||||
// TODO: fix the tests so they don't cause this error,
|
||||
// or change the tracing filter
|
||||
#[cfg(test)]
|
||||
tracing::debug!(
|
||||
"unexpected error: {e:?} in state request while verifying previous \
|
||||
state checkpoints. Is Zebra shutting down?"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -158,7 +158,7 @@ pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(59);
|
|||
/// connections are only initiated after this minimum time has elapsed.
|
||||
///
|
||||
/// It also enforces a minimum per-peer reconnection interval, and filters failed outbound peers.
|
||||
pub const MIN_OUTBOUND_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(50);
|
||||
pub const MIN_OUTBOUND_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(100);
|
||||
|
||||
/// The minimum time between _successful_ inbound peer connections, implemented by
|
||||
/// `peer_set::initialize::accept_inbound_connections`.
|
||||
|
@ -398,8 +398,8 @@ mod tests {
|
|||
/ (u32::try_from(MAX_ADDRS_IN_ADDRESS_BOOK).expect("fits in u32")
|
||||
* MIN_OUTBOUND_PEER_CONNECTION_INTERVAL)
|
||||
.as_secs() as f32
|
||||
>= 0.5,
|
||||
"most peers should get a connection attempt in each connection interval",
|
||||
>= 0.2,
|
||||
"some peers should get a connection attempt in each connection interval",
|
||||
);
|
||||
|
||||
assert!(
|
||||
|
|
|
@ -388,7 +388,7 @@ impl Handler {
|
|||
pub(super) enum State {
|
||||
/// Awaiting a client request or a peer message.
|
||||
AwaitingRequest,
|
||||
/// Awaiting a peer message we can interpret as a client request.
|
||||
/// Awaiting a peer message we can interpret as a response to a client request.
|
||||
AwaitingResponse {
|
||||
handler: Handler,
|
||||
tx: MustUseClientResponseSender,
|
||||
|
@ -451,7 +451,6 @@ pub struct Connection<S, Tx> {
|
|||
/// The metadata for the connected peer `service`.
|
||||
///
|
||||
/// This field is used for debugging.
|
||||
#[allow(dead_code)]
|
||||
pub connection_info: Arc<ConnectionInfo>,
|
||||
|
||||
/// The state of this connection's current request or response.
|
||||
|
@ -1242,7 +1241,18 @@ where
|
|||
let rsp = match self.svc.call(req.clone()).await {
|
||||
Err(e) => {
|
||||
if e.is::<Overloaded>() {
|
||||
tracing::info!("inbound service is overloaded, closing connection");
|
||||
tracing::info!(
|
||||
remote_user_agent = ?self.connection_info.remote.user_agent,
|
||||
negotiated_version = ?self.connection_info.negotiated_version,
|
||||
peer = ?self.metrics_label,
|
||||
last_peer_state = ?self.last_metrics_state,
|
||||
// TODO: remove this detailed debug info once #6506 is fixed
|
||||
remote_height = ?self.connection_info.remote.start_height,
|
||||
cached_addrs = ?self.cached_addrs.len(),
|
||||
connection_state = ?self.state,
|
||||
"inbound service is overloaded, closing connection",
|
||||
);
|
||||
|
||||
metrics::counter!("pool.closed.loadshed", 1);
|
||||
self.fail_with(PeerError::Overloaded);
|
||||
} else {
|
||||
|
@ -1250,10 +1260,12 @@ where
|
|||
// them to disconnect, and we might be using them to sync blocks.
|
||||
// For similar reasons, we don't want to fail_with() here - we
|
||||
// only close the connection if the peer is doing something wrong.
|
||||
info!(%e,
|
||||
info!(
|
||||
%e,
|
||||
connection_state = ?self.state,
|
||||
client_receiver = ?self.client_rx,
|
||||
"error processing peer request");
|
||||
"error processing peer request",
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -176,6 +176,22 @@ impl StartCmd {
|
|||
.buffer(mempool::downloads::MAX_INBOUND_CONCURRENCY)
|
||||
.service(mempool);
|
||||
|
||||
info!("fully initializing inbound peer request handler");
|
||||
// Fully start the inbound service as soon as possible
|
||||
let setup_data = InboundSetupData {
|
||||
address_book: address_book.clone(),
|
||||
block_download_peer_set: peer_set.clone(),
|
||||
block_verifier: chain_verifier.clone(),
|
||||
mempool: mempool.clone(),
|
||||
state,
|
||||
latest_chain_tip: latest_chain_tip.clone(),
|
||||
};
|
||||
setup_tx
|
||||
.send(setup_data)
|
||||
.map_err(|_| eyre!("could not send setup data to inbound service"))?;
|
||||
// And give it time to clear its queue
|
||||
tokio::task::yield_now().await;
|
||||
|
||||
// Launch RPC server
|
||||
let (rpc_task_handle, rpc_tx_queue_task_handle, rpc_server) = RpcServer::spawn(
|
||||
config.rpc,
|
||||
|
@ -186,27 +202,14 @@ impl StartCmd {
|
|||
app_version(),
|
||||
mempool.clone(),
|
||||
read_only_state_service,
|
||||
chain_verifier.clone(),
|
||||
chain_verifier,
|
||||
sync_status.clone(),
|
||||
address_book.clone(),
|
||||
address_book,
|
||||
latest_chain_tip.clone(),
|
||||
config.network.network,
|
||||
);
|
||||
|
||||
let setup_data = InboundSetupData {
|
||||
address_book,
|
||||
block_download_peer_set: peer_set.clone(),
|
||||
block_verifier: chain_verifier,
|
||||
mempool: mempool.clone(),
|
||||
state,
|
||||
latest_chain_tip: latest_chain_tip.clone(),
|
||||
};
|
||||
setup_tx
|
||||
.send(setup_data)
|
||||
.map_err(|_| eyre!("could not send setup data to inbound service"))?;
|
||||
|
||||
let syncer_task_handle = tokio::spawn(syncer.sync().in_current_span());
|
||||
|
||||
// Start concurrent tasks which don't add load to other tasks
|
||||
let block_gossip_task_handle = tokio::spawn(
|
||||
sync::gossip_best_tip_block_hashes(
|
||||
sync_status.clone(),
|
||||
|
@ -216,29 +219,41 @@ impl StartCmd {
|
|||
.in_current_span(),
|
||||
);
|
||||
|
||||
let mempool_crawler_task_handle = mempool::Crawler::spawn(
|
||||
&config.mempool,
|
||||
peer_set.clone(),
|
||||
mempool.clone(),
|
||||
sync_status.clone(),
|
||||
chain_tip_change,
|
||||
);
|
||||
|
||||
let mempool_queue_checker_task_handle = mempool::QueueChecker::spawn(mempool.clone());
|
||||
|
||||
let tx_gossip_task_handle = tokio::spawn(
|
||||
mempool::gossip_mempool_transaction_id(mempool_transaction_receiver, peer_set)
|
||||
.in_current_span(),
|
||||
);
|
||||
|
||||
let progress_task_handle = tokio::spawn(
|
||||
show_block_chain_progress(config.network.network, latest_chain_tip, sync_status)
|
||||
mempool::gossip_mempool_transaction_id(mempool_transaction_receiver, peer_set.clone())
|
||||
.in_current_span(),
|
||||
);
|
||||
|
||||
let mut old_databases_task_handle =
|
||||
zebra_state::check_and_delete_old_databases(config.state.clone());
|
||||
|
||||
let progress_task_handle = tokio::spawn(
|
||||
show_block_chain_progress(
|
||||
config.network.network,
|
||||
latest_chain_tip,
|
||||
sync_status.clone(),
|
||||
)
|
||||
.in_current_span(),
|
||||
);
|
||||
|
||||
// Give the inbound service more time to clear its queue,
|
||||
// then start concurrent tasks that can add load to the inbound service
|
||||
// (by opening more peer connections, so those peers send us requests)
|
||||
tokio::task::yield_now().await;
|
||||
|
||||
// The crawler only activates immediately in tests that use mempool debug mode
|
||||
let mempool_crawler_task_handle = mempool::Crawler::spawn(
|
||||
&config.mempool,
|
||||
peer_set,
|
||||
mempool.clone(),
|
||||
sync_status,
|
||||
chain_tip_change,
|
||||
);
|
||||
|
||||
let syncer_task_handle = tokio::spawn(syncer.sync().in_current_span());
|
||||
|
||||
info!("spawned initial Zebra tasks");
|
||||
|
||||
// TODO: put tasks into an ongoing FuturesUnordered and a startup FuturesUnordered?
|
||||
|
|
|
@ -59,10 +59,16 @@ pub const STOP_ON_LOAD_TIMEOUT: Duration = Duration::from_secs(10);
|
|||
/// The maximum amount of time Zebra should take to sync a few hundred blocks.
|
||||
///
|
||||
/// Usually the small checkpoint is much shorter than this.
|
||||
pub const TINY_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(120);
|
||||
//
|
||||
// Temporarily increased to 4 minutes to get more diagnostic info in failed tests.
|
||||
// TODO: reduce to 120 when #6506 is fixed
|
||||
pub const TINY_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(240);
|
||||
|
||||
/// The maximum amount of time Zebra should take to sync a thousand blocks.
|
||||
pub const LARGE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(180);
|
||||
//
|
||||
// Temporarily increased to 4 minutes to get more diagnostic info in failed tests.
|
||||
// TODO: reduce to 180 when #6506 is fixed
|
||||
pub const LARGE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(240);
|
||||
|
||||
/// The maximum time to wait for Zebrad to synchronize up to the chain tip starting from a
|
||||
/// partially synchronized state.
|
||||
|
|
Loading…
Reference in New Issue