diff --git a/examples/dump_slots_stream_samples.rs b/examples/dump_slots_stream_samples.rs index 8fcee34..bbca239 100644 --- a/examples/dump_slots_stream_samples.rs +++ b/examples/dump_slots_stream_samples.rs @@ -6,6 +6,7 @@ use solana_sdk::clock::Slot; use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; use std::collections::HashMap; use std::env; +use std::str::FromStr; use std::time::SystemTime; use base64::Engine; @@ -32,23 +33,35 @@ use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConf use tokio::time::{sleep, Duration}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; +use yellowstone_grpc_proto::prelude::SubscribeRequestFilterAccounts; fn start_slot_multi_consumer(mut slots_channel: Receiver) { tokio::spawn(async move { loop { match slots_channel.recv().await { Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof { - Some(UpdateOneof::Slot(meta)) => { + Some(UpdateOneof::Slot(update_slot)) => { let since_epoch_ms = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis(); - let short_status = match map_slot_status(&meta) { + let short_status = match map_slot_status(&update_slot) { CommitmentLevel::Processed => "P", CommitmentLevel::Confirmed => "C", CommitmentLevel::Finalized => "F", _ => { panic!("unexpected commitment level") } }; - // e.g. 2024-08-13T13:41:32.340860Z INFO dump_slots_stream_samples: DUMP 283356662,283356661,F,1723556492340 - info!("DUMP {},{:09},{},{}", meta.slot, meta.parent.unwrap_or(0), short_status, since_epoch_ms); + // DUMPSLOT 283356662,283356661,F,1723556492340 + info!("DUMPSLOT {},{:09},{},{}", update_slot.slot, update_slot.parent.unwrap_or(0), short_status, since_epoch_ms); + } + Some(UpdateOneof::Account(update_account)) => { + let since_epoch_ms = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis(); + + let account_info = update_account.account.unwrap(); + let slot = update_account.slot; + let account_pk = Pubkey::new_from_array(account_info.pubkey.try_into().unwrap()); + let write_version = account_info.write_version; + let data_len = account_info.data.len(); + // DUMPACCOUNT 283417593,HTQeo4GNbZfGY5G4fAkDr1S5xnz5qWXFgueRwgw53aU1,1332997857270,752,1723582355872 + info!("DUMPACCOUNT {},{},{},{},{}", slot, account_pk, write_version, data_len, since_epoch_ms); } None => {} _ => {} @@ -110,6 +123,8 @@ pub async fn main() { sleep(Duration::from_secs(3600 * 5)).await; } +const RAYDIUM_AMM_PUBKEY: &'static str = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"; + fn slots_all_confirmation_levels() -> SubscribeRequest { let mut slot_subs = HashMap::new(); slot_subs.insert( @@ -119,9 +134,21 @@ fn slots_all_confirmation_levels() -> SubscribeRequest { filter_by_commitment: None, }, ); + let mut account_subs = HashMap::new(); + account_subs.insert( + "client".to_string(), + SubscribeRequestFilterAccounts { + account: vec![], + owner: vec![RAYDIUM_AMM_PUBKEY.to_string()], + filters: vec![], + }, + ); + SubscribeRequest { slots: slot_subs, + accounts: account_subs, ping: None, + // implies "processed" commitment: None, ..Default::default() }