From 905b90d6a160ee2c2dc9b0e8bb777661dd14dc41 Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 19 Apr 2021 15:31:50 +1000 Subject: [PATCH] Refactor and document correctness for std::sync::Mutex in ErrorSlot --- zebra-network/src/peer/connection.rs | 40 ++++++++++++++++---------- zebra-network/src/peer/error.rs | 42 ++++++++++++++++++++++++++-- 2 files changed, 66 insertions(+), 16 deletions(-) diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index fd974f06f..0d51ae5c6 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -516,6 +516,12 @@ where parent: &span, "sending an error response to a pending request on a failed connection" ); + // Correctness + // + // Error slots use a threaded `std::sync::Mutex`, so + // accessing the slot can block the async task's + // current thread. So we only hold the lock for long + // enough to get a reference to the error. let e = self .error_slot .try_get_error() @@ -532,6 +538,10 @@ where } /// Marks the peer as having failed with error `e`. + /// + /// # Panics + /// + /// If `self` has already failed with a previous error. fn fail_with(&mut self, e: E) where E: Into, @@ -541,32 +551,28 @@ where connection_state = ?self.state, client_receiver = ?self.client_rx, "failing peer service with error"); + // Update the shared error slot - let mut guard = self - .error_slot - .0 - .lock() - .expect("mutex should be unpoisoned"); - if let Some(original_error) = guard.clone() { + // + // # Correctness + // + // Error slots use a threaded `std::sync::Mutex`, so accessing the slot + // can block the async task's current thread. We only perform a single + // slot update per `Client`, and panic to enforce this constraint. + if self.error_slot.try_update_error(e).is_err() { // This panic typically happens due to these bugs: // * we mark a connection as failed without using fail_with // * we call fail_with without checking for a failed connection // state + // * we continue processing messages after calling fail_with // // See the original bug #1510 and PR #1531, and the later bug #1599 // and PR #1600. - panic!( - "calling fail_with on already-failed connection state: failed connections must stop processing pending requests and responses, then close the connection. state: {:?} original error: {:?} new error: {:?} client receiver: {:?}", + panic!("calling fail_with on already-failed connection state: failed connections must stop processing pending requests and responses, then close the connection. state: {:?} client receiver: {:?}", self.state, - original_error, - e, self.client_rx ); - } else { - *guard = Some(e); } - // Drop the guard immediately to release the mutex. - std::mem::drop(guard); // We want to close the client channel and set State::Failed so // that we can flush any pending client requests. However, we may have @@ -575,8 +581,14 @@ where self.client_rx.close(); let old_state = std::mem::replace(&mut self.state, State::Failed); if let State::AwaitingResponse { tx, .. } = old_state { + // # Correctness + // // We know the slot has Some(e) because we just set it above, // and the error slot is never unset. + // + // Accessing the error slot locks a threaded std::sync::Mutex, which + // can block the current async task thread. We briefly lock the mutex + // to get a reference to the error. let e = self.error_slot.try_get_error().unwrap(); let _ = tx.send(Err(e)); } diff --git a/zebra-network/src/peer/error.rs b/zebra-network/src/peer/error.rs index 0ce0184ba..5311e6773 100644 --- a/zebra-network/src/peer/error.rs +++ b/zebra-network/src/peer/error.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use thiserror::Error; @@ -64,10 +64,25 @@ pub enum PeerError { NotFound(Vec), } +/// A shared error slot for peer errors. +/// +/// # Correctness +/// +/// Error slots are shared between sync and async code. In async code, the error +/// mutex should be held for as short a time as possible. This avoids blocking +/// the async task thread on acquiring the mutex. #[derive(Default, Clone)] -pub(super) struct ErrorSlot(pub(super) Arc>>); +pub(super) struct ErrorSlot(Arc>>); impl ErrorSlot { + /// Read the current error in the slot. + /// + /// Returns `None` if there is no error in the slot. + /// + /// # Correctness + /// + /// Briefly locks the error slot's threaded `std::sync::Mutex`, to get a + /// reference to the error in the slot. pub fn try_get_error(&self) -> Option { self.0 .lock() @@ -75,8 +90,31 @@ impl ErrorSlot { .as_ref() .cloned() } + + /// Update the current error in the slot. + /// + /// Returns `Err(AlreadyErrored)` if there was already an error in the slot. + /// + /// # Correctness + /// + /// Briefly locks the error slot's threaded `std::sync::Mutex`, to check for + /// a previous error, then update the error in the slot. + pub fn try_update_error(&self, e: SharedPeerError) -> Result<(), AlreadyErrored> { + let mut guard = self.0.lock().expect("error mutex should be unpoisoned"); + + if let Some(original_error) = guard.clone() { + error!(?original_error, new_error = ?e, "peer connection already errored"); + Err(AlreadyErrored) + } else { + *guard = Some(e); + Ok(()) + } + } } +/// The `ErrorSlot` already contains an error. +pub struct AlreadyErrored; + /// An error during a handshake with a remote peer. #[derive(Error, Debug)] pub enum HandshakeError {