Compare commits

...

8 Commits

Author SHA1 Message Date
GroovieGermanikus 1e83353f37
document on slot delta 2024-04-29 15:34:00 +02:00
GroovieGermanikus 2d65f2d688
flip delta time 2024-04-29 15:27:41 +02:00
GroovieGermanikus 28c3041def
log blocktime 2024-04-29 15:25:46 +02:00
GroovieGermanikus eb964e3492
log deltas 2024-04-29 14:55:00 +02:00
GroovieGermanikus edd3a1a543
histogram 2024-04-29 14:54:37 +02:00
GroovieGermanikus 1393527259
calculate delta 2024-04-29 14:49:34 +02:00
GroovieGermanikus e09e5e2d36
collectors 2024-04-29 09:58:39 +02:00
GroovieGermanikus f280f903d7
cleanup 2024-04-29 09:12:53 +02:00
4 changed files with 368 additions and 87 deletions

View File

@ -1,103 +1,25 @@
use std::collections::{HashMap, VecDeque};
use futures::{Stream, StreamExt};
use log::info;
use solana_sdk::clock::Slot;
use solana_sdk::clock::{Slot, UnixTimestamp};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use std::env;
use std::pin::pin;
use std::time::{SystemTime, UNIX_EPOCH};
use itertools::Itertools;
use tokio::sync::mpsc::Receiver;
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, histogram_percentiles, Message};
use tokio::time::{sleep, Duration};
use tracing::warn;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
use yellowstone_grpc_proto::prost::Message as _;
/// Spawns a detached task that drains `multiplex_channel` and logs every
/// account update it receives.
///
/// The task exits (with a warning) when the channel is closed by the sender side;
/// `Connecting` notifications and non-account updates are silently ignored.
fn start_example_account_consumer(mut multiplex_channel: Receiver<Message>) {
    tokio::spawn(async move {
        loop {
            match multiplex_channel.recv().await {
                Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
                    Some(UpdateOneof::Account(update)) => {
                        // assumes the account payload is always present on Account updates
                        let account_info = update.account.unwrap();
                        let account_pk = Pubkey::try_from(account_info.pubkey).unwrap();
                        info!(
                            "got account update (green)!!! {} - {:?} - {} bytes",
                            update.slot,
                            account_pk,
                            account_info.data.len()
                        );
                        // (removed an unused `let bytes: [u8; 32] = account_pk.to_bytes();`
                        // local that was never read)
                    }
                    None => {}
                    _ => {}
                },
                None => {
                    log::warn!("multiplexer channel closed - aborting");
                    return;
                }
                Some(Message::Connecting(_)) => {}
            }
        }
    });
}
/// Spawns a detached task that consumes [`BlockMini`] items from the given
/// multiplexed stream and logs each one until the stream ends.
#[allow(dead_code)]
fn start_example_blockmini_consumer(
    multiplex_stream: impl Stream<Item = BlockMini> + Send + 'static,
) {
    tokio::spawn(async move {
        // pin the stream on the task's stack so it can be polled
        let mut pinned_stream = pin!(multiplex_stream);
        while let Some(block_mini) = pinned_stream.next().await {
            info!(
                "emitted block mini #{}@{} with {} bytes from multiplexer",
                block_mini.slot, block_mini.commitment_config.commitment, block_mini.blocksize
            );
        }
    });
}
/// Compact summary of a block observed on a geyser stream: its encoded size,
/// its slot, and the commitment level of the stream it came from.
pub struct BlockMini {
    // encoded (protobuf) size of the original update message, in bytes
    pub blocksize: usize,
    pub slot: Slot,
    // commitment level of the subscription that produced this block
    pub commitment_config: CommitmentConfig,
}
/// Extractor mapping yellowstone `Block`/`BlockMeta` updates to [`BlockMini`],
/// tagging each with the commitment level it was constructed with.
struct BlockMiniExtractor(CommitmentConfig);

impl FromYellowstoneExtractor for BlockMiniExtractor {
    type Target = BlockMini;
    fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)> {
        // both full-block and block-meta updates carry a slot and an encodable payload;
        // anything else is not convertible and yields None
        let (slot, blocksize) = match update.update_oneof {
            Some(UpdateOneof::Block(message)) => (message.slot, message.encoded_len()),
            Some(UpdateOneof::BlockMeta(message)) => (message.slot, message.encoded_len()),
            _ => return None,
        };
        let mini = BlockMini {
            blocksize,
            slot,
            commitment_config: self.0,
        };
        Some((slot, mini))
    }
}
#[tokio::main]
pub async fn main() {
// RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace
@ -124,7 +46,7 @@ pub async fn main() {
info!("Write Block stream..");
let (autoconnect_tx, accounts_rx) = tokio::sync::mpsc::channel(10);
let (autoconnect_tx, geyser_messages_rx) = tokio::sync::mpsc::channel(10);
let (_exit, exit_notify) = tokio::sync::broadcast::channel(1);
let _accounts_task = create_geyser_autoconnection_task_with_mpsc(
@ -134,8 +56,120 @@ pub async fn main() {
exit_notify.resubscribe(),
);
start_example_account_consumer(accounts_rx);
let _blocksmeta_task = create_geyser_autoconnection_task_with_mpsc(
config.clone(),
GeyserFilter(CommitmentConfig::processed()).blocks_meta(),
autoconnect_tx.clone(),
exit_notify.resubscribe(),
);
start_tracking_account_consumer(geyser_messages_rx);
// "infinite" sleep
sleep(Duration::from_secs(1800)).await;
}
/// Spawns a detached task that consumes account and block-meta updates from
/// `geyser_messages_rx` and logs per-slot statistics: bytes per slot, update
/// counts, a percentile histogram of per-account update counts, a sliding
/// window of slot deltas, and the delta between account receive time and the
/// previous slot's block time.
///
/// NOTE(review): `bytes_per_slot`, `updates_per_slot`,
/// `count_updates_per_slot_account` and `block_time_per_slot` are never pruned,
/// so memory grows with uptime — presumably acceptable for this diagnostic
/// example, but worth confirming for long runs.
fn start_tracking_account_consumer(mut geyser_messages_rx: Receiver<Message>) {
    // size of the sliding window of slot deltas kept for the histogram
    const RECENT_SLOTS_LIMIT: usize = 30;
    tokio::spawn(async move {
        // total account-data bytes observed per slot
        let mut bytes_per_slot = HashMap::<Slot, usize>::new();
        // number of account updates observed per slot
        let mut updates_per_slot = HashMap::<Slot, usize>::new();
        // number of updates per (slot, account) pair
        let mut count_updates_per_slot_account = HashMap::<(Slot, Pubkey), usize>::new();
        // slot written by account update
        let mut current_slot: Slot = 0;
        // slot from slot stream
        let mut actual_slot: Slot = 0;
        // seconds since epoch
        let mut block_time_per_slot = HashMap::<Slot, UnixTimestamp>::new();
        // sliding window of (BlockMeta slot - account-update slot) deltas
        let mut recent_slot_deltas: VecDeque<i64> = VecDeque::with_capacity(RECENT_SLOTS_LIMIT);
        loop {
            match geyser_messages_rx.recv().await {
                Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
                    Some(UpdateOneof::Account(update)) => {
                        // assumes the account payload is always present on Account updates
                        let account_info = update.account.unwrap();
                        let account_pk = Pubkey::try_from(account_info.pubkey).unwrap();
                        // note: slot is referencing the block that is just built while the slot number reported from BlockMeta/Slot uses the slot after the block is built
                        let slot = update.slot;
                        let account_receive_time = get_epoch_sec();
                        // record how far the account-update slot lags the BlockMeta slot;
                        // note: this pushes one delta per differing update, not once per slot change
                        if actual_slot != slot {
                            if actual_slot != 0 {
                                // the perfect is value "-1"
                                recent_slot_deltas.push_back((actual_slot as i64) - (slot as i64));
                                if recent_slot_deltas.len() > RECENT_SLOTS_LIMIT {
                                    recent_slot_deltas.pop_front();
                                }
                            }
                        }
                        bytes_per_slot.entry(slot)
                            .and_modify(|entry| *entry += account_info.data.len())
                            .or_insert(account_info.data.len());
                        updates_per_slot.entry(slot)
                            .and_modify(|entry| *entry += 1)
                            .or_insert(1);
                        count_updates_per_slot_account.entry((slot, account_pk))
                            .and_modify(|entry| *entry += 1)
                            .or_insert(1);
                        // on the first update of a new slot, dump the accumulated
                        // statistics of the previously tracked slot
                        if current_slot != slot {
                            info!("Slot: {}", slot);
                            if current_slot != 0 {
                                info!("Slot: {} - {:.2} MiB", slot, *bytes_per_slot.get(&current_slot).unwrap() as f64 / 1024.0 / 1024.0 );
                                info!("Slot: {} - Updates: {}", slot, updates_per_slot.get(&current_slot).unwrap());
                                // distribution of per-account update counts for the finished slot
                                let counters = count_updates_per_slot_account.iter()
                                    .filter(|((slot, _pubkey), _)| slot == &current_slot)
                                    .map(|((_slot, _pubkey), count)| *count as f64)
                                    .sorted_by(|a, b| a.partial_cmp(b).unwrap())
                                    .collect_vec();
                                let count_histogram = histogram_percentiles::calculate_percentiles(&counters);
                                info!("Count histogram: {}", count_histogram);
                                let deltas = recent_slot_deltas.iter()
                                    .map(|x| *x as f64)
                                    .sorted_by(|a, b| a.partial_cmp(b).unwrap())
                                    .collect_vec();
                                let deltas_histogram = histogram_percentiles::calculate_percentiles(&deltas);
                                info!("Deltas slots list: {:?}", recent_slot_deltas);
                                info!("Deltas histogram: {}", deltas_histogram);
                                // seconds between this account update arriving and the
                                // recorded block time of the finished slot
                                if let Some(actual_block_time) = block_time_per_slot.get(&current_slot) {
                                    info!("Block time for slot {}: delta {} seconds", current_slot, account_receive_time - *actual_block_time);
                                }
                            }
                            current_slot = slot;
                        }
                    }
                    Some(UpdateOneof::BlockMeta(update)) => {
                        actual_slot = update.slot;
                        // assumes block_time is always present on BlockMeta updates
                        block_time_per_slot.insert(actual_slot, update.block_time.unwrap().timestamp);
                    }
                    None => {}
                    _ => {}
                },
                None => {
                    log::warn!("multiplexer channel closed - aborting");
                    return;
                }
                Some(Message::Connecting(_)) => {}
            }
        }
    });
}
/// Current wall-clock time as whole seconds since the UNIX epoch.
fn get_epoch_sec() -> UnixTimestamp {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_secs() as UnixTimestamp
}

View File

@ -1,3 +1,6 @@
use crate::yellowstone_grpc_util::{
connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig,
};
use crate::{Attempt, GrpcSourceConfig, Message};
use async_stream::stream;
use futures::{Stream, StreamExt};
@ -8,7 +11,6 @@ use tokio::time::{sleep, timeout};
use yellowstone_grpc_client::GeyserGrpcClientResult;
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
use yellowstone_grpc_proto::tonic::Status;
use crate::yellowstone_grpc_util::{connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig};
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
NotConnected(Attempt),

View File

@ -0,0 +1,244 @@
use itertools::Itertools;
use std::fmt::Display;
use std::iter::zip;
/// A (priority, value) sample used as input for the cumulative percentile
/// calculation; `value` is the weight (e.g. CU consumed) of the sample.
// restored the previously commented-out derive: all four traits are derivable
// for two f64 fields and make the type cheaper/easier to use for callers
#[derive(Clone, Copy, Debug, Default)]
pub struct Point {
    pub priority: f64,
    pub value: f64,
}

impl From<(f64, f64)> for Point {
    fn from((priority, cu_consumed): (f64, f64)) -> Self {
        Point {
            priority,
            value: cu_consumed,
        }
    }
}
/// One histogram bucket: the `value` observed at a given `percentile` (0..100 scale).
// restored the previously commented-out derive, minus `Eq` and `Hash`, which are
// not derivable for float fields (the likely reason it was commented out)
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct HistValue {
    pub percentile: f32,
    pub value: f64,
}
/// `quantile` function is the same as the median if q=50, the same as the minimum if q=0 and the same as the maximum if q=100.
///
/// Computes the values at percentiles 0%, 5%, ..., 100% of a sorted slice.
/// Returns empty vectors for empty input (percentiles are undefined there).
///
/// # Panics
/// Panics if `input` is not sorted ascending.
pub fn calculate_percentiles(input: &[f64]) -> Percentiles {
    if input.is_empty() {
        // note: percentile for empty array is undefined
        return Percentiles {
            v: vec![],
            p: vec![],
        };
    }
    assert!(
        input.windows(2).all(|pair| pair[0] <= pair[1]),
        "array of values must be sorted"
    );
    const P_STEP: usize = 5;
    let last_index = input.len() - 1;
    // for each percentile step, pick the element at len*p/100 (clamped to the end)
    let (bucket_values, percentiles): (Vec<f64>, Vec<f32>) = (0..=100)
        .step_by(P_STEP)
        .map(|p| {
            let index = (input.len() * p / 100).min(last_index);
            (input[index], p as f32 / 100.0)
        })
        .unzip();
    Percentiles {
        v: bucket_values,
        p: percentiles,
    }
}
pub fn calculate_cummulative(values: &[Point]) -> PercentilesCummulative {
if values.is_empty() {
// note: percentile for empty array is undefined
return PercentilesCummulative {
bucket_values: vec![],
percentiles: vec![],
};
}
let is_monotonic = values.windows(2).all(|w| w[0].priority <= w[1].priority);
assert!(is_monotonic, "array of values must be sorted");
let value_sum: f64 = values.iter().map(|x| x.value).sum();
let mut agg: f64 = values[0].value;
let mut index = 0;
let p_step = 5;
let percentiles = (0..=100).step_by(p_step).map(|p| p as f64).collect_vec();
let dist = percentiles
.iter()
.map(|percentile| {
while agg < (value_sum * *percentile) / 100.0 {
index += 1;
agg += values[index].value;
}
let priority = values[index].priority;
HistValue {
percentile: *percentile as f32,
value: priority,
}
})
.collect_vec();
PercentilesCummulative {
bucket_values: dist.iter().map(|hv| hv.value).collect_vec(),
percentiles: dist.iter().map(|hv| hv.percentile / 100.0).collect_vec(),
}
}
/// Plain percentile table: `v[i]` holds the value observed at percentile `p[i]`.
pub struct Percentiles {
    // value
    pub v: Vec<f64>,
    // percentile in range 0.0..1.0
    pub p: Vec<f32>,
}
impl Display for Percentiles {
    /// Renders the table as space-separated `p<percentile>=><value>` pairs,
    /// e.g. `p0=>1 p5=>2 `.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        for (idx, value) in self.v.iter().enumerate() {
            write!(f, "p{}=>{} ", self.p[idx] * 100.0, value)?;
        }
        Ok(())
    }
}
#[allow(dead_code)]
impl Percentiles {
    /// Exact lookup of the value stored for `percentile` (0.0..1.0 scale);
    /// `None` if that exact percentile key is not present.
    fn get_bucket_value(&self, percentile: f32) -> Option<f64> {
        zip(&self.p, &self.v).find_map(|(p, v)| (*p == percentile).then_some(*v))
    }
}
/// Weighted percentile table: `bucket_values[i]` is the priority reached at
/// cumulative-weight percentile `percentiles[i]`.
pub struct PercentilesCummulative {
    pub bucket_values: Vec<f64>,
    // percentile in range 0.0..1.0
    pub percentiles: Vec<f32>,
}
#[allow(dead_code)]
impl PercentilesCummulative {
    /// Exact lookup of the bucket value stored for `percentile` (0.0..1.0 scale);
    /// `None` if that exact percentile key is not present.
    fn get_bucket_value(&self, percentile: f32) -> Option<f64> {
        zip(&self.percentiles, &self.bucket_values)
            .find_map(|(p, bucket)| (*p == percentile).then_some(*bucket))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    // note: with the 5% step, result index i corresponds to percentile i * 5
    #[test]
    fn test_calculate_percentiles() {
        let mut values = vec![2.0, 4.0, 5.0, 3.0, 1.0];
        // f64 is not Ord, so sort ascending via a scaled integer key
        values.sort_by_key(|&x| (x * 100.0) as i64);
        let percentiles = calculate_percentiles(&values).v;
        assert_eq!(percentiles[0], 1.0);
        assert_eq!(percentiles[10], 3.0);
        assert_eq!(percentiles[15], 4.0);
        assert_eq!(percentiles[18], 5.0);
        assert_eq!(percentiles[20], 5.0);
    }
    #[test]
    fn test_calculate_percentiles_by_cu() {
        // total of 20000 CU were consumed
        let values = vec![Point::from((100.0, 10000.0)), Point::from((200.0, 10000.0))];
        let PercentilesCummulative {
            bucket_values: by_cu,
            percentiles: by_cu_percentiles,
            ..
        } = calculate_cummulative(&values);
        assert_eq!(by_cu_percentiles[10], 0.5);
        assert_eq!(by_cu[10], 100.0); // need more than 100 to beat 50% of the CU
        assert_eq!(by_cu[11], 200.0); // need more than 200 to beat 55% of the CU
        assert_eq!(by_cu[20], 200.0); // need more than 200 to beat 100% of the CU
    }
    #[test]
    fn test_empty_array() {
        let values = vec![];
        let percentiles = calculate_percentiles(&values).v;
        // note: percentiles of an empty array are undefined; returning an empty
        // result rather than panicking is a deliberate (debatable) choice
        assert!(percentiles.is_empty());
    }
    #[test]
    fn test_zeros() {
        let values = vec![Point::from((0.0, 0.0)), Point::from((0.0, 0.0))];
        let percentiles = calculate_cummulative(&values).bucket_values;
        assert_eq!(percentiles[0], 0.0);
    }
    // values presumably taken from a statisticshowto.com worked percentile
    // example (per the test name) — TODO confirm the source
    #[test]
    fn test_statisticshowto() {
        let values = vec![30.0, 33.0, 43.0, 53.0, 56.0, 67.0, 68.0, 72.0];
        let percentiles = calculate_percentiles(&values);
        // index 5 => 25th percentile
        assert_eq!(percentiles.v[5], 43.0);
        assert_eq!(percentiles.p[5], 0.25);
        assert_eq!(percentiles.get_bucket_value(0.25), Some(43.0));
        let values = vec![
            Point::from((30.0, 1.0)),
            Point::from((33.0, 2.0)),
            Point::from((43.0, 3.0)),
            Point::from((53.0, 4.0)),
            Point::from((56.0, 5.0)),
            Point::from((67.0, 6.0)),
            Point::from((68.0, 7.0)),
            Point::from((72.0, 8.0)),
        ];
        let percentiles = calculate_cummulative(&values);
        // index 20 => 100th percentile: the full weight is reached at priority 72
        assert_eq!(percentiles.percentiles[20], 1.0);
        assert_eq!(percentiles.bucket_values[20], 72.0);
    }
    #[test]
    fn test_simple_non_integer_index() {
        // measured values: 3 5 5 6 7 7 8 10 10
        // in this case the result is therefore 5
        let values = vec![3.0, 5.0, 5.0, 6.0, 7.0, 7.0, 8.0, 10.0, 10.0];
        let percentiles = calculate_percentiles(&values);
        assert_eq!(percentiles.p[4], 0.20);
        assert_eq!(percentiles.v[5], 5.0);
        let values = vec![
            Point::from((3.0, 1.0)),
            Point::from((5.0, 2.0)),
            Point::from((5.0, 3.0)),
            Point::from((6.0, 4.0)),
            Point::from((7.0, 5.0)),
            Point::from((7.0, 6.0)),
            Point::from((8.0, 7.0)),
            Point::from((10.0, 8.0)),
            Point::from((10.0, 9.0)),
        ];
        let percentiles = calculate_cummulative(&values);
        assert_eq!(percentiles.percentiles[19], 0.95);
        assert_eq!(percentiles.percentiles[20], 1.0);
        assert_eq!(percentiles.bucket_values[19], 10.0);
        assert_eq!(percentiles.bucket_values[20], 10.0);
    }
    #[test]
    fn test_large_list() {
        let values = (0..1000).map(|i| i as f64).collect_vec();
        let percentiles = calculate_percentiles(&values);
        // index 19 => 95th percentile of 0..999
        assert_eq!(percentiles.v[19], 950.0);
        assert_eq!(percentiles.p[19], 0.95);
    }
}

View File

@ -11,8 +11,9 @@ pub mod channel_plugger;
pub mod grpc_subscription_autoreconnect_streams;
pub mod grpc_subscription_autoreconnect_tasks;
pub mod grpcmultiplex_fastestwins;
pub mod yellowstone_grpc_util;
mod obfuscate;
pub mod histogram_percentiles;
pub mod yellowstone_grpc_util;
// 1-based attempt counter (the first connection attempt is counted as 1)
type Attempt = u32;