From 2928c5d1031ed5d19b1b3ddc5b01d3b2d551f30f Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Fri, 22 May 2020 13:55:17 -0600 Subject: [PATCH] Add SingleGossip commitment level to use for subscriptions (#10147) automerge --- core/src/rpc.rs | 5 +- core/src/rpc_subscriptions.rs | 354 +++++++++++++++++++++++++++++----- sdk/src/commitment_config.rs | 9 +- 3 files changed, 316 insertions(+), 52 deletions(-) diff --git a/core/src/rpc.rs b/core/src/rpc.rs index d29b967ffb..b696b89ef9 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -92,7 +92,10 @@ impl JsonRpcRequestProcessor { debug!("RPC using node root: {:?}", slot); Ok(r_bank_forks.get(slot).cloned().unwrap()) } - Some(commitment_config) if commitment_config.commitment == CommitmentLevel::Single => { + Some(commitment_config) + if commitment_config.commitment == CommitmentLevel::Single + || commitment_config.commitment == CommitmentLevel::SingleGossip => + { let slot = self .block_commitment_cache .read() diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index f3055cbb60..9cf553df29 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -57,6 +57,7 @@ enum NotificationEntry { Vote(Vote), Root(Slot), Bank(CacheSlotInfo), + Gossip(Slot), } impl std::fmt::Debug for NotificationEntry { @@ -70,6 +71,7 @@ impl std::fmt::Debug for NotificationEntry { "Bank({{current_slot: {:?}}})", cache_slot_info.current_slot ), + NotificationEntry::Gossip(slot) => write!(f, "Gossip({:?})", slot), } } } @@ -171,7 +173,9 @@ where CommitmentLevel::Max => cache_slot_info.largest_confirmed_root, CommitmentLevel::Recent => cache_slot_info.current_slot, CommitmentLevel::Root => cache_slot_info.node_root, - CommitmentLevel::Single => cache_slot_info.highest_confirmed_slot, + CommitmentLevel::Single | CommitmentLevel::SingleGossip => { + cache_slot_info.highest_confirmed_slot + } }; let results = { let bank_forks = bank_forks.read().unwrap(); @@ -258,6 +262,9 @@ struct Subscriptions { account_subscriptions: Arc, program_subscriptions: Arc, signature_subscriptions: Arc, + gossip_account_subscriptions: Arc, + gossip_program_subscriptions: Arc, + gossip_signature_subscriptions: Arc, slot_subscriptions: Arc, vote_subscriptions: Arc, root_subscriptions: Arc, @@ -270,6 +277,7 @@ pub struct RpcSubscriptions { notifier_runtime: Option, bank_forks: Arc>, block_commitment_cache: Arc>, + last_checked_slots: Arc>>, exit: Arc, } @@ -295,6 +303,9 @@ impl RpcSubscriptions { let account_subscriptions = Arc::new(RpcAccountSubscriptions::default()); let program_subscriptions = Arc::new(RpcProgramSubscriptions::default()); let signature_subscriptions = Arc::new(RpcSignatureSubscriptions::default()); + let gossip_account_subscriptions = Arc::new(RpcAccountSubscriptions::default()); + let gossip_program_subscriptions = Arc::new(RpcProgramSubscriptions::default()); + let gossip_signature_subscriptions = Arc::new(RpcSignatureSubscriptions::default()); let slot_subscriptions = Arc::new(RpcSlotSubscriptions::default()); let vote_subscriptions = Arc::new(RpcVoteSubscriptions::default()); let root_subscriptions = Arc::new(RpcRootSubscriptions::default()); @@ -307,12 +318,18 @@ impl RpcSubscriptions { account_subscriptions, program_subscriptions, signature_subscriptions, + gossip_account_subscriptions, + gossip_program_subscriptions, + gossip_signature_subscriptions, slot_subscriptions, vote_subscriptions, root_subscriptions, }; let _subscriptions = subscriptions.clone(); + let last_checked_slots = Arc::new(RwLock::new(HashMap::new())); + let _last_checked_slots = last_checked_slots.clone(); + let notifier_runtime = RuntimeBuilder::new() .core_threads(1) .name_prefix("solana-rpc-notifier-") @@ -329,6 +346,7 @@ impl RpcSubscriptions { notification_receiver, _subscriptions, _bank_forks, + _last_checked_slots, ); }) .unwrap(); @@ -340,6 +358,7 @@ impl RpcSubscriptions { t_cleanup: Some(t_cleanup), bank_forks, block_commitment_cache, + last_checked_slots, exit: exit.clone(), } } @@ -427,11 +446,10 @@ impl RpcSubscriptions { sub_id: SubscriptionId, subscriber: Subscriber>, ) { - let mut subscriptions = self.subscriptions.account_subscriptions.write().unwrap(); - let slot = match commitment + let commitment_level = commitment .unwrap_or_else(CommitmentConfig::single) - .commitment - { + .commitment; + let slot = match commitment_level { CommitmentLevel::Max => self .block_commitment_cache .read() @@ -444,6 +462,12 @@ impl RpcSubscriptions { .read() .unwrap() .highest_confirmed_slot(), + CommitmentLevel::SingleGossip => *self + .last_checked_slots + .read() + .unwrap() + .get(&CommitmentLevel::SingleGossip) + .unwrap_or(&0), }; let last_notified_slot = if let Some((_account, slot)) = self .bank_forks @@ -456,6 +480,15 @@ impl RpcSubscriptions { } else { 0 }; + + let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip { + self.subscriptions + .gossip_account_subscriptions + .write() + .unwrap() + } else { + self.subscriptions.account_subscriptions.write().unwrap() + }; add_subscription( &mut subscriptions, pubkey, @@ -468,7 +501,16 @@ impl RpcSubscriptions { pub fn remove_account_subscription(&self, id: &SubscriptionId) -> bool { let mut subscriptions = self.subscriptions.account_subscriptions.write().unwrap(); - remove_subscription(&mut subscriptions, id) + if remove_subscription(&mut subscriptions, id) { + true + } else { + let mut subscriptions = self + .subscriptions + .gossip_account_subscriptions + .write() + .unwrap(); + remove_subscription(&mut subscriptions, id) + } } pub fn add_program_subscription( @@ -478,7 +520,17 @@ impl RpcSubscriptions { sub_id: SubscriptionId, subscriber: Subscriber>, ) { - let mut subscriptions = self.subscriptions.program_subscriptions.write().unwrap(); + let commitment_level = commitment + .unwrap_or_else(CommitmentConfig::recent) + .commitment; + let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip { + self.subscriptions + .gossip_program_subscriptions + .write() + .unwrap() + } else { + self.subscriptions.program_subscriptions.write().unwrap() + }; add_subscription( &mut subscriptions, program_id, @@ -491,7 +543,16 @@ impl RpcSubscriptions { pub fn remove_program_subscription(&self, id: &SubscriptionId) -> bool { let mut subscriptions = self.subscriptions.program_subscriptions.write().unwrap(); - remove_subscription(&mut subscriptions, id) + if remove_subscription(&mut subscriptions, id) { + true + } else { + let mut subscriptions = self + .subscriptions + .gossip_program_subscriptions + .write() + .unwrap(); + remove_subscription(&mut subscriptions, id) + } } pub fn add_signature_subscription( @@ -501,7 +562,17 @@ impl RpcSubscriptions { sub_id: SubscriptionId, subscriber: Subscriber>, ) { - let mut subscriptions = self.subscriptions.signature_subscriptions.write().unwrap(); + let commitment_level = commitment + .unwrap_or_else(CommitmentConfig::recent) + .commitment; + let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip { + self.subscriptions + .gossip_signature_subscriptions + .write() + .unwrap() + } else { + self.subscriptions.signature_subscriptions.write().unwrap() + }; add_subscription( &mut subscriptions, signature, @@ -514,7 +585,16 @@ impl RpcSubscriptions { pub fn remove_signature_subscription(&self, id: &SubscriptionId) -> bool { let mut subscriptions = self.subscriptions.signature_subscriptions.write().unwrap(); - remove_subscription(&mut subscriptions, id) + if remove_subscription(&mut subscriptions, id) { + true + } else { + let mut subscriptions = self + .subscriptions + .gossip_signature_subscriptions + .write() + .unwrap(); + remove_subscription(&mut subscriptions, id) + } } /// Notify subscribers of changes to any accounts or new signatures since @@ -523,6 +603,12 @@ impl RpcSubscriptions { self.enqueue_notification(NotificationEntry::Bank(cache_slot_info)); } + /// Notify SingleGossip commitment-level subscribers of changes to any accounts or new + /// signatures. + pub fn notify_gossip_subscribers(&self, slot: Slot) { + self.enqueue_notification(NotificationEntry::Gossip(slot)); + } + pub fn add_slot_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber) { let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let mut subscriptions = self.subscriptions.slot_subscriptions.write().unwrap(); @@ -594,6 +680,7 @@ impl RpcSubscriptions { notification_receiver: Receiver, subscriptions: Subscriptions, bank_forks: Arc>, + last_checked_slots: Arc>>, ) { loop { if exit.load(Ordering::Relaxed) { @@ -627,47 +714,32 @@ impl RpcSubscriptions { } } NotificationEntry::Bank(cache_slot_info) => { - let pubkeys: Vec<_> = { - let subs = subscriptions.account_subscriptions.read().unwrap(); - subs.keys().cloned().collect() + RpcSubscriptions::notify_accounts_programs_signatures( + &subscriptions.account_subscriptions, + &subscriptions.program_subscriptions, + &subscriptions.signature_subscriptions, + &bank_forks, + &cache_slot_info, + ¬ifier, + ) + } + NotificationEntry::Gossip(slot) => { + let _ = last_checked_slots + .write() + .unwrap() + .insert(CommitmentLevel::SingleGossip, slot); + let cache_slot_info = CacheSlotInfo { + highest_confirmed_slot: slot, + ..CacheSlotInfo::default() }; - for pubkey in &pubkeys { - Self::check_account( - pubkey, - &bank_forks, - subscriptions.account_subscriptions.clone(), - ¬ifier, - &cache_slot_info, - ); - } - - let programs: Vec<_> = { - let subs = subscriptions.program_subscriptions.read().unwrap(); - subs.keys().cloned().collect() - }; - for program_id in &programs { - Self::check_program( - program_id, - &bank_forks, - subscriptions.program_subscriptions.clone(), - ¬ifier, - &cache_slot_info, - ); - } - - let signatures: Vec<_> = { - let subs = subscriptions.signature_subscriptions.read().unwrap(); - subs.keys().cloned().collect() - }; - for signature in &signatures { - Self::check_signature( - signature, - &bank_forks, - subscriptions.signature_subscriptions.clone(), - ¬ifier, - &cache_slot_info, - ); - } + RpcSubscriptions::notify_accounts_programs_signatures( + &subscriptions.gossip_account_subscriptions, + &subscriptions.gossip_program_subscriptions, + &subscriptions.gossip_signature_subscriptions, + &bank_forks, + &cache_slot_info, + ¬ifier, + ) } }, Err(RecvTimeoutError::Timeout) => { @@ -681,6 +753,57 @@ impl RpcSubscriptions { } } + fn notify_accounts_programs_signatures( + account_subscriptions: &Arc, + program_subscriptions: &Arc, + signature_subscriptions: &Arc, + bank_forks: &Arc>, + cache_slot_info: &CacheSlotInfo, + notifier: &RpcNotifier, + ) { + let pubkeys: Vec<_> = { + let subs = account_subscriptions.read().unwrap(); + subs.keys().cloned().collect() + }; + for pubkey in &pubkeys { + Self::check_account( + pubkey, + &bank_forks, + account_subscriptions.clone(), + ¬ifier, + &cache_slot_info, + ); + } + + let programs: Vec<_> = { + let subs = program_subscriptions.read().unwrap(); + subs.keys().cloned().collect() + }; + for program_id in &programs { + Self::check_program( + program_id, + &bank_forks, + program_subscriptions.clone(), + ¬ifier, + &cache_slot_info, + ); + } + + let signatures: Vec<_> = { + let subs = signature_subscriptions.read().unwrap(); + subs.keys().cloned().collect() + }; + for signature in &signatures { + Self::check_signature( + signature, + &bank_forks, + signature_subscriptions.clone(), + ¬ifier, + &cache_slot_info, + ); + } + } + fn shutdown(&mut self) -> std::thread::Result<()> { if let Some(runtime) = self.notifier_runtime.take() { info!("RPC Notifier runtime - shutting down"); @@ -1232,4 +1355,135 @@ pub(crate) mod tests { assert_eq!(subscriptions.len(), (num_keys - 1) as usize); assert!(subscriptions.get(&0).is_none()); } + + #[test] + #[serial] + fn test_gossip_separate_account_notifications() { + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config(100); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let bank = Bank::new(&genesis_config); + let blockhash = bank.last_blockhash(); + let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); + let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone(); + let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); + bank_forks.write().unwrap().insert(bank1); + let alice = Keypair::new(); + + let (subscriber0, _id_receiver, transport_receiver0) = + Subscriber::new_test("accountNotification"); + let (subscriber1, _id_receiver, transport_receiver1) = + Subscriber::new_test("accountNotification"); + let exit = Arc::new(AtomicBool::new(false)); + let subscriptions = RpcSubscriptions::new( + &exit, + bank_forks.clone(), + Arc::new(RwLock::new( + BlockCommitmentCache::new_for_tests_with_blockstore_bank( + blockstore, + bank_forks.read().unwrap().get(1).unwrap().clone(), + 1, + ), + )), + ); + let sub_id0 = SubscriptionId::Number(0 as u64); + subscriptions.add_account_subscription( + alice.pubkey(), + Some(CommitmentConfig::single_gossip()), + sub_id0.clone(), + subscriber0, + ); + let sub_id1 = SubscriptionId::Number(1 as u64); + subscriptions.add_account_subscription( + alice.pubkey(), + Some(CommitmentConfig::recent()), + sub_id1.clone(), + subscriber1, + ); + + assert!(subscriptions + .subscriptions + .account_subscriptions + .read() + .unwrap() + .contains_key(&alice.pubkey())); + + let tx = system_transaction::create_account( + &mint_keypair, + &alice, + blockhash, + 1, + 16, + &solana_budget_program::id(), + ); + bank_forks + .write() + .unwrap() + .get(1) + .unwrap() + .process_transaction(&tx) + .unwrap(); + let mut cache_slot_info = CacheSlotInfo::default(); + cache_slot_info.current_slot = 1; + subscriptions.notify_subscribers(cache_slot_info); + let (response, _) = robust_poll_or_panic(transport_receiver1); + let expected = json!({ + "jsonrpc": "2.0", + "method": "accountNotification", + "params": { + "result": { + "context": { "slot": 1 }, + "value": { + "data": "1111111111111111", + "executable": false, + "lamports": 1, + "owner": "Budget1111111111111111111111111111111111111", + "rentEpoch": 1, + }, + }, + "subscription": 1, + } + }); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); + + subscriptions.notify_gossip_subscribers(1); + let (response, _) = robust_poll_or_panic(transport_receiver0); + let expected = json!({ + "jsonrpc": "2.0", + "method": "accountNotification", + "params": { + "result": { + "context": { "slot": 1 }, + "value": { + "data": "1111111111111111", + "executable": false, + "lamports": 1, + "owner": "Budget1111111111111111111111111111111111111", + "rentEpoch": 1, + }, + }, + "subscription": 0, + } + }); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); + + subscriptions.remove_account_subscription(&sub_id0); + assert!(subscriptions + .subscriptions + .account_subscriptions + .read() + .unwrap() + .contains_key(&alice.pubkey())); + subscriptions.remove_account_subscription(&sub_id1); + assert!(!subscriptions + .subscriptions + .account_subscriptions + .read() + .unwrap() + .contains_key(&alice.pubkey())); + } } diff --git a/sdk/src/commitment_config.rs b/sdk/src/commitment_config.rs index b72da97bb1..206102c080 100644 --- a/sdk/src/commitment_config.rs +++ b/sdk/src/commitment_config.rs @@ -37,6 +37,12 @@ impl CommitmentConfig { } } + pub fn single_gossip() -> Self { + Self { + commitment: CommitmentLevel::SingleGossip, + } + } + pub fn ok(self) -> Option { if self == Self::default() { None @@ -46,11 +52,12 @@ impl CommitmentConfig { } } -#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq)] +#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq, Hash)] #[serde(rename_all = "camelCase")] pub enum CommitmentLevel { Max, Recent, Root, Single, + SingleGossip, }