solana/validator/src/admin_rpc_service.rs

184 lines
6.1 KiB
Rust
Raw Normal View History

use {
jsonrpc_core::{MetaIoHandler, Metadata, Result},
jsonrpc_core_client::{transports::ipc, RpcError},
jsonrpc_derive::rpc,
jsonrpc_ipc_server::{RequestContext, ServerBuilder},
jsonrpc_server_utils::tokio,
log::*,
solana_core::validator::{ValidatorExit, ValidatorStartProgress},
solana_sdk::signature::{read_keypair_file, Keypair, Signer},
std::{
net::SocketAddr,
path::Path,
sync::{Arc, RwLock},
thread::Builder,
time::SystemTime,
},
};
/// Per-request context handed to every admin RPC method that is declared with `meta`.
///
/// The struct is `Clone` because the server's meta extractor (see `run`) produces a fresh
/// copy for each incoming request; the `Arc` fields make those copies share the same
/// underlying state.
#[derive(Clone)]
pub struct AdminRpcRequestMetadata {
/// Value returned verbatim by the `rpcAddress` method; `None` is passed through as-is.
pub rpc_addr: Option<SocketAddr>,
/// Value returned verbatim by the `startTime` method.
pub start_time: SystemTime,
/// Shared start-progress value; the `startProgress` method returns a copy of its current contents.
pub start_progress: Arc<RwLock<ValidatorStartProgress>>,
/// Shared exit handle: the `exit` method calls `exit()` on it, and `run` registers a callback
/// on it that closes the admin RPC server.
pub validator_exit: Arc<RwLock<ValidatorExit>>,
/// Shared list of authorized voter keypairs, mutated by `addAuthorizedVoter` and
/// `removeAllAuthorizedVoters`.
pub authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
}
// Empty impl: opts `AdminRpcRequestMetadata` into jsonrpc_core's `Metadata` trait so it can be
// used as the `Metadata` type of the RPC handler and returned by the meta extractor in `run`.
impl Metadata for AdminRpcRequestMetadata {}
/// JSON-RPC interface exposed by the validator's admin service.
///
/// Each method's wire name is the `name = "..."` string in its `#[rpc]` attribute. Methods
/// marked `meta` additionally receive the per-request `Self::Metadata` value. The behaviour
/// notes below describe the `AdminRpcImpl` implementation found in this file.
#[rpc]
pub trait AdminRpc {
// Per-request context type passed to every `meta` method.
type Metadata;
/// Triggers the validator exit handle carried in the metadata; the call itself returns
/// `Ok(())` before the exit is signalled.
#[rpc(meta, name = "exit")]
fn exit(&self, meta: Self::Metadata) -> Result<()>;
/// Returns the `rpc_addr` carried in the metadata (possibly `None`).
#[rpc(meta, name = "rpcAddress")]
fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>>;
/// Re-initializes logging with `filter` (passed to `solana_logger::setup_with`).
#[rpc(name = "setLogFilter")]
fn set_log_filter(&self, filter: String) -> Result<()>;
/// Returns the `start_time` carried in the metadata.
#[rpc(meta, name = "startTime")]
fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime>;
/// Returns a copy of the current start-progress value.
#[rpc(meta, name = "startProgress")]
fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress>;
/// Reads a keypair from `keypair_file` and appends it to the authorized voter list.
/// Fails with an invalid-params error if the file cannot be read or a keypair with the
/// same pubkey is already in the list.
#[rpc(meta, name = "addAuthorizedVoter")]
fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()>;
/// Empties the authorized voter list.
#[rpc(meta, name = "removeAllAuthorizedVoters")]
fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()>;
}
/// Field-less implementor of `AdminRpc`; everything its methods need arrives through the
/// per-request `AdminRpcRequestMetadata`.
pub struct AdminRpcImpl;
impl AdminRpc for AdminRpcImpl {
type Metadata = AdminRpcRequestMetadata;
fn exit(&self, meta: Self::Metadata) -> Result<()> {
debug!("exit admin rpc request received");
// Delay exit signal until this RPC request completes, otherwise the caller of `exit` might
// receive a confusing error as the validator shuts down before a response is sent back.
tokio::spawn(async move {
meta.validator_exit.write().unwrap().exit();
});
Ok(())
}
fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>> {
debug!("rpc_addr admin rpc request received");
Ok(meta.rpc_addr)
}
fn set_log_filter(&self, filter: String) -> Result<()> {
debug!("set_log_filter admin rpc request received");
solana_logger::setup_with(&filter);
Ok(())
}
fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime> {
debug!("start_time admin rpc request received");
Ok(meta.start_time)
}
fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress> {
debug!("start_progress admin rpc request received");
Ok(*meta.start_progress.read().unwrap())
}
fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()> {
debug!("add_authorized_voter request received");
let authorized_voter = read_keypair_file(keypair_file)
.map_err(|err| jsonrpc_core::error::Error::invalid_params(format!("{}", err)))?;
let mut authorized_voter_keypairs = meta.authorized_voter_keypairs.write().unwrap();
if authorized_voter_keypairs
.iter()
.any(|x| x.pubkey() == authorized_voter.pubkey())
{
Err(jsonrpc_core::error::Error::invalid_params(
"Authorized voter already present",
))
} else {
authorized_voter_keypairs.push(Arc::new(authorized_voter));
Ok(())
}
}
fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()> {
debug!("remove_all_authorized_voters received");
let mut a = meta.authorized_voter_keypairs.write().unwrap();
error!("authorized_voter_keypairs pre len: {}", a.len());
a.clear();
error!("authorized_voter_keypairs post len: {}", a.len());
//meta.authorized_voter_keypairs.write().unwrap().clear();
Ok(())
}
}
// Start the Admin RPC interface
pub fn run(ledger_path: &Path, metadata: AdminRpcRequestMetadata) {
let admin_rpc_path = ledger_path.join("admin.rpc");
let event_loop = tokio::runtime::Builder::new()
.threaded_scheduler()
.enable_all()
.thread_name("sol-adminrpc-el")
.build()
.unwrap();
Builder::new()
.name("solana-adminrpc".to_string())
.spawn(move || {
let mut io = MetaIoHandler::default();
io.extend_with(AdminRpcImpl.to_delegate());
let validator_exit = metadata.validator_exit.clone();
let server = ServerBuilder::with_meta_extractor(io, move |_req: &RequestContext| {
metadata.clone()
})
.event_loop_executor(event_loop.handle().clone())
.start(&format!("{}", admin_rpc_path.display()));
match server {
Err(err) => {
warn!("Unable to start admin rpc service: {:?}", err);
}
Ok(server) => {
let close_handle = server.close_handle();
validator_exit
.write()
.unwrap()
.register_exit(Box::new(move || {
close_handle.close();
}));
server.wait();
}
}
})
.unwrap();
}
/// Connects to the admin RPC interface at `<ledger_path>/admin.rpc`.
///
/// # Errors
///
/// Returns `RpcError::Client` when the endpoint path does not exist, and otherwise whatever
/// error the IPC transport reports while connecting.
pub async fn connect(ledger_path: &Path) -> std::result::Result<gen_client::Client, RpcError> {
    let admin_rpc_path = ledger_path.join("admin.rpc");

    if admin_rpc_path.exists() {
        let endpoint = format!("{}", admin_rpc_path.display());
        return ipc::connect::<_, gen_client::Client>(&endpoint).await;
    }

    Err(RpcError::Client(format!(
        "{} does not exist",
        admin_rpc_path.display()
    )))
}
/// Builds a new tokio runtime using the tokio version re-exported by `jsonrpc_server_utils`.
///
/// # Panics
///
/// Panics with "new tokio runtime" if the runtime cannot be created.
pub fn runtime() -> jsonrpc_server_utils::tokio::runtime::Runtime {
    use jsonrpc_server_utils::tokio::runtime::Runtime;

    Runtime::new().expect("new tokio runtime")
}