From 88f8d71ba8008f4db43d9969c4a63294d14fb4fb Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Tue, 7 May 2024 20:31:19 +0200 Subject: [PATCH] count data --- examples/stream_token_accounts.rs | 50 ++++++++++++++++++++++++++----- 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/examples/stream_token_accounts.rs b/examples/stream_token_accounts.rs index b5ca82f..0af0d03 100644 --- a/examples/stream_token_accounts.rs +++ b/examples/stream_token_accounts.rs @@ -19,11 +19,11 @@ use tokio::time::{sleep, Duration}; use tracing::field::debug; use tracing::warn; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; -use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeUpdate}; +use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots, SubscribeUpdate}; use yellowstone_grpc_proto::prost::Message as _; -const ENABLE_TIMESTAMP_TAGGING: bool = true; +const ENABLE_TIMESTAMP_TAGGING: bool = false; #[tokio::main] pub async fn main() { @@ -63,16 +63,27 @@ pub async fn main() { tokio::spawn(async move { + let mut bytes_per_slot: HashMap = HashMap::new(); + let mut updates_per_slot: HashMap = HashMap::new(); + + let mut changing_slot = 0; + let mut current_slot = 0; + + let mut green_stream = pin!(green_stream); while let Some(message) = green_stream.next().await { match message { Message::GeyserSubscribeUpdate(subscriber_update) => { match subscriber_update.update_oneof { + Some(UpdateOneof::Slot(update)) => { + current_slot = update.slot; + } Some(UpdateOneof::Account(update)) => { + let slot = update.slot as Slot; let account = update.account.unwrap(); let account_pk = Pubkey::try_from(account.pubkey).unwrap(); - let size = account.data.len(); - info!("got account update (green)!!! {} - {:?} - {} bytes", + let size = account.data.len() as u64; + trace!("got account update (green)!!! {} - {:?} - {} bytes", update.slot, account_pk, account.data.len()); if ENABLE_TIMESTAMP_TAGGING { @@ -120,6 +131,22 @@ pub async fn main() { .or_insert_with(DashMap::new) .insert(mint, account_ui); + bytes_per_slot.entry(slot) + .and_modify(|total| *total += size).or_insert(size); + + updates_per_slot.entry(slot) + .and_modify(|total| *total += 1).or_insert(1); + + info!("delta: {}", (slot as i64) - (current_slot as i64)); + + if slot != changing_slot && changing_slot != 0 { + let total_bytes = bytes_per_slot.get(&changing_slot).unwrap(); + let updates_count = updates_per_slot.get(&changing_slot).unwrap(); + // info!("Slot {} - Total bytes: {} in {} updates", slot, total_bytes, updates_count); + } + changing_slot = slot; + + } Ok(TokenAccountType::Mint(mint)) => { // not interesting @@ -173,16 +200,23 @@ pub fn token_accounts() -> SubscribeRequest { accounts_subs.insert( "client".to_string(), SubscribeRequestFilterAccounts { - account: vec!["4DoNfFBfF7UokCC2FQzriy7yHK6DY6NVdYpuekQ5pRgg".to_string()], - owner: vec![], - // spl_token_ids().iter().map(|pubkey| pubkey.to_string()).collect(), + account: vec![], + // vec!["4DoNfFBfF7UokCC2FQzriy7yHK6DY6NVdYpuekQ5pRgg".to_string()], + owner: + spl_token_ids().iter().map(|pubkey| pubkey.to_string()).collect(), filters: vec![], }, ); + let mut slots_subs = HashMap::new(); + slots_subs.insert("client".to_string(), SubscribeRequestFilterSlots { + filter_by_commitment: Some(true), + }); + + SubscribeRequest { - slots: HashMap::new(), + slots: slots_subs, accounts: accounts_subs, transactions: HashMap::new(), entry: Default::default(),