track stream size

This commit is contained in:
GroovieGermanikus 2024-05-17 19:47:33 +02:00
parent c797d19515
commit a10518486e
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 38 additions and 1 deletions

View File

@ -1262,7 +1262,7 @@ impl Geyser for GrpcService {
)
.expect("empty filter");
let snapshot_rx = self.snapshot_rx.lock().await.take();
let (stream_tx, mut stream_rx) = mpsc::channel(if snapshot_rx.is_some() {
let (stream_tx, stream_rx) = mpsc::channel(if snapshot_rx.is_some() {
self.config.snapshot_client_channel_capacity
} else {
self.config.channel_capacity
@ -1349,6 +1349,15 @@ impl Geyser for GrpcService {
}
});
let stream_tx_foo = stream_tx.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(20)).await;
info!("client #{id}: stream_tx fill: {}", stream_tx_foo.max_capacity() - stream_tx_foo.capacity());
}
});
tokio::spawn(Self::client_loop(
id,
filter,
@ -1362,6 +1371,34 @@ impl Geyser for GrpcService {
},
));
// tokio::spawn(async move {
// while let Some(Ok(message)) = stream_rx.recv().await {
// match message.update_oneof.as_ref().unwrap() {
// UpdateOneof::Account(update) => {
// if let Some(ref account_info) = update.account {
// if account_info.write_version % THROTTLE_ACCOUNT_LOGGING == 0 {
// let now = SystemTime::now();
// let since_the_epoch = now
// .duration_since(SystemTime::UNIX_EPOCH)
// .expect("Time went backwards");
//
// warn!("DRAIN stream but do not send to client");
//
// info!("account update inspect before sending to grpc stream: write_version={};timestamp_us={};slot={}",
// account_info.write_version, since_the_epoch.as_micros(), update.slot);
// }
// }
// }
// _ => {}
// }
// }
// });
//
// let (dummy_stream_tx, dummy_stream_rx) = mpsc::channel(10);
//
// Ok(Response::new(TimeTaggingReceiverStream::new(dummy_stream_rx)))
Ok(Response::new(TimeTaggingReceiverStream::new(stream_rx)))
}