add tool to dump slot stream

This commit is contained in:
GroovieGermanikus 2024-08-13 15:42:21 +02:00
parent 0ef4317a1d
commit 29b68f42e9
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 34 additions and 16 deletions

View File

@ -1,12 +1,12 @@
use std::collections::HashMap;
///
/// get a sample stream of slots with slot number, parent and status to test logic of chain_data with that
///
use log::{info, warn};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use std::collections::HashMap;
use std::env;
use std::time::SystemTime;
use base64::Engine;
use itertools::Itertools;
@ -23,7 +23,7 @@ use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature;
use solana_sdk::transaction::TransactionError;
use tokio::sync::mpsc::Receiver;
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots, SubscribeUpdateBlock};
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterSlots, SubscribeUpdateSlot};
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
@ -38,7 +38,16 @@ fn start_slot_multi_consumer(mut slots_channel: Receiver<Message>) {
match slots_channel.recv().await {
Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
Some(UpdateOneof::Slot(meta)) => {
info!("emitted slot #{} from multiplexer", meta.slot);
let since_epoch_ms = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis();
let short_status = match map_slot_status(&meta) {
CommitmentLevel::Processed => "P",
CommitmentLevel::Confirmed => "C",
CommitmentLevel::Finalized => "F",
_ => { panic!("unexpected commitment level") }
};
// e.g. 2024-08-13T13:41:32.340860Z INFO dump_slots_stream_samples: DUMP 283356662,283356661,F,1723556492340
info!("DUMP {},{:09},{},{}", meta.slot, meta.parent.unwrap_or(0), short_status, since_epoch_ms);
}
None => {}
_ => {}
@ -53,6 +62,16 @@ fn start_slot_multi_consumer(mut slots_channel: Receiver<Message>) {
});
}
fn map_slot_status(slot_update: &SubscribeUpdateSlot) -> solana_sdk::commitment_config::CommitmentLevel {
use yellowstone_grpc_proto::geyser::CommitmentLevel as yCL;
use solana_sdk::commitment_config::CommitmentLevel as solanaCL;
yellowstone_grpc_proto::geyser::CommitmentLevel::try_from(slot_update.status).map(|v| match v {
yCL::Processed => solanaCL::Processed,
yCL::Confirmed => solanaCL::Confirmed,
yCL::Finalized => solanaCL::Finalized,
}).expect("valid commitment level")
}
#[tokio::main]
pub async fn main() {
@ -62,7 +81,7 @@ pub async fn main() {
let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok();
info!(
"Using green on {} ({})",
"Using gRPC source {} ({})",
grpc_addr_green,
grpc_x_token_green.is_some()
);
@ -73,33 +92,32 @@ pub async fn main() {
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
};
let (_, exit_notify) = tokio::sync::broadcast::channel(1);
let green_config =
GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());
let (autoconnect_tx, blockmeta_rx) = tokio::sync::mpsc::channel(10);
let (autoconnect_tx, slots_rx) = tokio::sync::mpsc::channel(10);
let _green_stream_ah = create_geyser_autoconnection_task_with_mpsc(
green_config.clone(),
slots_all_confirmation_levels(),
autoconnect_tx.clone(),
exit_notify.resubscribe(),
);
start_slot_multi_consumer(blockmeta_rx);
start_slot_multi_consumer(slots_rx);
// "infinite" sleep
sleep(Duration::from_secs(1800)).await;
}
fn slots_all_confirmation_levels() -> SubscribeRequest {
let mut slot_subs = HashMap::new();
slot_subs.insert("client".to_string(), SubscribeRequestFilterSlots {
// implies all slots
filter_by_commitment: None,
});
slot_subs.insert(
"client".to_string(),
SubscribeRequestFilterSlots {
// implies all slots
filter_by_commitment: None,
},
);
SubscribeRequest {
slots: slot_subs,
ping: None,