Compare commits
8 Commits
929b20edee
...
1e83353f37
Author | SHA1 | Date |
---|---|---|
GroovieGermanikus | 1e83353f37 | |
GroovieGermanikus | 2d65f2d688 | |
GroovieGermanikus | 28c3041def | |
GroovieGermanikus | eb964e3492 | |
GroovieGermanikus | edd3a1a543 | |
GroovieGermanikus | 1393527259 | |
GroovieGermanikus | e09e5e2d36 | |
GroovieGermanikus | f280f903d7 |
|
@ -1,103 +1,25 @@
|
|||
use std::collections::{HashMap, VecDeque};
|
||||
use futures::{Stream, StreamExt};
|
||||
use log::info;
|
||||
use solana_sdk::clock::Slot;
|
||||
use solana_sdk::clock::{Slot, UnixTimestamp};
|
||||
use solana_sdk::commitment_config::CommitmentConfig;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use std::env;
|
||||
use std::pin::pin;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use itertools::Itertools;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
|
||||
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
|
||||
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
|
||||
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
|
||||
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message};
|
||||
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, histogram_percentiles, Message};
|
||||
use tokio::time::{sleep, Duration};
|
||||
use tracing::warn;
|
||||
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
|
||||
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
|
||||
use yellowstone_grpc_proto::prost::Message as _;
|
||||
|
||||
fn start_example_account_consumer(mut multiplex_channel: Receiver<Message>) {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match multiplex_channel.recv().await {
|
||||
Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
|
||||
Some(UpdateOneof::Account(update)) => {
|
||||
let account_info = update.account.unwrap();
|
||||
let account_pk = Pubkey::try_from(account_info.pubkey).unwrap();
|
||||
info!(
|
||||
"got account update (green)!!! {} - {:?} - {} bytes",
|
||||
update.slot,
|
||||
account_pk,
|
||||
account_info.data.len()
|
||||
);
|
||||
let bytes: [u8; 32] = account_pk.to_bytes();
|
||||
}
|
||||
None => {}
|
||||
_ => {}
|
||||
},
|
||||
None => {
|
||||
log::warn!("multiplexer channel closed - aborting");
|
||||
return;
|
||||
}
|
||||
Some(Message::Connecting(_)) => {}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn start_example_blockmini_consumer(
|
||||
multiplex_stream: impl Stream<Item = BlockMini> + Send + 'static,
|
||||
) {
|
||||
tokio::spawn(async move {
|
||||
let mut blockmeta_stream = pin!(multiplex_stream);
|
||||
while let Some(mini) = blockmeta_stream.next().await {
|
||||
info!(
|
||||
"emitted block mini #{}@{} with {} bytes from multiplexer",
|
||||
mini.slot, mini.commitment_config.commitment, mini.blocksize
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub struct BlockMini {
|
||||
pub blocksize: usize,
|
||||
pub slot: Slot,
|
||||
pub commitment_config: CommitmentConfig,
|
||||
}
|
||||
|
||||
struct BlockMiniExtractor(CommitmentConfig);
|
||||
|
||||
impl FromYellowstoneExtractor for BlockMiniExtractor {
|
||||
type Target = BlockMini;
|
||||
fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)> {
|
||||
match update.update_oneof {
|
||||
Some(UpdateOneof::Block(update_block_message)) => {
|
||||
let blocksize = update_block_message.encoded_len();
|
||||
let slot = update_block_message.slot;
|
||||
let mini = BlockMini {
|
||||
blocksize,
|
||||
slot,
|
||||
commitment_config: self.0,
|
||||
};
|
||||
Some((slot, mini))
|
||||
}
|
||||
Some(UpdateOneof::BlockMeta(update_blockmeta_message)) => {
|
||||
let blocksize = update_blockmeta_message.encoded_len();
|
||||
let slot = update_blockmeta_message.slot;
|
||||
let mini = BlockMini {
|
||||
blocksize,
|
||||
slot,
|
||||
commitment_config: self.0,
|
||||
};
|
||||
Some((slot, mini))
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
pub async fn main() {
|
||||
// RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace
|
||||
|
@ -124,7 +46,7 @@ pub async fn main() {
|
|||
|
||||
info!("Write Block stream..");
|
||||
|
||||
let (autoconnect_tx, accounts_rx) = tokio::sync::mpsc::channel(10);
|
||||
let (autoconnect_tx, geyser_messages_rx) = tokio::sync::mpsc::channel(10);
|
||||
let (_exit, exit_notify) = tokio::sync::broadcast::channel(1);
|
||||
|
||||
let _accounts_task = create_geyser_autoconnection_task_with_mpsc(
|
||||
|
@ -134,8 +56,120 @@ pub async fn main() {
|
|||
exit_notify.resubscribe(),
|
||||
);
|
||||
|
||||
start_example_account_consumer(accounts_rx);
|
||||
let _blocksmeta_task = create_geyser_autoconnection_task_with_mpsc(
|
||||
config.clone(),
|
||||
GeyserFilter(CommitmentConfig::processed()).blocks_meta(),
|
||||
autoconnect_tx.clone(),
|
||||
exit_notify.resubscribe(),
|
||||
);
|
||||
|
||||
start_tracking_account_consumer(geyser_messages_rx);
|
||||
|
||||
// "infinite" sleep
|
||||
sleep(Duration::from_secs(1800)).await;
|
||||
}
|
||||
|
||||
|
||||
fn start_tracking_account_consumer(mut geyser_messages_rx: Receiver<Message>) {
|
||||
const RECENT_SLOTS_LIMIT: usize = 30;
|
||||
|
||||
tokio::spawn(async move {
|
||||
|
||||
let mut bytes_per_slot = HashMap::<Slot, usize>::new();
|
||||
let mut updates_per_slot = HashMap::<Slot, usize>::new();
|
||||
let mut count_updates_per_slot_account = HashMap::<(Slot, Pubkey), usize>::new();
|
||||
// slot written by account update
|
||||
let mut current_slot: Slot = 0;
|
||||
// slot from slot stream
|
||||
let mut actual_slot: Slot = 0;
|
||||
|
||||
// seconds since epoch
|
||||
let mut block_time_per_slot = HashMap::<Slot, UnixTimestamp>::new();
|
||||
let mut recent_slot_deltas: VecDeque<i64> = VecDeque::with_capacity(RECENT_SLOTS_LIMIT);
|
||||
|
||||
loop {
|
||||
match geyser_messages_rx.recv().await {
|
||||
Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
|
||||
Some(UpdateOneof::Account(update)) => {
|
||||
let account_info = update.account.unwrap();
|
||||
let account_pk = Pubkey::try_from(account_info.pubkey).unwrap();
|
||||
// note: slot is referencing the block that is just built while the slot number reported from BlockMeta/Slot uses the slot after the block is built
|
||||
let slot = update.slot;
|
||||
let account_receive_time = get_epoch_sec();
|
||||
|
||||
if actual_slot != slot {
|
||||
if actual_slot != 0 {
|
||||
// the perfect is value "-1"
|
||||
recent_slot_deltas.push_back((actual_slot as i64) - (slot as i64));
|
||||
if recent_slot_deltas.len() > RECENT_SLOTS_LIMIT {
|
||||
recent_slot_deltas.pop_front();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bytes_per_slot.entry(slot)
|
||||
.and_modify(|entry| *entry += account_info.data.len())
|
||||
.or_insert(account_info.data.len());
|
||||
updates_per_slot.entry(slot)
|
||||
.and_modify(|entry| *entry += 1)
|
||||
.or_insert(1);
|
||||
count_updates_per_slot_account.entry((slot, account_pk))
|
||||
.and_modify(|entry| *entry += 1)
|
||||
.or_insert(1);
|
||||
|
||||
if current_slot != slot {
|
||||
info!("Slot: {}", slot);
|
||||
if current_slot != 0 {
|
||||
info!("Slot: {} - {:.2} MiB", slot, *bytes_per_slot.get(¤t_slot).unwrap() as f64 / 1024.0 / 1024.0 );
|
||||
|
||||
info!("Slot: {} - Updates: {}", slot, updates_per_slot.get(¤t_slot).unwrap());
|
||||
|
||||
let counters = count_updates_per_slot_account.iter()
|
||||
.filter(|((slot, _pubkey), _)| slot == ¤t_slot)
|
||||
.map(|((_slot, _pubkey), count)| *count as f64)
|
||||
.sorted_by(|a, b| a.partial_cmp(b).unwrap())
|
||||
.collect_vec();
|
||||
let count_histogram = histogram_percentiles::calculate_percentiles(&counters);
|
||||
info!("Count histogram: {}", count_histogram);
|
||||
|
||||
|
||||
let deltas = recent_slot_deltas.iter()
|
||||
.map(|x| *x as f64)
|
||||
.sorted_by(|a, b| a.partial_cmp(b).unwrap())
|
||||
.collect_vec();
|
||||
let deltas_histogram = histogram_percentiles::calculate_percentiles(&deltas);
|
||||
info!("Deltas slots list: {:?}", recent_slot_deltas);
|
||||
info!("Deltas histogram: {}", deltas_histogram);
|
||||
|
||||
if let Some(actual_block_time) = block_time_per_slot.get(¤t_slot) {
|
||||
info!("Block time for slot {}: delta {} seconds", current_slot, account_receive_time - *actual_block_time);
|
||||
}
|
||||
|
||||
}
|
||||
current_slot = slot;
|
||||
}
|
||||
|
||||
}
|
||||
Some(UpdateOneof::BlockMeta(update)) => {
|
||||
actual_slot = update.slot;
|
||||
block_time_per_slot.insert(actual_slot, update.block_time.unwrap().timestamp);
|
||||
}
|
||||
None => {}
|
||||
_ => {}
|
||||
},
|
||||
None => {
|
||||
log::warn!("multiplexer channel closed - aborting");
|
||||
return;
|
||||
}
|
||||
Some(Message::Connecting(_)) => {}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn get_epoch_sec() -> UnixTimestamp {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs() as UnixTimestamp
|
||||
}
|
||||
|
|
|
@ -1,3 +1,6 @@
|
|||
use crate::yellowstone_grpc_util::{
|
||||
connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig,
|
||||
};
|
||||
use crate::{Attempt, GrpcSourceConfig, Message};
|
||||
use async_stream::stream;
|
||||
use futures::{Stream, StreamExt};
|
||||
|
@ -8,7 +11,6 @@ use tokio::time::{sleep, timeout};
|
|||
use yellowstone_grpc_client::GeyserGrpcClientResult;
|
||||
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
|
||||
use yellowstone_grpc_proto::tonic::Status;
|
||||
use crate::yellowstone_grpc_util::{connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig};
|
||||
|
||||
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
|
||||
NotConnected(Attempt),
|
||||
|
|
|
@ -0,0 +1,244 @@
|
|||
use itertools::Itertools;
|
||||
use std::fmt::Display;
|
||||
use std::iter::zip;
|
||||
|
||||
// #[derive(Clone, Copy, Debug, Default)]
|
||||
pub struct Point {
|
||||
pub priority: f64,
|
||||
pub value: f64,
|
||||
}
|
||||
|
||||
impl From<(f64, f64)> for Point {
|
||||
fn from((priority, cu_consumed): (f64, f64)) -> Self {
|
||||
Point {
|
||||
priority,
|
||||
value: cu_consumed,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// #[derive(Clone, Debug, Eq, PartialEq, Hash)]
|
||||
pub struct HistValue {
|
||||
pub percentile: f32,
|
||||
pub value: f64,
|
||||
}
|
||||
|
||||
/// `quantile` function is the same as the median if q=50, the same as the minimum if q=0 and the same as the maximum if q=100.
|
||||
|
||||
pub fn calculate_percentiles(input: &[f64]) -> Percentiles {
|
||||
if input.is_empty() {
|
||||
// note: percentile for empty array is undefined
|
||||
return Percentiles {
|
||||
v: vec![],
|
||||
p: vec![],
|
||||
};
|
||||
}
|
||||
|
||||
let is_monotonic = input.windows(2).all(|w| w[0] <= w[1]);
|
||||
assert!(is_monotonic, "array of values must be sorted");
|
||||
|
||||
let p_step = 5;
|
||||
let i_percentiles = (0..=100).step_by(p_step).collect_vec();
|
||||
|
||||
let mut bucket_values = Vec::with_capacity(i_percentiles.len());
|
||||
let mut percentiles = Vec::with_capacity(i_percentiles.len());
|
||||
for p in i_percentiles {
|
||||
let value = {
|
||||
let index = input.len() * p / 100;
|
||||
let cap_index = index.min(input.len() - 1);
|
||||
input[cap_index]
|
||||
};
|
||||
|
||||
bucket_values.push(value);
|
||||
percentiles.push(p as f32 / 100.0);
|
||||
}
|
||||
|
||||
Percentiles {
|
||||
v: bucket_values,
|
||||
p: percentiles,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn calculate_cummulative(values: &[Point]) -> PercentilesCummulative {
|
||||
if values.is_empty() {
|
||||
// note: percentile for empty array is undefined
|
||||
return PercentilesCummulative {
|
||||
bucket_values: vec![],
|
||||
percentiles: vec![],
|
||||
};
|
||||
}
|
||||
|
||||
let is_monotonic = values.windows(2).all(|w| w[0].priority <= w[1].priority);
|
||||
assert!(is_monotonic, "array of values must be sorted");
|
||||
|
||||
let value_sum: f64 = values.iter().map(|x| x.value).sum();
|
||||
let mut agg: f64 = values[0].value;
|
||||
let mut index = 0;
|
||||
let p_step = 5;
|
||||
|
||||
let percentiles = (0..=100).step_by(p_step).map(|p| p as f64).collect_vec();
|
||||
|
||||
let dist = percentiles
|
||||
.iter()
|
||||
.map(|percentile| {
|
||||
while agg < (value_sum * *percentile) / 100.0 {
|
||||
index += 1;
|
||||
agg += values[index].value;
|
||||
}
|
||||
let priority = values[index].priority;
|
||||
HistValue {
|
||||
percentile: *percentile as f32,
|
||||
value: priority,
|
||||
}
|
||||
})
|
||||
.collect_vec();
|
||||
|
||||
PercentilesCummulative {
|
||||
bucket_values: dist.iter().map(|hv| hv.value).collect_vec(),
|
||||
percentiles: dist.iter().map(|hv| hv.percentile / 100.0).collect_vec(),
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Percentiles {
|
||||
// value
|
||||
pub v: Vec<f64>,
|
||||
// percentile in range 0.0..1.0
|
||||
pub p: Vec<f32>,
|
||||
}
|
||||
|
||||
impl Display for Percentiles {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
for i in 0..self.v.len() {
|
||||
write!(f, "p{}=>{} ", self.p[i] * 100.0, self.v[i])?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl Percentiles {
|
||||
fn get_bucket_value(&self, percentile: f32) -> Option<f64> {
|
||||
zip(&self.p, &self.v)
|
||||
.find(|(&p, _v)| p == percentile)
|
||||
.map(|(_p, &v)| v)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PercentilesCummulative {
|
||||
pub bucket_values: Vec<f64>,
|
||||
pub percentiles: Vec<f32>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl PercentilesCummulative {
|
||||
fn get_bucket_value(&self, percentile: f32) -> Option<f64> {
|
||||
zip(&self.percentiles, &self.bucket_values)
|
||||
.find(|(&p, _cu)| p == percentile)
|
||||
.map(|(_p, &cu)| cu)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_calculate_percentiles() {
|
||||
let mut values = vec![2.0, 4.0, 5.0, 3.0, 1.0];
|
||||
values.sort_by_key(|&x| (x * 100.0) as i64);
|
||||
let percentiles = calculate_percentiles(&values).v;
|
||||
assert_eq!(percentiles[0], 1.0);
|
||||
assert_eq!(percentiles[10], 3.0);
|
||||
assert_eq!(percentiles[15], 4.0);
|
||||
assert_eq!(percentiles[18], 5.0);
|
||||
assert_eq!(percentiles[20], 5.0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_calculate_percentiles_by_cu() {
|
||||
// total of 20000 CU where consumed
|
||||
let values = vec![Point::from((100.0, 10000.0)), Point::from((200.0, 10000.0))];
|
||||
let PercentilesCummulative {
|
||||
bucket_values: by_cu,
|
||||
percentiles: by_cu_percentiles,
|
||||
..
|
||||
} = calculate_cummulative(&values);
|
||||
assert_eq!(by_cu_percentiles[10], 0.5);
|
||||
assert_eq!(by_cu[10], 100.0); // need more than 100 to beat 50% of the CU
|
||||
assert_eq!(by_cu[11], 200.0); // need more than 200 to beat 55% of the CU
|
||||
assert_eq!(by_cu[20], 200.0); // need more than 200 to beat 100% of the CU
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_empty_array() {
|
||||
let values = vec![];
|
||||
let percentiles = calculate_percentiles(&values).v;
|
||||
// note: this is controversal
|
||||
assert!(percentiles.is_empty());
|
||||
}
|
||||
#[test]
|
||||
fn test_zeros() {
|
||||
let values = vec![Point::from((0.0, 0.0)), Point::from((0.0, 0.0))];
|
||||
let percentiles = calculate_cummulative(&values).bucket_values;
|
||||
assert_eq!(percentiles[0], 0.0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_statisticshowto() {
|
||||
let values = vec![30.0, 33.0, 43.0, 53.0, 56.0, 67.0, 68.0, 72.0];
|
||||
let percentiles = calculate_percentiles(&values);
|
||||
assert_eq!(percentiles.v[5], 43.0);
|
||||
assert_eq!(percentiles.p[5], 0.25);
|
||||
assert_eq!(percentiles.get_bucket_value(0.25), Some(43.0));
|
||||
|
||||
let values = vec![
|
||||
Point::from((30.0, 1.0)),
|
||||
Point::from((33.0, 2.0)),
|
||||
Point::from((43.0, 3.0)),
|
||||
Point::from((53.0, 4.0)),
|
||||
Point::from((56.0, 5.0)),
|
||||
Point::from((67.0, 6.0)),
|
||||
Point::from((68.0, 7.0)),
|
||||
Point::from((72.0, 8.0)),
|
||||
];
|
||||
let percentiles = calculate_cummulative(&values);
|
||||
assert_eq!(percentiles.percentiles[20], 1.0);
|
||||
assert_eq!(percentiles.bucket_values[20], 72.0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simple_non_integer_index() {
|
||||
// Messwerte: 3 – 5 – 5 – 6 – 7 – 7 – 8 – 10 – 10
|
||||
// In diesem Fall lautet es also 5.
|
||||
let values = vec![3.0, 5.0, 5.0, 6.0, 7.0, 7.0, 8.0, 10.0, 10.0];
|
||||
|
||||
let percentiles = calculate_percentiles(&values);
|
||||
assert_eq!(percentiles.p[4], 0.20);
|
||||
assert_eq!(percentiles.v[5], 5.0);
|
||||
|
||||
let values = vec![
|
||||
Point::from((3.0, 1.0)),
|
||||
Point::from((5.0, 2.0)),
|
||||
Point::from((5.0, 3.0)),
|
||||
Point::from((6.0, 4.0)),
|
||||
Point::from((7.0, 5.0)),
|
||||
Point::from((7.0, 6.0)),
|
||||
Point::from((8.0, 7.0)),
|
||||
Point::from((10.0, 8.0)),
|
||||
Point::from((10.0, 9.0)),
|
||||
];
|
||||
let percentiles = calculate_cummulative(&values);
|
||||
assert_eq!(percentiles.percentiles[19], 0.95);
|
||||
assert_eq!(percentiles.percentiles[20], 1.0);
|
||||
assert_eq!(percentiles.bucket_values[19], 10.0);
|
||||
assert_eq!(percentiles.bucket_values[20], 10.0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_large_list() {
|
||||
let values = (0..1000).map(|i| i as f64).collect_vec();
|
||||
let percentiles = calculate_percentiles(&values);
|
||||
assert_eq!(percentiles.v[19], 950.0);
|
||||
assert_eq!(percentiles.p[19], 0.95);
|
||||
}
|
||||
}
|
|
@ -11,8 +11,9 @@ pub mod channel_plugger;
|
|||
pub mod grpc_subscription_autoreconnect_streams;
|
||||
pub mod grpc_subscription_autoreconnect_tasks;
|
||||
pub mod grpcmultiplex_fastestwins;
|
||||
pub mod yellowstone_grpc_util;
|
||||
mod obfuscate;
|
||||
pub mod histogram_percentiles;
|
||||
pub mod yellowstone_grpc_util;
|
||||
|
||||
// 1-based attempt counter
|
||||
type Attempt = u32;
|
||||
|
|
Loading…
Reference in New Issue