Log subscription for `All` no longer clobbers `Some` subscription for pubkey (#24215)

* fix: subscribing for all logs no longer clobbers pubkey-specific subscriptions
* test: write tests to cover logsSubscribe
This commit is contained in:
Steven Luscher 2022-04-26 10:31:11 -07:00 committed by GitHub
parent 534a666153
commit 3007f233f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 149 additions and 8 deletions

View File

@ -13,10 +13,7 @@ use {
},
solana_transaction_status::{TransactionDetails, UiTransactionEncoding},
std::{
collections::{
hash_map::{Entry, HashMap},
HashSet,
},
collections::hash_map::{Entry, HashMap},
fmt,
sync::{
atomic::{AtomicU64, Ordering},
@ -290,6 +287,21 @@ impl SubscriptionControl {
})
}
#[cfg(test)]
pub fn logs_subscribed(&self, pubkey: Option<&Pubkey>) -> bool {
self.0.subscriptions.iter().any(|item| {
if let SubscriptionParams::Logs(params) = item.key() {
let subscribed_pubkey = match &params.kind {
LogsSubscriptionKind::All | LogsSubscriptionKind::AllWithVotes => None,
LogsSubscriptionKind::Single(pubkey) => Some(pubkey),
};
subscribed_pubkey == pubkey
} else {
false
}
})
}
#[cfg(test)]
pub fn signature_subscribed(&self, signature: &Signature) -> bool {
self.0.subscriptions.iter().any(|item| {
@ -373,20 +385,21 @@ impl LogsSubscriptionsIndex {
}
fn update_config(&self) {
let mentioned_addresses = self.single_count.keys().copied().collect();
let config = if self.all_with_votes_count > 0 {
TransactionLogCollectorConfig {
filter: TransactionLogCollectorFilter::AllWithVotes,
mentioned_addresses: HashSet::new(),
mentioned_addresses,
}
} else if self.all_count > 0 {
TransactionLogCollectorConfig {
filter: TransactionLogCollectorFilter::All,
mentioned_addresses: HashSet::new(),
mentioned_addresses,
}
} else {
TransactionLogCollectorConfig {
filter: TransactionLogCollectorFilter::OnlyMentionedAddresses,
mentioned_addresses: self.single_count.keys().copied().collect(),
mentioned_addresses,
}
};

View File

@ -1210,7 +1210,8 @@ pub(crate) mod tests {
serial_test::serial,
solana_client::rpc_config::{
RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsFilter,
RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
RpcTransactionLogsFilter,
},
solana_runtime::{
commitment::BlockCommitment,
@ -2791,6 +2792,114 @@ pub(crate) mod tests {
assert!(!subscriptions.control.account_subscribed(&alice.pubkey()));
}
fn make_logs_result(signature: &str, subscription_id: u64) -> serde_json::Value {
json!({
"jsonrpc": "2.0",
"method": "logsNotification",
"params": {
"result": {
"context": {
"slot": 0
},
"value": {
"signature": signature,
"err": null,
"logs": [
"Program 11111111111111111111111111111111 invoke [1]",
"Program 11111111111111111111111111111111 success"
]
}
},
"subscription": subscription_id
}
})
}
#[test]
#[serial]
fn test_logs_subscribe() {
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(100);
let bank = Bank::new_for_tests(&genesis_config);
let blockhash = bank.last_blockhash();
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let alice = Keypair::new();
let exit = Arc::new(AtomicBool::new(false));
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
max_complete_transaction_status_slot,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
optimistically_confirmed_bank,
));
let sub_config = RpcTransactionLogsConfig {
commitment: Some(CommitmentConfig::processed()),
};
let (rpc_all, mut receiver_all) = rpc_pubsub_service::test_connection(&subscriptions);
let sub_id_for_all = rpc_all
.logs_subscribe(RpcTransactionLogsFilter::All, Some(sub_config.clone()))
.unwrap();
assert!(subscriptions.control.logs_subscribed(None));
let (rpc_alice, mut receiver_alice) = rpc_pubsub_service::test_connection(&subscriptions);
let sub_id_for_alice = rpc_alice
.logs_subscribe(
RpcTransactionLogsFilter::Mentions(vec![alice.pubkey().to_string()]),
Some(sub_config),
)
.unwrap();
assert!(subscriptions.control.logs_subscribed(Some(&alice.pubkey())));
let tx = system_transaction::create_account(
&mint_keypair,
&alice,
blockhash,
1,
0,
&system_program::id(),
);
bank_forks
.read()
.unwrap()
.get(0)
.unwrap()
.process_transaction_with_logs(&tx)
.unwrap();
subscriptions.notify_subscribers(CommitmentSlots::new_from_slot(0));
let expected_response_all =
make_logs_result(&tx.signatures[0].to_string(), u64::from(sub_id_for_all));
let response_all = receiver_all.recv();
assert_eq!(
expected_response_all,
serde_json::from_str::<serde_json::Value>(&response_all).unwrap(),
);
let expected_response_alice =
make_logs_result(&tx.signatures[0].to_string(), u64::from(sub_id_for_alice));
let response_alice = receiver_alice.recv();
assert_eq!(
expected_response_alice,
serde_json::from_str::<serde_json::Value>(&response_alice).unwrap(),
);
rpc_all.logs_unsubscribe(sub_id_for_all).unwrap();
assert!(!subscriptions.control.logs_subscribed(None));
rpc_alice.logs_unsubscribe(sub_id_for_alice).unwrap();
assert!(!subscriptions.control.logs_subscribed(Some(&alice.pubkey())));
}
#[test]
fn test_total_subscriptions() {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100);

View File

@ -5672,6 +5672,25 @@ impl Bank {
.map_or(Ok(()), |sig| self.get_signature_status(sig).unwrap())
}
/// Process a Transaction and store program log data. This is used for unit tests, and simply
/// replicates the vector Bank::process_transactions method with `enable_cpi_recording: true`
pub fn process_transaction_with_logs(&self, tx: &Transaction) -> Result<()> {
let txs = vec![VersionedTransaction::from(tx.clone())];
let batch = self.prepare_entry_batch(txs)?;
let _results = self.load_execute_and_commit_transactions(
&batch,
MAX_PROCESSING_AGE,
false,
false,
true,
false,
&mut ExecuteTimings::default(),
);
tx.signatures
.get(0)
.map_or(Ok(()), |sig| self.get_signature_status(sig).unwrap())
}
/// Process multiple transaction in a single batch. This is used for benches and unit tests.
///
/// # Panics