From 0e91e969672dfdaf8a8bd0fbd06cbcdab27e6ac3 Mon Sep 17 00:00:00 2001 From: Tyera Date: Tue, 14 Nov 2023 09:49:26 -0700 Subject: [PATCH] Geyser: add starting entry to ReplicaEntryInfo(V2) (#33963) * Add ReplicaEntryInfoV2 * Add starting_transaction_index field to EntryNotification * Populate starting_transaction_index in replay stage * Cache and populate starting_transaction_index in banking stage * Build ReplicaEntryInfoV2 --- core/src/tpu_entry_notifier.rs | 6 ++++++ .../src/geyser_plugin_interface.rs | 19 +++++++++++++++++ geyser-plugin-manager/src/entry_notifier.rs | 21 +++++++++++++------ ledger/src/blockstore_processor.rs | 1 + ledger/src/entry_notifier_interface.rs | 8 ++++++- ledger/src/entry_notifier_service.rs | 11 +++++++--- 6 files changed, 56 insertions(+), 10 deletions(-) diff --git a/core/src/tpu_entry_notifier.rs b/core/src/tpu_entry_notifier.rs index 730a3b14f..22994455e 100644 --- a/core/src/tpu_entry_notifier.rs +++ b/core/src/tpu_entry_notifier.rs @@ -29,6 +29,7 @@ impl TpuEntryNotifier { .spawn(move || { let mut current_slot = 0; let mut current_index = 0; + let mut current_transaction_index = 0; loop { if exit.load(Ordering::Relaxed) { break; @@ -41,6 +42,7 @@ impl TpuEntryNotifier { &broadcast_entry_sender, &mut current_slot, &mut current_index, + &mut current_transaction_index, ) { break; } @@ -57,11 +59,13 @@ impl TpuEntryNotifier { broadcast_entry_sender: &Sender, current_slot: &mut u64, current_index: &mut usize, + current_transaction_index: &mut usize, ) -> Result<(), RecvTimeoutError> { let (bank, (entry, tick_height)) = entry_receiver.recv_timeout(Duration::from_secs(1))?; let slot = bank.slot(); let index = if slot != *current_slot { *current_index = 0; + *current_transaction_index = 0; *current_slot = slot; 0 } else { @@ -78,11 +82,13 @@ impl TpuEntryNotifier { slot, index, entry: entry_summary, + starting_transaction_index: *current_transaction_index, }) { warn!( "Failed to send slot {slot:?} entry {index:?} from Tpu to EntryNotifierService, error {err:?}", ); } + *current_transaction_index += entry.transactions.len(); if let Err(err) = broadcast_entry_sender.send((bank, (entry, tick_height))) { warn!( diff --git a/geyser-plugin-interface/src/geyser_plugin_interface.rs b/geyser-plugin-interface/src/geyser_plugin_interface.rs index 8b31aba48..8cfea4c76 100644 --- a/geyser-plugin-interface/src/geyser_plugin_interface.rs +++ b/geyser-plugin-interface/src/geyser_plugin_interface.rs @@ -185,12 +185,31 @@ pub struct ReplicaEntryInfo<'a> { pub executed_transaction_count: u64, } +#[derive(Clone, Debug)] +#[repr(C)] +pub struct ReplicaEntryInfoV2<'a> { + /// The slot number of the block containing this Entry + pub slot: Slot, + /// The Entry's index in the block + pub index: usize, + /// The number of hashes since the previous Entry + pub num_hashes: u64, + /// The Entry's SHA-256 hash, generated from the previous Entry's hash with + /// `solana_entry::entry::next_hash()` + pub hash: &'a [u8], + /// The number of executed transactions in the Entry + pub executed_transaction_count: u64, + /// The index-in-block of the first executed transaction in this Entry + pub starting_transaction_index: usize, +} + /// A wrapper to future-proof ReplicaEntryInfo handling. To make a change to the structure of /// ReplicaEntryInfo, add an new enum variant wrapping a newer version, which will force plugin /// implementations to handle the change. #[repr(u32)] pub enum ReplicaEntryInfoVersions<'a> { V0_0_1(&'a ReplicaEntryInfo<'a>), + V0_0_2(&'a ReplicaEntryInfoV2<'a>), } #[derive(Clone, Debug)] diff --git a/geyser-plugin-manager/src/entry_notifier.rs b/geyser-plugin-manager/src/entry_notifier.rs index ce6c3239c..ea14592b6 100644 --- a/geyser-plugin-manager/src/entry_notifier.rs +++ b/geyser-plugin-manager/src/entry_notifier.rs @@ -4,7 +4,7 @@ use { log::*, solana_entry::entry::EntrySummary, solana_geyser_plugin_interface::geyser_plugin_interface::{ - ReplicaEntryInfo, ReplicaEntryInfoVersions, + ReplicaEntryInfoV2, ReplicaEntryInfoVersions, }, solana_ledger::entry_notifier_interface::EntryNotifier, solana_measure::measure::Measure, @@ -18,7 +18,13 @@ pub(crate) struct EntryNotifierImpl { } impl EntryNotifier for EntryNotifierImpl { - fn notify_entry<'a>(&'a self, slot: Slot, index: usize, entry: &'a EntrySummary) { + fn notify_entry<'a>( + &'a self, + slot: Slot, + index: usize, + entry: &'a EntrySummary, + starting_transaction_index: usize, + ) { let mut measure = Measure::start("geyser-plugin-notify_plugins_of_entry_info"); let plugin_manager = self.plugin_manager.read().unwrap(); @@ -26,13 +32,14 @@ impl EntryNotifier for EntryNotifierImpl { return; } - let entry_info = Self::build_replica_entry_info(slot, index, entry); + let entry_info = + Self::build_replica_entry_info(slot, index, entry, starting_transaction_index); for plugin in plugin_manager.plugins.iter() { if !plugin.entry_notifications_enabled() { continue; } - match plugin.notify_entry(ReplicaEntryInfoVersions::V0_0_1(&entry_info)) { + match plugin.notify_entry(ReplicaEntryInfoVersions::V0_0_2(&entry_info)) { Err(err) => { error!( "Failed to notify entry, error: ({}) to plugin {}", @@ -64,13 +71,15 @@ impl EntryNotifierImpl { slot: Slot, index: usize, entry: &'_ EntrySummary, - ) -> ReplicaEntryInfo<'_> { - ReplicaEntryInfo { + starting_transaction_index: usize, + ) -> ReplicaEntryInfoV2<'_> { + ReplicaEntryInfoV2 { slot, index, num_hashes: entry.num_hashes, hash: entry.hash.as_ref(), executed_transaction_count: entry.num_transactions, + starting_transaction_index, } } } diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 5218b55c4..77155adb0 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -1221,6 +1221,7 @@ fn confirm_slot_entries( slot, index: entry_index, entry: entry.into(), + starting_transaction_index: entry_tx_starting_index, }) { warn!( "Slot {}, entry {} entry_notification_sender send failed: {:?}", diff --git a/ledger/src/entry_notifier_interface.rs b/ledger/src/entry_notifier_interface.rs index 174be9e1b..dc605e0eb 100644 --- a/ledger/src/entry_notifier_interface.rs +++ b/ledger/src/entry_notifier_interface.rs @@ -1,7 +1,13 @@ use {solana_entry::entry::EntrySummary, solana_sdk::clock::Slot, std::sync::Arc}; pub trait EntryNotifier { - fn notify_entry(&self, slot: Slot, index: usize, entry: &EntrySummary); + fn notify_entry( + &self, + slot: Slot, + index: usize, + entry: &EntrySummary, + starting_transaction_index: usize, + ); } pub type EntryNotifierArc = Arc; diff --git a/ledger/src/entry_notifier_service.rs b/ledger/src/entry_notifier_service.rs index ec7eae0bc..6a249abf2 100644 --- a/ledger/src/entry_notifier_service.rs +++ b/ledger/src/entry_notifier_service.rs @@ -17,6 +17,7 @@ pub struct EntryNotification { pub slot: Slot, pub index: usize, pub entry: EntrySummary, + pub starting_transaction_index: usize, } pub type EntryNotifierSender = Sender; @@ -54,9 +55,13 @@ impl EntryNotifierService { entry_notification_receiver: &EntryNotifierReceiver, entry_notifier: EntryNotifierArc, ) -> Result<(), RecvTimeoutError> { - let EntryNotification { slot, index, entry } = - entry_notification_receiver.recv_timeout(Duration::from_secs(1))?; - entry_notifier.notify_entry(slot, index, &entry); + let EntryNotification { + slot, + index, + entry, + starting_transaction_index, + } = entry_notification_receiver.recv_timeout(Duration::from_secs(1))?; + entry_notifier.notify_entry(slot, index, &entry, starting_transaction_index); Ok(()) }