diff --git a/Cargo.lock b/Cargo.lock index 22736352e4..8496b61852 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4798,6 +4798,7 @@ dependencies = [ "serde_json", "serial_test", "solana-client", + "solana-ledger", "solana-logger 1.10.0", "solana-measure", "solana-merkle-tree", @@ -4809,6 +4810,7 @@ dependencies = [ "solana-sdk", "solana-streamer", "solana-test-validator", + "solana-transaction-status", "solana-version", "systemstat", ] diff --git a/client-test/Cargo.toml b/client-test/Cargo.toml index 061eb33f47..e6aace466a 100644 --- a/client-test/Cargo.toml +++ b/client-test/Cargo.toml @@ -13,6 +13,7 @@ edition = "2021" serde_json = "1.0.73" serial_test = "0.5.1" solana-client = { path = "../client", version = "=1.10.0" } +solana-ledger = { path = "../ledger", version = "=1.10.0" } solana-measure = { path = "../measure", version = "=1.10.0" } solana-merkle-tree = { path = "../merkle-tree", version = "=1.10.0" } solana-metrics = { path = "../metrics", version = "=1.10.0" } @@ -23,6 +24,7 @@ solana-runtime = { path = "../runtime", version = "=1.10.0" } solana-sdk = { path = "../sdk", version = "=1.10.0" } solana-streamer = { path = "../streamer", version = "=1.10.0" } solana-test-validator = { path = "../test-validator", version = "=1.10.0" } +solana-transaction-status = { path = "../transaction-status", version = "=1.10.0" } solana-version = { path = "../version", version = "=1.10.0" } systemstat = "0.1.10" diff --git a/client-test/tests/client.rs b/client-test/tests/client.rs index 799cdc41d7..8f0116e106 100644 --- a/client-test/tests/client.rs +++ b/client-test/tests/client.rs @@ -4,11 +4,16 @@ use { solana_client::{ pubsub_client::PubsubClient, rpc_client::RpcClient, - rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, - rpc_response::SlotInfo, + rpc_config::{ + RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, + RpcProgramAccountsConfig, + }, + rpc_response::{RpcBlockUpdate, SlotInfo}, }, + solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}, solana_rpc::{ optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, + rpc::create_test_transactions_and_populate_blockstore, rpc_pubsub_service::{PubSubConfig, PubSubService}, rpc_subscriptions::RpcSubscriptions, }, @@ -20,7 +25,7 @@ use { }, solana_sdk::{ clock::Slot, - commitment_config::CommitmentConfig, + commitment_config::{CommitmentConfig, CommitmentLevel}, native_token::sol_to_lamports, pubkey::Pubkey, rpc_port, @@ -29,11 +34,12 @@ use { }, solana_streamer::socket::SocketAddrSpace, solana_test_validator::TestValidator, + solana_transaction_status::{TransactionDetails, UiTransactionEncoding}, std::{ collections::HashSet, net::{IpAddr, SocketAddr}, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, RwLock, }, thread::sleep, @@ -119,9 +125,10 @@ fn test_account_subscription() { let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); bank_forks.write().unwrap().insert(bank1); let bob = Keypair::new(); - + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::default())), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), @@ -194,6 +201,112 @@ fn test_account_subscription() { assert_eq!(errors, [].to_vec()); } +#[test] +#[serial] +fn test_block_subscription() { + // setup BankForks + let exit = Arc::new(AtomicBool::new(false)); + let GenesisConfigInfo { + genesis_config, + mint_keypair: alice, + .. + } = create_genesis_config(10_000); + let bank = Bank::new_for_tests(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + + // setup Blockstore + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Blockstore::open(&ledger_path).unwrap(); + let blockstore = Arc::new(blockstore); + + // populate ledger with test txs + let bank = bank_forks.read().unwrap().working_bank(); + let keypair1 = Keypair::new(); + let keypair2 = Keypair::new(); + let keypair3 = Keypair::new(); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root())); + let _confirmed_block_signatures = create_test_transactions_and_populate_blockstore( + vec![&alice, &keypair1, &keypair2, &keypair3], + 0, + bank, + blockstore.clone(), + max_complete_transaction_status_slot, + ); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + // setup RpcSubscriptions && PubSubService + let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore( + &exit, + max_complete_transaction_status_slot, + blockstore.clone(), + bank_forks.clone(), + Arc::new(RwLock::new(BlockCommitmentCache::default())), + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), + )); + let pubsub_addr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + rpc_port::DEFAULT_RPC_PUBSUB_PORT, + ); + let pub_cfg = PubSubConfig { + enable_block_subscription: true, + ..PubSubConfig::default() + }; + let (trigger, pubsub_service) = PubSubService::new(pub_cfg, &subscriptions, pubsub_addr); + + std::thread::sleep(Duration::from_millis(400)); + + // setup PubsubClient + let (mut client, receiver) = PubsubClient::block_subscribe( + &format!("ws://0.0.0.0:{}/", pubsub_addr.port()), + RpcBlockSubscribeFilter::All, + Some(RpcBlockSubscribeConfig { + commitment: Some(CommitmentConfig { + commitment: CommitmentLevel::Confirmed, + }), + encoding: Some(UiTransactionEncoding::Json), + transaction_details: Some(TransactionDetails::Signatures), + show_rewards: None, + }), + ) + .unwrap(); + + // trigger Gossip notification + let slot = bank_forks.read().unwrap().highest_slot(); + subscriptions.notify_gossip_subscribers(slot); + let maybe_actual = receiver.recv_timeout(Duration::from_millis(400)); + match maybe_actual { + Ok(actual) => { + let complete_block = blockstore.get_complete_block(slot, false).unwrap(); + let block = complete_block.clone().configure( + UiTransactionEncoding::Json, + TransactionDetails::Signatures, + false, + ); + let expected = RpcBlockUpdate { + slot, + block: Some(block), + err: None, + }; + let block = complete_block.configure( + UiTransactionEncoding::Json, + TransactionDetails::Signatures, + false, + ); + assert_eq!(actual.value.slot, expected.slot); + assert!(block.eq(&actual.value.block.unwrap())); + } + Err(e) => { + eprintln!("unexpected websocket receive timeout"); + assert_eq!(Some(e), None); + } + } + + // cleanup + exit.store(true, Ordering::Relaxed); + trigger.cancel(); + client.shutdown().unwrap(); + pubsub_service.close().unwrap(); +} + #[test] #[serial] fn test_program_subscription() { @@ -215,9 +328,10 @@ fn test_program_subscription() { let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); bank_forks.write().unwrap().insert(bank1); let bob = Keypair::new(); - + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::default())), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), @@ -300,9 +414,10 @@ fn test_root_subscription() { let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone(); let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); bank_forks.write().unwrap().insert(bank1); - + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::default())), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), @@ -350,8 +465,10 @@ fn test_slot_subscription() { let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::default())), optimistically_confirmed_bank, diff --git a/client/src/pubsub_client.rs b/client/src/pubsub_client.rs index d5b70b5569..ad82aa45ad 100644 --- a/client/src/pubsub_client.rs +++ b/client/src/pubsub_client.rs @@ -1,12 +1,13 @@ use { crate::{ rpc_config::{ - RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, - RpcTransactionLogsConfig, RpcTransactionLogsFilter, + RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, + RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig, + RpcTransactionLogsFilter, }, rpc_response::{ - Response as RpcResponse, RpcKeyedAccount, RpcLogsResponse, RpcSignatureResult, - SlotInfo, SlotUpdate, + Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse, + RpcSignatureResult, SlotInfo, SlotUpdate, }, }, log::*, @@ -173,6 +174,12 @@ pub type SignatureSubscription = ( Receiver>, ); +pub type PubsubBlockClientSubscription = PubsubClientSubscription>; +pub type BlockSubscription = ( + PubsubBlockClientSubscription, + Receiver>, +); + pub type PubsubProgramClientSubscription = PubsubClientSubscription>; pub type ProgramSubscription = ( PubsubProgramClientSubscription, @@ -266,6 +273,45 @@ impl PubsubClient { Ok((result, receiver)) } + pub fn block_subscribe( + url: &str, + filter: RpcBlockSubscribeFilter, + config: Option, + ) -> Result { + let url = Url::parse(url)?; + let socket = connect_with_retry(url)?; + let (sender, receiver) = channel(); + + let socket = Arc::new(RwLock::new(socket)); + let socket_clone = socket.clone(); + let exit = Arc::new(AtomicBool::new(false)); + let exit_clone = exit.clone(); + let body = json!({ + "jsonrpc":"2.0", + "id":1, + "method":"blockSubscribe", + "params":[filter, config] + }) + .to_string(); + + let subscription_id = PubsubBlockClientSubscription::send_subscribe(&socket_clone, body)?; + + let t_cleanup = std::thread::spawn(move || { + Self::cleanup_with_sender(exit_clone, &socket_clone, sender) + }); + + let result = PubsubClientSubscription { + message_type: PhantomData, + operation: "blocks", + socket, + subscription_id, + t_cleanup: Some(t_cleanup), + exit, + }; + + Ok((result, receiver)) + } + pub fn logs_subscribe( url: &str, filter: RpcTransactionLogsFilter, diff --git a/client/src/rpc_config.rs b/client/src/rpc_config.rs index 93d78f47ca..edf3dc8198 100644 --- a/client/src/rpc_config.rs +++ b/client/src/rpc_config.rs @@ -182,6 +182,23 @@ pub struct RpcSignatureSubscribeConfig { pub enable_received_notification: Option, } +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum RpcBlockSubscribeFilter { + All, + MentionsAccountOrProgram(String), +} + +#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RpcBlockSubscribeConfig { + #[serde(flatten)] + pub commitment: Option, + pub encoding: Option, + pub transaction_details: Option, + pub show_rewards: Option, +} + #[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct RpcSignaturesForAddressConfig { diff --git a/client/src/rpc_response.rs b/client/src/rpc_response.rs index 81146a1f0d..a576c6168e 100644 --- a/client/src/rpc_response.rs +++ b/client/src/rpc_response.rs @@ -9,9 +9,10 @@ use { transaction::{Result, TransactionError}, }, solana_transaction_status::{ - ConfirmedTransactionStatusWithSignature, TransactionConfirmationStatus, + ConfirmedTransactionStatusWithSignature, TransactionConfirmationStatus, UiConfirmedBlock, }, std::{collections::HashMap, fmt, net::SocketAddr}, + thiserror::Error, }; pub type RpcResult = client_error::Result>; @@ -424,6 +425,20 @@ pub struct RpcInflationReward { pub commission: Option, // Vote account commission when the reward was credited } +#[derive(Clone, Deserialize, Serialize, Debug, Error, Eq, PartialEq)] +pub enum RpcBlockUpdateError { + #[error("block store error")] + BlockStoreError, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct RpcBlockUpdate { + pub slot: Slot, + pub block: Option, + pub err: Option, +} + impl From for RpcConfirmedTransactionStatusWithSignature { fn from(value: ConfirmedTransactionStatusWithSignature) -> Self { let ConfirmedTransactionStatusWithSignature { diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 2594488085..124d4a6c5f 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -963,7 +963,10 @@ mod tests { signature::{Keypair, Signature, Signer}, }, solana_vote_program::vote_state::Vote, - std::{collections::BTreeSet, sync::Arc}, + std::{ + collections::BTreeSet, + sync::{atomic::AtomicU64, Arc}, + }, }; #[test] @@ -1650,8 +1653,10 @@ mod tests { let vote_tracker = VoteTracker::new(&bank); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::default())), optimistically_confirmed_bank, @@ -1769,8 +1774,10 @@ mod tests { let bank = bank_forks.read().unwrap().get(0).unwrap().clone(); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::default())), optimistically_confirmed_bank, diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 6827bd8bbf..f47226e27e 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -3087,8 +3087,10 @@ pub mod tests { let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(bank_forks); let exit = Arc::new(AtomicBool::new(false)); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::default())), optimistically_confirmed_bank, @@ -3622,8 +3624,10 @@ pub mod tests { &replay_vote_sender, &VerifyRecyclers::default(), ); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks.clone(), block_commitment_cache, OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), @@ -3690,8 +3694,10 @@ pub mod tests { let exit = Arc::new(AtomicBool::new(false)); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks.clone(), block_commitment_cache.clone(), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 9ba7b93e6f..80c426abd9 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -399,6 +399,7 @@ pub mod tests { solana_runtime::bank::Bank, solana_sdk::signature::{Keypair, Signer}, solana_streamer::socket::SocketAddrSpace, + std::sync::atomic::AtomicU64, std::sync::atomic::Ordering, }; @@ -448,6 +449,7 @@ pub mod tests { let bank_forks = Arc::new(RwLock::new(bank_forks)); let tower = Tower::default(); let accounts_package_channel = channel(); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let tvu = Tvu::new( &vote_keypair.pubkey(), Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])), @@ -465,6 +467,7 @@ pub mod tests { ledger_signal_receiver, &Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks.clone(), block_commitment_cache.clone(), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), diff --git a/core/src/validator.rs b/core/src/validator.rs index f447690e80..89b89a4504 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -538,6 +538,8 @@ impl Validator { let rpc_subscriptions = Arc::new(RpcSubscriptions::new_with_config( &exit, + max_complete_transaction_status_slot.clone(), + blockstore.clone(), bank_forks.clone(), block_commitment_cache.clone(), optimistically_confirmed_bank.clone(), diff --git a/replica-node/src/replica_node.rs b/replica-node/src/replica_node.rs index 351cdfdaba..021783a2f8 100644 --- a/replica-node/src/replica_node.rs +++ b/replica-node/src/replica_node.rs @@ -191,6 +191,8 @@ fn start_client_rpc_services( let subscriptions = Arc::new(RpcSubscriptions::new( &exit, + max_complete_transaction_status_slot.clone(), + blockstore.clone(), bank_forks.clone(), block_commitment_cache.clone(), optimistically_confirmed_bank.clone(), diff --git a/rpc/src/optimistically_confirmed_bank_tracker.rs b/rpc/src/optimistically_confirmed_bank_tracker.rs index d730fb4891..95f2f863d9 100644 --- a/rpc/src/optimistically_confirmed_bank_tracker.rs +++ b/rpc/src/optimistically_confirmed_bank_tracker.rs @@ -321,6 +321,7 @@ mod tests { accounts_background_service::AbsRequestSender, commitment::BlockCommitmentCache, }, solana_sdk::pubkey::Pubkey, + std::sync::atomic::AtomicU64, }; #[test] @@ -343,8 +344,10 @@ mod tests { OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks.clone(), block_commitment_cache, optimistically_confirmed_bank.clone(), diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 941863416c..64bcd421f8 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -2101,7 +2101,7 @@ fn verify_and_parse_signatures_for_address_params( Ok((address, before, until, limit)) } -fn check_is_at_least_confirmed(commitment: CommitmentConfig) -> Result<()> { +pub(crate) fn check_is_at_least_confirmed(commitment: CommitmentConfig) -> Result<()> { if !commitment.is_at_least_confirmed() { return Err(Error::invalid_params( "Method does not support commitment below `confirmed`", @@ -7807,9 +7807,10 @@ pub mod tests { let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let mut pending_optimistically_confirmed_banks = HashSet::new(); - + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks.clone(), block_commitment_cache.clone(), optimistically_confirmed_bank.clone(), diff --git a/rpc/src/rpc_pubsub.rs b/rpc/src/rpc_pubsub.rs index bea5a00e41..b9564fb9f1 100644 --- a/rpc/src/rpc_pubsub.rs +++ b/rpc/src/rpc_pubsub.rs @@ -2,11 +2,13 @@ use { crate::{ + rpc::check_is_at_least_confirmed, rpc_pubsub_service::PubSubConfig, rpc_subscription_tracker::{ - AccountSubscriptionParams, LogsSubscriptionKind, LogsSubscriptionParams, - ProgramSubscriptionParams, SignatureSubscriptionParams, SubscriptionControl, - SubscriptionId, SubscriptionParams, SubscriptionToken, + AccountSubscriptionParams, BlockSubscriptionKind, BlockSubscriptionParams, + LogsSubscriptionKind, LogsSubscriptionParams, ProgramSubscriptionParams, + SignatureSubscriptionParams, SubscriptionControl, SubscriptionId, SubscriptionParams, + SubscriptionToken, }, }, dashmap::DashMap, @@ -16,15 +18,17 @@ use { solana_account_decoder::{UiAccount, UiAccountEncoding}, solana_client::{ rpc_config::{ - RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, - RpcTransactionLogsConfig, RpcTransactionLogsFilter, + RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, + RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig, + RpcTransactionLogsFilter, }, rpc_response::{ - Response as RpcResponse, RpcKeyedAccount, RpcLogsResponse, RpcSignatureResult, RpcVote, - SlotInfo, SlotUpdate, + Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse, + RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate, }, }, solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature}, + solana_transaction_status::UiTransactionEncoding, std::{str::FromStr, sync::Arc}, }; @@ -187,6 +191,28 @@ pub trait RpcSolPubSub { id: PubSubSubscriptionId, ) -> Result; + // Subscribe to block data and content + #[pubsub(subscription = "blockNotification", subscribe, name = "blockSubscribe")] + fn block_subscribe( + &self, + meta: Self::Metadata, + subscriber: Subscriber>, + filter: RpcBlockSubscribeFilter, + config: Option, + ); + + // Unsubscribe from block notification subscription. + #[pubsub( + subscription = "blockNotification", + unsubscribe, + name = "blockUnsubscribe" + )] + fn block_unsubscribe( + &self, + meta: Option, + id: PubSubSubscriptionId, + ) -> Result; + // Get notification when vote is encountered #[pubsub(subscription = "voteNotification", subscribe, name = "voteSubscribe")] fn vote_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber); @@ -295,6 +321,18 @@ mod internal { #[rpc(name = "slotsUpdatesUnsubscribe")] fn slots_updates_unsubscribe(&self, id: SubscriptionId) -> Result; + // Subscribe to block data and content + #[rpc(name = "blockSubscribe")] + fn block_subscribe( + &self, + filter: RpcBlockSubscribeFilter, + config: Option, + ) -> Result; + + // Unsubscribe from block notification subscription. + #[rpc(name = "blockUnsubscribe")] + fn block_unsubscribe(&self, id: SubscriptionId) -> Result; + // Get notification when vote is encountered #[rpc(name = "voteSubscribe")] fn vote_subscribe(&self) -> Result; @@ -475,6 +513,42 @@ impl RpcSolPubSubInternal for RpcSolPubSubImpl { self.unsubscribe(id) } + fn block_subscribe( + &self, + filter: RpcBlockSubscribeFilter, + config: Option, + ) -> Result { + if !self.config.enable_block_subscription { + return Err(Error::new(jsonrpc_core::ErrorCode::MethodNotFound)); + } + let config = config.unwrap_or_default(); + let commitment = config.commitment.unwrap_or_default(); + check_is_at_least_confirmed(commitment)?; + let params = BlockSubscriptionParams { + commitment: config.commitment.unwrap_or_default(), + encoding: config.encoding.unwrap_or(UiTransactionEncoding::Base64), + kind: match filter { + RpcBlockSubscribeFilter::All => BlockSubscriptionKind::All, + RpcBlockSubscribeFilter::MentionsAccountOrProgram(key) => { + BlockSubscriptionKind::MentionsAccountOrProgram(param::( + &key, + "mentions_account_or_program", + )?) + } + }, + transaction_details: config.transaction_details.unwrap_or_default(), + show_rewards: config.show_rewards.unwrap_or_default(), + }; + self.subscribe(SubscriptionParams::Block(params)) + } + + fn block_unsubscribe(&self, id: SubscriptionId) -> Result { + if !self.config.enable_block_subscription { + return Err(Error::new(jsonrpc_core::ErrorCode::MethodNotFound)); + } + self.unsubscribe(id) + } + fn vote_subscribe(&self) -> Result { if !self.config.enable_vote_subscription { return Err(Error::new(jsonrpc_core::ErrorCode::MethodNotFound)); @@ -539,7 +613,10 @@ mod tests { solana_stake_program::stake_state, solana_vote_program::vote_state::Vote, std::{ - sync::{atomic::AtomicBool, RwLock}, + sync::{ + atomic::{AtomicBool, AtomicU64}, + RwLock, + }, thread::sleep, time::Duration, }, @@ -578,8 +655,10 @@ mod tests { let bank = Bank::new_for_tests(&genesis_config); let blockhash = bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &Arc::new(AtomicBool::new(false)), + max_complete_transaction_status_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), @@ -705,7 +784,11 @@ mod tests { let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let mut io = IoHandler::<()>::default(); - let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(bank_forks)); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks( + max_complete_transaction_status_slot, + bank_forks, + )); let (rpc, _receiver) = rpc_pubsub_service::test_connection(&subscriptions); io.extend_with(rpc.to_delegate()); @@ -756,9 +839,10 @@ mod tests { let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone(); let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); bank_forks.write().unwrap().insert(bank1); - + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &Arc::new(AtomicBool::new(false)), + max_complete_transaction_status_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( 1, 1, @@ -873,9 +957,10 @@ mod tests { let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone(); let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); bank_forks.write().unwrap().insert(bank1); - + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &Arc::new(AtomicBool::new(false)), + max_complete_transaction_status_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( 1, 1, @@ -963,7 +1048,11 @@ mod tests { )))); let mut io = IoHandler::<()>::default(); - let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(bank_forks)); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks( + max_complete_transaction_status_slot, + bank_forks, + )); let (rpc, _receiver) = rpc_pubsub_service::test_connection(&subscriptions); io.extend_with(rpc.to_delegate()); @@ -1007,8 +1096,10 @@ mod tests { let bob = Keypair::new(); let exit = Arc::new(AtomicBool::new(false)); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), @@ -1058,9 +1149,10 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())); - + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks.clone(), block_commitment_cache, OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), @@ -1128,7 +1220,11 @@ mod tests { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let bank = Bank::new_for_tests(&genesis_config); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); - let rpc_subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(bank_forks)); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let rpc_subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks( + max_complete_transaction_status_slot, + bank_forks, + )); let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&rpc_subscriptions); rpc.slot_subscribe().unwrap(); @@ -1156,7 +1252,11 @@ mod tests { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let bank = Bank::new_for_tests(&genesis_config); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); - let rpc_subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(bank_forks)); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let rpc_subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks( + max_complete_transaction_status_slot, + bank_forks, + )); let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&rpc_subscriptions); let sub_id = rpc.slot_subscribe().unwrap(); @@ -1198,8 +1298,10 @@ mod tests { // Setup Subscriptions let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks, block_commitment_cache, optimistically_confirmed_bank, @@ -1228,7 +1330,11 @@ mod tests { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let bank = Bank::new_for_tests(&genesis_config); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); - let rpc_subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(bank_forks)); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let rpc_subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks( + max_complete_transaction_status_slot, + bank_forks, + )); let (rpc, _receiver) = rpc_pubsub_service::test_connection(&rpc_subscriptions); let sub_id = rpc.vote_subscribe().unwrap(); diff --git a/rpc/src/rpc_pubsub_service.rs b/rpc/src/rpc_pubsub_service.rs index a8f6c37457..78e7a113f3 100644 --- a/rpc/src/rpc_pubsub_service.rs +++ b/rpc/src/rpc_pubsub_service.rs @@ -33,6 +33,7 @@ pub const DEFAULT_WORKER_THREADS: usize = 1; #[derive(Debug, Clone)] pub struct PubSubConfig { + pub enable_block_subscription: bool, pub enable_vote_subscription: bool, pub max_active_subscriptions: usize, pub queue_capacity_items: usize, @@ -44,6 +45,7 @@ pub struct PubSubConfig { impl Default for PubSubConfig { fn default() -> Self { Self { + enable_block_subscription: false, enable_vote_subscription: false, max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS, queue_capacity_items: DEFAULT_QUEUE_CAPACITY_ITEMS, @@ -57,6 +59,7 @@ impl Default for PubSubConfig { impl PubSubConfig { pub fn default_for_tests() -> Self { Self { + enable_block_subscription: false, enable_vote_subscription: false, max_active_subscriptions: MAX_ACTIVE_SUBSCRIPTIONS, queue_capacity_items: DEFAULT_TEST_QUEUE_CAPACITY_ITEMS, @@ -142,6 +145,9 @@ fn count_final(params: &SubscriptionParams) { SubscriptionParams::Vote => { inc_new_counter_info!("rpc-pubsub-final-votes", 1); } + SubscriptionParams::Block(_) => { + inc_new_counter_info!("rpc-pubsub-final-slot-txs", 1); + } } } @@ -187,16 +193,17 @@ pub struct TestBroadcastReceiver { #[cfg(test)] impl TestBroadcastReceiver { pub fn recv(&mut self) -> String { - use { - std::{ - thread::sleep, - time::{Duration, Instant}, - }, - tokio::sync::broadcast::error::TryRecvError, + return match self.recv_timeout(std::time::Duration::from_secs(5)) { + Err(err) => panic!("broadcast receiver error: {}", err), + Ok(str) => str, }; + } - let timeout = Duration::from_secs(5); - let started = Instant::now(); + pub fn recv_timeout(&mut self, timeout: std::time::Duration) -> Result { + use std::thread::sleep; + use tokio::sync::broadcast::error::TryRecvError; + + let started = std::time::Instant::now(); loop { match self.inner.try_recv() { @@ -206,17 +213,16 @@ impl TestBroadcastReceiver { started.elapsed().as_millis() ); if let Some(json) = self.handler.handle(notification).expect("handler failed") { - return json.to_string(); + return Ok(json.to_string()); } } Err(TryRecvError::Empty) => { - assert!( - started.elapsed() <= timeout, - "TestBroadcastReceiver: no data, timeout reached" - ); - sleep(Duration::from_millis(50)); + if started.elapsed() > timeout { + return Err("TestBroadcastReceiver: no data, timeout reached".into()); + } + sleep(std::time::Duration::from_millis(50)); } - Err(err) => panic!("broadcast receiver error: {}", err), + Err(e) => return Err(e.to_string()), } } } @@ -230,6 +236,7 @@ pub fn test_connection( let rpc_impl = RpcSolPubSubImpl::new( PubSubConfig { + enable_block_subscription: true, enable_vote_subscription: true, queue_capacity_items: 100, ..PubSubConfig::default() @@ -383,7 +390,10 @@ mod tests { }, std::{ net::{IpAddr, Ipv4Addr}, - sync::{atomic::AtomicBool, RwLock}, + sync::{ + atomic::{AtomicBool, AtomicU64}, + RwLock, + }, }, }; @@ -391,6 +401,7 @@ mod tests { fn test_pubsub_new() { let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); let exit = Arc::new(AtomicBool::new(false)); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let bank = Bank::new_for_tests(&genesis_config); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); @@ -398,6 +409,7 @@ mod tests { OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), optimistically_confirmed_bank, diff --git a/rpc/src/rpc_subscription_tracker.rs b/rpc/src/rpc_subscription_tracker.rs index 36ef4d5e99..bc202591d2 100644 --- a/rpc/src/rpc_subscription_tracker.rs +++ b/rpc/src/rpc_subscription_tracker.rs @@ -11,6 +11,7 @@ use { solana_sdk::{ clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature, }, + solana_transaction_status::{TransactionDetails, UiTransactionEncoding}, std::{ collections::{ hash_map::{Entry, HashMap}, @@ -44,6 +45,7 @@ impl From for u64 { #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum SubscriptionParams { Account(AccountSubscriptionParams), + Block(BlockSubscriptionParams), Logs(LogsSubscriptionParams), Program(ProgramSubscriptionParams), Signature(SignatureSubscriptionParams), @@ -62,6 +64,7 @@ impl SubscriptionParams { SubscriptionParams::Signature(_) => "signatureNotification", SubscriptionParams::Slot => "slotNotification", SubscriptionParams::SlotsUpdates => "slotsUpdatesNotification", + SubscriptionParams::Block(_) => "blockNotification", SubscriptionParams::Root => "rootNotification", SubscriptionParams::Vote => "voteNotification", } @@ -73,6 +76,7 @@ impl SubscriptionParams { SubscriptionParams::Logs(params) => Some(params.commitment), SubscriptionParams::Program(params) => Some(params.commitment), SubscriptionParams::Signature(params) => Some(params.commitment), + SubscriptionParams::Block(params) => Some(params.commitment), SubscriptionParams::Slot | SubscriptionParams::SlotsUpdates | SubscriptionParams::Root @@ -83,12 +87,13 @@ impl SubscriptionParams { fn is_commitment_watcher(&self) -> bool { let commitment = match self { SubscriptionParams::Account(params) => ¶ms.commitment, + SubscriptionParams::Block(params) => ¶ms.commitment, SubscriptionParams::Logs(params) => ¶ms.commitment, SubscriptionParams::Program(params) => ¶ms.commitment, SubscriptionParams::Signature(params) => ¶ms.commitment, - SubscriptionParams::Slot + SubscriptionParams::Root + | SubscriptionParams::Slot | SubscriptionParams::SlotsUpdates - | SubscriptionParams::Root | SubscriptionParams::Vote => return false, }; !commitment.is_confirmed() @@ -97,12 +102,13 @@ impl SubscriptionParams { fn is_gossip_watcher(&self) -> bool { let commitment = match self { SubscriptionParams::Account(params) => ¶ms.commitment, + SubscriptionParams::Block(params) => ¶ms.commitment, SubscriptionParams::Logs(params) => ¶ms.commitment, SubscriptionParams::Program(params) => ¶ms.commitment, SubscriptionParams::Signature(params) => ¶ms.commitment, - SubscriptionParams::Slot + SubscriptionParams::Root + | SubscriptionParams::Slot | SubscriptionParams::SlotsUpdates - | SubscriptionParams::Root | SubscriptionParams::Vote => return false, }; commitment.is_confirmed() @@ -127,6 +133,21 @@ pub struct AccountSubscriptionParams { pub commitment: CommitmentConfig, } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct BlockSubscriptionParams { + pub commitment: CommitmentConfig, + pub encoding: UiTransactionEncoding, + pub kind: BlockSubscriptionKind, + pub transaction_details: TransactionDetails, + pub show_rewards: bool, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum BlockSubscriptionKind { + All, + MentionsAccountOrProgram(Pubkey), +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct LogsSubscriptionParams { pub kind: LogsSubscriptionKind, @@ -473,12 +494,15 @@ impl SubscriptionsTracker { ) -> &HashMap>> { &self.by_signature } + pub fn commitment_watchers(&self) -> &HashMap> { &self.commitment_watchers } + pub fn gossip_watchers(&self) -> &HashMap> { &self.gossip_watchers } + pub fn node_progress_watchers(&self) -> &HashMap> { &self.node_progress_watchers } diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index c10df6116d..ce2d84b8ff 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -1,14 +1,14 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request - use { crate::{ optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, parsed_token_accounts::{get_parsed_token_account, get_parsed_token_accounts}, rpc_pubsub_service::PubSubConfig, rpc_subscription_tracker::{ - AccountSubscriptionParams, LogsSubscriptionKind, LogsSubscriptionParams, - ProgramSubscriptionParams, SignatureSubscriptionParams, SubscriptionControl, - SubscriptionId, SubscriptionInfo, SubscriptionParams, SubscriptionsTracker, + AccountSubscriptionParams, BlockSubscriptionKind, BlockSubscriptionParams, + LogsSubscriptionKind, LogsSubscriptionParams, ProgramSubscriptionParams, + SignatureSubscriptionParams, SubscriptionControl, SubscriptionId, SubscriptionInfo, + SubscriptionParams, SubscriptionsTracker, }, }, crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender}, @@ -18,10 +18,12 @@ use { solana_client::{ rpc_filter::RpcFilterType, rpc_response::{ - ProcessedSignatureResult, ReceivedSignatureResult, Response, RpcKeyedAccount, - RpcLogsResponse, RpcResponseContext, RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate, + ProcessedSignatureResult, ReceivedSignatureResult, Response, RpcBlockUpdate, + RpcBlockUpdateError, RpcKeyedAccount, RpcLogsResponse, RpcResponseContext, + RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate, }, }, + solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}, solana_measure::measure::Measure, solana_rayon_threadlimit::get_thread_count, solana_runtime::{ @@ -37,6 +39,7 @@ use { timing::timestamp, transaction, }, + solana_transaction_status::ConfirmedBlock, solana_vote_program::vote_state::VoteTransaction, std::{ cell::RefCell, @@ -44,7 +47,7 @@ use { io::Cursor, iter, str, sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, + atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, Arc, Mutex, RwLock, Weak, }, thread::{Builder, JoinHandle}, @@ -130,7 +133,7 @@ fn check_commitment_and_notify( params: &P, subscription: &SubscriptionInfo, bank_forks: &Arc>, - commitment_slots: &CommitmentSlots, + slot: Slot, bank_method: B, filter_results: F, notifier: &RpcNotifier, @@ -142,20 +145,6 @@ where F: Fn(X, &P, Slot, Arc) -> (Box>, Slot), X: Clone + Default, { - let commitment = if let Some(commitment) = subscription.commitment() { - commitment - } else { - error!("missing commitment in check_commitment_and_notify"); - return false; - }; - let slot = if commitment.is_finalized() { - commitment_slots.highest_confirmed_root - } else if commitment.is_confirmed() { - commitment_slots.highest_confirmed_slot - } else { - commitment_slots.slot - }; - let mut notified = false; if let Some(bank) = bank_forks.read().unwrap().get(slot).cloned() { let results = bank_method(&bank, params); @@ -175,6 +164,7 @@ where notified = true; } } + notified } @@ -287,6 +277,46 @@ impl RpcNotifier { } } +fn filter_block_result_txs( + block: ConfirmedBlock, + last_modified_slot: Slot, + params: &BlockSubscriptionParams, +) -> Option { + let transactions = match params.kind { + BlockSubscriptionKind::All => block.transactions, + BlockSubscriptionKind::MentionsAccountOrProgram(pk) => block + .transactions + .into_iter() + .filter(|tx| tx.transaction.message.account_keys.contains(&pk)) + .collect(), + }; + + if transactions.is_empty() { + if let BlockSubscriptionKind::MentionsAccountOrProgram(_) = params.kind { + return None; + } + } + + let block = ConfirmedBlock { + transactions, + ..block + } + .configure( + params.encoding, + params.transaction_details, + params.show_rewards, + ); + + // If last_modified_slot < last_notified_slot, then the last notif was for a fork. + // That's the risk clients take when subscribing to non-finalized commitments. + // This code lets the logic for dealing with forks live on the client side. + Some(RpcBlockUpdate { + slot: last_modified_slot, + block: Some(block), + err: None, + }) +} + fn filter_account_result( result: Option<(AccountSharedData, Slot)>, params: &AccountSubscriptionParams, @@ -416,14 +446,7 @@ fn initial_last_notified_slot( 0 } } - // last_notified_slot is not utilized for these subscriptions - SubscriptionParams::Logs(_) - | SubscriptionParams::Program(_) - | SubscriptionParams::Signature(_) - | SubscriptionParams::Slot - | SubscriptionParams::SlotsUpdates - | SubscriptionParams::Root - | SubscriptionParams::Vote => 0, + _ => 0, } } @@ -480,12 +503,16 @@ impl Drop for RpcSubscriptions { impl RpcSubscriptions { pub fn new( exit: &Arc, + max_complete_transaction_status_slot: Arc, + blockstore: Arc, bank_forks: Arc>, block_commitment_cache: Arc>, optimistically_confirmed_bank: Arc>, ) -> Self { Self::new_with_config( exit, + max_complete_transaction_status_slot, + blockstore, bank_forks, block_commitment_cache, optimistically_confirmed_bank, @@ -495,12 +522,38 @@ impl RpcSubscriptions { pub fn new_for_tests( exit: &Arc, + max_complete_transaction_status_slot: Arc, + bank_forks: Arc>, + block_commitment_cache: Arc>, + optimistically_confirmed_bank: Arc>, + ) -> Self { + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Blockstore::open(&ledger_path).unwrap(); + let blockstore = Arc::new(blockstore); + + Self::new_with_config( + exit, + max_complete_transaction_status_slot, + blockstore, + bank_forks, + block_commitment_cache, + optimistically_confirmed_bank, + &PubSubConfig::default_for_tests(), + ) + } + + pub fn new_for_tests_with_blockstore( + exit: &Arc, + max_complete_transaction_status_slot: Arc, + blockstore: Arc, bank_forks: Arc>, block_commitment_cache: Arc>, optimistically_confirmed_bank: Arc>, ) -> Self { Self::new_with_config( exit, + max_complete_transaction_status_slot, + blockstore, bank_forks, block_commitment_cache, optimistically_confirmed_bank, @@ -510,6 +563,8 @@ impl RpcSubscriptions { pub fn new_with_config( exit: &Arc, + max_complete_transaction_status_slot: Arc, + blockstore: Arc, bank_forks: Arc>, block_commitment_cache: Arc>, optimistically_confirmed_bank: Arc>, @@ -541,6 +596,8 @@ impl RpcSubscriptions { pool.install(|| { Self::process_notifications( exit_clone, + max_complete_transaction_status_slot, + blockstore, notifier, notification_receiver, subscriptions, @@ -568,11 +625,19 @@ impl RpcSubscriptions { } // For tests only... - pub fn default_with_bank_forks(bank_forks: Arc>) -> Self { + pub fn default_with_bank_forks( + max_complete_transaction_status_slot: Arc, + bank_forks: Arc>, + ) -> Self { + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Blockstore::open(&ledger_path).unwrap(); + let blockstore = Arc::new(blockstore); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); Self::new( &Arc::new(AtomicBool::new(false)), + max_complete_transaction_status_slot, + blockstore, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::default())), optimistically_confirmed_bank, @@ -641,6 +706,8 @@ impl RpcSubscriptions { fn process_notifications( exit: Arc, + max_complete_transaction_status_slot: Arc, + blockstore: Arc, notifier: RpcNotifier, notification_receiver: Receiver, mut subscriptions: SubscriptionsTracker, @@ -719,27 +786,32 @@ impl RpcSubscriptions { } } NotificationEntry::Bank(commitment_slots) => { - RpcSubscriptions::notify_accounts_logs_programs_signatures( + const SOURCE: &str = "bank"; + RpcSubscriptions::notify_watchers( + max_complete_transaction_status_slot.clone(), subscriptions.commitment_watchers(), &bank_forks, + &blockstore, &commitment_slots, ¬ifier, - "bank", - ) + SOURCE, + ); } NotificationEntry::Gossip(slot) => { let commitment_slots = CommitmentSlots { highest_confirmed_slot: slot, ..CommitmentSlots::default() }; - - RpcSubscriptions::notify_accounts_logs_programs_signatures( + const SOURCE: &str = "gossip"; + RpcSubscriptions::notify_watchers( + max_complete_transaction_status_slot.clone(), subscriptions.gossip_watchers(), &bank_forks, + &blockstore, &commitment_slots, ¬ifier, - "gossip", - ) + SOURCE, + ); } NotificationEntry::SignaturesReceived((slot, slot_signatures)) => { for slot_signature in &slot_signatures { @@ -785,100 +857,205 @@ impl RpcSubscriptions { } } - fn notify_accounts_logs_programs_signatures( + fn notify_watchers( + max_complete_transaction_status_slot: Arc, subscriptions: &HashMap>, bank_forks: &Arc>, + blockstore: &Blockstore, commitment_slots: &CommitmentSlots, notifier: &RpcNotifier, source: &'static str, ) { - let mut total_time = Measure::start("notify_accounts_logs_programs_signatures"); + let mut total_time = Measure::start("notify_watchers"); + let num_accounts_found = AtomicUsize::new(0); let num_accounts_notified = AtomicUsize::new(0); + let num_blocks_found = AtomicUsize::new(0); + let num_blocks_notified = AtomicUsize::new(0); + let num_logs_found = AtomicUsize::new(0); let num_logs_notified = AtomicUsize::new(0); - let num_signatures_found = AtomicUsize::new(0); - let num_signatures_notified = AtomicUsize::new(0); - let num_programs_found = AtomicUsize::new(0); let num_programs_notified = AtomicUsize::new(0); + let num_signatures_found = AtomicUsize::new(0); + let num_signatures_notified = AtomicUsize::new(0); + let subscriptions = subscriptions.into_par_iter(); subscriptions.for_each(|(_id, subscription)| { + let slot = if let Some(commitment) = subscription.commitment() { + if commitment.is_finalized() { + Some(commitment_slots.highest_confirmed_root) + } else if commitment.is_confirmed() { + Some(commitment_slots.highest_confirmed_slot) + } else { + Some(commitment_slots.slot) + } + } else { + error!("missing commitment in notify_watchers"); + None + }; match subscription.params() { SubscriptionParams::Account(params) => { - let notified = check_commitment_and_notify( - params, - subscription, - bank_forks, - commitment_slots, - |bank, params| bank.get_account_modified_slot(¶ms.pubkey), - filter_account_result, - notifier, - false, - ); - num_accounts_found.fetch_add(1, Ordering::Relaxed); + if let Some(slot) = slot { + let notified = check_commitment_and_notify( + params, + subscription, + bank_forks, + slot, + |bank, params| bank.get_account_modified_slot(¶ms.pubkey), + filter_account_result, + notifier, + false, + ); - if notified { - num_accounts_notified.fetch_add(1, Ordering::Relaxed); + if notified { + num_accounts_notified.fetch_add(1, Ordering::Relaxed); + } + } + } + SubscriptionParams::Block(params) => { + num_blocks_found.fetch_add(1, Ordering::Relaxed); + if let Some(slot) = slot { + if let Some(bank) = bank_forks.read().unwrap().get(slot) { + // We're calling it unnotified in this context + // because, logically, it gets set to `last_notified_slot + 1` + // on the final iteration of the loop down below. + // This is used to notify blocks for slots that were + // potentially missed due to upstream transient errors + // that led to this notification not being triggered for + // a slot. + // + // e.g. + // notify_watchers is triggered for Slot 1 + // some time passes + // notify_watchers is triggered for Slot 4 + // this will try to fetch blocks for slots 2, 3, and 4 + // as long as they are ancestors of `slot` + let mut w_last_unnotified_slot = + subscription.last_notified_slot.write().unwrap(); + // would mean it's the first notification for this subscription connection + if *w_last_unnotified_slot == 0 { + *w_last_unnotified_slot = slot; + } + let mut slots_to_notify: Vec<_> = + (*w_last_unnotified_slot..slot).collect(); + let ancestors = bank.proper_ancestors_set(); + slots_to_notify = slots_to_notify + .into_iter() + .filter(|slot| ancestors.contains(slot)) + .collect(); + slots_to_notify.push(slot); + for s in slots_to_notify { + // To avoid skipping a slot that fails this condition, + // caused by non-deterministic concurrency accesses, we + // break out of the loop. Besides if the current `s` is + // greater, then any `s + K` is also greater. + if s > max_complete_transaction_status_slot.load(Ordering::SeqCst) { + break; + } + match blockstore.get_complete_block(s, false) { + Ok(block) => { + if let Some(res) = filter_block_result_txs(block, s, params) + { + notifier.notify( + Response { + context: RpcResponseContext { slot: s }, + value: res, + }, + subscription, + false, + ); + num_blocks_notified.fetch_add(1, Ordering::Relaxed); + // the next time this subscription is notified it will + // try to fetch all slots between (s + 1) to `slot`, inclusively + *w_last_unnotified_slot = s + 1; + } + } + Err(e) => { + // we don't advance `w_last_unnotified_slot` so that + // it'll retry on the next notification trigger + error!("get_complete_block error: {}", e); + notifier.notify( + Response { + context: RpcResponseContext { slot: s }, + value: RpcBlockUpdate { + slot, + block: None, + err: Some(RpcBlockUpdateError::BlockStoreError), + }, + }, + subscription, + false, + ); + } + } + } + } } } SubscriptionParams::Logs(params) => { - let notified = check_commitment_and_notify( - params, - subscription, - bank_forks, - commitment_slots, - get_transaction_logs, - filter_logs_results, - notifier, - false, - ); num_logs_found.fetch_add(1, Ordering::Relaxed); + if let Some(slot) = slot { + let notified = check_commitment_and_notify( + params, + subscription, + bank_forks, + slot, + get_transaction_logs, + filter_logs_results, + notifier, + false, + ); - if notified { - num_logs_notified.fetch_add(1, Ordering::Relaxed); + if notified { + num_logs_notified.fetch_add(1, Ordering::Relaxed); + } } } SubscriptionParams::Program(params) => { - let notified = check_commitment_and_notify( - params, - subscription, - bank_forks, - commitment_slots, - |bank, params| { - bank.get_program_accounts_modified_since_parent(¶ms.pubkey) - }, - filter_program_results, - notifier, - false, - ); num_programs_found.fetch_add(1, Ordering::Relaxed); + if let Some(slot) = slot { + let notified = check_commitment_and_notify( + params, + subscription, + bank_forks, + slot, + |bank, params| { + bank.get_program_accounts_modified_since_parent(¶ms.pubkey) + }, + filter_program_results, + notifier, + false, + ); - if notified { - num_programs_notified.fetch_add(1, Ordering::Relaxed); + if notified { + num_programs_notified.fetch_add(1, Ordering::Relaxed); + } } } SubscriptionParams::Signature(params) => { - let notified = check_commitment_and_notify( - params, - subscription, - bank_forks, - commitment_slots, - |bank, params| { - bank.get_signature_status_processed_since_parent(¶ms.signature) - }, - filter_signature_result, - notifier, - true, // Unsubscribe. - ); num_signatures_found.fetch_add(1, Ordering::Relaxed); + if let Some(slot) = slot { + let notified = check_commitment_and_notify( + params, + subscription, + bank_forks, + slot, + |bank, params| { + bank.get_signature_status_processed_since_parent(¶ms.signature) + }, + filter_signature_result, + notifier, + true, // Unsubscribe. + ); - if notified { - num_signatures_notified.fetch_add(1, Ordering::Relaxed); + if notified { + num_signatures_notified.fetch_add(1, Ordering::Relaxed); + } } } _ => error!("wrong subscription type in alps map"), @@ -997,13 +1174,14 @@ pub(crate) mod tests { optimistically_confirmed_bank_tracker::{ BankNotification, OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker, }, + rpc::create_test_transactions_and_populate_blockstore, rpc_pubsub::RpcSolPubSubInternal, rpc_pubsub_service, }, serial_test::serial, solana_client::rpc_config::{ RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, - RpcTransactionLogsFilter, + RpcTransactionLogsFilter, {RpcBlockSubscribeConfig, RpcBlockSubscribeFilter}, }, solana_runtime::{ commitment::BlockCommitment, @@ -1016,7 +1194,11 @@ pub(crate) mod tests { stake, system_instruction, system_program, system_transaction, transaction::Transaction, }, - std::{collections::HashSet, sync::atomic::Ordering::Relaxed}, + solana_transaction_status::{TransactionDetails, UiTransactionEncoding}, + std::{ + collections::HashSet, + sync::atomic::{AtomicU64, Ordering::Relaxed}, + }, }; fn make_account_result(lamports: u64, subscription: u64, data: &str) -> serde_json::Value { @@ -1056,8 +1238,10 @@ pub(crate) mod tests { let alice = Keypair::new(); let exit = Arc::new(AtomicBool::new(false)); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( 1, 1, @@ -1153,6 +1337,294 @@ pub(crate) mod tests { } } + #[test] + #[serial] + fn test_check_confirmed_block_subscribe() { + let exit = Arc::new(AtomicBool::new(false)); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank = Bank::new_for_tests(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let optimistically_confirmed_bank = + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Blockstore::open(&ledger_path).unwrap(); + let blockstore = Arc::new(blockstore); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore( + &exit, + max_complete_transaction_status_slot, + blockstore.clone(), + bank_forks.clone(), + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + optimistically_confirmed_bank, + )); + let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions); + let filter = RpcBlockSubscribeFilter::All; + let config = RpcBlockSubscribeConfig { + commitment: Some(CommitmentConfig::confirmed()), + encoding: Some(UiTransactionEncoding::Json), + transaction_details: Some(TransactionDetails::Signatures), + show_rewards: None, + }; + let params = BlockSubscriptionParams { + kind: BlockSubscriptionKind::All, + commitment: config.commitment.unwrap(), + encoding: config.encoding.unwrap(), + transaction_details: config.transaction_details.unwrap(), + show_rewards: config.show_rewards.unwrap_or_default(), + }; + let sub_id = rpc.block_subscribe(filter, Some(config)).unwrap(); + + subscriptions + .control + .assert_subscribed(&SubscriptionParams::Block(params.clone())); + + let bank = bank_forks.read().unwrap().working_bank(); + let keypair1 = Keypair::new(); + let keypair2 = Keypair::new(); + let keypair3 = Keypair::new(); + let keypair4 = Keypair::new(); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root())); + let _confirmed_block_signatures = create_test_transactions_and_populate_blockstore( + vec![&keypair1, &keypair2, &keypair3, &keypair4], + 0, + bank, + blockstore.clone(), + max_complete_transaction_status_slot, + ); + + let slot = 0; + subscriptions.notify_gossip_subscribers(slot); + let actual_resp = receiver.recv(); + let actual_resp = serde_json::from_str::(&actual_resp).unwrap(); + + let block = blockstore.get_complete_block(slot, false).unwrap(); + let block = block.configure(params.encoding, params.transaction_details, false); + let expected_resp = RpcBlockUpdate { + slot, + block: Some(block), + err: None, + }; + let expected_resp = json!({ + "jsonrpc": "2.0", + "method": "blockNotification", + "params": { + "result": { + "context": { "slot": slot }, + "value": expected_resp, + }, + "subscription": 0, + } + }); + assert_eq!(expected_resp, actual_resp); + + // should not trigger since commitment NOT set to finalized + subscriptions.notify_subscribers(CommitmentSlots { + slot, + root: slot, + highest_confirmed_slot: slot, + highest_confirmed_root: slot, + }); + let should_err = receiver.recv_timeout(Duration::from_millis(300)); + assert!(should_err.is_err()); + + rpc.slot_unsubscribe(sub_id).unwrap(); + subscriptions + .control + .assert_unsubscribed(&SubscriptionParams::Block(params)); + } + + #[test] + #[serial] + fn test_check_confirmed_block_subscribe_with_mentions() { + let exit = Arc::new(AtomicBool::new(false)); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank = Bank::new_for_tests(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let optimistically_confirmed_bank = + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Blockstore::open(&ledger_path).unwrap(); + let blockstore = Arc::new(blockstore); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore( + &exit, + max_complete_transaction_status_slot, + blockstore.clone(), + bank_forks.clone(), + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + optimistically_confirmed_bank, + )); + let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions); + let keypair1 = Keypair::new(); + let filter = + RpcBlockSubscribeFilter::MentionsAccountOrProgram(keypair1.pubkey().to_string()); + let config = RpcBlockSubscribeConfig { + commitment: Some(CommitmentConfig::confirmed()), + encoding: Some(UiTransactionEncoding::Json), + transaction_details: Some(TransactionDetails::Signatures), + show_rewards: None, + }; + let params = BlockSubscriptionParams { + kind: BlockSubscriptionKind::MentionsAccountOrProgram(keypair1.pubkey()), + commitment: config.commitment.unwrap(), + encoding: config.encoding.unwrap(), + transaction_details: config.transaction_details.unwrap(), + show_rewards: config.show_rewards.unwrap_or_default(), + }; + let sub_id = rpc.block_subscribe(filter, Some(config)).unwrap(); + + subscriptions + .control + .assert_subscribed(&SubscriptionParams::Block(params.clone())); + + let bank = bank_forks.read().unwrap().working_bank(); + let keypair2 = Keypair::new(); + let keypair3 = Keypair::new(); + let keypair4 = Keypair::new(); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root())); + let _confirmed_block_signatures = create_test_transactions_and_populate_blockstore( + vec![&keypair1, &keypair2, &keypair3, &keypair4], + 0, + bank, + blockstore.clone(), + max_complete_transaction_status_slot, + ); + + let slot = 0; + subscriptions.notify_gossip_subscribers(slot); + let actual_resp = receiver.recv(); + let actual_resp = serde_json::from_str::(&actual_resp).unwrap(); + + // make sure it filtered out the other keypairs + let mut block = blockstore.get_complete_block(slot, false).unwrap(); + block.transactions.retain(|tx| { + tx.transaction + .message + .account_keys + .contains(&keypair1.pubkey()) + }); + let block = block.configure(params.encoding, params.transaction_details, false); + let expected_resp = RpcBlockUpdate { + slot, + block: Some(block), + err: None, + }; + let expected_resp = json!({ + "jsonrpc": "2.0", + "method": "blockNotification", + "params": { + "result": { + "context": { "slot": slot }, + "value": expected_resp, + }, + "subscription": 0, + } + }); + assert_eq!(expected_resp, actual_resp); + + rpc.slot_unsubscribe(sub_id).unwrap(); + subscriptions + .control + .assert_unsubscribed(&SubscriptionParams::Block(params)); + } + + #[test] + #[serial] + fn test_check_finalized_block_subscribe() { + let exit = Arc::new(AtomicBool::new(false)); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank = Bank::new_for_tests(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let optimistically_confirmed_bank = + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Blockstore::open(&ledger_path).unwrap(); + let blockstore = Arc::new(blockstore); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore( + &exit, + max_complete_transaction_status_slot, + blockstore.clone(), + bank_forks.clone(), + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + optimistically_confirmed_bank, + )); + let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions); + let filter = RpcBlockSubscribeFilter::All; + let config = RpcBlockSubscribeConfig { + commitment: Some(CommitmentConfig::finalized()), + encoding: Some(UiTransactionEncoding::Json), + transaction_details: Some(TransactionDetails::Signatures), + show_rewards: None, + }; + let params = BlockSubscriptionParams { + kind: BlockSubscriptionKind::All, + commitment: config.commitment.unwrap(), + encoding: config.encoding.unwrap(), + transaction_details: config.transaction_details.unwrap(), + show_rewards: config.show_rewards.unwrap_or_default(), + }; + let sub_id = rpc.block_subscribe(filter, Some(config)).unwrap(); + subscriptions + .control + .assert_subscribed(&SubscriptionParams::Block(params.clone())); + + let bank = bank_forks.read().unwrap().working_bank(); + let keypair1 = Keypair::new(); + let keypair2 = Keypair::new(); + let keypair3 = Keypair::new(); + let keypair4 = Keypair::new(); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root())); + let _confirmed_block_signatures = create_test_transactions_and_populate_blockstore( + vec![&keypair1, &keypair2, &keypair3, &keypair4], + 0, + bank, + blockstore.clone(), + max_complete_transaction_status_slot, + ); + + let slot = 0; + subscriptions.notify_subscribers(CommitmentSlots { + slot, + root: slot, + highest_confirmed_slot: slot, + highest_confirmed_root: slot, + }); + let actual_resp = receiver.recv(); + let actual_resp = serde_json::from_str::(&actual_resp).unwrap(); + + let block = blockstore.get_complete_block(slot, false).unwrap(); + let block = block.configure(params.encoding, params.transaction_details, false); + let expected_resp = RpcBlockUpdate { + slot, + block: Some(block), + err: None, + }; + let expected_resp = json!({ + "jsonrpc": "2.0", + "method": "blockNotification", + "params": { + "result": { + "context": { "slot": slot }, + "value": expected_resp, + }, + "subscription": 0, + } + }); + assert_eq!(expected_resp, actual_resp); + + // should not trigger since commitment set to finalized + subscriptions.notify_gossip_subscribers(slot); + let should_err = receiver.recv_timeout(Duration::from_millis(300)); + assert!(should_err.is_err()); + + rpc.slot_unsubscribe(sub_id).unwrap(); + subscriptions + .control + .assert_unsubscribed(&SubscriptionParams::Block(params)); + } + #[test] #[serial] fn test_check_program_subscribe() { @@ -1184,8 +1656,10 @@ pub(crate) mod tests { let exit = Arc::new(AtomicBool::new(false)); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), optimistically_confirmed_bank, @@ -1329,9 +1803,10 @@ pub(crate) mod tests { let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let mut pending_optimistically_confirmed_banks = HashSet::new(); - + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( 1, 1, @@ -1498,9 +1973,10 @@ pub(crate) mod tests { let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let mut pending_optimistically_confirmed_banks = HashSet::new(); - + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( 1, 1, @@ -1608,9 +2084,10 @@ pub(crate) mod tests { let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let mut pending_optimistically_confirmed_banks = HashSet::new(); - + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( 1, 1, @@ -1794,8 +2271,10 @@ pub(crate) mod tests { let exit = Arc::new(AtomicBool::new(false)); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks, Arc::new(RwLock::new(block_commitment_cache)), optimistically_confirmed_bank, @@ -1966,8 +2445,10 @@ pub(crate) mod tests { let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), optimistically_confirmed_bank, @@ -2010,8 +2491,10 @@ pub(crate) mod tests { let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks, Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), optimistically_confirmed_bank, @@ -2066,8 +2549,10 @@ pub(crate) mod tests { let mut pending_optimistically_confirmed_banks = HashSet::new(); let exit = Arc::new(AtomicBool::new(false)); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( &exit, + max_complete_transaction_status_slot, bank_forks.clone(), Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( 1, 1, @@ -2218,8 +2703,12 @@ pub(crate) mod tests { fn test_total_subscriptions() { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100); let bank = Bank::new_for_tests(&genesis_config); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); - let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(bank_forks)); + let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks( + max_complete_transaction_status_slot, + bank_forks, + )); let (rpc1, _receiver1) = rpc_pubsub_service::test_connection(&subscriptions); let sub_id1 = rpc1 diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index a3bcdf7a51..da555c2d6c 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -1505,6 +1505,10 @@ impl Bank { .scan_results_limit_bytes } + pub fn proper_ancestors_set(&self) -> HashSet { + HashSet::from_iter(self.proper_ancestors()) + } + /// Returns all ancestors excluding self.slot. pub(crate) fn proper_ancestors(&self) -> impl Iterator + '_ { self.ancestors diff --git a/transaction-status/src/lib.rs b/transaction-status/src/lib.rs index 9cea1d9ee0..77e5aea85a 100644 --- a/transaction-status/src/lib.rs +++ b/transaction-status/src/lib.rs @@ -355,7 +355,7 @@ pub struct Reward { pub type Rewards = Vec; -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct ConfirmedBlock { pub previous_blockhash: String, @@ -485,7 +485,7 @@ impl From for EncodedConfirmedBlock { } } -#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum TransactionDetails { Full, @@ -579,7 +579,7 @@ impl TransactionWithStatusMeta { } } -#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct EncodedTransactionWithStatusMeta { pub transaction: EncodedTransaction, @@ -595,7 +595,7 @@ impl TransactionStatusMeta { } } -#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq)] +#[derive(Serialize, Deserialize, Clone, Copy, Debug, Eq, Hash, PartialEq)] #[serde(rename_all = "camelCase")] pub enum UiTransactionEncoding { Binary, // Legacy. Retained for RPC backwards compatibility diff --git a/validator/src/main.rs b/validator/src/main.rs index 67ac40cb21..b7a5e76181 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1203,6 +1203,12 @@ pub fn main() { .default_value("4") .help("PubSub worker threads"), ) + .arg( + Arg::with_name("rpc_pubsub_enable_block_subscription") + .long("rpc-pubsub-enable-block-subscription") + .takes_value(false) + .help("Enable the unstable RPC PubSub `blockSubscribe` subscription"), + ) .arg( Arg::with_name("rpc_pubsub_enable_vote_subscription") .long("rpc-pubsub-enable-vote-subscription") @@ -2217,6 +2223,7 @@ pub fn main() { ) }), pubsub_config: PubSubConfig { + enable_block_subscription: matches.is_present("rpc_pubsub_enable_block_subscription"), enable_vote_subscription: matches.is_present("rpc_pubsub_enable_vote_subscription"), max_active_subscriptions: value_t_or_exit!( matches,