From 61ab2072bd42d7e81d63ad602fd5fb2025485041 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Mon, 23 Nov 2020 15:00:03 -0800 Subject: [PATCH] Clean up default commitment handling for subscriptions --- client/src/pubsub_client.rs | 8 +++- core/src/rpc_pubsub.rs | 12 ++++- core/src/rpc_subscriptions.rs | 55 ++++++++++++++-------- docs/src/developing/clients/jsonrpc-api.md | 2 +- local-cluster/tests/local_cluster.rs | 6 ++- 5 files changed, 57 insertions(+), 26 deletions(-) diff --git a/client/src/pubsub_client.rs b/client/src/pubsub_client.rs index daa9bac12..107d3fc0d 100644 --- a/client/src/pubsub_client.rs +++ b/client/src/pubsub_client.rs @@ -1,4 +1,7 @@ -use crate::rpc_response::{Response as RpcResponse, RpcSignatureResult, SlotInfo}; +use crate::{ + rpc_config::RpcSignatureSubscribeConfig, + rpc_response::{Response as RpcResponse, RpcSignatureResult, SlotInfo}, +}; use log::*; use serde::de::DeserializeOwned; use serde_json::{ @@ -205,6 +208,7 @@ impl PubsubClient { pub fn signature_subscribe( url: &str, signature: &Signature, + config: Option, ) -> Result< ( PubsubSignatureResponse, @@ -226,7 +230,7 @@ impl PubsubClient { "method":format!("{}Subscribe", SIGNATURE_OPERATION), "params":[ signature.to_string(), - {"enableReceivedNotification": true } + config ] }) .to_string(); diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index 77be0ccb0..51f15f4ae 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -442,7 +442,15 @@ mod tests { let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("signatureNotification"); - rpc.signature_subscribe(session, subscriber, tx.signatures[0].to_string(), None); + rpc.signature_subscribe( + session, + subscriber, + tx.signatures[0].to_string(), + Some(RpcSignatureSubscribeConfig { + commitment: Some(CommitmentConfig::single()), + ..RpcSignatureSubscribeConfig::default() + }), + ); process_transaction_and_notify(&bank_forks, &tx, &rpc.subscriptions, 0).unwrap(); @@ -472,7 +480,7 @@ mod tests { subscriber, tx.signatures[0].to_string(), Some(RpcSignatureSubscribeConfig { - commitment: None, + commitment: Some(CommitmentConfig::single()), enable_received_notification: Some(true), }), ); diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 5653f7ae6..e288433e7 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -122,7 +122,7 @@ type RpcRootSubscriptions = RwLock>>; fn add_subscription( subscriptions: &mut HashMap>>, hashmap_key: K, - commitment: Option, + commitment: CommitmentConfig, sub_id: SubscriptionId, subscriber: Subscriber, last_notified_slot: Slot, @@ -132,7 +132,6 @@ fn add_subscription( S: Clone, { let sink = subscriber.assign_id(sub_id.clone()).unwrap(); - let commitment = commitment.unwrap_or_else(CommitmentConfig::single); let subscription_data = SubscriptionData { sink, commitment, @@ -528,11 +527,11 @@ impl RpcSubscriptions { subscriber: Subscriber>, ) { let config = config.unwrap_or_default(); - let commitment_level = config + let commitment = config .commitment - .unwrap_or_else(CommitmentConfig::single) - .commitment; - let slot = match commitment_level { + .unwrap_or_else(CommitmentConfig::single_gossip); + + let slot = match commitment.commitment { CommitmentLevel::Max => self .block_commitment_cache .read() @@ -564,7 +563,7 @@ impl RpcSubscriptions { 0 }; - let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip { + let mut subscriptions = if commitment.commitment == CommitmentLevel::SingleGossip { self.subscriptions .gossip_account_subscriptions .write() @@ -572,10 +571,11 @@ impl RpcSubscriptions { } else { self.subscriptions.account_subscriptions.write().unwrap() }; + add_subscription( &mut subscriptions, pubkey, - config.commitment, + commitment, sub_id, subscriber, last_notified_slot, @@ -605,12 +605,12 @@ impl RpcSubscriptions { subscriber: Subscriber>, ) { let config = config.unwrap_or_default(); - let commitment_level = config + let commitment = config .account_config .commitment - .unwrap_or_else(CommitmentConfig::recent) - .commitment; - let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip { + .unwrap_or_else(CommitmentConfig::single_gossip); + + let mut subscriptions = if commitment.commitment == CommitmentLevel::SingleGossip { self.subscriptions .gossip_program_subscriptions .write() @@ -618,10 +618,11 @@ impl RpcSubscriptions { } else { self.subscriptions.program_subscriptions.write().unwrap() }; + add_subscription( &mut subscriptions, program_id, - config.account_config.commitment, + commitment, sub_id, subscriber, 0, // last_notified_slot is not utilized for program subscriptions @@ -657,11 +658,9 @@ impl RpcSubscriptions { .map(|config| (config.commitment, config.enable_received_notification)) .unwrap_or_default(); - let commitment_level = commitment - .unwrap_or_else(CommitmentConfig::recent) - .commitment; + let commitment = commitment.unwrap_or_else(CommitmentConfig::single_gossip); - let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip { + let mut subscriptions = if commitment.commitment == CommitmentLevel::SingleGossip { self.subscriptions .gossip_signature_subscriptions .write() @@ -669,6 +668,7 @@ impl RpcSubscriptions { } else { self.subscriptions.signature_subscriptions.write().unwrap() }; + add_subscription( &mut subscriptions, signature, @@ -1286,7 +1286,13 @@ pub(crate) mod tests { ); subscriptions.add_program_subscription( solana_stake_program::id(), - None, + Some(RpcProgramAccountsConfig { + account_config: RpcAccountInfoConfig { + commitment: Some(CommitmentConfig::recent()), + ..RpcAccountInfoConfig::default() + }, + ..RpcProgramAccountsConfig::default() + }), sub_id.clone(), subscriber, ); @@ -1645,13 +1651,22 @@ pub(crate) mod tests { fn test_add_and_remove_subscription() { let mut subscriptions: HashMap>> = HashMap::new(); + let commitment = CommitmentConfig::single_gossip(); let num_keys = 5; for key in 0..num_keys { let (subscriber, _id_receiver, _transport_receiver) = Subscriber::new_test("notification"); let sub_id = SubscriptionId::Number(key); - add_subscription(&mut subscriptions, key, None, sub_id, subscriber, 0, None); + add_subscription( + &mut subscriptions, + key, + commitment, + sub_id, + subscriber, + 0, + None, + ); } // Add another subscription to the "0" key @@ -1660,7 +1675,7 @@ pub(crate) mod tests { add_subscription( &mut subscriptions, 0, - None, + commitment, extra_sub_id.clone(), subscriber, 0, diff --git a/docs/src/developing/clients/jsonrpc-api.md b/docs/src/developing/clients/jsonrpc-api.md index 1c0a973ee..954964ed6 100644 --- a/docs/src/developing/clients/jsonrpc-api.md +++ b/docs/src/developing/clients/jsonrpc-api.md @@ -2843,7 +2843,7 @@ After connecting to the RPC PubSub websocket at `ws://
/`: - Submit subscription requests to the websocket using the methods below - Multiple subscriptions may be active at once -- Many subscriptions take the optional [`commitment` parameter](jsonrpc-api.md#configuring-state-commitment), defining how finalized a change should be to trigger a notification. For subscriptions, if commitment is unspecified, the default value is `"single"`. +- Many subscriptions take the optional [`commitment` parameter](jsonrpc-api.md#configuring-state-commitment), defining how finalized a change should be to trigger a notification. For subscriptions, if commitment is unspecified, the default value is `"singleGossip"`. ### accountSubscribe diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 2e142b2b2..76bfafd40 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -6,7 +6,7 @@ use serial_test_derive::serial; use solana_client::{ pubsub_client::PubsubClient, rpc_client::RpcClient, - rpc_config::RpcProgramAccountsConfig, + rpc_config::{RpcProgramAccountsConfig, RpcSignatureSubscribeConfig}, rpc_response::RpcSignatureResult, thin_client::{create_client, ThinClient}, }; @@ -179,6 +179,10 @@ fn test_local_cluster_signature_subscribe() { let (mut sig_subscribe_client, receiver) = PubsubClient::signature_subscribe( &format!("ws://{}", &non_bootstrap_info.rpc_pubsub.to_string()), &transaction.signatures[0], + Some(RpcSignatureSubscribeConfig { + commitment: Some(CommitmentConfig::recent()), + enable_received_notification: Some(true), + }), ) .unwrap();