From 57104022c5e1f6570bd682050c9960e4edb83c8a Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Fri, 17 May 2024 16:41:06 +0200 Subject: [PATCH] parse for time tagged log files --- examples/parse_yellowstone_timetagged_log.rs | 159 +++++++++++++++++++ 1 file changed, 159 insertions(+) create mode 100644 examples/parse_yellowstone_timetagged_log.rs diff --git a/examples/parse_yellowstone_timetagged_log.rs b/examples/parse_yellowstone_timetagged_log.rs new file mode 100644 index 0000000..1a8183c --- /dev/null +++ b/examples/parse_yellowstone_timetagged_log.rs @@ -0,0 +1,159 @@ +use std::collections::HashMap; +use std::fs::File; +use std::io; +use std::io::BufRead; +use std::path::PathBuf; +use clap::Parser; +use regex::Regex; + + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +pub struct Args { + #[arg(long)] + pub log_file: String, +} + + +pub fn main() { + let Args { log_file } = Args::parse(); + println!("Reading log file: {}", log_file); + + let log_file = PathBuf::from(log_file); + + const LIMIT_LINES: usize = 10000000; + + let mut timetag_sending_to_buffer: HashMap = HashMap::new(); + let mut timetag_before_sending_grpc: HashMap = HashMap::new(); + // contains only matches from previous sets + let mut timetag_geyser: HashMap = HashMap::new(); + + let mut count_sending_to_buffer_channel = 0; + { + let file = File::open(&log_file).expect("file must exist"); + let reader = io::BufReader::new(file); + for line in reader.lines().take(LIMIT_LINES) { + let line = line.expect("must be parsable"); + // println!("-> buffer channel"); + if let Some((write_version, timestamp_us)) = parse_log_entry_sending_to_buffer_channel(line) { + count_sending_to_buffer_channel += 1; + timetag_sending_to_buffer.insert(write_version, timestamp_us); + } + } + } + + let mut count_sending_grpc = 0; + { + let file = File::open(&log_file).expect("file must exist"); + let reader = io::BufReader::new(file); + for line in reader.lines().take(LIMIT_LINES) { + let line = line.expect("must be parsable"); + // println!("-> when sending to grpc"); + if let Some((write_version, timestamp_us)) = parse_log_entry_before_sending_grpc(line) { + count_sending_grpc += 1; + timetag_before_sending_grpc.insert(write_version, timestamp_us); + } + } + } + + // THIS is by far the largest set + let mut count_at_geyser = 0; + { + let file = File::open(&log_file).expect("file must exist"); + let reader = io::BufReader::new(file); + for line in reader.lines().take(LIMIT_LINES) { + let line = line.expect("must be parsable"); + // println!("-> at geyser interface"); + if let Some((write_version, timestamp_us)) = parse_log_entry_at_geyser_interface(line) { + count_at_geyser += 1; + if timetag_sending_to_buffer.contains_key(&write_version) && timetag_before_sending_grpc.contains_key(&write_version) { + timetag_geyser.insert(write_version, timestamp_us); + } + } + } + } + + println!("Count at geyser interface: {}", count_at_geyser); + println!("Count sending to buffer channel: {}", count_sending_to_buffer_channel); + println!("Count sending to grpc: {}", count_sending_grpc); + + for (write_version, geyser_timestamp_us) in timetag_geyser { + let timestamp_sending_to_buffer = timetag_sending_to_buffer.get(&write_version).unwrap(); + let timestamp_before_sending_grpc = timetag_before_sending_grpc.get(&write_version).unwrap(); + let delta1 = timestamp_sending_to_buffer - geyser_timestamp_us; + let delta2 = timestamp_before_sending_grpc - timestamp_sending_to_buffer; + println!("Write Version: {}, geyser - {}us - buffer - {}us - grpc", write_version, delta1, delta2); + + } + +} + + +fn parse_log_entry_at_geyser_interface(log_line: String) -> Option<(u64, u64)> { + if !log_line.contains("account update inspect from geyser") { + return None; + } + + // Split the log line by ': ' to separate the prefix from the data + let parts: Vec<&str> = log_line.split(": ").collect(); + +// The second part contains the data we need + let data = parts[1]; + +// Split the data by ';' to separate the different fields + let fields: Vec<&str> = data.split(';').collect(); + +// For each field, split by '=' to separate the key from the value + let write_version: u64 = fields[0].split('=').collect::>()[1].parse().unwrap(); + let timestamp_us: u64 = fields[1].split('=').collect::>()[1].parse().unwrap(); + let _slot: u64 = fields[2].split('=').collect::>()[1].parse().unwrap(); + + Some((write_version, timestamp_us)) +} + + +fn parse_log_entry_sending_to_buffer_channel(log_line: String) -> Option<(u64, u64)> { + if !log_line.contains("sending to buffer channel") { + return None; + } + +// Split the log line by ': ' to separate the prefix from the data + let parts: Vec<&str> = log_line.split(": ").collect(); + +// The second part contains the data we need + let data = parts[1]; + +// Split the data by ';' to separate the different fields + let fields: Vec<&str> = data.split(';').collect(); + +// For each field, split by '=' to separate the key from the value + let write_version: u64 = fields[0].split('=').collect::>()[1].parse().unwrap(); + let timestamp_us: u64 = fields[1].split('=').collect::>()[1].parse().unwrap(); + let _slot: u64 = fields[2].split('=').collect::>()[1].parse().unwrap(); + + Some((write_version, timestamp_us)) +} + + +fn parse_log_entry_before_sending_grpc(log_line: String) -> Option<(u64, u64)> { + if !log_line.contains("before sending to grpc") { + return None; + } + +// Split the log line by ': ' to separate the prefix from the data + let parts: Vec<&str> = log_line.split(": ").collect(); + +// The third part contains the data we need + let data = parts[1]; + +// Split the data by ';' to separate the different fields + let fields: Vec<&str> = data.split(';').collect(); + +// For each field, split by '=' to separate the key from the value + let write_version: u64 = fields[0].split('=').collect::>()[1].parse().unwrap(); + let timestamp_us: u64 = fields[1].split('=').collect::>()[1].parse().unwrap(); + let _slot: u64 = fields[2].split('=').collect::>()[1].parse().unwrap(); + + Some((write_version, timestamp_us)) +} +