fix(net): Fix potential network hangs, and reduce code complexity (#7859)

* Refactor return type of poll_discover()

* Simplify poll_ready() by removing preselected peers

* Fix peer set readiness check

* Pass task context correctly to background tasks

* Make poll_discover() return Pending

* Make poll_inventory() return Pending

* Make poll_unready() return Poll::Pending

* Simplify with futures::ready!() and ?

* When there are no peers, wake on newly ready peers, or new peers, in that order

* Preserve the original waker when there are no unready peers

* Fix polling docs and remove unnecessary code

* Make sure we're ignoring Poll::Pending not Result

* Make panic checking method names clearer

* Fix connection Client task wakeups and error handling

* Cleanup connection panic handling and add wakeup docs

* Fix connection client task wakeups to prevent hangs

* Simplify error and pending handling

* Clarify inventory set behaviour

* Define peer set poll_* methods so they return Ok if they do something

* Clarify documentation

Co-authored-by: Arya <aryasolhi@gmail.com>

* Fix test that depended on preselected peers

* Check ready peers for errors before sending requests to them

* Fix a hanging test by not waiting for irrelevant actions

* Only remove cancel handles when they are required

* Fix incorrect panic on termination setting

* Clarify method comments

Co-authored-by: Arya <aryasolhi@gmail.com>

---------

Co-authored-by: Arya <aryasolhi@gmail.com>
teor 2023-11-17 05:53:24 +10:00 committed by GitHub
parent a22c8d5f42
commit d689e7344b
12 changed files with 593 additions and 272 deletions


@ -9,7 +9,7 @@ pub mod future;
pub mod thread; pub mod thread;
/// A trait that checks a task's return value for panics. /// A trait that checks a task's return value for panics.
pub trait CheckForPanics { pub trait CheckForPanics: Sized {
/// The output type, after removing panics from `Self`. /// The output type, after removing panics from `Self`.
type Output; type Output;
@ -20,11 +20,34 @@ pub trait CheckForPanics {
/// ///
/// If `self` contains a panic payload or an unexpected termination. /// If `self` contains a panic payload or an unexpected termination.
#[track_caller] #[track_caller]
fn check_for_panics(self) -> Self::Output; fn panic_if_task_has_finished(self) -> Self::Output {
self.check_for_panics_with(true)
}
/// Check if `self` contains a panic payload, then panic.
/// Otherwise, return the non-panic part of `self`.
///
/// # Panics
///
/// If `self` contains a panic payload.
#[track_caller]
fn panic_if_task_has_panicked(self) -> Self::Output {
self.check_for_panics_with(false)
}
/// Check if `self` contains a panic payload, then panic. Also panics if
/// `panic_on_unexpected_termination` is true, and `self` is an unexpected termination.
/// Otherwise, return the non-panic part of `self`.
///
/// # Panics
///
/// If `self` contains a panic payload, or if we're panicking on unexpected terminations.
#[track_caller]
fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output;
} }
/// A trait that waits for a task to finish, then handles panics and cancellations. /// A trait that waits for a task to finish, then handles panics and cancellations.
pub trait WaitForPanics { pub trait WaitForPanics: Sized {
/// The underlying task output, after removing panics and unwrapping termination results. /// The underlying task output, after removing panics and unwrapping termination results.
type Output; type Output;
@ -43,5 +66,44 @@ pub trait WaitForPanics {
/// ///
/// If `self` contains an expected termination, and we're shutting down anyway. /// If `self` contains an expected termination, and we're shutting down anyway.
#[track_caller] #[track_caller]
fn wait_for_panics(self) -> Self::Output; fn wait_and_panic_on_unexpected_termination(self) -> Self::Output {
self.wait_for_panics_with(true)
}
/// Waits for `self` to finish, then checks if its output is:
/// - a panic payload: resume that panic,
/// - a task termination: hang waiting for shutdown.
///
/// Otherwise, returns the task return value of `self`.
///
/// # Panics
///
/// If `self` contains a panic payload.
///
/// # Hangs
///
/// If `self` contains a task termination.
#[track_caller]
fn wait_for_panics(self) -> Self::Output {
self.wait_for_panics_with(false)
}
/// Waits for `self` to finish, then checks if its output is:
/// - a panic payload: resume that panic,
/// - an unexpected termination:
/// - if `panic_on_unexpected_termination` is true, panic with that error,
/// - otherwise, hang waiting for shutdown,
/// - an expected termination: hang waiting for shutdown.
///
/// Otherwise, returns the task return value of `self`.
///
/// # Panics
///
/// If `self` contains a panic payload, or if we're panicking on unexpected terminations.
///
/// # Hangs
///
/// If `self` contains an expected or ignored termination, and we're shutting down anyway.
#[track_caller]
fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output;
} }
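
To make the new trait shape concrete, here is a minimal, self-contained sketch of the same pattern: two default methods that differ only in the flag they pass to a single required `*_with` method. The `Result<T, String>` impl and the method bodies are illustrative stand-ins, not the Zebra types.

```rust
// Sketch of the "defaults delegate to a flagged method" pattern above.
trait CheckForPanics: Sized {
    type Output;

    /// Panic on both panics and unexpected terminations.
    fn panic_if_task_has_finished(self) -> Self::Output {
        self.check_for_panics_with(true)
    }

    /// Panic only on panics, ignore unexpected terminations.
    fn panic_if_task_has_panicked(self) -> Self::Output {
        self.check_for_panics_with(false)
    }

    /// The single required method that both defaults delegate to.
    fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output;
}

// A toy implementation: treat `Err` as an "unexpected termination".
impl<T> CheckForPanics for Result<T, String> {
    type Output = Option<T>;

    fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
        match self {
            Ok(value) => Some(value),
            Err(msg) if panic_on_unexpected_termination => {
                panic!("task unexpectedly exited with: {msg}")
            }
            Err(_) => None,
        }
    }
}

fn main() {
    let ok: Result<u32, String> = Ok(42);
    assert_eq!(ok.panic_if_task_has_finished(), Some(42));

    let err: Result<u32, String> = Err("cancelled".to_string());
    // With the lenient variant, the unexpected exit is ignored instead of panicking.
    assert_eq!(err.panic_if_task_has_panicked(), None);
}
```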


@ -18,15 +18,18 @@ impl<T> CheckForPanics for Result<T, JoinError> {
type Output = Result<T, JoinError>; type Output = Result<T, JoinError>;
/// Returns the task result if the task finished normally. /// Returns the task result if the task finished normally.
/// Otherwise, resumes any panics, logs unexpected errors, and ignores any expected errors. /// Otherwise, resumes any panics, and ignores any expected errors.
/// Handles unexpected errors based on `panic_on_unexpected_termination`.
/// ///
/// If the task finished normally, returns `Some(T)`. /// If the task finished normally, returns `Some(T)`.
/// If the task was cancelled, returns `None`. /// If the task was cancelled, returns `None`.
#[track_caller] #[track_caller]
fn check_for_panics(self) -> Self::Output { fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
match self { match self {
Ok(task_output) => Ok(task_output), Ok(task_output) => Ok(task_output),
Err(join_error) => Err(join_error.check_for_panics()), Err(join_error) => {
Err(join_error.check_for_panics_with(panic_on_unexpected_termination))
}
} }
} }
} }
@ -38,22 +41,26 @@ impl CheckForPanics for JoinError {
/// Resume any panics and panic on unexpected task cancellations. /// Resume any panics and panic on unexpected task cancellations.
/// Always returns [`JoinError::Cancelled`](JoinError::is_cancelled). /// Always returns [`JoinError::Cancelled`](JoinError::is_cancelled).
#[track_caller] #[track_caller]
fn check_for_panics(self) -> Self::Output { fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
match self.try_into_panic() { match self.try_into_panic() {
Ok(panic_payload) => panic::resume_unwind(panic_payload), Ok(panic_payload) => panic::resume_unwind(panic_payload),
// We could ignore this error, but then we'd have to change the return type. // We could ignore this error, but then we'd have to change the return type.
Err(task_cancelled) if is_shutting_down() => {
debug!(
?task_cancelled,
"ignoring cancelled task because Zebra is shutting down"
);
task_cancelled
}
Err(task_cancelled) => { Err(task_cancelled) => {
panic!("task cancelled during normal Zebra operation: {task_cancelled:?}"); if !panic_on_unexpected_termination {
debug!(?task_cancelled, "ignoring expected task termination");
task_cancelled
} else if is_shutting_down() {
debug!(
?task_cancelled,
"ignoring task termination because Zebra is shutting down"
);
task_cancelled
} else {
panic!("task unexpectedly exited with: {:?}", task_cancelled)
}
} }
} }
} }
@ -67,7 +74,9 @@ where
/// Returns a future which waits for `self` to finish, then checks if its output is: /// Returns a future which waits for `self` to finish, then checks if its output is:
/// - a panic payload: resume that panic, /// - a panic payload: resume that panic,
/// - an unexpected termination: panic with that error, /// - an unexpected termination:
/// - if `panic_on_unexpected_termination` is true, panic with that error,
/// - otherwise, hang waiting for shutdown,
/// - an expected termination: hang waiting for shutdown. /// - an expected termination: hang waiting for shutdown.
/// ///
/// Otherwise, returns the task return value of `self`. /// Otherwise, returns the task return value of `self`.
@ -79,11 +88,15 @@ where
/// # Hangs /// # Hangs
/// ///
/// If `self` contains an expected termination, and we're shutting down anyway. /// If `self` contains an expected termination, and we're shutting down anyway.
/// If we're ignoring terminations because `panic_on_unexpected_termination` is `false`.
/// Futures hang by returning `Pending` and not setting a waker, so this uses minimal resources. /// Futures hang by returning `Pending` and not setting a waker, so this uses minimal resources.
#[track_caller] #[track_caller]
fn wait_for_panics(self) -> Self::Output { fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
async move { async move {
match self.await.check_for_panics() { match self
.await
.check_for_panics_with(panic_on_unexpected_termination)
{
Ok(task_output) => task_output, Ok(task_output) => task_output,
Err(_expected_cancel_error) => future::pending().await, Err(_expected_cancel_error) => future::pending().await,
} }
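
The hunk above classifies a Tokio `JoinError` as either a panic (which is resumed) or a cancellation (which is tolerated or escalated depending on the flag). Below is a rough, runnable sketch of that classification, assuming the `tokio` crate with the `rt-multi-thread` and `macros` features; `classify` is a hypothetical helper, not the Zebra API.

```rust
use std::panic;

use tokio::task::JoinError;

fn classify(
    result: Result<(), JoinError>,
    panic_on_unexpected_termination: bool,
) -> Option<JoinError> {
    match result {
        Ok(()) => None,
        Err(join_error) => match join_error.try_into_panic() {
            // Resume the panic on the caller's thread.
            Ok(panic_payload) => panic::resume_unwind(panic_payload),
            // A cancellation: tolerate it, or treat it as a bug.
            Err(task_cancelled) if !panic_on_unexpected_termination => Some(task_cancelled),
            Err(task_cancelled) => {
                panic!("task unexpectedly exited with: {task_cancelled:?}")
            }
        },
    }
}

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async { std::future::pending::<()>().await });
    handle.abort();

    let error = classify(handle.await, false).expect("aborted tasks yield a cancellation error");
    assert!(error.is_cancelled());
}
```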


@ -8,19 +8,44 @@ use std::{
thread::{self, JoinHandle}, thread::{self, JoinHandle},
}; };
use crate::shutdown::is_shutting_down;
use super::{CheckForPanics, WaitForPanics}; use super::{CheckForPanics, WaitForPanics};
impl<T> CheckForPanics for thread::Result<T> { impl<T> CheckForPanics for thread::Result<T>
where
T: std::fmt::Debug,
{
type Output = T; type Output = T;
/// Panics if the thread panicked. /// # Panics
///
/// - if the thread panicked.
/// - if the thread is cancelled, `panic_on_unexpected_termination` is true, and
/// Zebra is not shutting down.
/// ///
/// Threads can't be cancelled except by using a panic, so there are no thread errors here. /// Threads can't be cancelled except by using a panic, so there are no thread errors here.
/// `panic_on_unexpected_termination` is only used when the thread exits normally, without a panic.
#[track_caller] #[track_caller]
fn check_for_panics(self) -> Self::Output { fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
match self { match self {
// The value returned by the thread when it finished. // The value returned by the thread when it finished.
Ok(thread_output) => thread_output, Ok(thread_output) => {
if !panic_on_unexpected_termination {
debug!(?thread_output, "ignoring expected thread exit");
thread_output
} else if is_shutting_down() {
debug!(
?thread_output,
"ignoring thread exit because Zebra is shutting down"
);
thread_output
} else {
panic!("thread unexpectedly exited with: {:?}", thread_output)
}
}
// A thread error is always a panic. // A thread error is always a panic.
Err(panic_payload) => panic::resume_unwind(panic_payload), Err(panic_payload) => panic::resume_unwind(panic_payload),
@ -28,17 +53,24 @@ impl<T> CheckForPanics for thread::Result<T> {
} }
} }
impl<T> WaitForPanics for JoinHandle<T> { impl<T> WaitForPanics for JoinHandle<T>
where
T: std::fmt::Debug,
{
type Output = T; type Output = T;
/// Waits for the thread to finish, then panics if the thread panicked. /// Waits for the thread to finish, then panics if the thread panicked.
#[track_caller] #[track_caller]
fn wait_for_panics(self) -> Self::Output { fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
self.join().check_for_panics() self.join()
.check_for_panics_with(panic_on_unexpected_termination)
} }
} }
impl<T> WaitForPanics for Arc<JoinHandle<T>> { impl<T> WaitForPanics for Arc<JoinHandle<T>>
where
T: std::fmt::Debug,
{
type Output = Option<T>; type Output = Option<T>;
/// If this is the final `Arc`, waits for the thread to finish, then panics if the thread /// If this is the final `Arc`, waits for the thread to finish, then panics if the thread
@ -46,7 +78,7 @@ impl<T> WaitForPanics for Arc<JoinHandle<T>> {
/// ///
/// If this is not the final `Arc`, drops the handle and immediately returns `None`. /// If this is not the final `Arc`, drops the handle and immediately returns `None`.
#[track_caller] #[track_caller]
fn wait_for_panics(self) -> Self::Output { fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
// If we are the last Arc with a reference to this handle, // If we are the last Arc with a reference to this handle,
// we can wait for it and propagate any panics. // we can wait for it and propagate any panics.
// //
@ -56,14 +88,17 @@ impl<T> WaitForPanics for Arc<JoinHandle<T>> {
// This is more readable as an expanded statement. // This is more readable as an expanded statement.
#[allow(clippy::manual_map)] #[allow(clippy::manual_map)]
if let Some(handle) = Arc::into_inner(self) { if let Some(handle) = Arc::into_inner(self) {
Some(handle.wait_for_panics()) Some(handle.wait_for_panics_with(panic_on_unexpected_termination))
} else { } else {
None None
} }
} }
} }
impl<T> CheckForPanics for &mut Option<Arc<JoinHandle<T>>> { impl<T> CheckForPanics for &mut Option<Arc<JoinHandle<T>>>
where
T: std::fmt::Debug,
{
type Output = Option<T>; type Output = Option<T>;
/// If this is the final `Arc`, checks if the thread has finished, then panics if the thread /// If this is the final `Arc`, checks if the thread has finished, then panics if the thread
@ -71,14 +106,14 @@ impl<T> CheckForPanics for &mut Option<Arc<JoinHandle<T>>> {
/// ///
/// If the thread has not finished, or this is not the final `Arc`, returns `None`. /// If the thread has not finished, or this is not the final `Arc`, returns `None`.
#[track_caller] #[track_caller]
fn check_for_panics(self) -> Self::Output { fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
let handle = self.take()?; let handle = self.take()?;
if handle.is_finished() { if handle.is_finished() {
// This is the same as calling `self.wait_for_panics()`, but we can't do that, // This is the same as calling `self.wait_for_panics()`, but we can't do that,
// because we've taken `self`. // because we've taken `self`.
#[allow(clippy::manual_map)] #[allow(clippy::manual_map)]
return handle.wait_for_panics(); return handle.wait_for_panics_with(panic_on_unexpected_termination);
} }
*self = Some(handle); *self = Some(handle);
@ -87,7 +122,10 @@ impl<T> CheckForPanics for &mut Option<Arc<JoinHandle<T>>> {
} }
} }
impl<T> WaitForPanics for &mut Option<Arc<JoinHandle<T>>> { impl<T> WaitForPanics for &mut Option<Arc<JoinHandle<T>>>
where
T: std::fmt::Debug,
{
type Output = Option<T>; type Output = Option<T>;
/// If this is the final `Arc`, waits for the thread to finish, then panics if the thread /// If this is the final `Arc`, waits for the thread to finish, then panics if the thread
@ -95,10 +133,13 @@ impl<T> WaitForPanics for &mut Option<Arc<JoinHandle<T>>> {
/// ///
/// If this is not the final `Arc`, drops the handle and returns `None`. /// If this is not the final `Arc`, drops the handle and returns `None`.
#[track_caller] #[track_caller]
fn wait_for_panics(self) -> Self::Output { fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
// This is more readable as an expanded statement. // This is more readable as an expanded statement.
#[allow(clippy::manual_map)] #[allow(clippy::manual_map)]
if let Some(output) = self.take()?.wait_for_panics() { if let Some(output) = self
.take()?
.wait_for_panics_with(panic_on_unexpected_termination)
{
Some(output) Some(output)
} else { } else {
// Some other task has a reference, so we should give up ours to let them use it. // Some other task has a reference, so we should give up ours to let them use it.
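
The `Arc<JoinHandle<T>>` impls above rely on `Arc::into_inner` handing the handle only to the last owner, so only one caller ever joins the thread. A small sketch of that behaviour with a plain `std::thread` handle; `wait_if_last` is an illustrative helper, not the Zebra method.

```rust
use std::{sync::Arc, thread};

fn wait_if_last<T>(handle: Arc<thread::JoinHandle<T>>) -> Option<T> {
    // `Arc::into_inner` only returns the handle to the final owner.
    let handle = Arc::into_inner(handle)?;
    // Joining propagates any panic from the worker thread.
    Some(handle.join().expect("worker thread panicked"))
}

fn main() {
    let handle = Arc::new(thread::spawn(|| 2 + 2));
    let clone = handle.clone();

    // This caller is not the last owner, so it just gives up its reference.
    assert_eq!(wait_if_last(clone), None);
    // The remaining owner joins the thread and gets its result.
    assert_eq!(wait_if_last(handle), Some(4));
}
```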


@ -18,6 +18,8 @@ use futures::{
use tokio::{sync::broadcast, task::JoinHandle}; use tokio::{sync::broadcast, task::JoinHandle};
use tower::Service; use tower::Service;
use zebra_chain::diagnostic::task::CheckForPanics;
use crate::{ use crate::{
peer::{ peer::{
error::{AlreadyErrored, ErrorSlot, PeerError, SharedPeerError}, error::{AlreadyErrored, ErrorSlot, PeerError, SharedPeerError},
@ -421,8 +423,15 @@ impl MissingInventoryCollector {
impl Client { impl Client {
/// Check if this connection's heartbeat task has exited. /// Check if this connection's heartbeat task has exited.
///
/// Returns an error if the heartbeat task exited. Otherwise, schedules the client task for
/// wakeup when the heartbeat task finishes, or the channel closes, and returns `Pending`.
///
/// # Panics
///
/// If the heartbeat task panicked.
#[allow(clippy::unwrap_in_result)] #[allow(clippy::unwrap_in_result)]
fn check_heartbeat(&mut self, cx: &mut Context<'_>) -> Result<(), SharedPeerError> { fn poll_heartbeat(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
let is_canceled = self let is_canceled = self
.shutdown_tx .shutdown_tx
.as_mut() .as_mut()
@ -430,17 +439,19 @@ impl Client {
.poll_canceled(cx) .poll_canceled(cx)
.is_ready(); .is_ready();
if is_canceled { let result = match self.heartbeat_task.poll_unpin(cx) {
return self.set_task_exited_error(
"heartbeat",
PeerError::HeartbeatTaskExited("Task was cancelled".to_string()),
);
}
match self.heartbeat_task.poll_unpin(cx) {
Poll::Pending => { Poll::Pending => {
// Heartbeat task is still running. // The heartbeat task returns `Pending` while it continues to run.
Ok(()) // But if it has dropped its receiver, it is shutting down, and we should also shut down.
if is_canceled {
self.set_task_exited_error(
"heartbeat",
PeerError::HeartbeatTaskExited("Task was cancelled".to_string()),
)
} else {
// Heartbeat task is still running.
return Poll::Pending;
}
} }
Poll::Ready(Ok(Ok(_))) => { Poll::Ready(Ok(Ok(_))) => {
// Heartbeat task stopped unexpectedly, without panic or error. // Heartbeat task stopped unexpectedly, without panic or error.
@ -459,6 +470,9 @@ impl Client {
) )
} }
Poll::Ready(Err(error)) => { Poll::Ready(Err(error)) => {
// Heartbeat task panicked.
let error = error.panic_if_task_has_panicked();
// Heartbeat task was cancelled. // Heartbeat task was cancelled.
if error.is_cancelled() { if error.is_cancelled() {
self.set_task_exited_error( self.set_task_exited_error(
@ -466,11 +480,7 @@ impl Client {
PeerError::HeartbeatTaskExited("Task was cancelled".to_string()), PeerError::HeartbeatTaskExited("Task was cancelled".to_string()),
) )
} }
// Heartbeat task stopped with panic. // Heartbeat task stopped with another kind of task error.
else if error.is_panic() {
panic!("heartbeat task has panicked: {error}");
}
// Heartbeat task stopped with error.
else { else {
self.set_task_exited_error( self.set_task_exited_error(
"heartbeat", "heartbeat",
@ -478,25 +488,48 @@ impl Client {
) )
} }
} }
} };
Poll::Ready(result)
} }
/// Check if the connection's task has exited. /// Check if the connection's request/response task has exited.
fn check_connection(&mut self, context: &mut Context<'_>) -> Result<(), SharedPeerError> { ///
match self.connection_task.poll_unpin(context) { /// Returns an error if the connection task exited. Otherwise, schedules the client task for
Poll::Pending => { /// wakeup when the connection task finishes, and returns `Pending`.
// Connection task is still running. ///
Ok(()) /// # Panics
} ///
Poll::Ready(Ok(())) => { /// If the connection task panicked.
fn poll_connection(&mut self, context: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
// Return `Pending` if the connection task is still running.
let result = match ready!(self.connection_task.poll_unpin(context)) {
Ok(()) => {
// Connection task stopped unexpectedly, without panicking. // Connection task stopped unexpectedly, without panicking.
self.set_task_exited_error("connection", PeerError::ConnectionTaskExited) self.set_task_exited_error("connection", PeerError::ConnectionTaskExited)
} }
Poll::Ready(Err(error)) => { Err(error) => {
// Connection task stopped unexpectedly with a panic. // Connection task panicked.
panic!("connection task has panicked: {error}"); let error = error.panic_if_task_has_panicked();
// Connection task was cancelled.
if error.is_cancelled() {
self.set_task_exited_error(
"connection",
PeerError::HeartbeatTaskExited("Task was cancelled".to_string()),
)
}
// Connection task stopped with another kind of task error.
else {
self.set_task_exited_error(
"connection",
PeerError::HeartbeatTaskExited(error.to_string()),
)
}
} }
} };
Poll::Ready(result)
} }
/// Properly update the error slot after a background task has unexpectedly stopped. /// Properly update the error slot after a background task has unexpectedly stopped.
@ -522,6 +555,9 @@ impl Client {
} }
/// Poll for space in the shared request sender channel. /// Poll for space in the shared request sender channel.
///
/// Returns an error if the sender channel is closed. If there is no space in the channel,
/// returns `Pending`, and schedules the task for wakeup when there is space available.
fn poll_request(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> { fn poll_request(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
let server_result = ready!(self.server_tx.poll_ready(cx)); let server_result = ready!(self.server_tx.poll_ready(cx));
if server_result.is_err() { if server_result.is_err() {
@ -536,6 +572,33 @@ impl Client {
} }
} }
/// Poll for space in the shared request sender channel, and for errors in the connection tasks.
///
/// Returns an error if the sender channel is closed, or the heartbeat or connection tasks have
/// terminated. If there is no space in the channel, returns `Pending`, and schedules the task
/// for wakeup when there is space available, or one of the tasks terminates.
fn poll_client(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
// # Correctness
//
// The current task must be scheduled for wakeup every time we return
// `Poll::Pending`.
//
// `poll_heartbeat()` and `poll_connection()` schedule the client task for wakeup
// if either task exits, or if the heartbeat task drops the cancel handle.
//
//`ready!` returns `Poll::Pending` when `server_tx` is unready, and
// schedules this task for wakeup.
//
// It's ok to exit early and skip wakeups when there is an error, because the connection
// and its tasks are shut down immediately on error.
let _heartbeat_pending: Poll<()> = self.poll_heartbeat(cx)?;
let _connection_pending: Poll<()> = self.poll_connection(cx)?;
// We're only pending if the sender channel is full.
self.poll_request(cx)
}
/// Shut down the resources held by the client half of this peer connection. /// Shut down the resources held by the client half of this peer connection.
/// ///
/// Stops further requests to the remote peer, and stops the heartbeat task. /// Stops further requests to the remote peer, and stops the heartbeat task.
@ -566,20 +629,16 @@ impl Service<Request> for Client {
// The current task must be scheduled for wakeup every time we return // The current task must be scheduled for wakeup every time we return
// `Poll::Pending`. // `Poll::Pending`.
// //
// `check_heartbeat` and `check_connection` schedule the client task for wakeup // `poll_client()` schedules the client task for wakeup if the sender channel has space,
// if either task exits, or if the heartbeat task drops the cancel handle. // either connection task exits, or if the heartbeat task drops the cancel handle.
// Check all the tasks and channels.
// //
//`ready!` returns `Poll::Pending` when `server_tx` is unready, and //`ready!` returns `Poll::Pending` when `server_tx` is unready, and
// schedules this task for wakeup. // schedules this task for wakeup.
let result = ready!(self.poll_client(cx));
let mut result = self // Shut down the client and connection if there is an error.
.check_heartbeat(cx)
.and_then(|()| self.check_connection(cx));
if result.is_ok() {
result = ready!(self.poll_request(cx));
}
if let Err(error) = result { if let Err(error) = result {
self.shutdown(); self.shutdown();
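
The new `poll_client()` leans on the fact that `?` works on `Poll<Result<_, E>>`: an error short-circuits with `Poll::Ready(Err(_))`, while a background check's `Pending` is deliberately discarded, and only the request channel decides readiness. A condensed, self-contained sketch of that control flow, with hypothetical helpers standing in for the heartbeat, connection, and channel polls:

```rust
use std::task::Poll;

fn poll_background(healthy: bool) -> Poll<Result<(), String>> {
    if healthy {
        // Background task still running: a real poll would have registered a waker.
        Poll::Pending
    } else {
        Poll::Ready(Err("background task exited".to_string()))
    }
}

fn poll_channel(has_space: bool) -> Poll<Result<(), String>> {
    if has_space {
        Poll::Ready(Ok(()))
    } else {
        Poll::Pending
    }
}

fn poll_client(healthy: bool, has_space: bool) -> Poll<Result<(), String>> {
    // Errors short-circuit via `?`, `Pending` is ignored: the task keeps running.
    let _background_pending: Poll<()> = poll_background(healthy)?;
    // Only the request channel decides whether the client is ready.
    poll_channel(has_space)
}

fn main() {
    assert!(matches!(poll_client(true, true), Poll::Ready(Ok(()))));
    assert!(poll_client(true, false).is_pending());
    assert!(matches!(poll_client(false, true), Poll::Ready(Err(_))));
}
```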


@ -82,6 +82,10 @@ pub enum PeerError {
#[error("Internal services over capacity")] #[error("Internal services over capacity")]
Overloaded, Overloaded,
/// There are no ready remote peers.
#[error("No ready peers available")]
NoReadyPeers,
/// This peer request's caused an internal service timeout, so the connection was dropped /// This peer request's caused an internal service timeout, so the connection was dropped
/// to shed load or prevent attacks. /// to shed load or prevent attacks.
#[error("Internal services timed out")] #[error("Internal services timed out")]
@ -147,6 +151,7 @@ impl PeerError {
PeerError::Serialization(inner) => format!("Serialization({inner})").into(), PeerError::Serialization(inner) => format!("Serialization({inner})").into(),
PeerError::DuplicateHandshake => "DuplicateHandshake".into(), PeerError::DuplicateHandshake => "DuplicateHandshake".into(),
PeerError::Overloaded => "Overloaded".into(), PeerError::Overloaded => "Overloaded".into(),
PeerError::NoReadyPeers => "NoReadyPeers".into(),
PeerError::InboundTimeout => "InboundTimeout".into(), PeerError::InboundTimeout => "InboundTimeout".into(),
PeerError::ServiceShutdown => "ServiceShutdown".into(), PeerError::ServiceShutdown => "ServiceShutdown".into(),
PeerError::NotFoundResponse(_) => "NotFoundResponse".into(), PeerError::NotFoundResponse(_) => "NotFoundResponse".into(),


@ -20,6 +20,7 @@ use crate::{
/// A client service wrapper that keeps track of its load. /// A client service wrapper that keeps track of its load.
/// ///
/// It also keeps track of the peer's reported protocol version. /// It also keeps track of the peer's reported protocol version.
#[derive(Debug)]
pub struct LoadTrackedClient { pub struct LoadTrackedClient {
/// A service representing a connected peer, wrapped in a load tracker. /// A service representing a connected peer, wrapped in a load tracker.
service: PeakEwma<Client>, service: PeakEwma<Client>,


@ -275,22 +275,30 @@ impl InventoryRegistry {
self.current.iter().chain(self.prev.iter()) self.current.iter().chain(self.prev.iter())
} }
/// Returns a future that polls once for new registry updates. /// Returns a future that waits for new registry updates.
#[allow(dead_code)] #[allow(dead_code)]
pub fn update(&mut self) -> Update { pub fn update(&mut self) -> Update {
Update::new(self) Update::new(self)
} }
/// Drive periodic inventory tasks /// Drive periodic inventory tasks.
/// ///
/// # Details /// Rotates the inventory HashMaps on every timer tick.
/// Drains the inv_stream channel and registers all advertised inventory.
/// ///
/// - rotates HashMaps based on interval events /// Returns an error if the inventory channel is closed.
/// - drains the inv_stream channel and registers all advertised inventory ///
pub fn poll_inventory(&mut self, cx: &mut Context<'_>) -> Result<(), BoxError> { /// Otherwise, returns `Ok` if it performed at least one update or rotation, or `Poll::Pending`
/// if there was no inventory change. Always registers a wakeup for the next inventory update
/// or rotation, even when it returns `Ok`.
pub fn poll_inventory(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
let mut result = Poll::Pending;
// # Correctness // # Correctness
// //
// Registers the current task for wakeup when the timer next becomes ready. // Registers the current task for wakeup when the timer next becomes ready.
// (But doesn't return, because we also want to register the task for wakeup when more
// inventory arrives.)
// //
// # Security // # Security
// //
@ -302,6 +310,7 @@ impl InventoryRegistry {
// two interval ticks are delayed. // two interval ticks are delayed.
if Pin::new(&mut self.interval).poll_next(cx).is_ready() { if Pin::new(&mut self.interval).poll_next(cx).is_ready() {
self.rotate(); self.rotate();
result = Poll::Ready(Ok(()));
} }
// This module uses a broadcast channel instead of an mpsc channel, even // This module uses a broadcast channel instead of an mpsc channel, even
@ -320,24 +329,38 @@ impl InventoryRegistry {
// rather than propagating it through the peer set's Service::poll_ready // rather than propagating it through the peer set's Service::poll_ready
// implementation, where reporting a failure means reporting a permanent // implementation, where reporting a failure means reporting a permanent
// failure of the peer set. // failure of the peer set.
while let Poll::Ready(channel_result) = self.inv_stream.next().poll_unpin(cx) {
// Returns Pending if all messages are processed, but the channel could get more.
loop {
let channel_result = self.inv_stream.next().poll_unpin(cx);
match channel_result { match channel_result {
Some(Ok(change)) => self.register(change), Poll::Ready(Some(Ok(change))) => {
Some(Err(BroadcastStreamRecvError::Lagged(count))) => { self.register(change);
result = Poll::Ready(Ok(()));
}
Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(count)))) => {
// This isn't a fatal inventory error, it's expected behaviour when Zebra is
// under load from peers.
metrics::counter!("pool.inventory.dropped", 1); metrics::counter!("pool.inventory.dropped", 1);
metrics::counter!("pool.inventory.dropped.messages", count); metrics::counter!("pool.inventory.dropped.messages", count);
// If this message happens a lot, we should improve inventory registry performance, // If this message happens a lot, we should improve inventory registry
// or poll the registry or peer set in a separate task. // performance, or poll the registry or peer set in a separate task.
info!(count, "dropped lagged inventory advertisements"); info!(count, "dropped lagged inventory advertisements");
} }
// This indicates all senders, including the one in the handshaker, Poll::Ready(None) => {
// have been dropped, which really is a permanent failure. // If the channel is empty and returns None, all senders, including the one in
None => return Err(broadcast::error::RecvError::Closed.into()), // the handshaker, have been dropped, which really is a permanent failure.
result = Poll::Ready(Err(broadcast::error::RecvError::Closed.into()));
}
Poll::Pending => {
break;
}
} }
} }
Ok(()) result
} }
/// Record the given inventory `change` for the peer `addr`. /// Record the given inventory `change` for the peer `addr`.
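
The rewritten `poll_inventory()` drains its sources in a loop, remembering in `result` whether any rotation or registration happened, and only reports `Pending` when the sources themselves are `Pending`. A simplified sketch of that drain-until-Pending shape; `MockSource` is a stand-in for the broadcast inventory stream, and a real implementation would register a waker before returning `Pending`.

```rust
use std::{collections::VecDeque, task::Poll};

struct MockSource {
    queued: VecDeque<u32>,
}

impl MockSource {
    // Stand-in for `self.inv_stream.next().poll_unpin(cx)`.
    fn poll_next(&mut self) -> Poll<Option<u32>> {
        match self.queued.pop_front() {
            Some(item) => Poll::Ready(Some(item)),
            None => Poll::Pending,
        }
    }
}

fn poll_drain(source: &mut MockSource, registered: &mut Vec<u32>) -> Poll<Result<(), ()>> {
    let mut result = Poll::Pending;

    loop {
        match source.poll_next() {
            Poll::Ready(Some(item)) => {
                registered.push(item);
                // We did some work, so this poll is `Ready`, but keep draining.
                result = Poll::Ready(Ok(()));
            }
            Poll::Ready(None) => {
                // A closed channel is a permanent error.
                result = Poll::Ready(Err(()));
                break;
            }
            // Nothing left right now: stop, and let the waker re-run us later.
            Poll::Pending => break,
        }
    }

    result
}

fn main() {
    let mut source = MockSource {
        queued: VecDeque::from([1, 2, 3]),
    };
    let mut registered = Vec::new();

    assert_eq!(poll_drain(&mut source, &mut registered), Poll::Ready(Ok(())));
    assert_eq!(registered, vec![1, 2, 3]);
    // No new items: the next poll reports `Pending`.
    assert_eq!(poll_drain(&mut source, &mut registered), Poll::Pending);
}
```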


@ -1,7 +1,11 @@
//! Randomised property tests for the inventory registry. //! Randomised property tests for the inventory registry.
use std::collections::HashSet; use std::{
collections::HashSet,
task::{Context, Poll},
};
use futures::task::noop_waker;
use proptest::prelude::*; use proptest::prelude::*;
use crate::{ use crate::{
@ -81,10 +85,12 @@ async fn inv_registry_inbound_wrapper_with(
forwarded_msg.expect("unexpected forwarded error result"), forwarded_msg.expect("unexpected forwarded error result"),
); );
inv_registry // We don't actually care if the registry takes any action here.
.update() let waker = noop_waker();
.await let mut cx = Context::from_waker(&waker);
.expect("unexpected dropped registry sender channel"); let _poll_pending_or_ok: Poll<()> = inv_registry
.poll_inventory(&mut cx)
.map(|result| result.expect("unexpected error polling inventory"));
let test_peer = test_peer let test_peer = test_peer
.get_transient_addr() .get_transient_addr()
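
The test change above polls the registry once with a no-op waker instead of awaiting the `update()` future. A minimal sketch of that technique, assuming the `futures` crate for `noop_waker`; `poll_nothing` is a hypothetical stand-in for `poll_inventory()`.

```rust
use std::task::{Context, Poll};

use futures::task::noop_waker;

// A trivial poll function standing in for `poll_inventory()`:
// it never errors and never completes.
fn poll_nothing(_cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
    Poll::Pending
}

fn main() {
    // A no-op waker gives us a `Context` without needing a real executor.
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    // We don't care whether the poll is `Ready` or `Pending`,
    // only that it doesn't return an error.
    let _poll_pending_or_ok: Poll<()> =
        poll_nothing(&mut cx).map(|result| result.expect("unexpected error"));
}
```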


@ -19,8 +19,11 @@ pub struct Update<'a> {
impl Unpin for Update<'_> {} impl Unpin for Update<'_> {}
impl<'a> Update<'a> { impl<'a> Update<'a> {
#[allow(dead_code)] /// Returns a new future that returns when the next inventory update or rotation has been
pub(super) fn new(registry: &'a mut InventoryRegistry) -> Self { /// completed by `registry`.
///
/// See [`InventoryRegistry::poll_inventory()`] for details.
pub fn new(registry: &'a mut InventoryRegistry) -> Self {
Self { registry } Self { registry }
} }
} }
@ -28,9 +31,10 @@ impl<'a> Update<'a> {
impl Future for Update<'_> { impl Future for Update<'_> {
type Output = Result<(), BoxError>; type Output = Result<(), BoxError>;
/// A future that returns when the next inventory update or rotation has been completed.
///
/// See [`InventoryRegistry::poll_inventory()`] for details.
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// TODO: should the future wait until new changes arrive? self.registry.poll_inventory(cx)
// or for the rotation timer?
Poll::Ready(self.registry.poll_inventory(cx))
} }
} }


@ -97,7 +97,6 @@ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
convert, convert,
fmt::Debug, fmt::Debug,
future::Future,
marker::PhantomData, marker::PhantomData,
net::IpAddr, net::IpAddr,
pin::Pin, pin::Pin,
@ -107,14 +106,15 @@ use std::{
use futures::{ use futures::{
channel::{mpsc, oneshot}, channel::{mpsc, oneshot},
future::{FutureExt, TryFutureExt}, future::{Future, FutureExt, TryFutureExt},
prelude::*, prelude::*,
stream::FuturesUnordered, stream::FuturesUnordered,
task::noop_waker,
}; };
use itertools::Itertools; use itertools::Itertools;
use num_integer::div_ceil; use num_integer::div_ceil;
use tokio::{ use tokio::{
sync::{broadcast, oneshot::error::TryRecvError, watch}, sync::{broadcast, watch},
task::JoinHandle, task::JoinHandle,
}; };
use tower::{ use tower::{
@ -191,17 +191,6 @@ where
// Request Routing // Request Routing
// //
/// A preselected ready service.
///
/// # Correctness
///
/// If this is `Some(addr)`, `addr` must be a key for a peer in `ready_services`.
/// If that peer is removed from `ready_services`, we must set the preselected peer to `None`.
///
/// This is handled by [`PeerSet::take_ready_service`] and
/// [`PeerSet::disconnect_from_outdated_peers`].
preselected_p2c_peer: Option<D::Key>,
/// Stores gossiped inventory hashes from connected peers. /// Stores gossiped inventory hashes from connected peers.
/// ///
/// Used to route inventory requests to peers that are likely to have it. /// Used to route inventory requests to peers that are likely to have it.
@ -265,7 +254,11 @@ where
C: ChainTip, C: ChainTip,
{ {
fn drop(&mut self) { fn drop(&mut self) {
self.shut_down_tasks_and_channels() // We don't have access to the current task (if any), so we just drop everything we can.
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
self.shut_down_tasks_and_channels(&mut cx);
} }
} }
@ -310,7 +303,6 @@ where
// Ready peers // Ready peers
ready_services: HashMap::new(), ready_services: HashMap::new(),
// Request Routing // Request Routing
preselected_p2c_peer: None,
inventory_registry: InventoryRegistry::new(inv_stream), inventory_registry: InventoryRegistry::new(inv_stream),
// Busy peers // Busy peers
@ -335,52 +327,51 @@ where
/// Check background task handles to make sure they're still running. /// Check background task handles to make sure they're still running.
/// ///
/// Never returns `Ok`.
///
/// If any background task exits, shuts down all other background tasks, /// If any background task exits, shuts down all other background tasks,
/// and returns an error. /// and returns an error. Otherwise, returns `Pending`, and registers a wakeup for
fn poll_background_errors(&mut self, cx: &mut Context) -> Result<(), BoxError> { /// receiving the background tasks, or the background tasks exiting.
if let Some(result) = self.receive_tasks_if_needed() { fn poll_background_errors(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
return result; futures::ready!(self.receive_tasks_if_needed(cx))?;
}
match Pin::new(&mut self.guards).poll_next(cx) { // Return Pending if all background tasks are still running.
// All background tasks are still running. match futures::ready!(Pin::new(&mut self.guards).poll_next(cx)) {
Poll::Pending => Ok(()), Some(res) => {
Poll::Ready(Some(res)) => {
info!( info!(
background_tasks = %self.guards.len(), background_tasks = %self.guards.len(),
"a peer set background task exited, shutting down other peer set tasks" "a peer set background task exited, shutting down other peer set tasks"
); );
self.shut_down_tasks_and_channels(); self.shut_down_tasks_and_channels(cx);
// Flatten the join result and inner result, // Flatten the join result and inner result, and return any errors.
// then turn Ok() task exits into errors.
res.map_err(Into::into) res.map_err(Into::into)
// TODO: replace with Result::flatten when it stabilises (#70142) // TODO: replace with Result::flatten when it stabilises (#70142)
.and_then(convert::identity) .and_then(convert::identity)?;
.and(Err("a peer set background task exited".into()))
// Turn Ok() task exits into errors.
Poll::Ready(Err("a peer set background task exited".into()))
} }
Poll::Ready(None) => { None => {
self.shut_down_tasks_and_channels(); self.shut_down_tasks_and_channels(cx);
Err("all peer set background tasks have exited".into()) Poll::Ready(Err("all peer set background tasks have exited".into()))
} }
} }
} }
/// Receive background tasks, if they've been sent on the channel, /// Receive background tasks, if they've been sent on the channel, but not consumed yet.
/// but not consumed yet.
/// ///
/// Returns a result representing the current task state, /// Returns a result representing the current task state, or `Poll::Pending` if the background
/// or `None` if the background tasks should be polled to check their state. /// tasks should be polled again to check their state.
fn receive_tasks_if_needed(&mut self) -> Option<Result<(), BoxError>> { fn receive_tasks_if_needed(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
if self.guards.is_empty() { if self.guards.is_empty() {
match self.handle_rx.try_recv() { // Return Pending if the tasks have not been sent yet.
// The tasks haven't been sent yet. let handles = futures::ready!(Pin::new(&mut self.handle_rx).poll(cx));
Err(TryRecvError::Empty) => Some(Ok(())),
// The tasks have been sent, but not consumed. match handles {
// The tasks have been sent, but not consumed yet.
Ok(handles) => { Ok(handles) => {
// Currently, the peer set treats an empty background task set as an error. // Currently, the peer set treats an empty background task set as an error.
// //
@ -393,21 +384,16 @@ where
self.guards.extend(handles); self.guards.extend(handles);
None Poll::Ready(Ok(()))
} }
// The tasks have been sent and consumed, but then they exited. // The sender was dropped without sending the tasks.
// Err(_) => Poll::Ready(Err(
// Correctness: the peer set must receive at least one task. "sender did not send peer background tasks before it was dropped".into(),
// )),
// TODO: refactor `handle_rx` and `guards` into an enum
// for the background task state: Waiting/Running/Shutdown.
Err(TryRecvError::Closed) => {
Some(Err("all peer set background tasks have exited".into()))
}
} }
} else { } else {
None Poll::Ready(Ok(()))
} }
} }
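
Several of these methods now use `futures::ready!()` (equivalent to `std::task::ready!`) to bail out with `Pending` early, and otherwise keep working with the `Ready` value. A tiny sketch of that macro's behaviour with made-up poll functions:

```rust
use std::task::{ready, Poll};

fn inner(step: u32) -> Poll<u32> {
    if step < 3 {
        Poll::Pending
    } else {
        Poll::Ready(step)
    }
}

fn outer(step: u32) -> Poll<Result<u32, String>> {
    // `ready!` returns `Poll::Pending` early, or unwraps the `Ready` value.
    let value = ready!(inner(step));
    Poll::Ready(Ok(value * 10))
}

fn main() {
    assert!(outer(1).is_pending());
    assert_eq!(outer(3), Poll::Ready(Ok(30)));
}
```
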
@ -415,9 +401,8 @@ where
/// - services by dropping the service lists /// - services by dropping the service lists
/// - background tasks via their join handles or cancel handles /// - background tasks via their join handles or cancel handles
/// - channels by closing the channel /// - channels by closing the channel
fn shut_down_tasks_and_channels(&mut self) { fn shut_down_tasks_and_channels(&mut self, cx: &mut Context<'_>) {
// Drop services and cancel their background tasks. // Drop services and cancel their background tasks.
self.preselected_p2c_peer = None;
self.ready_services = HashMap::new(); self.ready_services = HashMap::new();
for (_peer_key, handle) in self.cancel_handles.drain() { for (_peer_key, handle) in self.cancel_handles.drain() {
@ -429,9 +414,9 @@ where
// so we don't add more peers to a shut down peer set. // so we don't add more peers to a shut down peer set.
self.demand_signal.close_channel(); self.demand_signal.close_channel();
// Shut down background tasks. // Shut down background tasks, ignoring pending polls.
self.handle_rx.close(); self.handle_rx.close();
self.receive_tasks_if_needed(); let _ = self.receive_tasks_if_needed(cx);
for guard in self.guards.iter() { for guard in self.guards.iter() {
guard.abort(); guard.abort();
} }
@ -441,25 +426,63 @@ where
/// ///
/// Move newly ready services to the ready list if they are for peers with supported protocol /// Move newly ready services to the ready list if they are for peers with supported protocol
/// versions, otherwise they are dropped. Also drop failed services. /// versions, otherwise they are dropped. Also drop failed services.
fn poll_unready(&mut self, cx: &mut Context<'_>) { ///
/// Never returns an error.
///
/// Returns `Ok(Some(()))` if at least one peer became ready, `Poll::Pending` if there are
/// unready peers, but none became ready, and `Ok(None)` if the unready peers were empty.
///
/// If there are any remaining unready peers, registers a wakeup for the next time one becomes
/// ready. If there are no unready peers, doesn't register any wakeups. (Since wakeups come
/// from peers, there needs to be at least one peer to register a wakeup.)
fn poll_unready(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<()>, BoxError>> {
let mut result = Poll::Pending;
// # Correctness
//
// `poll_next()` must always be called, because `self.unready_services` could have been
// empty before the call to `self.poll_ready()`.
//
// > When new futures are added, `poll_next` must be called in order to begin receiving
// > wake-ups for new futures.
//
// <https://docs.rs/futures/latest/futures/stream/futures_unordered/struct.FuturesUnordered.html>
//
// Returns Pending if we've finished processing the unready service changes,
// but there are still some unready services.
loop { loop {
match Pin::new(&mut self.unready_services).poll_next(cx) { // No ready peers left, but there are some unready peers pending.
// No unready service changes, or empty unready services let Poll::Ready(ready_peer) = Pin::new(&mut self.unready_services).poll_next(cx) else {
Poll::Pending | Poll::Ready(None) => return, break;
};
match ready_peer {
// No unready peers in the list.
None => {
// If we've finished processing the unready service changes, and there are no
// unready services left, it doesn't make sense to return Pending, because
// their stream is terminated. But when we add more unready peers and call
// `poll_next()`, its termination status will be reset, and it will receive
// wakeups again.
if result.is_pending() {
result = Poll::Ready(Ok(None));
}
break;
}
// Unready -> Ready // Unready -> Ready
Poll::Ready(Some(Ok((key, svc)))) => { Some(Ok((key, svc))) => {
trace!(?key, "service became ready"); trace!(?key, "service became ready");
let cancel = self.cancel_handles.remove(&key);
assert!(cancel.is_some(), "missing cancel handle");
if svc.remote_version() >= self.minimum_peer_version.current() { self.push_ready(true, key, svc);
self.ready_services.insert(key, svc);
} // Return Ok if at least one peer became ready.
result = Poll::Ready(Ok(Some(())));
} }
// Unready -> Canceled // Unready -> Canceled
Poll::Ready(Some(Err((key, UnreadyError::Canceled)))) => { Some(Err((key, UnreadyError::Canceled))) => {
// A service be canceled because we've connected to the same service twice. // A service be canceled because we've connected to the same service twice.
// In that case, there is a cancel handle for the peer address, // In that case, there is a cancel handle for the peer address,
// but it belongs to the service for the newer connection. // but it belongs to the service for the newer connection.
@ -469,7 +492,7 @@ where
"service was canceled, dropping service" "service was canceled, dropping service"
); );
} }
Poll::Ready(Some(Err((key, UnreadyError::CancelHandleDropped(_))))) => { Some(Err((key, UnreadyError::CancelHandleDropped(_)))) => {
// Similarly, services with dropped cancel handes can have duplicates. // Similarly, services with dropped cancel handes can have duplicates.
trace!( trace!(
?key, ?key,
@ -479,7 +502,7 @@ where
} }
// Unready -> Errored // Unready -> Errored
Poll::Ready(Some(Err((key, UnreadyError::Inner(error))))) => { Some(Err((key, UnreadyError::Inner(error)))) => {
debug!(%error, "service failed while unready, dropping service"); debug!(%error, "service failed while unready, dropping service");
let cancel = self.cancel_handles.remove(&key); let cancel = self.cancel_handles.remove(&key);
@ -487,6 +510,57 @@ where
} }
} }
} }
result
}
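
The correctness comment in `poll_unready()` depends on how `FuturesUnordered` reports an empty set: `poll_next()` returns `Ready(None)` rather than `Pending`, and pushing new futures resets that termination, so the set has to be polled again afterwards. A short demonstration, assuming the `futures` crate; the values are arbitrary.

```rust
use std::task::{Context, Poll};

use futures::{stream::FuturesUnordered, task::noop_waker, StreamExt};

fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    let mut unready: FuturesUnordered<std::future::Ready<u32>> = FuturesUnordered::new();

    // An empty set reports termination, not `Pending`.
    assert_eq!(unready.poll_next_unpin(&mut cx), Poll::Ready(None));

    // Pushing a future "un-terminates" the stream, and polling drives it again.
    unready.push(std::future::ready(7));
    assert_eq!(unready.poll_next_unpin(&mut cx), Poll::Ready(Some(7)));
    assert_eq!(unready.poll_next_unpin(&mut cx), Poll::Ready(None));
}
```
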
/// Checks previously ready peer services for errors.
///
/// The only way these peer `Client`s can become unready is when we send them a request,
/// because the peer set has exclusive access to send requests to each peer. (If an inbound
/// request is in progress, it will be handled, then our request will be sent by the connection
/// task.)
///
/// Returns `Poll::Ready` if there are some ready peers, and `Poll::Pending` if there are no
/// ready peers. Registers a wakeup if any peer has failed due to a disconnection, hang, or protocol error.
///
/// # Panics
///
/// If any peers somehow became unready without being sent a request. This indicates a bug in the peer set, where requests
/// are sent to peers without putting them in `unready_peers`.
fn poll_ready_peer_errors(&mut self, cx: &mut Context<'_>) -> Poll<()> {
let mut previous = HashMap::new();
std::mem::swap(&mut previous, &mut self.ready_services);
// TODO: consider only checking some peers each poll (for performance reasons),
// but make sure we eventually check all of them.
for (key, mut svc) in previous.drain() {
let Poll::Ready(peer_readiness) = Pin::new(&mut svc).poll_ready(cx) else {
unreachable!(
"unexpected unready peer: peers must be put into the unready_peers list \
after sending them a request"
);
};
match peer_readiness {
// Still ready, add it back to the list.
Ok(()) => self.push_ready(false, key, svc),
// Ready -> Errored
Err(error) => {
debug!(%error, "service failed while ready, dropping service");
// Ready services can just be dropped, they don't need any cleanup.
std::mem::drop(svc);
}
}
}
if self.ready_services.is_empty() {
Poll::Pending
} else {
Poll::Ready(())
}
} }
/// Returns the number of peer connections Zebra already has with /// Returns the number of peer connections Zebra already has with
@ -509,17 +583,35 @@ where
self.ready_services.contains_key(&addr) || self.cancel_handles.contains_key(&addr) self.ready_services.contains_key(&addr) || self.cancel_handles.contains_key(&addr)
} }
/// Checks for newly inserted or removed services. /// Processes the entire list of newly inserted or removed services.
/// ///
/// Puts inserted services in the unready list. /// Puts inserted services in the unready list.
/// Drops removed services, after cancelling any pending requests. /// Drops removed services, after cancelling any pending requests.
///
/// If the peer connector channel is closed, returns an error.
///
/// Otherwise, returns `Ok` if it discovered at least one peer, or `Poll::Pending` if it didn't
/// discover any peers. Always registers a wakeup for new peers, even when it returns `Ok`.
fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> { fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
use futures::ready; // Return pending if there are no peers in the list.
let mut result = Poll::Pending;
loop { loop {
match ready!(Pin::new(&mut self.discover).poll_discover(cx)) // If we've emptied the list, finish looping, otherwise process the new peer.
let Poll::Ready(discovered) = Pin::new(&mut self.discover).poll_discover(cx) else {
break;
};
// If the change channel has a permanent error, return that error.
let change = discovered
.ok_or("discovery stream closed")? .ok_or("discovery stream closed")?
.map_err(Into::into)? .map_err(Into::into)?;
{
// Otherwise we have successfully processed a peer.
result = Poll::Ready(Ok(()));
// Process each change.
match change {
Change::Remove(key) => { Change::Remove(key) => {
trace!(?key, "got Change::Remove from Discover"); trace!(?key, "got Change::Remove from Discover");
self.remove(&key); self.remove(&key);
@ -552,32 +644,22 @@ where
} }
} }
} }
result
} }
/// Checks if the minimum peer version has changed, and disconnects from outdated peers. /// Checks if the minimum peer version has changed, and disconnects from outdated peers.
fn disconnect_from_outdated_peers(&mut self) { fn disconnect_from_outdated_peers(&mut self) {
if let Some(minimum_version) = self.minimum_peer_version.changed() { if let Some(minimum_version) = self.minimum_peer_version.changed() {
self.ready_services.retain(|address, peer| { // It is ok to drop ready services, they don't need anything cancelled.
if peer.remote_version() >= minimum_version { self.ready_services
true .retain(|_address, peer| peer.remote_version() >= minimum_version);
} else {
if self.preselected_p2c_peer == Some(*address) {
self.preselected_p2c_peer = None;
}
false
}
});
} }
} }
/// Takes a ready service by key, invalidating `preselected_p2c_peer` if needed. /// Takes a ready service by key.
fn take_ready_service(&mut self, key: &D::Key) -> Option<D::Service> { fn take_ready_service(&mut self, key: &D::Key) -> Option<D::Service> {
if let Some(svc) = self.ready_services.remove(key) { if let Some(svc) = self.ready_services.remove(key) {
if Some(*key) == self.preselected_p2c_peer {
self.preselected_p2c_peer = None;
}
assert!( assert!(
!self.cancel_handles.contains_key(key), !self.cancel_handles.contains_key(key),
"cancel handles are only used for unready service work" "cancel handles are only used for unready service work"
@ -605,6 +687,25 @@ where
} }
} }
/// Adds a ready service to the ready list if it's for a peer with a supported version.
/// If `was_unready` is true, also removes the peer's cancel handle.
///
/// If the service is for a connection to an outdated peer, the service is dropped.
fn push_ready(&mut self, was_unready: bool, key: D::Key, svc: D::Service) {
let cancel = self.cancel_handles.remove(&key);
assert_eq!(
cancel.is_some(),
was_unready,
"missing or unexpected cancel handle"
);
if svc.remote_version() >= self.minimum_peer_version.current() {
self.ready_services.insert(key, svc);
} else {
std::mem::drop(svc);
}
}
/// Adds a busy service to the unready list if it's for a peer with a supported version, /// Adds a busy service to the unready list if it's for a peer with a supported version,
/// and adds a cancel handle for the service's current request. /// and adds a cancel handle for the service's current request.
/// ///
@ -631,7 +732,7 @@ where
} }
/// Performs P2C on `self.ready_services` to randomly select a less-loaded ready service. /// Performs P2C on `self.ready_services` to randomly select a less-loaded ready service.
fn preselect_p2c_peer(&self) -> Option<D::Key> { fn select_ready_p2c_peer(&self) -> Option<D::Key> {
self.select_p2c_peer_from_list(&self.ready_services.keys().copied().collect()) self.select_p2c_peer_from_list(&self.ready_services.keys().copied().collect())
} }
@ -647,8 +748,7 @@ where
.expect("just checked there is one service"), .expect("just checked there is one service"),
), ),
len => { len => {
// If there are only 2 peers, randomise their order. // Choose 2 random peers, then return the least loaded of those 2 peers.
// Otherwise, choose 2 random peers in a random order.
let (a, b) = { let (a, b) = {
let idxs = rand::seq::index::sample(&mut rand::thread_rng(), len, 2); let idxs = rand::seq::index::sample(&mut rand::thread_rng(), len, 2);
let a = idxs.index(0); let a = idxs.index(0);
@ -708,19 +808,32 @@ where
/// Routes a request using P2C load-balancing. /// Routes a request using P2C load-balancing.
fn route_p2c(&mut self, req: Request) -> <Self as tower::Service<Request>>::Future { fn route_p2c(&mut self, req: Request) -> <Self as tower::Service<Request>>::Future {
let preselected_key = self if let Some(p2c_key) = self.select_ready_p2c_peer() {
.preselected_p2c_peer tracing::trace!(?p2c_key, "routing based on p2c");
.expect("ready peer service must have a preselected peer");
tracing::trace!(?preselected_key, "routing based on p2c"); let mut svc = self
.take_ready_service(&p2c_key)
.expect("selected peer must be ready");
let mut svc = self let fut = svc.call(req);
.take_ready_service(&preselected_key) self.push_unready(p2c_key, svc);
.expect("ready peer set must have preselected a ready peer");
let fut = svc.call(req); return fut.map_err(Into::into).boxed();
self.push_unready(preselected_key, svc); }
fut.map_err(Into::into).boxed()
async move {
// Let other tasks run, so a retry request might get different ready peers.
tokio::task::yield_now().await;
// # Security
//
// Avoid routing requests to peers that are missing inventory.
// If we kept trying doomed requests, peers that are missing our requested inventory
// could take up a large amount of our bandwidth and retry limits.
Err(SharedPeerError::from(PeerError::NoReadyPeers))
}
.map_err(Into::into)
.boxed()
} }
/// Tries to route a request to a ready peer that advertised that inventory, /// Tries to route a request to a ready peer that advertised that inventory,
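
`select_ready_p2c_peer()` and `select_p2c_peer_from_list()` implement power-of-two-choices selection: sample two distinct peers at random and keep the less loaded one. A rough sketch under the assumption of a plain `HashMap` of per-peer load values (the real peer set uses `PeakEwma` load tracking from `tower`); `select_p2c_peer` here is illustrative only.

```rust
use std::collections::HashMap;

use rand::seq::index::sample;

fn select_p2c_peer(load_by_peer: &HashMap<&'static str, u32>) -> Option<&'static str> {
    let keys: Vec<&'static str> = load_by_peer.keys().copied().collect();

    match keys.len() {
        0 => None,
        1 => Some(keys[0]),
        len => {
            // Choose 2 random distinct peers, then return the less loaded of the two.
            let idxs = sample(&mut rand::thread_rng(), len, 2);
            let a = keys[idxs.index(0)];
            let b = keys[idxs.index(1)];

            if load_by_peer[a] <= load_by_peer[b] {
                Some(a)
            } else {
                Some(b)
            }
        }
    }
}

fn main() {
    let load_by_peer = HashMap::from([("peer-a", 3), ("peer-b", 10), ("peer-c", 1)]);
    let selected = select_p2c_peer(&load_by_peer).expect("there are ready peers");

    // The winner is always one of the two sampled peers, never the more loaded of the pair.
    println!("selected {selected}");
}
```
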
@ -995,78 +1108,71 @@ where
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_background_errors(cx)?; // Update service and peer statuses.
//
// # Correctness
//
// All of the futures that receive a context from this method can wake the peer set buffer
// task. If there are no ready peers, and no new peers, network requests will pause until:
// - an unready peer becomes ready, or
// - a new peer arrives.
// Update peer statuses // Check for new peers, and register a task wakeup when the next new peers arrive. New peers
let _ = self.poll_discover(cx)?; // can be infrequent if our connection slots are full, or we're connected to all
// available/useful peers.
let _poll_pending_or_ready: Poll<()> = self.poll_discover(cx)?;
// These tasks don't provide new peers or newly ready peers.
let _poll_pending: Poll<()> = self.poll_background_errors(cx)?;
let _poll_pending_or_ready: Poll<()> = self.inventory_registry.poll_inventory(cx)?;
// Check for newly ready peers, including newly added peers (which are added as unready).
// So it needs to run after `poll_discover()`. Registers a wakeup if there are any unready
// peers.
//
// Each connected peer should become ready within a few minutes, or timeout, close the
// connection, and release its connection slot.
//
// TODO: drop peers that overload us with inbound messages and never become ready (#7822)
let _poll_pending_or_ready: Poll<Option<()>> = self.poll_unready(cx)?;
// Cleanup and metrics.
// Only checks the versions of ready peers, so it needs to run after `poll_unready()`.
self.disconnect_from_outdated_peers(); self.disconnect_from_outdated_peers();
self.inventory_registry.poll_inventory(cx)?;
self.poll_unready(cx);
// These metrics should run last, to report the most up-to-date information.
self.log_peer_set_size(); self.log_peer_set_size();
self.update_metrics(); self.update_metrics();
loop { // Check for failures in ready peers, removing newly errored or disconnected peers.
// Re-check that the pre-selected service is ready, in case // So it needs to run after `poll_unready()`.
// something has happened since (e.g., it failed, peer closed let ready_peers: Poll<()> = self.poll_ready_peer_errors(cx);
// connection, ...)
if let Some(key) = self.preselected_p2c_peer {
trace!(preselected_key = ?key);
let mut service = self
.take_ready_service(&key)
.expect("preselected peer must be in the ready list");
match service.poll_ready(cx) {
Poll::Ready(Ok(())) => {
trace!("preselected service is still ready, keeping it selected");
self.preselected_p2c_peer = Some(key);
self.ready_services.insert(key, service);
return Poll::Ready(Ok(()));
}
Poll::Pending => {
trace!("preselected service is no longer ready, moving to unready list");
self.push_unready(key, service);
}
Poll::Ready(Err(error)) => {
trace!(%error, "preselected service failed, dropping it");
std::mem::drop(service);
}
}
}
trace!("preselected service was not ready, preselecting another ready service"); if ready_peers.is_pending() {
self.preselected_p2c_peer = self.preselect_p2c_peer(); // # Correctness
self.update_metrics(); //
// If the channel is full, drop the demand signal rather than waiting. If we waited
// here, the crawler could deadlock sending a request to fetch more peers, because it
// also empties the channel.
trace!("no ready services, sending demand signal");
let _ = self.demand_signal.try_send(MorePeers);
if self.preselected_p2c_peer.is_none() { // # Correctness
// CORRECTNESS //
// // The current task must be scheduled for wakeup every time we return `Poll::Pending`.
// If the channel is full, drop the demand signal rather than waiting. //
// If we waited here, the crawler could deadlock sending a request to // As long as there are unready or new peers, this task will run, because:
// fetch more peers, because it also empties the channel. // - `poll_discover` schedules this task for wakeup when new peers arrive.
trace!("no ready services, sending demand signal"); // - if there are unready peers, `poll_unready` or `poll_ready_peers` schedule this
let _ = self.demand_signal.try_send(MorePeers); // task for wakeup when peer services become ready.
//
// CORRECTNESS // To avoid peers blocking on a full peer status/error channel:
// // - `poll_background_errors` schedules this task for wakeup when the peer status
// The current task must be scheduled for wakeup every time we // update task exits.
// return `Poll::Pending`. Poll::Pending
// } else {
// As long as there are unready or new peers, this task will run, Poll::Ready(Ok(()))
// because:
// - `poll_discover` schedules this task for wakeup when new
// peers arrive.
// - if there are unready peers, `poll_unready` schedules this
// task for wakeup when peer services become ready.
// - if the preselected peer is not ready, `service.poll_ready`
// schedules this task for wakeup when that service becomes
// ready.
//
// To avoid peers blocking on a full background error channel:
// - if no background tasks have exited since the last poll,
// `poll_background_errors` schedules this task for wakeup when
// the next task exits.
return Poll::Pending;
}
} }
} }
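
The rewritten `poll_ready()` polls every wakeup source on every call, ignores whether each individual source is `Pending` or `Ready`, and decides readiness only from whether any peers ended up in the ready list. A condensed sketch of that structure; `PeerSetSketch` and its methods are hypothetical stand-ins for the real peer set and its helpers.

```rust
use std::task::{Context, Poll};

struct PeerSetSketch {
    ready_peers: usize,
}

impl PeerSetSketch {
    fn poll_discover(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), String>> {
        // Would register a waker for newly discovered peers.
        Poll::Pending
    }

    fn poll_unready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<Option<()>, String>> {
        // Would register a waker for peers that become ready.
        Poll::Pending
    }

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), String>> {
        // Every call polls all sources, so each of them can schedule a wakeup.
        let _new_peers: Poll<()> = self.poll_discover(cx)?;
        let _newly_ready: Poll<Option<()>> = self.poll_unready(cx)?;

        if self.ready_peers == 0 {
            // Safe to return Pending: the polls above registered wakeups.
            Poll::Pending
        } else {
            Poll::Ready(Ok(()))
        }
    }
}

fn main() {
    use futures::task::noop_waker;

    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    assert!(PeerSetSketch { ready_peers: 0 }.poll_ready(&mut cx).is_pending());
    assert!(PeerSetSketch { ready_peers: 2 }.poll_ready(&mut cx).is_ready());
}
```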


@ -10,7 +10,6 @@ use zebra_chain::{
parameters::{Network, NetworkUpgrade}, parameters::{Network, NetworkUpgrade},
}; };
use super::{PeerSetBuilder, PeerVersions};
use crate::{ use crate::{
constants::DEFAULT_MAX_CONNS_PER_IP, constants::DEFAULT_MAX_CONNS_PER_IP,
peer::{ClientRequest, MinimumPeerVersion}, peer::{ClientRequest, MinimumPeerVersion},
@ -19,6 +18,8 @@ use crate::{
Request, SharedPeerError, Request, SharedPeerError,
}; };
use super::{PeerSetBuilder, PeerVersions};
#[test] #[test]
fn peer_set_ready_single_connection() { fn peer_set_ready_single_connection() {
// We are going to use just one peer version in this test // We are going to use just one peer version in this test


@ -883,7 +883,7 @@ impl DbFormatChangeThreadHandle {
/// ///
/// This method should be called regularly, so that panics are detected as soon as possible. /// This method should be called regularly, so that panics are detected as soon as possible.
pub fn check_for_panics(&mut self) { pub fn check_for_panics(&mut self) {
self.update_task.check_for_panics(); self.update_task.panic_if_task_has_panicked();
} }
/// Wait for the spawned thread to finish. If it exited with a panic, resume that panic. /// Wait for the spawned thread to finish. If it exited with a panic, resume that panic.