Compare commits

...

2 Commits

Author SHA1 Message Date
GroovieGermanikus 4157779f12
subscribe to all accounts 2024-05-10 18:23:16 +02:00
GroovieGermanikus f6206641b9
fix ordering of slot change 2024-05-10 18:21:57 +02:00
1 changed files with 23 additions and 21 deletions

View File

@ -68,20 +68,20 @@ pub async fn main() {
// exit_notify.resubscribe(),
// );
// let _all_accounts = create_geyser_autoconnection_task_with_mpsc(
// config.clone(),
// all_accounts(),
// autoconnect_tx.clone(),
// exit_notify.resubscribe(),
// );
let _token_accounts_task = create_geyser_autoconnection_task_with_mpsc(
let _all_accounts = create_geyser_autoconnection_task_with_mpsc(
config.clone(),
token_accounts(),
all_accounts(),
autoconnect_tx.clone(),
exit_notify.resubscribe(),
);
// let _token_accounts_task = create_geyser_autoconnection_task_with_mpsc(
// config.clone(),
// token_accounts(),
// autoconnect_tx.clone(),
// exit_notify.resubscribe(),
// );
let current_processed_slot = AtomicSlot::default();
start_tracking_slots(current_processed_slot.clone());
start_tracking_account_consumer(geyser_messages_rx, current_processed_slot.clone());
@ -190,15 +190,6 @@ fn start_tracking_account_consumer(mut geyser_messages_rx: Receiver<Message>, cu
let slot = update.slot;
let account_receive_time = get_epoch_sec();
let latest_slot = current_processed_slot.load(Ordering::Relaxed);
if latest_slot != 0 {
// the perfect is value "-1"
let delta = (latest_slot as i64) - (slot as i64);
if debouncer.can_fire() {
debug!("Account info for upcoming slot {} was {} behind current processed slot", slot, delta);
}
}
// if account_info.data.len() > 1000 {
// trace!("got account update!!! {} - {:?} - {} bytes",
@ -221,13 +212,13 @@ fn start_tracking_account_consumer(mut geyser_messages_rx: Receiver<Message>, cu
info!("Slot: {} - num of update messages: {}", slot, updates_per_slot.get(&current_slot).unwrap());
let counters = wallclock_updates_per_slot_account.iter()
let per_account_updates = wallclock_updates_per_slot_account.iter()
.filter(|((slot, _pubkey), _)| slot == &current_slot)
.map(|((_slot, _pubkey), updates)| updates.len() as f64)
.sorted_by(|a, b| a.partial_cmp(b).unwrap())
.collect_vec();
let count_histogram = histogram_percentiles::calculate_percentiles(&counters);
info!("Count histogram: {}", count_histogram);
let per_account_updates_histogram = histogram_percentiles::calculate_percentiles(&per_account_updates);
info!("Per-account updates histogram: {}", per_account_updates_histogram);
if let Some(actual_block_time) = block_time_per_slot.get(&current_slot) {
info!("Block time for slot {}: delta {} seconds", current_slot, account_receive_time - *actual_block_time);
@ -247,6 +238,17 @@ fn start_tracking_account_consumer(mut geyser_messages_rx: Receiver<Message>, cu
} // -- slot changed
current_slot = slot;
let latest_slot = current_processed_slot.load(Ordering::Relaxed);
if latest_slot != 0 {
// the perfect is value "-1"
let delta = (latest_slot as i64) - (slot as i64);
if debouncer.can_fire() {
debug!("Account info for upcoming slot {} was {} behind current processed slot", slot, delta);
}
}
}
None => {}
_ => {}