Accountsdb plugin transaction part 3: Transaction Notifier (#21374)

The TransactionNotifierInterface interface for notifying transactions.
Changes to transaction_status_service to notify the notifier of the transaction data.
Interface to query the plugin's interest in transaction data
This commit is contained in:
Lijun Wang 2021-11-23 09:55:53 -08:00 committed by GitHub
parent 2602e7c3bc
commit c29838fce1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 481 additions and 48 deletions

3
Cargo.lock generated
View File

@ -4349,6 +4349,8 @@ name = "solana-accountsdb-plugin-interface"
version = "1.9.0" version = "1.9.0"
dependencies = [ dependencies = [
"log 0.4.14", "log 0.4.14",
"solana-sdk",
"solana-transaction-status",
"thiserror", "thiserror",
] ]
@ -4370,6 +4372,7 @@ dependencies = [
"solana-rpc", "solana-rpc",
"solana-runtime", "solana-runtime",
"solana-sdk", "solana-sdk",
"solana-transaction-status",
"thiserror", "thiserror",
] ]

View File

@ -12,6 +12,8 @@ documentation = "https://docs.rs/solana-validator"
[dependencies] [dependencies]
log = "0.4.11" log = "0.4.11"
thiserror = "1.0.30" thiserror = "1.0.30"
solana-sdk = { path = "../sdk", version = "=1.9.0" }
solana-transaction-status = { path = "../transaction-status", version = "=1.9.0" }
[package.metadata.docs.rs] [package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"] targets = ["x86_64-unknown-linux-gnu"]

View File

@ -3,6 +3,8 @@
/// In addition, the dynamic library must export a "C" function _create_plugin which /// In addition, the dynamic library must export a "C" function _create_plugin which
/// creates the implementation of the plugin. /// creates the implementation of the plugin.
use { use {
solana_sdk::{signature::Signature, transaction::SanitizedTransaction},
solana_transaction_status::TransactionStatusMeta,
std::{any::Any, error, io}, std::{any::Any, error, io},
thiserror::Error, thiserror::Error,
}; };
@ -24,6 +26,18 @@ pub enum ReplicaAccountInfoVersions<'a> {
V0_0_1(&'a ReplicaAccountInfo<'a>), V0_0_1(&'a ReplicaAccountInfo<'a>),
} }
#[derive(Clone, Debug)]
pub struct ReplicaTransactionInfo<'a> {
pub signature: &'a Signature,
pub is_vote: bool,
pub transaction: &'a SanitizedTransaction,
pub transaction_status_meta: &'a TransactionStatusMeta,
}
pub enum ReplicaTransactionInfoVersions<'a> {
V0_0_1(&'a ReplicaTransactionInfo<'a>),
}
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum AccountsDbPluginError { pub enum AccountsDbPluginError {
#[error("Error opening config file. Error detail: ({0}).")] #[error("Error opening config file. Error detail: ({0}).")]
@ -105,10 +119,27 @@ pub trait AccountsDbPlugin: Any + Send + Sync + std::fmt::Debug {
Ok(()) Ok(())
} }
/// Called when a transaction is updated at a slot.
#[allow(unused_variables)]
fn notify_transaction(
&mut self,
transaction: ReplicaTransactionInfoVersions,
slot: u64,
) -> Result<()> {
Ok(())
}
/// Check if the plugin is interested in account data /// Check if the plugin is interested in account data
/// Default is true -- if the plugin is not interested in /// Default is true -- if the plugin is not interested in
/// account data, please return false. /// account data, please return false.
fn to_notify_account_data(&self) -> bool { fn account_data_notifications_enabled(&self) -> bool {
true true
} }
/// Check if the plugin is interested in transaction data
/// Default is false -- if the plugin is not interested in
/// transaction data, please return false.
fn transaction_notifications_enabled(&self) -> bool {
false
}
} }

View File

@ -24,6 +24,7 @@ solana-metrics = { path = "../metrics", version = "=1.9.0" }
solana-rpc = { path = "../rpc", version = "=1.9.0" } solana-rpc = { path = "../rpc", version = "=1.9.0" }
solana-runtime = { path = "../runtime", version = "=1.9.0" } solana-runtime = { path = "../runtime", version = "=1.9.0" }
solana-sdk = { path = "../sdk", version = "=1.9.0" } solana-sdk = { path = "../sdk", version = "=1.9.0" }
solana-transaction-status = { path = "../transaction-status", version = "=1.9.0" }
thiserror = "1.0.30" thiserror = "1.0.30"
[package.metadata.docs.rs] [package.metadata.docs.rs]

View File

@ -54,9 +54,19 @@ impl AccountsDbPluginManager {
} }
/// Check if there is any plugin interested in account data /// Check if there is any plugin interested in account data
pub fn to_notify_account_data(&self) -> bool { pub fn account_data_notifications_enabled(&self) -> bool {
for plugin in &self.plugins { for plugin in &self.plugins {
if plugin.to_notify_account_data() { if plugin.account_data_notifications_enabled() {
return true;
}
}
false
}
/// Check if there is any plugin interested in transaction data
pub fn transaction_notifications_enabled(&self) -> bool {
for plugin in &self.plugins {
if plugin.transaction_notifications_enabled() {
return true; return true;
} }
} }

View File

@ -3,11 +3,15 @@ use {
accounts_update_notifier::AccountsUpdateNotifierImpl, accounts_update_notifier::AccountsUpdateNotifierImpl,
accountsdb_plugin_manager::AccountsDbPluginManager, accountsdb_plugin_manager::AccountsDbPluginManager,
slot_status_notifier::SlotStatusNotifierImpl, slot_status_observer::SlotStatusObserver, slot_status_notifier::SlotStatusNotifierImpl, slot_status_observer::SlotStatusObserver,
transaction_notifier::TransactionNotifierImpl,
}, },
crossbeam_channel::Receiver, crossbeam_channel::Receiver,
log::*, log::*,
serde_json, serde_json,
solana_rpc::optimistically_confirmed_bank_tracker::BankNotification, solana_rpc::{
optimistically_confirmed_bank_tracker::BankNotification,
transaction_notifier_interface::TransactionNotifierLock,
},
solana_runtime::accounts_update_notifier_interface::AccountsUpdateNotifier, solana_runtime::accounts_update_notifier_interface::AccountsUpdateNotifier,
std::{ std::{
fs::File, fs::File,
@ -45,6 +49,7 @@ pub struct AccountsDbPluginService {
slot_status_observer: Option<SlotStatusObserver>, slot_status_observer: Option<SlotStatusObserver>,
plugin_manager: Arc<RwLock<AccountsDbPluginManager>>, plugin_manager: Arc<RwLock<AccountsDbPluginManager>>,
accounts_update_notifier: Option<AccountsUpdateNotifier>, accounts_update_notifier: Option<AccountsUpdateNotifier>,
transaction_notifier: Option<TransactionNotifierLock>,
} }
impl AccountsDbPluginService { impl AccountsDbPluginService {
@ -74,18 +79,31 @@ impl AccountsDbPluginService {
for accountsdb_plugin_config_file in accountsdb_plugin_config_files { for accountsdb_plugin_config_file in accountsdb_plugin_config_files {
Self::load_plugin(&mut plugin_manager, accountsdb_plugin_config_file)?; Self::load_plugin(&mut plugin_manager, accountsdb_plugin_config_file)?;
} }
let to_notify_account_data = plugin_manager.to_notify_account_data(); let account_data_notifications_enabled =
plugin_manager.account_data_notifications_enabled();
let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled();
let plugin_manager = Arc::new(RwLock::new(plugin_manager)); let plugin_manager = Arc::new(RwLock::new(plugin_manager));
let accounts_update_notifier: Option<AccountsUpdateNotifier> = if to_notify_account_data { let accounts_update_notifier: Option<AccountsUpdateNotifier> =
let accounts_update_notifier = AccountsUpdateNotifierImpl::new(plugin_manager.clone()); if account_data_notifications_enabled {
let accounts_update_notifier =
AccountsUpdateNotifierImpl::new(plugin_manager.clone());
Some(Arc::new(RwLock::new(accounts_update_notifier))) Some(Arc::new(RwLock::new(accounts_update_notifier)))
} else { } else {
None None
}; };
let slot_status_observer = if to_notify_account_data { let transaction_notifier: Option<TransactionNotifierLock> =
if transaction_notifications_enabled {
let transaction_notifier = TransactionNotifierImpl::new(plugin_manager.clone());
Some(Arc::new(RwLock::new(transaction_notifier)))
} else {
None
};
let slot_status_observer =
if account_data_notifications_enabled || transaction_notifications_enabled {
let slot_status_notifier = SlotStatusNotifierImpl::new(plugin_manager.clone()); let slot_status_notifier = SlotStatusNotifierImpl::new(plugin_manager.clone());
let slot_status_notifier = Arc::new(RwLock::new(slot_status_notifier)); let slot_status_notifier = Arc::new(RwLock::new(slot_status_notifier));
Some(SlotStatusObserver::new( Some(SlotStatusObserver::new(
@ -101,6 +119,7 @@ impl AccountsDbPluginService {
slot_status_observer, slot_status_observer,
plugin_manager, plugin_manager,
accounts_update_notifier, accounts_update_notifier,
transaction_notifier,
}) })
} }
@ -163,6 +182,10 @@ impl AccountsDbPluginService {
self.accounts_update_notifier.clone() self.accounts_update_notifier.clone()
} }
pub fn get_transaction_notifier(&self) -> Option<TransactionNotifierLock> {
self.transaction_notifier.clone()
}
pub fn join(self) -> thread::Result<()> { pub fn join(self) -> thread::Result<()> {
if let Some(mut slot_status_observer) = self.slot_status_observer { if let Some(mut slot_status_observer) = self.slot_status_observer {
slot_status_observer.join()?; slot_status_observer.join()?;

View File

@ -3,3 +3,4 @@ pub mod accountsdb_plugin_manager;
pub mod accountsdb_plugin_service; pub mod accountsdb_plugin_service;
pub mod slot_status_notifier; pub mod slot_status_notifier;
pub mod slot_status_observer; pub mod slot_status_observer;
pub mod transaction_notifier;

View File

@ -0,0 +1,93 @@
/// Module responsible for notifying plugins of transactions
use {
crate::accountsdb_plugin_manager::AccountsDbPluginManager,
log::*,
solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{
ReplicaTransactionInfo, ReplicaTransactionInfoVersions,
},
solana_measure::measure::Measure,
solana_metrics::*,
solana_rpc::transaction_notifier_interface::TransactionNotifier,
solana_runtime::bank,
solana_sdk::{clock::Slot, signature::Signature, transaction::SanitizedTransaction},
solana_transaction_status::TransactionStatusMeta,
std::sync::{Arc, RwLock},
};
/// This implementation of TransactionNotifier is passed to the rpc's TransactionStatusService
/// at the validator startup. TransactionStatusService invokes the notify_transaction method
/// for new transactions. The implementation in turn invokes the notify_transaction of each
/// plugin enabled with transaction notification managed by the AccountsDbPluginManager.
pub(crate) struct TransactionNotifierImpl {
plugin_manager: Arc<RwLock<AccountsDbPluginManager>>,
}
impl TransactionNotifier for TransactionNotifierImpl {
fn notify_transaction(
&self,
slot: Slot,
signature: &Signature,
transaction_status_meta: &TransactionStatusMeta,
transaction: &SanitizedTransaction,
) {
let mut measure = Measure::start("accountsdb-plugin-notify_plugins_of_transaction_info");
let transaction_log_info =
Self::build_replica_transaction_info(signature, transaction_status_meta, transaction);
let mut plugin_manager = self.plugin_manager.write().unwrap();
if plugin_manager.plugins.is_empty() {
return;
}
for plugin in plugin_manager.plugins.iter_mut() {
if !plugin.transaction_notifications_enabled() {
continue;
}
match plugin.notify_transaction(
ReplicaTransactionInfoVersions::V0_0_1(&transaction_log_info),
slot,
) {
Err(err) => {
error!(
"Failed to notify transaction, error: ({}) to plugin {}",
err,
plugin.name()
)
}
Ok(_) => {
trace!(
"Successfully notified transaction to plugin {}",
plugin.name()
);
}
}
}
measure.stop();
inc_new_counter_debug!(
"accountsdb-plugin-notify_plugins_of_transaction_info-us",
measure.as_us() as usize,
10000,
10000
);
}
}
impl TransactionNotifierImpl {
pub fn new(plugin_manager: Arc<RwLock<AccountsDbPluginManager>>) -> Self {
Self { plugin_manager }
}
fn build_replica_transaction_info<'a>(
signature: &'a Signature,
transaction_status_meta: &'a TransactionStatusMeta,
transaction: &'a SanitizedTransaction,
) -> ReplicaTransactionInfo<'a> {
ReplicaTransactionInfo {
signature,
is_vote: bank::is_simple_vote_transaction(transaction),
transaction,
transaction_status_meta,
}
}
}

View File

@ -280,7 +280,7 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
/// Check if the plugin is interested in account data /// Check if the plugin is interested in account data
/// Default is true -- if the plugin is not interested in /// Default is true -- if the plugin is not interested in
/// account data, please return false. /// account data, please return false.
fn to_notify_account_data(&self) -> bool { fn account_data_notifications_enabled(&self) -> bool {
self.accounts_selector self.accounts_selector
.as_ref() .as_ref()
.map_or_else(|| false, |selector| selector.is_enabled()) .map_or_else(|| false, |selector| selector.is_enabled())

View File

@ -2541,6 +2541,8 @@ mod tests {
let transaction_status_service = TransactionStatusService::new( let transaction_status_service = TransactionStatusService::new(
transaction_status_receiver, transaction_status_receiver,
Arc::new(AtomicU64::default()), Arc::new(AtomicU64::default()),
true,
None,
blockstore.clone(), blockstore.clone(),
&Arc::new(AtomicBool::new(false)), &Arc::new(AtomicBool::new(false)),
); );

View File

@ -60,6 +60,7 @@ use {
rpc_pubsub_service::{PubSubConfig, PubSubService}, rpc_pubsub_service::{PubSubConfig, PubSubService},
rpc_service::JsonRpcService, rpc_service::JsonRpcService,
rpc_subscriptions::RpcSubscriptions, rpc_subscriptions::RpcSubscriptions,
transaction_notifier_interface::TransactionNotifierLock,
transaction_status_service::TransactionStatusService, transaction_status_service::TransactionStatusService,
}, },
solana_runtime::{ solana_runtime::{
@ -420,9 +421,18 @@ impl Validator {
.and_then(|accountsdb_plugin_service| { .and_then(|accountsdb_plugin_service| {
accountsdb_plugin_service.get_accounts_update_notifier() accountsdb_plugin_service.get_accounts_update_notifier()
}); });
let transaction_notifier =
accountsdb_plugin_service
.as_ref()
.and_then(|accountsdb_plugin_service| {
accountsdb_plugin_service.get_transaction_notifier()
});
info!( info!(
"AccountsDb plugin: accounts_update_notifier: {}", "AccountsDb plugin: accounts_update_notifier: {} transaction_notifier: {}",
accounts_update_notifier.is_some() accounts_update_notifier.is_some(),
transaction_notifier.is_some()
); );
let system_monitor_service = Some(SystemMonitorService::new( let system_monitor_service = Some(SystemMonitorService::new(
@ -461,6 +471,7 @@ impl Validator {
config.no_poh_speed_test, config.no_poh_speed_test,
accounts_package_channel.0.clone(), accounts_package_channel.0.clone(),
accounts_update_notifier, accounts_update_notifier,
transaction_notifier,
); );
*start_progress.write().unwrap() = ValidatorStartProgress::StartingServices; *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;
@ -1183,6 +1194,7 @@ fn new_banks_from_ledger(
no_poh_speed_test: bool, no_poh_speed_test: bool,
accounts_package_sender: AccountsPackageSender, accounts_package_sender: AccountsPackageSender,
accounts_update_notifier: Option<AccountsUpdateNotifier>, accounts_update_notifier: Option<AccountsUpdateNotifier>,
transaction_notifier: Option<TransactionNotifierLock>,
) -> ( ) -> (
GenesisConfig, GenesisConfig,
BankForks, BankForks,
@ -1275,12 +1287,17 @@ fn new_banks_from_ledger(
..blockstore_processor::ProcessOptions::default() ..blockstore_processor::ProcessOptions::default()
}; };
let enable_rpc_transaction_history =
config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history;
let is_plugin_transaction_history_required = transaction_notifier.as_ref().is_some();
let transaction_history_services = let transaction_history_services =
if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history { if enable_rpc_transaction_history || is_plugin_transaction_history_required {
initialize_rpc_transaction_history_services( initialize_rpc_transaction_history_services(
blockstore.clone(), blockstore.clone(),
exit, exit,
enable_rpc_transaction_history,
config.rpc_config.enable_cpi_and_log_storage, config.rpc_config.enable_cpi_and_log_storage,
transaction_notifier,
) )
} else { } else {
TransactionHistoryServices::default() TransactionHistoryServices::default()
@ -1471,7 +1488,9 @@ fn backup_and_clear_blockstore(ledger_path: &Path, start_slot: Slot, shred_versi
fn initialize_rpc_transaction_history_services( fn initialize_rpc_transaction_history_services(
blockstore: Arc<Blockstore>, blockstore: Arc<Blockstore>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
enable_rpc_transaction_history: bool,
enable_cpi_and_log_storage: bool, enable_cpi_and_log_storage: bool,
transaction_notifier: Option<TransactionNotifierLock>,
) -> TransactionHistoryServices { ) -> TransactionHistoryServices {
let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root())); let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
let (transaction_status_sender, transaction_status_receiver) = unbounded(); let (transaction_status_sender, transaction_status_receiver) = unbounded();
@ -1482,6 +1501,8 @@ fn initialize_rpc_transaction_history_services(
let transaction_status_service = Some(TransactionStatusService::new( let transaction_status_service = Some(TransactionStatusService::new(
transaction_status_receiver, transaction_status_receiver,
max_complete_transaction_status_slot.clone(), max_complete_transaction_status_slot.clone(),
enable_rpc_transaction_history,
transaction_notifier.clone(),
blockstore.clone(), blockstore.clone(),
exit, exit,
)); ));

View File

@ -11,6 +11,7 @@ pub mod rpc_pubsub_service;
pub mod rpc_service; pub mod rpc_service;
pub mod rpc_subscription_tracker; pub mod rpc_subscription_tracker;
pub mod rpc_subscriptions; pub mod rpc_subscriptions;
pub mod transaction_notifier_interface;
pub mod transaction_status_service; pub mod transaction_status_service;
#[macro_use] #[macro_use]

View File

@ -4263,6 +4263,8 @@ pub fn create_test_transactions_and_populate_blockstore(
crate::transaction_status_service::TransactionStatusService::new( crate::transaction_status_service::TransactionStatusService::new(
transaction_status_receiver, transaction_status_receiver,
max_complete_transaction_status_slot, max_complete_transaction_status_slot,
true,
None,
blockstore, blockstore,
&Arc::new(AtomicBool::new(false)), &Arc::new(AtomicBool::new(false)),
); );

View File

@ -0,0 +1,17 @@
use {
solana_sdk::{clock::Slot, signature::Signature, transaction::SanitizedTransaction},
solana_transaction_status::TransactionStatusMeta,
std::sync::{Arc, RwLock},
};
pub trait TransactionNotifier {
fn notify_transaction(
&self,
slot: Slot,
signature: &Signature,
transaction_status_meta: &TransactionStatusMeta,
transaction: &SanitizedTransaction,
);
}
pub type TransactionNotifierLock = Arc<RwLock<dyn TransactionNotifier + Sync + Send>>;

View File

@ -1,4 +1,5 @@
use { use {
crate::transaction_notifier_interface::TransactionNotifierLock,
crossbeam_channel::{Receiver, RecvTimeoutError}, crossbeam_channel::{Receiver, RecvTimeoutError},
itertools::izip, itertools::izip,
solana_ledger::{ solana_ledger::{
@ -28,6 +29,8 @@ impl TransactionStatusService {
pub fn new( pub fn new(
write_transaction_status_receiver: Receiver<TransactionStatusMessage>, write_transaction_status_receiver: Receiver<TransactionStatusMessage>,
max_complete_transaction_status_slot: Arc<AtomicU64>, max_complete_transaction_status_slot: Arc<AtomicU64>,
enable_rpc_transaction_history: bool,
transaction_notifier: Option<TransactionNotifierLock>,
blockstore: Arc<Blockstore>, blockstore: Arc<Blockstore>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> Self { ) -> Self {
@ -38,9 +41,12 @@ impl TransactionStatusService {
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
break; break;
} }
if let Err(RecvTimeoutError::Disconnected) = Self::write_transaction_status_batch( if let Err(RecvTimeoutError::Disconnected) = Self::write_transaction_status_batch(
&write_transaction_status_receiver, &write_transaction_status_receiver,
&max_complete_transaction_status_slot, &max_complete_transaction_status_slot,
enable_rpc_transaction_history,
transaction_notifier.clone(),
&blockstore, &blockstore,
) { ) {
break; break;
@ -53,6 +59,8 @@ impl TransactionStatusService {
fn write_transaction_status_batch( fn write_transaction_status_batch(
write_transaction_status_receiver: &Receiver<TransactionStatusMessage>, write_transaction_status_receiver: &Receiver<TransactionStatusMessage>,
max_complete_transaction_status_slot: &Arc<AtomicU64>, max_complete_transaction_status_slot: &Arc<AtomicU64>,
enable_rpc_transaction_history: bool,
transaction_notifier: Option<TransactionNotifierLock>,
blockstore: &Arc<Blockstore>, blockstore: &Arc<Blockstore>,
) -> Result<(), RecvTimeoutError> { ) -> Result<(), RecvTimeoutError> {
match write_transaction_status_receiver.recv_timeout(Duration::from_secs(1))? { match write_transaction_status_receiver.recv_timeout(Duration::from_secs(1))? {
@ -145,6 +153,27 @@ impl TransactionStatusService {
.collect(), .collect(),
); );
let transaction_status_meta = TransactionStatusMeta {
status,
fee,
pre_balances,
post_balances,
inner_instructions,
log_messages,
pre_token_balances,
post_token_balances,
rewards,
};
if let Some(transaction_notifier) = transaction_notifier.as_ref() {
transaction_notifier.write().unwrap().notify_transaction(
slot,
transaction.signature(),
&transaction_status_meta,
&transaction,
);
}
if enable_rpc_transaction_history {
if let Some(memos) = extract_and_fmt_memos(transaction.message()) { if let Some(memos) = extract_and_fmt_memos(transaction.message()) {
blockstore blockstore
.write_transaction_memos(transaction.signature(), memos) .write_transaction_memos(transaction.signature(), memos)
@ -157,22 +186,13 @@ impl TransactionStatusService {
*transaction.signature(), *transaction.signature(),
tx_account_locks.writable, tx_account_locks.writable,
tx_account_locks.readonly, tx_account_locks.readonly,
TransactionStatusMeta { transaction_status_meta,
status,
fee,
pre_balances,
post_balances,
inner_instructions,
log_messages,
pre_token_balances,
post_token_balances,
rewards,
},
) )
.expect("Expect database write to succeed: TransactionStatus"); .expect("Expect database write to succeed: TransactionStatus");
} }
} }
} }
}
TransactionStatusMessage::Freeze(slot) => { TransactionStatusMessage::Freeze(slot) => {
max_complete_transaction_status_slot.fetch_max(slot, Ordering::SeqCst); max_complete_transaction_status_slot.fetch_max(slot, Ordering::SeqCst);
} }
@ -184,3 +204,209 @@ impl TransactionStatusService {
self.thread_hdl.join() self.thread_hdl.join()
} }
} }
#[cfg(test)]
pub(crate) mod tests {
use {
super::*,
crate::transaction_notifier_interface::TransactionNotifier,
crossbeam_channel::unbounded,
dashmap::DashMap,
solana_account_decoder::parse_token::token_amount_to_ui_amount,
solana_ledger::{genesis_utils::create_genesis_config, get_tmp_ledger_path},
solana_runtime::bank::{NonceFull, NoncePartial, RentDebits, TransactionBalancesSet},
solana_sdk::{
account_utils::StateMut,
clock::Slot,
hash::Hash,
instruction::CompiledInstruction,
message::{Message, MessageHeader, SanitizedMessage},
nonce, nonce_account,
pubkey::Pubkey,
signature::{Keypair, Signature, Signer},
system_transaction,
transaction::{
SanitizedTransaction, Transaction, TransactionError, VersionedTransaction,
},
},
solana_transaction_status::{
token_balances::TransactionTokenBalancesSet, TransactionStatusMeta,
TransactionTokenBalance,
},
std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::sleep,
time::Duration,
},
};
struct TestTransactionNotifier {
notifications: DashMap<(Slot, Signature), (TransactionStatusMeta, SanitizedTransaction)>,
}
impl TestTransactionNotifier {
pub fn new() -> Self {
Self {
notifications: DashMap::default(),
}
}
}
impl TransactionNotifier for TestTransactionNotifier {
fn notify_transaction(
&self,
slot: Slot,
signature: &Signature,
transaction_status_meta: &TransactionStatusMeta,
transaction: &SanitizedTransaction,
) {
self.notifications.insert(
(slot, *signature),
(transaction_status_meta.clone(), transaction.clone()),
);
}
}
fn build_test_transaction_legacy() -> Transaction {
let keypair1 = Keypair::new();
let pubkey1 = keypair1.pubkey();
let zero = Hash::default();
system_transaction::transfer(&keypair1, &pubkey1, 42, zero)
}
fn build_message() -> Message {
Message {
header: MessageHeader {
num_readonly_signed_accounts: 11,
num_readonly_unsigned_accounts: 12,
num_required_signatures: 13,
},
account_keys: vec![Pubkey::new_unique()],
recent_blockhash: Hash::new_unique(),
instructions: vec![CompiledInstruction {
program_id_index: 1,
accounts: vec![1, 2, 3],
data: vec![4, 5, 6],
}],
}
}
#[test]
fn test_notify_transaction() {
let genesis_config = create_genesis_config(2).genesis_config;
let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config));
let (transaction_status_sender, transaction_status_receiver) = unbounded();
let ledger_path = get_tmp_ledger_path!();
let blockstore =
Blockstore::open(&ledger_path).expect("Expected to be able to open database ledger");
let blockstore = Arc::new(blockstore);
let message_hash = Hash::new_unique();
let transaction = build_test_transaction_legacy();
let transaction = VersionedTransaction::from(transaction);
let transaction =
SanitizedTransaction::try_create(transaction, message_hash, Some(true), |_| {
Err(TransactionError::UnsupportedVersion)
})
.unwrap();
let expected_transaction = transaction.clone();
let pubkey = Pubkey::new_unique();
let mut nonce_account = nonce_account::create_account(1).into_inner();
let data = nonce::state::Data::new(Pubkey::new(&[1u8; 32]), Hash::new(&[42u8; 32]), 42);
nonce_account
.set_state(&nonce::state::Versions::new_current(
nonce::State::Initialized(data),
))
.unwrap();
let message = build_message();
let rollback_partial = NoncePartial::new(pubkey, nonce_account.clone());
let mut rent_debits = RentDebits::default();
rent_debits.insert(&pubkey, 123, 456);
let transaction_result = (
Ok(()),
Some(
NonceFull::from_partial(
rollback_partial,
&SanitizedMessage::Legacy(message),
&[(pubkey, nonce_account)],
&rent_debits,
)
.unwrap(),
),
);
let balances = TransactionBalancesSet {
pre_balances: vec![vec![123456]],
post_balances: vec![vec![234567]],
};
let owner = Pubkey::new_unique().to_string();
let pre_token_balance = TransactionTokenBalance {
account_index: 0,
mint: Pubkey::new_unique().to_string(),
ui_token_amount: token_amount_to_ui_amount(42, 2),
owner: owner.clone(),
};
let post_token_balance = TransactionTokenBalance {
account_index: 0,
mint: Pubkey::new_unique().to_string(),
ui_token_amount: token_amount_to_ui_amount(58, 2),
owner,
};
let token_balances = TransactionTokenBalancesSet {
pre_token_balances: vec![vec![pre_token_balance]],
post_token_balances: vec![vec![post_token_balance]],
};
let slot = bank.slot();
let signature = *transaction.signature();
let transaction_status_batch = TransactionStatusBatch {
bank,
transactions: vec![transaction],
statuses: vec![transaction_result],
balances,
token_balances,
inner_instructions: None,
transaction_logs: None,
rent_debits: vec![rent_debits],
};
let test_notifier = Arc::new(RwLock::new(TestTransactionNotifier::new()));
let exit = Arc::new(AtomicBool::new(false));
let transaction_status_service = TransactionStatusService::new(
transaction_status_receiver,
Arc::new(AtomicU64::default()),
false,
Some(test_notifier.clone()),
blockstore,
&exit,
);
transaction_status_sender
.send(TransactionStatusMessage::Batch(transaction_status_batch))
.unwrap();
sleep(Duration::from_millis(500));
exit.store(true, Ordering::Relaxed);
transaction_status_service.join().unwrap();
let notifier = test_notifier.read().unwrap();
assert_eq!(notifier.notifications.len(), 1);
assert!(notifier.notifications.contains_key(&(slot, signature)));
let result = &*notifier.notifications.get(&(slot, signature)).unwrap();
assert_eq!(expected_transaction.signature(), result.1.signature());
}
}