Plumb Entry notification into plugin manager (#30910)
Plumb entry notification into plugin manager
This commit is contained in:
parent
1f420f2b03
commit
932301583c
|
@ -5638,6 +5638,7 @@ dependencies = [
|
||||||
"libloading",
|
"libloading",
|
||||||
"log",
|
"log",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
"solana-entry",
|
||||||
"solana-geyser-plugin-interface",
|
"solana-geyser-plugin-interface",
|
||||||
"solana-measure",
|
"solana-measure",
|
||||||
"solana-metrics",
|
"solana-metrics",
|
||||||
|
|
|
@ -19,6 +19,7 @@ jsonrpc-server-utils = { workspace = true }
|
||||||
libloading = { workspace = true }
|
libloading = { workspace = true }
|
||||||
log = { workspace = true }
|
log = { workspace = true }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
|
solana-entry = { workspace = true }
|
||||||
solana-geyser-plugin-interface = { workspace = true }
|
solana-geyser-plugin-interface = { workspace = true }
|
||||||
solana-measure = { workspace = true }
|
solana-measure = { workspace = true }
|
||||||
solana-metrics = { workspace = true }
|
solana-metrics = { workspace = true }
|
||||||
|
|
|
@ -0,0 +1,76 @@
|
||||||
|
/// Module responsible for notifying plugins about entries
|
||||||
|
use {
|
||||||
|
crate::geyser_plugin_manager::GeyserPluginManager,
|
||||||
|
log::*,
|
||||||
|
solana_entry::entry::Entry,
|
||||||
|
solana_geyser_plugin_interface::geyser_plugin_interface::{
|
||||||
|
ReplicaEntryInfo, ReplicaEntryInfoVersions,
|
||||||
|
},
|
||||||
|
solana_measure::measure::Measure,
|
||||||
|
solana_metrics::*,
|
||||||
|
solana_rpc::entry_notifier_interface::EntryNotifier,
|
||||||
|
solana_sdk::clock::Slot,
|
||||||
|
std::sync::{Arc, RwLock},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub(crate) struct EntryNotifierImpl {
|
||||||
|
plugin_manager: Arc<RwLock<GeyserPluginManager>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EntryNotifier for EntryNotifierImpl {
|
||||||
|
fn notify_entry<'a>(&'a self, slot: Slot, index: usize, entry: &'a Entry) {
|
||||||
|
let mut measure = Measure::start("geyser-plugin-notify_plugins_of_entry_info");
|
||||||
|
|
||||||
|
let plugin_manager = self.plugin_manager.read().unwrap();
|
||||||
|
if plugin_manager.plugins.is_empty() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let entry_info = Self::build_replica_entry_info(slot, index, entry);
|
||||||
|
|
||||||
|
for plugin in plugin_manager.plugins.iter() {
|
||||||
|
if !plugin.entry_notifications_enabled() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
match plugin.notify_entry(ReplicaEntryInfoVersions::V0_0_1(&entry_info)) {
|
||||||
|
Err(err) => {
|
||||||
|
error!(
|
||||||
|
"Failed to notify entry, error: ({}) to plugin {}",
|
||||||
|
err,
|
||||||
|
plugin.name()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
Ok(_) => {
|
||||||
|
trace!("Successfully notified entry to plugin {}", plugin.name());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
measure.stop();
|
||||||
|
inc_new_counter_debug!(
|
||||||
|
"geyser-plugin-notify_plugins_of_entry_info-us",
|
||||||
|
measure.as_us() as usize,
|
||||||
|
10000,
|
||||||
|
10000
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EntryNotifierImpl {
|
||||||
|
pub fn new(plugin_manager: Arc<RwLock<GeyserPluginManager>>) -> Self {
|
||||||
|
Self { plugin_manager }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn build_replica_entry_info(
|
||||||
|
slot: Slot,
|
||||||
|
index: usize,
|
||||||
|
entry: &'_ Entry,
|
||||||
|
) -> ReplicaEntryInfo<'_> {
|
||||||
|
ReplicaEntryInfo {
|
||||||
|
slot,
|
||||||
|
index,
|
||||||
|
num_hashes: entry.num_hashes,
|
||||||
|
hash: entry.hash.as_ref(),
|
||||||
|
executed_transaction_count: entry.transactions.len() as u64,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -54,6 +54,16 @@ impl GeyserPluginManager {
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check if there is any plugin interested in entry data
|
||||||
|
pub fn entry_notifications_enabled(&self) -> bool {
|
||||||
|
for plugin in &self.plugins {
|
||||||
|
if plugin.entry_notifications_enabled() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
/// Admin RPC request handler
|
/// Admin RPC request handler
|
||||||
pub(crate) fn list_plugins(&self) -> JsonRpcResult<Vec<String>> {
|
pub(crate) fn list_plugins(&self) -> JsonRpcResult<Vec<String>> {
|
||||||
Ok(self.plugins.iter().map(|p| p.name().to_owned()).collect())
|
Ok(self.plugins.iter().map(|p| p.name().to_owned()).collect())
|
||||||
|
|
|
@ -3,6 +3,7 @@ use {
|
||||||
accounts_update_notifier::AccountsUpdateNotifierImpl,
|
accounts_update_notifier::AccountsUpdateNotifierImpl,
|
||||||
block_metadata_notifier::BlockMetadataNotifierImpl,
|
block_metadata_notifier::BlockMetadataNotifierImpl,
|
||||||
block_metadata_notifier_interface::BlockMetadataNotifierLock,
|
block_metadata_notifier_interface::BlockMetadataNotifierLock,
|
||||||
|
entry_notifier::EntryNotifierImpl,
|
||||||
geyser_plugin_manager::{GeyserPluginManager, GeyserPluginManagerRequest},
|
geyser_plugin_manager::{GeyserPluginManager, GeyserPluginManagerRequest},
|
||||||
slot_status_notifier::SlotStatusNotifierImpl,
|
slot_status_notifier::SlotStatusNotifierImpl,
|
||||||
slot_status_observer::SlotStatusObserver,
|
slot_status_observer::SlotStatusObserver,
|
||||||
|
@ -11,6 +12,7 @@ use {
|
||||||
crossbeam_channel::Receiver,
|
crossbeam_channel::Receiver,
|
||||||
log::*,
|
log::*,
|
||||||
solana_rpc::{
|
solana_rpc::{
|
||||||
|
entry_notifier_interface::EntryNotifierLock,
|
||||||
optimistically_confirmed_bank_tracker::BankNotification,
|
optimistically_confirmed_bank_tracker::BankNotification,
|
||||||
transaction_notifier_interface::TransactionNotifierLock,
|
transaction_notifier_interface::TransactionNotifierLock,
|
||||||
},
|
},
|
||||||
|
@ -30,6 +32,7 @@ pub struct GeyserPluginService {
|
||||||
plugin_manager: Arc<RwLock<GeyserPluginManager>>,
|
plugin_manager: Arc<RwLock<GeyserPluginManager>>,
|
||||||
accounts_update_notifier: Option<AccountsUpdateNotifier>,
|
accounts_update_notifier: Option<AccountsUpdateNotifier>,
|
||||||
transaction_notifier: Option<TransactionNotifierLock>,
|
transaction_notifier: Option<TransactionNotifierLock>,
|
||||||
|
entry_notifier: Option<EntryNotifierLock>,
|
||||||
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
|
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,6 +77,7 @@ impl GeyserPluginService {
|
||||||
let account_data_notifications_enabled =
|
let account_data_notifications_enabled =
|
||||||
plugin_manager.account_data_notifications_enabled();
|
plugin_manager.account_data_notifications_enabled();
|
||||||
let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled();
|
let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled();
|
||||||
|
let entry_notifications_enabled = plugin_manager.entry_notifications_enabled();
|
||||||
let plugin_manager = Arc::new(RwLock::new(plugin_manager));
|
let plugin_manager = Arc::new(RwLock::new(plugin_manager));
|
||||||
|
|
||||||
let accounts_update_notifier: Option<AccountsUpdateNotifier> =
|
let accounts_update_notifier: Option<AccountsUpdateNotifier> =
|
||||||
|
@ -93,10 +97,20 @@ impl GeyserPluginService {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let entry_notifier: Option<EntryNotifierLock> = if entry_notifications_enabled {
|
||||||
|
let entry_notifier = EntryNotifierImpl::new(plugin_manager.clone());
|
||||||
|
Some(Arc::new(RwLock::new(entry_notifier)))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
let (slot_status_observer, block_metadata_notifier): (
|
let (slot_status_observer, block_metadata_notifier): (
|
||||||
Option<SlotStatusObserver>,
|
Option<SlotStatusObserver>,
|
||||||
Option<BlockMetadataNotifierLock>,
|
Option<BlockMetadataNotifierLock>,
|
||||||
) = if account_data_notifications_enabled || transaction_notifications_enabled {
|
) = if account_data_notifications_enabled
|
||||||
|
|| transaction_notifications_enabled
|
||||||
|
|| entry_notifications_enabled
|
||||||
|
{
|
||||||
let slot_status_notifier = SlotStatusNotifierImpl::new(plugin_manager.clone());
|
let slot_status_notifier = SlotStatusNotifierImpl::new(plugin_manager.clone());
|
||||||
let slot_status_notifier = Arc::new(RwLock::new(slot_status_notifier));
|
let slot_status_notifier = Arc::new(RwLock::new(slot_status_notifier));
|
||||||
(
|
(
|
||||||
|
@ -124,6 +138,7 @@ impl GeyserPluginService {
|
||||||
plugin_manager,
|
plugin_manager,
|
||||||
accounts_update_notifier,
|
accounts_update_notifier,
|
||||||
transaction_notifier,
|
transaction_notifier,
|
||||||
|
entry_notifier,
|
||||||
block_metadata_notifier,
|
block_metadata_notifier,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -146,6 +161,10 @@ impl GeyserPluginService {
|
||||||
self.transaction_notifier.clone()
|
self.transaction_notifier.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_entry_notifier(&self) -> Option<EntryNotifierLock> {
|
||||||
|
self.entry_notifier.clone()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn get_block_metadata_notifier(&self) -> Option<BlockMetadataNotifierLock> {
|
pub fn get_block_metadata_notifier(&self) -> Option<BlockMetadataNotifierLock> {
|
||||||
self.block_metadata_notifier.clone()
|
self.block_metadata_notifier.clone()
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
pub mod accounts_update_notifier;
|
pub mod accounts_update_notifier;
|
||||||
pub mod block_metadata_notifier;
|
pub mod block_metadata_notifier;
|
||||||
pub mod block_metadata_notifier_interface;
|
pub mod block_metadata_notifier_interface;
|
||||||
|
pub mod entry_notifier;
|
||||||
pub mod geyser_plugin_manager;
|
pub mod geyser_plugin_manager;
|
||||||
pub mod geyser_plugin_service;
|
pub mod geyser_plugin_service;
|
||||||
pub mod slot_status_notifier;
|
pub mod slot_status_notifier;
|
||||||
|
|
|
@ -4838,6 +4838,7 @@ dependencies = [
|
||||||
"libloading",
|
"libloading",
|
||||||
"log",
|
"log",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
"solana-entry",
|
||||||
"solana-geyser-plugin-interface",
|
"solana-geyser-plugin-interface",
|
||||||
"solana-measure",
|
"solana-measure",
|
||||||
"solana-metrics",
|
"solana-metrics",
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
use {
|
||||||
|
solana_entry::entry::Entry,
|
||||||
|
solana_sdk::clock::Slot,
|
||||||
|
std::sync::{Arc, RwLock},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub trait EntryNotifier {
|
||||||
|
fn notify_entry(&self, slot: Slot, index: usize, entry: &Entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type EntryNotifierLock = Arc<RwLock<dyn EntryNotifier + Sync + Send>>;
|
|
@ -1,5 +1,6 @@
|
||||||
#![allow(clippy::integer_arithmetic)]
|
#![allow(clippy::integer_arithmetic)]
|
||||||
mod cluster_tpu_info;
|
mod cluster_tpu_info;
|
||||||
|
pub mod entry_notifier_interface;
|
||||||
pub mod max_slots;
|
pub mod max_slots;
|
||||||
pub mod optimistically_confirmed_bank_tracker;
|
pub mod optimistically_confirmed_bank_tracker;
|
||||||
pub mod parsed_token_accounts;
|
pub mod parsed_token_accounts;
|
||||||
|
|
Loading…
Reference in New Issue