removes Box<dyn Iterator<...>> from rpc/src/rpc_subscriptions.rs (#29203)

Box<dyn ...> requires dynamic dispatch, is heap allocated, slow and
verbose.
This commit is contained in:
behzad nouri 2022-12-15 22:33:52 +00:00 committed by GitHub
parent 83b4c347b8
commit 109dbf76df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 35 additions and 52 deletions

View File

@ -13,6 +13,7 @@ use {
}, },
}, },
crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender}, crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
itertools::Either,
rayon::prelude::*, rayon::prelude::*,
serde::Serialize, serde::Serialize,
solana_account_decoder::{parse_token::is_known_spl_token_id, UiAccount, UiAccountEncoding}, solana_account_decoder::{parse_token::is_known_spl_token_id, UiAccount, UiAccountEncoding},
@ -45,7 +46,7 @@ use {
cell::RefCell, cell::RefCell,
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
io::Cursor, io::Cursor,
iter, str, str,
sync::{ sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc, Mutex, RwLock, Weak, Arc, Mutex, RwLock, Weak,
@ -129,7 +130,7 @@ impl std::fmt::Debug for NotificationEntry {
} }
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
fn check_commitment_and_notify<P, S, B, F, X>( fn check_commitment_and_notify<P, S, B, F, X, I>(
params: &P, params: &P,
subscription: &SubscriptionInfo, subscription: &SubscriptionInfo,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
@ -142,8 +143,9 @@ fn check_commitment_and_notify<P, S, B, F, X>(
where where
S: Clone + Serialize, S: Clone + Serialize,
B: Fn(&Bank, &P) -> X, B: Fn(&Bank, &P) -> X,
F: Fn(X, &P, Slot, Arc<Bank>) -> (Box<dyn Iterator<Item = S>>, Slot), F: Fn(X, &P, Slot, Arc<Bank>) -> (I, Slot),
X: Clone + Default, X: Clone + Default,
I: IntoIterator<Item = S>,
{ {
let mut notified = false; let mut notified = false;
let bank = bank_forks.read().unwrap().get(slot); let bank = bank_forks.read().unwrap().get(slot);
@ -370,36 +372,23 @@ fn filter_account_result(
params: &AccountSubscriptionParams, params: &AccountSubscriptionParams,
last_notified_slot: Slot, last_notified_slot: Slot,
bank: Arc<Bank>, bank: Arc<Bank>,
) -> (Box<dyn Iterator<Item = UiAccount>>, Slot) { ) -> (Option<UiAccount>, Slot) {
// If the account is not found, `last_modified_slot` will default to zero and // If the account is not found, `last_modified_slot` will default to zero and
// we will notify clients that the account no longer exists if we haven't already // we will notify clients that the account no longer exists if we haven't already
let (account, last_modified_slot) = result.unwrap_or_default(); let (account, last_modified_slot) = result.unwrap_or_default();
// If last_modified_slot < last_notified_slot this means that we last notified for a fork // If last_modified_slot < last_notified_slot this means that we last notified for a fork
// and should notify that the account state has been reverted. // and should notify that the account state has been reverted.
let results: Box<dyn Iterator<Item = UiAccount>> = if last_modified_slot != last_notified_slot { let account = (last_modified_slot != last_notified_slot).then(|| {
if is_known_spl_token_id(account.owner()) if is_known_spl_token_id(account.owner())
&& params.encoding == UiAccountEncoding::JsonParsed && params.encoding == UiAccountEncoding::JsonParsed
{ {
Box::new(iter::once(get_parsed_token_account( get_parsed_token_account(bank, &params.pubkey, account)
bank,
&params.pubkey,
account,
)))
} else { } else {
Box::new(iter::once(UiAccount::encode( UiAccount::encode(&params.pubkey, &account, params.encoding, None, None)
&params.pubkey,
&account,
params.encoding,
None,
None,
)))
} }
} else { });
Box::new(iter::empty()) (account, last_modified_slot)
};
(results, last_modified_slot)
} }
fn filter_signature_result( fn filter_signature_result(
@ -407,11 +396,11 @@ fn filter_signature_result(
_params: &SignatureSubscriptionParams, _params: &SignatureSubscriptionParams,
last_notified_slot: Slot, last_notified_slot: Slot,
_bank: Arc<Bank>, _bank: Arc<Bank>,
) -> (Box<dyn Iterator<Item = RpcSignatureResult>>, Slot) { ) -> (Option<RpcSignatureResult>, Slot) {
( (
Box::new(result.into_iter().map(|result| { result.map(|result| {
RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult { err: result.err() }) RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult { err: result.err() })
})), }),
last_notified_slot, last_notified_slot,
) )
} }
@ -421,7 +410,7 @@ fn filter_program_results(
params: &ProgramSubscriptionParams, params: &ProgramSubscriptionParams,
last_notified_slot: Slot, last_notified_slot: Slot,
bank: Arc<Bank>, bank: Arc<Bank>,
) -> (Box<dyn Iterator<Item = RpcKeyedAccount>>, Slot) { ) -> (impl Iterator<Item = RpcKeyedAccount>, Slot) {
let accounts_is_empty = accounts.is_empty(); let accounts_is_empty = accounts.is_empty();
let encoding = params.encoding; let encoding = params.encoding;
let filters = params.filters.clone(); let filters = params.filters.clone();
@ -430,20 +419,19 @@ fn filter_program_results(
.iter() .iter()
.all(|filter_type| filter_type.allows(account)) .all(|filter_type| filter_type.allows(account))
}); });
let accounts: Box<dyn Iterator<Item = RpcKeyedAccount>> = let accounts = if is_known_spl_token_id(&params.pubkey)
if is_known_spl_token_id(&params.pubkey) && params.encoding == UiAccountEncoding::JsonParsed
&& params.encoding == UiAccountEncoding::JsonParsed && !accounts_is_empty
&& !accounts_is_empty {
{ let accounts = get_parsed_token_accounts(bank, keyed_accounts);
Box::new(get_parsed_token_accounts(bank, keyed_accounts)) Either::Left(accounts)
} else { } else {
Box::new( let accounts = keyed_accounts.map(move |(pubkey, account)| RpcKeyedAccount {
keyed_accounts.map(move |(pubkey, account)| RpcKeyedAccount { pubkey: pubkey.to_string(),
pubkey: pubkey.to_string(), account: UiAccount::encode(&pubkey, &account, encoding, None, None),
account: UiAccount::encode(&pubkey, &account, encoding, None, None), });
}), Either::Right(accounts)
) };
};
(accounts, last_notified_slot) (accounts, last_notified_slot)
} }
@ -452,18 +440,13 @@ fn filter_logs_results(
_params: &LogsSubscriptionParams, _params: &LogsSubscriptionParams,
last_notified_slot: Slot, last_notified_slot: Slot,
_bank: Arc<Bank>, _bank: Arc<Bank>,
) -> (Box<dyn Iterator<Item = RpcLogsResponse>>, Slot) { ) -> (impl Iterator<Item = RpcLogsResponse>, Slot) {
match logs { let responses = logs.into_iter().flatten().map(|log| RpcLogsResponse {
None => (Box::new(iter::empty()), last_notified_slot), signature: log.signature.to_string(),
Some(logs) => ( err: log.result.err(),
Box::new(logs.into_iter().map(|log| RpcLogsResponse { logs: log.log_messages,
signature: log.signature.to_string(), });
err: log.result.err(), (responses, last_notified_slot)
logs: log.log_messages,
})),
last_notified_slot,
),
}
} }
fn initial_last_notified_slot( fn initial_last_notified_slot(