fix(rpc): recreate dead and uncleaned subscriptions (#22281)

This commit is contained in:
Nikita 2022-01-05 10:15:21 +03:00 committed by GitHub
parent 5bb376f304
commit c1995c647b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 35 additions and 18 deletions

View File

@ -180,9 +180,10 @@ pub struct SignatureSubscriptionParams {
#[derive(Clone)]
pub struct SubscriptionControl(Arc<SubscriptionControlInner>);
pub struct WeakSubscriptionTokenRef(Weak<SubscriptionTokenInner>, SubscriptionId);
struct SubscriptionControlInner {
subscriptions: DashMap<SubscriptionParams, Weak<SubscriptionTokenInner>>,
subscriptions: DashMap<SubscriptionParams, WeakSubscriptionTokenRef>,
next_id: AtomicU64,
max_active_subscriptions: usize,
sender: crossbeam_channel::Sender<TimestampedNotificationEntry>,
@ -216,33 +217,44 @@ impl SubscriptionControl {
self.0.subscriptions.len()
);
let count = self.0.subscriptions.len();
match self.0.subscriptions.entry(params) {
DashEntry::Occupied(entry) => Ok(SubscriptionToken(
entry
.get()
.upgrade()
.expect("dead subscription encountered in SubscriptionControl"),
let create_token_and_weak_ref = |id, params| {
let token = SubscriptionToken(
Arc::new(SubscriptionTokenInner {
control: Arc::clone(&self.0),
params,
id,
}),
self.0.counter.create_token(),
)),
);
let weak_ref = WeakSubscriptionTokenRef(Arc::downgrade(&token.0), token.0.id);
(token, weak_ref)
};
match self.0.subscriptions.entry(params) {
DashEntry::Occupied(mut entry) => match entry.get().0.upgrade() {
Some(token_ref) => Ok(SubscriptionToken(token_ref, self.0.counter.create_token())),
// This means the last Arc for this Weak pointer entered the drop just before us,
// but could not remove the entry since we are holding the write lock.
// See `Drop` implementation for `SubscriptionTokenInner` for further info.
None => {
let (token, weak_ref) =
create_token_and_weak_ref(entry.get().1, entry.key().clone());
entry.insert(weak_ref);
Ok(token)
}
},
DashEntry::Vacant(entry) => {
if count >= self.0.max_active_subscriptions {
inc_new_counter_info!("rpc-subscription-refused-limit-reached", 1);
return Err(Error::TooManySubscriptions);
}
let id = SubscriptionId::from(self.0.next_id.fetch_add(1, Ordering::AcqRel));
let token = SubscriptionToken(
Arc::new(SubscriptionTokenInner {
control: Arc::clone(&self.0),
params: entry.key().clone(),
id,
}),
self.0.counter.create_token(),
);
let (token, weak_ref) = create_token_and_weak_ref(id, entry.key().clone());
let _ = self
.0
.sender
.send(NotificationEntry::Subscribed(token.0.params.clone(), id).into());
entry.insert(Arc::downgrade(&token.0));
entry.insert(weak_ref);
datapoint_info!(
"rpc-subscription",
("total", self.0.subscriptions.len(), i64)
@ -529,7 +541,9 @@ impl Drop for SubscriptionTokenInner {
DashEntry::Vacant(_) => {
warn!("Subscriptions inconsistency (missing entry in by_params)");
}
DashEntry::Occupied(entry) => {
// Check the strong refs count to ensure no other thread recreated this subscription (not token)
// while we were acquiring the lock.
DashEntry::Occupied(entry) if entry.get().0.strong_count() == 0 => {
let _ = self
.control
.sender
@ -540,6 +554,9 @@ impl Drop for SubscriptionTokenInner {
("total", self.control.subscriptions.len(), i64)
);
}
// This branch handles the case in which this entry got recreated
// while we were waiting for the lock (inside the `DashMap::entry` method).
DashEntry::Occupied(_entry) /* if _entry.get().0.strong_count() > 0 */ => (),
}
}
}