implementing snapshots and adding rpc server to the quic_geyser plugin to get snapshot

This commit is contained in:
godmodegalactus 2024-07-12 14:51:42 +02:00
parent 471c20686b
commit 181071acb7
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
7 changed files with 943 additions and 485 deletions

1162
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -27,6 +27,8 @@ solana-program = "~1.18.15"
solana-transaction-status = "~1.18.15"
solana-logger = "~1.18.15"
solana-rpc-client = "~1.18.15"
solana-rpc-client-api = "~1.18.15"
solana-account-decoder = "~1.18.15"
itertools = "0.10.5"
serde = "1.0.201"

View File

@ -20,13 +20,25 @@ serde = { workspace = true }
solana-sdk = { workspace = true }
solana-logger = { workspace = true }
serde_json = { workspace = true }
solana-rpc-client-api = { workspace = true }
solana-account-decoder = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
thiserror = {workspace = true}
jsonrpsee = { version = "0.20.0", features = ["macros", "full"] }
tower = "0.4.13"
tower-http = { version = "0.4.0", features = ["full"] }
quic-geyser-common = { workspace = true }
quic-geyser-server = { workspace = true }
quic-geyser-block-builder = { workspace = true }
quic-geyser-snapshot = { workspace = true }
lite-account-manager-common = { workspace = true }
itertools = { workspace = true }
tokio = {workspace = true}
[build-dependencies]
anyhow = { workspace = true }

View File

@ -2,6 +2,7 @@ use std::{fs::read_to_string, path::Path};
use agave_geyser_plugin_interface::geyser_plugin_interface::GeyserPluginError;
use quic_geyser_common::config::ConfigQuicPlugin;
use quic_geyser_snapshot::snapshot_config::SnapshotConfig;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -10,6 +11,8 @@ pub struct Config {
pub libpath: String,
pub quic_plugin: ConfigQuicPlugin,
pub rpc_server: RpcServiceConfig,
}
impl Config {
@ -24,3 +27,23 @@ impl Config {
Self::load_from_str(&config)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct RpcServiceConfig {
#[serde(default = "RpcServiceConfig::default_rpc_service_enable")]
pub enable: bool,
#[serde(default = "RpcServiceConfig::default_port")]
pub port: u16,
#[serde(default)]
pub snapshot_config: SnapshotConfig,
}
impl RpcServiceConfig {
pub fn default_rpc_service_enable() -> bool {
true
}
pub fn default_port() -> u16 {
10801
}
}

View File

@ -1,2 +1,3 @@
pub mod config;
pub mod quic_plugin;
pub mod rpc_server;

View File

@ -13,17 +13,21 @@ use quic_geyser_common::{
},
};
use quic_geyser_server::quic_server::QuicServer;
use quic_geyser_snapshot::snapshot_creator::SnapshotCreator;
use solana_sdk::{
account::Account, clock::Slot, commitment_config::CommitmentConfig, message::v0::Message,
pubkey::Pubkey,
};
use tokio::runtime::Runtime;
use crate::config::Config;
use crate::{config::Config, rpc_server::RpcServerImpl};
#[derive(Debug, Default)]
pub struct QuicGeyserPlugin {
runtime: Option<Runtime>,
quic_server: Option<QuicServer>,
block_builder_channel: Option<std::sync::mpsc::Sender<ChannelMessage>>,
rpc_server_message_channel: Option<tokio::sync::mpsc::UnboundedSender<ChannelMessage>>,
}
impl GeyserPlugin for QuicGeyserPlugin {
@ -34,9 +38,11 @@ impl GeyserPlugin for QuicGeyserPlugin {
fn on_load(&mut self, config_file: &str, _is_reload: bool) -> PluginResult<()> {
log::info!("loading quic_geyser plugin");
let config = Config::load_from_file(config_file)?;
let compression_params = config.quic_plugin.compression_parameters.clone();
let compression_type = config.quic_plugin.compression_parameters.compression_type;
let enable_block_builder = config.quic_plugin.enable_block_builder;
let build_blocks_with_accounts = config.quic_plugin.build_blocks_with_accounts;
let snapshot_config = config.rpc_server.snapshot_config;
log::info!("Quic plugin config correctly loaded");
solana_logger::setup_with_default(&config.quic_plugin.log_level);
let quic_server = QuicServer::new(config.quic_plugin).map_err(|_| {
@ -52,6 +58,26 @@ impl GeyserPlugin for QuicGeyserPlugin {
);
self.block_builder_channel = Some(sx);
}
if config.rpc_server.enable {
let runtime =
Runtime::new().map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;
let port = config.rpc_server.port;
let (server_sx, server_rx) = tokio::sync::mpsc::unbounded_channel();
self.rpc_server_message_channel = Some(server_sx);
runtime.block_on(async move {
let snapshot_creator = SnapshotCreator::new(snapshot_config, compression_params);
snapshot_creator.start_listening(server_rx);
let rpc_server = RpcServerImpl::new(snapshot_creator);
if let Err(e) = RpcServerImpl::start_serving(rpc_server, port).await {
log::error!("Error starting http server: {e:?}");
}
});
self.runtime = Some(runtime);
}
self.quic_server = Some(quic_server);
Ok(())
@ -104,6 +130,10 @@ impl GeyserPlugin for QuicGeyserPlugin {
let _ = block_channel.send(channel_message.clone());
}
if let Some(rpc_server_message_channel) = &self.rpc_server_message_channel {
let _ = rpc_server_message_channel.send(channel_message.clone());
}
quic_server
.send_message(channel_message)
.map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
@ -135,6 +165,10 @@ impl GeyserPlugin for QuicGeyserPlugin {
let _ = block_channel.send(slot_message.clone());
}
if let Some(rpc_server_message_channel) = &self.rpc_server_message_channel {
let _ = rpc_server_message_channel.send(slot_message.clone());
}
quic_server
.send_message(slot_message)
.map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
@ -245,6 +279,10 @@ impl GeyserPlugin for QuicGeyserPlugin {
let _ = block_channel.send(block_meta_message.clone());
}
if let Some(rpc_server_message_channel) = &self.rpc_server_message_channel {
let _ = rpc_server_message_channel.send(block_meta_message.clone());
}
quic_server
.send_message(block_meta_message)
.map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;

188
plugin/src/rpc_server.rs Normal file
View File

@ -0,0 +1,188 @@
use std::str::FromStr;
use std::time::Duration;
use itertools::Itertools;
use jsonrpsee::server::ServerBuilder;
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use lite_account_manager_common::account_filter::AccountFilterType as AmAccountFilterType;
use lite_account_manager_common::{account_data::AccountData, commitment::Commitment};
use quic_geyser_snapshot::snapshot_creator::SnapshotCreator;
use solana_account_decoder::UiAccount;
use solana_rpc_client_api::client_error::reqwest::Method;
use solana_rpc_client_api::{
config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
response::{OptionalContext, Response as RpcResponse, RpcKeyedAccount, RpcResponseContext},
};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
use tower_http::cors::{Any, CorsLayer};
#[rpc(server)]
pub trait PluginRpc {
#[method(name = "getProgramAccounts")]
async fn get_program_accounts(
&self,
program_id_str: String,
config: Option<RpcProgramAccountsConfig>,
) -> RpcResult<OptionalContext<Vec<RpcKeyedAccount>>>;
#[method(name = "getSnapshot")]
async fn get_snapshot(&self, program_id_str: String) -> RpcResult<Vec<u8>>;
}
pub struct RpcServerImpl {
snapshot_creator: SnapshotCreator,
}
impl RpcServerImpl {
pub fn new(snapshot_creator: SnapshotCreator) -> Self {
Self { snapshot_creator }
}
pub async fn start_serving(rpc_impl: RpcServerImpl, port: u16) -> anyhow::Result<()> {
let http_addr = format!("[::]:{port}");
let cors = CorsLayer::new()
.max_age(Duration::from_secs(86400))
// Allow `POST` when accessing the resource
.allow_methods([Method::POST, Method::GET, Method::OPTIONS])
// Allow requests from any origin
.allow_origin(Any)
.allow_headers(Any);
let middleware = tower::ServiceBuilder::new().layer(cors);
let http_server_handle = ServerBuilder::default()
.set_middleware(middleware)
.max_connections(10)
.max_request_body_size(16 * 1024 * 1024) // 16 MB
.max_response_body_size(64 * 1024 * 1024 * 1024) // 64 GB
.http_only()
.build(http_addr.clone())
.await?
.start(rpc_impl.into_rpc());
tokio::spawn(async move {
log::info!("HTTP Server started at {http_addr:?}");
http_server_handle.stopped().await;
log::error!("QUIC GEYSER PLUGIN HTTP SERVER STOPPED");
panic!("QUIC GEYSER PLUGIN HTTP SERVER STOPPED")
});
Ok(())
}
}
#[jsonrpsee::core::async_trait]
impl PluginRpcServer for RpcServerImpl {
async fn get_program_accounts(
&self,
program_id_str: String,
config: Option<RpcProgramAccountsConfig>,
) -> RpcResult<OptionalContext<Vec<RpcKeyedAccount>>> {
let Ok(program_id) = Pubkey::from_str(&program_id_str) else {
return Err(jsonrpsee::types::error::ErrorCode::InternalError.into());
};
let with_context = config
.as_ref()
.map(|value| value.with_context.unwrap_or_default())
.unwrap_or_default();
let commitment: CommitmentConfig = config
.as_ref()
.and_then(|x| x.account_config.commitment)
.unwrap_or_default();
let account_filters = config
.as_ref()
.map(|x| {
x.filters
.as_ref()
.map(|filters| filters.iter().map(AmAccountFilterType::from).collect_vec())
})
.unwrap_or_default();
let commitment = Commitment::from(commitment);
let gpa = self
.snapshot_creator
.get_program_accounts(program_id, account_filters, commitment)
.await
.map_err(|_| jsonrpsee::types::error::ErrorCode::InternalError)?;
let min_context_slot = config
.as_ref()
.map(|c| {
if c.with_context.unwrap_or_default() {
c.account_config.min_context_slot
} else {
None
}
})
.unwrap_or_default()
.unwrap_or_default();
let slot = gpa
.iter()
.map(|program_account| program_account.updated_slot)
.max()
.unwrap_or_default();
let acc_config = config.map(|c| c.account_config);
let rpc_keyed_accounts = gpa
.iter()
.filter_map(|account_data| {
if account_data.updated_slot >= min_context_slot {
Some(RpcKeyedAccount {
pubkey: account_data.pubkey.to_string(),
account: convert_account_data_to_ui_account(
account_data,
acc_config.clone(),
),
})
} else {
None
}
})
.collect_vec();
if with_context {
Ok(OptionalContext::Context(RpcResponse {
context: RpcResponseContext {
slot,
api_version: None,
},
value: rpc_keyed_accounts,
}))
} else {
Ok(OptionalContext::NoContext(rpc_keyed_accounts))
}
}
async fn get_snapshot(&self, program_id_str: String) -> RpcResult<Vec<u8>> {
let program_id = Pubkey::from_str(program_id_str.as_str())
.map_err(|_| jsonrpsee::types::error::ErrorCode::InvalidParams)?;
let res = self
.snapshot_creator
.create_snapshot(program_id)
.await
.map_err(|_| jsonrpsee::types::error::ErrorCode::InternalError)?;
Ok(res)
}
}
pub fn convert_account_data_to_ui_account(
account_data: &AccountData,
config: Option<RpcAccountInfoConfig>,
) -> UiAccount {
let encoding = config
.as_ref()
.map(|c| c.encoding)
.unwrap_or_default()
.unwrap_or(solana_account_decoder::UiAccountEncoding::Base64);
let data_slice = config.as_ref().map(|c| c.data_slice).unwrap_or_default();
UiAccount::encode(
&account_data.pubkey,
&account_data.account.to_solana_account(),
encoding,
None,
data_slice,
)
}