Geyser Runtime Reload (#30352)
Support dynamic geyser plugin load, unload, and listing through admin RPC.
This commit is contained in:
parent
9d792c1848
commit
10f49d4e26
|
@ -5634,6 +5634,8 @@ dependencies = [
|
||||||
"bs58",
|
"bs58",
|
||||||
"crossbeam-channel",
|
"crossbeam-channel",
|
||||||
"json5",
|
"json5",
|
||||||
|
"jsonrpc-core",
|
||||||
|
"jsonrpc-server-utils",
|
||||||
"libloading",
|
"libloading",
|
||||||
"log",
|
"log",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
@ -6831,12 +6833,14 @@ version = "1.16.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"base64 0.13.0",
|
"base64 0.13.0",
|
||||||
"bincode",
|
"bincode",
|
||||||
|
"crossbeam-channel",
|
||||||
"log",
|
"log",
|
||||||
"serde_derive",
|
"serde_derive",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"solana-cli-output",
|
"solana-cli-output",
|
||||||
"solana-client",
|
"solana-client",
|
||||||
"solana-core",
|
"solana-core",
|
||||||
|
"solana-geyser-plugin-manager",
|
||||||
"solana-gossip",
|
"solana-gossip",
|
||||||
"solana-ledger",
|
"solana-ledger",
|
||||||
"solana-logger 1.16.0",
|
"solana-logger 1.16.0",
|
||||||
|
@ -7017,6 +7021,7 @@ dependencies = [
|
||||||
"jsonrpc-server-utils",
|
"jsonrpc-server-utils",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"libc",
|
"libc",
|
||||||
|
"libloading",
|
||||||
"log",
|
"log",
|
||||||
"num_cpus",
|
"num_cpus",
|
||||||
"rand 0.7.3",
|
"rand 0.7.3",
|
||||||
|
@ -7033,6 +7038,8 @@ dependencies = [
|
||||||
"solana-entry",
|
"solana-entry",
|
||||||
"solana-faucet",
|
"solana-faucet",
|
||||||
"solana-genesis-utils",
|
"solana-genesis-utils",
|
||||||
|
"solana-geyser-plugin-interface",
|
||||||
|
"solana-geyser-plugin-manager",
|
||||||
"solana-gossip",
|
"solana-gossip",
|
||||||
"solana-ledger",
|
"solana-ledger",
|
||||||
"solana-logger 1.16.0",
|
"solana-logger 1.16.0",
|
||||||
|
|
|
@ -31,7 +31,9 @@ use {
|
||||||
rand::{thread_rng, Rng},
|
rand::{thread_rng, Rng},
|
||||||
solana_client::connection_cache::ConnectionCache,
|
solana_client::connection_cache::ConnectionCache,
|
||||||
solana_entry::poh::compute_hash_time_ns,
|
solana_entry::poh::compute_hash_time_ns,
|
||||||
solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService,
|
solana_geyser_plugin_manager::{
|
||||||
|
geyser_plugin_service::GeyserPluginService, GeyserPluginManagerRequest,
|
||||||
|
},
|
||||||
solana_gossip::{
|
solana_gossip::{
|
||||||
cluster_info::{
|
cluster_info::{
|
||||||
ClusterInfo, Node, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
|
ClusterInfo, Node, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
|
||||||
|
@ -128,7 +130,8 @@ pub struct ValidatorConfig {
|
||||||
pub account_paths: Vec<PathBuf>,
|
pub account_paths: Vec<PathBuf>,
|
||||||
pub account_shrink_paths: Option<Vec<PathBuf>>,
|
pub account_shrink_paths: Option<Vec<PathBuf>>,
|
||||||
pub rpc_config: JsonRpcConfig,
|
pub rpc_config: JsonRpcConfig,
|
||||||
pub geyser_plugin_config_files: Option<Vec<PathBuf>>,
|
/// Specifies which plugins to start up with
|
||||||
|
pub on_start_geyser_plugin_config_files: Option<Vec<PathBuf>>,
|
||||||
pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub)
|
pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub)
|
||||||
pub pubsub_config: PubSubConfig,
|
pub pubsub_config: PubSubConfig,
|
||||||
pub snapshot_config: SnapshotConfig,
|
pub snapshot_config: SnapshotConfig,
|
||||||
|
@ -192,7 +195,7 @@ impl Default for ValidatorConfig {
|
||||||
account_paths: Vec::new(),
|
account_paths: Vec::new(),
|
||||||
account_shrink_paths: None,
|
account_shrink_paths: None,
|
||||||
rpc_config: JsonRpcConfig::default(),
|
rpc_config: JsonRpcConfig::default(),
|
||||||
geyser_plugin_config_files: None,
|
on_start_geyser_plugin_config_files: None,
|
||||||
rpc_addrs: None,
|
rpc_addrs: None,
|
||||||
pubsub_config: PubSubConfig::default(),
|
pubsub_config: PubSubConfig::default(),
|
||||||
snapshot_config: SnapshotConfig::new_load_only(),
|
snapshot_config: SnapshotConfig::new_load_only(),
|
||||||
|
@ -392,6 +395,7 @@ impl Validator {
|
||||||
cluster_entrypoints: Vec<ContactInfo>,
|
cluster_entrypoints: Vec<ContactInfo>,
|
||||||
config: &ValidatorConfig,
|
config: &ValidatorConfig,
|
||||||
should_check_duplicate_instance: bool,
|
should_check_duplicate_instance: bool,
|
||||||
|
rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
|
||||||
start_progress: Arc<RwLock<ValidatorStartProgress>>,
|
start_progress: Arc<RwLock<ValidatorStartProgress>>,
|
||||||
socket_addr_space: SocketAddrSpace,
|
socket_addr_space: SocketAddrSpace,
|
||||||
use_quic: bool,
|
use_quic: bool,
|
||||||
|
@ -413,12 +417,19 @@ impl Validator {
|
||||||
|
|
||||||
let mut bank_notification_senders = Vec::new();
|
let mut bank_notification_senders = Vec::new();
|
||||||
|
|
||||||
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
let geyser_plugin_service =
|
let geyser_plugin_service =
|
||||||
if let Some(geyser_plugin_config_files) = &config.geyser_plugin_config_files {
|
if let Some(geyser_plugin_config_files) = &config.on_start_geyser_plugin_config_files {
|
||||||
let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
|
let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
|
||||||
bank_notification_senders.push(confirmed_bank_sender);
|
bank_notification_senders.push(confirmed_bank_sender);
|
||||||
let result =
|
let rpc_to_plugin_manager_receiver_and_exit =
|
||||||
GeyserPluginService::new(confirmed_bank_receiver, geyser_plugin_config_files);
|
rpc_to_plugin_manager_receiver.map(|receiver| (receiver, exit.clone()));
|
||||||
|
let result = GeyserPluginService::new_with_receiver(
|
||||||
|
confirmed_bank_receiver,
|
||||||
|
geyser_plugin_config_files,
|
||||||
|
rpc_to_plugin_manager_receiver_and_exit,
|
||||||
|
);
|
||||||
match result {
|
match result {
|
||||||
Ok(geyser_plugin_service) => Some(geyser_plugin_service),
|
Ok(geyser_plugin_service) => Some(geyser_plugin_service),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
@ -483,7 +494,6 @@ impl Validator {
|
||||||
start.stop();
|
start.stop();
|
||||||
info!("done. {}", start);
|
info!("done. {}", start);
|
||||||
|
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
|
||||||
{
|
{
|
||||||
let exit = exit.clone();
|
let exit = exit.clone();
|
||||||
config
|
config
|
||||||
|
@ -2176,6 +2186,7 @@ mod tests {
|
||||||
vec![LegacyContactInfo::try_from(&leader_node.info).unwrap()],
|
vec![LegacyContactInfo::try_from(&leader_node.info).unwrap()],
|
||||||
&config,
|
&config,
|
||||||
true, // should_check_duplicate_instance
|
true, // should_check_duplicate_instance
|
||||||
|
None, // rpc_to_plugin_manager_receiver
|
||||||
start_progress.clone(),
|
start_progress.clone(),
|
||||||
SocketAddrSpace::Unspecified,
|
SocketAddrSpace::Unspecified,
|
||||||
DEFAULT_TPU_USE_QUIC,
|
DEFAULT_TPU_USE_QUIC,
|
||||||
|
@ -2273,7 +2284,8 @@ mod tests {
|
||||||
Arc::new(RwLock::new(vec![Arc::new(vote_account_keypair)])),
|
Arc::new(RwLock::new(vec![Arc::new(vote_account_keypair)])),
|
||||||
vec![LegacyContactInfo::try_from(&leader_node.info).unwrap()],
|
vec![LegacyContactInfo::try_from(&leader_node.info).unwrap()],
|
||||||
&config,
|
&config,
|
||||||
true, // should_check_duplicate_instance
|
true, // should_check_duplicate_instance.
|
||||||
|
None, // rpc_to_plugin_manager_receiver
|
||||||
Arc::new(RwLock::new(ValidatorStartProgress::default())),
|
Arc::new(RwLock::new(ValidatorStartProgress::default())),
|
||||||
SocketAddrSpace::Unspecified,
|
SocketAddrSpace::Unspecified,
|
||||||
DEFAULT_TPU_USE_QUIC,
|
DEFAULT_TPU_USE_QUIC,
|
||||||
|
|
|
@ -10,9 +10,12 @@ license = { workspace = true }
|
||||||
edition = { workspace = true }
|
edition = { workspace = true }
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|
||||||
bs58 = { workspace = true }
|
bs58 = { workspace = true }
|
||||||
crossbeam-channel = { workspace = true }
|
crossbeam-channel = { workspace = true }
|
||||||
json5 = { workspace = true }
|
json5 = { workspace = true }
|
||||||
|
jsonrpc-core = { workspace = true }
|
||||||
|
jsonrpc-server-utils = { workspace = true }
|
||||||
libloading = { workspace = true }
|
libloading = { workspace = true }
|
||||||
log = { workspace = true }
|
log = { workspace = true }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
/// Managing the Geyser plugins
|
|
||||||
use {
|
use {
|
||||||
libloading::{Library, Symbol},
|
jsonrpc_core::{ErrorCode, Result as JsonRpcResult},
|
||||||
|
jsonrpc_server_utils::tokio::sync::oneshot::Sender as OneShotSender,
|
||||||
|
libloading::Library,
|
||||||
log::*,
|
log::*,
|
||||||
solana_geyser_plugin_interface::geyser_plugin_interface::GeyserPlugin,
|
solana_geyser_plugin_interface::geyser_plugin_interface::GeyserPlugin,
|
||||||
std::error::Error,
|
std::path::Path,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
|
@ -20,26 +21,6 @@ impl GeyserPluginManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// # Safety
|
|
||||||
///
|
|
||||||
/// This function loads the dynamically linked library specified in the path. The library
|
|
||||||
/// must do necessary initializations.
|
|
||||||
pub unsafe fn load_plugin(
|
|
||||||
&mut self,
|
|
||||||
libpath: &str,
|
|
||||||
config_file: &str,
|
|
||||||
) -> Result<(), Box<dyn Error>> {
|
|
||||||
type PluginConstructor = unsafe fn() -> *mut dyn GeyserPlugin;
|
|
||||||
let lib = Library::new(libpath)?;
|
|
||||||
let constructor: Symbol<PluginConstructor> = lib.get(b"_create_plugin")?;
|
|
||||||
let plugin_raw = constructor();
|
|
||||||
let mut plugin = Box::from_raw(plugin_raw);
|
|
||||||
plugin.on_load(config_file)?;
|
|
||||||
self.plugins.push(plugin);
|
|
||||||
self.libs.push(lib);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Unload all plugins and loaded plugin libraries, making sure to fire
|
/// Unload all plugins and loaded plugin libraries, making sure to fire
|
||||||
/// their `on_plugin_unload()` methods so they can do any necessary cleanup.
|
/// their `on_plugin_unload()` methods so they can do any necessary cleanup.
|
||||||
pub fn unload(&mut self) {
|
pub fn unload(&mut self) {
|
||||||
|
@ -72,4 +53,401 @@ impl GeyserPluginManager {
|
||||||
}
|
}
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Admin RPC request handler
|
||||||
|
pub(crate) fn list_plugins(&self) -> JsonRpcResult<Vec<String>> {
|
||||||
|
Ok(self.plugins.iter().map(|p| p.name().to_owned()).collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Admin RPC request handler
|
||||||
|
/// # Safety
|
||||||
|
///
|
||||||
|
/// This function loads the dynamically linked library specified in the path. The library
|
||||||
|
/// must do necessary initializations.
|
||||||
|
///
|
||||||
|
/// The string returned is the name of the plugin loaded, which can only be accessed once
|
||||||
|
/// the plugin has been loaded and calling the name method.
|
||||||
|
pub(crate) fn load_plugin(
|
||||||
|
&mut self,
|
||||||
|
geyser_plugin_config_file: impl AsRef<Path>,
|
||||||
|
) -> JsonRpcResult<String> {
|
||||||
|
// First load plugin
|
||||||
|
let (mut new_plugin, new_lib, new_config_file) =
|
||||||
|
load_plugin_from_config(geyser_plugin_config_file.as_ref()).map_err(|e| {
|
||||||
|
jsonrpc_core::Error {
|
||||||
|
code: ErrorCode::InvalidRequest,
|
||||||
|
message: format!("Failed to load plugin: {e}"),
|
||||||
|
data: None,
|
||||||
|
}
|
||||||
|
})?;
|
||||||
|
|
||||||
|
// Then see if a plugin with this name already exists. If so, abort
|
||||||
|
if self
|
||||||
|
.plugins
|
||||||
|
.iter()
|
||||||
|
.any(|plugin| plugin.name().eq(new_plugin.name()))
|
||||||
|
{
|
||||||
|
return Err(jsonrpc_core::Error {
|
||||||
|
code: ErrorCode::InvalidRequest,
|
||||||
|
message: format!(
|
||||||
|
"There already exists a plugin named {} loaded. Did not load requested plugin",
|
||||||
|
new_plugin.name()
|
||||||
|
),
|
||||||
|
data: None,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call on_load and push plugin
|
||||||
|
new_plugin
|
||||||
|
.on_load(new_config_file)
|
||||||
|
.map_err(|on_load_err| jsonrpc_core::Error {
|
||||||
|
code: ErrorCode::InvalidRequest,
|
||||||
|
message: format!(
|
||||||
|
"on_load method of plugin {} failed: {on_load_err}",
|
||||||
|
new_plugin.name()
|
||||||
|
),
|
||||||
|
data: None,
|
||||||
|
})?;
|
||||||
|
let name = new_plugin.name().to_string();
|
||||||
|
self.plugins.push(new_plugin);
|
||||||
|
self.libs.push(new_lib);
|
||||||
|
|
||||||
|
Ok(name)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn unload_plugin(&mut self, name: &str) -> JsonRpcResult<()> {
|
||||||
|
// Check if any plugin names match this one
|
||||||
|
let Some(idx) = self.plugins.iter().position(|plugin| plugin.name().eq(name)) else {
|
||||||
|
// If we don't find one return an error
|
||||||
|
return Err(
|
||||||
|
jsonrpc_core::error::Error {
|
||||||
|
code: ErrorCode::InvalidRequest,
|
||||||
|
message: String::from("The plugin you requested to unload is not loaded"),
|
||||||
|
data: None,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
// Unload and drop plugin and lib
|
||||||
|
self._drop_plugin(idx);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Checks for a plugin with a given `name`.
|
||||||
|
/// If it exists, first unload it.
|
||||||
|
/// Then, attempt to load a new plugin
|
||||||
|
pub(crate) fn reload_plugin(&mut self, name: &str, config_file: &str) -> JsonRpcResult<()> {
|
||||||
|
// Check if any plugin names match this one
|
||||||
|
let Some(idx) = self.plugins.iter().position(|plugin| plugin.name().eq(name)) else {
|
||||||
|
// If we don't find one return an error
|
||||||
|
return Err(
|
||||||
|
jsonrpc_core::error::Error {
|
||||||
|
code: ErrorCode::InvalidRequest,
|
||||||
|
message: String::from("The plugin you requested to reload is not loaded"),
|
||||||
|
data: None,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
// Unload and drop current plugin first in case plugin requires exclusive access to resource,
|
||||||
|
// such as a particular port or database.
|
||||||
|
self._drop_plugin(idx);
|
||||||
|
|
||||||
|
// Try to load plugin, library
|
||||||
|
// SAFETY: It is up to the validator to ensure this is a valid plugin library.
|
||||||
|
let (mut new_plugin, new_lib, new_parsed_config_file) =
|
||||||
|
load_plugin_from_config(config_file.as_ref()).map_err(|err| jsonrpc_core::Error {
|
||||||
|
code: ErrorCode::InvalidRequest,
|
||||||
|
message: err.to_string(),
|
||||||
|
data: None,
|
||||||
|
})?;
|
||||||
|
|
||||||
|
// Attempt to on_load with new plugin
|
||||||
|
match new_plugin.on_load(new_parsed_config_file) {
|
||||||
|
// On success, push plugin and library
|
||||||
|
Ok(()) => {
|
||||||
|
self.plugins.push(new_plugin);
|
||||||
|
self.libs.push(new_lib);
|
||||||
|
}
|
||||||
|
|
||||||
|
// On failure, return error
|
||||||
|
Err(err) => {
|
||||||
|
return Err(jsonrpc_core::error::Error {
|
||||||
|
code: ErrorCode::InvalidRequest,
|
||||||
|
message: format!(
|
||||||
|
"Failed to start new plugin (previous plugin was dropped!): {err}"
|
||||||
|
),
|
||||||
|
data: None,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn _drop_plugin(&mut self, idx: usize) {
|
||||||
|
let mut current_plugin = self.plugins.remove(idx);
|
||||||
|
let _current_lib = self.libs.remove(idx);
|
||||||
|
current_plugin.on_unload();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum GeyserPluginManagerRequest {
|
||||||
|
ReloadPlugin {
|
||||||
|
name: String,
|
||||||
|
config_file: String,
|
||||||
|
response_sender: OneShotSender<JsonRpcResult<()>>,
|
||||||
|
},
|
||||||
|
UnloadPlugin {
|
||||||
|
name: String,
|
||||||
|
response_sender: OneShotSender<JsonRpcResult<()>>,
|
||||||
|
},
|
||||||
|
LoadPlugin {
|
||||||
|
config_file: String,
|
||||||
|
response_sender: OneShotSender<JsonRpcResult<String>>,
|
||||||
|
},
|
||||||
|
ListPlugins {
|
||||||
|
response_sender: OneShotSender<JsonRpcResult<Vec<String>>>,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(thiserror::Error, Debug)]
|
||||||
|
pub enum GeyserPluginManagerError {
|
||||||
|
#[error("Cannot open the the plugin config file")]
|
||||||
|
CannotOpenConfigFile(String),
|
||||||
|
|
||||||
|
#[error("Cannot read the the plugin config file")]
|
||||||
|
CannotReadConfigFile(String),
|
||||||
|
|
||||||
|
#[error("The config file is not in a valid Json format")]
|
||||||
|
InvalidConfigFileFormat(String),
|
||||||
|
|
||||||
|
#[error("Plugin library path is not specified in the config file")]
|
||||||
|
LibPathNotSet,
|
||||||
|
|
||||||
|
#[error("Invalid plugin path")]
|
||||||
|
InvalidPluginPath,
|
||||||
|
|
||||||
|
#[error("Cannot load plugin shared library")]
|
||||||
|
PluginLoadError(String),
|
||||||
|
|
||||||
|
#[error("The geyser plugin {0} is already loaded shared library")]
|
||||||
|
PluginAlreadyLoaded(String),
|
||||||
|
|
||||||
|
#[error("The GeyserPlugin on_load method failed")]
|
||||||
|
PluginStartError(String),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// # Safety
|
||||||
|
///
|
||||||
|
/// This function loads the dynamically linked library specified in the path. The library
|
||||||
|
/// must do necessary initializations.
|
||||||
|
///
|
||||||
|
/// This returns the geyser plugin, the dynamic library, and the parsed config file as a &str.
|
||||||
|
/// (The geyser plugin interface requires a &str for the on_load method).
|
||||||
|
#[cfg(not(test))]
|
||||||
|
pub(crate) fn load_plugin_from_config(
|
||||||
|
geyser_plugin_config_file: &Path,
|
||||||
|
) -> Result<(Box<dyn GeyserPlugin>, Library, &str), GeyserPluginManagerError> {
|
||||||
|
use std::{fs::File, io::Read, path::PathBuf};
|
||||||
|
type PluginConstructor = unsafe fn() -> *mut dyn GeyserPlugin;
|
||||||
|
use libloading::Symbol;
|
||||||
|
|
||||||
|
let mut file = match File::open(geyser_plugin_config_file) {
|
||||||
|
Ok(file) => file,
|
||||||
|
Err(err) => {
|
||||||
|
return Err(GeyserPluginManagerError::CannotOpenConfigFile(format!(
|
||||||
|
"Failed to open the plugin config file {geyser_plugin_config_file:?}, error: {err:?}"
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut contents = String::new();
|
||||||
|
if let Err(err) = file.read_to_string(&mut contents) {
|
||||||
|
return Err(GeyserPluginManagerError::CannotReadConfigFile(format!(
|
||||||
|
"Failed to read the plugin config file {geyser_plugin_config_file:?}, error: {err:?}"
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
let result: serde_json::Value = match json5::from_str(&contents) {
|
||||||
|
Ok(value) => value,
|
||||||
|
Err(err) => {
|
||||||
|
return Err(GeyserPluginManagerError::InvalidConfigFileFormat(format!(
|
||||||
|
"The config file {geyser_plugin_config_file:?} is not in a valid Json5 format, error: {err:?}"
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let libpath = result["libpath"]
|
||||||
|
.as_str()
|
||||||
|
.ok_or(GeyserPluginManagerError::LibPathNotSet)?;
|
||||||
|
let mut libpath = PathBuf::from(libpath);
|
||||||
|
if libpath.is_relative() {
|
||||||
|
let config_dir = geyser_plugin_config_file.parent().ok_or_else(|| {
|
||||||
|
GeyserPluginManagerError::CannotOpenConfigFile(format!(
|
||||||
|
"Failed to resolve parent of {geyser_plugin_config_file:?}",
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
libpath = config_dir.join(libpath);
|
||||||
|
}
|
||||||
|
|
||||||
|
let config_file = geyser_plugin_config_file
|
||||||
|
.as_os_str()
|
||||||
|
.to_str()
|
||||||
|
.ok_or(GeyserPluginManagerError::InvalidPluginPath)?;
|
||||||
|
|
||||||
|
let (plugin, lib) = unsafe {
|
||||||
|
let lib = Library::new(libpath)
|
||||||
|
.map_err(|e| GeyserPluginManagerError::PluginLoadError(e.to_string()))?;
|
||||||
|
let constructor: Symbol<PluginConstructor> = lib
|
||||||
|
.get(b"_create_plugin")
|
||||||
|
.map_err(|e| GeyserPluginManagerError::PluginLoadError(e.to_string()))?;
|
||||||
|
let plugin_raw = constructor();
|
||||||
|
(Box::from_raw(plugin_raw), lib)
|
||||||
|
};
|
||||||
|
Ok((plugin, lib, config_file))
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is mocked for tests to avoid having to do IO with a dynamically linked library
|
||||||
|
// across different architectures at test time
|
||||||
|
//
|
||||||
|
/// This returns mocked values for the geyser plugin, the dynamic library, and the parsed config file as a &str.
|
||||||
|
/// (The geyser plugin interface requires a &str for the on_load method).
|
||||||
|
#[cfg(test)]
|
||||||
|
pub(crate) fn load_plugin_from_config(
|
||||||
|
_geyser_plugin_config_file: &Path,
|
||||||
|
) -> Result<(Box<dyn GeyserPlugin>, Library, &str), GeyserPluginManagerError> {
|
||||||
|
Ok(tests::dummy_plugin_and_library())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use {
|
||||||
|
crate::geyser_plugin_manager::GeyserPluginManager,
|
||||||
|
libloading::Library,
|
||||||
|
solana_geyser_plugin_interface::geyser_plugin_interface::GeyserPlugin,
|
||||||
|
std::sync::{Arc, RwLock},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub(super) fn dummy_plugin_and_library() -> (Box<dyn GeyserPlugin>, Library, &'static str) {
|
||||||
|
let plugin = Box::new(TestPlugin);
|
||||||
|
let lib = {
|
||||||
|
let handle: *mut std::os::raw::c_void = &mut () as *mut _ as *mut std::os::raw::c_void;
|
||||||
|
// SAFETY: all calls to get Symbols should fail, so this is actually safe
|
||||||
|
let inner_lib = unsafe { libloading::os::unix::Library::from_raw(handle) };
|
||||||
|
Library::from(inner_lib)
|
||||||
|
};
|
||||||
|
(plugin, lib, DUMMY_CONFIG)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn dummy_plugin_and_library2() -> (Box<dyn GeyserPlugin>, Library, &'static str) {
|
||||||
|
let plugin = Box::new(TestPlugin2);
|
||||||
|
let lib = {
|
||||||
|
let handle: *mut std::os::raw::c_void = &mut () as *mut _ as *mut std::os::raw::c_void;
|
||||||
|
// SAFETY: all calls to get Symbols should fail, so this is actually safe
|
||||||
|
let inner_lib = unsafe { libloading::os::unix::Library::from_raw(handle) };
|
||||||
|
Library::from(inner_lib)
|
||||||
|
};
|
||||||
|
(plugin, lib, DUMMY_CONFIG)
|
||||||
|
}
|
||||||
|
|
||||||
|
const DUMMY_NAME: &str = "dummy";
|
||||||
|
pub(super) const DUMMY_CONFIG: &str = "dummy_config";
|
||||||
|
const ANOTHER_DUMMY_NAME: &str = "another_dummy";
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub(super) struct TestPlugin;
|
||||||
|
|
||||||
|
impl GeyserPlugin for TestPlugin {
|
||||||
|
fn name(&self) -> &'static str {
|
||||||
|
DUMMY_NAME
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub(super) struct TestPlugin2;
|
||||||
|
|
||||||
|
impl GeyserPlugin for TestPlugin2 {
|
||||||
|
fn name(&self) -> &'static str {
|
||||||
|
ANOTHER_DUMMY_NAME
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_geyser_reload() {
|
||||||
|
// Initialize empty manager
|
||||||
|
let plugin_manager = Arc::new(RwLock::new(GeyserPluginManager::new()));
|
||||||
|
|
||||||
|
// No plugins are loaded, this should fail
|
||||||
|
let mut plugin_manager_lock = plugin_manager.write().unwrap();
|
||||||
|
let reload_result = plugin_manager_lock.reload_plugin(DUMMY_NAME, DUMMY_CONFIG);
|
||||||
|
assert_eq!(
|
||||||
|
reload_result.unwrap_err().message,
|
||||||
|
"The plugin you requested to reload is not loaded"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Mock having loaded plugin (TestPlugin)
|
||||||
|
let (mut plugin, lib, config) = dummy_plugin_and_library();
|
||||||
|
plugin.on_load(config).unwrap();
|
||||||
|
plugin_manager_lock.plugins.push(plugin);
|
||||||
|
plugin_manager_lock.libs.push(lib);
|
||||||
|
// plugin_manager_lock.libs.push(lib);
|
||||||
|
assert_eq!(plugin_manager_lock.plugins[0].name(), DUMMY_NAME);
|
||||||
|
plugin_manager_lock.plugins[0].name();
|
||||||
|
|
||||||
|
// Try wrong name (same error)
|
||||||
|
const WRONG_NAME: &str = "wrong_name";
|
||||||
|
let reload_result = plugin_manager_lock.reload_plugin(WRONG_NAME, DUMMY_CONFIG);
|
||||||
|
assert_eq!(
|
||||||
|
reload_result.unwrap_err().message,
|
||||||
|
"The plugin you requested to reload is not loaded"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Now try a (dummy) reload, replacing TestPlugin with TestPlugin2
|
||||||
|
let reload_result = plugin_manager_lock.reload_plugin(DUMMY_NAME, DUMMY_CONFIG);
|
||||||
|
assert!(reload_result.is_ok());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_plugin_list() {
|
||||||
|
// Initialize empty manager
|
||||||
|
let plugin_manager = Arc::new(RwLock::new(GeyserPluginManager::new()));
|
||||||
|
let mut plugin_manager_lock = plugin_manager.write().unwrap();
|
||||||
|
|
||||||
|
// Load two plugins
|
||||||
|
// First
|
||||||
|
let (mut plugin, lib, config) = dummy_plugin_and_library();
|
||||||
|
plugin.on_load(config).unwrap();
|
||||||
|
plugin_manager_lock.plugins.push(plugin);
|
||||||
|
plugin_manager_lock.libs.push(lib);
|
||||||
|
// Second
|
||||||
|
let (mut plugin, lib, config) = dummy_plugin_and_library2();
|
||||||
|
plugin.on_load(config).unwrap();
|
||||||
|
plugin_manager_lock.plugins.push(plugin);
|
||||||
|
plugin_manager_lock.libs.push(lib);
|
||||||
|
|
||||||
|
// Check that both plugins are returned in the list
|
||||||
|
let plugins = plugin_manager_lock.list_plugins().unwrap();
|
||||||
|
assert!(plugins.iter().any(|name| name.eq(DUMMY_NAME)));
|
||||||
|
assert!(plugins.iter().any(|name| name.eq(ANOTHER_DUMMY_NAME)));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_plugin_load_unload() {
|
||||||
|
// Initialize empty manager
|
||||||
|
let plugin_manager = Arc::new(RwLock::new(GeyserPluginManager::new()));
|
||||||
|
let mut plugin_manager_lock = plugin_manager.write().unwrap();
|
||||||
|
|
||||||
|
// Load rpc call
|
||||||
|
let load_result = plugin_manager_lock.load_plugin(DUMMY_CONFIG);
|
||||||
|
assert!(load_result.is_ok());
|
||||||
|
assert_eq!(plugin_manager_lock.plugins.len(), 1);
|
||||||
|
|
||||||
|
// Unload rpc call
|
||||||
|
let unload_result = plugin_manager_lock.unload_plugin(DUMMY_NAME);
|
||||||
|
assert!(unload_result.is_ok());
|
||||||
|
assert_eq!(plugin_manager_lock.plugins.len(), 0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,8 +3,10 @@ use {
|
||||||
accounts_update_notifier::AccountsUpdateNotifierImpl,
|
accounts_update_notifier::AccountsUpdateNotifierImpl,
|
||||||
block_metadata_notifier::BlockMetadataNotifierImpl,
|
block_metadata_notifier::BlockMetadataNotifierImpl,
|
||||||
block_metadata_notifier_interface::BlockMetadataNotifierLock,
|
block_metadata_notifier_interface::BlockMetadataNotifierLock,
|
||||||
geyser_plugin_manager::GeyserPluginManager, slot_status_notifier::SlotStatusNotifierImpl,
|
geyser_plugin_manager::{GeyserPluginManager, GeyserPluginManagerRequest},
|
||||||
slot_status_observer::SlotStatusObserver, transaction_notifier::TransactionNotifierImpl,
|
slot_status_notifier::SlotStatusNotifierImpl,
|
||||||
|
slot_status_observer::SlotStatusObserver,
|
||||||
|
transaction_notifier::TransactionNotifierImpl,
|
||||||
},
|
},
|
||||||
crossbeam_channel::Receiver,
|
crossbeam_channel::Receiver,
|
||||||
log::*,
|
log::*,
|
||||||
|
@ -14,36 +16,14 @@ use {
|
||||||
},
|
},
|
||||||
solana_runtime::accounts_update_notifier_interface::AccountsUpdateNotifier,
|
solana_runtime::accounts_update_notifier_interface::AccountsUpdateNotifier,
|
||||||
std::{
|
std::{
|
||||||
fs::File,
|
|
||||||
io::Read,
|
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
sync::{Arc, RwLock},
|
sync::{atomic::AtomicBool, Arc, RwLock},
|
||||||
thread,
|
thread,
|
||||||
|
time::Duration,
|
||||||
},
|
},
|
||||||
thiserror::Error,
|
thiserror::Error,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Error, Debug)]
|
|
||||||
pub enum GeyserPluginServiceError {
|
|
||||||
#[error("Cannot open the the plugin config file")]
|
|
||||||
CannotOpenConfigFile(String),
|
|
||||||
|
|
||||||
#[error("Cannot read the the plugin config file")]
|
|
||||||
CannotReadConfigFile(String),
|
|
||||||
|
|
||||||
#[error("The config file is not in a valid Json format")]
|
|
||||||
InvalidConfigFileFormat(String),
|
|
||||||
|
|
||||||
#[error("Plugin library path is not specified in the config file")]
|
|
||||||
LibPathNotSet,
|
|
||||||
|
|
||||||
#[error("Invalid plugin path")]
|
|
||||||
InvalidPluginPath,
|
|
||||||
|
|
||||||
#[error("Cannot load plugin shared library")]
|
|
||||||
PluginLoadError(String),
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The service managing the Geyser plugin workflow.
|
/// The service managing the Geyser plugin workflow.
|
||||||
pub struct GeyserPluginService {
|
pub struct GeyserPluginService {
|
||||||
slot_status_observer: Option<SlotStatusObserver>,
|
slot_status_observer: Option<SlotStatusObserver>,
|
||||||
|
@ -66,10 +46,20 @@ impl GeyserPluginService {
|
||||||
/// shall create the implementation of `GeyserPlugin` and returns to the caller.
|
/// shall create the implementation of `GeyserPlugin` and returns to the caller.
|
||||||
/// The rest of the JSON fields' definition is up to to the concrete plugin implementation
|
/// The rest of the JSON fields' definition is up to to the concrete plugin implementation
|
||||||
/// It is usually used to configure the connection information for the external data store.
|
/// It is usually used to configure the connection information for the external data store.
|
||||||
|
|
||||||
pub fn new(
|
pub fn new(
|
||||||
confirmed_bank_receiver: Receiver<BankNotification>,
|
confirmed_bank_receiver: Receiver<BankNotification>,
|
||||||
geyser_plugin_config_files: &[PathBuf],
|
geyser_plugin_config_files: &[PathBuf],
|
||||||
|
) -> Result<Self, GeyserPluginServiceError> {
|
||||||
|
Self::new_with_receiver(confirmed_bank_receiver, geyser_plugin_config_files, None)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn new_with_receiver(
|
||||||
|
confirmed_bank_receiver: Receiver<BankNotification>,
|
||||||
|
geyser_plugin_config_files: &[PathBuf],
|
||||||
|
rpc_to_plugin_manager_receiver_and_exit: Option<(
|
||||||
|
Receiver<GeyserPluginManagerRequest>,
|
||||||
|
Arc<AtomicBool>,
|
||||||
|
)>,
|
||||||
) -> Result<Self, GeyserPluginServiceError> {
|
) -> Result<Self, GeyserPluginServiceError> {
|
||||||
info!(
|
info!(
|
||||||
"Starting GeyserPluginService from config files: {:?}",
|
"Starting GeyserPluginService from config files: {:?}",
|
||||||
|
@ -80,10 +70,10 @@ impl GeyserPluginService {
|
||||||
for geyser_plugin_config_file in geyser_plugin_config_files {
|
for geyser_plugin_config_file in geyser_plugin_config_files {
|
||||||
Self::load_plugin(&mut plugin_manager, geyser_plugin_config_file)?;
|
Self::load_plugin(&mut plugin_manager, geyser_plugin_config_file)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let account_data_notifications_enabled =
|
let account_data_notifications_enabled =
|
||||||
plugin_manager.account_data_notifications_enabled();
|
plugin_manager.account_data_notifications_enabled();
|
||||||
let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled();
|
let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled();
|
||||||
|
|
||||||
let plugin_manager = Arc::new(RwLock::new(plugin_manager));
|
let plugin_manager = Arc::new(RwLock::new(plugin_manager));
|
||||||
|
|
||||||
let accounts_update_notifier: Option<AccountsUpdateNotifier> =
|
let accounts_update_notifier: Option<AccountsUpdateNotifier> =
|
||||||
|
@ -122,6 +112,12 @@ impl GeyserPluginService {
|
||||||
(None, None)
|
(None, None)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Initialize plugin manager rpc handler thread if needed
|
||||||
|
if let Some((request_receiver, exit)) = rpc_to_plugin_manager_receiver_and_exit {
|
||||||
|
let plugin_manager = plugin_manager.clone();
|
||||||
|
Self::start_manager_rpc_handler(plugin_manager, request_receiver, exit)
|
||||||
|
};
|
||||||
|
|
||||||
info!("Started GeyserPluginService");
|
info!("Started GeyserPluginService");
|
||||||
Ok(GeyserPluginService {
|
Ok(GeyserPluginService {
|
||||||
slot_status_observer,
|
slot_status_observer,
|
||||||
|
@ -136,56 +132,9 @@ impl GeyserPluginService {
|
||||||
plugin_manager: &mut GeyserPluginManager,
|
plugin_manager: &mut GeyserPluginManager,
|
||||||
geyser_plugin_config_file: &Path,
|
geyser_plugin_config_file: &Path,
|
||||||
) -> Result<(), GeyserPluginServiceError> {
|
) -> Result<(), GeyserPluginServiceError> {
|
||||||
let mut file = match File::open(geyser_plugin_config_file) {
|
plugin_manager
|
||||||
Ok(file) => file,
|
.load_plugin(geyser_plugin_config_file)
|
||||||
Err(err) => {
|
.map_err(|e| GeyserPluginServiceError::FailedToLoadPlugin(e.into()))?;
|
||||||
return Err(GeyserPluginServiceError::CannotOpenConfigFile(format!(
|
|
||||||
"Failed to open the plugin config file {geyser_plugin_config_file:?}, error: {err:?}"
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut contents = String::new();
|
|
||||||
if let Err(err) = file.read_to_string(&mut contents) {
|
|
||||||
return Err(GeyserPluginServiceError::CannotReadConfigFile(format!(
|
|
||||||
"Failed to read the plugin config file {geyser_plugin_config_file:?}, error: {err:?}"
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
let result: serde_json::Value = match json5::from_str(&contents) {
|
|
||||||
Ok(value) => value,
|
|
||||||
Err(err) => {
|
|
||||||
return Err(GeyserPluginServiceError::InvalidConfigFileFormat(format!(
|
|
||||||
"The config file {geyser_plugin_config_file:?} is not in a valid Json5 format, error: {err:?}"
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let libpath = result["libpath"]
|
|
||||||
.as_str()
|
|
||||||
.ok_or(GeyserPluginServiceError::LibPathNotSet)?;
|
|
||||||
let mut libpath = PathBuf::from(libpath);
|
|
||||||
if libpath.is_relative() {
|
|
||||||
let config_dir = geyser_plugin_config_file.parent().ok_or_else(|| {
|
|
||||||
GeyserPluginServiceError::CannotOpenConfigFile(format!(
|
|
||||||
"Failed to resolve parent of {geyser_plugin_config_file:?}",
|
|
||||||
))
|
|
||||||
})?;
|
|
||||||
libpath = config_dir.join(libpath);
|
|
||||||
}
|
|
||||||
|
|
||||||
let config_file = geyser_plugin_config_file
|
|
||||||
.as_os_str()
|
|
||||||
.to_str()
|
|
||||||
.ok_or(GeyserPluginServiceError::InvalidPluginPath)?;
|
|
||||||
|
|
||||||
unsafe {
|
|
||||||
let result = plugin_manager.load_plugin(libpath.to_str().unwrap(), config_file);
|
|
||||||
if let Err(err) = result {
|
|
||||||
let msg = format!("Failed to load the plugin library: {libpath:?}, error: {err:?}");
|
|
||||||
return Err(GeyserPluginServiceError::PluginLoadError(msg));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -208,4 +157,71 @@ impl GeyserPluginService {
|
||||||
self.plugin_manager.write().unwrap().unload();
|
self.plugin_manager.write().unwrap().unload();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn start_manager_rpc_handler(
|
||||||
|
plugin_manager: Arc<RwLock<GeyserPluginManager>>,
|
||||||
|
request_receiver: Receiver<GeyserPluginManagerRequest>,
|
||||||
|
exit: Arc<AtomicBool>,
|
||||||
|
) {
|
||||||
|
thread::Builder::new()
|
||||||
|
.name("SolGeyserPluginRpc".to_string())
|
||||||
|
.spawn(move || loop {
|
||||||
|
if let Ok(request) = request_receiver.recv_timeout(Duration::from_secs(5)) {
|
||||||
|
match request {
|
||||||
|
GeyserPluginManagerRequest::ListPlugins { response_sender } => {
|
||||||
|
let plugin_list = plugin_manager.read().unwrap().list_plugins();
|
||||||
|
response_sender
|
||||||
|
.send(plugin_list)
|
||||||
|
.expect("Admin rpc service will be waiting for response");
|
||||||
|
}
|
||||||
|
|
||||||
|
GeyserPluginManagerRequest::ReloadPlugin {
|
||||||
|
ref name,
|
||||||
|
ref config_file,
|
||||||
|
response_sender,
|
||||||
|
} => {
|
||||||
|
let reload_result = plugin_manager
|
||||||
|
.write()
|
||||||
|
.unwrap()
|
||||||
|
.reload_plugin(name, config_file);
|
||||||
|
response_sender
|
||||||
|
.send(reload_result)
|
||||||
|
.expect("Admin rpc service will be waiting for response");
|
||||||
|
}
|
||||||
|
|
||||||
|
GeyserPluginManagerRequest::LoadPlugin {
|
||||||
|
ref config_file,
|
||||||
|
response_sender,
|
||||||
|
} => {
|
||||||
|
let load_result =
|
||||||
|
plugin_manager.write().unwrap().load_plugin(config_file);
|
||||||
|
response_sender
|
||||||
|
.send(load_result)
|
||||||
|
.expect("Admin rpc service will be waiting for response");
|
||||||
|
}
|
||||||
|
|
||||||
|
GeyserPluginManagerRequest::UnloadPlugin {
|
||||||
|
ref name,
|
||||||
|
response_sender,
|
||||||
|
} => {
|
||||||
|
let unload_result = plugin_manager.write().unwrap().unload_plugin(name);
|
||||||
|
response_sender
|
||||||
|
.send(unload_result)
|
||||||
|
.expect("Admin rpc service will be waiting for response");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if exit.load(std::sync::atomic::Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Error, Debug)]
|
||||||
|
pub enum GeyserPluginServiceError {
|
||||||
|
#[error("Failed to load a geyser plugin")]
|
||||||
|
FailedToLoadPlugin(#[from] Box<dyn std::error::Error>),
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,3 +6,5 @@ pub mod geyser_plugin_service;
|
||||||
pub mod slot_status_notifier;
|
pub mod slot_status_notifier;
|
||||||
pub mod slot_status_observer;
|
pub mod slot_status_observer;
|
||||||
pub mod transaction_notifier;
|
pub mod transaction_notifier;
|
||||||
|
|
||||||
|
pub use geyser_plugin_manager::GeyserPluginManagerRequest;
|
||||||
|
|
|
@ -102,7 +102,7 @@ use {
|
||||||
fs::File,
|
fs::File,
|
||||||
io::{self, stdout, BufRead, BufReader, Write},
|
io::{self, stdout, BufRead, BufReader, Write},
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
process::{exit, Command, Stdio},
|
process::{Command, Stdio},
|
||||||
str::FromStr,
|
str::FromStr,
|
||||||
sync::{
|
sync::{
|
||||||
atomic::{AtomicBool, Ordering},
|
atomic::{AtomicBool, Ordering},
|
||||||
|
@ -312,7 +312,7 @@ fn output_ledger(
|
||||||
.slot_meta_iterator(starting_slot)
|
.slot_meta_iterator(starting_slot)
|
||||||
.unwrap_or_else(|err| {
|
.unwrap_or_else(|err| {
|
||||||
eprintln!("Failed to load entries starting from slot {starting_slot}: {err:?}");
|
eprintln!("Failed to load entries starting from slot {starting_slot}: {err:?}");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
if method == LedgerOutputMethod::Json {
|
if method == LedgerOutputMethod::Json {
|
||||||
|
@ -929,7 +929,7 @@ fn open_blockstore(
|
||||||
error!("Blockstore is incompatible with current software and requires updates");
|
error!("Blockstore is incompatible with current software and requires updates");
|
||||||
if !force_update_to_open {
|
if !force_update_to_open {
|
||||||
error!("Use --force-update-to-open to allow blockstore to update");
|
error!("Use --force-update-to-open to allow blockstore to update");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
open_blockstore_with_temporary_primary_access(
|
open_blockstore_with_temporary_primary_access(
|
||||||
ledger_path,
|
ledger_path,
|
||||||
|
@ -941,12 +941,12 @@ fn open_blockstore(
|
||||||
"Failed to open blockstore (with --force-update-to-open) at {:?}: {:?}",
|
"Failed to open blockstore (with --force-update-to-open) at {:?}: {:?}",
|
||||||
ledger_path, err
|
ledger_path, err
|
||||||
);
|
);
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
eprintln!("Failed to open blockstore at {ledger_path:?}: {err:?}");
|
eprintln!("Failed to open blockstore at {ledger_path:?}: {err:?}");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1128,14 +1128,14 @@ fn load_bank_forks(
|
||||||
"Unable to load bank forks at slot {halt_slot} because it is less than the starting slot {starting_slot}. \
|
"Unable to load bank forks at slot {halt_slot} because it is less than the starting slot {starting_slot}. \
|
||||||
The starting slot will be the latest snapshot slot, or genesis if --no-snapshot flag specified or no snapshots found."
|
The starting slot will be the latest snapshot slot, or genesis if --no-snapshot flag specified or no snapshots found."
|
||||||
);
|
);
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
// Check if we have the slot data necessary to replay from starting_slot to >= halt_slot.
|
// Check if we have the slot data necessary to replay from starting_slot to >= halt_slot.
|
||||||
if !blockstore.slot_range_connected(starting_slot, halt_slot) {
|
if !blockstore.slot_range_connected(starting_slot, halt_slot) {
|
||||||
eprintln!(
|
eprintln!(
|
||||||
"Unable to load bank forks at slot {halt_slot} due to disconnected blocks.",
|
"Unable to load bank forks at slot {halt_slot} due to disconnected blocks.",
|
||||||
);
|
);
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1164,7 +1164,7 @@ fn load_bank_forks(
|
||||||
{
|
{
|
||||||
// Couldn't get Primary access, error out to be defensive.
|
// Couldn't get Primary access, error out to be defensive.
|
||||||
eprintln!("Error: custom accounts path is not supported under secondary access");
|
eprintln!("Error: custom accounts path is not supported under secondary access");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
account_paths.split(',').map(PathBuf::from).collect()
|
account_paths.split(',').map(PathBuf::from).collect()
|
||||||
|
@ -1187,7 +1187,7 @@ fn load_bank_forks(
|
||||||
Ok((account_run_path, _account_snapshot_path)) => account_run_path,
|
Ok((account_run_path, _account_snapshot_path)) => account_run_path,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
eprintln!("Unable to create account run and snapshot sub directories: {}, err: {err:?}", account_path.display());
|
eprintln!("Unable to create account run and snapshot sub directories: {}, err: {err:?}", account_path.display());
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}).collect();
|
}).collect();
|
||||||
|
@ -1207,6 +1207,7 @@ fn load_bank_forks(
|
||||||
|
|
||||||
let mut accounts_update_notifier = Option::<AccountsUpdateNotifier>::default();
|
let mut accounts_update_notifier = Option::<AccountsUpdateNotifier>::default();
|
||||||
let mut transaction_notifier = Option::<TransactionNotifierLock>::default();
|
let mut transaction_notifier = Option::<TransactionNotifierLock>::default();
|
||||||
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
if arg_matches.is_present("geyser_plugin_config") {
|
if arg_matches.is_present("geyser_plugin_config") {
|
||||||
let geyser_config_files = values_t_or_exit!(arg_matches, "geyser_plugin_config", String)
|
let geyser_config_files = values_t_or_exit!(arg_matches, "geyser_plugin_config", String)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
@ -1215,11 +1216,12 @@ fn load_bank_forks(
|
||||||
|
|
||||||
let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
|
let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
|
||||||
drop(confirmed_bank_sender);
|
drop(confirmed_bank_sender);
|
||||||
|
|
||||||
let geyser_service =
|
let geyser_service =
|
||||||
GeyserPluginService::new(confirmed_bank_receiver, &geyser_config_files).unwrap_or_else(
|
GeyserPluginService::new(confirmed_bank_receiver, &geyser_config_files).unwrap_or_else(
|
||||||
|err| {
|
|err| {
|
||||||
eprintln!("Failed to setup Geyser service: {err:?}");
|
eprintln!("Failed to setup Geyser service: {err:?}");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
accounts_update_notifier = geyser_service.get_accounts_update_notifier();
|
accounts_update_notifier = geyser_service.get_accounts_update_notifier();
|
||||||
|
@ -1257,7 +1259,6 @@ fn load_bank_forks(
|
||||||
snapshot_request_handler,
|
snapshot_request_handler,
|
||||||
pruned_banks_request_handler,
|
pruned_banks_request_handler,
|
||||||
};
|
};
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
|
||||||
let accounts_background_service = AccountsBackgroundService::new(
|
let accounts_background_service = AccountsBackgroundService::new(
|
||||||
bank_forks.clone(),
|
bank_forks.clone(),
|
||||||
&exit,
|
&exit,
|
||||||
|
@ -2509,7 +2510,7 @@ fn main() {
|
||||||
)
|
)
|
||||||
.unwrap_or_else(|err| {
|
.unwrap_or_else(|err| {
|
||||||
eprintln!("Failed to write genesis config: {err:?}");
|
eprintln!("Failed to write genesis config: {err:?}");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
println!("{}", open_genesis_config_by(&output_directory, arg_matches));
|
println!("{}", open_genesis_config_by(&output_directory, arg_matches));
|
||||||
|
@ -2555,7 +2556,7 @@ fn main() {
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
eprintln!("Failed to load ledger: {err:?}");
|
eprintln!("Failed to load ledger: {err:?}");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2632,7 +2633,7 @@ fn main() {
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
eprintln!("Failed to load ledger: {err:?}");
|
eprintln!("Failed to load ledger: {err:?}");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2864,7 +2865,7 @@ fn main() {
|
||||||
)
|
)
|
||||||
.unwrap_or_else(|err| {
|
.unwrap_or_else(|err| {
|
||||||
eprintln!("Ledger verification failed: {err:?}");
|
eprintln!("Ledger verification failed: {err:?}");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
});
|
});
|
||||||
if print_accounts_stats {
|
if print_accounts_stats {
|
||||||
let working_bank = bank_forks.read().unwrap().working_bank();
|
let working_bank = bank_forks.read().unwrap().working_bank();
|
||||||
|
@ -2925,7 +2926,7 @@ fn main() {
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
eprintln!("Failed to load ledger: {err:?}");
|
eprintln!("Failed to load ledger: {err:?}");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2968,7 +2969,7 @@ fn main() {
|
||||||
"Error: insufficient --bootstrap-validator-stake-lamports. \
|
"Error: insufficient --bootstrap-validator-stake-lamports. \
|
||||||
Minimum amount is {minimum_stake_lamports}"
|
Minimum amount is {minimum_stake_lamports}"
|
||||||
);
|
);
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
let bootstrap_validator_pubkeys = pubkeys_of(arg_matches, "bootstrap_validator");
|
let bootstrap_validator_pubkeys = pubkeys_of(arg_matches, "bootstrap_validator");
|
||||||
let accounts_to_remove =
|
let accounts_to_remove =
|
||||||
|
@ -2985,7 +2986,7 @@ fn main() {
|
||||||
|s| {
|
|s| {
|
||||||
s.parse::<SnapshotVersion>().unwrap_or_else(|e| {
|
s.parse::<SnapshotVersion>().unwrap_or_else(|e| {
|
||||||
eprintln!("Error: {e}");
|
eprintln!("Error: {e}");
|
||||||
exit(1)
|
std::process::exit(1)
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
@ -3032,7 +3033,7 @@ fn main() {
|
||||||
eprintln!(
|
eprintln!(
|
||||||
"Error: snapshot slot {snapshot_slot} does not exist in blockstore or is not full.",
|
"Error: snapshot slot {snapshot_slot} does not exist in blockstore or is not full.",
|
||||||
);
|
);
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
let ending_slot = if is_minimized {
|
let ending_slot = if is_minimized {
|
||||||
|
@ -3041,7 +3042,7 @@ fn main() {
|
||||||
eprintln!(
|
eprintln!(
|
||||||
"Error: ending_slot ({ending_slot}) must be greater than snapshot_slot ({snapshot_slot})"
|
"Error: ending_slot ({ending_slot}) must be greater than snapshot_slot ({snapshot_slot})"
|
||||||
);
|
);
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
Some(ending_slot)
|
Some(ending_slot)
|
||||||
|
@ -3086,7 +3087,7 @@ fn main() {
|
||||||
.get(snapshot_slot)
|
.get(snapshot_slot)
|
||||||
.unwrap_or_else(|| {
|
.unwrap_or_else(|| {
|
||||||
eprintln!("Error: Slot {snapshot_slot} is not available");
|
eprintln!("Error: Slot {snapshot_slot} is not available");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
let child_bank_required = rent_burn_percentage.is_ok()
|
let child_bank_required = rent_burn_percentage.is_ok()
|
||||||
|
@ -3141,7 +3142,7 @@ fn main() {
|
||||||
eprintln!(
|
eprintln!(
|
||||||
"Error: Account does not exist, unable to remove it: {address}"
|
"Error: Account does not exist, unable to remove it: {address}"
|
||||||
);
|
);
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
account.set_lamports(0);
|
account.set_lamports(0);
|
||||||
|
@ -3209,7 +3210,7 @@ fn main() {
|
||||||
eprintln!(
|
eprintln!(
|
||||||
"Error: --bootstrap-validator pubkeys cannot be duplicated"
|
"Error: --bootstrap-validator pubkeys cannot be duplicated"
|
||||||
);
|
);
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3283,7 +3284,7 @@ fn main() {
|
||||||
eprintln!(
|
eprintln!(
|
||||||
"Error: --warp-slot too close. Must be >= {minimum_warp_slot}"
|
"Error: --warp-slot too close. Must be >= {minimum_warp_slot}"
|
||||||
);
|
);
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
warn!("Warping to slot {}", minimum_warp_slot);
|
warn!("Warping to slot {}", minimum_warp_slot);
|
||||||
|
@ -3333,7 +3334,7 @@ fn main() {
|
||||||
if is_incremental {
|
if is_incremental {
|
||||||
if starting_snapshot_hashes.is_none() {
|
if starting_snapshot_hashes.is_none() {
|
||||||
eprintln!("Unable to create incremental snapshot without a base full snapshot");
|
eprintln!("Unable to create incremental snapshot without a base full snapshot");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
let full_snapshot_slot = starting_snapshot_hashes.unwrap().full.hash.0;
|
let full_snapshot_slot = starting_snapshot_hashes.unwrap().full.hash.0;
|
||||||
if bank.slot() <= full_snapshot_slot {
|
if bank.slot() <= full_snapshot_slot {
|
||||||
|
@ -3342,7 +3343,7 @@ fn main() {
|
||||||
bank.slot(),
|
bank.slot(),
|
||||||
full_snapshot_slot,
|
full_snapshot_slot,
|
||||||
);
|
);
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
let incremental_snapshot_archive_info =
|
let incremental_snapshot_archive_info =
|
||||||
|
@ -3359,7 +3360,7 @@ fn main() {
|
||||||
)
|
)
|
||||||
.unwrap_or_else(|err| {
|
.unwrap_or_else(|err| {
|
||||||
eprintln!("Unable to create incremental snapshot: {err}");
|
eprintln!("Unable to create incremental snapshot: {err}");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
println!(
|
println!(
|
||||||
|
@ -3383,7 +3384,7 @@ fn main() {
|
||||||
)
|
)
|
||||||
.unwrap_or_else(|err| {
|
.unwrap_or_else(|err| {
|
||||||
eprintln!("Unable to create snapshot: {err}");
|
eprintln!("Unable to create snapshot: {err}");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
println!(
|
println!(
|
||||||
|
@ -3414,7 +3415,7 @@ fn main() {
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
eprintln!("Failed to load ledger: {err:?}");
|
eprintln!("Failed to load ledger: {err:?}");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3444,7 +3445,7 @@ fn main() {
|
||||||
)
|
)
|
||||||
.unwrap_or_else(|err| {
|
.unwrap_or_else(|err| {
|
||||||
eprintln!("Failed to load ledger: {err:?}");
|
eprintln!("Failed to load ledger: {err:?}");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
let bank = bank_forks.read().unwrap().working_bank();
|
let bank = bank_forks.read().unwrap().working_bank();
|
||||||
|
@ -3535,7 +3536,7 @@ fn main() {
|
||||||
let slot = bank_forks.working_bank().slot();
|
let slot = bank_forks.working_bank().slot();
|
||||||
let bank = bank_forks.get(slot).unwrap_or_else(|| {
|
let bank = bank_forks.get(slot).unwrap_or_else(|| {
|
||||||
eprintln!("Error: Slot {slot} is not available");
|
eprintln!("Error: Slot {slot} is not available");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
if arg_matches.is_present("recalculate_capitalization") {
|
if arg_matches.is_present("recalculate_capitalization") {
|
||||||
|
@ -3566,7 +3567,7 @@ fn main() {
|
||||||
base_bank.epoch(),
|
base_bank.epoch(),
|
||||||
warp_epoch
|
warp_epoch
|
||||||
);
|
);
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Ok(raw_inflation) = value_t!(arg_matches, "inflation", String) {
|
if let Ok(raw_inflation) = value_t!(arg_matches, "inflation", String) {
|
||||||
|
@ -4032,7 +4033,7 @@ fn main() {
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
eprintln!("Failed to load ledger: {err:?}");
|
eprintln!("Failed to load ledger: {err:?}");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4061,20 +4062,20 @@ fn main() {
|
||||||
let slots: Vec<_> = metas.map(|(slot, _)| slot).collect();
|
let slots: Vec<_> = metas.map(|(slot, _)| slot).collect();
|
||||||
if slots.is_empty() {
|
if slots.is_empty() {
|
||||||
eprintln!("Purge range is empty");
|
eprintln!("Purge range is empty");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
*slots.last().unwrap()
|
*slots.last().unwrap()
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
eprintln!("Unable to read the Ledger: {err:?}");
|
eprintln!("Unable to read the Ledger: {err:?}");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
if end_slot < start_slot {
|
if end_slot < start_slot {
|
||||||
eprintln!("end slot {end_slot} is less than start slot {start_slot}");
|
eprintln!("end slot {end_slot} is less than start slot {start_slot}");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
info!(
|
info!(
|
||||||
"Purging data from slots {} to {} ({} slots) (skip compaction: {}) (dead slot only: {})",
|
"Purging data from slots {} to {} ({} slots) (skip compaction: {}) (dead slot only: {})",
|
||||||
|
@ -4223,7 +4224,7 @@ fn main() {
|
||||||
Either adjust `--until` value, or pass a larger `--repair-limit` \
|
Either adjust `--until` value, or pass a larger `--repair-limit` \
|
||||||
to override the limit",
|
to override the limit",
|
||||||
);
|
);
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
let ancestor_iterator = AncestorIterator::new(start_root, &blockstore)
|
let ancestor_iterator = AncestorIterator::new(start_root, &blockstore)
|
||||||
.take_while(|&slot| slot >= end_root);
|
.take_while(|&slot| slot >= end_root);
|
||||||
|
@ -4238,7 +4239,7 @@ fn main() {
|
||||||
.set_roots(roots_to_fix.iter())
|
.set_roots(roots_to_fix.iter())
|
||||||
.unwrap_or_else(|err| {
|
.unwrap_or_else(|err| {
|
||||||
eprintln!("Unable to set roots {roots_to_fix:?}: {err}");
|
eprintln!("Unable to set roots {roots_to_fix:?}: {err}");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -4312,7 +4313,7 @@ fn main() {
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
eprintln!("Unable to read the Ledger: {err:?}");
|
eprintln!("Unable to read the Ledger: {err:?}");
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -4364,7 +4365,7 @@ fn main() {
|
||||||
}
|
}
|
||||||
("", _) => {
|
("", _) => {
|
||||||
eprintln!("{}", matches.usage());
|
eprintln!("{}", matches.usage());
|
||||||
exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
|
|
@ -283,6 +283,7 @@ impl LocalCluster {
|
||||||
vec![],
|
vec![],
|
||||||
&leader_config,
|
&leader_config,
|
||||||
true, // should_check_duplicate_instance
|
true, // should_check_duplicate_instance
|
||||||
|
None, // rpc_to_plugin_manager_receiver
|
||||||
Arc::new(RwLock::new(ValidatorStartProgress::default())),
|
Arc::new(RwLock::new(ValidatorStartProgress::default())),
|
||||||
socket_addr_space,
|
socket_addr_space,
|
||||||
DEFAULT_TPU_USE_QUIC,
|
DEFAULT_TPU_USE_QUIC,
|
||||||
|
@ -489,6 +490,7 @@ impl LocalCluster {
|
||||||
vec![LegacyContactInfo::try_from(&self.entry_point_info).unwrap()],
|
vec![LegacyContactInfo::try_from(&self.entry_point_info).unwrap()],
|
||||||
&config,
|
&config,
|
||||||
true, // should_check_duplicate_instance
|
true, // should_check_duplicate_instance
|
||||||
|
None, // rpc_to_plugin_manager_receiver
|
||||||
Arc::new(RwLock::new(ValidatorStartProgress::default())),
|
Arc::new(RwLock::new(ValidatorStartProgress::default())),
|
||||||
socket_addr_space,
|
socket_addr_space,
|
||||||
DEFAULT_TPU_USE_QUIC,
|
DEFAULT_TPU_USE_QUIC,
|
||||||
|
@ -862,6 +864,7 @@ impl Cluster for LocalCluster {
|
||||||
.unwrap_or_default(),
|
.unwrap_or_default(),
|
||||||
&safe_clone_config(&cluster_validator_info.config),
|
&safe_clone_config(&cluster_validator_info.config),
|
||||||
true, // should_check_duplicate_instance
|
true, // should_check_duplicate_instance
|
||||||
|
None, // rpc_to_plugin_manager_receiver
|
||||||
Arc::new(RwLock::new(ValidatorStartProgress::default())),
|
Arc::new(RwLock::new(ValidatorStartProgress::default())),
|
||||||
socket_addr_space,
|
socket_addr_space,
|
||||||
DEFAULT_TPU_USE_QUIC,
|
DEFAULT_TPU_USE_QUIC,
|
||||||
|
|
|
@ -14,7 +14,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
|
||||||
account_paths: config.account_paths.clone(),
|
account_paths: config.account_paths.clone(),
|
||||||
account_shrink_paths: config.account_shrink_paths.clone(),
|
account_shrink_paths: config.account_shrink_paths.clone(),
|
||||||
rpc_config: config.rpc_config.clone(),
|
rpc_config: config.rpc_config.clone(),
|
||||||
geyser_plugin_config_files: config.geyser_plugin_config_files.clone(),
|
on_start_geyser_plugin_config_files: config.on_start_geyser_plugin_config_files.clone(),
|
||||||
rpc_addrs: config.rpc_addrs,
|
rpc_addrs: config.rpc_addrs,
|
||||||
pubsub_config: config.pubsub_config.clone(),
|
pubsub_config: config.pubsub_config.clone(),
|
||||||
snapshot_config: config.snapshot_config.clone(),
|
snapshot_config: config.snapshot_config.clone(),
|
||||||
|
|
|
@ -4834,6 +4834,8 @@ dependencies = [
|
||||||
"bs58",
|
"bs58",
|
||||||
"crossbeam-channel",
|
"crossbeam-channel",
|
||||||
"json5",
|
"json5",
|
||||||
|
"jsonrpc-core",
|
||||||
|
"jsonrpc-server-utils",
|
||||||
"libloading",
|
"libloading",
|
||||||
"log",
|
"log",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
@ -6103,12 +6105,14 @@ version = "1.16.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"base64 0.13.0",
|
"base64 0.13.0",
|
||||||
"bincode",
|
"bincode",
|
||||||
|
"crossbeam-channel",
|
||||||
"log",
|
"log",
|
||||||
"serde_derive",
|
"serde_derive",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"solana-cli-output",
|
"solana-cli-output",
|
||||||
"solana-client",
|
"solana-client",
|
||||||
"solana-core",
|
"solana-core",
|
||||||
|
"solana-geyser-plugin-manager",
|
||||||
"solana-gossip",
|
"solana-gossip",
|
||||||
"solana-ledger",
|
"solana-ledger",
|
||||||
"solana-logger 1.16.0",
|
"solana-logger 1.16.0",
|
||||||
|
@ -6219,6 +6223,7 @@ dependencies = [
|
||||||
"jsonrpc-server-utils",
|
"jsonrpc-server-utils",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"libc",
|
"libc",
|
||||||
|
"libloading",
|
||||||
"log",
|
"log",
|
||||||
"num_cpus",
|
"num_cpus",
|
||||||
"rand 0.7.3",
|
"rand 0.7.3",
|
||||||
|
@ -6234,6 +6239,8 @@ dependencies = [
|
||||||
"solana-entry",
|
"solana-entry",
|
||||||
"solana-faucet",
|
"solana-faucet",
|
||||||
"solana-genesis-utils",
|
"solana-genesis-utils",
|
||||||
|
"solana-geyser-plugin-interface",
|
||||||
|
"solana-geyser-plugin-manager",
|
||||||
"solana-gossip",
|
"solana-gossip",
|
||||||
"solana-ledger",
|
"solana-ledger",
|
||||||
"solana-logger 1.16.0",
|
"solana-logger 1.16.0",
|
||||||
|
|
|
@ -12,12 +12,14 @@ edition = { workspace = true }
|
||||||
[dependencies]
|
[dependencies]
|
||||||
base64 = { workspace = true }
|
base64 = { workspace = true }
|
||||||
bincode = { workspace = true }
|
bincode = { workspace = true }
|
||||||
|
crossbeam-channel = { workspace = true }
|
||||||
log = { workspace = true }
|
log = { workspace = true }
|
||||||
serde_derive = { workspace = true }
|
serde_derive = { workspace = true }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
solana-cli-output = { workspace = true }
|
solana-cli-output = { workspace = true }
|
||||||
solana-client = { workspace = true }
|
solana-client = { workspace = true }
|
||||||
solana-core = { workspace = true }
|
solana-core = { workspace = true }
|
||||||
|
solana-geyser-plugin-manager = { workspace = true }
|
||||||
solana-gossip = { workspace = true }
|
solana-gossip = { workspace = true }
|
||||||
solana-ledger = { workspace = true }
|
solana-ledger = { workspace = true }
|
||||||
solana-logger = { workspace = true }
|
solana-logger = { workspace = true }
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
#![allow(clippy::integer_arithmetic)]
|
#![allow(clippy::integer_arithmetic)]
|
||||||
|
|
||||||
use {
|
use {
|
||||||
|
crossbeam_channel::Receiver,
|
||||||
log::*,
|
log::*,
|
||||||
solana_cli_output::CliAccount,
|
solana_cli_output::CliAccount,
|
||||||
solana_client::rpc_request::MAX_MULTIPLE_ACCOUNTS,
|
solana_client::rpc_request::MAX_MULTIPLE_ACCOUNTS,
|
||||||
|
@ -9,6 +9,9 @@ use {
|
||||||
tower_storage::TowerStorage,
|
tower_storage::TowerStorage,
|
||||||
validator::{Validator, ValidatorConfig, ValidatorStartProgress},
|
validator::{Validator, ValidatorConfig, ValidatorStartProgress},
|
||||||
},
|
},
|
||||||
|
solana_geyser_plugin_manager::{
|
||||||
|
geyser_plugin_manager::GeyserPluginManager, GeyserPluginManagerRequest,
|
||||||
|
},
|
||||||
solana_gossip::{
|
solana_gossip::{
|
||||||
cluster_info::{ClusterInfo, Node},
|
cluster_info::{ClusterInfo, Node},
|
||||||
gossip_service::discover_cluster,
|
gossip_service::discover_cluster,
|
||||||
|
@ -138,6 +141,7 @@ pub struct TestValidatorGenesis {
|
||||||
pub log_messages_bytes_limit: Option<usize>,
|
pub log_messages_bytes_limit: Option<usize>,
|
||||||
pub transaction_account_lock_limit: Option<usize>,
|
pub transaction_account_lock_limit: Option<usize>,
|
||||||
pub tpu_enable_udp: bool,
|
pub tpu_enable_udp: bool,
|
||||||
|
pub geyser_plugin_manager: Arc<RwLock<GeyserPluginManager>>,
|
||||||
admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
|
admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -172,6 +176,7 @@ impl Default for TestValidatorGenesis {
|
||||||
log_messages_bytes_limit: Option::<usize>::default(),
|
log_messages_bytes_limit: Option::<usize>::default(),
|
||||||
transaction_account_lock_limit: Option::<usize>::default(),
|
transaction_account_lock_limit: Option::<usize>::default(),
|
||||||
tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP,
|
tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP,
|
||||||
|
geyser_plugin_manager: Arc::new(RwLock::new(GeyserPluginManager::new())),
|
||||||
admin_rpc_service_post_init:
|
admin_rpc_service_post_init:
|
||||||
Arc::<RwLock<Option<AdminRpcRequestMetadataPostInit>>>::default(),
|
Arc::<RwLock<Option<AdminRpcRequestMetadataPostInit>>>::default(),
|
||||||
}
|
}
|
||||||
|
@ -530,7 +535,26 @@ impl TestValidatorGenesis {
|
||||||
mint_address: Pubkey,
|
mint_address: Pubkey,
|
||||||
socket_addr_space: SocketAddrSpace,
|
socket_addr_space: SocketAddrSpace,
|
||||||
) -> Result<TestValidator, Box<dyn std::error::Error>> {
|
) -> Result<TestValidator, Box<dyn std::error::Error>> {
|
||||||
TestValidator::start(mint_address, self, socket_addr_space).map(|test_validator| {
|
self.start_with_mint_address_and_geyser_plugin_rpc(mint_address, socket_addr_space, None)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Start a test validator with the address of the mint account that will receive tokens
|
||||||
|
/// created at genesis. Augments admin rpc service with dynamic geyser plugin manager if
|
||||||
|
/// the geyser plugin service is enabled at startup.
|
||||||
|
///
|
||||||
|
pub fn start_with_mint_address_and_geyser_plugin_rpc(
|
||||||
|
&self,
|
||||||
|
mint_address: Pubkey,
|
||||||
|
socket_addr_space: SocketAddrSpace,
|
||||||
|
rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
|
||||||
|
) -> Result<TestValidator, Box<dyn std::error::Error>> {
|
||||||
|
TestValidator::start(
|
||||||
|
mint_address,
|
||||||
|
self,
|
||||||
|
socket_addr_space,
|
||||||
|
rpc_to_plugin_manager_receiver,
|
||||||
|
)
|
||||||
|
.map(|test_validator| {
|
||||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||||
.enable_io()
|
.enable_io()
|
||||||
.enable_time()
|
.enable_time()
|
||||||
|
@ -579,7 +603,7 @@ impl TestValidatorGenesis {
|
||||||
socket_addr_space: SocketAddrSpace,
|
socket_addr_space: SocketAddrSpace,
|
||||||
) -> (TestValidator, Keypair) {
|
) -> (TestValidator, Keypair) {
|
||||||
let mint_keypair = Keypair::new();
|
let mint_keypair = Keypair::new();
|
||||||
match TestValidator::start(mint_keypair.pubkey(), self, socket_addr_space) {
|
match TestValidator::start(mint_keypair.pubkey(), self, socket_addr_space, None) {
|
||||||
Ok(test_validator) => {
|
Ok(test_validator) => {
|
||||||
test_validator.wait_for_nonzero_fees().await;
|
test_validator.wait_for_nonzero_fees().await;
|
||||||
(test_validator, mint_keypair)
|
(test_validator, mint_keypair)
|
||||||
|
@ -835,6 +859,7 @@ impl TestValidator {
|
||||||
mint_address: Pubkey,
|
mint_address: Pubkey,
|
||||||
config: &TestValidatorGenesis,
|
config: &TestValidatorGenesis,
|
||||||
socket_addr_space: SocketAddrSpace,
|
socket_addr_space: SocketAddrSpace,
|
||||||
|
rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
|
||||||
) -> Result<Self, Box<dyn std::error::Error>> {
|
) -> Result<Self, Box<dyn std::error::Error>> {
|
||||||
let preserve_ledger = config.ledger_path.is_some();
|
let preserve_ledger = config.ledger_path.is_some();
|
||||||
let ledger_path = TestValidator::initialize_ledger(mint_address, config)?;
|
let ledger_path = TestValidator::initialize_ledger(mint_address, config)?;
|
||||||
|
@ -897,7 +922,7 @@ impl TestValidator {
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut validator_config = ValidatorConfig {
|
let mut validator_config = ValidatorConfig {
|
||||||
geyser_plugin_config_files: config.geyser_plugin_config_files.clone(),
|
on_start_geyser_plugin_config_files: config.geyser_plugin_config_files.clone(),
|
||||||
rpc_addrs: Some((
|
rpc_addrs: Some((
|
||||||
SocketAddr::new(
|
SocketAddr::new(
|
||||||
IpAddr::V4(Ipv4Addr::UNSPECIFIED),
|
IpAddr::V4(Ipv4Addr::UNSPECIFIED),
|
||||||
|
@ -949,6 +974,7 @@ impl TestValidator {
|
||||||
vec![],
|
vec![],
|
||||||
&validator_config,
|
&validator_config,
|
||||||
true, // should_check_duplicate_instance
|
true, // should_check_duplicate_instance
|
||||||
|
rpc_to_plugin_manager_receiver,
|
||||||
config.start_progress.clone(),
|
config.start_progress.clone(),
|
||||||
socket_addr_space,
|
socket_addr_space,
|
||||||
DEFAULT_TPU_USE_QUIC,
|
DEFAULT_TPU_USE_QUIC,
|
||||||
|
|
|
@ -25,6 +25,7 @@ jsonrpc-derive = { workspace = true }
|
||||||
jsonrpc-ipc-server = { workspace = true }
|
jsonrpc-ipc-server = { workspace = true }
|
||||||
jsonrpc-server-utils = { workspace = true }
|
jsonrpc-server-utils = { workspace = true }
|
||||||
lazy_static = { workspace = true }
|
lazy_static = { workspace = true }
|
||||||
|
libloading = { workspace = true }
|
||||||
log = { workspace = true }
|
log = { workspace = true }
|
||||||
num_cpus = { workspace = true }
|
num_cpus = { workspace = true }
|
||||||
rand = { workspace = true }
|
rand = { workspace = true }
|
||||||
|
@ -39,6 +40,8 @@ solana-download-utils = { workspace = true }
|
||||||
solana-entry = { workspace = true }
|
solana-entry = { workspace = true }
|
||||||
solana-faucet = { workspace = true }
|
solana-faucet = { workspace = true }
|
||||||
solana-genesis-utils = { workspace = true }
|
solana-genesis-utils = { workspace = true }
|
||||||
|
solana-geyser-plugin-interface = { workspace = true }
|
||||||
|
solana-geyser-plugin-manager = { workspace = true }
|
||||||
solana-gossip = { workspace = true }
|
solana-gossip = { workspace = true }
|
||||||
solana-ledger = { workspace = true }
|
solana-ledger = { workspace = true }
|
||||||
solana-logger = { workspace = true }
|
solana-logger = { workspace = true }
|
||||||
|
|
|
@ -1,8 +1,11 @@
|
||||||
use {
|
use {
|
||||||
jsonrpc_core::{MetaIoHandler, Metadata, Result},
|
crossbeam_channel::Sender,
|
||||||
|
jsonrpc_core::{BoxFuture, ErrorCode, MetaIoHandler, Metadata, Result},
|
||||||
jsonrpc_core_client::{transports::ipc, RpcError},
|
jsonrpc_core_client::{transports::ipc, RpcError},
|
||||||
jsonrpc_derive::rpc,
|
jsonrpc_derive::rpc,
|
||||||
jsonrpc_ipc_server::{RequestContext, ServerBuilder},
|
jsonrpc_ipc_server::{
|
||||||
|
tokio::sync::oneshot::channel as oneshot_channel, RequestContext, ServerBuilder,
|
||||||
|
},
|
||||||
jsonrpc_server_utils::tokio,
|
jsonrpc_server_utils::tokio,
|
||||||
log::*,
|
log::*,
|
||||||
serde::{de::Deserializer, Deserialize, Serialize},
|
serde::{de::Deserializer, Deserialize, Serialize},
|
||||||
|
@ -10,6 +13,7 @@ use {
|
||||||
admin_rpc_post_init::AdminRpcRequestMetadataPostInit, consensus::Tower,
|
admin_rpc_post_init::AdminRpcRequestMetadataPostInit, consensus::Tower,
|
||||||
tower_storage::TowerStorage, validator::ValidatorStartProgress,
|
tower_storage::TowerStorage, validator::ValidatorStartProgress,
|
||||||
},
|
},
|
||||||
|
solana_geyser_plugin_manager::GeyserPluginManagerRequest,
|
||||||
solana_gossip::contact_info::ContactInfo,
|
solana_gossip::contact_info::ContactInfo,
|
||||||
solana_rpc::rpc::verify_pubkey,
|
solana_rpc::rpc::verify_pubkey,
|
||||||
solana_rpc_client_api::{config::RpcAccountIndex, custom_error::RpcCustomError},
|
solana_rpc_client_api::{config::RpcAccountIndex, custom_error::RpcCustomError},
|
||||||
|
@ -41,7 +45,9 @@ pub struct AdminRpcRequestMetadata {
|
||||||
pub tower_storage: Arc<dyn TowerStorage>,
|
pub tower_storage: Arc<dyn TowerStorage>,
|
||||||
pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
|
pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
|
||||||
pub post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
|
pub post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
|
||||||
|
pub rpc_to_plugin_manager_sender: Option<Sender<GeyserPluginManagerRequest>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Metadata for AdminRpcRequestMetadata {}
|
impl Metadata for AdminRpcRequestMetadata {}
|
||||||
|
|
||||||
impl AdminRpcRequestMetadata {
|
impl AdminRpcRequestMetadata {
|
||||||
|
@ -139,6 +145,23 @@ pub trait AdminRpc {
|
||||||
#[rpc(meta, name = "exit")]
|
#[rpc(meta, name = "exit")]
|
||||||
fn exit(&self, meta: Self::Metadata) -> Result<()>;
|
fn exit(&self, meta: Self::Metadata) -> Result<()>;
|
||||||
|
|
||||||
|
#[rpc(meta, name = "reloadPlugin")]
|
||||||
|
fn reload_plugin(
|
||||||
|
&self,
|
||||||
|
meta: Self::Metadata,
|
||||||
|
name: String,
|
||||||
|
config_file: String,
|
||||||
|
) -> BoxFuture<Result<()>>;
|
||||||
|
|
||||||
|
#[rpc(meta, name = "unloadPlugin")]
|
||||||
|
fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>>;
|
||||||
|
|
||||||
|
#[rpc(meta, name = "loadPlugin")]
|
||||||
|
fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>>;
|
||||||
|
|
||||||
|
#[rpc(meta, name = "listPlugins")]
|
||||||
|
fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>>;
|
||||||
|
|
||||||
#[rpc(meta, name = "rpcAddress")]
|
#[rpc(meta, name = "rpcAddress")]
|
||||||
fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>>;
|
fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>>;
|
||||||
|
|
||||||
|
@ -234,6 +257,121 @@ impl AdminRpc for AdminRpcImpl {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn reload_plugin(
|
||||||
|
&self,
|
||||||
|
meta: Self::Metadata,
|
||||||
|
name: String,
|
||||||
|
config_file: String,
|
||||||
|
) -> BoxFuture<Result<()>> {
|
||||||
|
Box::pin(async move {
|
||||||
|
// Construct channel for plugin to respond to this particular rpc request instance
|
||||||
|
let (response_sender, response_receiver) = oneshot_channel();
|
||||||
|
|
||||||
|
// Send request to plugin manager if there is a geyser service
|
||||||
|
if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
|
||||||
|
rpc_to_manager_sender
|
||||||
|
.send(GeyserPluginManagerRequest::ReloadPlugin {
|
||||||
|
name,
|
||||||
|
config_file,
|
||||||
|
response_sender,
|
||||||
|
})
|
||||||
|
.expect("GeyerPluginService should never drop request receiver");
|
||||||
|
} else {
|
||||||
|
return Err(jsonrpc_core::Error {
|
||||||
|
code: ErrorCode::InvalidRequest,
|
||||||
|
message: "No geyser plugin service".to_string(),
|
||||||
|
data: None,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Await response from plugin manager
|
||||||
|
response_receiver
|
||||||
|
.await
|
||||||
|
.expect("GeyerPluginService's oneshot sender shouldn't drop early")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>> {
|
||||||
|
Box::pin(async move {
|
||||||
|
// Construct channel for plugin to respond to this particular rpc request instance
|
||||||
|
let (response_sender, response_receiver) = oneshot_channel();
|
||||||
|
|
||||||
|
// Send request to plugin manager if there is a geyser service
|
||||||
|
if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
|
||||||
|
rpc_to_manager_sender
|
||||||
|
.send(GeyserPluginManagerRequest::LoadPlugin {
|
||||||
|
config_file,
|
||||||
|
response_sender,
|
||||||
|
})
|
||||||
|
.expect("GeyerPluginService should never drop request receiver");
|
||||||
|
} else {
|
||||||
|
return Err(jsonrpc_core::Error {
|
||||||
|
code: ErrorCode::InvalidRequest,
|
||||||
|
message: "No geyser plugin service".to_string(),
|
||||||
|
data: None,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Await response from plugin manager
|
||||||
|
response_receiver
|
||||||
|
.await
|
||||||
|
.expect("GeyerPluginService's oneshot sender shouldn't drop early")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>> {
|
||||||
|
Box::pin(async move {
|
||||||
|
// Construct channel for plugin to respond to this particular rpc request instance
|
||||||
|
let (response_sender, response_receiver) = oneshot_channel();
|
||||||
|
|
||||||
|
// Send request to plugin manager if there is a geyser service
|
||||||
|
if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
|
||||||
|
rpc_to_manager_sender
|
||||||
|
.send(GeyserPluginManagerRequest::UnloadPlugin {
|
||||||
|
name,
|
||||||
|
response_sender,
|
||||||
|
})
|
||||||
|
.expect("GeyerPluginService should never drop request receiver");
|
||||||
|
} else {
|
||||||
|
return Err(jsonrpc_core::Error {
|
||||||
|
code: ErrorCode::InvalidRequest,
|
||||||
|
message: "No geyser plugin service".to_string(),
|
||||||
|
data: None,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Await response from plugin manager
|
||||||
|
response_receiver
|
||||||
|
.await
|
||||||
|
.expect("GeyerPluginService's oneshot sender shouldn't drop early")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>> {
|
||||||
|
Box::pin(async move {
|
||||||
|
// Construct channel for plugin to respond to this particular rpc request instance
|
||||||
|
let (response_sender, response_receiver) = oneshot_channel();
|
||||||
|
|
||||||
|
// Send request to plugin manager
|
||||||
|
if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
|
||||||
|
rpc_to_manager_sender
|
||||||
|
.send(GeyserPluginManagerRequest::ListPlugins { response_sender })
|
||||||
|
.expect("GeyerPluginService should never drop request receiver");
|
||||||
|
} else {
|
||||||
|
return Err(jsonrpc_core::Error {
|
||||||
|
code: ErrorCode::InvalidRequest,
|
||||||
|
message: "No geyser plugin service".to_string(),
|
||||||
|
data: None,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Await response from plugin manager
|
||||||
|
response_receiver
|
||||||
|
.await
|
||||||
|
.expect("GeyerPluginService's oneshot sender shouldn't drop early")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>> {
|
fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>> {
|
||||||
debug!("rpc_addr admin rpc request received");
|
debug!("rpc_addr admin rpc request received");
|
||||||
Ok(meta.rpc_addr)
|
Ok(meta.rpc_addr)
|
||||||
|
@ -544,6 +682,7 @@ pub fn run(ledger_path: &Path, metadata: AdminRpcRequestMetadata) {
|
||||||
warn!("Unable to start admin rpc service: {:?}", err);
|
warn!("Unable to start admin rpc service: {:?}", err);
|
||||||
}
|
}
|
||||||
Ok(server) => {
|
Ok(server) => {
|
||||||
|
info!("started admin rpc service!");
|
||||||
let close_handle = server.close_handle();
|
let close_handle = server.close_handle();
|
||||||
validator_exit
|
validator_exit
|
||||||
.write()
|
.write()
|
||||||
|
@ -708,6 +847,7 @@ mod tests {
|
||||||
repair_whitelist,
|
repair_whitelist,
|
||||||
}))),
|
}))),
|
||||||
staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
|
staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
|
||||||
|
rpc_to_plugin_manager_sender: None,
|
||||||
};
|
};
|
||||||
let mut io = MetaIoHandler::default();
|
let mut io = MetaIoHandler::default();
|
||||||
io.extend_with(AdminRpcImpl.to_delegate());
|
io.extend_with(AdminRpcImpl.to_delegate());
|
||||||
|
|
|
@ -381,6 +381,14 @@ fn main() {
|
||||||
let tower_storage = Arc::new(FileTowerStorage::new(ledger_path.clone()));
|
let tower_storage = Arc::new(FileTowerStorage::new(ledger_path.clone()));
|
||||||
|
|
||||||
let admin_service_post_init = Arc::new(RwLock::new(None));
|
let admin_service_post_init = Arc::new(RwLock::new(None));
|
||||||
|
// If geyser_plugin_config value is invalid, the validator will exit when the values are extracted below
|
||||||
|
let (rpc_to_plugin_manager_sender, rpc_to_plugin_manager_receiver) =
|
||||||
|
if matches.is_present("geyser_plugin_config") {
|
||||||
|
let (sender, receiver) = unbounded();
|
||||||
|
(Some(sender), Some(receiver))
|
||||||
|
} else {
|
||||||
|
(None, None)
|
||||||
|
};
|
||||||
admin_rpc_service::run(
|
admin_rpc_service::run(
|
||||||
&ledger_path,
|
&ledger_path,
|
||||||
admin_rpc_service::AdminRpcRequestMetadata {
|
admin_rpc_service::AdminRpcRequestMetadata {
|
||||||
|
@ -392,6 +400,7 @@ fn main() {
|
||||||
staked_nodes_overrides: genesis.staked_nodes_overrides.clone(),
|
staked_nodes_overrides: genesis.staked_nodes_overrides.clone(),
|
||||||
post_init: admin_service_post_init,
|
post_init: admin_service_post_init,
|
||||||
tower_storage: tower_storage.clone(),
|
tower_storage: tower_storage.clone(),
|
||||||
|
rpc_to_plugin_manager_sender,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
let dashboard = if output == Output::Dashboard {
|
let dashboard = if output == Output::Dashboard {
|
||||||
|
@ -542,7 +551,11 @@ fn main() {
|
||||||
genesis.compute_unit_limit(compute_unit_limit);
|
genesis.compute_unit_limit(compute_unit_limit);
|
||||||
}
|
}
|
||||||
|
|
||||||
match genesis.start_with_mint_address(mint_address, socket_addr_space) {
|
match genesis.start_with_mint_address_and_geyser_plugin_rpc(
|
||||||
|
mint_address,
|
||||||
|
socket_addr_space,
|
||||||
|
rpc_to_plugin_manager_receiver,
|
||||||
|
) {
|
||||||
Ok(test_validator) => {
|
Ok(test_validator) => {
|
||||||
if let Some(dashboard) = dashboard {
|
if let Some(dashboard) = dashboard {
|
||||||
dashboard.run(Duration::from_millis(250));
|
dashboard.run(Duration::from_millis(250));
|
||||||
|
|
|
@ -1465,6 +1465,48 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
|
||||||
SubCommand::with_name("run")
|
SubCommand::with_name("run")
|
||||||
.about("Run the validator")
|
.about("Run the validator")
|
||||||
)
|
)
|
||||||
|
.subcommand(
|
||||||
|
SubCommand::with_name("plugin")
|
||||||
|
.about("Manage and view geyser plugins")
|
||||||
|
.setting(AppSettings::SubcommandRequiredElseHelp)
|
||||||
|
.setting(AppSettings::InferSubcommands)
|
||||||
|
.subcommand(
|
||||||
|
SubCommand::with_name("list")
|
||||||
|
.about("List all current running gesyer plugins")
|
||||||
|
)
|
||||||
|
.subcommand(
|
||||||
|
SubCommand::with_name("unload")
|
||||||
|
.about("Unload a particular gesyer plugin. You must specify the gesyer plugin name")
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("name")
|
||||||
|
.required(true)
|
||||||
|
.takes_value(true)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.subcommand(
|
||||||
|
SubCommand::with_name("reload")
|
||||||
|
.about("Reload a particular gesyer plugin. You must specify the gesyer plugin name and the new config path")
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("name")
|
||||||
|
.required(true)
|
||||||
|
.takes_value(true)
|
||||||
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("config")
|
||||||
|
.required(true)
|
||||||
|
.takes_value(true)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.subcommand(
|
||||||
|
SubCommand::with_name("load")
|
||||||
|
.about("Load a new gesyer plugin. You must specify the config path. Fails if overwriting (use reload)")
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("config")
|
||||||
|
.required(true)
|
||||||
|
.takes_value(true)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
.subcommand(
|
.subcommand(
|
||||||
SubCommand::with_name("set-identity")
|
SubCommand::with_name("set-identity")
|
||||||
.about("Set the validator identity")
|
.about("Set the validator identity")
|
||||||
|
|
|
@ -4,6 +4,7 @@ use jemallocator::Jemalloc;
|
||||||
use {
|
use {
|
||||||
clap::{crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit, ArgMatches},
|
clap::{crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit, ArgMatches},
|
||||||
console::style,
|
console::style,
|
||||||
|
crossbeam_channel::unbounded,
|
||||||
log::*,
|
log::*,
|
||||||
rand::{seq::SliceRandom, thread_rng},
|
rand::{seq::SliceRandom, thread_rng},
|
||||||
solana_clap_utils::input_parsers::{keypair_of, keypairs_of, pubkey_of, value_of},
|
solana_clap_utils::input_parsers::{keypair_of, keypairs_of, pubkey_of, value_of},
|
||||||
|
@ -536,6 +537,79 @@ pub fn main() {
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
("plugin", Some(plugin_subcommand_matches)) => {
|
||||||
|
match plugin_subcommand_matches.subcommand() {
|
||||||
|
("list", _) => {
|
||||||
|
let admin_client = admin_rpc_service::connect(&ledger_path);
|
||||||
|
let plugins = admin_rpc_service::runtime()
|
||||||
|
.block_on(async move { admin_client.await?.list_plugins().await })
|
||||||
|
.unwrap_or_else(|err| {
|
||||||
|
println!("Failed to list plugins: {err}");
|
||||||
|
exit(1);
|
||||||
|
});
|
||||||
|
if !plugins.is_empty() {
|
||||||
|
println!("Currently the following plugins are loaded:");
|
||||||
|
for (plugin, i) in plugins.into_iter().zip(1..) {
|
||||||
|
println!(" {i}) {plugin}");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
println!("There are currently no plugins loaded");
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
("unload", Some(subcommand_matches)) => {
|
||||||
|
if let Some(name) = value_t!(subcommand_matches, "name", String).ok() {
|
||||||
|
let admin_client = admin_rpc_service::connect(&ledger_path);
|
||||||
|
admin_rpc_service::runtime()
|
||||||
|
.block_on(async {
|
||||||
|
admin_client.await?.unload_plugin(name.clone()).await
|
||||||
|
})
|
||||||
|
.unwrap_or_else(|err| {
|
||||||
|
println!("Failed to unload plugin {name}: {err:?}");
|
||||||
|
exit(1);
|
||||||
|
});
|
||||||
|
println!("Successfully unloaded plugin: {name}");
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
("load", Some(subcommand_matches)) => {
|
||||||
|
if let Some(config) = value_t!(subcommand_matches, "config", String).ok() {
|
||||||
|
let admin_client = admin_rpc_service::connect(&ledger_path);
|
||||||
|
let name = admin_rpc_service::runtime()
|
||||||
|
.block_on(async {
|
||||||
|
admin_client.await?.load_plugin(config.clone()).await
|
||||||
|
})
|
||||||
|
.unwrap_or_else(|err| {
|
||||||
|
println!("Failed to load plugin {config}: {err:?}");
|
||||||
|
exit(1);
|
||||||
|
});
|
||||||
|
println!("Successfully loaded plugin: {name}");
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
("reload", Some(subcommand_matches)) => {
|
||||||
|
if let Some(name) = value_t!(subcommand_matches, "name", String).ok() {
|
||||||
|
if let Some(config) = value_t!(subcommand_matches, "config", String).ok() {
|
||||||
|
let admin_client = admin_rpc_service::connect(&ledger_path);
|
||||||
|
admin_rpc_service::runtime()
|
||||||
|
.block_on(async {
|
||||||
|
admin_client
|
||||||
|
.await?
|
||||||
|
.reload_plugin(name.clone(), config.clone())
|
||||||
|
.await
|
||||||
|
})
|
||||||
|
.unwrap_or_else(|err| {
|
||||||
|
println!("Failed to reload plugin {name}: {err:?}");
|
||||||
|
exit(1);
|
||||||
|
});
|
||||||
|
println!("Successfully reloaded plugin: {name}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
("contact-info", Some(subcommand_matches)) => {
|
("contact-info", Some(subcommand_matches)) => {
|
||||||
let output_mode = subcommand_matches.value_of("output");
|
let output_mode = subcommand_matches.value_of("output");
|
||||||
let admin_client = admin_rpc_service::connect(&ledger_path);
|
let admin_client = admin_rpc_service::connect(&ledger_path);
|
||||||
|
@ -1058,7 +1132,7 @@ pub fn main() {
|
||||||
|
|
||||||
let accounts_db_config = Some(accounts_db_config);
|
let accounts_db_config = Some(accounts_db_config);
|
||||||
|
|
||||||
let geyser_plugin_config_files = if matches.is_present("geyser_plugin_config") {
|
let on_start_geyser_plugin_config_files = if matches.is_present("geyser_plugin_config") {
|
||||||
Some(
|
Some(
|
||||||
values_t_or_exit!(matches, "geyser_plugin_config", String)
|
values_t_or_exit!(matches, "geyser_plugin_config", String)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
@ -1068,6 +1142,7 @@ pub fn main() {
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
let starting_with_geyser_plugins: bool = on_start_geyser_plugin_config_files.is_some();
|
||||||
|
|
||||||
let rpc_bigtable_config = if matches.is_present("enable_rpc_bigtable_ledger_storage")
|
let rpc_bigtable_config = if matches.is_present("enable_rpc_bigtable_ledger_storage")
|
||||||
|| matches.is_present("enable_bigtable_ledger_upload")
|
|| matches.is_present("enable_bigtable_ledger_upload")
|
||||||
|
@ -1155,7 +1230,7 @@ pub fn main() {
|
||||||
usize
|
usize
|
||||||
)),
|
)),
|
||||||
},
|
},
|
||||||
geyser_plugin_config_files,
|
on_start_geyser_plugin_config_files,
|
||||||
rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| {
|
rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| {
|
||||||
(
|
(
|
||||||
SocketAddr::new(rpc_bind_address, rpc_port),
|
SocketAddr::new(rpc_bind_address, rpc_port),
|
||||||
|
@ -1513,6 +1588,13 @@ pub fn main() {
|
||||||
|
|
||||||
let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
|
let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
|
||||||
let admin_service_post_init = Arc::new(RwLock::new(None));
|
let admin_service_post_init = Arc::new(RwLock::new(None));
|
||||||
|
let (rpc_to_plugin_manager_sender, rpc_to_plugin_manager_receiver) =
|
||||||
|
if starting_with_geyser_plugins {
|
||||||
|
let (sender, receiver) = unbounded();
|
||||||
|
(Some(sender), Some(receiver))
|
||||||
|
} else {
|
||||||
|
(None, None)
|
||||||
|
};
|
||||||
admin_rpc_service::run(
|
admin_rpc_service::run(
|
||||||
&ledger_path,
|
&ledger_path,
|
||||||
admin_rpc_service::AdminRpcRequestMetadata {
|
admin_rpc_service::AdminRpcRequestMetadata {
|
||||||
|
@ -1524,6 +1606,7 @@ pub fn main() {
|
||||||
post_init: admin_service_post_init.clone(),
|
post_init: admin_service_post_init.clone(),
|
||||||
tower_storage: validator_config.tower_storage.clone(),
|
tower_storage: validator_config.tower_storage.clone(),
|
||||||
staked_nodes_overrides,
|
staked_nodes_overrides,
|
||||||
|
rpc_to_plugin_manager_sender,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -1682,6 +1765,7 @@ pub fn main() {
|
||||||
cluster_entrypoints,
|
cluster_entrypoints,
|
||||||
&validator_config,
|
&validator_config,
|
||||||
should_check_duplicate_instance,
|
should_check_duplicate_instance,
|
||||||
|
rpc_to_plugin_manager_receiver,
|
||||||
start_progress,
|
start_progress,
|
||||||
socket_addr_space,
|
socket_addr_space,
|
||||||
tpu_use_quic,
|
tpu_use_quic,
|
||||||
|
|
Loading…
Reference in New Issue