find most active accounts

This commit is contained in:
GroovieGermanikus 2024-11-28 08:48:03 +01:00
parent 9ce723fb06
commit 19b9138129
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 82 additions and 2 deletions

View File

@ -4,7 +4,7 @@
// ```
//
use log::info;
use log::{info, trace};
use solana_account_decoder::parse_token::spl_token_ids;
use solana_sdk::clock::UnixTimestamp;
use solana_sdk::pubkey::Pubkey;
@ -13,6 +13,7 @@ use std::env;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use itertools::Itertools;
use tokio::sync::mpsc::Receiver;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
@ -61,7 +62,7 @@ pub async fn main() {
);
let current_processed_slot = AtomicSlot::default();
start_tracking_account_consumer(geyser_messages_rx, current_processed_slot.clone());
start_account_scanner_sol_changes(geyser_messages_rx, current_processed_slot.clone());
// "infinite" sleep
sleep(Duration::from_secs(1800)).await;
@ -102,6 +103,67 @@ fn start_tracking_account_consumer(
});
}
// note: this keeps track of lot of data and might blow up memory
fn start_account_scanner_sol_changes(
mut geyser_messages_rx: Receiver<Message>,
_current_processed_slot: Arc<AtomicU64>,
) {
tokio::spawn(async move {
// note: account_pk -> (count, last_lamport)
let mut account_lamp_changes: HashMap<Pubkey, (u64, u64)> = HashMap::new();
let mut loop_count = 0;
loop {
loop_count += 1;
match geyser_messages_rx.recv().await {
Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
Some(UpdateOneof::Account(update)) => {
let account_info = update.account.unwrap();
let account_pk = Pubkey::try_from(account_info.pubkey).unwrap();
let account_owner_pk = Pubkey::try_from(account_info.owner).unwrap();
// note: slot is referencing the block that is just built while the slot number reported from BlockMeta/Slot uses the slot after the block is built
let slot = update.slot;
let account_receive_time = get_epoch_sec();
let (count, last_lamport) = account_lamp_changes.entry(account_pk).or_insert((0, 999));
if *last_lamport != account_info.lamports {
*count += 1;
*last_lamport = account_info.lamports;
}
if loop_count % 100_000 == 0 {
info!("loop count: {}", loop_count);
info!("accounts: {}", account_lamp_changes.len());
info!("account changes: {:?}", account_lamp_changes.get(&account_pk));
let top_100 = account_lamp_changes.iter().filter(|(_k, (count, _lamp))| *count > 100)
.sorted_unstable_by_key(|(_k, (count, _lamp))| u64::MAX - count)
.take(100)
.collect_vec();
info!("top 100: {:?}", top_100);
}
trace!(
"Account update: slot: {}, account_pk: {}, account_owner_pk: {}, account_receive_time: {}",
slot, account_pk, account_owner_pk, account_receive_time
);
}
None => {}
_ => {}
},
None => {
log::warn!("multiplexer channel closed - aborting");
return;
}
Some(Message::Connecting(_)) => {}
}
}
});
}
fn get_epoch_sec() -> UnixTimestamp {
SystemTime::now()
.duration_since(UNIX_EPOCH)
@ -177,6 +239,24 @@ pub fn all_accounts() -> SubscribeRequest {
}
}
pub fn jup_account() -> SubscribeRequest {
let mut accounts_subs = HashMap::new();
accounts_subs.insert(
"client".to_string(),
SubscribeRequestFilterAccounts {
account: vec!["3hvmJXPFW7ERXvWRDx6v9oKShwsxRsYn8jZoNddiHcaJ".to_string()],
owner: vec![],
filters: vec![],
},
);
SubscribeRequest {
accounts: accounts_subs,
..Default::default()
}
}
pub fn slots() -> SubscribeRequest {
let mut slots_subs = HashMap::new();
slots_subs.insert(