diff --git a/Cargo.lock b/Cargo.lock index 88174cfc7..6d01f67b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7288,9 +7288,11 @@ dependencies = [ "rand 0.8.5", "rustls", "solana-logger", + "solana-measure", "solana-metrics", "solana-perf", "solana-sdk", + "solana-transaction-metrics-tracker", "thiserror", "tokio", "x509-parser", @@ -7457,6 +7459,20 @@ dependencies = [ "solana-version", ] +[[package]] +name = "solana-transaction-metrics-tracker" +version = "2.0.0" +dependencies = [ + "Inflector", + "base64 0.22.0", + "bincode", + "lazy_static", + "log", + "rand 0.8.5", + "solana-perf", + "solana-sdk", +] + [[package]] name = "solana-transaction-status" version = "2.0.0" diff --git a/Cargo.toml b/Cargo.toml index d73b16c8e..0f85ef090 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,6 +107,7 @@ members = [ "tokens", "tpu-client", "transaction-dos", + "transaction-metrics-tracker", "transaction-status", "turbine", "udp-client", @@ -380,6 +381,7 @@ solana-test-validator = { path = "test-validator", version = "=2.0.0" } solana-thin-client = { path = "thin-client", version = "=2.0.0" } solana-tpu-client = { path = "tpu-client", version = "=2.0.0", default-features = false } solana-transaction-status = { path = "transaction-status", version = "=2.0.0" } +solana-transaction-metrics-tracker = { path = "transaction-metrics-tracker", version = "=2.0.0" } solana-turbine = { path = "turbine", version = "=2.0.0" } solana-udp-client = { path = "udp-client", version = "=2.0.0" } solana-version = { path = "version", version = "=2.0.0" } diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 8e6592dc2..3d383111c 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6333,9 +6333,11 @@ dependencies = [ "quinn-proto", "rand 0.8.5", "rustls", + "solana-measure", "solana-metrics", "solana-perf", "solana-sdk", + "solana-transaction-metrics-tracker", "thiserror", "tokio", "x509-parser", @@ -6437,6 +6439,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "solana-transaction-metrics-tracker" +version = "2.0.0" +dependencies = [ + "Inflector", + "base64 0.22.0", + "bincode", + "lazy_static", + "log", + "rand 0.8.5", + "solana-perf", + "solana-sdk", +] + [[package]] name = "solana-transaction-status" version = "2.0.0" diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index faea9ab47..8300b5721 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -33,6 +33,8 @@ bitflags! { /// the packet is built. /// This field can be removed when the above feature gate is adopted by mainnet-beta. const ROUND_COMPUTE_UNIT_PRICE = 0b0010_0000; + /// For tracking performance + const PERF_TRACK_PACKET = 0b0100_0000; } } @@ -228,6 +230,12 @@ impl Meta { self.flags.set(PacketFlags::TRACER_PACKET, is_tracer); } + #[inline] + pub fn set_track_performance(&mut self, is_performance_track: bool) { + self.flags + .set(PacketFlags::PERF_TRACK_PACKET, is_performance_track); + } + #[inline] pub fn set_simple_vote(&mut self, is_simple_vote: bool) { self.flags.set(PacketFlags::SIMPLE_VOTE_TX, is_simple_vote); @@ -261,6 +269,11 @@ impl Meta { self.flags.contains(PacketFlags::TRACER_PACKET) } + #[inline] + pub fn is_perf_track_packet(&self) -> bool { + self.flags.contains(PacketFlags::PERF_TRACK_PACKET) + } + #[inline] pub fn round_compute_unit_price(&self) -> bool { self.flags.contains(PacketFlags::ROUND_COMPUTE_UNIT_PRICE) diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 8e1eb12df..55d0030e7 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -26,9 +26,11 @@ quinn = { workspace = true } quinn-proto = { workspace = true } rand = { workspace = true } rustls = { workspace = true, features = ["dangerous_configuration"] } +solana-measure = { workspace = true } solana-metrics = { workspace = true } solana-perf = { workspace = true } solana-sdk = { workspace = true } +solana-transaction-metrics-tracker = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["full"] } x509-parser = { workspace = true } diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index c49690062..feb9bd2db 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -17,6 +17,7 @@ use { quinn::{Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt}, quinn_proto::VarIntBoundsExceeded, rand::{thread_rng, Rng}, + solana_measure::measure::Measure, solana_perf::packet::{PacketBatch, PACKETS_PER_BATCH}, solana_sdk::{ packet::{Meta, PACKET_DATA_SIZE}, @@ -27,9 +28,10 @@ use { QUIC_MIN_STAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO, }, - signature::Keypair, + signature::{Keypair, Signature}, timing, }, + solana_transaction_metrics_tracker::signature_if_should_track_packet, std::{ iter::repeat_with, net::{IpAddr, SocketAddr, UdpSocket}, @@ -94,6 +96,7 @@ struct PacketChunk { struct PacketAccumulator { pub meta: Meta, pub chunks: Vec, + pub start_time: Instant, } #[derive(Copy, Clone, Debug)] @@ -646,6 +649,7 @@ async fn packet_batch_sender( trace!("enter packet_batch_sender"); let mut batch_start_time = Instant::now(); loop { + let mut packet_perf_measure: Vec<([u8; 64], std::time::Instant)> = Vec::default(); let mut packet_batch = PacketBatch::with_capacity(PACKETS_PER_BATCH); let mut total_bytes: usize = 0; @@ -665,6 +669,8 @@ async fn packet_batch_sender( || (!packet_batch.is_empty() && elapsed >= coalesce) { let len = packet_batch.len(); + track_streamer_fetch_packet_performance(&mut packet_perf_measure, &stats); + if let Err(e) = packet_sender.send(packet_batch) { stats .total_packet_batch_send_err @@ -710,6 +716,14 @@ async fn packet_batch_sender( total_bytes += packet_batch[i].meta().size; + if let Some(signature) = signature_if_should_track_packet(&packet_batch[i]) + .ok() + .flatten() + { + packet_perf_measure.push((*signature, packet_accumulator.start_time)); + // we set the PERF_TRACK_PACKET on + packet_batch[i].meta_mut().set_track_performance(true); + } stats .total_chunks_processed_by_batcher .fetch_add(num_chunks, Ordering::Relaxed); @@ -718,6 +732,32 @@ async fn packet_batch_sender( } } +fn track_streamer_fetch_packet_performance( + packet_perf_measure: &mut [([u8; 64], Instant)], + stats: &Arc, +) { + if packet_perf_measure.is_empty() { + return; + } + let mut measure = Measure::start("track_perf"); + let mut process_sampled_packets_us_hist = stats.process_sampled_packets_us_hist.lock().unwrap(); + + for (signature, start_time) in packet_perf_measure.iter() { + let duration = Instant::now().duration_since(*start_time); + debug!( + "QUIC streamer fetch stage took {duration:?} for transaction {:?}", + Signature::from(*signature) + ); + process_sampled_packets_us_hist + .increment(duration.as_micros() as u64) + .unwrap(); + } + measure.stop(); + stats + .perf_track_overhead_us + .fetch_add(measure.as_us(), Ordering::Relaxed); +} + async fn handle_connection( connection: Connection, remote_addr: SocketAddr, @@ -872,6 +912,7 @@ async fn handle_chunk( *packet_accum = Some(PacketAccumulator { meta, chunks: Vec::new(), + start_time: Instant::now(), }); } @@ -1471,6 +1512,7 @@ pub mod test { offset, end_of_chunk: size, }], + start_time: Instant::now(), }; ptk_sender.send(packet_accum).await.unwrap(); } diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index a7a08c73f..3b1b6b21a 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -16,8 +16,8 @@ use { std::{ net::UdpSocket, sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, RwLock, + atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, + Arc, Mutex, RwLock, }, thread, time::{Duration, SystemTime}, @@ -175,10 +175,19 @@ pub struct StreamStats { pub(crate) stream_load_ema: AtomicUsize, pub(crate) stream_load_ema_overflow: AtomicUsize, pub(crate) stream_load_capacity_overflow: AtomicUsize, + pub(crate) process_sampled_packets_us_hist: Mutex, + pub(crate) perf_track_overhead_us: AtomicU64, } impl StreamStats { pub fn report(&self, name: &'static str) { + let process_sampled_packets_us_hist = { + let mut metrics = self.process_sampled_packets_us_hist.lock().unwrap(); + let process_sampled_packets_us_hist = metrics.clone(); + metrics.clear(); + process_sampled_packets_us_hist + }; + datapoint_info!( name, ( @@ -425,6 +434,38 @@ impl StreamStats { self.stream_load_capacity_overflow.load(Ordering::Relaxed), i64 ), + ( + "process_sampled_packets_us_90pct", + process_sampled_packets_us_hist + .percentile(90.0) + .unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_min", + process_sampled_packets_us_hist.minimum().unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_max", + process_sampled_packets_us_hist.maximum().unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_mean", + process_sampled_packets_us_hist.mean().unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_count", + process_sampled_packets_us_hist.entries(), + i64 + ), + ( + "perf_track_overhead_us", + self.perf_track_overhead_us.swap(0, Ordering::Relaxed), + i64 + ), ); } } diff --git a/transaction-metrics-tracker/Cargo.toml b/transaction-metrics-tracker/Cargo.toml new file mode 100644 index 000000000..9bd82702a --- /dev/null +++ b/transaction-metrics-tracker/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "solana-transaction-metrics-tracker" +description = "Solana transaction metrics tracker" +documentation = "https://docs.rs/solana-transaction-metrics-tracker" +version = { workspace = true } +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +edition = { workspace = true } +publish = false + +[dependencies] +Inflector = { workspace = true } +base64 = { workspace = true } +bincode = { workspace = true } +# Update this borsh dependency to the workspace version once +lazy_static = { workspace = true } +log = { workspace = true } +rand = { workspace = true } +solana-perf = { workspace = true } +solana-sdk = { workspace = true } + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/transaction-metrics-tracker/src/lib.rs b/transaction-metrics-tracker/src/lib.rs new file mode 100644 index 000000000..2baec195d --- /dev/null +++ b/transaction-metrics-tracker/src/lib.rs @@ -0,0 +1,157 @@ +use { + lazy_static::lazy_static, + log::*, + rand::Rng, + solana_perf::sigverify::PacketError, + solana_sdk::{packet::Packet, short_vec::decode_shortu16_len, signature::SIGNATURE_BYTES}, +}; + +// The mask is 12 bits long (1<<12 = 4096), it means the probability of matching +// the transaction is 1/4096 assuming the portion being matched is random. +lazy_static! { + static ref TXN_MASK: u16 = rand::thread_rng().gen_range(0..4096); +} + +/// Check if a transaction given its signature matches the randomly selected mask. +/// The signaure should be from the reference of Signature +pub fn should_track_transaction(signature: &[u8; SIGNATURE_BYTES]) -> bool { + // We do not use the highest signature byte as it is not really random + let match_portion: u16 = u16::from_le_bytes([signature[61], signature[62]]) >> 4; + trace!("Matching txn: {match_portion:016b} {:016b}", *TXN_MASK); + *TXN_MASK == match_portion +} + +/// Check if a transaction packet's signature matches the mask. +/// This does a rudimentry verification to make sure the packet at least +/// contains the signature data and it returns the reference to the signature. +pub fn signature_if_should_track_packet( + packet: &Packet, +) -> Result, PacketError> { + let signature = get_signature_from_packet(packet)?; + Ok(should_track_transaction(signature).then_some(signature)) +} + +/// Get the signature of the transaction packet +/// This does a rudimentry verification to make sure the packet at least +/// contains the signature data and it returns the reference to the signature. +pub fn get_signature_from_packet(packet: &Packet) -> Result<&[u8; SIGNATURE_BYTES], PacketError> { + let (sig_len_untrusted, sig_start) = packet + .data(..) + .and_then(|bytes| decode_shortu16_len(bytes).ok()) + .ok_or(PacketError::InvalidShortVec)?; + + if sig_len_untrusted < 1 { + return Err(PacketError::InvalidSignatureLen); + } + + let signature = packet + .data(sig_start..sig_start.saturating_add(SIGNATURE_BYTES)) + .ok_or(PacketError::InvalidSignatureLen)?; + let signature = signature + .try_into() + .map_err(|_| PacketError::InvalidSignatureLen)?; + Ok(signature) +} + +#[cfg(test)] +mod tests { + use { + super::*, + solana_sdk::{ + hash::Hash, + signature::{Keypair, Signature}, + system_transaction, + }, + }; + + #[test] + fn test_get_signature_from_packet() { + // Default invalid txn packet + let packet = Packet::default(); + let sig = get_signature_from_packet(&packet); + assert_eq!(sig, Err(PacketError::InvalidShortVec)); + + // Use a valid transaction, it should succeed + let tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let mut packet = Packet::from_data(None, tx).unwrap(); + + let sig = get_signature_from_packet(&packet); + assert!(sig.is_ok()); + + // Invalid signature length + packet.buffer_mut()[0] = 0x0; + let sig = get_signature_from_packet(&packet); + assert_eq!(sig, Err(PacketError::InvalidSignatureLen)); + } + + #[test] + fn test_should_track_transaction() { + let mut sig = [0x0; SIGNATURE_BYTES]; + let track = should_track_transaction(&sig); + assert!(!track); + + // Intentionally matching the randomly generated mask + // The lower four bits are ignored as only 12 highest bits from + // signature's 61 and 62 u8 are used for matching. + // We generate a random one + let mut rng = rand::thread_rng(); + let random_number: u8 = rng.gen_range(0..=15); + sig[61] = ((*TXN_MASK & 0xf_u16) << 4) as u8 | random_number; + sig[62] = (*TXN_MASK >> 4) as u8; + + let track = should_track_transaction(&sig); + assert!(track); + } + + #[test] + fn test_signature_if_should_track_packet() { + // Default invalid txn packet + let packet = Packet::default(); + let sig = signature_if_should_track_packet(&packet); + assert_eq!(sig, Err(PacketError::InvalidShortVec)); + + // Use a valid transaction which is not matched + let tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let packet = Packet::from_data(None, tx).unwrap(); + let sig = signature_if_should_track_packet(&packet); + assert_eq!(Ok(None), sig); + + // Now simulate a txn matching the signature mask + let mut tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let mut sig = [0x0; SIGNATURE_BYTES]; + sig[61] = ((*TXN_MASK & 0xf_u16) << 4) as u8; + sig[62] = (*TXN_MASK >> 4) as u8; + + let sig = Signature::from(sig); + tx.signatures[0] = sig; + let mut packet = Packet::from_data(None, tx).unwrap(); + let sig2 = signature_if_should_track_packet(&packet); + + match sig2 { + Ok(sig) => { + assert!(sig.is_some()); + } + Err(_) => panic!("Expected to get a matching signature!"), + } + + // Invalid signature length + packet.buffer_mut()[0] = 0x0; + let sig = signature_if_should_track_packet(&packet); + assert_eq!(sig, Err(PacketError::InvalidSignatureLen)); + } +}