Geyser Runtime Reload (#30352)
Support dynamic geyser plugin load, unload, and listing through admin RPC.
This commit is contained in:
parent
9d792c1848
commit
10f49d4e26
|
@ -5634,6 +5634,8 @@ dependencies = [
|
|||
"bs58",
|
||||
"crossbeam-channel",
|
||||
"json5",
|
||||
"jsonrpc-core",
|
||||
"jsonrpc-server-utils",
|
||||
"libloading",
|
||||
"log",
|
||||
"serde_json",
|
||||
|
@ -6831,12 +6833,14 @@ version = "1.16.0"
|
|||
dependencies = [
|
||||
"base64 0.13.0",
|
||||
"bincode",
|
||||
"crossbeam-channel",
|
||||
"log",
|
||||
"serde_derive",
|
||||
"serde_json",
|
||||
"solana-cli-output",
|
||||
"solana-client",
|
||||
"solana-core",
|
||||
"solana-geyser-plugin-manager",
|
||||
"solana-gossip",
|
||||
"solana-ledger",
|
||||
"solana-logger 1.16.0",
|
||||
|
@ -7017,6 +7021,7 @@ dependencies = [
|
|||
"jsonrpc-server-utils",
|
||||
"lazy_static",
|
||||
"libc",
|
||||
"libloading",
|
||||
"log",
|
||||
"num_cpus",
|
||||
"rand 0.7.3",
|
||||
|
@ -7033,6 +7038,8 @@ dependencies = [
|
|||
"solana-entry",
|
||||
"solana-faucet",
|
||||
"solana-genesis-utils",
|
||||
"solana-geyser-plugin-interface",
|
||||
"solana-geyser-plugin-manager",
|
||||
"solana-gossip",
|
||||
"solana-ledger",
|
||||
"solana-logger 1.16.0",
|
||||
|
|
|
@ -31,7 +31,9 @@ use {
|
|||
rand::{thread_rng, Rng},
|
||||
solana_client::connection_cache::ConnectionCache,
|
||||
solana_entry::poh::compute_hash_time_ns,
|
||||
solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService,
|
||||
solana_geyser_plugin_manager::{
|
||||
geyser_plugin_service::GeyserPluginService, GeyserPluginManagerRequest,
|
||||
},
|
||||
solana_gossip::{
|
||||
cluster_info::{
|
||||
ClusterInfo, Node, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
|
||||
|
@ -128,7 +130,8 @@ pub struct ValidatorConfig {
|
|||
pub account_paths: Vec<PathBuf>,
|
||||
pub account_shrink_paths: Option<Vec<PathBuf>>,
|
||||
pub rpc_config: JsonRpcConfig,
|
||||
pub geyser_plugin_config_files: Option<Vec<PathBuf>>,
|
||||
/// Specifies which plugins to start up with
|
||||
pub on_start_geyser_plugin_config_files: Option<Vec<PathBuf>>,
|
||||
pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub)
|
||||
pub pubsub_config: PubSubConfig,
|
||||
pub snapshot_config: SnapshotConfig,
|
||||
|
@ -192,7 +195,7 @@ impl Default for ValidatorConfig {
|
|||
account_paths: Vec::new(),
|
||||
account_shrink_paths: None,
|
||||
rpc_config: JsonRpcConfig::default(),
|
||||
geyser_plugin_config_files: None,
|
||||
on_start_geyser_plugin_config_files: None,
|
||||
rpc_addrs: None,
|
||||
pubsub_config: PubSubConfig::default(),
|
||||
snapshot_config: SnapshotConfig::new_load_only(),
|
||||
|
@ -392,6 +395,7 @@ impl Validator {
|
|||
cluster_entrypoints: Vec<ContactInfo>,
|
||||
config: &ValidatorConfig,
|
||||
should_check_duplicate_instance: bool,
|
||||
rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
|
||||
start_progress: Arc<RwLock<ValidatorStartProgress>>,
|
||||
socket_addr_space: SocketAddrSpace,
|
||||
use_quic: bool,
|
||||
|
@ -413,12 +417,19 @@ impl Validator {
|
|||
|
||||
let mut bank_notification_senders = Vec::new();
|
||||
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
|
||||
let geyser_plugin_service =
|
||||
if let Some(geyser_plugin_config_files) = &config.geyser_plugin_config_files {
|
||||
if let Some(geyser_plugin_config_files) = &config.on_start_geyser_plugin_config_files {
|
||||
let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
|
||||
bank_notification_senders.push(confirmed_bank_sender);
|
||||
let result =
|
||||
GeyserPluginService::new(confirmed_bank_receiver, geyser_plugin_config_files);
|
||||
let rpc_to_plugin_manager_receiver_and_exit =
|
||||
rpc_to_plugin_manager_receiver.map(|receiver| (receiver, exit.clone()));
|
||||
let result = GeyserPluginService::new_with_receiver(
|
||||
confirmed_bank_receiver,
|
||||
geyser_plugin_config_files,
|
||||
rpc_to_plugin_manager_receiver_and_exit,
|
||||
);
|
||||
match result {
|
||||
Ok(geyser_plugin_service) => Some(geyser_plugin_service),
|
||||
Err(err) => {
|
||||
|
@ -483,7 +494,6 @@ impl Validator {
|
|||
start.stop();
|
||||
info!("done. {}", start);
|
||||
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
{
|
||||
let exit = exit.clone();
|
||||
config
|
||||
|
@ -2176,6 +2186,7 @@ mod tests {
|
|||
vec![LegacyContactInfo::try_from(&leader_node.info).unwrap()],
|
||||
&config,
|
||||
true, // should_check_duplicate_instance
|
||||
None, // rpc_to_plugin_manager_receiver
|
||||
start_progress.clone(),
|
||||
SocketAddrSpace::Unspecified,
|
||||
DEFAULT_TPU_USE_QUIC,
|
||||
|
@ -2273,7 +2284,8 @@ mod tests {
|
|||
Arc::new(RwLock::new(vec![Arc::new(vote_account_keypair)])),
|
||||
vec![LegacyContactInfo::try_from(&leader_node.info).unwrap()],
|
||||
&config,
|
||||
true, // should_check_duplicate_instance
|
||||
true, // should_check_duplicate_instance.
|
||||
None, // rpc_to_plugin_manager_receiver
|
||||
Arc::new(RwLock::new(ValidatorStartProgress::default())),
|
||||
SocketAddrSpace::Unspecified,
|
||||
DEFAULT_TPU_USE_QUIC,
|
||||
|
|
|
@ -10,9 +10,12 @@ license = { workspace = true }
|
|||
edition = { workspace = true }
|
||||
|
||||
[dependencies]
|
||||
|
||||
bs58 = { workspace = true }
|
||||
crossbeam-channel = { workspace = true }
|
||||
json5 = { workspace = true }
|
||||
jsonrpc-core = { workspace = true }
|
||||
jsonrpc-server-utils = { workspace = true }
|
||||
libloading = { workspace = true }
|
||||
log = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
/// Managing the Geyser plugins
|
||||
use {
|
||||
libloading::{Library, Symbol},
|
||||
jsonrpc_core::{ErrorCode, Result as JsonRpcResult},
|
||||
jsonrpc_server_utils::tokio::sync::oneshot::Sender as OneShotSender,
|
||||
libloading::Library,
|
||||
log::*,
|
||||
solana_geyser_plugin_interface::geyser_plugin_interface::GeyserPlugin,
|
||||
std::error::Error,
|
||||
std::path::Path,
|
||||
};
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
|
@ -20,26 +21,6 @@ impl GeyserPluginManager {
|
|||
}
|
||||
}
|
||||
|
||||
/// # Safety
|
||||
///
|
||||
/// This function loads the dynamically linked library specified in the path. The library
|
||||
/// must do necessary initializations.
|
||||
pub unsafe fn load_plugin(
|
||||
&mut self,
|
||||
libpath: &str,
|
||||
config_file: &str,
|
||||
) -> Result<(), Box<dyn Error>> {
|
||||
type PluginConstructor = unsafe fn() -> *mut dyn GeyserPlugin;
|
||||
let lib = Library::new(libpath)?;
|
||||
let constructor: Symbol<PluginConstructor> = lib.get(b"_create_plugin")?;
|
||||
let plugin_raw = constructor();
|
||||
let mut plugin = Box::from_raw(plugin_raw);
|
||||
plugin.on_load(config_file)?;
|
||||
self.plugins.push(plugin);
|
||||
self.libs.push(lib);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Unload all plugins and loaded plugin libraries, making sure to fire
|
||||
/// their `on_plugin_unload()` methods so they can do any necessary cleanup.
|
||||
pub fn unload(&mut self) {
|
||||
|
@ -72,4 +53,401 @@ impl GeyserPluginManager {
|
|||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Admin RPC request handler
|
||||
pub(crate) fn list_plugins(&self) -> JsonRpcResult<Vec<String>> {
|
||||
Ok(self.plugins.iter().map(|p| p.name().to_owned()).collect())
|
||||
}
|
||||
|
||||
/// Admin RPC request handler
|
||||
/// # Safety
|
||||
///
|
||||
/// This function loads the dynamically linked library specified in the path. The library
|
||||
/// must do necessary initializations.
|
||||
///
|
||||
/// The string returned is the name of the plugin loaded, which can only be accessed once
|
||||
/// the plugin has been loaded and calling the name method.
|
||||
pub(crate) fn load_plugin(
|
||||
&mut self,
|
||||
geyser_plugin_config_file: impl AsRef<Path>,
|
||||
) -> JsonRpcResult<String> {
|
||||
// First load plugin
|
||||
let (mut new_plugin, new_lib, new_config_file) =
|
||||
load_plugin_from_config(geyser_plugin_config_file.as_ref()).map_err(|e| {
|
||||
jsonrpc_core::Error {
|
||||
code: ErrorCode::InvalidRequest,
|
||||
message: format!("Failed to load plugin: {e}"),
|
||||
data: None,
|
||||
}
|
||||
})?;
|
||||
|
||||
// Then see if a plugin with this name already exists. If so, abort
|
||||
if self
|
||||
.plugins
|
||||
.iter()
|
||||
.any(|plugin| plugin.name().eq(new_plugin.name()))
|
||||
{
|
||||
return Err(jsonrpc_core::Error {
|
||||
code: ErrorCode::InvalidRequest,
|
||||
message: format!(
|
||||
"There already exists a plugin named {} loaded. Did not load requested plugin",
|
||||
new_plugin.name()
|
||||
),
|
||||
data: None,
|
||||
});
|
||||
}
|
||||
|
||||
// Call on_load and push plugin
|
||||
new_plugin
|
||||
.on_load(new_config_file)
|
||||
.map_err(|on_load_err| jsonrpc_core::Error {
|
||||
code: ErrorCode::InvalidRequest,
|
||||
message: format!(
|
||||
"on_load method of plugin {} failed: {on_load_err}",
|
||||
new_plugin.name()
|
||||
),
|
||||
data: None,
|
||||
})?;
|
||||
let name = new_plugin.name().to_string();
|
||||
self.plugins.push(new_plugin);
|
||||
self.libs.push(new_lib);
|
||||
|
||||
Ok(name)
|
||||
}
|
||||
|
||||
pub(crate) fn unload_plugin(&mut self, name: &str) -> JsonRpcResult<()> {
|
||||
// Check if any plugin names match this one
|
||||
let Some(idx) = self.plugins.iter().position(|plugin| plugin.name().eq(name)) else {
|
||||
// If we don't find one return an error
|
||||
return Err(
|
||||
jsonrpc_core::error::Error {
|
||||
code: ErrorCode::InvalidRequest,
|
||||
message: String::from("The plugin you requested to unload is not loaded"),
|
||||
data: None,
|
||||
}
|
||||
)
|
||||
};
|
||||
|
||||
// Unload and drop plugin and lib
|
||||
self._drop_plugin(idx);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Checks for a plugin with a given `name`.
|
||||
/// If it exists, first unload it.
|
||||
/// Then, attempt to load a new plugin
|
||||
pub(crate) fn reload_plugin(&mut self, name: &str, config_file: &str) -> JsonRpcResult<()> {
|
||||
// Check if any plugin names match this one
|
||||
let Some(idx) = self.plugins.iter().position(|plugin| plugin.name().eq(name)) else {
|
||||
// If we don't find one return an error
|
||||
return Err(
|
||||
jsonrpc_core::error::Error {
|
||||
code: ErrorCode::InvalidRequest,
|
||||
message: String::from("The plugin you requested to reload is not loaded"),
|
||||
data: None,
|
||||
}
|
||||
)
|
||||
};
|
||||
|
||||
// Unload and drop current plugin first in case plugin requires exclusive access to resource,
|
||||
// such as a particular port or database.
|
||||
self._drop_plugin(idx);
|
||||
|
||||
// Try to load plugin, library
|
||||
// SAFETY: It is up to the validator to ensure this is a valid plugin library.
|
||||
let (mut new_plugin, new_lib, new_parsed_config_file) =
|
||||
load_plugin_from_config(config_file.as_ref()).map_err(|err| jsonrpc_core::Error {
|
||||
code: ErrorCode::InvalidRequest,
|
||||
message: err.to_string(),
|
||||
data: None,
|
||||
})?;
|
||||
|
||||
// Attempt to on_load with new plugin
|
||||
match new_plugin.on_load(new_parsed_config_file) {
|
||||
// On success, push plugin and library
|
||||
Ok(()) => {
|
||||
self.plugins.push(new_plugin);
|
||||
self.libs.push(new_lib);
|
||||
}
|
||||
|
||||
// On failure, return error
|
||||
Err(err) => {
|
||||
return Err(jsonrpc_core::error::Error {
|
||||
code: ErrorCode::InvalidRequest,
|
||||
message: format!(
|
||||
"Failed to start new plugin (previous plugin was dropped!): {err}"
|
||||
),
|
||||
data: None,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn _drop_plugin(&mut self, idx: usize) {
|
||||
let mut current_plugin = self.plugins.remove(idx);
|
||||
let _current_lib = self.libs.remove(idx);
|
||||
current_plugin.on_unload();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum GeyserPluginManagerRequest {
|
||||
ReloadPlugin {
|
||||
name: String,
|
||||
config_file: String,
|
||||
response_sender: OneShotSender<JsonRpcResult<()>>,
|
||||
},
|
||||
UnloadPlugin {
|
||||
name: String,
|
||||
response_sender: OneShotSender<JsonRpcResult<()>>,
|
||||
},
|
||||
LoadPlugin {
|
||||
config_file: String,
|
||||
response_sender: OneShotSender<JsonRpcResult<String>>,
|
||||
},
|
||||
ListPlugins {
|
||||
response_sender: OneShotSender<JsonRpcResult<Vec<String>>>,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum GeyserPluginManagerError {
|
||||
#[error("Cannot open the the plugin config file")]
|
||||
CannotOpenConfigFile(String),
|
||||
|
||||
#[error("Cannot read the the plugin config file")]
|
||||
CannotReadConfigFile(String),
|
||||
|
||||
#[error("The config file is not in a valid Json format")]
|
||||
InvalidConfigFileFormat(String),
|
||||
|
||||
#[error("Plugin library path is not specified in the config file")]
|
||||
LibPathNotSet,
|
||||
|
||||
#[error("Invalid plugin path")]
|
||||
InvalidPluginPath,
|
||||
|
||||
#[error("Cannot load plugin shared library")]
|
||||
PluginLoadError(String),
|
||||
|
||||
#[error("The geyser plugin {0} is already loaded shared library")]
|
||||
PluginAlreadyLoaded(String),
|
||||
|
||||
#[error("The GeyserPlugin on_load method failed")]
|
||||
PluginStartError(String),
|
||||
}
|
||||
|
||||
/// # Safety
|
||||
///
|
||||
/// This function loads the dynamically linked library specified in the path. The library
|
||||
/// must do necessary initializations.
|
||||
///
|
||||
/// This returns the geyser plugin, the dynamic library, and the parsed config file as a &str.
|
||||
/// (The geyser plugin interface requires a &str for the on_load method).
|
||||
#[cfg(not(test))]
|
||||
pub(crate) fn load_plugin_from_config(
|
||||
geyser_plugin_config_file: &Path,
|
||||
) -> Result<(Box<dyn GeyserPlugin>, Library, &str), GeyserPluginManagerError> {
|
||||
use std::{fs::File, io::Read, path::PathBuf};
|
||||
type PluginConstructor = unsafe fn() -> *mut dyn GeyserPlugin;
|
||||
use libloading::Symbol;
|
||||
|
||||
let mut file = match File::open(geyser_plugin_config_file) {
|
||||
Ok(file) => file,
|
||||
Err(err) => {
|
||||
return Err(GeyserPluginManagerError::CannotOpenConfigFile(format!(
|
||||
"Failed to open the plugin config file {geyser_plugin_config_file:?}, error: {err:?}"
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
let mut contents = String::new();
|
||||
if let Err(err) = file.read_to_string(&mut contents) {
|
||||
return Err(GeyserPluginManagerError::CannotReadConfigFile(format!(
|
||||
"Failed to read the plugin config file {geyser_plugin_config_file:?}, error: {err:?}"
|
||||
)));
|
||||
}
|
||||
|
||||
let result: serde_json::Value = match json5::from_str(&contents) {
|
||||
Ok(value) => value,
|
||||
Err(err) => {
|
||||
return Err(GeyserPluginManagerError::InvalidConfigFileFormat(format!(
|
||||
"The config file {geyser_plugin_config_file:?} is not in a valid Json5 format, error: {err:?}"
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
let libpath = result["libpath"]
|
||||
.as_str()
|
||||
.ok_or(GeyserPluginManagerError::LibPathNotSet)?;
|
||||
let mut libpath = PathBuf::from(libpath);
|
||||
if libpath.is_relative() {
|
||||
let config_dir = geyser_plugin_config_file.parent().ok_or_else(|| {
|
||||
GeyserPluginManagerError::CannotOpenConfigFile(format!(
|
||||
"Failed to resolve parent of {geyser_plugin_config_file:?}",
|
||||
))
|
||||
})?;
|
||||
libpath = config_dir.join(libpath);
|
||||
}
|
||||
|
||||
let config_file = geyser_plugin_config_file
|
||||
.as_os_str()
|
||||
.to_str()
|
||||
.ok_or(GeyserPluginManagerError::InvalidPluginPath)?;
|
||||
|
||||
let (plugin, lib) = unsafe {
|
||||
let lib = Library::new(libpath)
|
||||
.map_err(|e| GeyserPluginManagerError::PluginLoadError(e.to_string()))?;
|
||||
let constructor: Symbol<PluginConstructor> = lib
|
||||
.get(b"_create_plugin")
|
||||
.map_err(|e| GeyserPluginManagerError::PluginLoadError(e.to_string()))?;
|
||||
let plugin_raw = constructor();
|
||||
(Box::from_raw(plugin_raw), lib)
|
||||
};
|
||||
Ok((plugin, lib, config_file))
|
||||
}
|
||||
|
||||
// This is mocked for tests to avoid having to do IO with a dynamically linked library
|
||||
// across different architectures at test time
|
||||
//
|
||||
/// This returns mocked values for the geyser plugin, the dynamic library, and the parsed config file as a &str.
|
||||
/// (The geyser plugin interface requires a &str for the on_load method).
|
||||
#[cfg(test)]
|
||||
pub(crate) fn load_plugin_from_config(
|
||||
_geyser_plugin_config_file: &Path,
|
||||
) -> Result<(Box<dyn GeyserPlugin>, Library, &str), GeyserPluginManagerError> {
|
||||
Ok(tests::dummy_plugin_and_library())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use {
|
||||
crate::geyser_plugin_manager::GeyserPluginManager,
|
||||
libloading::Library,
|
||||
solana_geyser_plugin_interface::geyser_plugin_interface::GeyserPlugin,
|
||||
std::sync::{Arc, RwLock},
|
||||
};
|
||||
|
||||
pub(super) fn dummy_plugin_and_library() -> (Box<dyn GeyserPlugin>, Library, &'static str) {
|
||||
let plugin = Box::new(TestPlugin);
|
||||
let lib = {
|
||||
let handle: *mut std::os::raw::c_void = &mut () as *mut _ as *mut std::os::raw::c_void;
|
||||
// SAFETY: all calls to get Symbols should fail, so this is actually safe
|
||||
let inner_lib = unsafe { libloading::os::unix::Library::from_raw(handle) };
|
||||
Library::from(inner_lib)
|
||||
};
|
||||
(plugin, lib, DUMMY_CONFIG)
|
||||
}
|
||||
|
||||
pub(super) fn dummy_plugin_and_library2() -> (Box<dyn GeyserPlugin>, Library, &'static str) {
|
||||
let plugin = Box::new(TestPlugin2);
|
||||
let lib = {
|
||||
let handle: *mut std::os::raw::c_void = &mut () as *mut _ as *mut std::os::raw::c_void;
|
||||
// SAFETY: all calls to get Symbols should fail, so this is actually safe
|
||||
let inner_lib = unsafe { libloading::os::unix::Library::from_raw(handle) };
|
||||
Library::from(inner_lib)
|
||||
};
|
||||
(plugin, lib, DUMMY_CONFIG)
|
||||
}
|
||||
|
||||
const DUMMY_NAME: &str = "dummy";
|
||||
pub(super) const DUMMY_CONFIG: &str = "dummy_config";
|
||||
const ANOTHER_DUMMY_NAME: &str = "another_dummy";
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(super) struct TestPlugin;
|
||||
|
||||
impl GeyserPlugin for TestPlugin {
|
||||
fn name(&self) -> &'static str {
|
||||
DUMMY_NAME
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(super) struct TestPlugin2;
|
||||
|
||||
impl GeyserPlugin for TestPlugin2 {
|
||||
fn name(&self) -> &'static str {
|
||||
ANOTHER_DUMMY_NAME
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_geyser_reload() {
|
||||
// Initialize empty manager
|
||||
let plugin_manager = Arc::new(RwLock::new(GeyserPluginManager::new()));
|
||||
|
||||
// No plugins are loaded, this should fail
|
||||
let mut plugin_manager_lock = plugin_manager.write().unwrap();
|
||||
let reload_result = plugin_manager_lock.reload_plugin(DUMMY_NAME, DUMMY_CONFIG);
|
||||
assert_eq!(
|
||||
reload_result.unwrap_err().message,
|
||||
"The plugin you requested to reload is not loaded"
|
||||
);
|
||||
|
||||
// Mock having loaded plugin (TestPlugin)
|
||||
let (mut plugin, lib, config) = dummy_plugin_and_library();
|
||||
plugin.on_load(config).unwrap();
|
||||
plugin_manager_lock.plugins.push(plugin);
|
||||
plugin_manager_lock.libs.push(lib);
|
||||
// plugin_manager_lock.libs.push(lib);
|
||||
assert_eq!(plugin_manager_lock.plugins[0].name(), DUMMY_NAME);
|
||||
plugin_manager_lock.plugins[0].name();
|
||||
|
||||
// Try wrong name (same error)
|
||||
const WRONG_NAME: &str = "wrong_name";
|
||||
let reload_result = plugin_manager_lock.reload_plugin(WRONG_NAME, DUMMY_CONFIG);
|
||||
assert_eq!(
|
||||
reload_result.unwrap_err().message,
|
||||
"The plugin you requested to reload is not loaded"
|
||||
);
|
||||
|
||||
// Now try a (dummy) reload, replacing TestPlugin with TestPlugin2
|
||||
let reload_result = plugin_manager_lock.reload_plugin(DUMMY_NAME, DUMMY_CONFIG);
|
||||
assert!(reload_result.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_plugin_list() {
|
||||
// Initialize empty manager
|
||||
let plugin_manager = Arc::new(RwLock::new(GeyserPluginManager::new()));
|
||||
let mut plugin_manager_lock = plugin_manager.write().unwrap();
|
||||
|
||||
// Load two plugins
|
||||
// First
|
||||
let (mut plugin, lib, config) = dummy_plugin_and_library();
|
||||
plugin.on_load(config).unwrap();
|
||||
plugin_manager_lock.plugins.push(plugin);
|
||||
plugin_manager_lock.libs.push(lib);
|
||||
// Second
|
||||
let (mut plugin, lib, config) = dummy_plugin_and_library2();
|
||||
plugin.on_load(config).unwrap();
|
||||
plugin_manager_lock.plugins.push(plugin);
|
||||
plugin_manager_lock.libs.push(lib);
|
||||
|
||||
// Check that both plugins are returned in the list
|
||||
let plugins = plugin_manager_lock.list_plugins().unwrap();
|
||||
assert!(plugins.iter().any(|name| name.eq(DUMMY_NAME)));
|
||||
assert!(plugins.iter().any(|name| name.eq(ANOTHER_DUMMY_NAME)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_plugin_load_unload() {
|
||||
// Initialize empty manager
|
||||
let plugin_manager = Arc::new(RwLock::new(GeyserPluginManager::new()));
|
||||
let mut plugin_manager_lock = plugin_manager.write().unwrap();
|
||||
|
||||
// Load rpc call
|
||||
let load_result = plugin_manager_lock.load_plugin(DUMMY_CONFIG);
|
||||
assert!(load_result.is_ok());
|
||||
assert_eq!(plugin_manager_lock.plugins.len(), 1);
|
||||
|
||||
// Unload rpc call
|
||||
let unload_result = plugin_manager_lock.unload_plugin(DUMMY_NAME);
|
||||
assert!(unload_result.is_ok());
|
||||
assert_eq!(plugin_manager_lock.plugins.len(), 0);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,8 +3,10 @@ use {
|
|||
accounts_update_notifier::AccountsUpdateNotifierImpl,
|
||||
block_metadata_notifier::BlockMetadataNotifierImpl,
|
||||
block_metadata_notifier_interface::BlockMetadataNotifierLock,
|
||||
geyser_plugin_manager::GeyserPluginManager, slot_status_notifier::SlotStatusNotifierImpl,
|
||||
slot_status_observer::SlotStatusObserver, transaction_notifier::TransactionNotifierImpl,
|
||||
geyser_plugin_manager::{GeyserPluginManager, GeyserPluginManagerRequest},
|
||||
slot_status_notifier::SlotStatusNotifierImpl,
|
||||
slot_status_observer::SlotStatusObserver,
|
||||
transaction_notifier::TransactionNotifierImpl,
|
||||
},
|
||||
crossbeam_channel::Receiver,
|
||||
log::*,
|
||||
|
@ -14,36 +16,14 @@ use {
|
|||
},
|
||||
solana_runtime::accounts_update_notifier_interface::AccountsUpdateNotifier,
|
||||
std::{
|
||||
fs::File,
|
||||
io::Read,
|
||||
path::{Path, PathBuf},
|
||||
sync::{Arc, RwLock},
|
||||
sync::{atomic::AtomicBool, Arc, RwLock},
|
||||
thread,
|
||||
time::Duration,
|
||||
},
|
||||
thiserror::Error,
|
||||
};
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum GeyserPluginServiceError {
|
||||
#[error("Cannot open the the plugin config file")]
|
||||
CannotOpenConfigFile(String),
|
||||
|
||||
#[error("Cannot read the the plugin config file")]
|
||||
CannotReadConfigFile(String),
|
||||
|
||||
#[error("The config file is not in a valid Json format")]
|
||||
InvalidConfigFileFormat(String),
|
||||
|
||||
#[error("Plugin library path is not specified in the config file")]
|
||||
LibPathNotSet,
|
||||
|
||||
#[error("Invalid plugin path")]
|
||||
InvalidPluginPath,
|
||||
|
||||
#[error("Cannot load plugin shared library")]
|
||||
PluginLoadError(String),
|
||||
}
|
||||
|
||||
/// The service managing the Geyser plugin workflow.
|
||||
pub struct GeyserPluginService {
|
||||
slot_status_observer: Option<SlotStatusObserver>,
|
||||
|
@ -66,10 +46,20 @@ impl GeyserPluginService {
|
|||
/// shall create the implementation of `GeyserPlugin` and returns to the caller.
|
||||
/// The rest of the JSON fields' definition is up to to the concrete plugin implementation
|
||||
/// It is usually used to configure the connection information for the external data store.
|
||||
|
||||
pub fn new(
|
||||
confirmed_bank_receiver: Receiver<BankNotification>,
|
||||
geyser_plugin_config_files: &[PathBuf],
|
||||
) -> Result<Self, GeyserPluginServiceError> {
|
||||
Self::new_with_receiver(confirmed_bank_receiver, geyser_plugin_config_files, None)
|
||||
}
|
||||
|
||||
pub fn new_with_receiver(
|
||||
confirmed_bank_receiver: Receiver<BankNotification>,
|
||||
geyser_plugin_config_files: &[PathBuf],
|
||||
rpc_to_plugin_manager_receiver_and_exit: Option<(
|
||||
Receiver<GeyserPluginManagerRequest>,
|
||||
Arc<AtomicBool>,
|
||||
)>,
|
||||
) -> Result<Self, GeyserPluginServiceError> {
|
||||
info!(
|
||||
"Starting GeyserPluginService from config files: {:?}",
|
||||
|
@ -80,10 +70,10 @@ impl GeyserPluginService {
|
|||
for geyser_plugin_config_file in geyser_plugin_config_files {
|
||||
Self::load_plugin(&mut plugin_manager, geyser_plugin_config_file)?;
|
||||
}
|
||||
|
||||
let account_data_notifications_enabled =
|
||||
plugin_manager.account_data_notifications_enabled();
|
||||
let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled();
|
||||
|
||||
let plugin_manager = Arc::new(RwLock::new(plugin_manager));
|
||||
|
||||
let accounts_update_notifier: Option<AccountsUpdateNotifier> =
|
||||
|
@ -122,6 +112,12 @@ impl GeyserPluginService {
|
|||
(None, None)
|
||||
};
|
||||
|
||||
// Initialize plugin manager rpc handler thread if needed
|
||||
if let Some((request_receiver, exit)) = rpc_to_plugin_manager_receiver_and_exit {
|
||||
let plugin_manager = plugin_manager.clone();
|
||||
Self::start_manager_rpc_handler(plugin_manager, request_receiver, exit)
|
||||
};
|
||||
|
||||
info!("Started GeyserPluginService");
|
||||
Ok(GeyserPluginService {
|
||||
slot_status_observer,
|
||||
|
@ -136,56 +132,9 @@ impl GeyserPluginService {
|
|||
plugin_manager: &mut GeyserPluginManager,
|
||||
geyser_plugin_config_file: &Path,
|
||||
) -> Result<(), GeyserPluginServiceError> {
|
||||
let mut file = match File::open(geyser_plugin_config_file) {
|
||||
Ok(file) => file,
|
||||
Err(err) => {
|
||||
return Err(GeyserPluginServiceError::CannotOpenConfigFile(format!(
|
||||
"Failed to open the plugin config file {geyser_plugin_config_file:?}, error: {err:?}"
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
let mut contents = String::new();
|
||||
if let Err(err) = file.read_to_string(&mut contents) {
|
||||
return Err(GeyserPluginServiceError::CannotReadConfigFile(format!(
|
||||
"Failed to read the plugin config file {geyser_plugin_config_file:?}, error: {err:?}"
|
||||
)));
|
||||
}
|
||||
|
||||
let result: serde_json::Value = match json5::from_str(&contents) {
|
||||
Ok(value) => value,
|
||||
Err(err) => {
|
||||
return Err(GeyserPluginServiceError::InvalidConfigFileFormat(format!(
|
||||
"The config file {geyser_plugin_config_file:?} is not in a valid Json5 format, error: {err:?}"
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
let libpath = result["libpath"]
|
||||
.as_str()
|
||||
.ok_or(GeyserPluginServiceError::LibPathNotSet)?;
|
||||
let mut libpath = PathBuf::from(libpath);
|
||||
if libpath.is_relative() {
|
||||
let config_dir = geyser_plugin_config_file.parent().ok_or_else(|| {
|
||||
GeyserPluginServiceError::CannotOpenConfigFile(format!(
|
||||
"Failed to resolve parent of {geyser_plugin_config_file:?}",
|
||||
))
|
||||
})?;
|
||||
libpath = config_dir.join(libpath);
|
||||
}
|
||||
|
||||
let config_file = geyser_plugin_config_file
|
||||
.as_os_str()
|
||||
.to_str()
|
||||
.ok_or(GeyserPluginServiceError::InvalidPluginPath)?;
|
||||
|
||||
unsafe {
|
||||
let result = plugin_manager.load_plugin(libpath.to_str().unwrap(), config_file);
|
||||
if let Err(err) = result {
|
||||
let msg = format!("Failed to load the plugin library: {libpath:?}, error: {err:?}");
|
||||
return Err(GeyserPluginServiceError::PluginLoadError(msg));
|
||||
}
|
||||
}
|
||||
plugin_manager
|
||||
.load_plugin(geyser_plugin_config_file)
|
||||
.map_err(|e| GeyserPluginServiceError::FailedToLoadPlugin(e.into()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -208,4 +157,71 @@ impl GeyserPluginService {
|
|||
self.plugin_manager.write().unwrap().unload();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn start_manager_rpc_handler(
|
||||
plugin_manager: Arc<RwLock<GeyserPluginManager>>,
|
||||
request_receiver: Receiver<GeyserPluginManagerRequest>,
|
||||
exit: Arc<AtomicBool>,
|
||||
) {
|
||||
thread::Builder::new()
|
||||
.name("SolGeyserPluginRpc".to_string())
|
||||
.spawn(move || loop {
|
||||
if let Ok(request) = request_receiver.recv_timeout(Duration::from_secs(5)) {
|
||||
match request {
|
||||
GeyserPluginManagerRequest::ListPlugins { response_sender } => {
|
||||
let plugin_list = plugin_manager.read().unwrap().list_plugins();
|
||||
response_sender
|
||||
.send(plugin_list)
|
||||
.expect("Admin rpc service will be waiting for response");
|
||||
}
|
||||
|
||||
GeyserPluginManagerRequest::ReloadPlugin {
|
||||
ref name,
|
||||
ref config_file,
|
||||
response_sender,
|
||||
} => {
|
||||
let reload_result = plugin_manager
|
||||
.write()
|
||||
.unwrap()
|
||||
.reload_plugin(name, config_file);
|
||||
response_sender
|
||||
.send(reload_result)
|
||||
.expect("Admin rpc service will be waiting for response");
|
||||
}
|
||||
|
||||
GeyserPluginManagerRequest::LoadPlugin {
|
||||
ref config_file,
|
||||
response_sender,
|
||||
} => {
|
||||
let load_result =
|
||||
plugin_manager.write().unwrap().load_plugin(config_file);
|
||||
response_sender
|
||||
.send(load_result)
|
||||
.expect("Admin rpc service will be waiting for response");
|
||||
}
|
||||
|
||||
GeyserPluginManagerRequest::UnloadPlugin {
|
||||
ref name,
|
||||
response_sender,
|
||||
} => {
|
||||
let unload_result = plugin_manager.write().unwrap().unload_plugin(name);
|
||||
response_sender
|
||||
.send(unload_result)
|
||||
.expect("Admin rpc service will be waiting for response");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if exit.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum GeyserPluginServiceError {
|
||||
#[error("Failed to load a geyser plugin")]
|
||||
FailedToLoadPlugin(#[from] Box<dyn std::error::Error>),
|
||||
}
|
||||
|
|
|
@ -6,3 +6,5 @@ pub mod geyser_plugin_service;
|
|||
pub mod slot_status_notifier;
|
||||
pub mod slot_status_observer;
|
||||
pub mod transaction_notifier;
|
||||
|
||||
pub use geyser_plugin_manager::GeyserPluginManagerRequest;
|
||||
|
|
|
@ -102,7 +102,7 @@ use {
|
|||
fs::File,
|
||||
io::{self, stdout, BufRead, BufReader, Write},
|
||||
path::{Path, PathBuf},
|
||||
process::{exit, Command, Stdio},
|
||||
process::{Command, Stdio},
|
||||
str::FromStr,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
|
@ -312,7 +312,7 @@ fn output_ledger(
|
|||
.slot_meta_iterator(starting_slot)
|
||||
.unwrap_or_else(|err| {
|
||||
eprintln!("Failed to load entries starting from slot {starting_slot}: {err:?}");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
});
|
||||
|
||||
if method == LedgerOutputMethod::Json {
|
||||
|
@ -929,7 +929,7 @@ fn open_blockstore(
|
|||
error!("Blockstore is incompatible with current software and requires updates");
|
||||
if !force_update_to_open {
|
||||
error!("Use --force-update-to-open to allow blockstore to update");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
open_blockstore_with_temporary_primary_access(
|
||||
ledger_path,
|
||||
|
@ -941,12 +941,12 @@ fn open_blockstore(
|
|||
"Failed to open blockstore (with --force-update-to-open) at {:?}: {:?}",
|
||||
ledger_path, err
|
||||
);
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
})
|
||||
}
|
||||
Err(err) => {
|
||||
eprintln!("Failed to open blockstore at {ledger_path:?}: {err:?}");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1128,14 +1128,14 @@ fn load_bank_forks(
|
|||
"Unable to load bank forks at slot {halt_slot} because it is less than the starting slot {starting_slot}. \
|
||||
The starting slot will be the latest snapshot slot, or genesis if --no-snapshot flag specified or no snapshots found."
|
||||
);
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
// Check if we have the slot data necessary to replay from starting_slot to >= halt_slot.
|
||||
if !blockstore.slot_range_connected(starting_slot, halt_slot) {
|
||||
eprintln!(
|
||||
"Unable to load bank forks at slot {halt_slot} due to disconnected blocks.",
|
||||
);
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1164,7 +1164,7 @@ fn load_bank_forks(
|
|||
{
|
||||
// Couldn't get Primary access, error out to be defensive.
|
||||
eprintln!("Error: custom accounts path is not supported under secondary access");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
account_paths.split(',').map(PathBuf::from).collect()
|
||||
|
@ -1187,7 +1187,7 @@ fn load_bank_forks(
|
|||
Ok((account_run_path, _account_snapshot_path)) => account_run_path,
|
||||
Err(err) => {
|
||||
eprintln!("Unable to create account run and snapshot sub directories: {}, err: {err:?}", account_path.display());
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
}).collect();
|
||||
|
@ -1207,6 +1207,7 @@ fn load_bank_forks(
|
|||
|
||||
let mut accounts_update_notifier = Option::<AccountsUpdateNotifier>::default();
|
||||
let mut transaction_notifier = Option::<TransactionNotifierLock>::default();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
if arg_matches.is_present("geyser_plugin_config") {
|
||||
let geyser_config_files = values_t_or_exit!(arg_matches, "geyser_plugin_config", String)
|
||||
.into_iter()
|
||||
|
@ -1215,11 +1216,12 @@ fn load_bank_forks(
|
|||
|
||||
let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
|
||||
drop(confirmed_bank_sender);
|
||||
|
||||
let geyser_service =
|
||||
GeyserPluginService::new(confirmed_bank_receiver, &geyser_config_files).unwrap_or_else(
|
||||
|err| {
|
||||
eprintln!("Failed to setup Geyser service: {err:?}");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
},
|
||||
);
|
||||
accounts_update_notifier = geyser_service.get_accounts_update_notifier();
|
||||
|
@ -1257,7 +1259,6 @@ fn load_bank_forks(
|
|||
snapshot_request_handler,
|
||||
pruned_banks_request_handler,
|
||||
};
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let accounts_background_service = AccountsBackgroundService::new(
|
||||
bank_forks.clone(),
|
||||
&exit,
|
||||
|
@ -2509,7 +2510,7 @@ fn main() {
|
|||
)
|
||||
.unwrap_or_else(|err| {
|
||||
eprintln!("Failed to write genesis config: {err:?}");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
});
|
||||
|
||||
println!("{}", open_genesis_config_by(&output_directory, arg_matches));
|
||||
|
@ -2555,7 +2556,7 @@ fn main() {
|
|||
}
|
||||
Err(err) => {
|
||||
eprintln!("Failed to load ledger: {err:?}");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2632,7 +2633,7 @@ fn main() {
|
|||
}
|
||||
Err(err) => {
|
||||
eprintln!("Failed to load ledger: {err:?}");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2864,7 +2865,7 @@ fn main() {
|
|||
)
|
||||
.unwrap_or_else(|err| {
|
||||
eprintln!("Ledger verification failed: {err:?}");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
});
|
||||
if print_accounts_stats {
|
||||
let working_bank = bank_forks.read().unwrap().working_bank();
|
||||
|
@ -2925,7 +2926,7 @@ fn main() {
|
|||
}
|
||||
Err(err) => {
|
||||
eprintln!("Failed to load ledger: {err:?}");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2968,7 +2969,7 @@ fn main() {
|
|||
"Error: insufficient --bootstrap-validator-stake-lamports. \
|
||||
Minimum amount is {minimum_stake_lamports}"
|
||||
);
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
let bootstrap_validator_pubkeys = pubkeys_of(arg_matches, "bootstrap_validator");
|
||||
let accounts_to_remove =
|
||||
|
@ -2985,7 +2986,7 @@ fn main() {
|
|||
|s| {
|
||||
s.parse::<SnapshotVersion>().unwrap_or_else(|e| {
|
||||
eprintln!("Error: {e}");
|
||||
exit(1)
|
||||
std::process::exit(1)
|
||||
})
|
||||
},
|
||||
);
|
||||
|
@ -3032,7 +3033,7 @@ fn main() {
|
|||
eprintln!(
|
||||
"Error: snapshot slot {snapshot_slot} does not exist in blockstore or is not full.",
|
||||
);
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
let ending_slot = if is_minimized {
|
||||
|
@ -3041,7 +3042,7 @@ fn main() {
|
|||
eprintln!(
|
||||
"Error: ending_slot ({ending_slot}) must be greater than snapshot_slot ({snapshot_slot})"
|
||||
);
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
Some(ending_slot)
|
||||
|
@ -3086,7 +3087,7 @@ fn main() {
|
|||
.get(snapshot_slot)
|
||||
.unwrap_or_else(|| {
|
||||
eprintln!("Error: Slot {snapshot_slot} is not available");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
});
|
||||
|
||||
let child_bank_required = rent_burn_percentage.is_ok()
|
||||
|
@ -3141,7 +3142,7 @@ fn main() {
|
|||
eprintln!(
|
||||
"Error: Account does not exist, unable to remove it: {address}"
|
||||
);
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
});
|
||||
|
||||
account.set_lamports(0);
|
||||
|
@ -3209,7 +3210,7 @@ fn main() {
|
|||
eprintln!(
|
||||
"Error: --bootstrap-validator pubkeys cannot be duplicated"
|
||||
);
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3283,7 +3284,7 @@ fn main() {
|
|||
eprintln!(
|
||||
"Error: --warp-slot too close. Must be >= {minimum_warp_slot}"
|
||||
);
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
} else {
|
||||
warn!("Warping to slot {}", minimum_warp_slot);
|
||||
|
@ -3333,7 +3334,7 @@ fn main() {
|
|||
if is_incremental {
|
||||
if starting_snapshot_hashes.is_none() {
|
||||
eprintln!("Unable to create incremental snapshot without a base full snapshot");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
let full_snapshot_slot = starting_snapshot_hashes.unwrap().full.hash.0;
|
||||
if bank.slot() <= full_snapshot_slot {
|
||||
|
@ -3342,7 +3343,7 @@ fn main() {
|
|||
bank.slot(),
|
||||
full_snapshot_slot,
|
||||
);
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
let incremental_snapshot_archive_info =
|
||||
|
@ -3359,7 +3360,7 @@ fn main() {
|
|||
)
|
||||
.unwrap_or_else(|err| {
|
||||
eprintln!("Unable to create incremental snapshot: {err}");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
});
|
||||
|
||||
println!(
|
||||
|
@ -3383,7 +3384,7 @@ fn main() {
|
|||
)
|
||||
.unwrap_or_else(|err| {
|
||||
eprintln!("Unable to create snapshot: {err}");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
});
|
||||
|
||||
println!(
|
||||
|
@ -3414,7 +3415,7 @@ fn main() {
|
|||
}
|
||||
Err(err) => {
|
||||
eprintln!("Failed to load ledger: {err:?}");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3444,7 +3445,7 @@ fn main() {
|
|||
)
|
||||
.unwrap_or_else(|err| {
|
||||
eprintln!("Failed to load ledger: {err:?}");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
});
|
||||
|
||||
let bank = bank_forks.read().unwrap().working_bank();
|
||||
|
@ -3535,7 +3536,7 @@ fn main() {
|
|||
let slot = bank_forks.working_bank().slot();
|
||||
let bank = bank_forks.get(slot).unwrap_or_else(|| {
|
||||
eprintln!("Error: Slot {slot} is not available");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
});
|
||||
|
||||
if arg_matches.is_present("recalculate_capitalization") {
|
||||
|
@ -3566,7 +3567,7 @@ fn main() {
|
|||
base_bank.epoch(),
|
||||
warp_epoch
|
||||
);
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
if let Ok(raw_inflation) = value_t!(arg_matches, "inflation", String) {
|
||||
|
@ -4032,7 +4033,7 @@ fn main() {
|
|||
}
|
||||
Err(err) => {
|
||||
eprintln!("Failed to load ledger: {err:?}");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -4061,20 +4062,20 @@ fn main() {
|
|||
let slots: Vec<_> = metas.map(|(slot, _)| slot).collect();
|
||||
if slots.is_empty() {
|
||||
eprintln!("Purge range is empty");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
*slots.last().unwrap()
|
||||
}
|
||||
Err(err) => {
|
||||
eprintln!("Unable to read the Ledger: {err:?}");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
if end_slot < start_slot {
|
||||
eprintln!("end slot {end_slot} is less than start slot {start_slot}");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
info!(
|
||||
"Purging data from slots {} to {} ({} slots) (skip compaction: {}) (dead slot only: {})",
|
||||
|
@ -4223,7 +4224,7 @@ fn main() {
|
|||
Either adjust `--until` value, or pass a larger `--repair-limit` \
|
||||
to override the limit",
|
||||
);
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
let ancestor_iterator = AncestorIterator::new(start_root, &blockstore)
|
||||
.take_while(|&slot| slot >= end_root);
|
||||
|
@ -4238,7 +4239,7 @@ fn main() {
|
|||
.set_roots(roots_to_fix.iter())
|
||||
.unwrap_or_else(|err| {
|
||||
eprintln!("Unable to set roots {roots_to_fix:?}: {err}");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
});
|
||||
}
|
||||
} else {
|
||||
|
@ -4312,7 +4313,7 @@ fn main() {
|
|||
}
|
||||
Err(err) => {
|
||||
eprintln!("Unable to read the Ledger: {err:?}");
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -4364,7 +4365,7 @@ fn main() {
|
|||
}
|
||||
("", _) => {
|
||||
eprintln!("{}", matches.usage());
|
||||
exit(1);
|
||||
std::process::exit(1);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
|
|
@ -283,6 +283,7 @@ impl LocalCluster {
|
|||
vec![],
|
||||
&leader_config,
|
||||
true, // should_check_duplicate_instance
|
||||
None, // rpc_to_plugin_manager_receiver
|
||||
Arc::new(RwLock::new(ValidatorStartProgress::default())),
|
||||
socket_addr_space,
|
||||
DEFAULT_TPU_USE_QUIC,
|
||||
|
@ -489,6 +490,7 @@ impl LocalCluster {
|
|||
vec![LegacyContactInfo::try_from(&self.entry_point_info).unwrap()],
|
||||
&config,
|
||||
true, // should_check_duplicate_instance
|
||||
None, // rpc_to_plugin_manager_receiver
|
||||
Arc::new(RwLock::new(ValidatorStartProgress::default())),
|
||||
socket_addr_space,
|
||||
DEFAULT_TPU_USE_QUIC,
|
||||
|
@ -862,6 +864,7 @@ impl Cluster for LocalCluster {
|
|||
.unwrap_or_default(),
|
||||
&safe_clone_config(&cluster_validator_info.config),
|
||||
true, // should_check_duplicate_instance
|
||||
None, // rpc_to_plugin_manager_receiver
|
||||
Arc::new(RwLock::new(ValidatorStartProgress::default())),
|
||||
socket_addr_space,
|
||||
DEFAULT_TPU_USE_QUIC,
|
||||
|
|
|
@ -14,7 +14,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
|
|||
account_paths: config.account_paths.clone(),
|
||||
account_shrink_paths: config.account_shrink_paths.clone(),
|
||||
rpc_config: config.rpc_config.clone(),
|
||||
geyser_plugin_config_files: config.geyser_plugin_config_files.clone(),
|
||||
on_start_geyser_plugin_config_files: config.on_start_geyser_plugin_config_files.clone(),
|
||||
rpc_addrs: config.rpc_addrs,
|
||||
pubsub_config: config.pubsub_config.clone(),
|
||||
snapshot_config: config.snapshot_config.clone(),
|
||||
|
|
|
@ -4834,6 +4834,8 @@ dependencies = [
|
|||
"bs58",
|
||||
"crossbeam-channel",
|
||||
"json5",
|
||||
"jsonrpc-core",
|
||||
"jsonrpc-server-utils",
|
||||
"libloading",
|
||||
"log",
|
||||
"serde_json",
|
||||
|
@ -6103,12 +6105,14 @@ version = "1.16.0"
|
|||
dependencies = [
|
||||
"base64 0.13.0",
|
||||
"bincode",
|
||||
"crossbeam-channel",
|
||||
"log",
|
||||
"serde_derive",
|
||||
"serde_json",
|
||||
"solana-cli-output",
|
||||
"solana-client",
|
||||
"solana-core",
|
||||
"solana-geyser-plugin-manager",
|
||||
"solana-gossip",
|
||||
"solana-ledger",
|
||||
"solana-logger 1.16.0",
|
||||
|
@ -6219,6 +6223,7 @@ dependencies = [
|
|||
"jsonrpc-server-utils",
|
||||
"lazy_static",
|
||||
"libc",
|
||||
"libloading",
|
||||
"log",
|
||||
"num_cpus",
|
||||
"rand 0.7.3",
|
||||
|
@ -6234,6 +6239,8 @@ dependencies = [
|
|||
"solana-entry",
|
||||
"solana-faucet",
|
||||
"solana-genesis-utils",
|
||||
"solana-geyser-plugin-interface",
|
||||
"solana-geyser-plugin-manager",
|
||||
"solana-gossip",
|
||||
"solana-ledger",
|
||||
"solana-logger 1.16.0",
|
||||
|
|
|
@ -12,12 +12,14 @@ edition = { workspace = true }
|
|||
[dependencies]
|
||||
base64 = { workspace = true }
|
||||
bincode = { workspace = true }
|
||||
crossbeam-channel = { workspace = true }
|
||||
log = { workspace = true }
|
||||
serde_derive = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
solana-cli-output = { workspace = true }
|
||||
solana-client = { workspace = true }
|
||||
solana-core = { workspace = true }
|
||||
solana-geyser-plugin-manager = { workspace = true }
|
||||
solana-gossip = { workspace = true }
|
||||
solana-ledger = { workspace = true }
|
||||
solana-logger = { workspace = true }
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
#![allow(clippy::integer_arithmetic)]
|
||||
|
||||
use {
|
||||
crossbeam_channel::Receiver,
|
||||
log::*,
|
||||
solana_cli_output::CliAccount,
|
||||
solana_client::rpc_request::MAX_MULTIPLE_ACCOUNTS,
|
||||
|
@ -9,6 +9,9 @@ use {
|
|||
tower_storage::TowerStorage,
|
||||
validator::{Validator, ValidatorConfig, ValidatorStartProgress},
|
||||
},
|
||||
solana_geyser_plugin_manager::{
|
||||
geyser_plugin_manager::GeyserPluginManager, GeyserPluginManagerRequest,
|
||||
},
|
||||
solana_gossip::{
|
||||
cluster_info::{ClusterInfo, Node},
|
||||
gossip_service::discover_cluster,
|
||||
|
@ -138,6 +141,7 @@ pub struct TestValidatorGenesis {
|
|||
pub log_messages_bytes_limit: Option<usize>,
|
||||
pub transaction_account_lock_limit: Option<usize>,
|
||||
pub tpu_enable_udp: bool,
|
||||
pub geyser_plugin_manager: Arc<RwLock<GeyserPluginManager>>,
|
||||
admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
|
||||
}
|
||||
|
||||
|
@ -172,6 +176,7 @@ impl Default for TestValidatorGenesis {
|
|||
log_messages_bytes_limit: Option::<usize>::default(),
|
||||
transaction_account_lock_limit: Option::<usize>::default(),
|
||||
tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP,
|
||||
geyser_plugin_manager: Arc::new(RwLock::new(GeyserPluginManager::new())),
|
||||
admin_rpc_service_post_init:
|
||||
Arc::<RwLock<Option<AdminRpcRequestMetadataPostInit>>>::default(),
|
||||
}
|
||||
|
@ -530,7 +535,26 @@ impl TestValidatorGenesis {
|
|||
mint_address: Pubkey,
|
||||
socket_addr_space: SocketAddrSpace,
|
||||
) -> Result<TestValidator, Box<dyn std::error::Error>> {
|
||||
TestValidator::start(mint_address, self, socket_addr_space).map(|test_validator| {
|
||||
self.start_with_mint_address_and_geyser_plugin_rpc(mint_address, socket_addr_space, None)
|
||||
}
|
||||
|
||||
/// Start a test validator with the address of the mint account that will receive tokens
|
||||
/// created at genesis. Augments admin rpc service with dynamic geyser plugin manager if
|
||||
/// the geyser plugin service is enabled at startup.
|
||||
///
|
||||
pub fn start_with_mint_address_and_geyser_plugin_rpc(
|
||||
&self,
|
||||
mint_address: Pubkey,
|
||||
socket_addr_space: SocketAddrSpace,
|
||||
rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
|
||||
) -> Result<TestValidator, Box<dyn std::error::Error>> {
|
||||
TestValidator::start(
|
||||
mint_address,
|
||||
self,
|
||||
socket_addr_space,
|
||||
rpc_to_plugin_manager_receiver,
|
||||
)
|
||||
.map(|test_validator| {
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_io()
|
||||
.enable_time()
|
||||
|
@ -579,7 +603,7 @@ impl TestValidatorGenesis {
|
|||
socket_addr_space: SocketAddrSpace,
|
||||
) -> (TestValidator, Keypair) {
|
||||
let mint_keypair = Keypair::new();
|
||||
match TestValidator::start(mint_keypair.pubkey(), self, socket_addr_space) {
|
||||
match TestValidator::start(mint_keypair.pubkey(), self, socket_addr_space, None) {
|
||||
Ok(test_validator) => {
|
||||
test_validator.wait_for_nonzero_fees().await;
|
||||
(test_validator, mint_keypair)
|
||||
|
@ -835,6 +859,7 @@ impl TestValidator {
|
|||
mint_address: Pubkey,
|
||||
config: &TestValidatorGenesis,
|
||||
socket_addr_space: SocketAddrSpace,
|
||||
rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
|
||||
) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
let preserve_ledger = config.ledger_path.is_some();
|
||||
let ledger_path = TestValidator::initialize_ledger(mint_address, config)?;
|
||||
|
@ -897,7 +922,7 @@ impl TestValidator {
|
|||
};
|
||||
|
||||
let mut validator_config = ValidatorConfig {
|
||||
geyser_plugin_config_files: config.geyser_plugin_config_files.clone(),
|
||||
on_start_geyser_plugin_config_files: config.geyser_plugin_config_files.clone(),
|
||||
rpc_addrs: Some((
|
||||
SocketAddr::new(
|
||||
IpAddr::V4(Ipv4Addr::UNSPECIFIED),
|
||||
|
@ -949,6 +974,7 @@ impl TestValidator {
|
|||
vec![],
|
||||
&validator_config,
|
||||
true, // should_check_duplicate_instance
|
||||
rpc_to_plugin_manager_receiver,
|
||||
config.start_progress.clone(),
|
||||
socket_addr_space,
|
||||
DEFAULT_TPU_USE_QUIC,
|
||||
|
|
|
@ -25,6 +25,7 @@ jsonrpc-derive = { workspace = true }
|
|||
jsonrpc-ipc-server = { workspace = true }
|
||||
jsonrpc-server-utils = { workspace = true }
|
||||
lazy_static = { workspace = true }
|
||||
libloading = { workspace = true }
|
||||
log = { workspace = true }
|
||||
num_cpus = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
|
@ -39,6 +40,8 @@ solana-download-utils = { workspace = true }
|
|||
solana-entry = { workspace = true }
|
||||
solana-faucet = { workspace = true }
|
||||
solana-genesis-utils = { workspace = true }
|
||||
solana-geyser-plugin-interface = { workspace = true }
|
||||
solana-geyser-plugin-manager = { workspace = true }
|
||||
solana-gossip = { workspace = true }
|
||||
solana-ledger = { workspace = true }
|
||||
solana-logger = { workspace = true }
|
||||
|
|
|
@ -1,8 +1,11 @@
|
|||
use {
|
||||
jsonrpc_core::{MetaIoHandler, Metadata, Result},
|
||||
crossbeam_channel::Sender,
|
||||
jsonrpc_core::{BoxFuture, ErrorCode, MetaIoHandler, Metadata, Result},
|
||||
jsonrpc_core_client::{transports::ipc, RpcError},
|
||||
jsonrpc_derive::rpc,
|
||||
jsonrpc_ipc_server::{RequestContext, ServerBuilder},
|
||||
jsonrpc_ipc_server::{
|
||||
tokio::sync::oneshot::channel as oneshot_channel, RequestContext, ServerBuilder,
|
||||
},
|
||||
jsonrpc_server_utils::tokio,
|
||||
log::*,
|
||||
serde::{de::Deserializer, Deserialize, Serialize},
|
||||
|
@ -10,6 +13,7 @@ use {
|
|||
admin_rpc_post_init::AdminRpcRequestMetadataPostInit, consensus::Tower,
|
||||
tower_storage::TowerStorage, validator::ValidatorStartProgress,
|
||||
},
|
||||
solana_geyser_plugin_manager::GeyserPluginManagerRequest,
|
||||
solana_gossip::contact_info::ContactInfo,
|
||||
solana_rpc::rpc::verify_pubkey,
|
||||
solana_rpc_client_api::{config::RpcAccountIndex, custom_error::RpcCustomError},
|
||||
|
@ -41,7 +45,9 @@ pub struct AdminRpcRequestMetadata {
|
|||
pub tower_storage: Arc<dyn TowerStorage>,
|
||||
pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
|
||||
pub post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
|
||||
pub rpc_to_plugin_manager_sender: Option<Sender<GeyserPluginManagerRequest>>,
|
||||
}
|
||||
|
||||
impl Metadata for AdminRpcRequestMetadata {}
|
||||
|
||||
impl AdminRpcRequestMetadata {
|
||||
|
@ -139,6 +145,23 @@ pub trait AdminRpc {
|
|||
#[rpc(meta, name = "exit")]
|
||||
fn exit(&self, meta: Self::Metadata) -> Result<()>;
|
||||
|
||||
#[rpc(meta, name = "reloadPlugin")]
|
||||
fn reload_plugin(
|
||||
&self,
|
||||
meta: Self::Metadata,
|
||||
name: String,
|
||||
config_file: String,
|
||||
) -> BoxFuture<Result<()>>;
|
||||
|
||||
#[rpc(meta, name = "unloadPlugin")]
|
||||
fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>>;
|
||||
|
||||
#[rpc(meta, name = "loadPlugin")]
|
||||
fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>>;
|
||||
|
||||
#[rpc(meta, name = "listPlugins")]
|
||||
fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>>;
|
||||
|
||||
#[rpc(meta, name = "rpcAddress")]
|
||||
fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>>;
|
||||
|
||||
|
@ -234,6 +257,121 @@ impl AdminRpc for AdminRpcImpl {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn reload_plugin(
|
||||
&self,
|
||||
meta: Self::Metadata,
|
||||
name: String,
|
||||
config_file: String,
|
||||
) -> BoxFuture<Result<()>> {
|
||||
Box::pin(async move {
|
||||
// Construct channel for plugin to respond to this particular rpc request instance
|
||||
let (response_sender, response_receiver) = oneshot_channel();
|
||||
|
||||
// Send request to plugin manager if there is a geyser service
|
||||
if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
|
||||
rpc_to_manager_sender
|
||||
.send(GeyserPluginManagerRequest::ReloadPlugin {
|
||||
name,
|
||||
config_file,
|
||||
response_sender,
|
||||
})
|
||||
.expect("GeyerPluginService should never drop request receiver");
|
||||
} else {
|
||||
return Err(jsonrpc_core::Error {
|
||||
code: ErrorCode::InvalidRequest,
|
||||
message: "No geyser plugin service".to_string(),
|
||||
data: None,
|
||||
});
|
||||
}
|
||||
|
||||
// Await response from plugin manager
|
||||
response_receiver
|
||||
.await
|
||||
.expect("GeyerPluginService's oneshot sender shouldn't drop early")
|
||||
})
|
||||
}
|
||||
|
||||
fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>> {
|
||||
Box::pin(async move {
|
||||
// Construct channel for plugin to respond to this particular rpc request instance
|
||||
let (response_sender, response_receiver) = oneshot_channel();
|
||||
|
||||
// Send request to plugin manager if there is a geyser service
|
||||
if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
|
||||
rpc_to_manager_sender
|
||||
.send(GeyserPluginManagerRequest::LoadPlugin {
|
||||
config_file,
|
||||
response_sender,
|
||||
})
|
||||
.expect("GeyerPluginService should never drop request receiver");
|
||||
} else {
|
||||
return Err(jsonrpc_core::Error {
|
||||
code: ErrorCode::InvalidRequest,
|
||||
message: "No geyser plugin service".to_string(),
|
||||
data: None,
|
||||
});
|
||||
}
|
||||
|
||||
// Await response from plugin manager
|
||||
response_receiver
|
||||
.await
|
||||
.expect("GeyerPluginService's oneshot sender shouldn't drop early")
|
||||
})
|
||||
}
|
||||
|
||||
fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>> {
|
||||
Box::pin(async move {
|
||||
// Construct channel for plugin to respond to this particular rpc request instance
|
||||
let (response_sender, response_receiver) = oneshot_channel();
|
||||
|
||||
// Send request to plugin manager if there is a geyser service
|
||||
if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
|
||||
rpc_to_manager_sender
|
||||
.send(GeyserPluginManagerRequest::UnloadPlugin {
|
||||
name,
|
||||
response_sender,
|
||||
})
|
||||
.expect("GeyerPluginService should never drop request receiver");
|
||||
} else {
|
||||
return Err(jsonrpc_core::Error {
|
||||
code: ErrorCode::InvalidRequest,
|
||||
message: "No geyser plugin service".to_string(),
|
||||
data: None,
|
||||
});
|
||||
}
|
||||
|
||||
// Await response from plugin manager
|
||||
response_receiver
|
||||
.await
|
||||
.expect("GeyerPluginService's oneshot sender shouldn't drop early")
|
||||
})
|
||||
}
|
||||
|
||||
fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>> {
|
||||
Box::pin(async move {
|
||||
// Construct channel for plugin to respond to this particular rpc request instance
|
||||
let (response_sender, response_receiver) = oneshot_channel();
|
||||
|
||||
// Send request to plugin manager
|
||||
if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
|
||||
rpc_to_manager_sender
|
||||
.send(GeyserPluginManagerRequest::ListPlugins { response_sender })
|
||||
.expect("GeyerPluginService should never drop request receiver");
|
||||
} else {
|
||||
return Err(jsonrpc_core::Error {
|
||||
code: ErrorCode::InvalidRequest,
|
||||
message: "No geyser plugin service".to_string(),
|
||||
data: None,
|
||||
});
|
||||
}
|
||||
|
||||
// Await response from plugin manager
|
||||
response_receiver
|
||||
.await
|
||||
.expect("GeyerPluginService's oneshot sender shouldn't drop early")
|
||||
})
|
||||
}
|
||||
|
||||
fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>> {
|
||||
debug!("rpc_addr admin rpc request received");
|
||||
Ok(meta.rpc_addr)
|
||||
|
@ -544,6 +682,7 @@ pub fn run(ledger_path: &Path, metadata: AdminRpcRequestMetadata) {
|
|||
warn!("Unable to start admin rpc service: {:?}", err);
|
||||
}
|
||||
Ok(server) => {
|
||||
info!("started admin rpc service!");
|
||||
let close_handle = server.close_handle();
|
||||
validator_exit
|
||||
.write()
|
||||
|
@ -708,6 +847,7 @@ mod tests {
|
|||
repair_whitelist,
|
||||
}))),
|
||||
staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
|
||||
rpc_to_plugin_manager_sender: None,
|
||||
};
|
||||
let mut io = MetaIoHandler::default();
|
||||
io.extend_with(AdminRpcImpl.to_delegate());
|
||||
|
|
|
@ -381,6 +381,14 @@ fn main() {
|
|||
let tower_storage = Arc::new(FileTowerStorage::new(ledger_path.clone()));
|
||||
|
||||
let admin_service_post_init = Arc::new(RwLock::new(None));
|
||||
// If geyser_plugin_config value is invalid, the validator will exit when the values are extracted below
|
||||
let (rpc_to_plugin_manager_sender, rpc_to_plugin_manager_receiver) =
|
||||
if matches.is_present("geyser_plugin_config") {
|
||||
let (sender, receiver) = unbounded();
|
||||
(Some(sender), Some(receiver))
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
admin_rpc_service::run(
|
||||
&ledger_path,
|
||||
admin_rpc_service::AdminRpcRequestMetadata {
|
||||
|
@ -392,6 +400,7 @@ fn main() {
|
|||
staked_nodes_overrides: genesis.staked_nodes_overrides.clone(),
|
||||
post_init: admin_service_post_init,
|
||||
tower_storage: tower_storage.clone(),
|
||||
rpc_to_plugin_manager_sender,
|
||||
},
|
||||
);
|
||||
let dashboard = if output == Output::Dashboard {
|
||||
|
@ -542,7 +551,11 @@ fn main() {
|
|||
genesis.compute_unit_limit(compute_unit_limit);
|
||||
}
|
||||
|
||||
match genesis.start_with_mint_address(mint_address, socket_addr_space) {
|
||||
match genesis.start_with_mint_address_and_geyser_plugin_rpc(
|
||||
mint_address,
|
||||
socket_addr_space,
|
||||
rpc_to_plugin_manager_receiver,
|
||||
) {
|
||||
Ok(test_validator) => {
|
||||
if let Some(dashboard) = dashboard {
|
||||
dashboard.run(Duration::from_millis(250));
|
||||
|
|
|
@ -1465,6 +1465,48 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
|
|||
SubCommand::with_name("run")
|
||||
.about("Run the validator")
|
||||
)
|
||||
.subcommand(
|
||||
SubCommand::with_name("plugin")
|
||||
.about("Manage and view geyser plugins")
|
||||
.setting(AppSettings::SubcommandRequiredElseHelp)
|
||||
.setting(AppSettings::InferSubcommands)
|
||||
.subcommand(
|
||||
SubCommand::with_name("list")
|
||||
.about("List all current running gesyer plugins")
|
||||
)
|
||||
.subcommand(
|
||||
SubCommand::with_name("unload")
|
||||
.about("Unload a particular gesyer plugin. You must specify the gesyer plugin name")
|
||||
.arg(
|
||||
Arg::with_name("name")
|
||||
.required(true)
|
||||
.takes_value(true)
|
||||
)
|
||||
)
|
||||
.subcommand(
|
||||
SubCommand::with_name("reload")
|
||||
.about("Reload a particular gesyer plugin. You must specify the gesyer plugin name and the new config path")
|
||||
.arg(
|
||||
Arg::with_name("name")
|
||||
.required(true)
|
||||
.takes_value(true)
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("config")
|
||||
.required(true)
|
||||
.takes_value(true)
|
||||
)
|
||||
)
|
||||
.subcommand(
|
||||
SubCommand::with_name("load")
|
||||
.about("Load a new gesyer plugin. You must specify the config path. Fails if overwriting (use reload)")
|
||||
.arg(
|
||||
Arg::with_name("config")
|
||||
.required(true)
|
||||
.takes_value(true)
|
||||
)
|
||||
)
|
||||
)
|
||||
.subcommand(
|
||||
SubCommand::with_name("set-identity")
|
||||
.about("Set the validator identity")
|
||||
|
|
|
@ -4,6 +4,7 @@ use jemallocator::Jemalloc;
|
|||
use {
|
||||
clap::{crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit, ArgMatches},
|
||||
console::style,
|
||||
crossbeam_channel::unbounded,
|
||||
log::*,
|
||||
rand::{seq::SliceRandom, thread_rng},
|
||||
solana_clap_utils::input_parsers::{keypair_of, keypairs_of, pubkey_of, value_of},
|
||||
|
@ -536,6 +537,79 @@ pub fn main() {
|
|||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
("plugin", Some(plugin_subcommand_matches)) => {
|
||||
match plugin_subcommand_matches.subcommand() {
|
||||
("list", _) => {
|
||||
let admin_client = admin_rpc_service::connect(&ledger_path);
|
||||
let plugins = admin_rpc_service::runtime()
|
||||
.block_on(async move { admin_client.await?.list_plugins().await })
|
||||
.unwrap_or_else(|err| {
|
||||
println!("Failed to list plugins: {err}");
|
||||
exit(1);
|
||||
});
|
||||
if !plugins.is_empty() {
|
||||
println!("Currently the following plugins are loaded:");
|
||||
for (plugin, i) in plugins.into_iter().zip(1..) {
|
||||
println!(" {i}) {plugin}");
|
||||
}
|
||||
} else {
|
||||
println!("There are currently no plugins loaded");
|
||||
}
|
||||
return;
|
||||
}
|
||||
("unload", Some(subcommand_matches)) => {
|
||||
if let Some(name) = value_t!(subcommand_matches, "name", String).ok() {
|
||||
let admin_client = admin_rpc_service::connect(&ledger_path);
|
||||
admin_rpc_service::runtime()
|
||||
.block_on(async {
|
||||
admin_client.await?.unload_plugin(name.clone()).await
|
||||
})
|
||||
.unwrap_or_else(|err| {
|
||||
println!("Failed to unload plugin {name}: {err:?}");
|
||||
exit(1);
|
||||
});
|
||||
println!("Successfully unloaded plugin: {name}");
|
||||
}
|
||||
return;
|
||||
}
|
||||
("load", Some(subcommand_matches)) => {
|
||||
if let Some(config) = value_t!(subcommand_matches, "config", String).ok() {
|
||||
let admin_client = admin_rpc_service::connect(&ledger_path);
|
||||
let name = admin_rpc_service::runtime()
|
||||
.block_on(async {
|
||||
admin_client.await?.load_plugin(config.clone()).await
|
||||
})
|
||||
.unwrap_or_else(|err| {
|
||||
println!("Failed to load plugin {config}: {err:?}");
|
||||
exit(1);
|
||||
});
|
||||
println!("Successfully loaded plugin: {name}");
|
||||
}
|
||||
return;
|
||||
}
|
||||
("reload", Some(subcommand_matches)) => {
|
||||
if let Some(name) = value_t!(subcommand_matches, "name", String).ok() {
|
||||
if let Some(config) = value_t!(subcommand_matches, "config", String).ok() {
|
||||
let admin_client = admin_rpc_service::connect(&ledger_path);
|
||||
admin_rpc_service::runtime()
|
||||
.block_on(async {
|
||||
admin_client
|
||||
.await?
|
||||
.reload_plugin(name.clone(), config.clone())
|
||||
.await
|
||||
})
|
||||
.unwrap_or_else(|err| {
|
||||
println!("Failed to reload plugin {name}: {err:?}");
|
||||
exit(1);
|
||||
});
|
||||
println!("Successfully reloaded plugin: {name}");
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
("contact-info", Some(subcommand_matches)) => {
|
||||
let output_mode = subcommand_matches.value_of("output");
|
||||
let admin_client = admin_rpc_service::connect(&ledger_path);
|
||||
|
@ -1058,7 +1132,7 @@ pub fn main() {
|
|||
|
||||
let accounts_db_config = Some(accounts_db_config);
|
||||
|
||||
let geyser_plugin_config_files = if matches.is_present("geyser_plugin_config") {
|
||||
let on_start_geyser_plugin_config_files = if matches.is_present("geyser_plugin_config") {
|
||||
Some(
|
||||
values_t_or_exit!(matches, "geyser_plugin_config", String)
|
||||
.into_iter()
|
||||
|
@ -1068,6 +1142,7 @@ pub fn main() {
|
|||
} else {
|
||||
None
|
||||
};
|
||||
let starting_with_geyser_plugins: bool = on_start_geyser_plugin_config_files.is_some();
|
||||
|
||||
let rpc_bigtable_config = if matches.is_present("enable_rpc_bigtable_ledger_storage")
|
||||
|| matches.is_present("enable_bigtable_ledger_upload")
|
||||
|
@ -1155,7 +1230,7 @@ pub fn main() {
|
|||
usize
|
||||
)),
|
||||
},
|
||||
geyser_plugin_config_files,
|
||||
on_start_geyser_plugin_config_files,
|
||||
rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| {
|
||||
(
|
||||
SocketAddr::new(rpc_bind_address, rpc_port),
|
||||
|
@ -1513,6 +1588,13 @@ pub fn main() {
|
|||
|
||||
let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
|
||||
let admin_service_post_init = Arc::new(RwLock::new(None));
|
||||
let (rpc_to_plugin_manager_sender, rpc_to_plugin_manager_receiver) =
|
||||
if starting_with_geyser_plugins {
|
||||
let (sender, receiver) = unbounded();
|
||||
(Some(sender), Some(receiver))
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
admin_rpc_service::run(
|
||||
&ledger_path,
|
||||
admin_rpc_service::AdminRpcRequestMetadata {
|
||||
|
@ -1524,6 +1606,7 @@ pub fn main() {
|
|||
post_init: admin_service_post_init.clone(),
|
||||
tower_storage: validator_config.tower_storage.clone(),
|
||||
staked_nodes_overrides,
|
||||
rpc_to_plugin_manager_sender,
|
||||
},
|
||||
);
|
||||
|
||||
|
@ -1682,6 +1765,7 @@ pub fn main() {
|
|||
cluster_entrypoints,
|
||||
&validator_config,
|
||||
should_check_duplicate_instance,
|
||||
rpc_to_plugin_manager_receiver,
|
||||
start_progress,
|
||||
socket_addr_space,
|
||||
tpu_use_quic,
|
||||
|
|
Loading…
Reference in New Issue