Geyser Runtime Reload (#30352)

Support dynamic geyser plugin load, unload, and listing through admin RPC.
This commit is contained in:
cavemanloverboy 2023-03-16 17:03:00 -07:00 committed by GitHub
parent 9d792c1848
commit 10f49d4e26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 898 additions and 159 deletions

7
Cargo.lock generated
View File

@ -5634,6 +5634,8 @@ dependencies = [
"bs58",
"crossbeam-channel",
"json5",
"jsonrpc-core",
"jsonrpc-server-utils",
"libloading",
"log",
"serde_json",
@ -6831,12 +6833,14 @@ version = "1.16.0"
dependencies = [
"base64 0.13.0",
"bincode",
"crossbeam-channel",
"log",
"serde_derive",
"serde_json",
"solana-cli-output",
"solana-client",
"solana-core",
"solana-geyser-plugin-manager",
"solana-gossip",
"solana-ledger",
"solana-logger 1.16.0",
@ -7017,6 +7021,7 @@ dependencies = [
"jsonrpc-server-utils",
"lazy_static",
"libc",
"libloading",
"log",
"num_cpus",
"rand 0.7.3",
@ -7033,6 +7038,8 @@ dependencies = [
"solana-entry",
"solana-faucet",
"solana-genesis-utils",
"solana-geyser-plugin-interface",
"solana-geyser-plugin-manager",
"solana-gossip",
"solana-ledger",
"solana-logger 1.16.0",

View File

@ -31,7 +31,9 @@ use {
rand::{thread_rng, Rng},
solana_client::connection_cache::ConnectionCache,
solana_entry::poh::compute_hash_time_ns,
solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService,
solana_geyser_plugin_manager::{
geyser_plugin_service::GeyserPluginService, GeyserPluginManagerRequest,
},
solana_gossip::{
cluster_info::{
ClusterInfo, Node, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
@ -128,7 +130,8 @@ pub struct ValidatorConfig {
pub account_paths: Vec<PathBuf>,
pub account_shrink_paths: Option<Vec<PathBuf>>,
pub rpc_config: JsonRpcConfig,
pub geyser_plugin_config_files: Option<Vec<PathBuf>>,
/// Specifies which plugins to start up with
pub on_start_geyser_plugin_config_files: Option<Vec<PathBuf>>,
pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub)
pub pubsub_config: PubSubConfig,
pub snapshot_config: SnapshotConfig,
@ -192,7 +195,7 @@ impl Default for ValidatorConfig {
account_paths: Vec::new(),
account_shrink_paths: None,
rpc_config: JsonRpcConfig::default(),
geyser_plugin_config_files: None,
on_start_geyser_plugin_config_files: None,
rpc_addrs: None,
pubsub_config: PubSubConfig::default(),
snapshot_config: SnapshotConfig::new_load_only(),
@ -392,6 +395,7 @@ impl Validator {
cluster_entrypoints: Vec<ContactInfo>,
config: &ValidatorConfig,
should_check_duplicate_instance: bool,
rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
start_progress: Arc<RwLock<ValidatorStartProgress>>,
socket_addr_space: SocketAddrSpace,
use_quic: bool,
@ -413,12 +417,19 @@ impl Validator {
let mut bank_notification_senders = Vec::new();
let exit = Arc::new(AtomicBool::new(false));
let geyser_plugin_service =
if let Some(geyser_plugin_config_files) = &config.geyser_plugin_config_files {
if let Some(geyser_plugin_config_files) = &config.on_start_geyser_plugin_config_files {
let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
bank_notification_senders.push(confirmed_bank_sender);
let result =
GeyserPluginService::new(confirmed_bank_receiver, geyser_plugin_config_files);
let rpc_to_plugin_manager_receiver_and_exit =
rpc_to_plugin_manager_receiver.map(|receiver| (receiver, exit.clone()));
let result = GeyserPluginService::new_with_receiver(
confirmed_bank_receiver,
geyser_plugin_config_files,
rpc_to_plugin_manager_receiver_and_exit,
);
match result {
Ok(geyser_plugin_service) => Some(geyser_plugin_service),
Err(err) => {
@ -483,7 +494,6 @@ impl Validator {
start.stop();
info!("done. {}", start);
let exit = Arc::new(AtomicBool::new(false));
{
let exit = exit.clone();
config
@ -2176,6 +2186,7 @@ mod tests {
vec![LegacyContactInfo::try_from(&leader_node.info).unwrap()],
&config,
true, // should_check_duplicate_instance
None, // rpc_to_plugin_manager_receiver
start_progress.clone(),
SocketAddrSpace::Unspecified,
DEFAULT_TPU_USE_QUIC,
@ -2273,7 +2284,8 @@ mod tests {
Arc::new(RwLock::new(vec![Arc::new(vote_account_keypair)])),
vec![LegacyContactInfo::try_from(&leader_node.info).unwrap()],
&config,
true, // should_check_duplicate_instance
true, // should_check_duplicate_instance.
None, // rpc_to_plugin_manager_receiver
Arc::new(RwLock::new(ValidatorStartProgress::default())),
SocketAddrSpace::Unspecified,
DEFAULT_TPU_USE_QUIC,

View File

@ -10,9 +10,12 @@ license = { workspace = true }
edition = { workspace = true }
[dependencies]
bs58 = { workspace = true }
crossbeam-channel = { workspace = true }
json5 = { workspace = true }
jsonrpc-core = { workspace = true }
jsonrpc-server-utils = { workspace = true }
libloading = { workspace = true }
log = { workspace = true }
serde_json = { workspace = true }

View File

@ -1,9 +1,10 @@
/// Managing the Geyser plugins
use {
libloading::{Library, Symbol},
jsonrpc_core::{ErrorCode, Result as JsonRpcResult},
jsonrpc_server_utils::tokio::sync::oneshot::Sender as OneShotSender,
libloading::Library,
log::*,
solana_geyser_plugin_interface::geyser_plugin_interface::GeyserPlugin,
std::error::Error,
std::path::Path,
};
#[derive(Default, Debug)]
@ -20,26 +21,6 @@ impl GeyserPluginManager {
}
}
/// # Safety
///
/// This function loads the dynamically linked library specified in the path. The library
/// must do necessary initializations.
pub unsafe fn load_plugin(
&mut self,
libpath: &str,
config_file: &str,
) -> Result<(), Box<dyn Error>> {
type PluginConstructor = unsafe fn() -> *mut dyn GeyserPlugin;
let lib = Library::new(libpath)?;
let constructor: Symbol<PluginConstructor> = lib.get(b"_create_plugin")?;
let plugin_raw = constructor();
let mut plugin = Box::from_raw(plugin_raw);
plugin.on_load(config_file)?;
self.plugins.push(plugin);
self.libs.push(lib);
Ok(())
}
/// Unload all plugins and loaded plugin libraries, making sure to fire
/// their `on_plugin_unload()` methods so they can do any necessary cleanup.
pub fn unload(&mut self) {
@ -72,4 +53,401 @@ impl GeyserPluginManager {
}
false
}
/// Admin RPC request handler
pub(crate) fn list_plugins(&self) -> JsonRpcResult<Vec<String>> {
Ok(self.plugins.iter().map(|p| p.name().to_owned()).collect())
}
/// Admin RPC request handler
/// # Safety
///
/// This function loads the dynamically linked library specified in the path. The library
/// must do necessary initializations.
///
/// The string returned is the name of the plugin loaded, which can only be accessed once
/// the plugin has been loaded and calling the name method.
pub(crate) fn load_plugin(
&mut self,
geyser_plugin_config_file: impl AsRef<Path>,
) -> JsonRpcResult<String> {
// First load plugin
let (mut new_plugin, new_lib, new_config_file) =
load_plugin_from_config(geyser_plugin_config_file.as_ref()).map_err(|e| {
jsonrpc_core::Error {
code: ErrorCode::InvalidRequest,
message: format!("Failed to load plugin: {e}"),
data: None,
}
})?;
// Then see if a plugin with this name already exists. If so, abort
if self
.plugins
.iter()
.any(|plugin| plugin.name().eq(new_plugin.name()))
{
return Err(jsonrpc_core::Error {
code: ErrorCode::InvalidRequest,
message: format!(
"There already exists a plugin named {} loaded. Did not load requested plugin",
new_plugin.name()
),
data: None,
});
}
// Call on_load and push plugin
new_plugin
.on_load(new_config_file)
.map_err(|on_load_err| jsonrpc_core::Error {
code: ErrorCode::InvalidRequest,
message: format!(
"on_load method of plugin {} failed: {on_load_err}",
new_plugin.name()
),
data: None,
})?;
let name = new_plugin.name().to_string();
self.plugins.push(new_plugin);
self.libs.push(new_lib);
Ok(name)
}
pub(crate) fn unload_plugin(&mut self, name: &str) -> JsonRpcResult<()> {
// Check if any plugin names match this one
let Some(idx) = self.plugins.iter().position(|plugin| plugin.name().eq(name)) else {
// If we don't find one return an error
return Err(
jsonrpc_core::error::Error {
code: ErrorCode::InvalidRequest,
message: String::from("The plugin you requested to unload is not loaded"),
data: None,
}
)
};
// Unload and drop plugin and lib
self._drop_plugin(idx);
Ok(())
}
/// Checks for a plugin with a given `name`.
/// If it exists, first unload it.
/// Then, attempt to load a new plugin
pub(crate) fn reload_plugin(&mut self, name: &str, config_file: &str) -> JsonRpcResult<()> {
// Check if any plugin names match this one
let Some(idx) = self.plugins.iter().position(|plugin| plugin.name().eq(name)) else {
// If we don't find one return an error
return Err(
jsonrpc_core::error::Error {
code: ErrorCode::InvalidRequest,
message: String::from("The plugin you requested to reload is not loaded"),
data: None,
}
)
};
// Unload and drop current plugin first in case plugin requires exclusive access to resource,
// such as a particular port or database.
self._drop_plugin(idx);
// Try to load plugin, library
// SAFETY: It is up to the validator to ensure this is a valid plugin library.
let (mut new_plugin, new_lib, new_parsed_config_file) =
load_plugin_from_config(config_file.as_ref()).map_err(|err| jsonrpc_core::Error {
code: ErrorCode::InvalidRequest,
message: err.to_string(),
data: None,
})?;
// Attempt to on_load with new plugin
match new_plugin.on_load(new_parsed_config_file) {
// On success, push plugin and library
Ok(()) => {
self.plugins.push(new_plugin);
self.libs.push(new_lib);
}
// On failure, return error
Err(err) => {
return Err(jsonrpc_core::error::Error {
code: ErrorCode::InvalidRequest,
message: format!(
"Failed to start new plugin (previous plugin was dropped!): {err}"
),
data: None,
});
}
}
Ok(())
}
fn _drop_plugin(&mut self, idx: usize) {
let mut current_plugin = self.plugins.remove(idx);
let _current_lib = self.libs.remove(idx);
current_plugin.on_unload();
}
}
#[derive(Debug)]
pub enum GeyserPluginManagerRequest {
ReloadPlugin {
name: String,
config_file: String,
response_sender: OneShotSender<JsonRpcResult<()>>,
},
UnloadPlugin {
name: String,
response_sender: OneShotSender<JsonRpcResult<()>>,
},
LoadPlugin {
config_file: String,
response_sender: OneShotSender<JsonRpcResult<String>>,
},
ListPlugins {
response_sender: OneShotSender<JsonRpcResult<Vec<String>>>,
},
}
#[derive(thiserror::Error, Debug)]
pub enum GeyserPluginManagerError {
#[error("Cannot open the the plugin config file")]
CannotOpenConfigFile(String),
#[error("Cannot read the the plugin config file")]
CannotReadConfigFile(String),
#[error("The config file is not in a valid Json format")]
InvalidConfigFileFormat(String),
#[error("Plugin library path is not specified in the config file")]
LibPathNotSet,
#[error("Invalid plugin path")]
InvalidPluginPath,
#[error("Cannot load plugin shared library")]
PluginLoadError(String),
#[error("The geyser plugin {0} is already loaded shared library")]
PluginAlreadyLoaded(String),
#[error("The GeyserPlugin on_load method failed")]
PluginStartError(String),
}
/// # Safety
///
/// This function loads the dynamically linked library specified in the path. The library
/// must do necessary initializations.
///
/// This returns the geyser plugin, the dynamic library, and the parsed config file as a &str.
/// (The geyser plugin interface requires a &str for the on_load method).
#[cfg(not(test))]
pub(crate) fn load_plugin_from_config(
geyser_plugin_config_file: &Path,
) -> Result<(Box<dyn GeyserPlugin>, Library, &str), GeyserPluginManagerError> {
use std::{fs::File, io::Read, path::PathBuf};
type PluginConstructor = unsafe fn() -> *mut dyn GeyserPlugin;
use libloading::Symbol;
let mut file = match File::open(geyser_plugin_config_file) {
Ok(file) => file,
Err(err) => {
return Err(GeyserPluginManagerError::CannotOpenConfigFile(format!(
"Failed to open the plugin config file {geyser_plugin_config_file:?}, error: {err:?}"
)));
}
};
let mut contents = String::new();
if let Err(err) = file.read_to_string(&mut contents) {
return Err(GeyserPluginManagerError::CannotReadConfigFile(format!(
"Failed to read the plugin config file {geyser_plugin_config_file:?}, error: {err:?}"
)));
}
let result: serde_json::Value = match json5::from_str(&contents) {
Ok(value) => value,
Err(err) => {
return Err(GeyserPluginManagerError::InvalidConfigFileFormat(format!(
"The config file {geyser_plugin_config_file:?} is not in a valid Json5 format, error: {err:?}"
)));
}
};
let libpath = result["libpath"]
.as_str()
.ok_or(GeyserPluginManagerError::LibPathNotSet)?;
let mut libpath = PathBuf::from(libpath);
if libpath.is_relative() {
let config_dir = geyser_plugin_config_file.parent().ok_or_else(|| {
GeyserPluginManagerError::CannotOpenConfigFile(format!(
"Failed to resolve parent of {geyser_plugin_config_file:?}",
))
})?;
libpath = config_dir.join(libpath);
}
let config_file = geyser_plugin_config_file
.as_os_str()
.to_str()
.ok_or(GeyserPluginManagerError::InvalidPluginPath)?;
let (plugin, lib) = unsafe {
let lib = Library::new(libpath)
.map_err(|e| GeyserPluginManagerError::PluginLoadError(e.to_string()))?;
let constructor: Symbol<PluginConstructor> = lib
.get(b"_create_plugin")
.map_err(|e| GeyserPluginManagerError::PluginLoadError(e.to_string()))?;
let plugin_raw = constructor();
(Box::from_raw(plugin_raw), lib)
};
Ok((plugin, lib, config_file))
}
// This is mocked for tests to avoid having to do IO with a dynamically linked library
// across different architectures at test time
//
/// This returns mocked values for the geyser plugin, the dynamic library, and the parsed config file as a &str.
/// (The geyser plugin interface requires a &str for the on_load method).
#[cfg(test)]
pub(crate) fn load_plugin_from_config(
_geyser_plugin_config_file: &Path,
) -> Result<(Box<dyn GeyserPlugin>, Library, &str), GeyserPluginManagerError> {
Ok(tests::dummy_plugin_and_library())
}
#[cfg(test)]
mod tests {
use {
crate::geyser_plugin_manager::GeyserPluginManager,
libloading::Library,
solana_geyser_plugin_interface::geyser_plugin_interface::GeyserPlugin,
std::sync::{Arc, RwLock},
};
pub(super) fn dummy_plugin_and_library() -> (Box<dyn GeyserPlugin>, Library, &'static str) {
let plugin = Box::new(TestPlugin);
let lib = {
let handle: *mut std::os::raw::c_void = &mut () as *mut _ as *mut std::os::raw::c_void;
// SAFETY: all calls to get Symbols should fail, so this is actually safe
let inner_lib = unsafe { libloading::os::unix::Library::from_raw(handle) };
Library::from(inner_lib)
};
(plugin, lib, DUMMY_CONFIG)
}
pub(super) fn dummy_plugin_and_library2() -> (Box<dyn GeyserPlugin>, Library, &'static str) {
let plugin = Box::new(TestPlugin2);
let lib = {
let handle: *mut std::os::raw::c_void = &mut () as *mut _ as *mut std::os::raw::c_void;
// SAFETY: all calls to get Symbols should fail, so this is actually safe
let inner_lib = unsafe { libloading::os::unix::Library::from_raw(handle) };
Library::from(inner_lib)
};
(plugin, lib, DUMMY_CONFIG)
}
const DUMMY_NAME: &str = "dummy";
pub(super) const DUMMY_CONFIG: &str = "dummy_config";
const ANOTHER_DUMMY_NAME: &str = "another_dummy";
#[derive(Debug)]
pub(super) struct TestPlugin;
impl GeyserPlugin for TestPlugin {
fn name(&self) -> &'static str {
DUMMY_NAME
}
}
#[derive(Debug)]
pub(super) struct TestPlugin2;
impl GeyserPlugin for TestPlugin2 {
fn name(&self) -> &'static str {
ANOTHER_DUMMY_NAME
}
}
#[test]
fn test_geyser_reload() {
// Initialize empty manager
let plugin_manager = Arc::new(RwLock::new(GeyserPluginManager::new()));
// No plugins are loaded, this should fail
let mut plugin_manager_lock = plugin_manager.write().unwrap();
let reload_result = plugin_manager_lock.reload_plugin(DUMMY_NAME, DUMMY_CONFIG);
assert_eq!(
reload_result.unwrap_err().message,
"The plugin you requested to reload is not loaded"
);
// Mock having loaded plugin (TestPlugin)
let (mut plugin, lib, config) = dummy_plugin_and_library();
plugin.on_load(config).unwrap();
plugin_manager_lock.plugins.push(plugin);
plugin_manager_lock.libs.push(lib);
// plugin_manager_lock.libs.push(lib);
assert_eq!(plugin_manager_lock.plugins[0].name(), DUMMY_NAME);
plugin_manager_lock.plugins[0].name();
// Try wrong name (same error)
const WRONG_NAME: &str = "wrong_name";
let reload_result = plugin_manager_lock.reload_plugin(WRONG_NAME, DUMMY_CONFIG);
assert_eq!(
reload_result.unwrap_err().message,
"The plugin you requested to reload is not loaded"
);
// Now try a (dummy) reload, replacing TestPlugin with TestPlugin2
let reload_result = plugin_manager_lock.reload_plugin(DUMMY_NAME, DUMMY_CONFIG);
assert!(reload_result.is_ok());
}
#[test]
fn test_plugin_list() {
// Initialize empty manager
let plugin_manager = Arc::new(RwLock::new(GeyserPluginManager::new()));
let mut plugin_manager_lock = plugin_manager.write().unwrap();
// Load two plugins
// First
let (mut plugin, lib, config) = dummy_plugin_and_library();
plugin.on_load(config).unwrap();
plugin_manager_lock.plugins.push(plugin);
plugin_manager_lock.libs.push(lib);
// Second
let (mut plugin, lib, config) = dummy_plugin_and_library2();
plugin.on_load(config).unwrap();
plugin_manager_lock.plugins.push(plugin);
plugin_manager_lock.libs.push(lib);
// Check that both plugins are returned in the list
let plugins = plugin_manager_lock.list_plugins().unwrap();
assert!(plugins.iter().any(|name| name.eq(DUMMY_NAME)));
assert!(plugins.iter().any(|name| name.eq(ANOTHER_DUMMY_NAME)));
}
#[test]
fn test_plugin_load_unload() {
// Initialize empty manager
let plugin_manager = Arc::new(RwLock::new(GeyserPluginManager::new()));
let mut plugin_manager_lock = plugin_manager.write().unwrap();
// Load rpc call
let load_result = plugin_manager_lock.load_plugin(DUMMY_CONFIG);
assert!(load_result.is_ok());
assert_eq!(plugin_manager_lock.plugins.len(), 1);
// Unload rpc call
let unload_result = plugin_manager_lock.unload_plugin(DUMMY_NAME);
assert!(unload_result.is_ok());
assert_eq!(plugin_manager_lock.plugins.len(), 0);
}
}

View File

@ -3,8 +3,10 @@ use {
accounts_update_notifier::AccountsUpdateNotifierImpl,
block_metadata_notifier::BlockMetadataNotifierImpl,
block_metadata_notifier_interface::BlockMetadataNotifierLock,
geyser_plugin_manager::GeyserPluginManager, slot_status_notifier::SlotStatusNotifierImpl,
slot_status_observer::SlotStatusObserver, transaction_notifier::TransactionNotifierImpl,
geyser_plugin_manager::{GeyserPluginManager, GeyserPluginManagerRequest},
slot_status_notifier::SlotStatusNotifierImpl,
slot_status_observer::SlotStatusObserver,
transaction_notifier::TransactionNotifierImpl,
},
crossbeam_channel::Receiver,
log::*,
@ -14,36 +16,14 @@ use {
},
solana_runtime::accounts_update_notifier_interface::AccountsUpdateNotifier,
std::{
fs::File,
io::Read,
path::{Path, PathBuf},
sync::{Arc, RwLock},
sync::{atomic::AtomicBool, Arc, RwLock},
thread,
time::Duration,
},
thiserror::Error,
};
#[derive(Error, Debug)]
pub enum GeyserPluginServiceError {
#[error("Cannot open the the plugin config file")]
CannotOpenConfigFile(String),
#[error("Cannot read the the plugin config file")]
CannotReadConfigFile(String),
#[error("The config file is not in a valid Json format")]
InvalidConfigFileFormat(String),
#[error("Plugin library path is not specified in the config file")]
LibPathNotSet,
#[error("Invalid plugin path")]
InvalidPluginPath,
#[error("Cannot load plugin shared library")]
PluginLoadError(String),
}
/// The service managing the Geyser plugin workflow.
pub struct GeyserPluginService {
slot_status_observer: Option<SlotStatusObserver>,
@ -66,10 +46,20 @@ impl GeyserPluginService {
/// shall create the implementation of `GeyserPlugin` and returns to the caller.
/// The rest of the JSON fields' definition is up to to the concrete plugin implementation
/// It is usually used to configure the connection information for the external data store.
pub fn new(
confirmed_bank_receiver: Receiver<BankNotification>,
geyser_plugin_config_files: &[PathBuf],
) -> Result<Self, GeyserPluginServiceError> {
Self::new_with_receiver(confirmed_bank_receiver, geyser_plugin_config_files, None)
}
pub fn new_with_receiver(
confirmed_bank_receiver: Receiver<BankNotification>,
geyser_plugin_config_files: &[PathBuf],
rpc_to_plugin_manager_receiver_and_exit: Option<(
Receiver<GeyserPluginManagerRequest>,
Arc<AtomicBool>,
)>,
) -> Result<Self, GeyserPluginServiceError> {
info!(
"Starting GeyserPluginService from config files: {:?}",
@ -80,10 +70,10 @@ impl GeyserPluginService {
for geyser_plugin_config_file in geyser_plugin_config_files {
Self::load_plugin(&mut plugin_manager, geyser_plugin_config_file)?;
}
let account_data_notifications_enabled =
plugin_manager.account_data_notifications_enabled();
let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled();
let plugin_manager = Arc::new(RwLock::new(plugin_manager));
let accounts_update_notifier: Option<AccountsUpdateNotifier> =
@ -122,6 +112,12 @@ impl GeyserPluginService {
(None, None)
};
// Initialize plugin manager rpc handler thread if needed
if let Some((request_receiver, exit)) = rpc_to_plugin_manager_receiver_and_exit {
let plugin_manager = plugin_manager.clone();
Self::start_manager_rpc_handler(plugin_manager, request_receiver, exit)
};
info!("Started GeyserPluginService");
Ok(GeyserPluginService {
slot_status_observer,
@ -136,56 +132,9 @@ impl GeyserPluginService {
plugin_manager: &mut GeyserPluginManager,
geyser_plugin_config_file: &Path,
) -> Result<(), GeyserPluginServiceError> {
let mut file = match File::open(geyser_plugin_config_file) {
Ok(file) => file,
Err(err) => {
return Err(GeyserPluginServiceError::CannotOpenConfigFile(format!(
"Failed to open the plugin config file {geyser_plugin_config_file:?}, error: {err:?}"
)));
}
};
let mut contents = String::new();
if let Err(err) = file.read_to_string(&mut contents) {
return Err(GeyserPluginServiceError::CannotReadConfigFile(format!(
"Failed to read the plugin config file {geyser_plugin_config_file:?}, error: {err:?}"
)));
}
let result: serde_json::Value = match json5::from_str(&contents) {
Ok(value) => value,
Err(err) => {
return Err(GeyserPluginServiceError::InvalidConfigFileFormat(format!(
"The config file {geyser_plugin_config_file:?} is not in a valid Json5 format, error: {err:?}"
)));
}
};
let libpath = result["libpath"]
.as_str()
.ok_or(GeyserPluginServiceError::LibPathNotSet)?;
let mut libpath = PathBuf::from(libpath);
if libpath.is_relative() {
let config_dir = geyser_plugin_config_file.parent().ok_or_else(|| {
GeyserPluginServiceError::CannotOpenConfigFile(format!(
"Failed to resolve parent of {geyser_plugin_config_file:?}",
))
})?;
libpath = config_dir.join(libpath);
}
let config_file = geyser_plugin_config_file
.as_os_str()
.to_str()
.ok_or(GeyserPluginServiceError::InvalidPluginPath)?;
unsafe {
let result = plugin_manager.load_plugin(libpath.to_str().unwrap(), config_file);
if let Err(err) = result {
let msg = format!("Failed to load the plugin library: {libpath:?}, error: {err:?}");
return Err(GeyserPluginServiceError::PluginLoadError(msg));
}
}
plugin_manager
.load_plugin(geyser_plugin_config_file)
.map_err(|e| GeyserPluginServiceError::FailedToLoadPlugin(e.into()))?;
Ok(())
}
@ -208,4 +157,71 @@ impl GeyserPluginService {
self.plugin_manager.write().unwrap().unload();
Ok(())
}
fn start_manager_rpc_handler(
plugin_manager: Arc<RwLock<GeyserPluginManager>>,
request_receiver: Receiver<GeyserPluginManagerRequest>,
exit: Arc<AtomicBool>,
) {
thread::Builder::new()
.name("SolGeyserPluginRpc".to_string())
.spawn(move || loop {
if let Ok(request) = request_receiver.recv_timeout(Duration::from_secs(5)) {
match request {
GeyserPluginManagerRequest::ListPlugins { response_sender } => {
let plugin_list = plugin_manager.read().unwrap().list_plugins();
response_sender
.send(plugin_list)
.expect("Admin rpc service will be waiting for response");
}
GeyserPluginManagerRequest::ReloadPlugin {
ref name,
ref config_file,
response_sender,
} => {
let reload_result = plugin_manager
.write()
.unwrap()
.reload_plugin(name, config_file);
response_sender
.send(reload_result)
.expect("Admin rpc service will be waiting for response");
}
GeyserPluginManagerRequest::LoadPlugin {
ref config_file,
response_sender,
} => {
let load_result =
plugin_manager.write().unwrap().load_plugin(config_file);
response_sender
.send(load_result)
.expect("Admin rpc service will be waiting for response");
}
GeyserPluginManagerRequest::UnloadPlugin {
ref name,
response_sender,
} => {
let unload_result = plugin_manager.write().unwrap().unload_plugin(name);
response_sender
.send(unload_result)
.expect("Admin rpc service will be waiting for response");
}
}
}
if exit.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
})
.unwrap();
}
}
#[derive(Error, Debug)]
pub enum GeyserPluginServiceError {
#[error("Failed to load a geyser plugin")]
FailedToLoadPlugin(#[from] Box<dyn std::error::Error>),
}

View File

@ -6,3 +6,5 @@ pub mod geyser_plugin_service;
pub mod slot_status_notifier;
pub mod slot_status_observer;
pub mod transaction_notifier;
pub use geyser_plugin_manager::GeyserPluginManagerRequest;

View File

@ -102,7 +102,7 @@ use {
fs::File,
io::{self, stdout, BufRead, BufReader, Write},
path::{Path, PathBuf},
process::{exit, Command, Stdio},
process::{Command, Stdio},
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering},
@ -312,7 +312,7 @@ fn output_ledger(
.slot_meta_iterator(starting_slot)
.unwrap_or_else(|err| {
eprintln!("Failed to load entries starting from slot {starting_slot}: {err:?}");
exit(1);
std::process::exit(1);
});
if method == LedgerOutputMethod::Json {
@ -929,7 +929,7 @@ fn open_blockstore(
error!("Blockstore is incompatible with current software and requires updates");
if !force_update_to_open {
error!("Use --force-update-to-open to allow blockstore to update");
exit(1);
std::process::exit(1);
}
open_blockstore_with_temporary_primary_access(
ledger_path,
@ -941,12 +941,12 @@ fn open_blockstore(
"Failed to open blockstore (with --force-update-to-open) at {:?}: {:?}",
ledger_path, err
);
exit(1);
std::process::exit(1);
})
}
Err(err) => {
eprintln!("Failed to open blockstore at {ledger_path:?}: {err:?}");
exit(1);
std::process::exit(1);
}
}
}
@ -1128,14 +1128,14 @@ fn load_bank_forks(
"Unable to load bank forks at slot {halt_slot} because it is less than the starting slot {starting_slot}. \
The starting slot will be the latest snapshot slot, or genesis if --no-snapshot flag specified or no snapshots found."
);
exit(1);
std::process::exit(1);
}
// Check if we have the slot data necessary to replay from starting_slot to >= halt_slot.
if !blockstore.slot_range_connected(starting_slot, halt_slot) {
eprintln!(
"Unable to load bank forks at slot {halt_slot} due to disconnected blocks.",
);
exit(1);
std::process::exit(1);
}
}
}
@ -1164,7 +1164,7 @@ fn load_bank_forks(
{
// Couldn't get Primary access, error out to be defensive.
eprintln!("Error: custom accounts path is not supported under secondary access");
exit(1);
std::process::exit(1);
}
}
account_paths.split(',').map(PathBuf::from).collect()
@ -1187,7 +1187,7 @@ fn load_bank_forks(
Ok((account_run_path, _account_snapshot_path)) => account_run_path,
Err(err) => {
eprintln!("Unable to create account run and snapshot sub directories: {}, err: {err:?}", account_path.display());
exit(1);
std::process::exit(1);
}
}
}).collect();
@ -1207,6 +1207,7 @@ fn load_bank_forks(
let mut accounts_update_notifier = Option::<AccountsUpdateNotifier>::default();
let mut transaction_notifier = Option::<TransactionNotifierLock>::default();
let exit = Arc::new(AtomicBool::new(false));
if arg_matches.is_present("geyser_plugin_config") {
let geyser_config_files = values_t_or_exit!(arg_matches, "geyser_plugin_config", String)
.into_iter()
@ -1215,11 +1216,12 @@ fn load_bank_forks(
let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
drop(confirmed_bank_sender);
let geyser_service =
GeyserPluginService::new(confirmed_bank_receiver, &geyser_config_files).unwrap_or_else(
|err| {
eprintln!("Failed to setup Geyser service: {err:?}");
exit(1);
std::process::exit(1);
},
);
accounts_update_notifier = geyser_service.get_accounts_update_notifier();
@ -1257,7 +1259,6 @@ fn load_bank_forks(
snapshot_request_handler,
pruned_banks_request_handler,
};
let exit = Arc::new(AtomicBool::new(false));
let accounts_background_service = AccountsBackgroundService::new(
bank_forks.clone(),
&exit,
@ -2509,7 +2510,7 @@ fn main() {
)
.unwrap_or_else(|err| {
eprintln!("Failed to write genesis config: {err:?}");
exit(1);
std::process::exit(1);
});
println!("{}", open_genesis_config_by(&output_directory, arg_matches));
@ -2555,7 +2556,7 @@ fn main() {
}
Err(err) => {
eprintln!("Failed to load ledger: {err:?}");
exit(1);
std::process::exit(1);
}
}
}
@ -2632,7 +2633,7 @@ fn main() {
}
Err(err) => {
eprintln!("Failed to load ledger: {err:?}");
exit(1);
std::process::exit(1);
}
}
}
@ -2864,7 +2865,7 @@ fn main() {
)
.unwrap_or_else(|err| {
eprintln!("Ledger verification failed: {err:?}");
exit(1);
std::process::exit(1);
});
if print_accounts_stats {
let working_bank = bank_forks.read().unwrap().working_bank();
@ -2925,7 +2926,7 @@ fn main() {
}
Err(err) => {
eprintln!("Failed to load ledger: {err:?}");
exit(1);
std::process::exit(1);
}
}
}
@ -2968,7 +2969,7 @@ fn main() {
"Error: insufficient --bootstrap-validator-stake-lamports. \
Minimum amount is {minimum_stake_lamports}"
);
exit(1);
std::process::exit(1);
}
let bootstrap_validator_pubkeys = pubkeys_of(arg_matches, "bootstrap_validator");
let accounts_to_remove =
@ -2985,7 +2986,7 @@ fn main() {
|s| {
s.parse::<SnapshotVersion>().unwrap_or_else(|e| {
eprintln!("Error: {e}");
exit(1)
std::process::exit(1)
})
},
);
@ -3032,7 +3033,7 @@ fn main() {
eprintln!(
"Error: snapshot slot {snapshot_slot} does not exist in blockstore or is not full.",
);
exit(1);
std::process::exit(1);
}
let ending_slot = if is_minimized {
@ -3041,7 +3042,7 @@ fn main() {
eprintln!(
"Error: ending_slot ({ending_slot}) must be greater than snapshot_slot ({snapshot_slot})"
);
exit(1);
std::process::exit(1);
}
Some(ending_slot)
@ -3086,7 +3087,7 @@ fn main() {
.get(snapshot_slot)
.unwrap_or_else(|| {
eprintln!("Error: Slot {snapshot_slot} is not available");
exit(1);
std::process::exit(1);
});
let child_bank_required = rent_burn_percentage.is_ok()
@ -3141,7 +3142,7 @@ fn main() {
eprintln!(
"Error: Account does not exist, unable to remove it: {address}"
);
exit(1);
std::process::exit(1);
});
account.set_lamports(0);
@ -3209,7 +3210,7 @@ fn main() {
eprintln!(
"Error: --bootstrap-validator pubkeys cannot be duplicated"
);
exit(1);
std::process::exit(1);
}
}
@ -3283,7 +3284,7 @@ fn main() {
eprintln!(
"Error: --warp-slot too close. Must be >= {minimum_warp_slot}"
);
exit(1);
std::process::exit(1);
}
} else {
warn!("Warping to slot {}", minimum_warp_slot);
@ -3333,7 +3334,7 @@ fn main() {
if is_incremental {
if starting_snapshot_hashes.is_none() {
eprintln!("Unable to create incremental snapshot without a base full snapshot");
exit(1);
std::process::exit(1);
}
let full_snapshot_slot = starting_snapshot_hashes.unwrap().full.hash.0;
if bank.slot() <= full_snapshot_slot {
@ -3342,7 +3343,7 @@ fn main() {
bank.slot(),
full_snapshot_slot,
);
exit(1);
std::process::exit(1);
}
let incremental_snapshot_archive_info =
@ -3359,7 +3360,7 @@ fn main() {
)
.unwrap_or_else(|err| {
eprintln!("Unable to create incremental snapshot: {err}");
exit(1);
std::process::exit(1);
});
println!(
@ -3383,7 +3384,7 @@ fn main() {
)
.unwrap_or_else(|err| {
eprintln!("Unable to create snapshot: {err}");
exit(1);
std::process::exit(1);
});
println!(
@ -3414,7 +3415,7 @@ fn main() {
}
Err(err) => {
eprintln!("Failed to load ledger: {err:?}");
exit(1);
std::process::exit(1);
}
}
}
@ -3444,7 +3445,7 @@ fn main() {
)
.unwrap_or_else(|err| {
eprintln!("Failed to load ledger: {err:?}");
exit(1);
std::process::exit(1);
});
let bank = bank_forks.read().unwrap().working_bank();
@ -3535,7 +3536,7 @@ fn main() {
let slot = bank_forks.working_bank().slot();
let bank = bank_forks.get(slot).unwrap_or_else(|| {
eprintln!("Error: Slot {slot} is not available");
exit(1);
std::process::exit(1);
});
if arg_matches.is_present("recalculate_capitalization") {
@ -3566,7 +3567,7 @@ fn main() {
base_bank.epoch(),
warp_epoch
);
exit(1);
std::process::exit(1);
}
if let Ok(raw_inflation) = value_t!(arg_matches, "inflation", String) {
@ -4032,7 +4033,7 @@ fn main() {
}
Err(err) => {
eprintln!("Failed to load ledger: {err:?}");
exit(1);
std::process::exit(1);
}
}
}
@ -4061,20 +4062,20 @@ fn main() {
let slots: Vec<_> = metas.map(|(slot, _)| slot).collect();
if slots.is_empty() {
eprintln!("Purge range is empty");
exit(1);
std::process::exit(1);
}
*slots.last().unwrap()
}
Err(err) => {
eprintln!("Unable to read the Ledger: {err:?}");
exit(1);
std::process::exit(1);
}
},
};
if end_slot < start_slot {
eprintln!("end slot {end_slot} is less than start slot {start_slot}");
exit(1);
std::process::exit(1);
}
info!(
"Purging data from slots {} to {} ({} slots) (skip compaction: {}) (dead slot only: {})",
@ -4223,7 +4224,7 @@ fn main() {
Either adjust `--until` value, or pass a larger `--repair-limit` \
to override the limit",
);
exit(1);
std::process::exit(1);
}
let ancestor_iterator = AncestorIterator::new(start_root, &blockstore)
.take_while(|&slot| slot >= end_root);
@ -4238,7 +4239,7 @@ fn main() {
.set_roots(roots_to_fix.iter())
.unwrap_or_else(|err| {
eprintln!("Unable to set roots {roots_to_fix:?}: {err}");
exit(1);
std::process::exit(1);
});
}
} else {
@ -4312,7 +4313,7 @@ fn main() {
}
Err(err) => {
eprintln!("Unable to read the Ledger: {err:?}");
exit(1);
std::process::exit(1);
}
};
}
@ -4364,7 +4365,7 @@ fn main() {
}
("", _) => {
eprintln!("{}", matches.usage());
exit(1);
std::process::exit(1);
}
_ => unreachable!(),
};

View File

@ -283,6 +283,7 @@ impl LocalCluster {
vec![],
&leader_config,
true, // should_check_duplicate_instance
None, // rpc_to_plugin_manager_receiver
Arc::new(RwLock::new(ValidatorStartProgress::default())),
socket_addr_space,
DEFAULT_TPU_USE_QUIC,
@ -489,6 +490,7 @@ impl LocalCluster {
vec![LegacyContactInfo::try_from(&self.entry_point_info).unwrap()],
&config,
true, // should_check_duplicate_instance
None, // rpc_to_plugin_manager_receiver
Arc::new(RwLock::new(ValidatorStartProgress::default())),
socket_addr_space,
DEFAULT_TPU_USE_QUIC,
@ -862,6 +864,7 @@ impl Cluster for LocalCluster {
.unwrap_or_default(),
&safe_clone_config(&cluster_validator_info.config),
true, // should_check_duplicate_instance
None, // rpc_to_plugin_manager_receiver
Arc::new(RwLock::new(ValidatorStartProgress::default())),
socket_addr_space,
DEFAULT_TPU_USE_QUIC,

View File

@ -14,7 +14,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
account_paths: config.account_paths.clone(),
account_shrink_paths: config.account_shrink_paths.clone(),
rpc_config: config.rpc_config.clone(),
geyser_plugin_config_files: config.geyser_plugin_config_files.clone(),
on_start_geyser_plugin_config_files: config.on_start_geyser_plugin_config_files.clone(),
rpc_addrs: config.rpc_addrs,
pubsub_config: config.pubsub_config.clone(),
snapshot_config: config.snapshot_config.clone(),

View File

@ -4834,6 +4834,8 @@ dependencies = [
"bs58",
"crossbeam-channel",
"json5",
"jsonrpc-core",
"jsonrpc-server-utils",
"libloading",
"log",
"serde_json",
@ -6103,12 +6105,14 @@ version = "1.16.0"
dependencies = [
"base64 0.13.0",
"bincode",
"crossbeam-channel",
"log",
"serde_derive",
"serde_json",
"solana-cli-output",
"solana-client",
"solana-core",
"solana-geyser-plugin-manager",
"solana-gossip",
"solana-ledger",
"solana-logger 1.16.0",
@ -6219,6 +6223,7 @@ dependencies = [
"jsonrpc-server-utils",
"lazy_static",
"libc",
"libloading",
"log",
"num_cpus",
"rand 0.7.3",
@ -6234,6 +6239,8 @@ dependencies = [
"solana-entry",
"solana-faucet",
"solana-genesis-utils",
"solana-geyser-plugin-interface",
"solana-geyser-plugin-manager",
"solana-gossip",
"solana-ledger",
"solana-logger 1.16.0",

View File

@ -12,12 +12,14 @@ edition = { workspace = true }
[dependencies]
base64 = { workspace = true }
bincode = { workspace = true }
crossbeam-channel = { workspace = true }
log = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
solana-cli-output = { workspace = true }
solana-client = { workspace = true }
solana-core = { workspace = true }
solana-geyser-plugin-manager = { workspace = true }
solana-gossip = { workspace = true }
solana-ledger = { workspace = true }
solana-logger = { workspace = true }

View File

@ -1,6 +1,6 @@
#![allow(clippy::integer_arithmetic)]
use {
crossbeam_channel::Receiver,
log::*,
solana_cli_output::CliAccount,
solana_client::rpc_request::MAX_MULTIPLE_ACCOUNTS,
@ -9,6 +9,9 @@ use {
tower_storage::TowerStorage,
validator::{Validator, ValidatorConfig, ValidatorStartProgress},
},
solana_geyser_plugin_manager::{
geyser_plugin_manager::GeyserPluginManager, GeyserPluginManagerRequest,
},
solana_gossip::{
cluster_info::{ClusterInfo, Node},
gossip_service::discover_cluster,
@ -138,6 +141,7 @@ pub struct TestValidatorGenesis {
pub log_messages_bytes_limit: Option<usize>,
pub transaction_account_lock_limit: Option<usize>,
pub tpu_enable_udp: bool,
pub geyser_plugin_manager: Arc<RwLock<GeyserPluginManager>>,
admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
}
@ -172,6 +176,7 @@ impl Default for TestValidatorGenesis {
log_messages_bytes_limit: Option::<usize>::default(),
transaction_account_lock_limit: Option::<usize>::default(),
tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP,
geyser_plugin_manager: Arc::new(RwLock::new(GeyserPluginManager::new())),
admin_rpc_service_post_init:
Arc::<RwLock<Option<AdminRpcRequestMetadataPostInit>>>::default(),
}
@ -530,7 +535,26 @@ impl TestValidatorGenesis {
mint_address: Pubkey,
socket_addr_space: SocketAddrSpace,
) -> Result<TestValidator, Box<dyn std::error::Error>> {
TestValidator::start(mint_address, self, socket_addr_space).map(|test_validator| {
self.start_with_mint_address_and_geyser_plugin_rpc(mint_address, socket_addr_space, None)
}
/// Start a test validator with the address of the mint account that will receive tokens
/// created at genesis. Augments admin rpc service with dynamic geyser plugin manager if
/// the geyser plugin service is enabled at startup.
///
pub fn start_with_mint_address_and_geyser_plugin_rpc(
&self,
mint_address: Pubkey,
socket_addr_space: SocketAddrSpace,
rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
) -> Result<TestValidator, Box<dyn std::error::Error>> {
TestValidator::start(
mint_address,
self,
socket_addr_space,
rpc_to_plugin_manager_receiver,
)
.map(|test_validator| {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
@ -579,7 +603,7 @@ impl TestValidatorGenesis {
socket_addr_space: SocketAddrSpace,
) -> (TestValidator, Keypair) {
let mint_keypair = Keypair::new();
match TestValidator::start(mint_keypair.pubkey(), self, socket_addr_space) {
match TestValidator::start(mint_keypair.pubkey(), self, socket_addr_space, None) {
Ok(test_validator) => {
test_validator.wait_for_nonzero_fees().await;
(test_validator, mint_keypair)
@ -835,6 +859,7 @@ impl TestValidator {
mint_address: Pubkey,
config: &TestValidatorGenesis,
socket_addr_space: SocketAddrSpace,
rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
) -> Result<Self, Box<dyn std::error::Error>> {
let preserve_ledger = config.ledger_path.is_some();
let ledger_path = TestValidator::initialize_ledger(mint_address, config)?;
@ -897,7 +922,7 @@ impl TestValidator {
};
let mut validator_config = ValidatorConfig {
geyser_plugin_config_files: config.geyser_plugin_config_files.clone(),
on_start_geyser_plugin_config_files: config.geyser_plugin_config_files.clone(),
rpc_addrs: Some((
SocketAddr::new(
IpAddr::V4(Ipv4Addr::UNSPECIFIED),
@ -949,6 +974,7 @@ impl TestValidator {
vec![],
&validator_config,
true, // should_check_duplicate_instance
rpc_to_plugin_manager_receiver,
config.start_progress.clone(),
socket_addr_space,
DEFAULT_TPU_USE_QUIC,

View File

@ -25,6 +25,7 @@ jsonrpc-derive = { workspace = true }
jsonrpc-ipc-server = { workspace = true }
jsonrpc-server-utils = { workspace = true }
lazy_static = { workspace = true }
libloading = { workspace = true }
log = { workspace = true }
num_cpus = { workspace = true }
rand = { workspace = true }
@ -39,6 +40,8 @@ solana-download-utils = { workspace = true }
solana-entry = { workspace = true }
solana-faucet = { workspace = true }
solana-genesis-utils = { workspace = true }
solana-geyser-plugin-interface = { workspace = true }
solana-geyser-plugin-manager = { workspace = true }
solana-gossip = { workspace = true }
solana-ledger = { workspace = true }
solana-logger = { workspace = true }

View File

@ -1,8 +1,11 @@
use {
jsonrpc_core::{MetaIoHandler, Metadata, Result},
crossbeam_channel::Sender,
jsonrpc_core::{BoxFuture, ErrorCode, MetaIoHandler, Metadata, Result},
jsonrpc_core_client::{transports::ipc, RpcError},
jsonrpc_derive::rpc,
jsonrpc_ipc_server::{RequestContext, ServerBuilder},
jsonrpc_ipc_server::{
tokio::sync::oneshot::channel as oneshot_channel, RequestContext, ServerBuilder,
},
jsonrpc_server_utils::tokio,
log::*,
serde::{de::Deserializer, Deserialize, Serialize},
@ -10,6 +13,7 @@ use {
admin_rpc_post_init::AdminRpcRequestMetadataPostInit, consensus::Tower,
tower_storage::TowerStorage, validator::ValidatorStartProgress,
},
solana_geyser_plugin_manager::GeyserPluginManagerRequest,
solana_gossip::contact_info::ContactInfo,
solana_rpc::rpc::verify_pubkey,
solana_rpc_client_api::{config::RpcAccountIndex, custom_error::RpcCustomError},
@ -41,7 +45,9 @@ pub struct AdminRpcRequestMetadata {
pub tower_storage: Arc<dyn TowerStorage>,
pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
pub post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
pub rpc_to_plugin_manager_sender: Option<Sender<GeyserPluginManagerRequest>>,
}
impl Metadata for AdminRpcRequestMetadata {}
impl AdminRpcRequestMetadata {
@ -139,6 +145,23 @@ pub trait AdminRpc {
#[rpc(meta, name = "exit")]
fn exit(&self, meta: Self::Metadata) -> Result<()>;
#[rpc(meta, name = "reloadPlugin")]
fn reload_plugin(
&self,
meta: Self::Metadata,
name: String,
config_file: String,
) -> BoxFuture<Result<()>>;
#[rpc(meta, name = "unloadPlugin")]
fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>>;
#[rpc(meta, name = "loadPlugin")]
fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>>;
#[rpc(meta, name = "listPlugins")]
fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>>;
#[rpc(meta, name = "rpcAddress")]
fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>>;
@ -234,6 +257,121 @@ impl AdminRpc for AdminRpcImpl {
Ok(())
}
fn reload_plugin(
&self,
meta: Self::Metadata,
name: String,
config_file: String,
) -> BoxFuture<Result<()>> {
Box::pin(async move {
// Construct channel for plugin to respond to this particular rpc request instance
let (response_sender, response_receiver) = oneshot_channel();
// Send request to plugin manager if there is a geyser service
if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
rpc_to_manager_sender
.send(GeyserPluginManagerRequest::ReloadPlugin {
name,
config_file,
response_sender,
})
.expect("GeyerPluginService should never drop request receiver");
} else {
return Err(jsonrpc_core::Error {
code: ErrorCode::InvalidRequest,
message: "No geyser plugin service".to_string(),
data: None,
});
}
// Await response from plugin manager
response_receiver
.await
.expect("GeyerPluginService's oneshot sender shouldn't drop early")
})
}
fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>> {
Box::pin(async move {
// Construct channel for plugin to respond to this particular rpc request instance
let (response_sender, response_receiver) = oneshot_channel();
// Send request to plugin manager if there is a geyser service
if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
rpc_to_manager_sender
.send(GeyserPluginManagerRequest::LoadPlugin {
config_file,
response_sender,
})
.expect("GeyerPluginService should never drop request receiver");
} else {
return Err(jsonrpc_core::Error {
code: ErrorCode::InvalidRequest,
message: "No geyser plugin service".to_string(),
data: None,
});
}
// Await response from plugin manager
response_receiver
.await
.expect("GeyerPluginService's oneshot sender shouldn't drop early")
})
}
fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>> {
Box::pin(async move {
// Construct channel for plugin to respond to this particular rpc request instance
let (response_sender, response_receiver) = oneshot_channel();
// Send request to plugin manager if there is a geyser service
if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
rpc_to_manager_sender
.send(GeyserPluginManagerRequest::UnloadPlugin {
name,
response_sender,
})
.expect("GeyerPluginService should never drop request receiver");
} else {
return Err(jsonrpc_core::Error {
code: ErrorCode::InvalidRequest,
message: "No geyser plugin service".to_string(),
data: None,
});
}
// Await response from plugin manager
response_receiver
.await
.expect("GeyerPluginService's oneshot sender shouldn't drop early")
})
}
fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>> {
Box::pin(async move {
// Construct channel for plugin to respond to this particular rpc request instance
let (response_sender, response_receiver) = oneshot_channel();
// Send request to plugin manager
if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
rpc_to_manager_sender
.send(GeyserPluginManagerRequest::ListPlugins { response_sender })
.expect("GeyerPluginService should never drop request receiver");
} else {
return Err(jsonrpc_core::Error {
code: ErrorCode::InvalidRequest,
message: "No geyser plugin service".to_string(),
data: None,
});
}
// Await response from plugin manager
response_receiver
.await
.expect("GeyerPluginService's oneshot sender shouldn't drop early")
})
}
fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>> {
debug!("rpc_addr admin rpc request received");
Ok(meta.rpc_addr)
@ -544,6 +682,7 @@ pub fn run(ledger_path: &Path, metadata: AdminRpcRequestMetadata) {
warn!("Unable to start admin rpc service: {:?}", err);
}
Ok(server) => {
info!("started admin rpc service!");
let close_handle = server.close_handle();
validator_exit
.write()
@ -708,6 +847,7 @@ mod tests {
repair_whitelist,
}))),
staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
rpc_to_plugin_manager_sender: None,
};
let mut io = MetaIoHandler::default();
io.extend_with(AdminRpcImpl.to_delegate());

View File

@ -381,6 +381,14 @@ fn main() {
let tower_storage = Arc::new(FileTowerStorage::new(ledger_path.clone()));
let admin_service_post_init = Arc::new(RwLock::new(None));
// If geyser_plugin_config value is invalid, the validator will exit when the values are extracted below
let (rpc_to_plugin_manager_sender, rpc_to_plugin_manager_receiver) =
if matches.is_present("geyser_plugin_config") {
let (sender, receiver) = unbounded();
(Some(sender), Some(receiver))
} else {
(None, None)
};
admin_rpc_service::run(
&ledger_path,
admin_rpc_service::AdminRpcRequestMetadata {
@ -392,6 +400,7 @@ fn main() {
staked_nodes_overrides: genesis.staked_nodes_overrides.clone(),
post_init: admin_service_post_init,
tower_storage: tower_storage.clone(),
rpc_to_plugin_manager_sender,
},
);
let dashboard = if output == Output::Dashboard {
@ -542,7 +551,11 @@ fn main() {
genesis.compute_unit_limit(compute_unit_limit);
}
match genesis.start_with_mint_address(mint_address, socket_addr_space) {
match genesis.start_with_mint_address_and_geyser_plugin_rpc(
mint_address,
socket_addr_space,
rpc_to_plugin_manager_receiver,
) {
Ok(test_validator) => {
if let Some(dashboard) = dashboard {
dashboard.run(Duration::from_millis(250));

View File

@ -1465,6 +1465,48 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
SubCommand::with_name("run")
.about("Run the validator")
)
.subcommand(
SubCommand::with_name("plugin")
.about("Manage and view geyser plugins")
.setting(AppSettings::SubcommandRequiredElseHelp)
.setting(AppSettings::InferSubcommands)
.subcommand(
SubCommand::with_name("list")
.about("List all current running gesyer plugins")
)
.subcommand(
SubCommand::with_name("unload")
.about("Unload a particular gesyer plugin. You must specify the gesyer plugin name")
.arg(
Arg::with_name("name")
.required(true)
.takes_value(true)
)
)
.subcommand(
SubCommand::with_name("reload")
.about("Reload a particular gesyer plugin. You must specify the gesyer plugin name and the new config path")
.arg(
Arg::with_name("name")
.required(true)
.takes_value(true)
)
.arg(
Arg::with_name("config")
.required(true)
.takes_value(true)
)
)
.subcommand(
SubCommand::with_name("load")
.about("Load a new gesyer plugin. You must specify the config path. Fails if overwriting (use reload)")
.arg(
Arg::with_name("config")
.required(true)
.takes_value(true)
)
)
)
.subcommand(
SubCommand::with_name("set-identity")
.about("Set the validator identity")

View File

@ -4,6 +4,7 @@ use jemallocator::Jemalloc;
use {
clap::{crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit, ArgMatches},
console::style,
crossbeam_channel::unbounded,
log::*,
rand::{seq::SliceRandom, thread_rng},
solana_clap_utils::input_parsers::{keypair_of, keypairs_of, pubkey_of, value_of},
@ -536,6 +537,79 @@ pub fn main() {
_ => unreachable!(),
}
}
("plugin", Some(plugin_subcommand_matches)) => {
match plugin_subcommand_matches.subcommand() {
("list", _) => {
let admin_client = admin_rpc_service::connect(&ledger_path);
let plugins = admin_rpc_service::runtime()
.block_on(async move { admin_client.await?.list_plugins().await })
.unwrap_or_else(|err| {
println!("Failed to list plugins: {err}");
exit(1);
});
if !plugins.is_empty() {
println!("Currently the following plugins are loaded:");
for (plugin, i) in plugins.into_iter().zip(1..) {
println!(" {i}) {plugin}");
}
} else {
println!("There are currently no plugins loaded");
}
return;
}
("unload", Some(subcommand_matches)) => {
if let Some(name) = value_t!(subcommand_matches, "name", String).ok() {
let admin_client = admin_rpc_service::connect(&ledger_path);
admin_rpc_service::runtime()
.block_on(async {
admin_client.await?.unload_plugin(name.clone()).await
})
.unwrap_or_else(|err| {
println!("Failed to unload plugin {name}: {err:?}");
exit(1);
});
println!("Successfully unloaded plugin: {name}");
}
return;
}
("load", Some(subcommand_matches)) => {
if let Some(config) = value_t!(subcommand_matches, "config", String).ok() {
let admin_client = admin_rpc_service::connect(&ledger_path);
let name = admin_rpc_service::runtime()
.block_on(async {
admin_client.await?.load_plugin(config.clone()).await
})
.unwrap_or_else(|err| {
println!("Failed to load plugin {config}: {err:?}");
exit(1);
});
println!("Successfully loaded plugin: {name}");
}
return;
}
("reload", Some(subcommand_matches)) => {
if let Some(name) = value_t!(subcommand_matches, "name", String).ok() {
if let Some(config) = value_t!(subcommand_matches, "config", String).ok() {
let admin_client = admin_rpc_service::connect(&ledger_path);
admin_rpc_service::runtime()
.block_on(async {
admin_client
.await?
.reload_plugin(name.clone(), config.clone())
.await
})
.unwrap_or_else(|err| {
println!("Failed to reload plugin {name}: {err:?}");
exit(1);
});
println!("Successfully reloaded plugin: {name}");
}
}
return;
}
_ => unreachable!(),
}
}
("contact-info", Some(subcommand_matches)) => {
let output_mode = subcommand_matches.value_of("output");
let admin_client = admin_rpc_service::connect(&ledger_path);
@ -1058,7 +1132,7 @@ pub fn main() {
let accounts_db_config = Some(accounts_db_config);
let geyser_plugin_config_files = if matches.is_present("geyser_plugin_config") {
let on_start_geyser_plugin_config_files = if matches.is_present("geyser_plugin_config") {
Some(
values_t_or_exit!(matches, "geyser_plugin_config", String)
.into_iter()
@ -1068,6 +1142,7 @@ pub fn main() {
} else {
None
};
let starting_with_geyser_plugins: bool = on_start_geyser_plugin_config_files.is_some();
let rpc_bigtable_config = if matches.is_present("enable_rpc_bigtable_ledger_storage")
|| matches.is_present("enable_bigtable_ledger_upload")
@ -1155,7 +1230,7 @@ pub fn main() {
usize
)),
},
geyser_plugin_config_files,
on_start_geyser_plugin_config_files,
rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| {
(
SocketAddr::new(rpc_bind_address, rpc_port),
@ -1513,6 +1588,13 @@ pub fn main() {
let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
let admin_service_post_init = Arc::new(RwLock::new(None));
let (rpc_to_plugin_manager_sender, rpc_to_plugin_manager_receiver) =
if starting_with_geyser_plugins {
let (sender, receiver) = unbounded();
(Some(sender), Some(receiver))
} else {
(None, None)
};
admin_rpc_service::run(
&ledger_path,
admin_rpc_service::AdminRpcRequestMetadata {
@ -1524,6 +1606,7 @@ pub fn main() {
post_init: admin_service_post_init.clone(),
tower_storage: validator_config.tower_storage.clone(),
staked_nodes_overrides,
rpc_to_plugin_manager_sender,
},
);
@ -1682,6 +1765,7 @@ pub fn main() {
cluster_entrypoints,
&validator_config,
should_check_duplicate_instance,
rpc_to_plugin_manager_receiver,
start_progress,
socket_addr_space,
tpu_use_quic,