change(state): Remove `active_value` field from `ChainTipSender` (#4175)
* Refactor instrumented records in `ChainTipSender` Record the fields using a helper method. This reduces duplicate code and prepares for removing the `active_value` field from the type, because the replacement for it has to be atomically read to avoid a deadlock when reading from the `watch::Sender` endpoint. * Replace `active_value` field with sender borrow Now that Tokio has been updated to a version in which `watch::Sender` has a `borrow` method, we can remove the `active_value` field. To prevent a deadlock like the one that happened with the synchronizer some time ago, the method instrumentation was slightly refactored to have the fields recorded in helper methods that avoid obtaining a read-lock twice. * Remove some `TODO` comments They are resolved with the current changes. * Refactor to avoid obtaining current `Span` twice This is a minor optimization to remove an unnecessary second call to `tracing::Span::current()`. * Add a comment about preventing dead-locks Explain why a binding can't be created for the old tip reference. * Retry after log ssh failures in full sync test Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
parent
2423f591b0
commit
b356a67391
|
@ -9,7 +9,7 @@ use std::sync::Arc;
|
|||
|
||||
use chrono::{DateTime, Utc};
|
||||
use tokio::sync::watch;
|
||||
use tracing::instrument;
|
||||
use tracing::{field, instrument};
|
||||
|
||||
#[cfg(any(test, feature = "proptest-impl"))]
|
||||
use proptest_derive::Arbitrary;
|
||||
|
@ -120,10 +120,6 @@ pub struct ChainTipSender {
|
|||
|
||||
/// The sender channel for chain tip data.
|
||||
sender: watch::Sender<ChainTipData>,
|
||||
|
||||
/// A copy of the data in `sender`.
|
||||
// TODO: Replace with calls to `watch::Sender::borrow` once Tokio is updated to 1.0.0 (#2573)
|
||||
active_value: ChainTipData,
|
||||
}
|
||||
|
||||
impl ChainTipSender {
|
||||
|
@ -135,14 +131,13 @@ impl ChainTipSender {
|
|||
network: Network,
|
||||
) -> (Self, LatestChainTip, ChainTipChange) {
|
||||
let initial_tip = initial_tip.into();
|
||||
ChainTipSender::record_new_tip(&initial_tip);
|
||||
Self::record_new_tip(&initial_tip);
|
||||
|
||||
let (sender, receiver) = watch::channel(None);
|
||||
|
||||
let mut sender = ChainTipSender {
|
||||
use_non_finalized_tip: false,
|
||||
sender,
|
||||
active_value: None,
|
||||
};
|
||||
|
||||
let current = LatestChainTip::new(receiver);
|
||||
|
@ -156,21 +151,13 @@ impl ChainTipSender {
|
|||
/// Update the latest finalized tip.
|
||||
///
|
||||
/// May trigger an update to the best tip.
|
||||
//
|
||||
// TODO: when we replace active_value with `watch::Sender::borrow`,
|
||||
// refactor instrument to avoid multiple borrows, to prevent deadlocks
|
||||
#[instrument(
|
||||
skip(self, new_tip),
|
||||
fields(
|
||||
old_use_non_finalized_tip = ?self.use_non_finalized_tip,
|
||||
old_height = ?self.active_value.as_ref().map(|block| block.height),
|
||||
old_hash = ?self.active_value.as_ref().map(|block| block.hash),
|
||||
new_height,
|
||||
new_hash,
|
||||
))]
|
||||
fields(old_use_non_finalized_tip, old_height, old_hash, new_height, new_hash)
|
||||
)]
|
||||
pub fn set_finalized_tip(&mut self, new_tip: impl Into<Option<ChainTipBlock>> + Clone) {
|
||||
let new_tip = new_tip.into();
|
||||
ChainTipSender::record_new_tip(&new_tip);
|
||||
self.record_fields(&new_tip);
|
||||
|
||||
if !self.use_non_finalized_tip {
|
||||
self.update(new_tip);
|
||||
|
@ -180,24 +167,16 @@ impl ChainTipSender {
|
|||
/// Update the latest non-finalized tip.
|
||||
///
|
||||
/// May trigger an update to the best tip.
|
||||
//
|
||||
// TODO: when we replace active_value with `watch::Sender::borrow`,
|
||||
// refactor instrument to avoid multiple borrows, to prevent deadlocks
|
||||
#[instrument(
|
||||
skip(self, new_tip),
|
||||
fields(
|
||||
old_use_non_finalized_tip = ?self.use_non_finalized_tip,
|
||||
old_height = ?self.active_value.as_ref().map(|block| block.height),
|
||||
old_hash = ?self.active_value.as_ref().map(|block| block.hash),
|
||||
new_height,
|
||||
new_hash,
|
||||
))]
|
||||
fields(old_use_non_finalized_tip, old_height, old_hash, new_height, new_hash)
|
||||
)]
|
||||
pub fn set_best_non_finalized_tip(
|
||||
&mut self,
|
||||
new_tip: impl Into<Option<ChainTipBlock>> + Clone,
|
||||
) {
|
||||
let new_tip = new_tip.into();
|
||||
ChainTipSender::record_new_tip(&new_tip);
|
||||
self.record_fields(&new_tip);
|
||||
|
||||
// once the non-finalized state becomes active, it is always populated
|
||||
// but ignoring `None`s makes the tests easier
|
||||
|
@ -212,7 +191,11 @@ impl ChainTipSender {
|
|||
/// An update is only sent if the current best tip is different from the last best tip
|
||||
/// that was sent.
|
||||
fn update(&mut self, new_tip: Option<ChainTipBlock>) {
|
||||
let needs_update = match (new_tip.as_ref(), self.active_value.as_ref()) {
|
||||
// Correctness: the `self.sender.borrow()` must not be placed in a `let` binding to prevent
|
||||
// a read-lock being created and living beyond the `self.sender.send(..)` call. If that
|
||||
// happens, the `send` method will attempt to obtain a write-lock and will dead-lock.
|
||||
// Without the binding, the guard is dropped at the end of the expression.
|
||||
let needs_update = match (new_tip.as_ref(), self.sender.borrow().as_ref()) {
|
||||
// since the blocks have been contextually validated,
|
||||
// we know their hashes cover all the block data
|
||||
(Some(new_tip), Some(active_value)) => new_tip.hash != active_value.hash,
|
||||
|
@ -221,8 +204,7 @@ impl ChainTipSender {
|
|||
};
|
||||
|
||||
if needs_update {
|
||||
let _ = self.sender.send(new_tip.clone());
|
||||
self.active_value = new_tip;
|
||||
let _ = self.sender.send(new_tip);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -230,13 +212,43 @@ impl ChainTipSender {
|
|||
///
|
||||
/// Callers should create a new span with empty `new_height` and `new_hash` fields.
|
||||
fn record_new_tip(new_tip: &Option<ChainTipBlock>) {
|
||||
Self::record_tip(&tracing::Span::current(), "new", new_tip);
|
||||
}
|
||||
|
||||
/// Record `new_tip` and the fields from `self` in the current span.
|
||||
///
|
||||
/// The fields recorded are:
|
||||
///
|
||||
/// - `new_height`
|
||||
/// - `new_hash`
|
||||
/// - `old_height`
|
||||
/// - `old_hash`
|
||||
/// - `old_use_non_finalized_tip`
|
||||
///
|
||||
/// Callers should create a new span with the empty fields described above.
|
||||
fn record_fields(&self, new_tip: &Option<ChainTipBlock>) {
|
||||
let span = tracing::Span::current();
|
||||
|
||||
let new_height = new_tip.as_ref().map(|block| block.height);
|
||||
let new_hash = new_tip.as_ref().map(|block| block.hash);
|
||||
let old_tip = &*self.sender.borrow();
|
||||
|
||||
span.record("new_height", &tracing::field::debug(new_height));
|
||||
span.record("new_hash", &tracing::field::debug(new_hash));
|
||||
Self::record_tip(&span, "new", new_tip);
|
||||
Self::record_tip(&span, "old", old_tip);
|
||||
|
||||
span.record(
|
||||
"old_use_non_finalized_tip",
|
||||
&field::debug(self.use_non_finalized_tip),
|
||||
);
|
||||
}
|
||||
|
||||
/// Record `tip` into `span` using the `prefix` to name the fields.
|
||||
///
|
||||
/// Callers should create a new span with empty `{prefix}_height` and `{prefix}_hash` fields.
|
||||
fn record_tip(span: &tracing::Span, prefix: &str, tip: &Option<ChainTipBlock>) {
|
||||
let height = tip.as_ref().map(|block| block.height);
|
||||
let hash = tip.as_ref().map(|block| block.hash);
|
||||
|
||||
span.record(format!("{}_height", prefix).as_str(), &field::debug(height));
|
||||
span.record(format!("{}_hash", prefix).as_str(), &field::debug(hash));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue