ledger-tool: Notify geyser of transactions (#29933)

ledger-tool: Notify geyser of transactions
This commit is contained in:
Benno Fünfstück 2023-02-09 21:34:20 +01:00 committed by GitHub
parent 2f9146e8c8
commit e8c43aa0d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 49 additions and 14 deletions

1
Cargo.lock generated
View File

@ -5784,6 +5784,7 @@ dependencies = [
"solana-ledger", "solana-ledger",
"solana-logger 1.16.0", "solana-logger 1.16.0",
"solana-measure", "solana-measure",
"solana-rpc",
"solana-runtime", "solana-runtime",
"solana-sdk 1.16.0", "solana-sdk 1.16.0",
"solana-stake-program", "solana-stake-program",

View File

@ -33,6 +33,7 @@ solana-geyser-plugin-manager = { path = "../geyser-plugin-manager", version = "=
solana-ledger = { path = "../ledger", version = "=1.16.0" } solana-ledger = { path = "../ledger", version = "=1.16.0" }
solana-logger = { path = "../logger", version = "=1.16.0" } solana-logger = { path = "../logger", version = "=1.16.0" }
solana-measure = { path = "../measure", version = "=1.16.0" } solana-measure = { path = "../measure", version = "=1.16.0" }
solana-rpc = { path = "../rpc", version = "=1.16.0" }
solana-runtime = { path = "../runtime", version = "=1.16.0" } solana-runtime = { path = "../runtime", version = "=1.16.0" }
solana-sdk = { path = "../sdk", version = "=1.16.0" } solana-sdk = { path = "../sdk", version = "=1.16.0" }
solana-stake-program = { path = "../programs/stake", version = "=1.16.0" } solana-stake-program = { path = "../programs/stake", version = "=1.16.0" }

View File

@ -36,10 +36,16 @@ use {
AccessType, BlockstoreOptions, BlockstoreRecoveryMode, LedgerColumnOptions, AccessType, BlockstoreOptions, BlockstoreRecoveryMode, LedgerColumnOptions,
ShredStorageType, BLOCKSTORE_DIRECTORY_ROCKS_FIFO, ShredStorageType, BLOCKSTORE_DIRECTORY_ROCKS_FIFO,
}, },
blockstore_processor::{self, BlockstoreProcessorError, ProcessOptions}, blockstore_processor::{
self, BlockstoreProcessorError, ProcessOptions, TransactionStatusSender,
},
shred::Shred, shred::Shred,
}, },
solana_measure::{measure, measure::Measure}, solana_measure::{measure, measure::Measure},
solana_rpc::{
transaction_notifier_interface::TransactionNotifierLock,
transaction_status_service::TransactionStatusService,
},
solana_runtime::{ solana_runtime::{
accounts::Accounts, accounts::Accounts,
accounts_background_service::{ accounts_background_service::{
@ -1078,7 +1084,7 @@ fn get_accounts_db_config(ledger_path: &Path, arg_matches: &ArgMatches<'_>) -> A
fn load_bank_forks( fn load_bank_forks(
arg_matches: &ArgMatches, arg_matches: &ArgMatches,
genesis_config: &GenesisConfig, genesis_config: &GenesisConfig,
blockstore: &Blockstore, blockstore: Arc<Blockstore>,
process_options: ProcessOptions, process_options: ProcessOptions,
snapshot_archive_path: Option<PathBuf>, snapshot_archive_path: Option<PathBuf>,
incremental_snapshot_archive_path: Option<PathBuf>, incremental_snapshot_archive_path: Option<PathBuf>,
@ -1188,6 +1194,7 @@ fn load_bank_forks(
info!("done. {}", measure); info!("done. {}", measure);
let mut accounts_update_notifier = Option::<AccountsUpdateNotifier>::default(); let mut accounts_update_notifier = Option::<AccountsUpdateNotifier>::default();
let mut transaction_notifier = Option::<TransactionNotifierLock>::default();
if arg_matches.is_present("geyser_plugin_config") { if arg_matches.is_present("geyser_plugin_config") {
let geyser_config_files = values_t_or_exit!(arg_matches, "geyser_plugin_config", String) let geyser_config_files = values_t_or_exit!(arg_matches, "geyser_plugin_config", String)
.into_iter() .into_iter()
@ -1204,12 +1211,13 @@ fn load_bank_forks(
}, },
); );
accounts_update_notifier = geyser_service.get_accounts_update_notifier(); accounts_update_notifier = geyser_service.get_accounts_update_notifier();
transaction_notifier = geyser_service.get_transaction_notifier();
} }
let (bank_forks, leader_schedule_cache, starting_snapshot_hashes, ..) = let (bank_forks, leader_schedule_cache, starting_snapshot_hashes, ..) =
bank_forks_utils::load_bank_forks( bank_forks_utils::load_bank_forks(
genesis_config, genesis_config,
blockstore, blockstore.as_ref(),
account_paths, account_paths,
None, None,
snapshot_config.as_ref(), snapshot_config.as_ref(),
@ -1246,12 +1254,34 @@ fn load_bank_forks(
None, None,
); );
let (transaction_status_sender, transaction_status_service) = if transaction_notifier.is_some()
{
let (transaction_status_sender, transaction_status_receiver) = unbounded();
let transaction_status_service = TransactionStatusService::new(
transaction_status_receiver,
Arc::default(),
false,
transaction_notifier,
blockstore.clone(),
false,
&exit,
);
(
Some(TransactionStatusSender {
sender: transaction_status_sender,
}),
Some(transaction_status_service),
)
} else {
(None, None)
};
let result = blockstore_processor::process_blockstore_from_root( let result = blockstore_processor::process_blockstore_from_root(
blockstore, blockstore.as_ref(),
&bank_forks, &bank_forks,
&leader_schedule_cache, &leader_schedule_cache,
&process_options, &process_options,
None, transaction_status_sender.as_ref(),
None, None,
&accounts_background_request_sender, &accounts_background_request_sender,
) )
@ -1259,6 +1289,9 @@ fn load_bank_forks(
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
accounts_background_service.join().unwrap(); accounts_background_service.join().unwrap();
if let Some(service) = transaction_status_service {
service.join().unwrap();
}
result result
} }
@ -2475,7 +2508,7 @@ fn main() {
match load_bank_forks( match load_bank_forks(
arg_matches, arg_matches,
&genesis_config, &genesis_config,
&blockstore, Arc::new(blockstore),
process_options, process_options,
snapshot_archive_path, snapshot_archive_path,
incremental_snapshot_archive_path, incremental_snapshot_archive_path,
@ -2566,7 +2599,7 @@ fn main() {
match load_bank_forks( match load_bank_forks(
arg_matches, arg_matches,
&genesis_config, &genesis_config,
&blockstore, Arc::new(blockstore),
process_options, process_options,
snapshot_archive_path, snapshot_archive_path,
incremental_snapshot_archive_path, incremental_snapshot_archive_path,
@ -2801,7 +2834,7 @@ fn main() {
let (bank_forks, ..) = load_bank_forks( let (bank_forks, ..) = load_bank_forks(
arg_matches, arg_matches,
&genesis_config, &genesis_config,
&blockstore, Arc::new(blockstore),
process_options, process_options,
snapshot_archive_path, snapshot_archive_path,
incremental_snapshot_archive_path, incremental_snapshot_archive_path,
@ -2844,7 +2877,7 @@ fn main() {
match load_bank_forks( match load_bank_forks(
arg_matches, arg_matches,
&open_genesis_config_by(&ledger_path, arg_matches), &open_genesis_config_by(&ledger_path, arg_matches),
&blockstore, Arc::new(blockstore),
process_options, process_options,
snapshot_archive_path, snapshot_archive_path,
incremental_snapshot_archive_path, incremental_snapshot_archive_path,
@ -2948,12 +2981,12 @@ fn main() {
usize usize
); );
let genesis_config = open_genesis_config_by(&ledger_path, arg_matches); let genesis_config = open_genesis_config_by(&ledger_path, arg_matches);
let blockstore = open_blockstore( let blockstore = Arc::new(open_blockstore(
&ledger_path, &ledger_path,
AccessType::Secondary, AccessType::Secondary,
wal_recovery_mode, wal_recovery_mode,
force_update_to_open, force_update_to_open,
); ));
let snapshot_slot = if Some("ROOT") == arg_matches.value_of("snapshot_slot") { let snapshot_slot = if Some("ROOT") == arg_matches.value_of("snapshot_slot") {
blockstore blockstore
@ -3009,7 +3042,7 @@ fn main() {
match load_bank_forks( match load_bank_forks(
arg_matches, arg_matches,
&genesis_config, &genesis_config,
&blockstore, blockstore.clone(),
ProcessOptions { ProcessOptions {
new_hard_forks, new_hard_forks,
halt_at_slot: Some(snapshot_slot), halt_at_slot: Some(snapshot_slot),
@ -3351,7 +3384,7 @@ fn main() {
let (bank_forks, ..) = load_bank_forks( let (bank_forks, ..) = load_bank_forks(
arg_matches, arg_matches,
&genesis_config, &genesis_config,
&blockstore, Arc::new(blockstore),
process_options, process_options,
snapshot_archive_path, snapshot_archive_path,
incremental_snapshot_archive_path, incremental_snapshot_archive_path,
@ -3439,7 +3472,7 @@ fn main() {
match load_bank_forks( match load_bank_forks(
arg_matches, arg_matches,
&genesis_config, &genesis_config,
&blockstore, Arc::new(blockstore),
process_options, process_options,
snapshot_archive_path, snapshot_archive_path,
incremental_snapshot_archive_path, incremental_snapshot_archive_path,