From 29b68f42e9a386071db098f7e6d9723c6177c44a Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Tue, 13 Aug 2024 15:42:21 +0200 Subject: [PATCH] add tool to dump slot stream --- examples/dump_slots_stream_samples.rs | 50 ++++++++++++++++++--------- 1 file changed, 34 insertions(+), 16 deletions(-) diff --git a/examples/dump_slots_stream_samples.rs b/examples/dump_slots_stream_samples.rs index 115c7ee..70acc56 100644 --- a/examples/dump_slots_stream_samples.rs +++ b/examples/dump_slots_stream_samples.rs @@ -1,12 +1,12 @@ -use std::collections::HashMap; /// /// get a sample stream of slots with slot number, parent and status to test logic of chain_data with that /// - use log::{info, warn}; use solana_sdk::clock::Slot; -use solana_sdk::commitment_config::CommitmentConfig; +use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; +use std::collections::HashMap; use std::env; +use std::time::SystemTime; use base64::Engine; use itertools::Itertools; @@ -23,7 +23,7 @@ use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signature; use solana_sdk::transaction::TransactionError; use tokio::sync::mpsc::Receiver; -use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots, SubscribeUpdateBlock}; +use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterSlots, SubscribeUpdateSlot}; use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc; use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor; @@ -38,7 +38,16 @@ fn start_slot_multi_consumer(mut slots_channel: Receiver) { match slots_channel.recv().await { Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof { Some(UpdateOneof::Slot(meta)) => { - info!("emitted slot #{} from multiplexer", meta.slot); + let since_epoch_ms = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis(); + + let short_status = match map_slot_status(&meta) { + CommitmentLevel::Processed => "P", + CommitmentLevel::Confirmed => "C", + CommitmentLevel::Finalized => "F", + _ => { panic!("unexpected commitment level") } + }; + // e.g. 2024-08-13T13:41:32.340860Z INFO dump_slots_stream_samples: DUMP 283356662,283356661,F,1723556492340 + info!("DUMP {},{:09},{},{}", meta.slot, meta.parent.unwrap_or(0), short_status, since_epoch_ms); } None => {} _ => {} @@ -53,6 +62,16 @@ fn start_slot_multi_consumer(mut slots_channel: Receiver) { }); } +fn map_slot_status(slot_update: &SubscribeUpdateSlot) -> solana_sdk::commitment_config::CommitmentLevel { + use yellowstone_grpc_proto::geyser::CommitmentLevel as yCL; + use solana_sdk::commitment_config::CommitmentLevel as solanaCL; + yellowstone_grpc_proto::geyser::CommitmentLevel::try_from(slot_update.status).map(|v| match v { + yCL::Processed => solanaCL::Processed, + yCL::Confirmed => solanaCL::Confirmed, + yCL::Finalized => solanaCL::Finalized, + }).expect("valid commitment level") +} + #[tokio::main] pub async fn main() { @@ -62,7 +81,7 @@ pub async fn main() { let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok(); info!( - "Using green on {} ({})", + "Using gRPC source {} ({})", grpc_addr_green, grpc_x_token_green.is_some() ); @@ -73,33 +92,32 @@ pub async fn main() { subscribe_timeout: Duration::from_secs(5), receive_timeout: Duration::from_secs(5), }; - let (_, exit_notify) = tokio::sync::broadcast::channel(1); let green_config = GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone()); - let (autoconnect_tx, blockmeta_rx) = tokio::sync::mpsc::channel(10); + let (autoconnect_tx, slots_rx) = tokio::sync::mpsc::channel(10); let _green_stream_ah = create_geyser_autoconnection_task_with_mpsc( green_config.clone(), slots_all_confirmation_levels(), autoconnect_tx.clone(), - exit_notify.resubscribe(), ); - start_slot_multi_consumer(blockmeta_rx); - + start_slot_multi_consumer(slots_rx); // "infinite" sleep sleep(Duration::from_secs(1800)).await; } - fn slots_all_confirmation_levels() -> SubscribeRequest { let mut slot_subs = HashMap::new(); - slot_subs.insert("client".to_string(), SubscribeRequestFilterSlots { - // implies all slots - filter_by_commitment: None, - }); + slot_subs.insert( + "client".to_string(), + SubscribeRequestFilterSlots { + // implies all slots + filter_by_commitment: None, + }, + ); SubscribeRequest { slots: slot_subs, ping: None,