Add SingleGossip commitment level to use for subscriptions (#10147)

automerge
This commit is contained in:
Tyera Eulberg 2020-05-22 13:55:17 -06:00 committed by GitHub
parent 2324eb9ff9
commit 2928c5d103
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 316 additions and 52 deletions

View File

@ -92,7 +92,10 @@ impl JsonRpcRequestProcessor {
debug!("RPC using node root: {:?}", slot);
Ok(r_bank_forks.get(slot).cloned().unwrap())
}
Some(commitment_config) if commitment_config.commitment == CommitmentLevel::Single => {
Some(commitment_config)
if commitment_config.commitment == CommitmentLevel::Single
|| commitment_config.commitment == CommitmentLevel::SingleGossip =>
{
let slot = self
.block_commitment_cache
.read()

View File

@ -57,6 +57,7 @@ enum NotificationEntry {
Vote(Vote),
Root(Slot),
Bank(CacheSlotInfo),
Gossip(Slot),
}
impl std::fmt::Debug for NotificationEntry {
@ -70,6 +71,7 @@ impl std::fmt::Debug for NotificationEntry {
"Bank({{current_slot: {:?}}})",
cache_slot_info.current_slot
),
NotificationEntry::Gossip(slot) => write!(f, "Gossip({:?})", slot),
}
}
}
@ -171,7 +173,9 @@ where
CommitmentLevel::Max => cache_slot_info.largest_confirmed_root,
CommitmentLevel::Recent => cache_slot_info.current_slot,
CommitmentLevel::Root => cache_slot_info.node_root,
CommitmentLevel::Single => cache_slot_info.highest_confirmed_slot,
CommitmentLevel::Single | CommitmentLevel::SingleGossip => {
cache_slot_info.highest_confirmed_slot
}
};
let results = {
let bank_forks = bank_forks.read().unwrap();
@ -258,6 +262,9 @@ struct Subscriptions {
account_subscriptions: Arc<RpcAccountSubscriptions>,
program_subscriptions: Arc<RpcProgramSubscriptions>,
signature_subscriptions: Arc<RpcSignatureSubscriptions>,
gossip_account_subscriptions: Arc<RpcAccountSubscriptions>,
gossip_program_subscriptions: Arc<RpcProgramSubscriptions>,
gossip_signature_subscriptions: Arc<RpcSignatureSubscriptions>,
slot_subscriptions: Arc<RpcSlotSubscriptions>,
vote_subscriptions: Arc<RpcVoteSubscriptions>,
root_subscriptions: Arc<RpcRootSubscriptions>,
@ -270,6 +277,7 @@ pub struct RpcSubscriptions {
notifier_runtime: Option<Runtime>,
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
last_checked_slots: Arc<RwLock<HashMap<CommitmentLevel, Slot>>>,
exit: Arc<AtomicBool>,
}
@ -295,6 +303,9 @@ impl RpcSubscriptions {
let account_subscriptions = Arc::new(RpcAccountSubscriptions::default());
let program_subscriptions = Arc::new(RpcProgramSubscriptions::default());
let signature_subscriptions = Arc::new(RpcSignatureSubscriptions::default());
let gossip_account_subscriptions = Arc::new(RpcAccountSubscriptions::default());
let gossip_program_subscriptions = Arc::new(RpcProgramSubscriptions::default());
let gossip_signature_subscriptions = Arc::new(RpcSignatureSubscriptions::default());
let slot_subscriptions = Arc::new(RpcSlotSubscriptions::default());
let vote_subscriptions = Arc::new(RpcVoteSubscriptions::default());
let root_subscriptions = Arc::new(RpcRootSubscriptions::default());
@ -307,12 +318,18 @@ impl RpcSubscriptions {
account_subscriptions,
program_subscriptions,
signature_subscriptions,
gossip_account_subscriptions,
gossip_program_subscriptions,
gossip_signature_subscriptions,
slot_subscriptions,
vote_subscriptions,
root_subscriptions,
};
let _subscriptions = subscriptions.clone();
let last_checked_slots = Arc::new(RwLock::new(HashMap::new()));
let _last_checked_slots = last_checked_slots.clone();
let notifier_runtime = RuntimeBuilder::new()
.core_threads(1)
.name_prefix("solana-rpc-notifier-")
@ -329,6 +346,7 @@ impl RpcSubscriptions {
notification_receiver,
_subscriptions,
_bank_forks,
_last_checked_slots,
);
})
.unwrap();
@ -340,6 +358,7 @@ impl RpcSubscriptions {
t_cleanup: Some(t_cleanup),
bank_forks,
block_commitment_cache,
last_checked_slots,
exit: exit.clone(),
}
}
@ -427,11 +446,10 @@ impl RpcSubscriptions {
sub_id: SubscriptionId,
subscriber: Subscriber<Response<RpcAccount>>,
) {
let mut subscriptions = self.subscriptions.account_subscriptions.write().unwrap();
let slot = match commitment
let commitment_level = commitment
.unwrap_or_else(CommitmentConfig::single)
.commitment
{
.commitment;
let slot = match commitment_level {
CommitmentLevel::Max => self
.block_commitment_cache
.read()
@ -444,6 +462,12 @@ impl RpcSubscriptions {
.read()
.unwrap()
.highest_confirmed_slot(),
CommitmentLevel::SingleGossip => *self
.last_checked_slots
.read()
.unwrap()
.get(&CommitmentLevel::SingleGossip)
.unwrap_or(&0),
};
let last_notified_slot = if let Some((_account, slot)) = self
.bank_forks
@ -456,6 +480,15 @@ impl RpcSubscriptions {
} else {
0
};
let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip {
self.subscriptions
.gossip_account_subscriptions
.write()
.unwrap()
} else {
self.subscriptions.account_subscriptions.write().unwrap()
};
add_subscription(
&mut subscriptions,
pubkey,
@ -468,7 +501,16 @@ impl RpcSubscriptions {
pub fn remove_account_subscription(&self, id: &SubscriptionId) -> bool {
let mut subscriptions = self.subscriptions.account_subscriptions.write().unwrap();
remove_subscription(&mut subscriptions, id)
if remove_subscription(&mut subscriptions, id) {
true
} else {
let mut subscriptions = self
.subscriptions
.gossip_account_subscriptions
.write()
.unwrap();
remove_subscription(&mut subscriptions, id)
}
}
pub fn add_program_subscription(
@ -478,7 +520,17 @@ impl RpcSubscriptions {
sub_id: SubscriptionId,
subscriber: Subscriber<Response<RpcKeyedAccount>>,
) {
let mut subscriptions = self.subscriptions.program_subscriptions.write().unwrap();
let commitment_level = commitment
.unwrap_or_else(CommitmentConfig::recent)
.commitment;
let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip {
self.subscriptions
.gossip_program_subscriptions
.write()
.unwrap()
} else {
self.subscriptions.program_subscriptions.write().unwrap()
};
add_subscription(
&mut subscriptions,
program_id,
@ -491,7 +543,16 @@ impl RpcSubscriptions {
pub fn remove_program_subscription(&self, id: &SubscriptionId) -> bool {
let mut subscriptions = self.subscriptions.program_subscriptions.write().unwrap();
remove_subscription(&mut subscriptions, id)
if remove_subscription(&mut subscriptions, id) {
true
} else {
let mut subscriptions = self
.subscriptions
.gossip_program_subscriptions
.write()
.unwrap();
remove_subscription(&mut subscriptions, id)
}
}
pub fn add_signature_subscription(
@ -501,7 +562,17 @@ impl RpcSubscriptions {
sub_id: SubscriptionId,
subscriber: Subscriber<Response<RpcSignatureResult>>,
) {
let mut subscriptions = self.subscriptions.signature_subscriptions.write().unwrap();
let commitment_level = commitment
.unwrap_or_else(CommitmentConfig::recent)
.commitment;
let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip {
self.subscriptions
.gossip_signature_subscriptions
.write()
.unwrap()
} else {
self.subscriptions.signature_subscriptions.write().unwrap()
};
add_subscription(
&mut subscriptions,
signature,
@ -514,7 +585,16 @@ impl RpcSubscriptions {
pub fn remove_signature_subscription(&self, id: &SubscriptionId) -> bool {
let mut subscriptions = self.subscriptions.signature_subscriptions.write().unwrap();
remove_subscription(&mut subscriptions, id)
if remove_subscription(&mut subscriptions, id) {
true
} else {
let mut subscriptions = self
.subscriptions
.gossip_signature_subscriptions
.write()
.unwrap();
remove_subscription(&mut subscriptions, id)
}
}
/// Notify subscribers of changes to any accounts or new signatures since
@ -523,6 +603,12 @@ impl RpcSubscriptions {
self.enqueue_notification(NotificationEntry::Bank(cache_slot_info));
}
/// Notify SingleGossip commitment-level subscribers of changes to any accounts or new
/// signatures.
pub fn notify_gossip_subscribers(&self, slot: Slot) {
self.enqueue_notification(NotificationEntry::Gossip(slot));
}
pub fn add_slot_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<SlotInfo>) {
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let mut subscriptions = self.subscriptions.slot_subscriptions.write().unwrap();
@ -594,6 +680,7 @@ impl RpcSubscriptions {
notification_receiver: Receiver<NotificationEntry>,
subscriptions: Subscriptions,
bank_forks: Arc<RwLock<BankForks>>,
last_checked_slots: Arc<RwLock<HashMap<CommitmentLevel, Slot>>>,
) {
loop {
if exit.load(Ordering::Relaxed) {
@ -627,47 +714,32 @@ impl RpcSubscriptions {
}
}
NotificationEntry::Bank(cache_slot_info) => {
let pubkeys: Vec<_> = {
let subs = subscriptions.account_subscriptions.read().unwrap();
subs.keys().cloned().collect()
RpcSubscriptions::notify_accounts_programs_signatures(
&subscriptions.account_subscriptions,
&subscriptions.program_subscriptions,
&subscriptions.signature_subscriptions,
&bank_forks,
&cache_slot_info,
&notifier,
)
}
NotificationEntry::Gossip(slot) => {
let _ = last_checked_slots
.write()
.unwrap()
.insert(CommitmentLevel::SingleGossip, slot);
let cache_slot_info = CacheSlotInfo {
highest_confirmed_slot: slot,
..CacheSlotInfo::default()
};
for pubkey in &pubkeys {
Self::check_account(
pubkey,
&bank_forks,
subscriptions.account_subscriptions.clone(),
&notifier,
&cache_slot_info,
);
}
let programs: Vec<_> = {
let subs = subscriptions.program_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for program_id in &programs {
Self::check_program(
program_id,
&bank_forks,
subscriptions.program_subscriptions.clone(),
&notifier,
&cache_slot_info,
);
}
let signatures: Vec<_> = {
let subs = subscriptions.signature_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for signature in &signatures {
Self::check_signature(
signature,
&bank_forks,
subscriptions.signature_subscriptions.clone(),
&notifier,
&cache_slot_info,
);
}
RpcSubscriptions::notify_accounts_programs_signatures(
&subscriptions.gossip_account_subscriptions,
&subscriptions.gossip_program_subscriptions,
&subscriptions.gossip_signature_subscriptions,
&bank_forks,
&cache_slot_info,
&notifier,
)
}
},
Err(RecvTimeoutError::Timeout) => {
@ -681,6 +753,57 @@ impl RpcSubscriptions {
}
}
fn notify_accounts_programs_signatures(
account_subscriptions: &Arc<RpcAccountSubscriptions>,
program_subscriptions: &Arc<RpcProgramSubscriptions>,
signature_subscriptions: &Arc<RpcSignatureSubscriptions>,
bank_forks: &Arc<RwLock<BankForks>>,
cache_slot_info: &CacheSlotInfo,
notifier: &RpcNotifier,
) {
let pubkeys: Vec<_> = {
let subs = account_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for pubkey in &pubkeys {
Self::check_account(
pubkey,
&bank_forks,
account_subscriptions.clone(),
&notifier,
&cache_slot_info,
);
}
let programs: Vec<_> = {
let subs = program_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for program_id in &programs {
Self::check_program(
program_id,
&bank_forks,
program_subscriptions.clone(),
&notifier,
&cache_slot_info,
);
}
let signatures: Vec<_> = {
let subs = signature_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for signature in &signatures {
Self::check_signature(
signature,
&bank_forks,
signature_subscriptions.clone(),
&notifier,
&cache_slot_info,
);
}
}
fn shutdown(&mut self) -> std::thread::Result<()> {
if let Some(runtime) = self.notifier_runtime.take() {
info!("RPC Notifier runtime - shutting down");
@ -1232,4 +1355,135 @@ pub(crate) mod tests {
assert_eq!(subscriptions.len(), (num_keys - 1) as usize);
assert!(subscriptions.get(&0).is_none());
}
#[test]
#[serial]
fn test_gossip_separate_account_notifications() {
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(100);
let ledger_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let bank = Bank::new(&genesis_config);
let blockhash = bank.last_blockhash();
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone();
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);
let alice = Keypair::new();
let (subscriber0, _id_receiver, transport_receiver0) =
Subscriber::new_test("accountNotification");
let (subscriber1, _id_receiver, transport_receiver1) =
Subscriber::new_test("accountNotification");
let exit = Arc::new(AtomicBool::new(false));
let subscriptions = RpcSubscriptions::new(
&exit,
bank_forks.clone(),
Arc::new(RwLock::new(
BlockCommitmentCache::new_for_tests_with_blockstore_bank(
blockstore,
bank_forks.read().unwrap().get(1).unwrap().clone(),
1,
),
)),
);
let sub_id0 = SubscriptionId::Number(0 as u64);
subscriptions.add_account_subscription(
alice.pubkey(),
Some(CommitmentConfig::single_gossip()),
sub_id0.clone(),
subscriber0,
);
let sub_id1 = SubscriptionId::Number(1 as u64);
subscriptions.add_account_subscription(
alice.pubkey(),
Some(CommitmentConfig::recent()),
sub_id1.clone(),
subscriber1,
);
assert!(subscriptions
.subscriptions
.account_subscriptions
.read()
.unwrap()
.contains_key(&alice.pubkey()));
let tx = system_transaction::create_account(
&mint_keypair,
&alice,
blockhash,
1,
16,
&solana_budget_program::id(),
);
bank_forks
.write()
.unwrap()
.get(1)
.unwrap()
.process_transaction(&tx)
.unwrap();
let mut cache_slot_info = CacheSlotInfo::default();
cache_slot_info.current_slot = 1;
subscriptions.notify_subscribers(cache_slot_info);
let (response, _) = robust_poll_or_panic(transport_receiver1);
let expected = json!({
"jsonrpc": "2.0",
"method": "accountNotification",
"params": {
"result": {
"context": { "slot": 1 },
"value": {
"data": "1111111111111111",
"executable": false,
"lamports": 1,
"owner": "Budget1111111111111111111111111111111111111",
"rentEpoch": 1,
},
},
"subscription": 1,
}
});
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
subscriptions.notify_gossip_subscribers(1);
let (response, _) = robust_poll_or_panic(transport_receiver0);
let expected = json!({
"jsonrpc": "2.0",
"method": "accountNotification",
"params": {
"result": {
"context": { "slot": 1 },
"value": {
"data": "1111111111111111",
"executable": false,
"lamports": 1,
"owner": "Budget1111111111111111111111111111111111111",
"rentEpoch": 1,
},
},
"subscription": 0,
}
});
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
subscriptions.remove_account_subscription(&sub_id0);
assert!(subscriptions
.subscriptions
.account_subscriptions
.read()
.unwrap()
.contains_key(&alice.pubkey()));
subscriptions.remove_account_subscription(&sub_id1);
assert!(!subscriptions
.subscriptions
.account_subscriptions
.read()
.unwrap()
.contains_key(&alice.pubkey()));
}
}

View File

@ -37,6 +37,12 @@ impl CommitmentConfig {
}
}
pub fn single_gossip() -> Self {
Self {
commitment: CommitmentLevel::SingleGossip,
}
}
pub fn ok(self) -> Option<Self> {
if self == Self::default() {
None
@ -46,11 +52,12 @@ impl CommitmentConfig {
}
}
#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq)]
#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[serde(rename_all = "camelCase")]
pub enum CommitmentLevel {
Max,
Recent,
Root,
Single,
SingleGossip,
}