From 9dfe79f74d51735b9ab731d6342ff4ac04c16828 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Fri, 21 Jul 2023 22:57:00 -0400 Subject: [PATCH] geyser: add panic config option on failed block reconstruction (#165) --- CHANGELOG.md | 2 ++ README.md | 4 +++ yellowstone-grpc-geyser/config.json | 3 +- yellowstone-grpc-geyser/src/config.rs | 16 +++++++++++ yellowstone-grpc-geyser/src/grpc.rs | 41 +++++++++++++++++++++------ yellowstone-grpc-geyser/src/plugin.rs | 5 ++-- 6 files changed, 59 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 35c7145..9885a4a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ The minor version will be incremented upon a breaking change and the patch versi ### Features +geyser: add panic config option on failed block reconstruction ([#162](https://github.com/rpcpool/yellowstone-grpc/pull/162)). + ### Fixes ### Breaking diff --git a/README.md b/README.md index 3fd7945..077b9d1 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,10 @@ $ solana-validator --geyser-plugin-config yellowstone-grpc-geyser/config.json cargo-fmt && cargo run --bin config-check -- --config yellowstone-grpc-geyser/config.json ``` +### Block reconstruction + +Geyser interface on block update do not provide detailed information about transactions and accounts updates. To provide this information with block message we need to collect all messages and expect specified order. By default if we failed to reconstruct full block we log error message and increase `invalid_full_blocks_total` counter in prometheus metrics. If you want to panic on invalid reconstruction you can change option `block_fail_action` in config to `panic` (default value is `log`). + ### Filters See [yellowstone-grpc-proto/proto/geyser.proto](yellowstone-grpc-proto/proto/geyser.proto). diff --git a/yellowstone-grpc-geyser/config.json b/yellowstone-grpc-geyser/config.json index 1881c1b..b62ebd7 100644 --- a/yellowstone-grpc-geyser/config.json +++ b/yellowstone-grpc-geyser/config.json @@ -42,5 +42,6 @@ }, "prometheus": { "address": "0.0.0.0:8999" - } + }, + "block_fail_action": "log" } diff --git a/yellowstone-grpc-geyser/src/config.rs b/yellowstone-grpc-geyser/src/config.rs index a7e6bca..4956598 100644 --- a/yellowstone-grpc-geyser/src/config.rs +++ b/yellowstone-grpc-geyser/src/config.rs @@ -16,6 +16,9 @@ pub struct Config { pub grpc: ConfigGrpc, #[serde(default)] pub prometheus: Option, + /// Action on block re-construction error + #[serde(default)] + pub block_fail_action: ConfigBlockFailAction, } impl Config { @@ -251,6 +254,19 @@ pub struct ConfigPrometheus { pub address: SocketAddr, } +#[derive(Debug, Clone, Copy, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum ConfigBlockFailAction { + Log, + Panic, +} + +impl Default for ConfigBlockFailAction { + fn default() -> Self { + Self::Log + } +} + fn deserialize_usize_str<'de, D>(deserializer: D) -> Result where D: Deserializer<'de>, diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index fd6e924..6d05f32 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -1,6 +1,6 @@ use { crate::{ - config::ConfigGrpc, + config::{ConfigBlockFailAction, ConfigGrpc}, filters::{Filter, FilterAccountsDataSlice}, prom::{CONNECTIONS_TOTAL, INVALID_FULL_BLOCKS, MESSAGE_QUEUE_SIZE}, proto::{ @@ -586,6 +586,7 @@ pub struct GrpcService { impl GrpcService { pub fn create( config: ConfigGrpc, + block_fail_action: ConfigBlockFailAction, ) -> Result< (mpsc::UnboundedSender, oneshot::Sender<()>), Box, @@ -615,7 +616,12 @@ impl GrpcService { // Run geyser message loop let (messages_tx, messages_rx) = mpsc::unbounded_channel(); - tokio::spawn(Self::geyser_loop(messages_rx, blocks_meta_tx, broadcast_tx)); + tokio::spawn(Self::geyser_loop( + messages_rx, + blocks_meta_tx, + broadcast_tx, + block_fail_action, + )); // Run Server let (shutdown_tx, shutdown_rx) = oneshot::channel(); @@ -641,6 +647,7 @@ impl GrpcService { mut messages_rx: mpsc::UnboundedReceiver, blocks_meta_tx: mpsc::UnboundedSender, broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc>)>, + block_fail_action: ConfigBlockFailAction, ) { const PROCESSED_MESSAGES_MAX: usize = 31; const PROCESSED_MESSAGES_SLEEP: Duration = Duration::from_millis(10); @@ -720,7 +727,15 @@ impl GrpcService { let processed_message = $message.clone(); let (vec, map, collected) = messages.entry($message.get_slot()).or_default(); if *collected && !matches!(&$message, Message::Block(_) | Message::BlockMeta(_)) { - error!("unexpected message order for slot {}", $message.get_slot()); + match block_fail_action { + ConfigBlockFailAction::Log => { + INVALID_FULL_BLOCKS.inc(); + error!("unexpected message order for slot {}", $message.get_slot()); + } + ConfigBlockFailAction::Panic => { + panic!("unexpected message order for slot {}", $message.get_slot()); + } + } } if let Message::Account(message) = &$message { let write_version = message.account.write_version; @@ -780,10 +795,11 @@ impl GrpcService { transactions.get(&slot), Some((Some(block_meta), transactions)) if block_meta.executed_transaction_count as usize == transactions.len() ) { - let (block_meta, mut transactions) = transactions.remove(&slot).expect("checked"); - transactions.sort_by(|tx1, tx2| tx1.index.cmp(&tx2.index)); - let mut message = Message::Block((block_meta.expect("checked"), transactions).into()); - process_message!(message); + if let Some((Some(block_meta), mut transactions)) = transactions.remove(&slot) { + transactions.sort_by(|tx1, tx2| tx1.index.cmp(&tx2.index)); + let mut message = Message::Block((block_meta, transactions).into()); + process_message!(message); + } } // remove outdated transactions @@ -797,8 +813,15 @@ impl GrpcService { // Maybe log error Some(kslot) if kslot == slot => { if let Some((Some(_), vec)) = transactions.remove(&kslot) { - INVALID_FULL_BLOCKS.inc(); - error!("{} transactions left for block {kslot}", vec.len()); + match block_fail_action { + ConfigBlockFailAction::Log => { + INVALID_FULL_BLOCKS.inc(); + error!("{} transactions left for block {kslot}", vec.len()); + } + ConfigBlockFailAction::Panic => { + panic!("{} transactions left for block {kslot}", vec.len()); + } + } } } _ => break, diff --git a/yellowstone-grpc-geyser/src/plugin.rs b/yellowstone-grpc-geyser/src/plugin.rs index 88e3298..7189160 100644 --- a/yellowstone-grpc-geyser/src/plugin.rs +++ b/yellowstone-grpc-geyser/src/plugin.rs @@ -76,8 +76,9 @@ impl GeyserPlugin for Plugin { let runtime = Runtime::new().map_err(|error| GeyserPluginError::Custom(Box::new(error)))?; let (grpc_channel, grpc_shutdown_tx, prometheus) = runtime.block_on(async move { - let (grpc_channel, grpc_shutdown_tx) = GrpcService::create(config.grpc) - .map_err(|error| GeyserPluginError::Custom(error))?; + let (grpc_channel, grpc_shutdown_tx) = + GrpcService::create(config.grpc, config.block_fail_action) + .map_err(|error| GeyserPluginError::Custom(error))?; let prometheus = PrometheusService::new(config.prometheus) .map_err(|error| GeyserPluginError::Custom(Box::new(error)))?; Ok::<_, GeyserPluginError>((grpc_channel, grpc_shutdown_tx, prometheus))