From ae96ba34590bef8bf79ef2145faba20d524dd291 Mon Sep 17 00:00:00 2001 From: carllin Date: Sun, 28 Feb 2021 23:29:11 -0800 Subject: [PATCH] Plumb slot update pubsub notifications (#15488) --- client/src/rpc_response.rs | 9 +++ core/benches/retransmit_stage.rs | 1 + .../optimistically_confirmed_bank_tracker.rs | 13 +++- core/src/retransmit_stage.rs | 62 ++++++++++++++++++- core/src/rpc_pubsub.rs | 59 ++++++++++++++++++ core/src/rpc_subscriptions.rs | 52 +++++++++++++++- core/src/tvu.rs | 1 + 7 files changed, 194 insertions(+), 3 deletions(-) diff --git a/client/src/rpc_response.rs b/client/src/rpc_response.rs index 42d85b7bb..f51afd956 100644 --- a/client/src/rpc_response.rs +++ b/client/src/rpc_response.rs @@ -101,6 +101,15 @@ pub struct SlotInfo { pub root: Slot, } +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase", tag = "type")] +pub enum SlotUpdate { + OptimisticConfirmation { slot: Slot, timestamp: u64 }, + FirstShredReceived { slot: Slot, timestamp: u64 }, + Frozen { slot: Slot, timestamp: u64 }, + Root { slot: Slot, timestamp: u64 }, +} + #[derive(Serialize, Deserialize, Clone, Debug)] #[serde(rename_all = "camelCase", untagged)] pub enum RpcSignatureResult { diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs index 0c0f3a3b8..1014d2cd1 100644 --- a/core/benches/retransmit_stage.rs +++ b/core/benches/retransmit_stage.rs @@ -94,6 +94,7 @@ fn bench_retransmitter(bencher: &mut Bencher) { cluster_info, packet_receiver, &Arc::new(MaxSlots::default()), + None, ); let mut index = 0; diff --git a/core/src/optimistically_confirmed_bank_tracker.rs b/core/src/optimistically_confirmed_bank_tracker.rs index 3acc92f04..2a422819b 100644 --- a/core/src/optimistically_confirmed_bank_tracker.rs +++ b/core/src/optimistically_confirmed_bank_tracker.rs @@ -4,8 +4,9 @@ use crate::rpc_subscriptions::RpcSubscriptions; use crossbeam_channel::{Receiver, RecvTimeoutError, Sender}; +use solana_client::rpc_response::SlotUpdate; use solana_runtime::{bank::Bank, bank_forks::BankForks}; -use solana_sdk::clock::Slot; +use solana_sdk::{clock::Slot, timing::timestamp}; use std::{ collections::HashSet, sync::{ @@ -130,6 +131,12 @@ impl OptimisticallyConfirmedBankTracker { } else { inc_new_counter_info!("dropped-already-rooted-optimistic-bank-notification", 1); } + + // Send slot notification regardless of whether the bank is replayed + subscriptions.notify_slot_update(SlotUpdate::OptimisticConfirmation { + slot, + timestamp: timestamp(), + }); } BankNotification::Frozen(bank) => { let frozen_slot = bank.slot(); @@ -142,6 +149,10 @@ impl OptimisticallyConfirmedBankTracker { } drop(w_optimistically_confirmed_bank); } + subscriptions.notify_slot_update(SlotUpdate::Frozen { + slot: frozen_slot, + timestamp: timestamp(), + }); } BankNotification::Root(bank) => { let root_slot = bank.slot(); diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index d517e53a9..a01641aa5 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -12,10 +12,12 @@ use crate::{ repair_service::DuplicateSlotsResetSender, repair_service::RepairInfo, result::{Error, Result}, + rpc_subscriptions::RpcSubscriptions, window_service::{should_retransmit_and_persist, WindowService}, }; use crossbeam_channel::Receiver; use lru::LruCache; +use solana_client::rpc_response::SlotUpdate; use solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats}; use solana_ledger::{ blockstore::{Blockstore, CompletedSlotsReceiver}, @@ -32,7 +34,7 @@ use solana_streamer::streamer::PacketReceiver; use std::{ cmp, collections::hash_set::HashSet, - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, BTreeSet, HashMap}, net::UdpSocket, ops::{Deref, DerefMut}, sync::atomic::{AtomicBool, AtomicU64, Ordering}, @@ -237,6 +239,43 @@ fn check_if_already_received( } } +fn notify_first_shred_received( + shred_slot: Slot, + rpc_subscriptions: &RpcSubscriptions, + sent_received_slot_notification: &Mutex>, + root_bank: &Bank, +) { + let notify_slot = { + let mut sent_received_slot_notification_locked = + sent_received_slot_notification.lock().unwrap(); + if !sent_received_slot_notification_locked.contains(&shred_slot) + && shred_slot > root_bank.slot() + { + sent_received_slot_notification_locked.insert(shred_slot); + if sent_received_slot_notification_locked.len() > 100 { + let mut slots_before_root = + sent_received_slot_notification_locked.split_off(&(root_bank.slot() + 1)); + // `slots_before_root` now contains all slots <= root + std::mem::swap( + &mut slots_before_root, + &mut sent_received_slot_notification_locked, + ); + } + Some(shred_slot) + } else { + None + } + }; + + if let Some(slot) = notify_slot { + info!("First time receiving a shred from slot: {}", slot); + rpc_subscriptions.notify_slot_update(SlotUpdate::FirstShredReceived { + slot, + timestamp: timestamp(), + }); + } +} + // Returns true if turbine retransmit peers patch (#14565) is enabled. fn enable_turbine_retransmit_peers_patch(shred_slot: Slot, root_bank: &Bank) -> bool { let feature_slot = root_bank @@ -266,6 +305,8 @@ fn retransmit( last_peer_update: &AtomicU64, shreds_received: &Mutex, max_slots: &MaxSlots, + sent_received_slot_notification: &Mutex>, + rpc_subscriptions: &Option>, ) -> Result<()> { let timer = Duration::new(1, 0); let r_lock = r.lock().unwrap(); @@ -341,6 +382,16 @@ fn retransmit( None => continue, }; max_slot = max_slot.max(shred_slot); + + if let Some(rpc_subscriptions) = rpc_subscriptions { + notify_first_shred_received( + shred_slot, + rpc_subscriptions, + sent_received_slot_notification, + &root_bank, + ); + } + let mut compute_turbine_peers = Measure::start("turbine_start"); let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index( &my_id, @@ -439,12 +490,14 @@ pub fn retransmitter( cluster_info: Arc, r: Arc>, max_slots: &Arc, + rpc_subscriptions: Option>, ) -> Vec> { let stats = Arc::new(RetransmitStats::default()); let shreds_received = Arc::new(Mutex::new(( LruCache::new(DEFAULT_LRU_SIZE), PacketHasher::default(), ))); + let sent_received_slot_notification = Arc::new(Mutex::new(BTreeSet::new())); (0..sockets.len()) .map(|s| { let sockets = sockets.clone(); @@ -457,6 +510,8 @@ pub fn retransmitter( let last_peer_update = Arc::new(AtomicU64::new(0)); let shreds_received = shreds_received.clone(); let max_slots = max_slots.clone(); + let sent_received_slot_notification = sent_received_slot_notification.clone(); + let rpc_subscriptions = rpc_subscriptions.clone(); Builder::new() .name("solana-retransmitter".to_string()) @@ -475,6 +530,8 @@ pub fn retransmitter( &last_peer_update, &shreds_received, &max_slots, + &sent_received_slot_notification, + &rpc_subscriptions, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, @@ -520,6 +577,7 @@ impl RetransmitStage { repair_validators: Option>, completed_data_sets_sender: CompletedDataSetsSender, max_slots: &Arc, + rpc_subscriptions: Option>, ) -> Self { let (retransmit_sender, retransmit_receiver) = channel(); @@ -531,6 +589,7 @@ impl RetransmitStage { cluster_info.clone(), retransmit_receiver, max_slots, + rpc_subscriptions, ); let leader_schedule_cache_clone = leader_schedule_cache.clone(); @@ -649,6 +708,7 @@ mod tests { cluster_info, Arc::new(Mutex::new(retransmit_receiver)), &Arc::new(MaxSlots::default()), + None, ); let _thread_hdls = vec![t_retransmit]; diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index 4c249566b..b12bfd64c 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -12,6 +12,7 @@ use solana_client::{ }, rpc_response::{ Response as RpcResponse, RpcKeyedAccount, RpcLogsResponse, RpcSignatureResult, SlotInfo, + SlotUpdate, }, }; #[cfg(test)] @@ -139,6 +140,30 @@ pub trait RpcSolPubSub { )] fn slot_unsubscribe(&self, meta: Option, id: SubscriptionId) -> Result; + // Get series of updates for all slots + #[pubsub( + subscription = "slotsUpdatesNotification", + subscribe, + name = "slotsUpdatesSubscribe" + )] + fn slots_updates_subscribe( + &self, + meta: Self::Metadata, + subscriber: Subscriber>, + ); + + // Unsubscribe from slots updates notification subscription. + #[pubsub( + subscription = "slotsUpdatesNotification", + unsubscribe, + name = "slotsUpdatesUnsubscribe" + )] + fn slots_updates_unsubscribe( + &self, + meta: Option, + id: SubscriptionId, + ) -> Result; + // Get notification when vote is encountered #[pubsub(subscription = "voteNotification", subscribe, name = "voteSubscribe")] fn vote_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber); @@ -428,6 +453,40 @@ impl RpcSolPubSub for RpcSolPubSubImpl { } } + fn slots_updates_subscribe( + &self, + _meta: Self::Metadata, + subscriber: Subscriber>, + ) { + info!("slots_updates_subscribe"); + if let Err(err) = self.check_subscription_count() { + subscriber.reject(err).unwrap_or_default(); + return; + } + let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); + let sub_id = SubscriptionId::Number(id as u64); + info!("slots_updates_subscribe: id={:?}", sub_id); + self.subscriptions + .add_slots_updates_subscription(sub_id, subscriber); + } + + fn slots_updates_unsubscribe( + &self, + _meta: Option, + id: SubscriptionId, + ) -> Result { + info!("slots_updates_unsubscribe"); + if self.subscriptions.remove_slots_updates_subscription(&id) { + Ok(true) + } else { + Err(Error { + code: ErrorCode::InvalidParams, + message: "Invalid Request: Subscription id does not exist".into(), + data: None, + }) + } + } + fn vote_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber) { info!("vote_subscribe"); if let Err(err) = self.check_subscription_count() { diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 300999905..389454f00 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -16,7 +16,7 @@ use solana_client::{ rpc_filter::RpcFilterType, rpc_response::{ ProcessedSignatureResult, ReceivedSignatureResult, Response, RpcKeyedAccount, - RpcLogsResponse, RpcResponseContext, RpcSignatureResult, SlotInfo, + RpcLogsResponse, RpcResponseContext, RpcSignatureResult, SlotInfo, SlotUpdate, }, }; use solana_measure::measure::Measure; @@ -33,6 +33,7 @@ use solana_sdk::{ commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature, + timing::timestamp, transaction, }; use solana_vote_program::vote_state::Vote; @@ -82,6 +83,7 @@ pub struct RpcVote { enum NotificationEntry { Slot(SlotInfo), + SlotUpdate(SlotUpdate), Vote(Vote), Root(Slot), Bank(CommitmentSlots), @@ -95,6 +97,9 @@ impl std::fmt::Debug for NotificationEntry { NotificationEntry::Root(root) => write!(f, "Root({})", root), NotificationEntry::Vote(vote) => write!(f, "Vote({:?})", vote), NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info), + NotificationEntry::SlotUpdate(slot_update) => { + write!(f, "SlotUpdate({:?})", slot_update) + } NotificationEntry::Bank(commitment_slots) => { write!(f, "Bank({{slot: {:?}}})", commitment_slots.slot) } @@ -142,6 +147,7 @@ type RpcSignatureSubscriptions = RwLock< >, >; type RpcSlotSubscriptions = RwLock>>; +type RpcSlotUpdateSubscriptions = RwLock>>>; type RpcVoteSubscriptions = RwLock>>; type RpcRootSubscriptions = RwLock>>; @@ -387,6 +393,7 @@ struct Subscriptions { gossip_program_subscriptions: Arc, gossip_signature_subscriptions: Arc, slot_subscriptions: Arc, + slots_updates_subscriptions: Arc, vote_subscriptions: Arc, root_subscriptions: Arc, } @@ -465,6 +472,7 @@ impl RpcSubscriptions { let gossip_program_subscriptions = Arc::new(RpcProgramSubscriptions::default()); let gossip_signature_subscriptions = Arc::new(RpcSignatureSubscriptions::default()); let slot_subscriptions = Arc::new(RpcSlotSubscriptions::default()); + let slots_updates_subscriptions = Arc::new(RpcSlotUpdateSubscriptions::default()); let vote_subscriptions = Arc::new(RpcVoteSubscriptions::default()); let root_subscriptions = Arc::new(RpcRootSubscriptions::default()); let notification_sender = Arc::new(Mutex::new(notification_sender)); @@ -482,6 +490,7 @@ impl RpcSubscriptions { gossip_program_subscriptions, gossip_signature_subscriptions, slot_subscriptions, + slots_updates_subscriptions, vote_subscriptions, root_subscriptions, }; @@ -903,6 +912,10 @@ impl RpcSubscriptions { self.enqueue_notification(NotificationEntry::Gossip(slot)); } + pub fn notify_slot_update(&self, slot_update: SlotUpdate) { + self.enqueue_notification(NotificationEntry::SlotUpdate(slot_update)); + } + pub fn add_slot_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber) { let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let mut subscriptions = self.subscriptions.slot_subscriptions.write().unwrap(); @@ -914,6 +927,29 @@ impl RpcSubscriptions { subscriptions.remove(id).is_some() } + pub fn add_slots_updates_subscription( + &self, + sub_id: SubscriptionId, + subscriber: Subscriber>, + ) { + let sink = subscriber.assign_id(sub_id.clone()).unwrap(); + let mut subscriptions = self + .subscriptions + .slots_updates_subscriptions + .write() + .unwrap(); + subscriptions.insert(sub_id, sink); + } + + pub fn remove_slots_updates_subscription(&self, id: &SubscriptionId) -> bool { + let mut subscriptions = self + .subscriptions + .slots_updates_subscriptions + .write() + .unwrap(); + subscriptions.remove(id).is_some() + } + pub fn notify_slot(&self, slot: Slot, parent: Slot, root: Slot) { self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root })); } @@ -957,6 +993,10 @@ impl RpcSubscriptions { pub fn notify_roots(&self, mut rooted_slots: Vec) { rooted_slots.sort_unstable(); rooted_slots.into_iter().for_each(|root| { + self.enqueue_notification(NotificationEntry::SlotUpdate(SlotUpdate::Root { + slot: root, + timestamp: timestamp(), + })); self.enqueue_notification(NotificationEntry::Root(root)); }); } @@ -1005,6 +1045,15 @@ impl RpcSubscriptions { notifier.notify(slot_info, sink); } } + NotificationEntry::SlotUpdate(slot_update) => { + let subscriptions = + subscriptions.slots_updates_subscriptions.read().unwrap(); + let slot_update = Arc::new(slot_update); + for (_, sink) in subscriptions.iter() { + inc_new_counter_info!("rpc-subscription-notify-slots-updates", 1); + notifier.notify(slot_update.clone(), sink); + } + } // These notifications are only triggered by votes observed on gossip, // unlike `NotificationEntry::Gossip`, which also accounts for slots seen // in VoteState's from bank states built in ReplayStage. @@ -1021,6 +1070,7 @@ impl RpcSubscriptions { inc_new_counter_info!("rpc-subscription-notify-vote", 1); notifier.notify( RpcVote { + // TODO: Remove clones slots: vote_info.slots.clone(), hash: bs58::encode(vote_info.hash).into_string(), timestamp: vote_info.timestamp, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index b21f7d73a..c83614dbd 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -178,6 +178,7 @@ impl Tvu { tvu_config.repair_validators, completed_data_sets_sender, max_slots, + Some(subscriptions.clone()), ); let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();