geyser: add panic config option on failed block reconstruction (#165)

This commit is contained in:
Kirill Fomichev 2023-07-21 22:57:00 -04:00 committed by GitHub
parent b482e67db1
commit 9dfe79f74d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 59 additions and 12 deletions

View File

@ -12,6 +12,8 @@ The minor version will be incremented upon a breaking change and the patch versi
### Features
geyser: add panic config option on failed block reconstruction ([#162](https://github.com/rpcpool/yellowstone-grpc/pull/162)).
### Fixes
### Breaking

View File

@ -18,6 +18,10 @@ $ solana-validator --geyser-plugin-config yellowstone-grpc-geyser/config.json
cargo-fmt && cargo run --bin config-check -- --config yellowstone-grpc-geyser/config.json
```
### Block reconstruction
Geyser interface on block update do not provide detailed information about transactions and accounts updates. To provide this information with block message we need to collect all messages and expect specified order. By default if we failed to reconstruct full block we log error message and increase `invalid_full_blocks_total` counter in prometheus metrics. If you want to panic on invalid reconstruction you can change option `block_fail_action` in config to `panic` (default value is `log`).
### Filters
See [yellowstone-grpc-proto/proto/geyser.proto](yellowstone-grpc-proto/proto/geyser.proto).

View File

@ -42,5 +42,6 @@
},
"prometheus": {
"address": "0.0.0.0:8999"
}
},
"block_fail_action": "log"
}

View File

@ -16,6 +16,9 @@ pub struct Config {
pub grpc: ConfigGrpc,
#[serde(default)]
pub prometheus: Option<ConfigPrometheus>,
/// Action on block re-construction error
#[serde(default)]
pub block_fail_action: ConfigBlockFailAction,
}
impl Config {
@ -251,6 +254,19 @@ pub struct ConfigPrometheus {
pub address: SocketAddr,
}
#[derive(Debug, Clone, Copy, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum ConfigBlockFailAction {
Log,
Panic,
}
impl Default for ConfigBlockFailAction {
fn default() -> Self {
Self::Log
}
}
fn deserialize_usize_str<'de, D>(deserializer: D) -> Result<usize, D::Error>
where
D: Deserializer<'de>,

View File

@ -1,6 +1,6 @@
use {
crate::{
config::ConfigGrpc,
config::{ConfigBlockFailAction, ConfigGrpc},
filters::{Filter, FilterAccountsDataSlice},
prom::{CONNECTIONS_TOTAL, INVALID_FULL_BLOCKS, MESSAGE_QUEUE_SIZE},
proto::{
@ -586,6 +586,7 @@ pub struct GrpcService {
impl GrpcService {
pub fn create(
config: ConfigGrpc,
block_fail_action: ConfigBlockFailAction,
) -> Result<
(mpsc::UnboundedSender<Message>, oneshot::Sender<()>),
Box<dyn std::error::Error + Send + Sync>,
@ -615,7 +616,12 @@ impl GrpcService {
// Run geyser message loop
let (messages_tx, messages_rx) = mpsc::unbounded_channel();
tokio::spawn(Self::geyser_loop(messages_rx, blocks_meta_tx, broadcast_tx));
tokio::spawn(Self::geyser_loop(
messages_rx,
blocks_meta_tx,
broadcast_tx,
block_fail_action,
));
// Run Server
let (shutdown_tx, shutdown_rx) = oneshot::channel();
@ -641,6 +647,7 @@ impl GrpcService {
mut messages_rx: mpsc::UnboundedReceiver<Message>,
blocks_meta_tx: mpsc::UnboundedSender<Message>,
broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc<Vec<Message>>)>,
block_fail_action: ConfigBlockFailAction,
) {
const PROCESSED_MESSAGES_MAX: usize = 31;
const PROCESSED_MESSAGES_SLEEP: Duration = Duration::from_millis(10);
@ -720,7 +727,15 @@ impl GrpcService {
let processed_message = $message.clone();
let (vec, map, collected) = messages.entry($message.get_slot()).or_default();
if *collected && !matches!(&$message, Message::Block(_) | Message::BlockMeta(_)) {
error!("unexpected message order for slot {}", $message.get_slot());
match block_fail_action {
ConfigBlockFailAction::Log => {
INVALID_FULL_BLOCKS.inc();
error!("unexpected message order for slot {}", $message.get_slot());
}
ConfigBlockFailAction::Panic => {
panic!("unexpected message order for slot {}", $message.get_slot());
}
}
}
if let Message::Account(message) = &$message {
let write_version = message.account.write_version;
@ -780,10 +795,11 @@ impl GrpcService {
transactions.get(&slot),
Some((Some(block_meta), transactions)) if block_meta.executed_transaction_count as usize == transactions.len()
) {
let (block_meta, mut transactions) = transactions.remove(&slot).expect("checked");
transactions.sort_by(|tx1, tx2| tx1.index.cmp(&tx2.index));
let mut message = Message::Block((block_meta.expect("checked"), transactions).into());
process_message!(message);
if let Some((Some(block_meta), mut transactions)) = transactions.remove(&slot) {
transactions.sort_by(|tx1, tx2| tx1.index.cmp(&tx2.index));
let mut message = Message::Block((block_meta, transactions).into());
process_message!(message);
}
}
// remove outdated transactions
@ -797,8 +813,15 @@ impl GrpcService {
// Maybe log error
Some(kslot) if kslot == slot => {
if let Some((Some(_), vec)) = transactions.remove(&kslot) {
INVALID_FULL_BLOCKS.inc();
error!("{} transactions left for block {kslot}", vec.len());
match block_fail_action {
ConfigBlockFailAction::Log => {
INVALID_FULL_BLOCKS.inc();
error!("{} transactions left for block {kslot}", vec.len());
}
ConfigBlockFailAction::Panic => {
panic!("{} transactions left for block {kslot}", vec.len());
}
}
}
}
_ => break,

View File

@ -76,8 +76,9 @@ impl GeyserPlugin for Plugin {
let runtime = Runtime::new().map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;
let (grpc_channel, grpc_shutdown_tx, prometheus) = runtime.block_on(async move {
let (grpc_channel, grpc_shutdown_tx) = GrpcService::create(config.grpc)
.map_err(|error| GeyserPluginError::Custom(error))?;
let (grpc_channel, grpc_shutdown_tx) =
GrpcService::create(config.grpc, config.block_fail_action)
.map_err(|error| GeyserPluginError::Custom(error))?;
let prometheus = PrometheusService::new(config.prometheus)
.map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;
Ok::<_, GeyserPluginError>((grpc_channel, grpc_shutdown_tx, prometheus))