track recent delta and build histogram

This commit is contained in:
GroovieGermanikus 2024-05-17 21:21:46 +02:00
parent 6230725881
commit 31b575c25b
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
5 changed files with 34 additions and 6 deletions

1
Cargo.lock generated
View File

@ -5312,6 +5312,7 @@ dependencies = [
"git-version",
"hostname",
"hyper",
"itertools 0.10.5",
"lazy_static",
"log",
"lz4_flex",

View File

@ -48,6 +48,7 @@ tracing = "0.1.40"
tracing-subscriber = { version = "0.2.15", features = ["env-filter"] }
lz4_flex = { version = "0.11" }
async-stream = "0.3.5"
itertools = "0.10.5"
[build-dependencies]
anyhow = { workspace = true }

View File

@ -59,7 +59,9 @@ async fn mainnet_traffic(grpc_channel: UnboundedSender<Message>) {
let sizes = vec![
0, 8, 8, 165, 165, 165, 165, 11099, 11099, 11099, 11099, 11099, 11099,
];
const target_bytes_total: usize = 10_000_000;
// 10MB -> stream buffer size peaks at 30
// 30MB -> stream buffer size peaks at 10000th and more
const TARGET_BYTES_TOTAL: usize = 30_000_000;
let mut bytes_total = 0;
let mut requested_sizes: Vec<usize> = Vec::new();
@ -67,7 +69,7 @@ async fn mainnet_traffic(grpc_channel: UnboundedSender<Message>) {
for i in 0..99_999_999 {
let data_size = sizes[i % sizes.len()];
if bytes_total + data_size > target_bytes_total {
if bytes_total + data_size > TARGET_BYTES_TOTAL {
break;
}

View File

@ -1,3 +1,4 @@
use std::collections::VecDeque;
use crate::THROTTLE_ACCOUNT_LOGGING;
use async_stream::__private::AsyncStream;
use async_stream::stream;
@ -5,7 +6,8 @@ use futures::{Stream, StreamExt, TryStreamExt};
use log::{debug, warn};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::SystemTime;
use std::time::{SystemTime};
use itertools::Itertools;
use tokio::sync::mpsc::Receiver;
use {
crate::{
@ -63,6 +65,7 @@ use {
},
},
};
use crate::histogram_stats_calculation::calculate_percentiles;
#[derive(Debug, Clone)]
pub struct MessageAccountInfo {
@ -1351,10 +1354,30 @@ impl Geyser for GrpcService {
let stream_tx_foo = stream_tx.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(20)).await;
info!("client #{id}: stream_tx fill: {}", stream_tx_foo.max_capacity() - stream_tx_foo.capacity());
const SIZE: usize = 1000;
let mut ring_buffer = VecDeque::with_capacity(SIZE);
let mut max_delta = 0;
for i in 0.. {
tokio::time::sleep(Duration::from_millis(10)).await;
let delta = stream_tx_foo.max_capacity() - stream_tx_foo.capacity();
max_delta = max_delta.max(delta);
// info!("client #{id}: stream_tx fill: {}", delta);
ring_buffer.truncate(SIZE - 1);
ring_buffer.push_front(delta);
info!("client #{id}: stream_tx fill: {} max={}", delta, max_delta);
// info!("client #{id}: stream_tx fill: {:?}", ring_buffer);
if ring_buffer.len() == SIZE && i % 100 == 0 {
let vec_float = ring_buffer.iter().sorted().map(|x| *x as f64).collect_vec();
let percentiles = calculate_percentiles(&vec_float);
info!("client #{id}: stream_tx fill percentiles: {}", percentiles);
}
}
});

View File

@ -8,6 +8,7 @@ pub mod grpc;
pub mod plugin;
pub mod prom;
pub mod version;
pub mod histogram_stats_calculation;
// log every X account write
pub const THROTTLE_ACCOUNT_LOGGING: u64 = 50;