stop subscribing blocks meta
This commit is contained in:
parent
430847b2c1
commit
61b202af9f
|
@ -8,7 +8,7 @@ use std::env;
|
||||||
use std::pin::pin;
|
use std::pin::pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::time::{SystemTime, UNIX_EPOCH};
|
use std::time::{Instant, SystemTime, UNIX_EPOCH};
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use solana_account_decoder::parse_token::spl_token_ids;
|
use solana_account_decoder::parse_token::spl_token_ids;
|
||||||
use tokio::sync::mpsc::Receiver;
|
use tokio::sync::mpsc::Receiver;
|
||||||
|
@ -68,9 +68,9 @@ pub async fn main() {
|
||||||
// exit_notify.resubscribe(),
|
// exit_notify.resubscribe(),
|
||||||
// );
|
// );
|
||||||
|
|
||||||
let _all_accounts_and_blocksmeta_task = create_geyser_autoconnection_task_with_mpsc(
|
let _all_accounts = create_geyser_autoconnection_task_with_mpsc(
|
||||||
config.clone(),
|
config.clone(),
|
||||||
all_accounts_and_blocksmeta(),
|
all_accounts(),
|
||||||
autoconnect_tx.clone(),
|
autoconnect_tx.clone(),
|
||||||
exit_notify.resubscribe(),
|
exit_notify.resubscribe(),
|
||||||
);
|
);
|
||||||
|
@ -182,6 +182,7 @@ fn start_tracking_account_consumer(mut geyser_messages_rx: Receiver<Message>, cu
|
||||||
match geyser_messages_rx.recv().await {
|
match geyser_messages_rx.recv().await {
|
||||||
Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
|
Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
|
||||||
Some(UpdateOneof::Account(update)) => {
|
Some(UpdateOneof::Account(update)) => {
|
||||||
|
let started_at = Instant::now();
|
||||||
let now = SystemTime::now();
|
let now = SystemTime::now();
|
||||||
let account_info = update.account.unwrap();
|
let account_info = update.account.unwrap();
|
||||||
let account_pk = Pubkey::try_from(account_info.pubkey).unwrap();
|
let account_pk = Pubkey::try_from(account_info.pubkey).unwrap();
|
||||||
|
@ -214,40 +215,37 @@ fn start_tracking_account_consumer(mut geyser_messages_rx: Receiver<Message>, cu
|
||||||
.and_modify(|entry| entry.push(now))
|
.and_modify(|entry| entry.push(now))
|
||||||
.or_insert(vec![now]);
|
.or_insert(vec![now]);
|
||||||
|
|
||||||
if current_slot != slot {
|
if current_slot != slot && current_slot != 0 {
|
||||||
info!("Slot: {}", slot);
|
info!("New Slot: {}", slot);
|
||||||
if current_slot != 0 {
|
info!("Slot: {} - account data transferred: {:.2} MiB", slot, *bytes_per_slot.get(¤t_slot).unwrap() as f64 / 1024.0 / 1024.0 );
|
||||||
info!("Slot: {} - account data transferred: {:.2} MiB", slot, *bytes_per_slot.get(¤t_slot).unwrap() as f64 / 1024.0 / 1024.0 );
|
|
||||||
|
|
||||||
info!("Slot: {} - num of update messages: {}", slot, updates_per_slot.get(¤t_slot).unwrap());
|
info!("Slot: {} - num of update messages: {}", slot, updates_per_slot.get(¤t_slot).unwrap());
|
||||||
|
|
||||||
let counters = wallclock_updates_per_slot_account.iter()
|
|
||||||
.filter(|((slot, _pubkey), _)| slot == ¤t_slot)
|
|
||||||
.map(|((_slot, _pubkey), updates)| updates.len() as f64)
|
|
||||||
.sorted_by(|a, b| a.partial_cmp(b).unwrap())
|
|
||||||
.collect_vec();
|
|
||||||
let count_histogram = histogram_percentiles::calculate_percentiles(&counters);
|
|
||||||
info!("Count histogram: {}", count_histogram);
|
|
||||||
|
|
||||||
if let Some(actual_block_time) = block_time_per_slot.get(¤t_slot) {
|
|
||||||
info!("Block time for slot {}: delta {} seconds", current_slot, account_receive_time - *actual_block_time);
|
|
||||||
}
|
|
||||||
|
|
||||||
let wallclock_minmax = wallclock_updates_per_slot_account.iter()
|
|
||||||
.filter(|((slot, _pubkey), _)| slot == ¤t_slot)
|
|
||||||
.flat_map(|((_slot, _pubkey), updates)| updates)
|
|
||||||
.minmax();
|
|
||||||
if let Some((min, max)) = wallclock_minmax.into_option() {
|
|
||||||
info!("Wallclock timestamp between first and last account update received for slot {}: {:.2}s",
|
|
||||||
current_slot,
|
|
||||||
max.duration_since(*min).unwrap().as_secs_f64()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
let counters = wallclock_updates_per_slot_account.iter()
|
||||||
|
.filter(|((slot, _pubkey), _)| slot == ¤t_slot)
|
||||||
|
.map(|((_slot, _pubkey), updates)| updates.len() as f64)
|
||||||
|
.sorted_by(|a, b| a.partial_cmp(b).unwrap())
|
||||||
|
.collect_vec();
|
||||||
|
let count_histogram = histogram_percentiles::calculate_percentiles(&counters);
|
||||||
|
info!("Count histogram: {}", count_histogram);
|
||||||
|
|
||||||
|
if let Some(actual_block_time) = block_time_per_slot.get(¤t_slot) {
|
||||||
|
info!("Block time for slot {}: delta {} seconds", current_slot, account_receive_time - *actual_block_time);
|
||||||
}
|
}
|
||||||
current_slot = slot;
|
|
||||||
}
|
let wallclock_minmax = wallclock_updates_per_slot_account.iter()
|
||||||
|
.filter(|((slot, _pubkey), _)| slot == ¤t_slot)
|
||||||
|
.flat_map(|((_slot, _pubkey), updates)| updates)
|
||||||
|
.minmax();
|
||||||
|
if let Some((min, max)) = wallclock_minmax.into_option() {
|
||||||
|
info!("Wallclock timestamp between first and last account update received for slot {}: {:.2}s",
|
||||||
|
current_slot,
|
||||||
|
max.duration_since(*min).unwrap().as_secs_f64()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
} // -- slot changed
|
||||||
|
current_slot = slot;
|
||||||
|
|
||||||
}
|
}
|
||||||
None => {}
|
None => {}
|
||||||
|
|
Loading…
Reference in New Issue