Cluster info geyser notification first impl

add cluster notifier registration and test

Add cluster node remove geyser notification

rebase on master and correct PR comments

fix rebase, fmt, clippy
This commit is contained in:
musitdev 2023-12-11 13:53:19 +01:00 committed by godmodegalactus
parent 3863bb1bdf
commit 897e0bc14d
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
15 changed files with 1088 additions and 817 deletions

1575
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -659,13 +659,19 @@ impl Validator {
.as_ref()
.and_then(|geyser_plugin_service| geyser_plugin_service.get_block_metadata_notifier());
let cluster_info_notifier = geyser_plugin_service
.as_ref()
.and_then(|geyser_plugin_service| geyser_plugin_service.get_cluster_info_notifier());
info!(
"Geyser plugin: accounts_update_notifier: {}, \
transaction_notifier: {}, \
entry_notifier: {}",
entry_notifier: {} \
cluster_info_notifier: {}",
accounts_update_notifier.is_some(),
transaction_notifier.is_some(),
entry_notifier.is_some()
entry_notifier.is_some(),
cluster_info_notifier.is_some(),
);
let system_monitor_service = Some(SystemMonitorService::new(
@ -744,6 +750,9 @@ impl Validator {
identity_keypair.clone(),
socket_addr_space,
);
// Register the Geyser cluster info notifier.
cluster_info.set_clusterinfo_notifier(cluster_info_notifier);
cluster_info.set_contact_debug_interval(config.contact_debug_interval);
cluster_info.set_entrypoints(cluster_entrypoints);
cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);

View File

@ -11,6 +11,7 @@ edition = { workspace = true }
[dependencies]
log = { workspace = true }
solana-gossip = { workspace = true }
solana-sdk = { workspace = true }
solana-transaction-status = { workspace = true }
thiserror = { workspace = true }

View File

@ -2,9 +2,11 @@
/// the GeyserPlugin trait to work with the runtime.
/// In addition, the dynamic library must export a "C" function _create_plugin which
/// creates the implementation of the plugin.
use std::net::SocketAddr;
use {
solana_sdk::{
clock::{Slot, UnixTimestamp},
pubkey::Pubkey,
signature::Signature,
transaction::SanitizedTransaction,
},
@ -13,6 +15,37 @@ use {
thiserror::Error,
};
#[derive(Debug, Clone, PartialEq, Eq)]
#[repr(C)]
/// Information about a node in the cluster, as passed to plugins by
/// `GeyserPlugin::update_cluster_info`.
///
/// Every address is optional.
/// NOTE(review): `None` presumably means the address could not be read from the
/// node's gossip contact info — confirm against the code that builds this struct.
pub struct ReplicaClusterInfoNode {
/// public key identifying the node
pub id: Pubkey,
/// gossip address
pub gossip: Option<SocketAddr>,
/// address to connect to for replication
pub tvu: Option<SocketAddr>,
/// TVU over QUIC protocol.
pub tvu_quic: Option<SocketAddr>,
/// repair service over QUIC protocol.
pub serve_repair_quic: Option<SocketAddr>,
/// transactions address
pub tpu: Option<SocketAddr>,
/// address to forward unprocessed transactions to
pub tpu_forwards: Option<SocketAddr>,
/// address to which to send bank state requests
/// NOTE(review): the field name suggests vote transactions rather than bank state
/// requests — confirm the intended wording.
pub tpu_vote: Option<SocketAddr>,
/// address to which to send JSON-RPC requests
pub rpc: Option<SocketAddr>,
/// websocket for JSON-RPC push notifications
pub rpc_pubsub: Option<SocketAddr>,
/// address to send repair requests to
pub serve_repair: Option<SocketAddr>,
/// latest wallclock picked
/// NOTE(review): unit is not visible here (presumably milliseconds) — confirm.
pub wallclock: u64,
/// node shred version
pub shred_version: u16,
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[repr(C)]
/// Information about an account being updated
@ -380,6 +413,18 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug {
Ok(())
}
/// Called when a cluster node's information is updated on the gossip network.
///
/// `cluster_info` describes the node the update is about. The default
/// implementation ignores it and returns `Ok(())`; a plugin overrides this
/// method to receive the updates.
// The default body never reads its argument, hence the lint allowance.
#[allow(unused_variables)]
fn update_cluster_info(&self, cluster_info: &ReplicaClusterInfoNode) -> Result<()> {
Ok(())
}
/// Called when a cluster node's information is removed from the gossip network.
///
/// `pubkey` identifies the node that was removed. The default implementation
/// ignores it and returns `Ok(())`; a plugin overrides this method to receive
/// the removals.
// The default body never reads its argument, hence the lint allowance.
#[allow(unused_variables)]
fn notify_clusterinfo_remove(&self, pubkey: &Pubkey) -> Result<()> {
Ok(())
}
/// Called when all accounts are notified of during startup.
fn notify_end_of_startup(&self) -> Result<()> {
Ok(())
@ -438,4 +483,11 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug {
fn entry_notifications_enabled(&self) -> bool {
false
}
/// Check if the plugin is interested in cluster info data
/// (`update_cluster_info` and `notify_clusterinfo_remove`).
/// Default is false -- if the plugin is interested in
/// cluster info data, return true.
fn clusterinfo_notifications_enabled(&self) -> bool {
false
}
}

View File

@ -19,7 +19,9 @@ libloading = { workspace = true }
log = { workspace = true }
serde_json = { workspace = true }
solana-accounts-db = { workspace = true }
solana-client = { workspace = true }
solana-entry = { workspace = true }
solana-gossip = { workspace = true }
solana-ledger = { workspace = true }
solana-measure = { workspace = true }
solana-metrics = { workspace = true }

View File

@ -0,0 +1,152 @@
// Module responsible for notifying plugins of cluster info (gossip contact info) updates and removals
use {
crate::geyser_plugin_manager::GeyserPluginManager,
agave_geyser_plugin_interface::geyser_plugin_interface::ReplicaClusterInfoNode,
log::*,
solana_client::connection_cache::Protocol,
solana_gossip::{
cluster_info_notifier_interface::ClusterInfoNotifierInterface,
legacy_contact_info::LegacyContactInfo,
},
solana_measure::measure::Measure,
solana_metrics::*,
solana_sdk::pubkey::Pubkey,
std::sync::{Arc, RwLock},
};
// ClusterInfoNotifierImpl is the Geyser implementation of ClusterInfoNotifierInterface.
// It receives notify_clusterinfo_update / notify_clusterinfo_remove calls and in turn
// invokes update_cluster_info / notify_clusterinfo_remove on the plugins managed by
// the GeyserPluginManager.
#[derive(Debug)]
pub(crate) struct ClusterInfoNotifierImpl {
// Shared plugin registry; read-locked while the plugins are being notified.
plugin_manager: Arc<RwLock<GeyserPluginManager>>,
}
impl ClusterInfoNotifierImpl {
    /// Builds a notifier that forwards to the plugins held by `plugin_manager`.
    pub fn new(plugin_manager: Arc<RwLock<GeyserPluginManager>>) -> Self {
        Self { plugin_manager }
    }

    /// Converts a gossip `LegacyContactInfo` into the plugin-facing
    /// `ReplicaClusterInfoNode`.
    ///
    /// Each address getter returns a `Result`; a failed lookup becomes `None`
    /// in the corresponding field.
    fn clusterinfo_from_legacy_contact_info(
        legacy_info: &LegacyContactInfo,
    ) -> ReplicaClusterInfoNode {
        let id = *legacy_info.pubkey();
        // gossip address
        let gossip = legacy_info.gossip().ok();
        // replication address, over UDP and over QUIC
        let tvu = legacy_info.tvu(Protocol::UDP).ok();
        let tvu_quic = legacy_info.tvu(Protocol::QUIC).ok();
        // repair service over QUIC
        let serve_repair_quic = legacy_info.serve_repair(Protocol::QUIC).ok();
        // transaction ingestion and forwarding addresses
        let tpu = legacy_info.tpu(Protocol::UDP).ok();
        let tpu_forwards = legacy_info.tpu_forwards(Protocol::UDP).ok();
        let tpu_vote = legacy_info.tpu_vote().ok();
        // JSON-RPC and its websocket companion
        let rpc = legacy_info.rpc().ok();
        let rpc_pubsub = legacy_info.rpc_pubsub().ok();
        // repair service over UDP
        let serve_repair = legacy_info.serve_repair(Protocol::UDP).ok();
        // plain values copied as-is
        let wallclock = legacy_info.wallclock();
        let shred_version = legacy_info.shred_version();

        ReplicaClusterInfoNode {
            id,
            gossip,
            tvu,
            tvu_quic,
            serve_repair_quic,
            tpu,
            tpu_forwards,
            tpu_vote,
            rpc,
            rpc_pubsub,
            serve_repair,
            wallclock,
            shred_version,
        }
    }
}
impl ClusterInfoNotifierInterface for ClusterInfoNotifierImpl {
    /// Forwards an updated node contact info to the plugins.
    ///
    /// `contact_info` is converted once into a `ReplicaClusterInfoNode` and handed to
    /// `update_cluster_info` of every plugin that enabled cluster info notifications.
    /// A plugin error is logged and does not prevent the remaining plugins from
    /// being notified.
    ///
    /// Panics if the plugin manager lock is poisoned.
    fn notify_clusterinfo_update(&self, contact_info: &LegacyContactInfo) {
        let cluster_info = Self::clusterinfo_from_legacy_contact_info(contact_info);
        let mut measure2 = Measure::start("geyser-plugin-notify_plugins_of_cluster_info_update");
        let plugin_manager = self.plugin_manager.read().unwrap();

        if plugin_manager.plugins.is_empty() {
            return;
        }

        for plugin in plugin_manager.plugins.iter() {
            // Cluster info notifications are opt-in: skip plugins that did not ask for them.
            if !plugin.clusterinfo_notifications_enabled() {
                continue;
            }
            let mut measure = Measure::start("geyser-plugin-update-cluster_info");
            match plugin.update_cluster_info(&cluster_info) {
                Err(err) => {
                    error!(
                        "Failed to update cluster_info {}, error: {} to plugin {}",
                        bs58::encode(cluster_info.id).into_string(),
                        err,
                        plugin.name()
                    )
                }
                Ok(_) => {
                    trace!(
                        "Successfully updated cluster_info {} to plugin {}",
                        bs58::encode(cluster_info.id).into_string(),
                        plugin.name()
                    );
                }
            }
            measure.stop();
            inc_new_counter_debug!(
                "geyser-plugin-update-cluster_info-us",
                measure.as_us() as usize,
                100000,
                100000
            );
        }
        measure2.stop();
        inc_new_counter_debug!(
            "geyser-plugin-notify_plugins_of_cluster_info_update-us",
            measure2.as_us() as usize,
            100000,
            100000
        );
    }

    /// Tells the plugins that the node identified by `pubkey` was removed.
    ///
    /// Calls `notify_clusterinfo_remove` on every plugin that enabled cluster info
    /// notifications. A plugin error is logged and does not prevent the remaining
    /// plugins from being notified.
    ///
    /// Panics if the plugin manager lock is poisoned.
    fn notify_clusterinfo_remove(&self, pubkey: &Pubkey) {
        // Label matches the "..._remove-us" counter reported at the end of this method.
        let mut measure2 = Measure::start("geyser-plugin-notify_plugins_of_cluster_info_remove");
        let plugin_manager = self.plugin_manager.read().unwrap();

        if plugin_manager.plugins.is_empty() {
            return;
        }

        for plugin in plugin_manager.plugins.iter() {
            // Cluster info notifications are opt-in: skip plugins that did not ask for them.
            if !plugin.clusterinfo_notifications_enabled() {
                continue;
            }
            let mut measure = Measure::start("geyser-plugin-remove-cluster_info");
            match plugin.notify_clusterinfo_remove(pubkey) {
                Err(err) => {
                    error!(
                        "Failed to remove cluster_info {}, error: {} to plugin {}",
                        bs58::encode(pubkey).into_string(),
                        err,
                        plugin.name()
                    )
                }
                Ok(_) => {
                    trace!(
                        "Successfully remove cluster_info {} to plugin {}",
                        bs58::encode(pubkey).into_string(),
                        plugin.name()
                    );
                }
            }
            measure.stop();
            inc_new_counter_debug!(
                "geyser-plugin-remove-cluster_info-us",
                measure.as_us() as usize,
                100000,
                100000
            );
        }
        measure2.stop();
        inc_new_counter_debug!(
            "geyser-plugin-notify_plugins_of_cluster_info_remove-us",
            measure2.as_us() as usize,
            100000,
            100000
        );
    }
}

View File

@ -100,6 +100,15 @@ impl GeyserPluginManager {
false
}
/// Check if any loaded plugin is interested in cluster info data.
///
/// Returns `true` as soon as one plugin reports
/// `clusterinfo_notifications_enabled()`, and `false` when none does
/// (including when no plugin is loaded).
pub fn clusterinfo_notifications_enabled(&self) -> bool {
    // Must query the cluster info flag; asking `entry_notifications_enabled()` here
    // would tie cluster info notifications to the unrelated entry notification setting.
    self.plugins
        .iter()
        .any(|plugin| plugin.clusterinfo_notifications_enabled())
}
/// Admin RPC request handler
pub(crate) fn list_plugins(&self) -> JsonRpcResult<Vec<String>> {
Ok(self.plugins.iter().map(|p| p.name().to_owned()).collect())

View File

@ -3,6 +3,7 @@ use {
accounts_update_notifier::AccountsUpdateNotifierImpl,
block_metadata_notifier::BlockMetadataNotifierImpl,
block_metadata_notifier_interface::BlockMetadataNotifierArc,
cluster_info_notifier::ClusterInfoNotifierImpl,
entry_notifier::EntryNotifierImpl,
geyser_plugin_manager::{GeyserPluginManager, GeyserPluginManagerRequest},
slot_status_notifier::SlotStatusNotifierImpl,
@ -12,6 +13,7 @@ use {
crossbeam_channel::Receiver,
log::*,
solana_accounts_db::accounts_update_notifier_interface::AccountsUpdateNotifier,
solana_gossip::cluster_info_notifier_interface::ClusterInfoUpdateNotifierLock,
solana_ledger::entry_notifier_interface::EntryNotifierArc,
solana_rpc::{
optimistically_confirmed_bank_tracker::SlotNotification,
@ -37,6 +39,7 @@ pub struct GeyserPluginService {
transaction_notifier: Option<TransactionNotifierArc>,
entry_notifier: Option<EntryNotifierArc>,
block_metadata_notifier: Option<BlockMetadataNotifierArc>,
cluster_info_notifier: Option<ClusterInfoUpdateNotifierLock>,
}
impl GeyserPluginService {
@ -81,8 +84,17 @@ impl GeyserPluginService {
plugin_manager.account_data_notifications_enabled();
let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled();
let entry_notifications_enabled = plugin_manager.entry_notifications_enabled();
let cluster_info_notifications_enabled = plugin_manager.clusterinfo_notifications_enabled();
let plugin_manager = Arc::new(RwLock::new(plugin_manager));
let cluster_info_notifier: Option<ClusterInfoUpdateNotifierLock> =
if cluster_info_notifications_enabled {
let cluster_info_notifier = ClusterInfoNotifierImpl::new(plugin_manager.clone());
Some(Arc::new(RwLock::new(cluster_info_notifier)))
} else {
None
};
let accounts_update_notifier: Option<AccountsUpdateNotifier> =
if account_data_notifications_enabled {
let accounts_update_notifier =
@ -143,6 +155,7 @@ impl GeyserPluginService {
transaction_notifier,
entry_notifier,
block_metadata_notifier,
cluster_info_notifier,
})
}
@ -172,6 +185,10 @@ impl GeyserPluginService {
self.block_metadata_notifier.clone()
}
pub fn get_cluster_info_notifier(&self) -> Option<ClusterInfoUpdateNotifierLock> {
self.cluster_info_notifier.clone()
}
pub fn join(self) -> thread::Result<()> {
if let Some(mut slot_status_observer) = self.slot_status_observer {
slot_status_observer.join()?;

View File

@ -1,6 +1,7 @@
pub mod accounts_update_notifier;
pub mod block_metadata_notifier;
pub mod block_metadata_notifier_interface;
pub mod cluster_info_notifier;
pub mod entry_notifier;
pub mod geyser_plugin_manager;
pub mod geyser_plugin_service;

View File

@ -18,6 +18,7 @@
note = "Please use `solana_net_utils::{MINIMUM_VALIDATOR_PORT_RANGE_WIDTH, VALIDATOR_PORT_RANGE}` instead"
)]
#[allow(deprecated)]
pub use solana_net_utils::{MINIMUM_VALIDATOR_PORT_RANGE_WIDTH, VALIDATOR_PORT_RANGE};
// Kept below the re-export so that the `#[deprecated]` / `#[allow(deprecated)]`
// attributes above stay attached to `pub use solana_net_utils::...` and not to
// this private import.
use crate::cluster_info_notifier_interface::ClusterInfoUpdateNotifierLock;
use {
crate::{
@ -430,6 +431,13 @@ impl ClusterInfo {
me
}
/// Hands the optional Geyser cluster info notifier over to `self.gossip`,
/// which is where it is registered.
pub fn set_clusterinfo_notifier(
&self,
cluster_info_notifier: Option<ClusterInfoUpdateNotifierLock>,
) {
self.gossip.set_clusterinfo_notifier(cluster_info_notifier);
}
/// Sets `contact_debug_interval` to `new`.
// NOTE(review): the unit of the interval is not visible here — confirm before documenting it.
pub fn set_contact_debug_interval(&mut self, new: u64) {
self.contact_debug_interval = new;
}

View File

@ -0,0 +1,16 @@
use {
crate::legacy_contact_info::LegacyContactInfo,
solana_sdk::pubkey::Pubkey,
std::sync::{Arc, RwLock},
};
/// Receiver of cluster node changes: node updates and node removals.
pub trait ClusterInfoNotifierInterface: std::fmt::Debug {
/// Notified when a cluster node is updated (added or changed).
/// `cluster_info` is the contact info of that node.
fn notify_clusterinfo_update(&self, cluster_info: &LegacyContactInfo);
/// Notified when a node is removed from the cluster.
/// `pubkey` identifies the removed node.
fn notify_clusterinfo_remove(&self, pubkey: &Pubkey);
}
/// Shared (`Arc`), lock-protected (`RwLock`) handle to a `Sync + Send`
/// implementation of `ClusterInfoNotifierInterface`.
pub type ClusterInfoUpdateNotifierLock =
Arc<RwLock<dyn ClusterInfoNotifierInterface + Sync + Send>>;

View File

@ -26,6 +26,7 @@
use {
crate::{
cluster_info_notifier_interface::ClusterInfoUpdateNotifierLock,
crds_entry::CrdsEntry,
crds_gossip_pull::CrdsTimeouts,
crds_shards::CrdsShards,
@ -54,6 +55,8 @@ use {
},
};
mod geyser_plugin_utils;
const CRDS_SHARDS_BITS: u32 = 12;
// Number of vote slots to track in an lru-cache for metrics.
const VOTE_SLOTS_METRICS_CAP: usize = 100;
@ -86,6 +89,8 @@ pub struct Crds {
// Mapping from nodes' pubkeys to their respective shred-version.
shred_versions: HashMap<Pubkey, u16>,
stats: Mutex<CrdsStats>,
/// GeyserPlugin cluster info update notifier
clusterinfo_update_notifier: Option<ClusterInfoUpdateNotifierLock>,
}
#[derive(PartialEq, Eq, Debug)]
@ -174,6 +179,7 @@ impl Default for Crds {
purged: VecDeque::default(),
shred_versions: HashMap::default(),
stats: Mutex::<CrdsStats>::default(),
clusterinfo_update_notifier: None,
}
}
}
@ -204,6 +210,13 @@ fn overrides(value: &CrdsValue, other: &VersionedCrdsValue) -> bool {
}
impl Crds {
/// Stores `cluster_info_notifier` as the GeyserPlugin cluster info update notifier,
/// replacing whatever was stored before; passing `None` clears it.
pub fn set_clusterinfo_notifier(
&mut self,
cluster_info_notifier: Option<ClusterInfoUpdateNotifierLock>,
) {
self.clusterinfo_update_notifier = cluster_info_notifier;
}
/// Returns true if the given value updates an existing one in the table.
/// The value is outdated and fails to insert, if it already exists in the
/// table with a more recent wallclock.
@ -221,9 +234,10 @@ impl Crds {
route: GossipRoute,
) -> Result<(), CrdsError> {
let label = value.label();
let gossip_label = label.clone();
let pubkey = value.pubkey();
let value = VersionedCrdsValue::new(value, self.cursor, now);
match self.table.entry(label) {
let ret = match self.table.entry(label) {
Entry::Vacant(entry) => {
self.stats.lock().unwrap().record_insert(&value, route);
let entry_index = entry.index();
@ -309,7 +323,10 @@ impl Crds {
Err(CrdsError::InsertFailed)
}
}
}
};
//notify geyser interface
self.notify_clusterinfo_update(self.table.get(&gossip_label));
ret
}
pub fn get<'a, 'b, V>(&'a self, key: V::Key) -> Option<V>
@ -533,6 +550,8 @@ impl Crds {
self.shards.remove(index, &value);
match value.value.data {
CrdsData::LegacyContactInfo(_) => {
//notify geyser interface
self.notify_clusterinfo_remove(&value.value.pubkey());
self.nodes.swap_remove(&index);
}
CrdsData::Vote(_, _) => {

View File

@ -0,0 +1,26 @@
use crate::{
crds::{Crds, Pubkey, VersionedCrdsValue},
crds_value::CrdsData,
};
impl Crds {
    /// Reports a node contact info to the registered cluster info notifier.
    ///
    /// Nothing happens when no notifier is registered, when `crd_value` is `None`,
    /// or when the value does not carry `CrdsData::LegacyContactInfo`.
    ///
    /// Panics if the notifier lock is poisoned.
    pub fn notify_clusterinfo_update(&self, crd_value: Option<&VersionedCrdsValue>) {
        if let (Some(notifier_lock), Some(versioned)) =
            (self.clusterinfo_update_notifier.as_ref(), crd_value)
        {
            // Only contact info entries are of interest to the notifier.
            if let CrdsData::LegacyContactInfo(ref contact_info) = versioned.value.data {
                notifier_lock
                    .read()
                    .unwrap()
                    .notify_clusterinfo_update(contact_info);
            }
        }
    }

    /// Reports the removal of the node identified by `pubkey` to the registered
    /// cluster info notifier; does nothing when no notifier is registered.
    ///
    /// Panics if the notifier lock is poisoned.
    pub fn notify_clusterinfo_remove(&self, pubkey: &Pubkey) {
        let notifier_lock = match self.clusterinfo_update_notifier.as_ref() {
            Some(lock) => lock,
            None => return,
        };
        notifier_lock
            .read()
            .unwrap()
            .notify_clusterinfo_remove(pubkey);
    }
}

View File

@ -8,6 +8,7 @@ use {
crate::{
cluster_info::Ping,
cluster_info_metrics::GossipStats,
cluster_info_notifier_interface::ClusterInfoUpdateNotifierLock,
crds::{Crds, GossipRoute},
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, CrdsGossipPull, CrdsTimeouts, ProcessPullStats},
@ -45,6 +46,14 @@ pub struct CrdsGossip {
}
impl CrdsGossip {
/// Registers the optional cluster info notifier on the CRDS table.
///
/// Takes the `crds` write lock for the duration of the call and panics if
/// that lock is poisoned.
pub fn set_clusterinfo_notifier(
    &self,
    cluster_info_notifier: Option<ClusterInfoUpdateNotifierLock>,
) {
    self.crds
        .write()
        .unwrap()
        .set_clusterinfo_notifier(cluster_info_notifier);
}
/// Process a push message to the network.
///
/// Returns unique origins' pubkeys of upserted values.

View File

@ -3,6 +3,7 @@
pub mod cluster_info;
pub mod cluster_info_metrics;
pub mod cluster_info_notifier_interface;
pub mod contact_info;
pub mod crds;
pub mod crds_entry;