Geyser Runtime Reload (#30352)

Support dynamic geyser plugin load, unload, and listing through admin RPC.
This commit is contained in:
cavemanloverboy 2023-03-16 17:03:00 -07:00 committed by GitHub
parent 9d792c1848
commit 10f49d4e26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 898 additions and 159 deletions

7
Cargo.lock generated
View File

@ -5634,6 +5634,8 @@ dependencies = [
"bs58", "bs58",
"crossbeam-channel", "crossbeam-channel",
"json5", "json5",
"jsonrpc-core",
"jsonrpc-server-utils",
"libloading", "libloading",
"log", "log",
"serde_json", "serde_json",
@ -6831,12 +6833,14 @@ version = "1.16.0"
dependencies = [ dependencies = [
"base64 0.13.0", "base64 0.13.0",
"bincode", "bincode",
"crossbeam-channel",
"log", "log",
"serde_derive", "serde_derive",
"serde_json", "serde_json",
"solana-cli-output", "solana-cli-output",
"solana-client", "solana-client",
"solana-core", "solana-core",
"solana-geyser-plugin-manager",
"solana-gossip", "solana-gossip",
"solana-ledger", "solana-ledger",
"solana-logger 1.16.0", "solana-logger 1.16.0",
@ -7017,6 +7021,7 @@ dependencies = [
"jsonrpc-server-utils", "jsonrpc-server-utils",
"lazy_static", "lazy_static",
"libc", "libc",
"libloading",
"log", "log",
"num_cpus", "num_cpus",
"rand 0.7.3", "rand 0.7.3",
@ -7033,6 +7038,8 @@ dependencies = [
"solana-entry", "solana-entry",
"solana-faucet", "solana-faucet",
"solana-genesis-utils", "solana-genesis-utils",
"solana-geyser-plugin-interface",
"solana-geyser-plugin-manager",
"solana-gossip", "solana-gossip",
"solana-ledger", "solana-ledger",
"solana-logger 1.16.0", "solana-logger 1.16.0",

View File

@ -31,7 +31,9 @@ use {
rand::{thread_rng, Rng}, rand::{thread_rng, Rng},
solana_client::connection_cache::ConnectionCache, solana_client::connection_cache::ConnectionCache,
solana_entry::poh::compute_hash_time_ns, solana_entry::poh::compute_hash_time_ns,
solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService, solana_geyser_plugin_manager::{
geyser_plugin_service::GeyserPluginService, GeyserPluginManagerRequest,
},
solana_gossip::{ solana_gossip::{
cluster_info::{ cluster_info::{
ClusterInfo, Node, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS, ClusterInfo, Node, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
@ -128,7 +130,8 @@ pub struct ValidatorConfig {
pub account_paths: Vec<PathBuf>, pub account_paths: Vec<PathBuf>,
pub account_shrink_paths: Option<Vec<PathBuf>>, pub account_shrink_paths: Option<Vec<PathBuf>>,
pub rpc_config: JsonRpcConfig, pub rpc_config: JsonRpcConfig,
pub geyser_plugin_config_files: Option<Vec<PathBuf>>, /// Specifies which plugins to start up with
pub on_start_geyser_plugin_config_files: Option<Vec<PathBuf>>,
pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub) pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub)
pub pubsub_config: PubSubConfig, pub pubsub_config: PubSubConfig,
pub snapshot_config: SnapshotConfig, pub snapshot_config: SnapshotConfig,
@ -192,7 +195,7 @@ impl Default for ValidatorConfig {
account_paths: Vec::new(), account_paths: Vec::new(),
account_shrink_paths: None, account_shrink_paths: None,
rpc_config: JsonRpcConfig::default(), rpc_config: JsonRpcConfig::default(),
geyser_plugin_config_files: None, on_start_geyser_plugin_config_files: None,
rpc_addrs: None, rpc_addrs: None,
pubsub_config: PubSubConfig::default(), pubsub_config: PubSubConfig::default(),
snapshot_config: SnapshotConfig::new_load_only(), snapshot_config: SnapshotConfig::new_load_only(),
@ -392,6 +395,7 @@ impl Validator {
cluster_entrypoints: Vec<ContactInfo>, cluster_entrypoints: Vec<ContactInfo>,
config: &ValidatorConfig, config: &ValidatorConfig,
should_check_duplicate_instance: bool, should_check_duplicate_instance: bool,
rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
start_progress: Arc<RwLock<ValidatorStartProgress>>, start_progress: Arc<RwLock<ValidatorStartProgress>>,
socket_addr_space: SocketAddrSpace, socket_addr_space: SocketAddrSpace,
use_quic: bool, use_quic: bool,
@ -413,12 +417,19 @@ impl Validator {
let mut bank_notification_senders = Vec::new(); let mut bank_notification_senders = Vec::new();
let exit = Arc::new(AtomicBool::new(false));
let geyser_plugin_service = let geyser_plugin_service =
if let Some(geyser_plugin_config_files) = &config.geyser_plugin_config_files { if let Some(geyser_plugin_config_files) = &config.on_start_geyser_plugin_config_files {
let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded(); let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
bank_notification_senders.push(confirmed_bank_sender); bank_notification_senders.push(confirmed_bank_sender);
let result = let rpc_to_plugin_manager_receiver_and_exit =
GeyserPluginService::new(confirmed_bank_receiver, geyser_plugin_config_files); rpc_to_plugin_manager_receiver.map(|receiver| (receiver, exit.clone()));
let result = GeyserPluginService::new_with_receiver(
confirmed_bank_receiver,
geyser_plugin_config_files,
rpc_to_plugin_manager_receiver_and_exit,
);
match result { match result {
Ok(geyser_plugin_service) => Some(geyser_plugin_service), Ok(geyser_plugin_service) => Some(geyser_plugin_service),
Err(err) => { Err(err) => {
@ -483,7 +494,6 @@ impl Validator {
start.stop(); start.stop();
info!("done. {}", start); info!("done. {}", start);
let exit = Arc::new(AtomicBool::new(false));
{ {
let exit = exit.clone(); let exit = exit.clone();
config config
@ -2176,6 +2186,7 @@ mod tests {
vec![LegacyContactInfo::try_from(&leader_node.info).unwrap()], vec![LegacyContactInfo::try_from(&leader_node.info).unwrap()],
&config, &config,
true, // should_check_duplicate_instance true, // should_check_duplicate_instance
None, // rpc_to_plugin_manager_receiver
start_progress.clone(), start_progress.clone(),
SocketAddrSpace::Unspecified, SocketAddrSpace::Unspecified,
DEFAULT_TPU_USE_QUIC, DEFAULT_TPU_USE_QUIC,
@ -2273,7 +2284,8 @@ mod tests {
Arc::new(RwLock::new(vec![Arc::new(vote_account_keypair)])), Arc::new(RwLock::new(vec![Arc::new(vote_account_keypair)])),
vec![LegacyContactInfo::try_from(&leader_node.info).unwrap()], vec![LegacyContactInfo::try_from(&leader_node.info).unwrap()],
&config, &config,
true, // should_check_duplicate_instance true, // should_check_duplicate_instance.
None, // rpc_to_plugin_manager_receiver
Arc::new(RwLock::new(ValidatorStartProgress::default())), Arc::new(RwLock::new(ValidatorStartProgress::default())),
SocketAddrSpace::Unspecified, SocketAddrSpace::Unspecified,
DEFAULT_TPU_USE_QUIC, DEFAULT_TPU_USE_QUIC,

View File

@ -10,9 +10,12 @@ license = { workspace = true }
edition = { workspace = true } edition = { workspace = true }
[dependencies] [dependencies]
bs58 = { workspace = true } bs58 = { workspace = true }
crossbeam-channel = { workspace = true } crossbeam-channel = { workspace = true }
json5 = { workspace = true } json5 = { workspace = true }
jsonrpc-core = { workspace = true }
jsonrpc-server-utils = { workspace = true }
libloading = { workspace = true } libloading = { workspace = true }
log = { workspace = true } log = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }

View File

@ -1,9 +1,10 @@
/// Managing the Geyser plugins
use { use {
libloading::{Library, Symbol}, jsonrpc_core::{ErrorCode, Result as JsonRpcResult},
jsonrpc_server_utils::tokio::sync::oneshot::Sender as OneShotSender,
libloading::Library,
log::*, log::*,
solana_geyser_plugin_interface::geyser_plugin_interface::GeyserPlugin, solana_geyser_plugin_interface::geyser_plugin_interface::GeyserPlugin,
std::error::Error, std::path::Path,
}; };
#[derive(Default, Debug)] #[derive(Default, Debug)]
@ -20,26 +21,6 @@ impl GeyserPluginManager {
} }
} }
/// # Safety
///
/// This function loads the dynamically linked library specified in the path. The library
/// must do necessary initializations.
pub unsafe fn load_plugin(
&mut self,
libpath: &str,
config_file: &str,
) -> Result<(), Box<dyn Error>> {
type PluginConstructor = unsafe fn() -> *mut dyn GeyserPlugin;
let lib = Library::new(libpath)?;
let constructor: Symbol<PluginConstructor> = lib.get(b"_create_plugin")?;
let plugin_raw = constructor();
let mut plugin = Box::from_raw(plugin_raw);
plugin.on_load(config_file)?;
self.plugins.push(plugin);
self.libs.push(lib);
Ok(())
}
/// Unload all plugins and loaded plugin libraries, making sure to fire /// Unload all plugins and loaded plugin libraries, making sure to fire
/// their `on_plugin_unload()` methods so they can do any necessary cleanup. /// their `on_plugin_unload()` methods so they can do any necessary cleanup.
pub fn unload(&mut self) { pub fn unload(&mut self) {
@ -72,4 +53,401 @@ impl GeyserPluginManager {
} }
false false
} }
/// Admin RPC request handler
pub(crate) fn list_plugins(&self) -> JsonRpcResult<Vec<String>> {
Ok(self.plugins.iter().map(|p| p.name().to_owned()).collect())
}
/// Admin RPC request handler
/// # Safety
///
/// This function loads the dynamically linked library specified in the path. The library
/// must do necessary initializations.
///
/// The string returned is the name of the plugin loaded, which can only be accessed once
/// the plugin has been loaded and calling the name method.
pub(crate) fn load_plugin(
&mut self,
geyser_plugin_config_file: impl AsRef<Path>,
) -> JsonRpcResult<String> {
// First load plugin
let (mut new_plugin, new_lib, new_config_file) =
load_plugin_from_config(geyser_plugin_config_file.as_ref()).map_err(|e| {
jsonrpc_core::Error {
code: ErrorCode::InvalidRequest,
message: format!("Failed to load plugin: {e}"),
data: None,
}
})?;
// Then see if a plugin with this name already exists. If so, abort
if self
.plugins
.iter()
.any(|plugin| plugin.name().eq(new_plugin.name()))
{
return Err(jsonrpc_core::Error {
code: ErrorCode::InvalidRequest,
message: format!(
"There already exists a plugin named {} loaded. Did not load requested plugin",
new_plugin.name()
),
data: None,
});
}
// Call on_load and push plugin
new_plugin
.on_load(new_config_file)
.map_err(|on_load_err| jsonrpc_core::Error {
code: ErrorCode::InvalidRequest,
message: format!(
"on_load method of plugin {} failed: {on_load_err}",
new_plugin.name()
),
data: None,
})?;
let name = new_plugin.name().to_string();
self.plugins.push(new_plugin);
self.libs.push(new_lib);
Ok(name)
}
pub(crate) fn unload_plugin(&mut self, name: &str) -> JsonRpcResult<()> {
// Check if any plugin names match this one
let Some(idx) = self.plugins.iter().position(|plugin| plugin.name().eq(name)) else {
// If we don't find one return an error
return Err(
jsonrpc_core::error::Error {
code: ErrorCode::InvalidRequest,
message: String::from("The plugin you requested to unload is not loaded"),
data: None,
}
)
};
// Unload and drop plugin and lib
self._drop_plugin(idx);
Ok(())
}
/// Checks for a plugin with a given `name`.
/// If it exists, first unload it.
/// Then, attempt to load a new plugin
pub(crate) fn reload_plugin(&mut self, name: &str, config_file: &str) -> JsonRpcResult<()> {
// Check if any plugin names match this one
let Some(idx) = self.plugins.iter().position(|plugin| plugin.name().eq(name)) else {
// If we don't find one return an error
return Err(
jsonrpc_core::error::Error {
code: ErrorCode::InvalidRequest,
message: String::from("The plugin you requested to reload is not loaded"),
data: None,
}
)
};
// Unload and drop current plugin first in case plugin requires exclusive access to resource,
// such as a particular port or database.
self._drop_plugin(idx);
// Try to load plugin, library
// SAFETY: It is up to the validator to ensure this is a valid plugin library.
let (mut new_plugin, new_lib, new_parsed_config_file) =
load_plugin_from_config(config_file.as_ref()).map_err(|err| jsonrpc_core::Error {
code: ErrorCode::InvalidRequest,
message: err.to_string(),
data: None,
})?;
// Attempt to on_load with new plugin
match new_plugin.on_load(new_parsed_config_file) {
// On success, push plugin and library
Ok(()) => {
self.plugins.push(new_plugin);
self.libs.push(new_lib);
}
// On failure, return error
Err(err) => {
return Err(jsonrpc_core::error::Error {
code: ErrorCode::InvalidRequest,
message: format!(
"Failed to start new plugin (previous plugin was dropped!): {err}"
),
data: None,
});
}
}
Ok(())
}
fn _drop_plugin(&mut self, idx: usize) {
let mut current_plugin = self.plugins.remove(idx);
let _current_lib = self.libs.remove(idx);
current_plugin.on_unload();
}
}
#[derive(Debug)]
pub enum GeyserPluginManagerRequest {
ReloadPlugin {
name: String,
config_file: String,
response_sender: OneShotSender<JsonRpcResult<()>>,
},
UnloadPlugin {
name: String,
response_sender: OneShotSender<JsonRpcResult<()>>,
},
LoadPlugin {
config_file: String,
response_sender: OneShotSender<JsonRpcResult<String>>,
},
ListPlugins {
response_sender: OneShotSender<JsonRpcResult<Vec<String>>>,
},
}
#[derive(thiserror::Error, Debug)]
pub enum GeyserPluginManagerError {
#[error("Cannot open the the plugin config file")]
CannotOpenConfigFile(String),
#[error("Cannot read the the plugin config file")]
CannotReadConfigFile(String),
#[error("The config file is not in a valid Json format")]
InvalidConfigFileFormat(String),
#[error("Plugin library path is not specified in the config file")]
LibPathNotSet,
#[error("Invalid plugin path")]
InvalidPluginPath,
#[error("Cannot load plugin shared library")]
PluginLoadError(String),
#[error("The geyser plugin {0} is already loaded shared library")]
PluginAlreadyLoaded(String),
#[error("The GeyserPlugin on_load method failed")]
PluginStartError(String),
}
/// # Safety
///
/// This function loads the dynamically linked library specified in the path. The library
/// must do necessary initializations.
///
/// This returns the geyser plugin, the dynamic library, and the parsed config file as a &str.
/// (The geyser plugin interface requires a &str for the on_load method).
#[cfg(not(test))]
pub(crate) fn load_plugin_from_config(
geyser_plugin_config_file: &Path,
) -> Result<(Box<dyn GeyserPlugin>, Library, &str), GeyserPluginManagerError> {
use std::{fs::File, io::Read, path::PathBuf};
type PluginConstructor = unsafe fn() -> *mut dyn GeyserPlugin;
use libloading::Symbol;
let mut file = match File::open(geyser_plugin_config_file) {
Ok(file) => file,
Err(err) => {
return Err(GeyserPluginManagerError::CannotOpenConfigFile(format!(
"Failed to open the plugin config file {geyser_plugin_config_file:?}, error: {err:?}"
)));
}
};
let mut contents = String::new();
if let Err(err) = file.read_to_string(&mut contents) {
return Err(GeyserPluginManagerError::CannotReadConfigFile(format!(
"Failed to read the plugin config file {geyser_plugin_config_file:?}, error: {err:?}"
)));
}
let result: serde_json::Value = match json5::from_str(&contents) {
Ok(value) => value,
Err(err) => {
return Err(GeyserPluginManagerError::InvalidConfigFileFormat(format!(
"The config file {geyser_plugin_config_file:?} is not in a valid Json5 format, error: {err:?}"
)));
}
};
let libpath = result["libpath"]
.as_str()
.ok_or(GeyserPluginManagerError::LibPathNotSet)?;
let mut libpath = PathBuf::from(libpath);
if libpath.is_relative() {
let config_dir = geyser_plugin_config_file.parent().ok_or_else(|| {
GeyserPluginManagerError::CannotOpenConfigFile(format!(
"Failed to resolve parent of {geyser_plugin_config_file:?}",
))
})?;
libpath = config_dir.join(libpath);
}
let config_file = geyser_plugin_config_file
.as_os_str()
.to_str()
.ok_or(GeyserPluginManagerError::InvalidPluginPath)?;
let (plugin, lib) = unsafe {
let lib = Library::new(libpath)
.map_err(|e| GeyserPluginManagerError::PluginLoadError(e.to_string()))?;
let constructor: Symbol<PluginConstructor> = lib
.get(b"_create_plugin")
.map_err(|e| GeyserPluginManagerError::PluginLoadError(e.to_string()))?;
let plugin_raw = constructor();
(Box::from_raw(plugin_raw), lib)
};
Ok((plugin, lib, config_file))
}
// This is mocked for tests to avoid having to do IO with a dynamically linked library
// across different architectures at test time
//
/// This returns mocked values for the geyser plugin, the dynamic library, and the parsed config file as a &str.
/// (The geyser plugin interface requires a &str for the on_load method).
#[cfg(test)]
pub(crate) fn load_plugin_from_config(
_geyser_plugin_config_file: &Path,
) -> Result<(Box<dyn GeyserPlugin>, Library, &str), GeyserPluginManagerError> {
Ok(tests::dummy_plugin_and_library())
}
#[cfg(test)]
mod tests {
use {
crate::geyser_plugin_manager::GeyserPluginManager,
libloading::Library,
solana_geyser_plugin_interface::geyser_plugin_interface::GeyserPlugin,
std::sync::{Arc, RwLock},
};
pub(super) fn dummy_plugin_and_library() -> (Box<dyn GeyserPlugin>, Library, &'static str) {
let plugin = Box::new(TestPlugin);
let lib = {
let handle: *mut std::os::raw::c_void = &mut () as *mut _ as *mut std::os::raw::c_void;
// SAFETY: all calls to get Symbols should fail, so this is actually safe
let inner_lib = unsafe { libloading::os::unix::Library::from_raw(handle) };
Library::from(inner_lib)
};
(plugin, lib, DUMMY_CONFIG)
}
pub(super) fn dummy_plugin_and_library2() -> (Box<dyn GeyserPlugin>, Library, &'static str) {
let plugin = Box::new(TestPlugin2);
let lib = {
let handle: *mut std::os::raw::c_void = &mut () as *mut _ as *mut std::os::raw::c_void;
// SAFETY: all calls to get Symbols should fail, so this is actually safe
let inner_lib = unsafe { libloading::os::unix::Library::from_raw(handle) };
Library::from(inner_lib)
};
(plugin, lib, DUMMY_CONFIG)
}
const DUMMY_NAME: &str = "dummy";
pub(super) const DUMMY_CONFIG: &str = "dummy_config";
const ANOTHER_DUMMY_NAME: &str = "another_dummy";
#[derive(Debug)]
pub(super) struct TestPlugin;
impl GeyserPlugin for TestPlugin {
fn name(&self) -> &'static str {
DUMMY_NAME
}
}
#[derive(Debug)]
pub(super) struct TestPlugin2;
impl GeyserPlugin for TestPlugin2 {
fn name(&self) -> &'static str {
ANOTHER_DUMMY_NAME
}
}
#[test]
fn test_geyser_reload() {
// Initialize empty manager
let plugin_manager = Arc::new(RwLock::new(GeyserPluginManager::new()));
// No plugins are loaded, this should fail
let mut plugin_manager_lock = plugin_manager.write().unwrap();
let reload_result = plugin_manager_lock.reload_plugin(DUMMY_NAME, DUMMY_CONFIG);
assert_eq!(
reload_result.unwrap_err().message,
"The plugin you requested to reload is not loaded"
);
// Mock having loaded plugin (TestPlugin)
let (mut plugin, lib, config) = dummy_plugin_and_library();
plugin.on_load(config).unwrap();
plugin_manager_lock.plugins.push(plugin);
plugin_manager_lock.libs.push(lib);
// plugin_manager_lock.libs.push(lib);
assert_eq!(plugin_manager_lock.plugins[0].name(), DUMMY_NAME);
plugin_manager_lock.plugins[0].name();
// Try wrong name (same error)
const WRONG_NAME: &str = "wrong_name";
let reload_result = plugin_manager_lock.reload_plugin(WRONG_NAME, DUMMY_CONFIG);
assert_eq!(
reload_result.unwrap_err().message,
"The plugin you requested to reload is not loaded"
);
// Now try a (dummy) reload, replacing TestPlugin with TestPlugin2
let reload_result = plugin_manager_lock.reload_plugin(DUMMY_NAME, DUMMY_CONFIG);
assert!(reload_result.is_ok());
}
#[test]
fn test_plugin_list() {
// Initialize empty manager
let plugin_manager = Arc::new(RwLock::new(GeyserPluginManager::new()));
let mut plugin_manager_lock = plugin_manager.write().unwrap();
// Load two plugins
// First
let (mut plugin, lib, config) = dummy_plugin_and_library();
plugin.on_load(config).unwrap();
plugin_manager_lock.plugins.push(plugin);
plugin_manager_lock.libs.push(lib);
// Second
let (mut plugin, lib, config) = dummy_plugin_and_library2();
plugin.on_load(config).unwrap();
plugin_manager_lock.plugins.push(plugin);
plugin_manager_lock.libs.push(lib);
// Check that both plugins are returned in the list
let plugins = plugin_manager_lock.list_plugins().unwrap();
assert!(plugins.iter().any(|name| name.eq(DUMMY_NAME)));
assert!(plugins.iter().any(|name| name.eq(ANOTHER_DUMMY_NAME)));
}
#[test]
fn test_plugin_load_unload() {
// Initialize empty manager
let plugin_manager = Arc::new(RwLock::new(GeyserPluginManager::new()));
let mut plugin_manager_lock = plugin_manager.write().unwrap();
// Load rpc call
let load_result = plugin_manager_lock.load_plugin(DUMMY_CONFIG);
assert!(load_result.is_ok());
assert_eq!(plugin_manager_lock.plugins.len(), 1);
// Unload rpc call
let unload_result = plugin_manager_lock.unload_plugin(DUMMY_NAME);
assert!(unload_result.is_ok());
assert_eq!(plugin_manager_lock.plugins.len(), 0);
}
} }

View File

@ -3,8 +3,10 @@ use {
accounts_update_notifier::AccountsUpdateNotifierImpl, accounts_update_notifier::AccountsUpdateNotifierImpl,
block_metadata_notifier::BlockMetadataNotifierImpl, block_metadata_notifier::BlockMetadataNotifierImpl,
block_metadata_notifier_interface::BlockMetadataNotifierLock, block_metadata_notifier_interface::BlockMetadataNotifierLock,
geyser_plugin_manager::GeyserPluginManager, slot_status_notifier::SlotStatusNotifierImpl, geyser_plugin_manager::{GeyserPluginManager, GeyserPluginManagerRequest},
slot_status_observer::SlotStatusObserver, transaction_notifier::TransactionNotifierImpl, slot_status_notifier::SlotStatusNotifierImpl,
slot_status_observer::SlotStatusObserver,
transaction_notifier::TransactionNotifierImpl,
}, },
crossbeam_channel::Receiver, crossbeam_channel::Receiver,
log::*, log::*,
@ -14,36 +16,14 @@ use {
}, },
solana_runtime::accounts_update_notifier_interface::AccountsUpdateNotifier, solana_runtime::accounts_update_notifier_interface::AccountsUpdateNotifier,
std::{ std::{
fs::File,
io::Read,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::{Arc, RwLock}, sync::{atomic::AtomicBool, Arc, RwLock},
thread, thread,
time::Duration,
}, },
thiserror::Error, thiserror::Error,
}; };
#[derive(Error, Debug)]
pub enum GeyserPluginServiceError {
#[error("Cannot open the the plugin config file")]
CannotOpenConfigFile(String),
#[error("Cannot read the the plugin config file")]
CannotReadConfigFile(String),
#[error("The config file is not in a valid Json format")]
InvalidConfigFileFormat(String),
#[error("Plugin library path is not specified in the config file")]
LibPathNotSet,
#[error("Invalid plugin path")]
InvalidPluginPath,
#[error("Cannot load plugin shared library")]
PluginLoadError(String),
}
/// The service managing the Geyser plugin workflow. /// The service managing the Geyser plugin workflow.
pub struct GeyserPluginService { pub struct GeyserPluginService {
slot_status_observer: Option<SlotStatusObserver>, slot_status_observer: Option<SlotStatusObserver>,
@ -66,10 +46,20 @@ impl GeyserPluginService {
/// shall create the implementation of `GeyserPlugin` and returns to the caller. /// shall create the implementation of `GeyserPlugin` and returns to the caller.
/// The rest of the JSON fields' definition is up to to the concrete plugin implementation /// The rest of the JSON fields' definition is up to to the concrete plugin implementation
/// It is usually used to configure the connection information for the external data store. /// It is usually used to configure the connection information for the external data store.
pub fn new( pub fn new(
confirmed_bank_receiver: Receiver<BankNotification>, confirmed_bank_receiver: Receiver<BankNotification>,
geyser_plugin_config_files: &[PathBuf], geyser_plugin_config_files: &[PathBuf],
) -> Result<Self, GeyserPluginServiceError> {
Self::new_with_receiver(confirmed_bank_receiver, geyser_plugin_config_files, None)
}
pub fn new_with_receiver(
confirmed_bank_receiver: Receiver<BankNotification>,
geyser_plugin_config_files: &[PathBuf],
rpc_to_plugin_manager_receiver_and_exit: Option<(
Receiver<GeyserPluginManagerRequest>,
Arc<AtomicBool>,
)>,
) -> Result<Self, GeyserPluginServiceError> { ) -> Result<Self, GeyserPluginServiceError> {
info!( info!(
"Starting GeyserPluginService from config files: {:?}", "Starting GeyserPluginService from config files: {:?}",
@ -80,10 +70,10 @@ impl GeyserPluginService {
for geyser_plugin_config_file in geyser_plugin_config_files { for geyser_plugin_config_file in geyser_plugin_config_files {
Self::load_plugin(&mut plugin_manager, geyser_plugin_config_file)?; Self::load_plugin(&mut plugin_manager, geyser_plugin_config_file)?;
} }
let account_data_notifications_enabled = let account_data_notifications_enabled =
plugin_manager.account_data_notifications_enabled(); plugin_manager.account_data_notifications_enabled();
let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled(); let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled();
let plugin_manager = Arc::new(RwLock::new(plugin_manager)); let plugin_manager = Arc::new(RwLock::new(plugin_manager));
let accounts_update_notifier: Option<AccountsUpdateNotifier> = let accounts_update_notifier: Option<AccountsUpdateNotifier> =
@ -122,6 +112,12 @@ impl GeyserPluginService {
(None, None) (None, None)
}; };
// Initialize plugin manager rpc handler thread if needed
if let Some((request_receiver, exit)) = rpc_to_plugin_manager_receiver_and_exit {
let plugin_manager = plugin_manager.clone();
Self::start_manager_rpc_handler(plugin_manager, request_receiver, exit)
};
info!("Started GeyserPluginService"); info!("Started GeyserPluginService");
Ok(GeyserPluginService { Ok(GeyserPluginService {
slot_status_observer, slot_status_observer,
@ -136,56 +132,9 @@ impl GeyserPluginService {
plugin_manager: &mut GeyserPluginManager, plugin_manager: &mut GeyserPluginManager,
geyser_plugin_config_file: &Path, geyser_plugin_config_file: &Path,
) -> Result<(), GeyserPluginServiceError> { ) -> Result<(), GeyserPluginServiceError> {
let mut file = match File::open(geyser_plugin_config_file) { plugin_manager
Ok(file) => file, .load_plugin(geyser_plugin_config_file)
Err(err) => { .map_err(|e| GeyserPluginServiceError::FailedToLoadPlugin(e.into()))?;
return Err(GeyserPluginServiceError::CannotOpenConfigFile(format!(
"Failed to open the plugin config file {geyser_plugin_config_file:?}, error: {err:?}"
)));
}
};
let mut contents = String::new();
if let Err(err) = file.read_to_string(&mut contents) {
return Err(GeyserPluginServiceError::CannotReadConfigFile(format!(
"Failed to read the plugin config file {geyser_plugin_config_file:?}, error: {err:?}"
)));
}
let result: serde_json::Value = match json5::from_str(&contents) {
Ok(value) => value,
Err(err) => {
return Err(GeyserPluginServiceError::InvalidConfigFileFormat(format!(
"The config file {geyser_plugin_config_file:?} is not in a valid Json5 format, error: {err:?}"
)));
}
};
let libpath = result["libpath"]
.as_str()
.ok_or(GeyserPluginServiceError::LibPathNotSet)?;
let mut libpath = PathBuf::from(libpath);
if libpath.is_relative() {
let config_dir = geyser_plugin_config_file.parent().ok_or_else(|| {
GeyserPluginServiceError::CannotOpenConfigFile(format!(
"Failed to resolve parent of {geyser_plugin_config_file:?}",
))
})?;
libpath = config_dir.join(libpath);
}
let config_file = geyser_plugin_config_file
.as_os_str()
.to_str()
.ok_or(GeyserPluginServiceError::InvalidPluginPath)?;
unsafe {
let result = plugin_manager.load_plugin(libpath.to_str().unwrap(), config_file);
if let Err(err) = result {
let msg = format!("Failed to load the plugin library: {libpath:?}, error: {err:?}");
return Err(GeyserPluginServiceError::PluginLoadError(msg));
}
}
Ok(()) Ok(())
} }
@ -208,4 +157,71 @@ impl GeyserPluginService {
self.plugin_manager.write().unwrap().unload(); self.plugin_manager.write().unwrap().unload();
Ok(()) Ok(())
} }
fn start_manager_rpc_handler(
plugin_manager: Arc<RwLock<GeyserPluginManager>>,
request_receiver: Receiver<GeyserPluginManagerRequest>,
exit: Arc<AtomicBool>,
) {
thread::Builder::new()
.name("SolGeyserPluginRpc".to_string())
.spawn(move || loop {
if let Ok(request) = request_receiver.recv_timeout(Duration::from_secs(5)) {
match request {
GeyserPluginManagerRequest::ListPlugins { response_sender } => {
let plugin_list = plugin_manager.read().unwrap().list_plugins();
response_sender
.send(plugin_list)
.expect("Admin rpc service will be waiting for response");
}
GeyserPluginManagerRequest::ReloadPlugin {
ref name,
ref config_file,
response_sender,
} => {
let reload_result = plugin_manager
.write()
.unwrap()
.reload_plugin(name, config_file);
response_sender
.send(reload_result)
.expect("Admin rpc service will be waiting for response");
}
GeyserPluginManagerRequest::LoadPlugin {
ref config_file,
response_sender,
} => {
let load_result =
plugin_manager.write().unwrap().load_plugin(config_file);
response_sender
.send(load_result)
.expect("Admin rpc service will be waiting for response");
}
GeyserPluginManagerRequest::UnloadPlugin {
ref name,
response_sender,
} => {
let unload_result = plugin_manager.write().unwrap().unload_plugin(name);
response_sender
.send(unload_result)
.expect("Admin rpc service will be waiting for response");
}
}
}
if exit.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
})
.unwrap();
}
}
#[derive(Error, Debug)]
pub enum GeyserPluginServiceError {
#[error("Failed to load a geyser plugin")]
FailedToLoadPlugin(#[from] Box<dyn std::error::Error>),
} }

View File

@ -6,3 +6,5 @@ pub mod geyser_plugin_service;
pub mod slot_status_notifier; pub mod slot_status_notifier;
pub mod slot_status_observer; pub mod slot_status_observer;
pub mod transaction_notifier; pub mod transaction_notifier;
pub use geyser_plugin_manager::GeyserPluginManagerRequest;

View File

@ -102,7 +102,7 @@ use {
fs::File, fs::File,
io::{self, stdout, BufRead, BufReader, Write}, io::{self, stdout, BufRead, BufReader, Write},
path::{Path, PathBuf}, path::{Path, PathBuf},
process::{exit, Command, Stdio}, process::{Command, Stdio},
str::FromStr, str::FromStr,
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
@ -312,7 +312,7 @@ fn output_ledger(
.slot_meta_iterator(starting_slot) .slot_meta_iterator(starting_slot)
.unwrap_or_else(|err| { .unwrap_or_else(|err| {
eprintln!("Failed to load entries starting from slot {starting_slot}: {err:?}"); eprintln!("Failed to load entries starting from slot {starting_slot}: {err:?}");
exit(1); std::process::exit(1);
}); });
if method == LedgerOutputMethod::Json { if method == LedgerOutputMethod::Json {
@ -929,7 +929,7 @@ fn open_blockstore(
error!("Blockstore is incompatible with current software and requires updates"); error!("Blockstore is incompatible with current software and requires updates");
if !force_update_to_open { if !force_update_to_open {
error!("Use --force-update-to-open to allow blockstore to update"); error!("Use --force-update-to-open to allow blockstore to update");
exit(1); std::process::exit(1);
} }
open_blockstore_with_temporary_primary_access( open_blockstore_with_temporary_primary_access(
ledger_path, ledger_path,
@ -941,12 +941,12 @@ fn open_blockstore(
"Failed to open blockstore (with --force-update-to-open) at {:?}: {:?}", "Failed to open blockstore (with --force-update-to-open) at {:?}: {:?}",
ledger_path, err ledger_path, err
); );
exit(1); std::process::exit(1);
}) })
} }
Err(err) => { Err(err) => {
eprintln!("Failed to open blockstore at {ledger_path:?}: {err:?}"); eprintln!("Failed to open blockstore at {ledger_path:?}: {err:?}");
exit(1); std::process::exit(1);
} }
} }
} }
@ -1128,14 +1128,14 @@ fn load_bank_forks(
"Unable to load bank forks at slot {halt_slot} because it is less than the starting slot {starting_slot}. \ "Unable to load bank forks at slot {halt_slot} because it is less than the starting slot {starting_slot}. \
The starting slot will be the latest snapshot slot, or genesis if --no-snapshot flag specified or no snapshots found." The starting slot will be the latest snapshot slot, or genesis if --no-snapshot flag specified or no snapshots found."
); );
exit(1); std::process::exit(1);
} }
// Check if we have the slot data necessary to replay from starting_slot to >= halt_slot. // Check if we have the slot data necessary to replay from starting_slot to >= halt_slot.
if !blockstore.slot_range_connected(starting_slot, halt_slot) { if !blockstore.slot_range_connected(starting_slot, halt_slot) {
eprintln!( eprintln!(
"Unable to load bank forks at slot {halt_slot} due to disconnected blocks.", "Unable to load bank forks at slot {halt_slot} due to disconnected blocks.",
); );
exit(1); std::process::exit(1);
} }
} }
} }
@ -1164,7 +1164,7 @@ fn load_bank_forks(
{ {
// Couldn't get Primary access, error out to be defensive. // Couldn't get Primary access, error out to be defensive.
eprintln!("Error: custom accounts path is not supported under secondary access"); eprintln!("Error: custom accounts path is not supported under secondary access");
exit(1); std::process::exit(1);
} }
} }
account_paths.split(',').map(PathBuf::from).collect() account_paths.split(',').map(PathBuf::from).collect()
@ -1187,7 +1187,7 @@ fn load_bank_forks(
Ok((account_run_path, _account_snapshot_path)) => account_run_path, Ok((account_run_path, _account_snapshot_path)) => account_run_path,
Err(err) => { Err(err) => {
eprintln!("Unable to create account run and snapshot sub directories: {}, err: {err:?}", account_path.display()); eprintln!("Unable to create account run and snapshot sub directories: {}, err: {err:?}", account_path.display());
exit(1); std::process::exit(1);
} }
} }
}).collect(); }).collect();
@ -1207,6 +1207,7 @@ fn load_bank_forks(
let mut accounts_update_notifier = Option::<AccountsUpdateNotifier>::default(); let mut accounts_update_notifier = Option::<AccountsUpdateNotifier>::default();
let mut transaction_notifier = Option::<TransactionNotifierLock>::default(); let mut transaction_notifier = Option::<TransactionNotifierLock>::default();
let exit = Arc::new(AtomicBool::new(false));
if arg_matches.is_present("geyser_plugin_config") { if arg_matches.is_present("geyser_plugin_config") {
let geyser_config_files = values_t_or_exit!(arg_matches, "geyser_plugin_config", String) let geyser_config_files = values_t_or_exit!(arg_matches, "geyser_plugin_config", String)
.into_iter() .into_iter()
@ -1215,11 +1216,12 @@ fn load_bank_forks(
let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded(); let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
drop(confirmed_bank_sender); drop(confirmed_bank_sender);
let geyser_service = let geyser_service =
GeyserPluginService::new(confirmed_bank_receiver, &geyser_config_files).unwrap_or_else( GeyserPluginService::new(confirmed_bank_receiver, &geyser_config_files).unwrap_or_else(
|err| { |err| {
eprintln!("Failed to setup Geyser service: {err:?}"); eprintln!("Failed to setup Geyser service: {err:?}");
exit(1); std::process::exit(1);
}, },
); );
accounts_update_notifier = geyser_service.get_accounts_update_notifier(); accounts_update_notifier = geyser_service.get_accounts_update_notifier();
@ -1257,7 +1259,6 @@ fn load_bank_forks(
snapshot_request_handler, snapshot_request_handler,
pruned_banks_request_handler, pruned_banks_request_handler,
}; };
let exit = Arc::new(AtomicBool::new(false));
let accounts_background_service = AccountsBackgroundService::new( let accounts_background_service = AccountsBackgroundService::new(
bank_forks.clone(), bank_forks.clone(),
&exit, &exit,
@ -2509,7 +2510,7 @@ fn main() {
) )
.unwrap_or_else(|err| { .unwrap_or_else(|err| {
eprintln!("Failed to write genesis config: {err:?}"); eprintln!("Failed to write genesis config: {err:?}");
exit(1); std::process::exit(1);
}); });
println!("{}", open_genesis_config_by(&output_directory, arg_matches)); println!("{}", open_genesis_config_by(&output_directory, arg_matches));
@ -2555,7 +2556,7 @@ fn main() {
} }
Err(err) => { Err(err) => {
eprintln!("Failed to load ledger: {err:?}"); eprintln!("Failed to load ledger: {err:?}");
exit(1); std::process::exit(1);
} }
} }
} }
@ -2632,7 +2633,7 @@ fn main() {
} }
Err(err) => { Err(err) => {
eprintln!("Failed to load ledger: {err:?}"); eprintln!("Failed to load ledger: {err:?}");
exit(1); std::process::exit(1);
} }
} }
} }
@ -2864,7 +2865,7 @@ fn main() {
) )
.unwrap_or_else(|err| { .unwrap_or_else(|err| {
eprintln!("Ledger verification failed: {err:?}"); eprintln!("Ledger verification failed: {err:?}");
exit(1); std::process::exit(1);
}); });
if print_accounts_stats { if print_accounts_stats {
let working_bank = bank_forks.read().unwrap().working_bank(); let working_bank = bank_forks.read().unwrap().working_bank();
@ -2925,7 +2926,7 @@ fn main() {
} }
Err(err) => { Err(err) => {
eprintln!("Failed to load ledger: {err:?}"); eprintln!("Failed to load ledger: {err:?}");
exit(1); std::process::exit(1);
} }
} }
} }
@ -2968,7 +2969,7 @@ fn main() {
"Error: insufficient --bootstrap-validator-stake-lamports. \ "Error: insufficient --bootstrap-validator-stake-lamports. \
Minimum amount is {minimum_stake_lamports}" Minimum amount is {minimum_stake_lamports}"
); );
exit(1); std::process::exit(1);
} }
let bootstrap_validator_pubkeys = pubkeys_of(arg_matches, "bootstrap_validator"); let bootstrap_validator_pubkeys = pubkeys_of(arg_matches, "bootstrap_validator");
let accounts_to_remove = let accounts_to_remove =
@ -2985,7 +2986,7 @@ fn main() {
|s| { |s| {
s.parse::<SnapshotVersion>().unwrap_or_else(|e| { s.parse::<SnapshotVersion>().unwrap_or_else(|e| {
eprintln!("Error: {e}"); eprintln!("Error: {e}");
exit(1) std::process::exit(1)
}) })
}, },
); );
@ -3032,7 +3033,7 @@ fn main() {
eprintln!( eprintln!(
"Error: snapshot slot {snapshot_slot} does not exist in blockstore or is not full.", "Error: snapshot slot {snapshot_slot} does not exist in blockstore or is not full.",
); );
exit(1); std::process::exit(1);
} }
let ending_slot = if is_minimized { let ending_slot = if is_minimized {
@ -3041,7 +3042,7 @@ fn main() {
eprintln!( eprintln!(
"Error: ending_slot ({ending_slot}) must be greater than snapshot_slot ({snapshot_slot})" "Error: ending_slot ({ending_slot}) must be greater than snapshot_slot ({snapshot_slot})"
); );
exit(1); std::process::exit(1);
} }
Some(ending_slot) Some(ending_slot)
@ -3086,7 +3087,7 @@ fn main() {
.get(snapshot_slot) .get(snapshot_slot)
.unwrap_or_else(|| { .unwrap_or_else(|| {
eprintln!("Error: Slot {snapshot_slot} is not available"); eprintln!("Error: Slot {snapshot_slot} is not available");
exit(1); std::process::exit(1);
}); });
let child_bank_required = rent_burn_percentage.is_ok() let child_bank_required = rent_burn_percentage.is_ok()
@ -3141,7 +3142,7 @@ fn main() {
eprintln!( eprintln!(
"Error: Account does not exist, unable to remove it: {address}" "Error: Account does not exist, unable to remove it: {address}"
); );
exit(1); std::process::exit(1);
}); });
account.set_lamports(0); account.set_lamports(0);
@ -3209,7 +3210,7 @@ fn main() {
eprintln!( eprintln!(
"Error: --bootstrap-validator pubkeys cannot be duplicated" "Error: --bootstrap-validator pubkeys cannot be duplicated"
); );
exit(1); std::process::exit(1);
} }
} }
@ -3283,7 +3284,7 @@ fn main() {
eprintln!( eprintln!(
"Error: --warp-slot too close. Must be >= {minimum_warp_slot}" "Error: --warp-slot too close. Must be >= {minimum_warp_slot}"
); );
exit(1); std::process::exit(1);
} }
} else { } else {
warn!("Warping to slot {}", minimum_warp_slot); warn!("Warping to slot {}", minimum_warp_slot);
@ -3333,7 +3334,7 @@ fn main() {
if is_incremental { if is_incremental {
if starting_snapshot_hashes.is_none() { if starting_snapshot_hashes.is_none() {
eprintln!("Unable to create incremental snapshot without a base full snapshot"); eprintln!("Unable to create incremental snapshot without a base full snapshot");
exit(1); std::process::exit(1);
} }
let full_snapshot_slot = starting_snapshot_hashes.unwrap().full.hash.0; let full_snapshot_slot = starting_snapshot_hashes.unwrap().full.hash.0;
if bank.slot() <= full_snapshot_slot { if bank.slot() <= full_snapshot_slot {
@ -3342,7 +3343,7 @@ fn main() {
bank.slot(), bank.slot(),
full_snapshot_slot, full_snapshot_slot,
); );
exit(1); std::process::exit(1);
} }
let incremental_snapshot_archive_info = let incremental_snapshot_archive_info =
@ -3359,7 +3360,7 @@ fn main() {
) )
.unwrap_or_else(|err| { .unwrap_or_else(|err| {
eprintln!("Unable to create incremental snapshot: {err}"); eprintln!("Unable to create incremental snapshot: {err}");
exit(1); std::process::exit(1);
}); });
println!( println!(
@ -3383,7 +3384,7 @@ fn main() {
) )
.unwrap_or_else(|err| { .unwrap_or_else(|err| {
eprintln!("Unable to create snapshot: {err}"); eprintln!("Unable to create snapshot: {err}");
exit(1); std::process::exit(1);
}); });
println!( println!(
@ -3414,7 +3415,7 @@ fn main() {
} }
Err(err) => { Err(err) => {
eprintln!("Failed to load ledger: {err:?}"); eprintln!("Failed to load ledger: {err:?}");
exit(1); std::process::exit(1);
} }
} }
} }
@ -3444,7 +3445,7 @@ fn main() {
) )
.unwrap_or_else(|err| { .unwrap_or_else(|err| {
eprintln!("Failed to load ledger: {err:?}"); eprintln!("Failed to load ledger: {err:?}");
exit(1); std::process::exit(1);
}); });
let bank = bank_forks.read().unwrap().working_bank(); let bank = bank_forks.read().unwrap().working_bank();
@ -3535,7 +3536,7 @@ fn main() {
let slot = bank_forks.working_bank().slot(); let slot = bank_forks.working_bank().slot();
let bank = bank_forks.get(slot).unwrap_or_else(|| { let bank = bank_forks.get(slot).unwrap_or_else(|| {
eprintln!("Error: Slot {slot} is not available"); eprintln!("Error: Slot {slot} is not available");
exit(1); std::process::exit(1);
}); });
if arg_matches.is_present("recalculate_capitalization") { if arg_matches.is_present("recalculate_capitalization") {
@ -3566,7 +3567,7 @@ fn main() {
base_bank.epoch(), base_bank.epoch(),
warp_epoch warp_epoch
); );
exit(1); std::process::exit(1);
} }
if let Ok(raw_inflation) = value_t!(arg_matches, "inflation", String) { if let Ok(raw_inflation) = value_t!(arg_matches, "inflation", String) {
@ -4032,7 +4033,7 @@ fn main() {
} }
Err(err) => { Err(err) => {
eprintln!("Failed to load ledger: {err:?}"); eprintln!("Failed to load ledger: {err:?}");
exit(1); std::process::exit(1);
} }
} }
} }
@ -4061,20 +4062,20 @@ fn main() {
let slots: Vec<_> = metas.map(|(slot, _)| slot).collect(); let slots: Vec<_> = metas.map(|(slot, _)| slot).collect();
if slots.is_empty() { if slots.is_empty() {
eprintln!("Purge range is empty"); eprintln!("Purge range is empty");
exit(1); std::process::exit(1);
} }
*slots.last().unwrap() *slots.last().unwrap()
} }
Err(err) => { Err(err) => {
eprintln!("Unable to read the Ledger: {err:?}"); eprintln!("Unable to read the Ledger: {err:?}");
exit(1); std::process::exit(1);
} }
}, },
}; };
if end_slot < start_slot { if end_slot < start_slot {
eprintln!("end slot {end_slot} is less than start slot {start_slot}"); eprintln!("end slot {end_slot} is less than start slot {start_slot}");
exit(1); std::process::exit(1);
} }
info!( info!(
"Purging data from slots {} to {} ({} slots) (skip compaction: {}) (dead slot only: {})", "Purging data from slots {} to {} ({} slots) (skip compaction: {}) (dead slot only: {})",
@ -4223,7 +4224,7 @@ fn main() {
Either adjust `--until` value, or pass a larger `--repair-limit` \ Either adjust `--until` value, or pass a larger `--repair-limit` \
to override the limit", to override the limit",
); );
exit(1); std::process::exit(1);
} }
let ancestor_iterator = AncestorIterator::new(start_root, &blockstore) let ancestor_iterator = AncestorIterator::new(start_root, &blockstore)
.take_while(|&slot| slot >= end_root); .take_while(|&slot| slot >= end_root);
@ -4238,7 +4239,7 @@ fn main() {
.set_roots(roots_to_fix.iter()) .set_roots(roots_to_fix.iter())
.unwrap_or_else(|err| { .unwrap_or_else(|err| {
eprintln!("Unable to set roots {roots_to_fix:?}: {err}"); eprintln!("Unable to set roots {roots_to_fix:?}: {err}");
exit(1); std::process::exit(1);
}); });
} }
} else { } else {
@ -4312,7 +4313,7 @@ fn main() {
} }
Err(err) => { Err(err) => {
eprintln!("Unable to read the Ledger: {err:?}"); eprintln!("Unable to read the Ledger: {err:?}");
exit(1); std::process::exit(1);
} }
}; };
} }
@ -4364,7 +4365,7 @@ fn main() {
} }
("", _) => { ("", _) => {
eprintln!("{}", matches.usage()); eprintln!("{}", matches.usage());
exit(1); std::process::exit(1);
} }
_ => unreachable!(), _ => unreachable!(),
}; };

View File

@ -283,6 +283,7 @@ impl LocalCluster {
vec![], vec![],
&leader_config, &leader_config,
true, // should_check_duplicate_instance true, // should_check_duplicate_instance
None, // rpc_to_plugin_manager_receiver
Arc::new(RwLock::new(ValidatorStartProgress::default())), Arc::new(RwLock::new(ValidatorStartProgress::default())),
socket_addr_space, socket_addr_space,
DEFAULT_TPU_USE_QUIC, DEFAULT_TPU_USE_QUIC,
@ -489,6 +490,7 @@ impl LocalCluster {
vec![LegacyContactInfo::try_from(&self.entry_point_info).unwrap()], vec![LegacyContactInfo::try_from(&self.entry_point_info).unwrap()],
&config, &config,
true, // should_check_duplicate_instance true, // should_check_duplicate_instance
None, // rpc_to_plugin_manager_receiver
Arc::new(RwLock::new(ValidatorStartProgress::default())), Arc::new(RwLock::new(ValidatorStartProgress::default())),
socket_addr_space, socket_addr_space,
DEFAULT_TPU_USE_QUIC, DEFAULT_TPU_USE_QUIC,
@ -862,6 +864,7 @@ impl Cluster for LocalCluster {
.unwrap_or_default(), .unwrap_or_default(),
&safe_clone_config(&cluster_validator_info.config), &safe_clone_config(&cluster_validator_info.config),
true, // should_check_duplicate_instance true, // should_check_duplicate_instance
None, // rpc_to_plugin_manager_receiver
Arc::new(RwLock::new(ValidatorStartProgress::default())), Arc::new(RwLock::new(ValidatorStartProgress::default())),
socket_addr_space, socket_addr_space,
DEFAULT_TPU_USE_QUIC, DEFAULT_TPU_USE_QUIC,

View File

@ -14,7 +14,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
account_paths: config.account_paths.clone(), account_paths: config.account_paths.clone(),
account_shrink_paths: config.account_shrink_paths.clone(), account_shrink_paths: config.account_shrink_paths.clone(),
rpc_config: config.rpc_config.clone(), rpc_config: config.rpc_config.clone(),
geyser_plugin_config_files: config.geyser_plugin_config_files.clone(), on_start_geyser_plugin_config_files: config.on_start_geyser_plugin_config_files.clone(),
rpc_addrs: config.rpc_addrs, rpc_addrs: config.rpc_addrs,
pubsub_config: config.pubsub_config.clone(), pubsub_config: config.pubsub_config.clone(),
snapshot_config: config.snapshot_config.clone(), snapshot_config: config.snapshot_config.clone(),

View File

@ -4834,6 +4834,8 @@ dependencies = [
"bs58", "bs58",
"crossbeam-channel", "crossbeam-channel",
"json5", "json5",
"jsonrpc-core",
"jsonrpc-server-utils",
"libloading", "libloading",
"log", "log",
"serde_json", "serde_json",
@ -6103,12 +6105,14 @@ version = "1.16.0"
dependencies = [ dependencies = [
"base64 0.13.0", "base64 0.13.0",
"bincode", "bincode",
"crossbeam-channel",
"log", "log",
"serde_derive", "serde_derive",
"serde_json", "serde_json",
"solana-cli-output", "solana-cli-output",
"solana-client", "solana-client",
"solana-core", "solana-core",
"solana-geyser-plugin-manager",
"solana-gossip", "solana-gossip",
"solana-ledger", "solana-ledger",
"solana-logger 1.16.0", "solana-logger 1.16.0",
@ -6219,6 +6223,7 @@ dependencies = [
"jsonrpc-server-utils", "jsonrpc-server-utils",
"lazy_static", "lazy_static",
"libc", "libc",
"libloading",
"log", "log",
"num_cpus", "num_cpus",
"rand 0.7.3", "rand 0.7.3",
@ -6234,6 +6239,8 @@ dependencies = [
"solana-entry", "solana-entry",
"solana-faucet", "solana-faucet",
"solana-genesis-utils", "solana-genesis-utils",
"solana-geyser-plugin-interface",
"solana-geyser-plugin-manager",
"solana-gossip", "solana-gossip",
"solana-ledger", "solana-ledger",
"solana-logger 1.16.0", "solana-logger 1.16.0",

View File

@ -12,12 +12,14 @@ edition = { workspace = true }
[dependencies] [dependencies]
base64 = { workspace = true } base64 = { workspace = true }
bincode = { workspace = true } bincode = { workspace = true }
crossbeam-channel = { workspace = true }
log = { workspace = true } log = { workspace = true }
serde_derive = { workspace = true } serde_derive = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
solana-cli-output = { workspace = true } solana-cli-output = { workspace = true }
solana-client = { workspace = true } solana-client = { workspace = true }
solana-core = { workspace = true } solana-core = { workspace = true }
solana-geyser-plugin-manager = { workspace = true }
solana-gossip = { workspace = true } solana-gossip = { workspace = true }
solana-ledger = { workspace = true } solana-ledger = { workspace = true }
solana-logger = { workspace = true } solana-logger = { workspace = true }

View File

@ -1,6 +1,6 @@
#![allow(clippy::integer_arithmetic)] #![allow(clippy::integer_arithmetic)]
use { use {
crossbeam_channel::Receiver,
log::*, log::*,
solana_cli_output::CliAccount, solana_cli_output::CliAccount,
solana_client::rpc_request::MAX_MULTIPLE_ACCOUNTS, solana_client::rpc_request::MAX_MULTIPLE_ACCOUNTS,
@ -9,6 +9,9 @@ use {
tower_storage::TowerStorage, tower_storage::TowerStorage,
validator::{Validator, ValidatorConfig, ValidatorStartProgress}, validator::{Validator, ValidatorConfig, ValidatorStartProgress},
}, },
solana_geyser_plugin_manager::{
geyser_plugin_manager::GeyserPluginManager, GeyserPluginManagerRequest,
},
solana_gossip::{ solana_gossip::{
cluster_info::{ClusterInfo, Node}, cluster_info::{ClusterInfo, Node},
gossip_service::discover_cluster, gossip_service::discover_cluster,
@ -138,6 +141,7 @@ pub struct TestValidatorGenesis {
pub log_messages_bytes_limit: Option<usize>, pub log_messages_bytes_limit: Option<usize>,
pub transaction_account_lock_limit: Option<usize>, pub transaction_account_lock_limit: Option<usize>,
pub tpu_enable_udp: bool, pub tpu_enable_udp: bool,
pub geyser_plugin_manager: Arc<RwLock<GeyserPluginManager>>,
admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>, admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
} }
@ -172,6 +176,7 @@ impl Default for TestValidatorGenesis {
log_messages_bytes_limit: Option::<usize>::default(), log_messages_bytes_limit: Option::<usize>::default(),
transaction_account_lock_limit: Option::<usize>::default(), transaction_account_lock_limit: Option::<usize>::default(),
tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP, tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP,
geyser_plugin_manager: Arc::new(RwLock::new(GeyserPluginManager::new())),
admin_rpc_service_post_init: admin_rpc_service_post_init:
Arc::<RwLock<Option<AdminRpcRequestMetadataPostInit>>>::default(), Arc::<RwLock<Option<AdminRpcRequestMetadataPostInit>>>::default(),
} }
@ -530,7 +535,26 @@ impl TestValidatorGenesis {
mint_address: Pubkey, mint_address: Pubkey,
socket_addr_space: SocketAddrSpace, socket_addr_space: SocketAddrSpace,
) -> Result<TestValidator, Box<dyn std::error::Error>> { ) -> Result<TestValidator, Box<dyn std::error::Error>> {
TestValidator::start(mint_address, self, socket_addr_space).map(|test_validator| { self.start_with_mint_address_and_geyser_plugin_rpc(mint_address, socket_addr_space, None)
}
/// Start a test validator with the address of the mint account that will receive tokens
/// created at genesis. Augments admin rpc service with dynamic geyser plugin manager if
/// the geyser plugin service is enabled at startup.
///
pub fn start_with_mint_address_and_geyser_plugin_rpc(
&self,
mint_address: Pubkey,
socket_addr_space: SocketAddrSpace,
rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
) -> Result<TestValidator, Box<dyn std::error::Error>> {
TestValidator::start(
mint_address,
self,
socket_addr_space,
rpc_to_plugin_manager_receiver,
)
.map(|test_validator| {
let runtime = tokio::runtime::Builder::new_current_thread() let runtime = tokio::runtime::Builder::new_current_thread()
.enable_io() .enable_io()
.enable_time() .enable_time()
@ -579,7 +603,7 @@ impl TestValidatorGenesis {
socket_addr_space: SocketAddrSpace, socket_addr_space: SocketAddrSpace,
) -> (TestValidator, Keypair) { ) -> (TestValidator, Keypair) {
let mint_keypair = Keypair::new(); let mint_keypair = Keypair::new();
match TestValidator::start(mint_keypair.pubkey(), self, socket_addr_space) { match TestValidator::start(mint_keypair.pubkey(), self, socket_addr_space, None) {
Ok(test_validator) => { Ok(test_validator) => {
test_validator.wait_for_nonzero_fees().await; test_validator.wait_for_nonzero_fees().await;
(test_validator, mint_keypair) (test_validator, mint_keypair)
@ -835,6 +859,7 @@ impl TestValidator {
mint_address: Pubkey, mint_address: Pubkey,
config: &TestValidatorGenesis, config: &TestValidatorGenesis,
socket_addr_space: SocketAddrSpace, socket_addr_space: SocketAddrSpace,
rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
) -> Result<Self, Box<dyn std::error::Error>> { ) -> Result<Self, Box<dyn std::error::Error>> {
let preserve_ledger = config.ledger_path.is_some(); let preserve_ledger = config.ledger_path.is_some();
let ledger_path = TestValidator::initialize_ledger(mint_address, config)?; let ledger_path = TestValidator::initialize_ledger(mint_address, config)?;
@ -897,7 +922,7 @@ impl TestValidator {
}; };
let mut validator_config = ValidatorConfig { let mut validator_config = ValidatorConfig {
geyser_plugin_config_files: config.geyser_plugin_config_files.clone(), on_start_geyser_plugin_config_files: config.geyser_plugin_config_files.clone(),
rpc_addrs: Some(( rpc_addrs: Some((
SocketAddr::new( SocketAddr::new(
IpAddr::V4(Ipv4Addr::UNSPECIFIED), IpAddr::V4(Ipv4Addr::UNSPECIFIED),
@ -949,6 +974,7 @@ impl TestValidator {
vec![], vec![],
&validator_config, &validator_config,
true, // should_check_duplicate_instance true, // should_check_duplicate_instance
rpc_to_plugin_manager_receiver,
config.start_progress.clone(), config.start_progress.clone(),
socket_addr_space, socket_addr_space,
DEFAULT_TPU_USE_QUIC, DEFAULT_TPU_USE_QUIC,

View File

@ -25,6 +25,7 @@ jsonrpc-derive = { workspace = true }
jsonrpc-ipc-server = { workspace = true } jsonrpc-ipc-server = { workspace = true }
jsonrpc-server-utils = { workspace = true } jsonrpc-server-utils = { workspace = true }
lazy_static = { workspace = true } lazy_static = { workspace = true }
libloading = { workspace = true }
log = { workspace = true } log = { workspace = true }
num_cpus = { workspace = true } num_cpus = { workspace = true }
rand = { workspace = true } rand = { workspace = true }
@ -39,6 +40,8 @@ solana-download-utils = { workspace = true }
solana-entry = { workspace = true } solana-entry = { workspace = true }
solana-faucet = { workspace = true } solana-faucet = { workspace = true }
solana-genesis-utils = { workspace = true } solana-genesis-utils = { workspace = true }
solana-geyser-plugin-interface = { workspace = true }
solana-geyser-plugin-manager = { workspace = true }
solana-gossip = { workspace = true } solana-gossip = { workspace = true }
solana-ledger = { workspace = true } solana-ledger = { workspace = true }
solana-logger = { workspace = true } solana-logger = { workspace = true }

View File

@ -1,8 +1,11 @@
use { use {
jsonrpc_core::{MetaIoHandler, Metadata, Result}, crossbeam_channel::Sender,
jsonrpc_core::{BoxFuture, ErrorCode, MetaIoHandler, Metadata, Result},
jsonrpc_core_client::{transports::ipc, RpcError}, jsonrpc_core_client::{transports::ipc, RpcError},
jsonrpc_derive::rpc, jsonrpc_derive::rpc,
jsonrpc_ipc_server::{RequestContext, ServerBuilder}, jsonrpc_ipc_server::{
tokio::sync::oneshot::channel as oneshot_channel, RequestContext, ServerBuilder,
},
jsonrpc_server_utils::tokio, jsonrpc_server_utils::tokio,
log::*, log::*,
serde::{de::Deserializer, Deserialize, Serialize}, serde::{de::Deserializer, Deserialize, Serialize},
@ -10,6 +13,7 @@ use {
admin_rpc_post_init::AdminRpcRequestMetadataPostInit, consensus::Tower, admin_rpc_post_init::AdminRpcRequestMetadataPostInit, consensus::Tower,
tower_storage::TowerStorage, validator::ValidatorStartProgress, tower_storage::TowerStorage, validator::ValidatorStartProgress,
}, },
solana_geyser_plugin_manager::GeyserPluginManagerRequest,
solana_gossip::contact_info::ContactInfo, solana_gossip::contact_info::ContactInfo,
solana_rpc::rpc::verify_pubkey, solana_rpc::rpc::verify_pubkey,
solana_rpc_client_api::{config::RpcAccountIndex, custom_error::RpcCustomError}, solana_rpc_client_api::{config::RpcAccountIndex, custom_error::RpcCustomError},
@ -41,7 +45,9 @@ pub struct AdminRpcRequestMetadata {
pub tower_storage: Arc<dyn TowerStorage>, pub tower_storage: Arc<dyn TowerStorage>,
pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>, pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
pub post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>, pub post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
pub rpc_to_plugin_manager_sender: Option<Sender<GeyserPluginManagerRequest>>,
} }
impl Metadata for AdminRpcRequestMetadata {} impl Metadata for AdminRpcRequestMetadata {}
impl AdminRpcRequestMetadata { impl AdminRpcRequestMetadata {
@ -139,6 +145,23 @@ pub trait AdminRpc {
#[rpc(meta, name = "exit")] #[rpc(meta, name = "exit")]
fn exit(&self, meta: Self::Metadata) -> Result<()>; fn exit(&self, meta: Self::Metadata) -> Result<()>;
#[rpc(meta, name = "reloadPlugin")]
fn reload_plugin(
&self,
meta: Self::Metadata,
name: String,
config_file: String,
) -> BoxFuture<Result<()>>;
#[rpc(meta, name = "unloadPlugin")]
fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>>;
#[rpc(meta, name = "loadPlugin")]
fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>>;
#[rpc(meta, name = "listPlugins")]
fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>>;
#[rpc(meta, name = "rpcAddress")] #[rpc(meta, name = "rpcAddress")]
fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>>; fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>>;
@ -234,6 +257,121 @@ impl AdminRpc for AdminRpcImpl {
Ok(()) Ok(())
} }
fn reload_plugin(
&self,
meta: Self::Metadata,
name: String,
config_file: String,
) -> BoxFuture<Result<()>> {
Box::pin(async move {
// Construct channel for plugin to respond to this particular rpc request instance
let (response_sender, response_receiver) = oneshot_channel();
// Send request to plugin manager if there is a geyser service
if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
rpc_to_manager_sender
.send(GeyserPluginManagerRequest::ReloadPlugin {
name,
config_file,
response_sender,
})
.expect("GeyerPluginService should never drop request receiver");
} else {
return Err(jsonrpc_core::Error {
code: ErrorCode::InvalidRequest,
message: "No geyser plugin service".to_string(),
data: None,
});
}
// Await response from plugin manager
response_receiver
.await
.expect("GeyerPluginService's oneshot sender shouldn't drop early")
})
}
fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>> {
Box::pin(async move {
// Construct channel for plugin to respond to this particular rpc request instance
let (response_sender, response_receiver) = oneshot_channel();
// Send request to plugin manager if there is a geyser service
if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
rpc_to_manager_sender
.send(GeyserPluginManagerRequest::LoadPlugin {
config_file,
response_sender,
})
.expect("GeyerPluginService should never drop request receiver");
} else {
return Err(jsonrpc_core::Error {
code: ErrorCode::InvalidRequest,
message: "No geyser plugin service".to_string(),
data: None,
});
}
// Await response from plugin manager
response_receiver
.await
.expect("GeyerPluginService's oneshot sender shouldn't drop early")
})
}
fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>> {
Box::pin(async move {
// Construct channel for plugin to respond to this particular rpc request instance
let (response_sender, response_receiver) = oneshot_channel();
// Send request to plugin manager if there is a geyser service
if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
rpc_to_manager_sender
.send(GeyserPluginManagerRequest::UnloadPlugin {
name,
response_sender,
})
.expect("GeyerPluginService should never drop request receiver");
} else {
return Err(jsonrpc_core::Error {
code: ErrorCode::InvalidRequest,
message: "No geyser plugin service".to_string(),
data: None,
});
}
// Await response from plugin manager
response_receiver
.await
.expect("GeyerPluginService's oneshot sender shouldn't drop early")
})
}
fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>> {
Box::pin(async move {
// Construct channel for plugin to respond to this particular rpc request instance
let (response_sender, response_receiver) = oneshot_channel();
// Send request to plugin manager
if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
rpc_to_manager_sender
.send(GeyserPluginManagerRequest::ListPlugins { response_sender })
.expect("GeyerPluginService should never drop request receiver");
} else {
return Err(jsonrpc_core::Error {
code: ErrorCode::InvalidRequest,
message: "No geyser plugin service".to_string(),
data: None,
});
}
// Await response from plugin manager
response_receiver
.await
.expect("GeyerPluginService's oneshot sender shouldn't drop early")
})
}
fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>> { fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>> {
debug!("rpc_addr admin rpc request received"); debug!("rpc_addr admin rpc request received");
Ok(meta.rpc_addr) Ok(meta.rpc_addr)
@ -544,6 +682,7 @@ pub fn run(ledger_path: &Path, metadata: AdminRpcRequestMetadata) {
warn!("Unable to start admin rpc service: {:?}", err); warn!("Unable to start admin rpc service: {:?}", err);
} }
Ok(server) => { Ok(server) => {
info!("started admin rpc service!");
let close_handle = server.close_handle(); let close_handle = server.close_handle();
validator_exit validator_exit
.write() .write()
@ -708,6 +847,7 @@ mod tests {
repair_whitelist, repair_whitelist,
}))), }))),
staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())), staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
rpc_to_plugin_manager_sender: None,
}; };
let mut io = MetaIoHandler::default(); let mut io = MetaIoHandler::default();
io.extend_with(AdminRpcImpl.to_delegate()); io.extend_with(AdminRpcImpl.to_delegate());

View File

@ -381,6 +381,14 @@ fn main() {
let tower_storage = Arc::new(FileTowerStorage::new(ledger_path.clone())); let tower_storage = Arc::new(FileTowerStorage::new(ledger_path.clone()));
let admin_service_post_init = Arc::new(RwLock::new(None)); let admin_service_post_init = Arc::new(RwLock::new(None));
// If geyser_plugin_config value is invalid, the validator will exit when the values are extracted below
let (rpc_to_plugin_manager_sender, rpc_to_plugin_manager_receiver) =
if matches.is_present("geyser_plugin_config") {
let (sender, receiver) = unbounded();
(Some(sender), Some(receiver))
} else {
(None, None)
};
admin_rpc_service::run( admin_rpc_service::run(
&ledger_path, &ledger_path,
admin_rpc_service::AdminRpcRequestMetadata { admin_rpc_service::AdminRpcRequestMetadata {
@ -392,6 +400,7 @@ fn main() {
staked_nodes_overrides: genesis.staked_nodes_overrides.clone(), staked_nodes_overrides: genesis.staked_nodes_overrides.clone(),
post_init: admin_service_post_init, post_init: admin_service_post_init,
tower_storage: tower_storage.clone(), tower_storage: tower_storage.clone(),
rpc_to_plugin_manager_sender,
}, },
); );
let dashboard = if output == Output::Dashboard { let dashboard = if output == Output::Dashboard {
@ -542,7 +551,11 @@ fn main() {
genesis.compute_unit_limit(compute_unit_limit); genesis.compute_unit_limit(compute_unit_limit);
} }
match genesis.start_with_mint_address(mint_address, socket_addr_space) { match genesis.start_with_mint_address_and_geyser_plugin_rpc(
mint_address,
socket_addr_space,
rpc_to_plugin_manager_receiver,
) {
Ok(test_validator) => { Ok(test_validator) => {
if let Some(dashboard) = dashboard { if let Some(dashboard) = dashboard {
dashboard.run(Duration::from_millis(250)); dashboard.run(Duration::from_millis(250));

View File

@ -1465,6 +1465,48 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
SubCommand::with_name("run") SubCommand::with_name("run")
.about("Run the validator") .about("Run the validator")
) )
.subcommand(
SubCommand::with_name("plugin")
.about("Manage and view geyser plugins")
.setting(AppSettings::SubcommandRequiredElseHelp)
.setting(AppSettings::InferSubcommands)
.subcommand(
SubCommand::with_name("list")
.about("List all current running gesyer plugins")
)
.subcommand(
SubCommand::with_name("unload")
.about("Unload a particular gesyer plugin. You must specify the gesyer plugin name")
.arg(
Arg::with_name("name")
.required(true)
.takes_value(true)
)
)
.subcommand(
SubCommand::with_name("reload")
.about("Reload a particular gesyer plugin. You must specify the gesyer plugin name and the new config path")
.arg(
Arg::with_name("name")
.required(true)
.takes_value(true)
)
.arg(
Arg::with_name("config")
.required(true)
.takes_value(true)
)
)
.subcommand(
SubCommand::with_name("load")
.about("Load a new gesyer plugin. You must specify the config path. Fails if overwriting (use reload)")
.arg(
Arg::with_name("config")
.required(true)
.takes_value(true)
)
)
)
.subcommand( .subcommand(
SubCommand::with_name("set-identity") SubCommand::with_name("set-identity")
.about("Set the validator identity") .about("Set the validator identity")

View File

@ -4,6 +4,7 @@ use jemallocator::Jemalloc;
use { use {
clap::{crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit, ArgMatches}, clap::{crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit, ArgMatches},
console::style, console::style,
crossbeam_channel::unbounded,
log::*, log::*,
rand::{seq::SliceRandom, thread_rng}, rand::{seq::SliceRandom, thread_rng},
solana_clap_utils::input_parsers::{keypair_of, keypairs_of, pubkey_of, value_of}, solana_clap_utils::input_parsers::{keypair_of, keypairs_of, pubkey_of, value_of},
@ -536,6 +537,79 @@ pub fn main() {
_ => unreachable!(), _ => unreachable!(),
} }
} }
("plugin", Some(plugin_subcommand_matches)) => {
match plugin_subcommand_matches.subcommand() {
("list", _) => {
let admin_client = admin_rpc_service::connect(&ledger_path);
let plugins = admin_rpc_service::runtime()
.block_on(async move { admin_client.await?.list_plugins().await })
.unwrap_or_else(|err| {
println!("Failed to list plugins: {err}");
exit(1);
});
if !plugins.is_empty() {
println!("Currently the following plugins are loaded:");
for (plugin, i) in plugins.into_iter().zip(1..) {
println!(" {i}) {plugin}");
}
} else {
println!("There are currently no plugins loaded");
}
return;
}
("unload", Some(subcommand_matches)) => {
if let Some(name) = value_t!(subcommand_matches, "name", String).ok() {
let admin_client = admin_rpc_service::connect(&ledger_path);
admin_rpc_service::runtime()
.block_on(async {
admin_client.await?.unload_plugin(name.clone()).await
})
.unwrap_or_else(|err| {
println!("Failed to unload plugin {name}: {err:?}");
exit(1);
});
println!("Successfully unloaded plugin: {name}");
}
return;
}
("load", Some(subcommand_matches)) => {
if let Some(config) = value_t!(subcommand_matches, "config", String).ok() {
let admin_client = admin_rpc_service::connect(&ledger_path);
let name = admin_rpc_service::runtime()
.block_on(async {
admin_client.await?.load_plugin(config.clone()).await
})
.unwrap_or_else(|err| {
println!("Failed to load plugin {config}: {err:?}");
exit(1);
});
println!("Successfully loaded plugin: {name}");
}
return;
}
("reload", Some(subcommand_matches)) => {
if let Some(name) = value_t!(subcommand_matches, "name", String).ok() {
if let Some(config) = value_t!(subcommand_matches, "config", String).ok() {
let admin_client = admin_rpc_service::connect(&ledger_path);
admin_rpc_service::runtime()
.block_on(async {
admin_client
.await?
.reload_plugin(name.clone(), config.clone())
.await
})
.unwrap_or_else(|err| {
println!("Failed to reload plugin {name}: {err:?}");
exit(1);
});
println!("Successfully reloaded plugin: {name}");
}
}
return;
}
_ => unreachable!(),
}
}
("contact-info", Some(subcommand_matches)) => { ("contact-info", Some(subcommand_matches)) => {
let output_mode = subcommand_matches.value_of("output"); let output_mode = subcommand_matches.value_of("output");
let admin_client = admin_rpc_service::connect(&ledger_path); let admin_client = admin_rpc_service::connect(&ledger_path);
@ -1058,7 +1132,7 @@ pub fn main() {
let accounts_db_config = Some(accounts_db_config); let accounts_db_config = Some(accounts_db_config);
let geyser_plugin_config_files = if matches.is_present("geyser_plugin_config") { let on_start_geyser_plugin_config_files = if matches.is_present("geyser_plugin_config") {
Some( Some(
values_t_or_exit!(matches, "geyser_plugin_config", String) values_t_or_exit!(matches, "geyser_plugin_config", String)
.into_iter() .into_iter()
@ -1068,6 +1142,7 @@ pub fn main() {
} else { } else {
None None
}; };
let starting_with_geyser_plugins: bool = on_start_geyser_plugin_config_files.is_some();
let rpc_bigtable_config = if matches.is_present("enable_rpc_bigtable_ledger_storage") let rpc_bigtable_config = if matches.is_present("enable_rpc_bigtable_ledger_storage")
|| matches.is_present("enable_bigtable_ledger_upload") || matches.is_present("enable_bigtable_ledger_upload")
@ -1155,7 +1230,7 @@ pub fn main() {
usize usize
)), )),
}, },
geyser_plugin_config_files, on_start_geyser_plugin_config_files,
rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| { rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| {
( (
SocketAddr::new(rpc_bind_address, rpc_port), SocketAddr::new(rpc_bind_address, rpc_port),
@ -1513,6 +1588,13 @@ pub fn main() {
let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default())); let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
let admin_service_post_init = Arc::new(RwLock::new(None)); let admin_service_post_init = Arc::new(RwLock::new(None));
let (rpc_to_plugin_manager_sender, rpc_to_plugin_manager_receiver) =
if starting_with_geyser_plugins {
let (sender, receiver) = unbounded();
(Some(sender), Some(receiver))
} else {
(None, None)
};
admin_rpc_service::run( admin_rpc_service::run(
&ledger_path, &ledger_path,
admin_rpc_service::AdminRpcRequestMetadata { admin_rpc_service::AdminRpcRequestMetadata {
@ -1524,6 +1606,7 @@ pub fn main() {
post_init: admin_service_post_init.clone(), post_init: admin_service_post_init.clone(),
tower_storage: validator_config.tower_storage.clone(), tower_storage: validator_config.tower_storage.clone(),
staked_nodes_overrides, staked_nodes_overrides,
rpc_to_plugin_manager_sender,
}, },
); );
@ -1682,6 +1765,7 @@ pub fn main() {
cluster_entrypoints, cluster_entrypoints,
&validator_config, &validator_config,
should_check_duplicate_instance, should_check_duplicate_instance,
rpc_to_plugin_manager_receiver,
start_progress, start_progress,
socket_addr_space, socket_addr_space,
tpu_use_quic, tpu_use_quic,