calculate delta
This commit is contained in:
parent
e09e5e2d36
commit
1393527259
|
@ -1,3 +1,4 @@
|
|||
use std::collections::{HashMap, VecDeque};
|
||||
use futures::{Stream, StreamExt};
|
||||
use log::info;
|
||||
use solana_sdk::clock::Slot;
|
||||
|
@ -5,12 +6,13 @@ use solana_sdk::commitment_config::CommitmentConfig;
|
|||
use solana_sdk::pubkey::Pubkey;
|
||||
use std::env;
|
||||
use std::pin::pin;
|
||||
use itertools::Itertools;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
|
||||
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
|
||||
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
|
||||
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
|
||||
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message};
|
||||
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, histogram_percentiles, Message};
|
||||
use tokio::time::{sleep, Duration};
|
||||
use tracing::warn;
|
||||
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
|
||||
|
@ -43,7 +45,7 @@ pub async fn main() {
|
|||
|
||||
info!("Write Block stream..");
|
||||
|
||||
let (autoconnect_tx, accounts_rx) = tokio::sync::mpsc::channel(10);
|
||||
let (autoconnect_tx, geyser_messages_rx) = tokio::sync::mpsc::channel(10);
|
||||
let (_exit, exit_notify) = tokio::sync::broadcast::channel(1);
|
||||
|
||||
let _accounts_task = create_geyser_autoconnection_task_with_mpsc(
|
||||
|
@ -53,28 +55,92 @@ pub async fn main() {
|
|||
exit_notify.resubscribe(),
|
||||
);
|
||||
|
||||
start_tracking_account_consumer(accounts_rx);
|
||||
let _slots_task = create_geyser_autoconnection_task_with_mpsc(
|
||||
config.clone(),
|
||||
GeyserFilter(CommitmentConfig::processed()).slots(),
|
||||
autoconnect_tx.clone(),
|
||||
exit_notify.resubscribe(),
|
||||
);
|
||||
|
||||
start_tracking_account_consumer(geyser_messages_rx);
|
||||
|
||||
// "infinite" sleep
|
||||
sleep(Duration::from_secs(1800)).await;
|
||||
}
|
||||
|
||||
|
||||
fn start_tracking_account_consumer(mut multiplex_channel: Receiver<Message>) {
|
||||
fn start_tracking_account_consumer(mut geyser_messages_rx: Receiver<Message>) {
|
||||
const RECENT_SLOTS_LIMIT: usize = 30;
|
||||
|
||||
tokio::spawn(async move {
|
||||
|
||||
let mut bytes_per_slot = HashMap::<Slot, usize>::new();
|
||||
let mut updates_per_slot = HashMap::<Slot, usize>::new();
|
||||
let mut count_updates_per_slot_account = HashMap::<(Slot, Pubkey), usize>::new();
|
||||
// slot written by account update
|
||||
let mut current_slot: Slot = 0;
|
||||
// slot from slot stream
|
||||
let mut actual_slot: Slot = 0;
|
||||
|
||||
let mut recent_slot_deltas: VecDeque<i64> = VecDeque::with_capacity(RECENT_SLOTS_LIMIT);
|
||||
|
||||
loop {
|
||||
match multiplex_channel.recv().await {
|
||||
match geyser_messages_rx.recv().await {
|
||||
Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
|
||||
Some(UpdateOneof::Account(update)) => {
|
||||
let account_info = update.account.unwrap();
|
||||
let account_pk = Pubkey::try_from(account_info.pubkey).unwrap();
|
||||
info!(
|
||||
"got account update (green)!!! {} - {:?} - {} bytes",
|
||||
update.slot,
|
||||
account_pk,
|
||||
account_info.data.len()
|
||||
);
|
||||
let bytes: [u8; 32] = account_pk.to_bytes();
|
||||
let slot = update.slot;
|
||||
|
||||
if actual_slot != slot {
|
||||
if actual_slot != 0 {
|
||||
recent_slot_deltas.push_back((actual_slot as i64) - (slot as i64));
|
||||
if recent_slot_deltas.len() > RECENT_SLOTS_LIMIT {
|
||||
recent_slot_deltas.pop_front();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bytes_per_slot.entry(slot)
|
||||
.and_modify(|entry| *entry += account_info.data.len())
|
||||
.or_insert(account_info.data.len());
|
||||
updates_per_slot.entry(slot)
|
||||
.and_modify(|entry| *entry += 1)
|
||||
.or_insert(1);
|
||||
count_updates_per_slot_account.entry((slot, account_pk))
|
||||
.and_modify(|entry| *entry += 1)
|
||||
.or_insert(1);
|
||||
|
||||
if current_slot != slot {
|
||||
info!("Slot: {}", slot);
|
||||
if current_slot != 0 {
|
||||
info!("Slot: {} - {:.2} MiB", slot, *bytes_per_slot.get(¤t_slot).unwrap() as f64 / 1024.0 / 1024.0 );
|
||||
|
||||
info!("Slot: {} - Updates: {}", slot, updates_per_slot.get(¤t_slot).unwrap());
|
||||
|
||||
let mut counters = count_updates_per_slot_account.iter()
|
||||
.filter(|((slot, _pubkey), _)| slot == ¤t_slot)
|
||||
.map(|((_slot, _pubkey), count)| *count as f64)
|
||||
.sorted_by(|a, b| a.partial_cmp(b).unwrap())
|
||||
.collect_vec();
|
||||
let count_histogram = histogram_percentiles::calculate_percentiles(&counters);
|
||||
info!("Count histogram: {}", count_histogram);
|
||||
|
||||
|
||||
let deltas = recent_slot_deltas.iter()
|
||||
.map(|x| *x as f64)
|
||||
.sorted_by(|a, b| a.partial_cmp(b).unwrap())
|
||||
.collect_vec();
|
||||
let deltas_histogram = histogram_percentiles::calculate_percentiles(&deltas);
|
||||
info!("Deltas histogram: {}", deltas_histogram);
|
||||
|
||||
}
|
||||
current_slot = slot;
|
||||
}
|
||||
|
||||
}
|
||||
Some(UpdateOneof::Slot(update)) => {
|
||||
actual_slot = update.slot;
|
||||
}
|
||||
None => {}
|
||||
_ => {}
|
||||
|
|
|
@ -11,8 +11,9 @@ pub mod channel_plugger;
|
|||
pub mod grpc_subscription_autoreconnect_streams;
|
||||
pub mod grpc_subscription_autoreconnect_tasks;
|
||||
pub mod grpcmultiplex_fastestwins;
|
||||
pub mod yellowstone_grpc_util;
|
||||
mod obfuscate;
|
||||
pub mod histogram_percentiles;
|
||||
pub mod yellowstone_grpc_util;
|
||||
|
||||
// 1-based attempt counter
|
||||
type Attempt = u32;
|
||||
|
|
Loading…
Reference in New Issue