diff --git a/Cargo.lock b/Cargo.lock index edc9ef28b..5085b17bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5638,6 +5638,7 @@ dependencies = [ "libloading", "log", "serde_json", + "solana-entry", "solana-geyser-plugin-interface", "solana-measure", "solana-metrics", diff --git a/geyser-plugin-manager/Cargo.toml b/geyser-plugin-manager/Cargo.toml index 0d7f778a4..34dbe998d 100644 --- a/geyser-plugin-manager/Cargo.toml +++ b/geyser-plugin-manager/Cargo.toml @@ -19,6 +19,7 @@ jsonrpc-server-utils = { workspace = true } libloading = { workspace = true } log = { workspace = true } serde_json = { workspace = true } +solana-entry = { workspace = true } solana-geyser-plugin-interface = { workspace = true } solana-measure = { workspace = true } solana-metrics = { workspace = true } diff --git a/geyser-plugin-manager/src/entry_notifier.rs b/geyser-plugin-manager/src/entry_notifier.rs new file mode 100644 index 000000000..13598b55a --- /dev/null +++ b/geyser-plugin-manager/src/entry_notifier.rs @@ -0,0 +1,76 @@ +/// Module responsible for notifying plugins about entries +use { + crate::geyser_plugin_manager::GeyserPluginManager, + log::*, + solana_entry::entry::Entry, + solana_geyser_plugin_interface::geyser_plugin_interface::{ + ReplicaEntryInfo, ReplicaEntryInfoVersions, + }, + solana_measure::measure::Measure, + solana_metrics::*, + solana_rpc::entry_notifier_interface::EntryNotifier, + solana_sdk::clock::Slot, + std::sync::{Arc, RwLock}, +}; + +pub(crate) struct EntryNotifierImpl { + plugin_manager: Arc>, +} + +impl EntryNotifier for EntryNotifierImpl { + fn notify_entry<'a>(&'a self, slot: Slot, index: usize, entry: &'a Entry) { + let mut measure = Measure::start("geyser-plugin-notify_plugins_of_entry_info"); + + let plugin_manager = self.plugin_manager.read().unwrap(); + if plugin_manager.plugins.is_empty() { + return; + } + + let entry_info = Self::build_replica_entry_info(slot, index, entry); + + for plugin in plugin_manager.plugins.iter() { + if !plugin.entry_notifications_enabled() { + continue; + } + match plugin.notify_entry(ReplicaEntryInfoVersions::V0_0_1(&entry_info)) { + Err(err) => { + error!( + "Failed to notify entry, error: ({}) to plugin {}", + err, + plugin.name() + ) + } + Ok(_) => { + trace!("Successfully notified entry to plugin {}", plugin.name()); + } + } + } + measure.stop(); + inc_new_counter_debug!( + "geyser-plugin-notify_plugins_of_entry_info-us", + measure.as_us() as usize, + 10000, + 10000 + ); + } +} + +impl EntryNotifierImpl { + pub fn new(plugin_manager: Arc>) -> Self { + Self { plugin_manager } + } + + fn build_replica_entry_info( + slot: Slot, + index: usize, + entry: &'_ Entry, + ) -> ReplicaEntryInfo<'_> { + ReplicaEntryInfo { + slot, + index, + num_hashes: entry.num_hashes, + hash: entry.hash.as_ref(), + executed_transaction_count: entry.transactions.len() as u64, + } + } +} diff --git a/geyser-plugin-manager/src/geyser_plugin_manager.rs b/geyser-plugin-manager/src/geyser_plugin_manager.rs index 3eb14f823..1e39d3df7 100644 --- a/geyser-plugin-manager/src/geyser_plugin_manager.rs +++ b/geyser-plugin-manager/src/geyser_plugin_manager.rs @@ -54,6 +54,16 @@ impl GeyserPluginManager { false } + /// Check if there is any plugin interested in entry data + pub fn entry_notifications_enabled(&self) -> bool { + for plugin in &self.plugins { + if plugin.entry_notifications_enabled() { + return true; + } + } + false + } + /// Admin RPC request handler pub(crate) fn list_plugins(&self) -> JsonRpcResult> { Ok(self.plugins.iter().map(|p| p.name().to_owned()).collect()) diff --git a/geyser-plugin-manager/src/geyser_plugin_service.rs b/geyser-plugin-manager/src/geyser_plugin_service.rs index 6454d71a9..808f52c07 100644 --- a/geyser-plugin-manager/src/geyser_plugin_service.rs +++ b/geyser-plugin-manager/src/geyser_plugin_service.rs @@ -3,6 +3,7 @@ use { accounts_update_notifier::AccountsUpdateNotifierImpl, block_metadata_notifier::BlockMetadataNotifierImpl, block_metadata_notifier_interface::BlockMetadataNotifierLock, + entry_notifier::EntryNotifierImpl, geyser_plugin_manager::{GeyserPluginManager, GeyserPluginManagerRequest}, slot_status_notifier::SlotStatusNotifierImpl, slot_status_observer::SlotStatusObserver, @@ -11,6 +12,7 @@ use { crossbeam_channel::Receiver, log::*, solana_rpc::{ + entry_notifier_interface::EntryNotifierLock, optimistically_confirmed_bank_tracker::BankNotification, transaction_notifier_interface::TransactionNotifierLock, }, @@ -30,6 +32,7 @@ pub struct GeyserPluginService { plugin_manager: Arc>, accounts_update_notifier: Option, transaction_notifier: Option, + entry_notifier: Option, block_metadata_notifier: Option, } @@ -74,6 +77,7 @@ impl GeyserPluginService { let account_data_notifications_enabled = plugin_manager.account_data_notifications_enabled(); let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled(); + let entry_notifications_enabled = plugin_manager.entry_notifications_enabled(); let plugin_manager = Arc::new(RwLock::new(plugin_manager)); let accounts_update_notifier: Option = @@ -93,10 +97,20 @@ impl GeyserPluginService { None }; + let entry_notifier: Option = if entry_notifications_enabled { + let entry_notifier = EntryNotifierImpl::new(plugin_manager.clone()); + Some(Arc::new(RwLock::new(entry_notifier))) + } else { + None + }; + let (slot_status_observer, block_metadata_notifier): ( Option, Option, - ) = if account_data_notifications_enabled || transaction_notifications_enabled { + ) = if account_data_notifications_enabled + || transaction_notifications_enabled + || entry_notifications_enabled + { let slot_status_notifier = SlotStatusNotifierImpl::new(plugin_manager.clone()); let slot_status_notifier = Arc::new(RwLock::new(slot_status_notifier)); ( @@ -124,6 +138,7 @@ impl GeyserPluginService { plugin_manager, accounts_update_notifier, transaction_notifier, + entry_notifier, block_metadata_notifier, }) } @@ -146,6 +161,10 @@ impl GeyserPluginService { self.transaction_notifier.clone() } + pub fn get_entry_notifier(&self) -> Option { + self.entry_notifier.clone() + } + pub fn get_block_metadata_notifier(&self) -> Option { self.block_metadata_notifier.clone() } diff --git a/geyser-plugin-manager/src/lib.rs b/geyser-plugin-manager/src/lib.rs index 391e0b486..2dd1a8f5d 100644 --- a/geyser-plugin-manager/src/lib.rs +++ b/geyser-plugin-manager/src/lib.rs @@ -1,6 +1,7 @@ pub mod accounts_update_notifier; pub mod block_metadata_notifier; pub mod block_metadata_notifier_interface; +pub mod entry_notifier; pub mod geyser_plugin_manager; pub mod geyser_plugin_service; pub mod slot_status_notifier; diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 246bffede..a5457d5c9 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -4838,6 +4838,7 @@ dependencies = [ "libloading", "log", "serde_json", + "solana-entry", "solana-geyser-plugin-interface", "solana-measure", "solana-metrics", diff --git a/rpc/src/entry_notifier_interface.rs b/rpc/src/entry_notifier_interface.rs new file mode 100644 index 000000000..2b6ba7235 --- /dev/null +++ b/rpc/src/entry_notifier_interface.rs @@ -0,0 +1,11 @@ +use { + solana_entry::entry::Entry, + solana_sdk::clock::Slot, + std::sync::{Arc, RwLock}, +}; + +pub trait EntryNotifier { + fn notify_entry(&self, slot: Slot, index: usize, entry: &Entry); +} + +pub type EntryNotifierLock = Arc>; diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index d021155e8..769397658 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -1,5 +1,6 @@ #![allow(clippy::integer_arithmetic)] mod cluster_tpu_info; +pub mod entry_notifier_interface; pub mod max_slots; pub mod optimistically_confirmed_bank_tracker; pub mod parsed_token_accounts;