throttle logging

This commit is contained in:
GroovieGermanikus 2024-05-17 10:51:52 +02:00
parent fe143cdbaa
commit 97ae2eba49
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
3 changed files with 24 additions and 12 deletions

View File

@ -58,6 +58,7 @@ use {
}, },
}, },
}; };
use crate::THROTTLE_ACCOUNT_LOGGING;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct MessageAccountInfo { pub struct MessageAccountInfo {
@ -1184,10 +1185,12 @@ impl GrpcService {
for message in messages.iter() { for message in messages.iter() {
for message in filter.get_update(message, Some(commitment)) { for message in filter.get_update(message, Some(commitment)) {
match message.update_oneof.as_ref().unwrap() { match message.update_oneof.as_ref().unwrap() {
UpdateOneof::Account(update) => { UpdateOneof::Account(update) => {
// message is put in bounded queue which gets consumed by GRPC receiver // message is put in bounded queue which gets consumed by GRPC receiver
if let Some(ref account_info) = update.account { if let Some(ref account_info) = update.account {
if account_info.write_version % THROTTLE_ACCOUNT_LOGGING == 0 {
let now = SystemTime::now(); let now = SystemTime::now();
let since_the_epoch = now.duration_since(SystemTime::UNIX_EPOCH).expect("Time went backwards"); let since_the_epoch = now.duration_since(SystemTime::UNIX_EPOCH).expect("Time went backwards");
@ -1195,6 +1198,7 @@ impl GrpcService {
account_info.write_version, since_the_epoch.as_micros(), update.slot); account_info.write_version, since_the_epoch.as_micros(), update.slot);
} }
} }
}
_ => {} _ => {}
} }
@ -1458,12 +1462,14 @@ fn spawn_plugger_mpcs(
UpdateOneof::Account(update) => { UpdateOneof::Account(update) => {
if let Some(ref account_info) = update.account { if let Some(ref account_info) = update.account {
if account_info.write_version % THROTTLE_ACCOUNT_LOGGING == 0 {
let now = SystemTime::now(); let now = SystemTime::now();
let since_the_epoch = now.duration_since(SystemTime::UNIX_EPOCH).expect("Time went backwards"); let since_the_epoch = now.duration_since(SystemTime::UNIX_EPOCH).expect("Time went backwards");
info!("account update inspect before sending to grpc stream: write_version={};timestamp_us={};slot={}", info!("account update inspect before sending to grpc stream: write_version={};timestamp_us={};slot={}",
account_info.write_version, since_the_epoch.as_micros(), update.slot); account_info.write_version, since_the_epoch.as_micros(), update.slot);
} }
}
// message is put in bounded queue which gets consumed by GRPC receiver // message is put in bounded queue which gets consumed by GRPC receiver
info!("client: inspect message last - {}", update.slot); info!("client: inspect message last - {}", update.slot);

View File

@ -8,3 +8,6 @@ pub mod grpc;
pub mod plugin; pub mod plugin;
pub mod prom; pub mod prom;
pub mod version; pub mod version;
// log every X account write
pub const THROTTLE_ACCOUNT_LOGGING: u64 = 50;

View File

@ -30,6 +30,7 @@ use {
}, },
}; };
use crate::grpc::{MessageAccount, MessageAccountInfo}; use crate::grpc::{MessageAccount, MessageAccountInfo};
use crate::THROTTLE_ACCOUNT_LOGGING;
#[derive(Debug)] #[derive(Debug)]
pub struct PluginInner { pub struct PluginInner {
@ -170,11 +171,13 @@ impl GeyserPlugin for Plugin {
// Message::Account((account, slot, is_startup).into()) // Message::Account((account, slot, is_startup).into())
// }; // };
if account.write_version % THROTTLE_ACCOUNT_LOGGING == 0 {
let now = SystemTime::now(); let now = SystemTime::now();
let since_the_epoch = now.duration_since(SystemTime::UNIX_EPOCH).expect("Time went backwards"); let since_the_epoch = now.duration_since(SystemTime::UNIX_EPOCH).expect("Time went backwards");
info!("account update inspect from geyser: write_version={};timestamp_us={};slot={}", info!("account update inspect from geyser: write_version={};timestamp_us={};slot={}",
account.write_version, since_the_epoch.as_micros(), slot); account.write_version, since_the_epoch.as_micros(), slot);
}
let message = Message::Account((account, slot, is_startup).into()); let message = Message::Account((account, slot, is_startup).into());