Remove RwLock on TransactionNotifier (#33962)
* Remove RwLock on TransactionNotifier
This commit is contained in:
parent
b013c03afa
commit
eba1b2d3e3
|
@ -83,7 +83,7 @@ use {
|
||||||
rpc_pubsub_service::{PubSubConfig, PubSubService},
|
rpc_pubsub_service::{PubSubConfig, PubSubService},
|
||||||
rpc_service::JsonRpcService,
|
rpc_service::JsonRpcService,
|
||||||
rpc_subscriptions::RpcSubscriptions,
|
rpc_subscriptions::RpcSubscriptions,
|
||||||
transaction_notifier_interface::TransactionNotifierLock,
|
transaction_notifier_interface::TransactionNotifierArc,
|
||||||
transaction_status_service::TransactionStatusService,
|
transaction_status_service::TransactionStatusService,
|
||||||
},
|
},
|
||||||
solana_runtime::{
|
solana_runtime::{
|
||||||
|
@ -1689,7 +1689,7 @@ fn load_blockstore(
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
start_progress: &Arc<RwLock<ValidatorStartProgress>>,
|
start_progress: &Arc<RwLock<ValidatorStartProgress>>,
|
||||||
accounts_update_notifier: Option<AccountsUpdateNotifier>,
|
accounts_update_notifier: Option<AccountsUpdateNotifier>,
|
||||||
transaction_notifier: Option<TransactionNotifierLock>,
|
transaction_notifier: Option<TransactionNotifierArc>,
|
||||||
entry_notifier: Option<EntryNotifierArc>,
|
entry_notifier: Option<EntryNotifierArc>,
|
||||||
poh_timing_point_sender: Option<PohTimingSender>,
|
poh_timing_point_sender: Option<PohTimingSender>,
|
||||||
) -> Result<
|
) -> Result<
|
||||||
|
@ -2167,7 +2167,7 @@ fn initialize_rpc_transaction_history_services(
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
enable_rpc_transaction_history: bool,
|
enable_rpc_transaction_history: bool,
|
||||||
enable_extended_tx_metadata_storage: bool,
|
enable_extended_tx_metadata_storage: bool,
|
||||||
transaction_notifier: Option<TransactionNotifierLock>,
|
transaction_notifier: Option<TransactionNotifierArc>,
|
||||||
) -> TransactionHistoryServices {
|
) -> TransactionHistoryServices {
|
||||||
let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
|
let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
|
||||||
let (transaction_status_sender, transaction_status_receiver) = unbounded();
|
let (transaction_status_sender, transaction_status_receiver) = unbounded();
|
||||||
|
|
|
@ -15,7 +15,7 @@ use {
|
||||||
solana_ledger::entry_notifier_interface::EntryNotifierArc,
|
solana_ledger::entry_notifier_interface::EntryNotifierArc,
|
||||||
solana_rpc::{
|
solana_rpc::{
|
||||||
optimistically_confirmed_bank_tracker::SlotNotification,
|
optimistically_confirmed_bank_tracker::SlotNotification,
|
||||||
transaction_notifier_interface::TransactionNotifierLock,
|
transaction_notifier_interface::TransactionNotifierArc,
|
||||||
},
|
},
|
||||||
std::{
|
std::{
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
|
@ -34,7 +34,7 @@ pub struct GeyserPluginService {
|
||||||
slot_status_observer: Option<SlotStatusObserver>,
|
slot_status_observer: Option<SlotStatusObserver>,
|
||||||
plugin_manager: Arc<RwLock<GeyserPluginManager>>,
|
plugin_manager: Arc<RwLock<GeyserPluginManager>>,
|
||||||
accounts_update_notifier: Option<AccountsUpdateNotifier>,
|
accounts_update_notifier: Option<AccountsUpdateNotifier>,
|
||||||
transaction_notifier: Option<TransactionNotifierLock>,
|
transaction_notifier: Option<TransactionNotifierArc>,
|
||||||
entry_notifier: Option<EntryNotifierArc>,
|
entry_notifier: Option<EntryNotifierArc>,
|
||||||
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
|
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
|
||||||
}
|
}
|
||||||
|
@ -92,10 +92,10 @@ impl GeyserPluginService {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
let transaction_notifier: Option<TransactionNotifierLock> =
|
let transaction_notifier: Option<TransactionNotifierArc> =
|
||||||
if transaction_notifications_enabled {
|
if transaction_notifications_enabled {
|
||||||
let transaction_notifier = TransactionNotifierImpl::new(plugin_manager.clone());
|
let transaction_notifier = TransactionNotifierImpl::new(plugin_manager.clone());
|
||||||
Some(Arc::new(RwLock::new(transaction_notifier)))
|
Some(Arc::new(transaction_notifier))
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
@ -160,7 +160,7 @@ impl GeyserPluginService {
|
||||||
self.accounts_update_notifier.clone()
|
self.accounts_update_notifier.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_transaction_notifier(&self) -> Option<TransactionNotifierLock> {
|
pub fn get_transaction_notifier(&self) -> Option<TransactionNotifierArc> {
|
||||||
self.transaction_notifier.clone()
|
self.transaction_notifier.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use {
|
use {
|
||||||
solana_sdk::{clock::Slot, signature::Signature, transaction::SanitizedTransaction},
|
solana_sdk::{clock::Slot, signature::Signature, transaction::SanitizedTransaction},
|
||||||
solana_transaction_status::TransactionStatusMeta,
|
solana_transaction_status::TransactionStatusMeta,
|
||||||
std::sync::{Arc, RwLock},
|
std::sync::Arc,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub trait TransactionNotifier {
|
pub trait TransactionNotifier {
|
||||||
|
@ -15,4 +15,4 @@ pub trait TransactionNotifier {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type TransactionNotifierLock = Arc<RwLock<dyn TransactionNotifier + Sync + Send>>;
|
pub type TransactionNotifierArc = Arc<dyn TransactionNotifier + Sync + Send>;
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use {
|
use {
|
||||||
crate::transaction_notifier_interface::TransactionNotifierLock,
|
crate::transaction_notifier_interface::TransactionNotifierArc,
|
||||||
crossbeam_channel::{Receiver, RecvTimeoutError},
|
crossbeam_channel::{Receiver, RecvTimeoutError},
|
||||||
itertools::izip,
|
itertools::izip,
|
||||||
solana_accounts_db::transaction_results::{DurableNonceFee, TransactionExecutionDetails},
|
solana_accounts_db::transaction_results::{DurableNonceFee, TransactionExecutionDetails},
|
||||||
|
@ -29,7 +29,7 @@ impl TransactionStatusService {
|
||||||
write_transaction_status_receiver: Receiver<TransactionStatusMessage>,
|
write_transaction_status_receiver: Receiver<TransactionStatusMessage>,
|
||||||
max_complete_transaction_status_slot: Arc<AtomicU64>,
|
max_complete_transaction_status_slot: Arc<AtomicU64>,
|
||||||
enable_rpc_transaction_history: bool,
|
enable_rpc_transaction_history: bool,
|
||||||
transaction_notifier: Option<TransactionNotifierLock>,
|
transaction_notifier: Option<TransactionNotifierArc>,
|
||||||
blockstore: Arc<Blockstore>,
|
blockstore: Arc<Blockstore>,
|
||||||
enable_extended_tx_metadata_storage: bool,
|
enable_extended_tx_metadata_storage: bool,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
|
@ -60,7 +60,7 @@ impl TransactionStatusService {
|
||||||
write_transaction_status_receiver: &Receiver<TransactionStatusMessage>,
|
write_transaction_status_receiver: &Receiver<TransactionStatusMessage>,
|
||||||
max_complete_transaction_status_slot: &Arc<AtomicU64>,
|
max_complete_transaction_status_slot: &Arc<AtomicU64>,
|
||||||
enable_rpc_transaction_history: bool,
|
enable_rpc_transaction_history: bool,
|
||||||
transaction_notifier: Option<TransactionNotifierLock>,
|
transaction_notifier: Option<TransactionNotifierArc>,
|
||||||
blockstore: &Blockstore,
|
blockstore: &Blockstore,
|
||||||
enable_extended_tx_metadata_storage: bool,
|
enable_extended_tx_metadata_storage: bool,
|
||||||
) -> Result<(), RecvTimeoutError> {
|
) -> Result<(), RecvTimeoutError> {
|
||||||
|
@ -169,7 +169,7 @@ impl TransactionStatusService {
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(transaction_notifier) = transaction_notifier.as_ref() {
|
if let Some(transaction_notifier) = transaction_notifier.as_ref() {
|
||||||
transaction_notifier.write().unwrap().notify_transaction(
|
transaction_notifier.notify_transaction(
|
||||||
slot,
|
slot,
|
||||||
transaction_index,
|
transaction_index,
|
||||||
transaction.signature(),
|
transaction.signature(),
|
||||||
|
@ -255,7 +255,7 @@ pub(crate) mod tests {
|
||||||
std::{
|
std::{
|
||||||
sync::{
|
sync::{
|
||||||
atomic::{AtomicBool, Ordering},
|
atomic::{AtomicBool, Ordering},
|
||||||
Arc, RwLock,
|
Arc,
|
||||||
},
|
},
|
||||||
thread::sleep,
|
thread::sleep,
|
||||||
time::Duration,
|
time::Duration,
|
||||||
|
@ -432,7 +432,7 @@ pub(crate) mod tests {
|
||||||
transaction_indexes: vec![transaction_index],
|
transaction_indexes: vec![transaction_index],
|
||||||
};
|
};
|
||||||
|
|
||||||
let test_notifier = Arc::new(RwLock::new(TestTransactionNotifier::new()));
|
let test_notifier = Arc::new(TestTransactionNotifier::new());
|
||||||
|
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let transaction_status_service = TransactionStatusService::new(
|
let transaction_status_service = TransactionStatusService::new(
|
||||||
|
@ -452,16 +452,15 @@ pub(crate) mod tests {
|
||||||
|
|
||||||
exit.store(true, Ordering::Relaxed);
|
exit.store(true, Ordering::Relaxed);
|
||||||
transaction_status_service.join().unwrap();
|
transaction_status_service.join().unwrap();
|
||||||
let notifier = test_notifier.read().unwrap();
|
assert_eq!(test_notifier.notifications.len(), 1);
|
||||||
assert_eq!(notifier.notifications.len(), 1);
|
|
||||||
let key = TestNotifierKey {
|
let key = TestNotifierKey {
|
||||||
slot,
|
slot,
|
||||||
transaction_index,
|
transaction_index,
|
||||||
signature,
|
signature,
|
||||||
};
|
};
|
||||||
assert!(notifier.notifications.contains_key(&key));
|
assert!(test_notifier.notifications.contains_key(&key));
|
||||||
|
|
||||||
let result = &*notifier.notifications.get(&key).unwrap();
|
let result = test_notifier.notifications.get(&key).unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
expected_transaction.signature(),
|
expected_transaction.signature(),
|
||||||
result.transaction.signature()
|
result.transaction.signature()
|
||||||
|
|
Loading…
Reference in New Issue