Geyser: add starting entry to ReplicaEntryInfo(V2) (#33963)

* Add ReplicaEntryInfoV2

* Add starting_transaction_index field to EntryNotification

* Populate starting_transaction_index in replay stage

* Cache and populate starting_transaction_index in banking stage

* Build ReplicaEntryInfoV2
This commit is contained in:
Tyera 2023-11-14 09:49:26 -07:00 committed by GitHub
parent ad65c82d6d
commit 0e91e96967
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 56 additions and 10 deletions

View File

@ -29,6 +29,7 @@ impl TpuEntryNotifier {
.spawn(move || {
let mut current_slot = 0;
let mut current_index = 0;
let mut current_transaction_index = 0;
loop {
if exit.load(Ordering::Relaxed) {
break;
@ -41,6 +42,7 @@ impl TpuEntryNotifier {
&broadcast_entry_sender,
&mut current_slot,
&mut current_index,
&mut current_transaction_index,
) {
break;
}
@ -57,11 +59,13 @@ impl TpuEntryNotifier {
broadcast_entry_sender: &Sender<WorkingBankEntry>,
current_slot: &mut u64,
current_index: &mut usize,
current_transaction_index: &mut usize,
) -> Result<(), RecvTimeoutError> {
let (bank, (entry, tick_height)) = entry_receiver.recv_timeout(Duration::from_secs(1))?;
let slot = bank.slot();
let index = if slot != *current_slot {
*current_index = 0;
*current_transaction_index = 0;
*current_slot = slot;
0
} else {
@ -78,11 +82,13 @@ impl TpuEntryNotifier {
slot,
index,
entry: entry_summary,
starting_transaction_index: *current_transaction_index,
}) {
warn!(
"Failed to send slot {slot:?} entry {index:?} from Tpu to EntryNotifierService, error {err:?}",
);
}
*current_transaction_index += entry.transactions.len();
if let Err(err) = broadcast_entry_sender.send((bank, (entry, tick_height))) {
warn!(

View File

@ -185,12 +185,31 @@ pub struct ReplicaEntryInfo<'a> {
pub executed_transaction_count: u64,
}
#[derive(Clone, Debug)]
#[repr(C)]
pub struct ReplicaEntryInfoV2<'a> {
/// The slot number of the block containing this Entry
pub slot: Slot,
/// The Entry's index in the block
pub index: usize,
/// The number of hashes since the previous Entry
pub num_hashes: u64,
/// The Entry's SHA-256 hash, generated from the previous Entry's hash with
/// `solana_entry::entry::next_hash()`
pub hash: &'a [u8],
/// The number of executed transactions in the Entry
pub executed_transaction_count: u64,
/// The index-in-block of the first executed transaction in this Entry
pub starting_transaction_index: usize,
}
/// A wrapper to future-proof ReplicaEntryInfo handling. To make a change to the structure of
/// ReplicaEntryInfo, add an new enum variant wrapping a newer version, which will force plugin
/// implementations to handle the change.
#[repr(u32)]
pub enum ReplicaEntryInfoVersions<'a> {
V0_0_1(&'a ReplicaEntryInfo<'a>),
V0_0_2(&'a ReplicaEntryInfoV2<'a>),
}
#[derive(Clone, Debug)]

View File

@ -4,7 +4,7 @@ use {
log::*,
solana_entry::entry::EntrySummary,
solana_geyser_plugin_interface::geyser_plugin_interface::{
ReplicaEntryInfo, ReplicaEntryInfoVersions,
ReplicaEntryInfoV2, ReplicaEntryInfoVersions,
},
solana_ledger::entry_notifier_interface::EntryNotifier,
solana_measure::measure::Measure,
@ -18,7 +18,13 @@ pub(crate) struct EntryNotifierImpl {
}
impl EntryNotifier for EntryNotifierImpl {
fn notify_entry<'a>(&'a self, slot: Slot, index: usize, entry: &'a EntrySummary) {
fn notify_entry<'a>(
&'a self,
slot: Slot,
index: usize,
entry: &'a EntrySummary,
starting_transaction_index: usize,
) {
let mut measure = Measure::start("geyser-plugin-notify_plugins_of_entry_info");
let plugin_manager = self.plugin_manager.read().unwrap();
@ -26,13 +32,14 @@ impl EntryNotifier for EntryNotifierImpl {
return;
}
let entry_info = Self::build_replica_entry_info(slot, index, entry);
let entry_info =
Self::build_replica_entry_info(slot, index, entry, starting_transaction_index);
for plugin in plugin_manager.plugins.iter() {
if !plugin.entry_notifications_enabled() {
continue;
}
match plugin.notify_entry(ReplicaEntryInfoVersions::V0_0_1(&entry_info)) {
match plugin.notify_entry(ReplicaEntryInfoVersions::V0_0_2(&entry_info)) {
Err(err) => {
error!(
"Failed to notify entry, error: ({}) to plugin {}",
@ -64,13 +71,15 @@ impl EntryNotifierImpl {
slot: Slot,
index: usize,
entry: &'_ EntrySummary,
) -> ReplicaEntryInfo<'_> {
ReplicaEntryInfo {
starting_transaction_index: usize,
) -> ReplicaEntryInfoV2<'_> {
ReplicaEntryInfoV2 {
slot,
index,
num_hashes: entry.num_hashes,
hash: entry.hash.as_ref(),
executed_transaction_count: entry.num_transactions,
starting_transaction_index,
}
}
}

View File

@ -1221,6 +1221,7 @@ fn confirm_slot_entries(
slot,
index: entry_index,
entry: entry.into(),
starting_transaction_index: entry_tx_starting_index,
}) {
warn!(
"Slot {}, entry {} entry_notification_sender send failed: {:?}",

View File

@ -1,7 +1,13 @@
use {solana_entry::entry::EntrySummary, solana_sdk::clock::Slot, std::sync::Arc};
pub trait EntryNotifier {
fn notify_entry(&self, slot: Slot, index: usize, entry: &EntrySummary);
fn notify_entry(
&self,
slot: Slot,
index: usize,
entry: &EntrySummary,
starting_transaction_index: usize,
);
}
pub type EntryNotifierArc = Arc<dyn EntryNotifier + Sync + Send>;

View File

@ -17,6 +17,7 @@ pub struct EntryNotification {
pub slot: Slot,
pub index: usize,
pub entry: EntrySummary,
pub starting_transaction_index: usize,
}
pub type EntryNotifierSender = Sender<EntryNotification>;
@ -54,9 +55,13 @@ impl EntryNotifierService {
entry_notification_receiver: &EntryNotifierReceiver,
entry_notifier: EntryNotifierArc,
) -> Result<(), RecvTimeoutError> {
let EntryNotification { slot, index, entry } =
entry_notification_receiver.recv_timeout(Duration::from_secs(1))?;
entry_notifier.notify_entry(slot, index, &entry);
let EntryNotification {
slot,
index,
entry,
starting_transaction_index,
} = entry_notification_receiver.recv_timeout(Duration::from_secs(1))?;
entry_notifier.notify_entry(slot, index, &entry, starting_transaction_index);
Ok(())
}