From d689e7344b57846ac9257cf5235e1ca44d88ce05 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 17 Nov 2023 05:53:24 +1000 Subject: [PATCH] fix(net): Fix potential network hangs, and reduce code complexity (#7859) * Refactor return type of poll_discover() * Simplify poll_ready() by removing preselected peers * Fix peer set readiness check * Pass task context correctly to background tasks * Make poll_discover() return Pending * Make poll_inventory() return Pending * Make poll_unready() return Poll::Pending * Simplify with futures::ready!() and ? * When there are no peers, wake on newly ready peers, or new peers, in that order * Preserve the original waker when there are no unready peers * Fix polling docs and remove unnecessary code * Make sure we're ignoring Poll::Pending not Result * Make panic checking method names clearer * Fix connection Client task wakeups and error handling * Cleanup connection panic handling and add wakeup docs * Fix connection client task wakeups to prevent hangs * Simplify error and pending handling * Clarify inventory set behaviour * Define peer set poll_* methods so they return Ok if they do something * Clarify documentation Co-authored-by: Arya * Fix test that depended on preselected peers * Check ready peers for errors before sending requests to them * Fix a hanging test by not waiting for irrelevant actions * Only remove cancel handles when they are required * fix incorrect panic on termination setting * Clarify method comments Co-authored-by: Arya --------- Co-authored-by: Arya --- zebra-chain/src/diagnostic/task.rs | 70 ++- zebra-chain/src/diagnostic/task/future.rs | 47 +- zebra-chain/src/diagnostic/task/thread.rs | 73 ++- zebra-network/src/peer/client.rs | 137 ++++-- zebra-network/src/peer/error.rs | 5 + zebra-network/src/peer/load_tracked_client.rs | 1 + .../src/peer_set/inventory_registry.rs | 53 ++- .../peer_set/inventory_registry/tests/prop.rs | 16 +- .../src/peer_set/inventory_registry/update.rs | 14 +- zebra-network/src/peer_set/set.rs | 444 +++++++++++------- .../src/peer_set/set/tests/vectors.rs | 3 +- .../finalized_state/disk_format/upgrade.rs | 2 +- 12 files changed, 593 insertions(+), 272 deletions(-) diff --git a/zebra-chain/src/diagnostic/task.rs b/zebra-chain/src/diagnostic/task.rs index 2d43f6955..bbdb72fec 100644 --- a/zebra-chain/src/diagnostic/task.rs +++ b/zebra-chain/src/diagnostic/task.rs @@ -9,7 +9,7 @@ pub mod future; pub mod thread; /// A trait that checks a task's return value for panics. -pub trait CheckForPanics { +pub trait CheckForPanics: Sized { /// The output type, after removing panics from `Self`. type Output; @@ -20,11 +20,34 @@ pub trait CheckForPanics { /// /// If `self` contains a panic payload or an unexpected termination. #[track_caller] - fn check_for_panics(self) -> Self::Output; + fn panic_if_task_has_finished(self) -> Self::Output { + self.check_for_panics_with(true) + } + + /// Check if `self` contains a panic payload, then panic. + /// Otherwise, return the non-panic part of `self`. + /// + /// # Panics + /// + /// If `self` contains a panic payload. + #[track_caller] + fn panic_if_task_has_panicked(self) -> Self::Output { + self.check_for_panics_with(false) + } + + /// Check if `self` contains a panic payload, then panic. Also panics if + /// `panic_on_unexpected_termination` is true, and `self` is an unexpected termination. + /// Otherwise, return the non-panic part of `self`. + /// + /// # Panics + /// + /// If `self` contains a panic payload, or if we're panicking on unexpected terminations. 
+ #[track_caller] + fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output; } /// A trait that waits for a task to finish, then handles panics and cancellations. -pub trait WaitForPanics { +pub trait WaitForPanics: Sized { /// The underlying task output, after removing panics and unwrapping termination results. type Output; @@ -43,5 +66,44 @@ pub trait WaitForPanics { /// /// If `self` contains an expected termination, and we're shutting down anyway. #[track_caller] - fn wait_for_panics(self) -> Self::Output; + fn wait_and_panic_on_unexpected_termination(self) -> Self::Output { + self.wait_for_panics_with(true) + } + + /// Waits for `self` to finish, then check if its output is: + /// - a panic payload: resume that panic, + /// - a task termination: hang waiting for shutdown. + /// + /// Otherwise, returns the task return value of `self`. + /// + /// # Panics + /// + /// If `self` contains a panic payload. + /// + /// # Hangs + /// + /// If `self` contains a task termination. + #[track_caller] + fn wait_for_panics(self) -> Self::Output { + self.wait_for_panics_with(false) + } + + /// Waits for `self` to finish, then check if its output is: + /// - a panic payload: resume that panic, + /// - an unexpected termination: + /// - if `panic_on_unexpected_termination` is true, panic with that error, + /// - otherwise, hang waiting for shutdown, + /// - an expected termination: hang waiting for shutdown. + /// + /// Otherwise, returns the task return value of `self`. + /// + /// # Panics + /// + /// If `self` contains a panic payload, or if we're panicking on unexpected terminations. + /// + /// # Hangs + /// + /// If `self` contains an expected or ignored termination, and we're shutting down anyway. + #[track_caller] + fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output; } diff --git a/zebra-chain/src/diagnostic/task/future.rs b/zebra-chain/src/diagnostic/task/future.rs index 431b13ed9..a119d7c62 100644 --- a/zebra-chain/src/diagnostic/task/future.rs +++ b/zebra-chain/src/diagnostic/task/future.rs @@ -18,15 +18,18 @@ impl CheckForPanics for Result { type Output = Result; /// Returns the task result if the task finished normally. - /// Otherwise, resumes any panics, logs unexpected errors, and ignores any expected errors. + /// Otherwise, resumes any panics, and ignores any expected errors. + /// Handles unexpected errors based on `panic_on_unexpected_termination`. /// /// If the task finished normally, returns `Some(T)`. /// If the task was cancelled, returns `None`. #[track_caller] - fn check_for_panics(self) -> Self::Output { + fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output { match self { Ok(task_output) => Ok(task_output), - Err(join_error) => Err(join_error.check_for_panics()), + Err(join_error) => { + Err(join_error.check_for_panics_with(panic_on_unexpected_termination)) + } } } } @@ -38,22 +41,26 @@ impl CheckForPanics for JoinError { /// Resume any panics and panic on unexpected task cancellations. /// Always returns [`JoinError::Cancelled`](JoinError::is_cancelled). #[track_caller] - fn check_for_panics(self) -> Self::Output { + fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output { match self.try_into_panic() { Ok(panic_payload) => panic::resume_unwind(panic_payload), // We could ignore this error, but then we'd have to change the return type. 
- Err(task_cancelled) if is_shutting_down() => { - debug!( - ?task_cancelled, - "ignoring cancelled task because Zebra is shutting down" - ); - - task_cancelled - } - Err(task_cancelled) => { - panic!("task cancelled during normal Zebra operation: {task_cancelled:?}"); + if !panic_on_unexpected_termination { + debug!(?task_cancelled, "ignoring expected task termination"); + + task_cancelled + } else if is_shutting_down() { + debug!( + ?task_cancelled, + "ignoring task termination because Zebra is shutting down" + ); + + task_cancelled + } else { + panic!("task unexpectedly exited with: {:?}", task_cancelled) + } } } } @@ -67,7 +74,9 @@ where /// Returns a future which waits for `self` to finish, then checks if its output is: /// - a panic payload: resume that panic, - /// - an unexpected termination: panic with that error, + /// - an unexpected termination: + /// - if `panic_on_unexpected_termination` is true, panic with that error, + /// - otherwise, hang waiting for shutdown, /// - an expected termination: hang waiting for shutdown. /// /// Otherwise, returns the task return value of `self`. @@ -79,11 +88,15 @@ where /// # Hangs /// /// If `self` contains an expected termination, and we're shutting down anyway. + /// If we're ignoring terminations because `panic_on_unexpected_termination` is `false`. /// Futures hang by returning `Pending` and not setting a waker, so this uses minimal resources. #[track_caller] - fn wait_for_panics(self) -> Self::Output { + fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output { async move { - match self.await.check_for_panics() { + match self + .await + .check_for_panics_with(panic_on_unexpected_termination) + { Ok(task_output) => task_output, Err(_expected_cancel_error) => future::pending().await, } diff --git a/zebra-chain/src/diagnostic/task/thread.rs b/zebra-chain/src/diagnostic/task/thread.rs index 84df3fac4..517aef014 100644 --- a/zebra-chain/src/diagnostic/task/thread.rs +++ b/zebra-chain/src/diagnostic/task/thread.rs @@ -8,19 +8,44 @@ use std::{ thread::{self, JoinHandle}, }; +use crate::shutdown::is_shutting_down; + use super::{CheckForPanics, WaitForPanics}; -impl CheckForPanics for thread::Result { +impl CheckForPanics for thread::Result +where + T: std::fmt::Debug, +{ type Output = T; - /// Panics if the thread panicked. + /// # Panics + /// + /// - if the thread panicked. + /// - if the thread is cancelled, `panic_on_unexpected_termination` is true, and + /// Zebra is not shutting down. /// /// Threads can't be cancelled except by using a panic, so there are no thread errors here. + /// `panic_on_unexpected_termination` controls whether a normal thread exit is treated as an unexpected termination. #[track_caller] - fn check_for_panics(self) -> Self::Output { + fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output { match self { // The value returned by the thread when it finished. - Ok(thread_output) => thread_output, + Ok(thread_output) => { + if !panic_on_unexpected_termination { + debug!(?thread_output, "ignoring expected thread exit"); + + thread_output + } else if is_shutting_down() { + debug!( + ?thread_output, + "ignoring thread exit because Zebra is shutting down" + ); + + thread_output + } else { + panic!("thread unexpectedly exited with: {:?}", thread_output) + } + } // A thread error is always a panic.
Err(panic_payload) => panic::resume_unwind(panic_payload), @@ -28,17 +53,24 @@ impl CheckForPanics for thread::Result { } } -impl WaitForPanics for JoinHandle { +impl WaitForPanics for JoinHandle +where + T: std::fmt::Debug, +{ type Output = T; /// Waits for the thread to finish, then panics if the thread panicked. #[track_caller] - fn wait_for_panics(self) -> Self::Output { - self.join().check_for_panics() + fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output { + self.join() + .check_for_panics_with(panic_on_unexpected_termination) } } -impl WaitForPanics for Arc> { +impl WaitForPanics for Arc> +where + T: std::fmt::Debug, +{ type Output = Option; /// If this is the final `Arc`, waits for the thread to finish, then panics if the thread @@ -46,7 +78,7 @@ impl WaitForPanics for Arc> { /// /// If this is not the final `Arc`, drops the handle and immediately returns `None`. #[track_caller] - fn wait_for_panics(self) -> Self::Output { + fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output { // If we are the last Arc with a reference to this handle, // we can wait for it and propagate any panics. // @@ -56,14 +88,17 @@ impl WaitForPanics for Arc> { // This is more readable as an expanded statement. #[allow(clippy::manual_map)] if let Some(handle) = Arc::into_inner(self) { - Some(handle.wait_for_panics()) + Some(handle.wait_for_panics_with(panic_on_unexpected_termination)) } else { None } } } -impl CheckForPanics for &mut Option>> { +impl CheckForPanics for &mut Option>> +where + T: std::fmt::Debug, +{ type Output = Option; /// If this is the final `Arc`, checks if the thread has finished, then panics if the thread @@ -71,14 +106,14 @@ impl CheckForPanics for &mut Option>> { /// /// If the thread has not finished, or this is not the final `Arc`, returns `None`. #[track_caller] - fn check_for_panics(self) -> Self::Output { + fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output { let handle = self.take()?; if handle.is_finished() { // This is the same as calling `self.wait_for_panics()`, but we can't do that, // because we've taken `self`. #[allow(clippy::manual_map)] - return handle.wait_for_panics(); + return handle.wait_for_panics_with(panic_on_unexpected_termination); } *self = Some(handle); @@ -87,7 +122,10 @@ impl CheckForPanics for &mut Option>> { } } -impl WaitForPanics for &mut Option>> { +impl WaitForPanics for &mut Option>> +where + T: std::fmt::Debug, +{ type Output = Option; /// If this is the final `Arc`, waits for the thread to finish, then panics if the thread @@ -95,10 +133,13 @@ impl WaitForPanics for &mut Option>> { /// /// If this is not the final `Arc`, drops the handle and returns `None`. #[track_caller] - fn wait_for_panics(self) -> Self::Output { + fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output { // This is more readable as an expanded statement. #[allow(clippy::manual_map)] - if let Some(output) = self.take()?.wait_for_panics() { + if let Some(output) = self + .take()? + .wait_for_panics_with(panic_on_unexpected_termination) + { Some(output) } else { // Some other task has a reference, so we should give up ours to let them use it. 
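The two traits above are normally used through their named entry points rather than by passing a boolean to `check_for_panics_with()`. A minimal usage sketch, assuming a tokio runtime; the `background_work()` future and the `monitor()` and `check_join_result()` functions are illustrative names, not code from this change:

    use zebra_chain::diagnostic::task::{CheckForPanics, WaitForPanics};

    async fn background_work() { /* illustrative task body */ }

    async fn monitor() {
        let handle = tokio::spawn(background_work());

        // Resume panics, and panic if the task exits unexpectedly outside shutdown.
        handle.wait_and_panic_on_unexpected_termination().await;
    }

    fn check_join_result(join_result: Result<(), tokio::task::JoinError>) {
        // Resume panics, but treat a plain task exit as expected.
        let _ = join_result.panic_if_task_has_panicked();
    }
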
diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index 699402754..8dd81ed32 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -18,6 +18,8 @@ use futures::{ use tokio::{sync::broadcast, task::JoinHandle}; use tower::Service; +use zebra_chain::diagnostic::task::CheckForPanics; + use crate::{ peer::{ error::{AlreadyErrored, ErrorSlot, PeerError, SharedPeerError}, @@ -421,8 +423,15 @@ impl MissingInventoryCollector { impl Client { /// Check if this connection's heartbeat task has exited. + /// + /// Returns an error if the heartbeat task exited. Otherwise, schedules the client task for + /// wakeup when the heartbeat task finishes, or the channel closes, and returns `Pending`. + /// + /// # Panics + /// + /// If the heartbeat task panicked. #[allow(clippy::unwrap_in_result)] - fn check_heartbeat(&mut self, cx: &mut Context<'_>) -> Result<(), SharedPeerError> { + fn poll_heartbeat(&mut self, cx: &mut Context<'_>) -> Poll> { let is_canceled = self .shutdown_tx .as_mut() @@ -430,17 +439,19 @@ impl Client { .poll_canceled(cx) .is_ready(); - if is_canceled { - return self.set_task_exited_error( - "heartbeat", - PeerError::HeartbeatTaskExited("Task was cancelled".to_string()), - ); - } - - match self.heartbeat_task.poll_unpin(cx) { + let result = match self.heartbeat_task.poll_unpin(cx) { Poll::Pending => { - // Heartbeat task is still running. - Ok(()) + // The heartbeat task returns `Pending` while it continues to run. + // But if it has dropped its receiver, it is shutting down, and we should also shut down. + if is_canceled { + self.set_task_exited_error( + "heartbeat", + PeerError::HeartbeatTaskExited("Task was cancelled".to_string()), + ) + } else { + // Heartbeat task is still running. + return Poll::Pending; + } } Poll::Ready(Ok(Ok(_))) => { // Heartbeat task stopped unexpectedly, without panic or error. @@ -459,6 +470,9 @@ impl Client { ) } Poll::Ready(Err(error)) => { + // Heartbeat task panicked. + let error = error.panic_if_task_has_panicked(); + // Heartbeat task was cancelled. if error.is_cancelled() { self.set_task_exited_error( @@ -466,11 +480,7 @@ impl Client { PeerError::HeartbeatTaskExited("Task was cancelled".to_string()), ) } - // Heartbeat task stopped with panic. - else if error.is_panic() { - panic!("heartbeat task has panicked: {error}"); - } - // Heartbeat task stopped with error. + // Heartbeat task stopped with another kind of task error. else { self.set_task_exited_error( "heartbeat", @@ -478,25 +488,48 @@ impl Client { ) } } - } + }; + + Poll::Ready(result) } - /// Check if the connection's task has exited. - fn check_connection(&mut self, context: &mut Context<'_>) -> Result<(), SharedPeerError> { - match self.connection_task.poll_unpin(context) { - Poll::Pending => { - // Connection task is still running. - Ok(()) - } - Poll::Ready(Ok(())) => { + /// Check if the connection's request/response task has exited. + /// + /// Returns an error if the connection task exited. Otherwise, schedules the client task for + /// wakeup when the connection task finishes, and returns `Pending`. + /// + /// # Panics + /// + /// If the connection task panicked. + fn poll_connection(&mut self, context: &mut Context<'_>) -> Poll> { + // Return `Pending` if the connection task is still running. + let result = match ready!(self.connection_task.poll_unpin(context)) { + Ok(()) => { // Connection task stopped unexpectedly, without panicking. 
self.set_task_exited_error("connection", PeerError::ConnectionTaskExited) } - Poll::Ready(Err(error)) => { - // Connection task stopped unexpectedly with a panic. - panic!("connection task has panicked: {error}"); + Err(error) => { + // Connection task panicked. + let error = error.panic_if_task_has_panicked(); + + // Connection task was cancelled. + if error.is_cancelled() { + self.set_task_exited_error( + "connection", + PeerError::HeartbeatTaskExited("Task was cancelled".to_string()), + ) + } + // Connection task stopped with another kind of task error. + else { + self.set_task_exited_error( + "connection", + PeerError::HeartbeatTaskExited(error.to_string()), + ) + } } - } + }; + + Poll::Ready(result) } /// Properly update the error slot after a background task has unexpectedly stopped. @@ -522,6 +555,9 @@ impl Client { } /// Poll for space in the shared request sender channel. + /// + /// Returns an error if the sender channel is closed. If there is no space in the channel, + /// returns `Pending`, and schedules the task for wakeup when there is space available. fn poll_request(&mut self, cx: &mut Context<'_>) -> Poll> { let server_result = ready!(self.server_tx.poll_ready(cx)); if server_result.is_err() { @@ -536,6 +572,33 @@ impl Client { } } + /// Poll for space in the shared request sender channel, and for errors in the connection tasks. + /// + /// Returns an error if the sender channel is closed, or the heartbeat or connection tasks have + /// terminated. If there is no space in the channel, returns `Pending`, and schedules the task + /// for wakeup when there is space available, or one of the tasks terminates. + fn poll_client(&mut self, cx: &mut Context<'_>) -> Poll> { + // # Correctness + // + // The current task must be scheduled for wakeup every time we return + // `Poll::Pending`. + // + // `poll_heartbeat()` and `poll_connection()` schedule the client task for wakeup + // if either task exits, or if the heartbeat task drops the cancel handle. + // + //`ready!` returns `Poll::Pending` when `server_tx` is unready, and + // schedules this task for wakeup. + // + // It's ok to exit early and skip wakeups when there is an error, because the connection + // and its tasks are shut down immediately on error. + + let _heartbeat_pending: Poll<()> = self.poll_heartbeat(cx)?; + let _connection_pending: Poll<()> = self.poll_connection(cx)?; + + // We're only pending if the sender channel is full. + self.poll_request(cx) + } + /// Shut down the resources held by the client half of this peer connection. /// /// Stops further requests to the remote peer, and stops the heartbeat task. @@ -566,20 +629,16 @@ impl Service for Client { // The current task must be scheduled for wakeup every time we return // `Poll::Pending`. // - // `check_heartbeat` and `check_connection` schedule the client task for wakeup - // if either task exits, or if the heartbeat task drops the cancel handle. + // `poll_client()` schedules the client task for wakeup if the sender channel has space, + // either connection task exits, or if the heartbeat task drops the cancel handle. + + // Check all the tasks and channels. // //`ready!` returns `Poll::Pending` when `server_tx` is unready, and // schedules this task for wakeup. + let result = ready!(self.poll_client(cx)); - let mut result = self - .check_heartbeat(cx) - .and_then(|()| self.check_connection(cx)); - - if result.is_ok() { - result = ready!(self.poll_request(cx)); - } - + // Shut down the client and connection if there is an error. 
if let Err(error) = result { self.shutdown(); diff --git a/zebra-network/src/peer/error.rs b/zebra-network/src/peer/error.rs index 6263fb561..c40c34b1d 100644 --- a/zebra-network/src/peer/error.rs +++ b/zebra-network/src/peer/error.rs @@ -82,6 +82,10 @@ pub enum PeerError { #[error("Internal services over capacity")] Overloaded, + /// There are no ready remote peers. + #[error("No ready peers available")] + NoReadyPeers, + /// This peer request's caused an internal service timeout, so the connection was dropped /// to shed load or prevent attacks. #[error("Internal services timed out")] @@ -147,6 +151,7 @@ impl PeerError { PeerError::Serialization(inner) => format!("Serialization({inner})").into(), PeerError::DuplicateHandshake => "DuplicateHandshake".into(), PeerError::Overloaded => "Overloaded".into(), + PeerError::NoReadyPeers => "NoReadyPeers".into(), PeerError::InboundTimeout => "InboundTimeout".into(), PeerError::ServiceShutdown => "ServiceShutdown".into(), PeerError::NotFoundResponse(_) => "NotFoundResponse".into(), diff --git a/zebra-network/src/peer/load_tracked_client.rs b/zebra-network/src/peer/load_tracked_client.rs index 49d143072..6211e6cd5 100644 --- a/zebra-network/src/peer/load_tracked_client.rs +++ b/zebra-network/src/peer/load_tracked_client.rs @@ -20,6 +20,7 @@ use crate::{ /// A client service wrapper that keeps track of its load. /// /// It also keeps track of the peer's reported protocol version. +#[derive(Debug)] pub struct LoadTrackedClient { /// A service representing a connected peer, wrapped in a load tracker. service: PeakEwma, diff --git a/zebra-network/src/peer_set/inventory_registry.rs b/zebra-network/src/peer_set/inventory_registry.rs index b3de4ccee..b43915822 100644 --- a/zebra-network/src/peer_set/inventory_registry.rs +++ b/zebra-network/src/peer_set/inventory_registry.rs @@ -275,22 +275,30 @@ impl InventoryRegistry { self.current.iter().chain(self.prev.iter()) } - /// Returns a future that polls once for new registry updates. + /// Returns a future that waits for new registry updates. #[allow(dead_code)] pub fn update(&mut self) -> Update { Update::new(self) } - /// Drive periodic inventory tasks + /// Drive periodic inventory tasks. /// - /// # Details + /// Rotates the inventory HashMaps on every timer tick. + /// Drains the inv_stream channel and registers all advertised inventory. /// - /// - rotates HashMaps based on interval events - /// - drains the inv_stream channel and registers all advertised inventory - pub fn poll_inventory(&mut self, cx: &mut Context<'_>) -> Result<(), BoxError> { + /// Returns an error if the inventory channel is closed. + /// + /// Otherwise, returns `Ok` if it performed at least one update or rotation, or `Poll::Pending` + /// if there was no inventory change. Always registers a wakeup for the next inventory update + /// or rotation, even when it returns `Ok`. + pub fn poll_inventory(&mut self, cx: &mut Context<'_>) -> Poll> { + let mut result = Poll::Pending; + // # Correctness // // Registers the current task for wakeup when the timer next becomes ready. + // (But doesn't return, because we also want to register the task for wakeup when more + // inventory arrives.) // // # Security // @@ -302,6 +310,7 @@ impl InventoryRegistry { // two interval ticks are delayed. 
if Pin::new(&mut self.interval).poll_next(cx).is_ready() { self.rotate(); + result = Poll::Ready(Ok(())); } // This module uses a broadcast channel instead of an mpsc channel, even @@ -320,24 +329,38 @@ impl InventoryRegistry { // rather than propagating it through the peer set's Service::poll_ready // implementation, where reporting a failure means reporting a permanent // failure of the peer set. - while let Poll::Ready(channel_result) = self.inv_stream.next().poll_unpin(cx) { + + // Returns Pending if all messages are processed, but the channel could get more. + loop { + let channel_result = self.inv_stream.next().poll_unpin(cx); + match channel_result { - Some(Ok(change)) => self.register(change), - Some(Err(BroadcastStreamRecvError::Lagged(count))) => { + Poll::Ready(Some(Ok(change))) => { + self.register(change); + result = Poll::Ready(Ok(())); + } + Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(count)))) => { + // This isn't a fatal inventory error, it's expected behaviour when Zebra is + // under load from peers. metrics::counter!("pool.inventory.dropped", 1); metrics::counter!("pool.inventory.dropped.messages", count); - // If this message happens a lot, we should improve inventory registry performance, - // or poll the registry or peer set in a separate task. + // If this message happens a lot, we should improve inventory registry + // performance, or poll the registry or peer set in a separate task. info!(count, "dropped lagged inventory advertisements"); } - // This indicates all senders, including the one in the handshaker, - // have been dropped, which really is a permanent failure. - None => return Err(broadcast::error::RecvError::Closed.into()), + Poll::Ready(None) => { + // If the channel is empty and returns None, all senders, including the one in + // the handshaker, have been dropped, which really is a permanent failure. + result = Poll::Ready(Err(broadcast::error::RecvError::Closed.into())); + } + Poll::Pending => { + break; + } } } - Ok(()) + result } /// Record the given inventory `change` for the peer `addr`. diff --git a/zebra-network/src/peer_set/inventory_registry/tests/prop.rs b/zebra-network/src/peer_set/inventory_registry/tests/prop.rs index 959d7b918..6bed4fbd7 100644 --- a/zebra-network/src/peer_set/inventory_registry/tests/prop.rs +++ b/zebra-network/src/peer_set/inventory_registry/tests/prop.rs @@ -1,7 +1,11 @@ //! Randomised property tests for the inventory registry. -use std::collections::HashSet; +use std::{ + collections::HashSet, + task::{Context, Poll}, +}; +use futures::task::noop_waker; use proptest::prelude::*; use crate::{ @@ -81,10 +85,12 @@ async fn inv_registry_inbound_wrapper_with( forwarded_msg.expect("unexpected forwarded error result"), ); - inv_registry - .update() - .await - .expect("unexpected dropped registry sender channel"); + // We don't actually care if the registry takes any action here. 
+ let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + let _poll_pending_or_ok: Poll<()> = inv_registry + .poll_inventory(&mut cx) + .map(|result| result.expect("unexpected error polling inventory")); let test_peer = test_peer .get_transient_addr() diff --git a/zebra-network/src/peer_set/inventory_registry/update.rs b/zebra-network/src/peer_set/inventory_registry/update.rs index 9ebedc55a..5089d64d3 100644 --- a/zebra-network/src/peer_set/inventory_registry/update.rs +++ b/zebra-network/src/peer_set/inventory_registry/update.rs @@ -19,8 +19,11 @@ pub struct Update<'a> { impl Unpin for Update<'_> {} impl<'a> Update<'a> { - #[allow(dead_code)] - pub(super) fn new(registry: &'a mut InventoryRegistry) -> Self { + /// Returns a new future that returns when the next inventory update or rotation has been + /// completed by `registry`. + /// + /// See [`InventoryRegistry::poll_inventory()`] for details. + pub fn new(registry: &'a mut InventoryRegistry) -> Self { Self { registry } } } @@ -28,9 +31,10 @@ impl<'a> Update<'a> { impl Future for Update<'_> { type Output = Result<(), BoxError>; + /// A future that returns when the next inventory update or rotation has been completed. + /// + /// See [`InventoryRegistry::poll_inventory()`] for details. fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // TODO: should the future wait until new changes arrive? - // or for the rotation timer? - Poll::Ready(self.registry.poll_inventory(cx)) + self.registry.poll_inventory(cx) } } diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 2fa546c98..fdec72fde 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -97,7 +97,6 @@ use std::{ collections::{HashMap, HashSet}, convert, fmt::Debug, - future::Future, marker::PhantomData, net::IpAddr, pin::Pin, @@ -107,14 +106,15 @@ use std::{ use futures::{ channel::{mpsc, oneshot}, - future::{FutureExt, TryFutureExt}, + future::{Future, FutureExt, TryFutureExt}, prelude::*, stream::FuturesUnordered, + task::noop_waker, }; use itertools::Itertools; use num_integer::div_ceil; use tokio::{ - sync::{broadcast, oneshot::error::TryRecvError, watch}, + sync::{broadcast, watch}, task::JoinHandle, }; use tower::{ @@ -191,17 +191,6 @@ where // Request Routing // - /// A preselected ready service. - /// - /// # Correctness - /// - /// If this is `Some(addr)`, `addr` must be a key for a peer in `ready_services`. - /// If that peer is removed from `ready_services`, we must set the preselected peer to `None`. - /// - /// This is handled by [`PeerSet::take_ready_service`] and - /// [`PeerSet::disconnect_from_outdated_peers`]. - preselected_p2c_peer: Option, - /// Stores gossiped inventory hashes from connected peers. /// /// Used to route inventory requests to peers that are likely to have it. @@ -265,7 +254,11 @@ where C: ChainTip, { fn drop(&mut self) { - self.shut_down_tasks_and_channels() + // We don't have access to the current task (if any), so we just drop everything we can. + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + + self.shut_down_tasks_and_channels(&mut cx); } } @@ -310,7 +303,6 @@ where // Ready peers ready_services: HashMap::new(), // Request Routing - preselected_p2c_peer: None, inventory_registry: InventoryRegistry::new(inv_stream), // Busy peers @@ -335,52 +327,51 @@ where /// Check background task handles to make sure they're still running. /// + /// Never returns `Ok`. 
+ /// /// If any background task exits, shuts down all other background tasks, - /// and returns an error. - fn poll_background_errors(&mut self, cx: &mut Context) -> Result<(), BoxError> { - if let Some(result) = self.receive_tasks_if_needed() { - return result; - } + /// and returns an error. Otherwise, returns `Pending`, and registers a wakeup for + /// receiving the background tasks, or the background tasks exiting. + fn poll_background_errors(&mut self, cx: &mut Context<'_>) -> Poll> { + futures::ready!(self.receive_tasks_if_needed(cx))?; - match Pin::new(&mut self.guards).poll_next(cx) { - // All background tasks are still running. - Poll::Pending => Ok(()), - - Poll::Ready(Some(res)) => { + // Return Pending if all background tasks are still running. + match futures::ready!(Pin::new(&mut self.guards).poll_next(cx)) { + Some(res) => { info!( background_tasks = %self.guards.len(), "a peer set background task exited, shutting down other peer set tasks" ); - self.shut_down_tasks_and_channels(); + self.shut_down_tasks_and_channels(cx); - // Flatten the join result and inner result, - // then turn Ok() task exits into errors. + // Flatten the join result and inner result, and return any errors. res.map_err(Into::into) // TODO: replace with Result::flatten when it stabilises (#70142) - .and_then(convert::identity) - .and(Err("a peer set background task exited".into())) + .and_then(convert::identity)?; + + // Turn Ok() task exits into errors. + Poll::Ready(Err("a peer set background task exited".into())) } - Poll::Ready(None) => { - self.shut_down_tasks_and_channels(); - Err("all peer set background tasks have exited".into()) + None => { + self.shut_down_tasks_and_channels(cx); + Poll::Ready(Err("all peer set background tasks have exited".into())) } } } - /// Receive background tasks, if they've been sent on the channel, - /// but not consumed yet. + /// Receive background tasks, if they've been sent on the channel, but not consumed yet. /// - /// Returns a result representing the current task state, - /// or `None` if the background tasks should be polled to check their state. - fn receive_tasks_if_needed(&mut self) -> Option> { + /// Returns a result representing the current task state, or `Poll::Pending` if the background + /// tasks should be polled again to check their state. + fn receive_tasks_if_needed(&mut self, cx: &mut Context<'_>) -> Poll> { if self.guards.is_empty() { - match self.handle_rx.try_recv() { - // The tasks haven't been sent yet. - Err(TryRecvError::Empty) => Some(Ok(())), + // Return Pending if the tasks have not been sent yet. + let handles = futures::ready!(Pin::new(&mut self.handle_rx).poll(cx)); - // The tasks have been sent, but not consumed. + match handles { + // The tasks have been sent, but not consumed yet. Ok(handles) => { // Currently, the peer set treats an empty background task set as an error. // @@ -393,21 +384,16 @@ where self.guards.extend(handles); - None + Poll::Ready(Ok(())) } - // The tasks have been sent and consumed, but then they exited. - // - // Correctness: the peer set must receive at least one task. - // - // TODO: refactor `handle_rx` and `guards` into an enum - // for the background task state: Waiting/Running/Shutdown. - Err(TryRecvError::Closed) => { - Some(Err("all peer set background tasks have exited".into())) - } + // The sender was dropped without sending the tasks. 
+ Err(_) => Poll::Ready(Err( + "sender did not send peer background tasks before it was dropped".into(), + )), } } else { - None + Poll::Ready(Ok(())) } } @@ -415,9 +401,8 @@ where /// - services by dropping the service lists /// - background tasks via their join handles or cancel handles /// - channels by closing the channel - fn shut_down_tasks_and_channels(&mut self) { + fn shut_down_tasks_and_channels(&mut self, cx: &mut Context<'_>) { // Drop services and cancel their background tasks. - self.preselected_p2c_peer = None; self.ready_services = HashMap::new(); for (_peer_key, handle) in self.cancel_handles.drain() { @@ -429,9 +414,9 @@ where // so we don't add more peers to a shut down peer set. self.demand_signal.close_channel(); - // Shut down background tasks. + // Shut down background tasks, ignoring pending polls. self.handle_rx.close(); - self.receive_tasks_if_needed(); + let _ = self.receive_tasks_if_needed(cx); for guard in self.guards.iter() { guard.abort(); } @@ -441,25 +426,63 @@ where /// /// Move newly ready services to the ready list if they are for peers with supported protocol /// versions, otherwise they are dropped. Also drop failed services. - fn poll_unready(&mut self, cx: &mut Context<'_>) { + /// + /// Never returns an error. + /// + /// Returns `Ok(Some(())` if at least one peer became ready, `Poll::Pending` if there are + /// unready peers, but none became ready, and `Ok(None)` if the unready peers were empty. + /// + /// If there are any remaining unready peers, registers a wakeup for the next time one becomes + /// ready. If there are no unready peers, doesn't register any wakeups. (Since wakeups come + /// from peers, there needs to be at least one peer to register a wakeup.) + fn poll_unready(&mut self, cx: &mut Context<'_>) -> Poll, BoxError>> { + let mut result = Poll::Pending; + + // # Correctness + // + // `poll_next()` must always be called, because `self.unready_services` could have been + // empty before the call to `self.poll_ready()`. + // + // > When new futures are added, `poll_next` must be called in order to begin receiving + // > wake-ups for new futures. + // + // + // + // Returns Pending if we've finished processing the unready service changes, + // but there are still some unready services. loop { - match Pin::new(&mut self.unready_services).poll_next(cx) { - // No unready service changes, or empty unready services - Poll::Pending | Poll::Ready(None) => return, + // No ready peers left, but there are some unready peers pending. + let Poll::Ready(ready_peer) = Pin::new(&mut self.unready_services).poll_next(cx) else { + break; + }; + + match ready_peer { + // No unready peers in the list. + None => { + // If we've finished processing the unready service changes, and there are no + // unready services left, it doesn't make sense to return Pending, because + // their stream is terminated. But when we add more unready peers and call + // `poll_next()`, its termination status will be reset, and it will receive + // wakeups again. + if result.is_pending() { + result = Poll::Ready(Ok(None)); + } + + break; + } // Unready -> Ready - Poll::Ready(Some(Ok((key, svc)))) => { + Some(Ok((key, svc))) => { trace!(?key, "service became ready"); - let cancel = self.cancel_handles.remove(&key); - assert!(cancel.is_some(), "missing cancel handle"); - if svc.remote_version() >= self.minimum_peer_version.current() { - self.ready_services.insert(key, svc); - } + self.push_ready(true, key, svc); + + // Return Ok if at least one peer became ready. 
+ result = Poll::Ready(Ok(Some(()))); } // Unready -> Canceled - Poll::Ready(Some(Err((key, UnreadyError::Canceled)))) => { + Some(Err((key, UnreadyError::Canceled))) => { // A service be canceled because we've connected to the same service twice. // In that case, there is a cancel handle for the peer address, // but it belongs to the service for the newer connection. @@ -469,7 +492,7 @@ where "service was canceled, dropping service" ); } - Poll::Ready(Some(Err((key, UnreadyError::CancelHandleDropped(_))))) => { + Some(Err((key, UnreadyError::CancelHandleDropped(_)))) => { // Similarly, services with dropped cancel handes can have duplicates. trace!( ?key, @@ -479,7 +502,7 @@ where } // Unready -> Errored - Poll::Ready(Some(Err((key, UnreadyError::Inner(error))))) => { + Some(Err((key, UnreadyError::Inner(error)))) => { debug!(%error, "service failed while unready, dropping service"); let cancel = self.cancel_handles.remove(&key); @@ -487,6 +510,57 @@ where } } } + + result + } + + /// Checks previously ready peer services for errors. + /// + /// The only way these peer `Client`s can become unready is when we send them a request, + /// because the peer set has exclusive access to send requests to each peer. (If an inbound + /// request is in progress, it will be handled, then our request will be sent by the connection + /// task.) + /// + /// Returns `Poll::Ready` if there are some ready peers, and `Poll::Pending` if there are no + /// ready peers. Registers a wakeup if any peer has failed due to a disconnection, hang, or protocol error. + /// + /// # Panics + /// + /// If any peers somehow became unready without being sent a request. This indicates a bug in the peer set, where requests + /// are sent to peers without putting them in `unready_peers`. + fn poll_ready_peer_errors(&mut self, cx: &mut Context<'_>) -> Poll<()> { + let mut previous = HashMap::new(); + std::mem::swap(&mut previous, &mut self.ready_services); + + // TODO: consider only checking some peers each poll (for performance reasons), + // but make sure we eventually check all of them. + for (key, mut svc) in previous.drain() { + let Poll::Ready(peer_readiness) = Pin::new(&mut svc).poll_ready(cx) else { + unreachable!( + "unexpected unready peer: peers must be put into the unready_peers list \ + after sending them a request" + ); + }; + + match peer_readiness { + // Still ready, add it back to the list. + Ok(()) => self.push_ready(false, key, svc), + + // Ready -> Errored + Err(error) => { + debug!(%error, "service failed while ready, dropping service"); + + // Ready services can just be dropped, they don't need any cleanup. + std::mem::drop(svc); + } + } + } + + if self.ready_services.is_empty() { + Poll::Pending + } else { + Poll::Ready(()) + } } /// Returns the number of peer connections Zebra already has with @@ -509,17 +583,35 @@ where self.ready_services.contains_key(&addr) || self.cancel_handles.contains_key(&addr) } - /// Checks for newly inserted or removed services. + /// Processes the entire list of newly inserted or removed services. /// /// Puts inserted services in the unready list. /// Drops removed services, after cancelling any pending requests. + /// + /// If the peer connector channel is closed, returns an error. + /// + /// Otherwise, returns `Ok` if it discovered at least one peer, or `Poll::Pending` if it didn't + /// discover any peers. Always registers a wakeup for new peers, even when it returns `Ok`. 
fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll> { - use futures::ready; + // Return pending if there are no peers in the list. + let mut result = Poll::Pending; + loop { - match ready!(Pin::new(&mut self.discover).poll_discover(cx)) + // If we've emptied the list, finish looping, otherwise process the new peer. + let Poll::Ready(discovered) = Pin::new(&mut self.discover).poll_discover(cx) else { + break; + }; + + // If the change channel has a permanent error, return that error. + let change = discovered .ok_or("discovery stream closed")? - .map_err(Into::into)? - { + .map_err(Into::into)?; + + // Otherwise we have successfully processed a peer. + result = Poll::Ready(Ok(())); + + // Process each change. + match change { Change::Remove(key) => { trace!(?key, "got Change::Remove from Discover"); self.remove(&key); @@ -552,32 +644,22 @@ where } } } + + result } /// Checks if the minimum peer version has changed, and disconnects from outdated peers. fn disconnect_from_outdated_peers(&mut self) { if let Some(minimum_version) = self.minimum_peer_version.changed() { - self.ready_services.retain(|address, peer| { - if peer.remote_version() >= minimum_version { - true - } else { - if self.preselected_p2c_peer == Some(*address) { - self.preselected_p2c_peer = None; - } - - false - } - }); + // It is ok to drop ready services, they don't need anything cancelled. + self.ready_services + .retain(|_address, peer| peer.remote_version() >= minimum_version); } } - /// Takes a ready service by key, invalidating `preselected_p2c_peer` if needed. + /// Takes a ready service by key. fn take_ready_service(&mut self, key: &D::Key) -> Option { if let Some(svc) = self.ready_services.remove(key) { - if Some(*key) == self.preselected_p2c_peer { - self.preselected_p2c_peer = None; - } - assert!( !self.cancel_handles.contains_key(key), "cancel handles are only used for unready service work" @@ -605,6 +687,25 @@ where } } + /// Adds a ready service to the ready list if it's for a peer with a supported version. + /// If `was_unready` is true, also removes the peer's cancel handle. + /// + /// If the service is for a connection to an outdated peer, the service is dropped. + fn push_ready(&mut self, was_unready: bool, key: D::Key, svc: D::Service) { + let cancel = self.cancel_handles.remove(&key); + assert_eq!( + cancel.is_some(), + was_unready, + "missing or unexpected cancel handle" + ); + + if svc.remote_version() >= self.minimum_peer_version.current() { + self.ready_services.insert(key, svc); + } else { + std::mem::drop(svc); + } + } + /// Adds a busy service to the unready list if it's for a peer with a supported version, /// and adds a cancel handle for the service's current request. /// @@ -631,7 +732,7 @@ where } /// Performs P2C on `self.ready_services` to randomly select a less-loaded ready service. - fn preselect_p2c_peer(&self) -> Option { + fn select_ready_p2c_peer(&self) -> Option { self.select_p2c_peer_from_list(&self.ready_services.keys().copied().collect()) } @@ -647,8 +748,7 @@ where .expect("just checked there is one service"), ), len => { - // If there are only 2 peers, randomise their order. - // Otherwise, choose 2 random peers in a random order. + // Choose 2 random peers, then return the least loaded of those 2 peers. let (a, b) = { let idxs = rand::seq::index::sample(&mut rand::thread_rng(), len, 2); let a = idxs.index(0); @@ -708,19 +808,32 @@ where /// Routes a request using P2C load-balancing. 
fn route_p2c(&mut self, req: Request) -> >::Future { - let preselected_key = self - .preselected_p2c_peer - .expect("ready peer service must have a preselected peer"); + if let Some(p2c_key) = self.select_ready_p2c_peer() { + tracing::trace!(?p2c_key, "routing based on p2c"); - tracing::trace!(?preselected_key, "routing based on p2c"); + let mut svc = self + .take_ready_service(&p2c_key) + .expect("selected peer must be ready"); - let mut svc = self - .take_ready_service(&preselected_key) - .expect("ready peer set must have preselected a ready peer"); + let fut = svc.call(req); + self.push_unready(p2c_key, svc); - let fut = svc.call(req); - self.push_unready(preselected_key, svc); - fut.map_err(Into::into).boxed() + return fut.map_err(Into::into).boxed(); + } + + async move { + // Let other tasks run, so a retry request might get different ready peers. + tokio::task::yield_now().await; + + // # Security + // + // Avoid routing requests to peers that are missing inventory. + // If we kept trying doomed requests, peers that are missing our requested inventory + // could take up a large amount of our bandwidth and retry limits. + Err(SharedPeerError::from(PeerError::NoReadyPeers)) + } + .map_err(Into::into) + .boxed() } /// Tries to route a request to a ready peer that advertised that inventory, @@ -995,78 +1108,71 @@ where Pin> + Send + 'static>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.poll_background_errors(cx)?; + // Update service and peer statuses. + // + // # Correctness + // + // All of the futures that receive a context from this method can wake the peer set buffer + // task. If there are no ready peers, and no new peers, network requests will pause until: + // - an unready peer becomes ready, or + // - a new peer arrives. - // Update peer statuses - let _ = self.poll_discover(cx)?; + // Check for new peers, and register a task wakeup when the next new peers arrive. New peers + // can be infrequent if our connection slots are full, or we're connected to all + // available/useful peers. + let _poll_pending_or_ready: Poll<()> = self.poll_discover(cx)?; + + // These tasks don't provide new peers or newly ready peers. + let _poll_pending: Poll<()> = self.poll_background_errors(cx)?; + let _poll_pending_or_ready: Poll<()> = self.inventory_registry.poll_inventory(cx)?; + + // Check for newly ready peers, including newly added peers (which are added as unready). + // So it needs to run after `poll_discover()`. Registers a wakeup if there are any unready + // peers. + // + // Each connected peer should become ready within a few minutes, or timeout, close the + // connection, and release its connection slot. + // + // TODO: drop peers that overload us with inbound messages and never become ready (#7822) + let _poll_pending_or_ready: Poll> = self.poll_unready(cx)?; + + // Cleanup and metrics. + + // Only checks the versions of ready peers, so it needs to run after `poll_unready()`. self.disconnect_from_outdated_peers(); - self.inventory_registry.poll_inventory(cx)?; - self.poll_unready(cx); + // These metrics should run last, to report the most up-to-date information. self.log_peer_set_size(); self.update_metrics(); - loop { - // Re-check that the pre-selected service is ready, in case - // something has happened since (e.g., it failed, peer closed - // connection, ...) 
- if let Some(key) = self.preselected_p2c_peer { - trace!(preselected_key = ?key); - let mut service = self - .take_ready_service(&key) - .expect("preselected peer must be in the ready list"); - match service.poll_ready(cx) { - Poll::Ready(Ok(())) => { - trace!("preselected service is still ready, keeping it selected"); - self.preselected_p2c_peer = Some(key); - self.ready_services.insert(key, service); - return Poll::Ready(Ok(())); - } - Poll::Pending => { - trace!("preselected service is no longer ready, moving to unready list"); - self.push_unready(key, service); - } - Poll::Ready(Err(error)) => { - trace!(%error, "preselected service failed, dropping it"); - std::mem::drop(service); - } - } - } + // Check for failures in ready peers, removing newly errored or disconnected peers. + // So it needs to run after `poll_unready()`. + let ready_peers: Poll<()> = self.poll_ready_peer_errors(cx); - trace!("preselected service was not ready, preselecting another ready service"); - self.preselected_p2c_peer = self.preselect_p2c_peer(); - self.update_metrics(); + if ready_peers.is_pending() { + // # Correctness + // + // If the channel is full, drop the demand signal rather than waiting. If we waited + // here, the crawler could deadlock sending a request to fetch more peers, because it + // also empties the channel. + trace!("no ready services, sending demand signal"); + let _ = self.demand_signal.try_send(MorePeers); - if self.preselected_p2c_peer.is_none() { - // CORRECTNESS - // - // If the channel is full, drop the demand signal rather than waiting. - // If we waited here, the crawler could deadlock sending a request to - // fetch more peers, because it also empties the channel. - trace!("no ready services, sending demand signal"); - let _ = self.demand_signal.try_send(MorePeers); - - // CORRECTNESS - // - // The current task must be scheduled for wakeup every time we - // return `Poll::Pending`. - // - // As long as there are unready or new peers, this task will run, - // because: - // - `poll_discover` schedules this task for wakeup when new - // peers arrive. - // - if there are unready peers, `poll_unready` schedules this - // task for wakeup when peer services become ready. - // - if the preselected peer is not ready, `service.poll_ready` - // schedules this task for wakeup when that service becomes - // ready. - // - // To avoid peers blocking on a full background error channel: - // - if no background tasks have exited since the last poll, - // `poll_background_errors` schedules this task for wakeup when - // the next task exits. - return Poll::Pending; - } + // # Correctness + // + // The current task must be scheduled for wakeup every time we return `Poll::Pending`. + // + // As long as there are unready or new peers, this task will run, because: + // - `poll_discover` schedules this task for wakeup when new peers arrive. + // - if there are unready peers, `poll_unready` or `poll_ready_peers` schedule this + // task for wakeup when peer services become ready. + // + // To avoid peers blocking on a full peer status/error channel: + // - `poll_background_errors` schedules this task for wakeup when the peer status + // update task exits. 
+ Poll::Pending + } else { + Poll::Ready(Ok(())) } } diff --git a/zebra-network/src/peer_set/set/tests/vectors.rs b/zebra-network/src/peer_set/set/tests/vectors.rs index 829046999..02ddc25d8 100644 --- a/zebra-network/src/peer_set/set/tests/vectors.rs +++ b/zebra-network/src/peer_set/set/tests/vectors.rs @@ -10,7 +10,6 @@ use zebra_chain::{ parameters::{Network, NetworkUpgrade}, }; -use super::{PeerSetBuilder, PeerVersions}; use crate::{ constants::DEFAULT_MAX_CONNS_PER_IP, peer::{ClientRequest, MinimumPeerVersion}, @@ -19,6 +18,8 @@ use crate::{ Request, SharedPeerError, }; +use super::{PeerSetBuilder, PeerVersions}; + #[test] fn peer_set_ready_single_connection() { // We are going to use just one peer version in this test diff --git a/zebra-state/src/service/finalized_state/disk_format/upgrade.rs b/zebra-state/src/service/finalized_state/disk_format/upgrade.rs index 86a04553c..d2981e62a 100644 --- a/zebra-state/src/service/finalized_state/disk_format/upgrade.rs +++ b/zebra-state/src/service/finalized_state/disk_format/upgrade.rs @@ -883,7 +883,7 @@ impl DbFormatChangeThreadHandle { /// /// This method should be called regularly, so that panics are detected as soon as possible. pub fn check_for_panics(&mut self) { - self.update_task.check_for_panics(); + self.update_task.panic_if_task_has_panicked(); } /// Wait for the spawned thread to finish. If it exited with a panic, resume that panic.
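The refactored peer set `poll_*` methods above share one polling convention: drain a stream completely, return `Poll::Ready(Ok(()))` if at least one item was processed, return an error if the stream has closed, and otherwise return `Poll::Pending` once the stream has registered a wakeup. A generic sketch of that pattern, with an illustrative `Driver` type and `u32` item type standing in for the discovery and inventory streams:

    use std::task::{Context, Poll};
    use futures::{Stream, StreamExt};

    struct Driver<S> {
        stream: S,
    }

    impl<S: Stream<Item = u32> + Unpin> Driver<S> {
        fn poll_items(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), &'static str>> {
            // Pending unless we process at least one item.
            let mut result = Poll::Pending;

            loop {
                match self.stream.poll_next_unpin(cx) {
                    // Processed an item: remember that we did something, keep draining.
                    Poll::Ready(Some(_item)) => result = Poll::Ready(Ok(())),
                    // The stream ended: report a permanent error, like a closed channel.
                    Poll::Ready(None) => return Poll::Ready(Err("stream closed")),
                    // No more items right now: the stream has already registered a wakeup.
                    Poll::Pending => break,
                }
            }

            result
        }
    }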
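For reference, `route_p2c()` now calls `select_ready_p2c_peer()` directly instead of relying on a preselected peer. A simplified sketch of the underlying power-of-two-choices selection, using a plain `(address, load)` slice as an illustrative stand-in for the ready `LoadTrackedClient` services, and the same `rand::seq::index::sample` call the peer set uses:

    use std::net::SocketAddr;

    fn p2c_choose(peers: &[(SocketAddr, u64)]) -> Option<SocketAddr> {
        match peers.len() {
            0 => None,
            1 => Some(peers[0].0),
            len => {
                // Choose 2 distinct random candidates, then keep the less loaded one.
                let idxs = rand::seq::index::sample(&mut rand::thread_rng(), len, 2);
                let (a, b) = (peers[idxs.index(0)], peers[idxs.index(1)]);
                Some(if a.1 <= b.1 { a.0 } else { b.0 })
            }
        }
    }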