transaction performance tracking -- streamer stage (#257)

* transaction performance tracking -- streamer stage
This commit is contained in:
Lijun Wang 2024-04-04 13:19:13 -07:00 committed by GitHub
parent bc81153d60
commit 2b0391049d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 317 additions and 3 deletions

16
Cargo.lock generated
View File

@ -7288,9 +7288,11 @@ dependencies = [
"rand 0.8.5",
"rustls",
"solana-logger",
"solana-measure",
"solana-metrics",
"solana-perf",
"solana-sdk",
"solana-transaction-metrics-tracker",
"thiserror",
"tokio",
"x509-parser",
@ -7457,6 +7459,20 @@ dependencies = [
"solana-version",
]
[[package]]
name = "solana-transaction-metrics-tracker"
version = "2.0.0"
dependencies = [
"Inflector",
"base64 0.22.0",
"bincode",
"lazy_static",
"log",
"rand 0.8.5",
"solana-perf",
"solana-sdk",
]
[[package]]
name = "solana-transaction-status"
version = "2.0.0"

View File

@ -107,6 +107,7 @@ members = [
"tokens",
"tpu-client",
"transaction-dos",
"transaction-metrics-tracker",
"transaction-status",
"turbine",
"udp-client",
@ -380,6 +381,7 @@ solana-test-validator = { path = "test-validator", version = "=2.0.0" }
solana-thin-client = { path = "thin-client", version = "=2.0.0" }
solana-tpu-client = { path = "tpu-client", version = "=2.0.0", default-features = false }
solana-transaction-status = { path = "transaction-status", version = "=2.0.0" }
solana-transaction-metrics-tracker = { path = "transaction-metrics-tracker", version = "=2.0.0" }
solana-turbine = { path = "turbine", version = "=2.0.0" }
solana-udp-client = { path = "udp-client", version = "=2.0.0" }
solana-version = { path = "version", version = "=2.0.0" }

View File

@ -6333,9 +6333,11 @@ dependencies = [
"quinn-proto",
"rand 0.8.5",
"rustls",
"solana-measure",
"solana-metrics",
"solana-perf",
"solana-sdk",
"solana-transaction-metrics-tracker",
"thiserror",
"tokio",
"x509-parser",
@ -6437,6 +6439,20 @@ dependencies = [
"tokio",
]
[[package]]
name = "solana-transaction-metrics-tracker"
version = "2.0.0"
dependencies = [
"Inflector",
"base64 0.22.0",
"bincode",
"lazy_static",
"log",
"rand 0.8.5",
"solana-perf",
"solana-sdk",
]
[[package]]
name = "solana-transaction-status"
version = "2.0.0"

View File

@ -33,6 +33,8 @@ bitflags! {
/// the packet is built.
/// This field can be removed when the above feature gate is adopted by mainnet-beta.
const ROUND_COMPUTE_UNIT_PRICE = 0b0010_0000;
/// For tracking performance
const PERF_TRACK_PACKET = 0b0100_0000;
}
}
@ -228,6 +230,12 @@ impl Meta {
self.flags.set(PacketFlags::TRACER_PACKET, is_tracer);
}
/// Sets or clears the `PERF_TRACK_PACKET` flag in this packet's metadata.
///
/// Passing `true` turns the flag on and `false` turns it off; no other
/// flag bit is touched.
#[inline]
pub fn set_track_performance(&mut self, is_performance_track: bool) {
self.flags
.set(PacketFlags::PERF_TRACK_PACKET, is_performance_track);
}
#[inline]
pub fn set_simple_vote(&mut self, is_simple_vote: bool) {
self.flags.set(PacketFlags::SIMPLE_VOTE_TX, is_simple_vote);
@ -261,6 +269,11 @@ impl Meta {
self.flags.contains(PacketFlags::TRACER_PACKET)
}
/// Returns `true` when the `PERF_TRACK_PACKET` flag is set in this
/// packet's metadata.
#[inline]
pub fn is_perf_track_packet(&self) -> bool {
self.flags.contains(PacketFlags::PERF_TRACK_PACKET)
}
#[inline]
pub fn round_compute_unit_price(&self) -> bool {
self.flags.contains(PacketFlags::ROUND_COMPUTE_UNIT_PRICE)

View File

@ -26,9 +26,11 @@ quinn = { workspace = true }
quinn-proto = { workspace = true }
rand = { workspace = true }
rustls = { workspace = true, features = ["dangerous_configuration"] }
solana-measure = { workspace = true }
solana-metrics = { workspace = true }
solana-perf = { workspace = true }
solana-sdk = { workspace = true }
solana-transaction-metrics-tracker = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
x509-parser = { workspace = true }

View File

@ -17,6 +17,7 @@ use {
quinn::{Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt},
quinn_proto::VarIntBoundsExceeded,
rand::{thread_rng, Rng},
solana_measure::measure::Measure,
solana_perf::packet::{PacketBatch, PACKETS_PER_BATCH},
solana_sdk::{
packet::{Meta, PACKET_DATA_SIZE},
@ -27,9 +28,10 @@ use {
QUIC_MIN_STAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO,
QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO,
},
signature::Keypair,
signature::{Keypair, Signature},
timing,
},
solana_transaction_metrics_tracker::signature_if_should_track_packet,
std::{
iter::repeat_with,
net::{IpAddr, SocketAddr, UdpSocket},
@ -94,6 +96,7 @@ struct PacketChunk {
struct PacketAccumulator {
pub meta: Meta,
pub chunks: Vec<PacketChunk>,
pub start_time: Instant,
}
#[derive(Copy, Clone, Debug)]
@ -646,6 +649,7 @@ async fn packet_batch_sender(
trace!("enter packet_batch_sender");
let mut batch_start_time = Instant::now();
loop {
let mut packet_perf_measure: Vec<([u8; 64], std::time::Instant)> = Vec::default();
let mut packet_batch = PacketBatch::with_capacity(PACKETS_PER_BATCH);
let mut total_bytes: usize = 0;
@ -665,6 +669,8 @@ async fn packet_batch_sender(
|| (!packet_batch.is_empty() && elapsed >= coalesce)
{
let len = packet_batch.len();
track_streamer_fetch_packet_performance(&mut packet_perf_measure, &stats);
if let Err(e) = packet_sender.send(packet_batch) {
stats
.total_packet_batch_send_err
@ -710,6 +716,14 @@ async fn packet_batch_sender(
total_bytes += packet_batch[i].meta().size;
if let Some(signature) = signature_if_should_track_packet(&packet_batch[i])
.ok()
.flatten()
{
packet_perf_measure.push((*signature, packet_accumulator.start_time));
// Set the PERF_TRACK_PACKET flag on this packet's meta.
packet_batch[i].meta_mut().set_track_performance(true);
}
stats
.total_chunks_processed_by_batcher
.fetch_add(num_chunks, Ordering::Relaxed);
@ -718,6 +732,32 @@ async fn packet_batch_sender(
}
}
/// Records, for every sampled packet, the time elapsed since its
/// `start_time` into `stats.process_sampled_packets_us_hist` (in
/// microseconds) and logs it at debug level together with the packet's
/// signature.
///
/// The wall-clock cost of doing this bookkeeping is itself measured and
/// added to `stats.perf_track_overhead_us`. Nothing is locked or measured
/// when `packet_perf_measure` is empty.
///
/// NOTE(review): the slice is only read here; the `&mut` on the parameter
/// looks unnecessary but is kept so the signature stays unchanged.
///
/// # Panics
///
/// Panics if the histogram mutex is poisoned or if the histogram rejects
/// the sample (both are `unwrap`ped).
fn track_streamer_fetch_packet_performance(
    packet_perf_measure: &mut [([u8; 64], Instant)],
    stats: &Arc<StreamStats>,
) {
    if !packet_perf_measure.is_empty() {
        let mut measure = Measure::start("track_perf");
        // The guard is intentionally held until the end of the function.
        let mut sampled_hist = stats.process_sampled_packets_us_hist.lock().unwrap();

        packet_perf_measure
            .iter()
            .for_each(|&(signature, start_time)| {
                let duration = start_time.elapsed();
                debug!(
                    "QUIC streamer fetch stage took {duration:?} for transaction {:?}",
                    Signature::from(signature)
                );
                let micros = duration.as_micros() as u64;
                sampled_hist.increment(micros).unwrap();
            });

        measure.stop();
        let overhead_us = measure.as_us();
        stats
            .perf_track_overhead_us
            .fetch_add(overhead_us, Ordering::Relaxed);
    }
}
async fn handle_connection(
connection: Connection,
remote_addr: SocketAddr,
@ -872,6 +912,7 @@ async fn handle_chunk(
*packet_accum = Some(PacketAccumulator {
meta,
chunks: Vec::new(),
start_time: Instant::now(),
});
}
@ -1471,6 +1512,7 @@ pub mod test {
offset,
end_of_chunk: size,
}],
start_time: Instant::now(),
};
ptk_sender.send(packet_accum).await.unwrap();
}

View File

@ -16,8 +16,8 @@ use {
std::{
net::UdpSocket,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, RwLock,
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc, Mutex, RwLock,
},
thread,
time::{Duration, SystemTime},
@ -175,10 +175,19 @@ pub struct StreamStats {
pub(crate) stream_load_ema: AtomicUsize,
pub(crate) stream_load_ema_overflow: AtomicUsize,
pub(crate) stream_load_capacity_overflow: AtomicUsize,
pub(crate) process_sampled_packets_us_hist: Mutex<histogram::Histogram>,
pub(crate) perf_track_overhead_us: AtomicU64,
}
impl StreamStats {
pub fn report(&self, name: &'static str) {
let process_sampled_packets_us_hist = {
let mut metrics = self.process_sampled_packets_us_hist.lock().unwrap();
let process_sampled_packets_us_hist = metrics.clone();
metrics.clear();
process_sampled_packets_us_hist
};
datapoint_info!(
name,
(
@ -425,6 +434,38 @@ impl StreamStats {
self.stream_load_capacity_overflow.load(Ordering::Relaxed),
i64
),
(
"process_sampled_packets_us_90pct",
process_sampled_packets_us_hist
.percentile(90.0)
.unwrap_or(0),
i64
),
(
"process_sampled_packets_us_min",
process_sampled_packets_us_hist.minimum().unwrap_or(0),
i64
),
(
"process_sampled_packets_us_max",
process_sampled_packets_us_hist.maximum().unwrap_or(0),
i64
),
(
"process_sampled_packets_us_mean",
process_sampled_packets_us_hist.mean().unwrap_or(0),
i64
),
(
"process_sampled_packets_count",
process_sampled_packets_us_hist.entries(),
i64
),
(
"perf_track_overhead_us",
self.perf_track_overhead_us.swap(0, Ordering::Relaxed),
i64
),
);
}
}

View File

@ -0,0 +1,25 @@
[package]
name = "solana-transaction-metrics-tracker"
description = "Solana transaction metrics tracker"
documentation = "https://docs.rs/solana-transaction-metrics-tracker"
version = { workspace = true }
authors = { workspace = true }
repository = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
edition = { workspace = true }
publish = false
[dependencies]
Inflector = { workspace = true }
base64 = { workspace = true }
bincode = { workspace = true }
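# NOTE(review): leftover truncated comment ("Update this borsh dependency to the workspace version once"); this manifest declares no borsh dependency -- confirm and remove.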
lazy_static = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
solana-perf = { workspace = true }
solana-sdk = { workspace = true }
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]

View File

@ -0,0 +1,157 @@
use {
lazy_static::lazy_static,
log::*,
rand::Rng,
solana_perf::sigverify::PacketError,
solana_sdk::{packet::Packet, short_vec::decode_shortu16_len, signature::SIGNATURE_BYTES},
};
// The mask is 12 bits long (1<<12 = 4096), it means the probability of matching
// the transaction is 1/4096 assuming the portion being matched is random.
//
// The value is drawn from the thread-local RNG inside `lazy_static!`, i.e. it
// is computed on first use and then stays fixed, so every signature checked
// afterwards is compared against the same mask. Note that the range is
// half-open: the mask can be any value in 0..=4095, including 0.
lazy_static! {
static ref TXN_MASK: u16 = rand::thread_rng().gen_range(0..4096);
}
/// Tells whether a transaction, identified by its signature, is selected
/// for tracking, i.e. whether it matches the randomly chosen `TXN_MASK`.
///
/// The signature should be passed as a reference to the raw bytes of a
/// `Signature`. Bytes 61 and 62 are combined as a little-endian `u16` and
/// the low four bits are dropped; the remaining 12 bits are what is
/// compared against the mask.
pub fn should_track_transaction(signature: &[u8; SIGNATURE_BYTES]) -> bool {
    // The highest signature byte (index 63) is deliberately left out as it
    // is not really random.
    let low_byte = u16::from(signature[61]);
    let high_byte = u16::from(signature[62]);
    let match_portion: u16 = ((high_byte << 8) | low_byte) >> 4;
    trace!("Matching txn: {match_portion:016b} {:016b}", *TXN_MASK);
    match_portion == *TXN_MASK
}
/// Checks whether the signature carried by a transaction packet matches
/// the tracking mask.
///
/// Only a rudimentary verification is performed: the packet must at least
/// contain the signature data. On success the result is `Some` reference
/// to the signature bytes inside the packet when the signature matches,
/// and `None` when it does not.
///
/// # Errors
///
/// Propagates the `PacketError` from `get_signature_from_packet` when the
/// signature cannot be located in the packet.
pub fn signature_if_should_track_packet(
    packet: &Packet,
) -> Result<Option<&[u8; SIGNATURE_BYTES]>, PacketError> {
    get_signature_from_packet(packet).map(|signature| {
        if should_track_transaction(signature) {
            Some(signature)
        } else {
            None
        }
    })
}
/// Extracts the signature of a transaction packet.
///
/// Only a rudimentary verification is performed: the packet must start
/// with a decodable short-vec length of at least one, followed by
/// `SIGNATURE_BYTES` bytes. The returned reference points at those bytes
/// inside the packet; the signature itself is not verified.
///
/// # Errors
///
/// * `PacketError::InvalidShortVec` when the packet data is unavailable or
///   the leading short-vec length cannot be decoded.
/// * `PacketError::InvalidSignatureLen` when the decoded signature count is
///   zero or the packet is too short to hold a full signature.
pub fn get_signature_from_packet(packet: &Packet) -> Result<&[u8; SIGNATURE_BYTES], PacketError> {
    let payload = packet.data(..).ok_or(PacketError::InvalidShortVec)?;
    let (sig_len_untrusted, sig_start) =
        decode_shortu16_len(payload).map_err(|_| PacketError::InvalidShortVec)?;

    // The count comes straight off the wire; all that is required here is
    // that at least one signature is claimed to be present.
    if sig_len_untrusted == 0 {
        return Err(PacketError::InvalidSignatureLen);
    }

    let sig_end = sig_start.saturating_add(SIGNATURE_BYTES);
    packet
        .data(sig_start..sig_end)
        .and_then(|raw| <&[u8; SIGNATURE_BYTES]>::try_from(raw).ok())
        .ok_or(PacketError::InvalidSignatureLen)
}
#[cfg(test)]
mod tests {
    use {
        super::*,
        solana_sdk::{
            hash::Hash,
            signature::{Keypair, Signature},
            system_transaction,
        },
    };

    /// Builds signature bytes whose 12-bit match portion (the upper 12 bits
    /// of the little-endian `u16` formed by bytes 61 and 62) equals
    /// `match_portion`. `ignored_low_bits` fills the four low bits of byte
    /// 61, which `should_track_transaction` shifts away.
    fn signature_bytes_with_match_portion(
        match_portion: u16,
        ignored_low_bits: u8,
    ) -> [u8; SIGNATURE_BYTES] {
        let mut sig = [0x0; SIGNATURE_BYTES];
        sig[61] = ((match_portion & 0xf_u16) << 4) as u8 | (ignored_low_bits & 0xf);
        sig[62] = (match_portion >> 4) as u8;
        sig
    }

    /// Returns a 12-bit value that is guaranteed to differ from `TXN_MASK`.
    ///
    /// `TXN_MASK` is random and may be any value in 0..4096 (including 0),
    /// so neither an all-zero signature nor a freshly generated one can be
    /// relied upon to miss the mask; flipping one bit of the mask can.
    fn non_matching_portion() -> u16 {
        *TXN_MASK ^ 0x1
    }

    #[test]
    fn test_get_signature_from_packet() {
        // Default invalid txn packet
        let packet = Packet::default();
        let sig = get_signature_from_packet(&packet);
        assert_eq!(sig, Err(PacketError::InvalidShortVec));

        // Use a valid transaction, it should succeed
        let tx = system_transaction::transfer(
            &Keypair::new(),
            &solana_sdk::pubkey::new_rand(),
            1,
            Hash::new_unique(),
        );
        let mut packet = Packet::from_data(None, tx).unwrap();

        let sig = get_signature_from_packet(&packet);
        assert!(sig.is_ok());

        // Invalid signature length
        packet.buffer_mut()[0] = 0x0;
        let sig = get_signature_from_packet(&packet);
        assert_eq!(sig, Err(PacketError::InvalidSignatureLen));
    }

    #[test]
    fn test_should_track_transaction() {
        // A signature deliberately built NOT to match the mask. An all-zero
        // signature would match whenever the random mask happens to be 0.
        let sig = signature_bytes_with_match_portion(non_matching_portion(), 0);
        assert!(!should_track_transaction(&sig));

        // Intentionally matching the randomly generated mask.
        // The lower four bits are ignored as only the 12 highest bits from
        // signature bytes 61 and 62 are used for matching, so fill them
        // with a random value.
        let random_number: u8 = rand::thread_rng().gen_range(0..=15);
        let sig = signature_bytes_with_match_portion(*TXN_MASK, random_number);
        assert!(should_track_transaction(&sig));
    }

    #[test]
    fn test_signature_if_should_track_packet() {
        // Default invalid txn packet
        let packet = Packet::default();
        let sig = signature_if_should_track_packet(&packet);
        assert_eq!(sig, Err(PacketError::InvalidShortVec));

        // Use a valid transaction which is not matched. The signature is
        // pinned to a non-matching value: a real random signature would
        // match the mask once in 4096 runs and make this test flaky.
        let mut tx = system_transaction::transfer(
            &Keypair::new(),
            &solana_sdk::pubkey::new_rand(),
            1,
            Hash::new_unique(),
        );
        tx.signatures[0] = Signature::from(signature_bytes_with_match_portion(
            non_matching_portion(),
            0,
        ));
        let packet = Packet::from_data(None, tx).unwrap();
        let sig = signature_if_should_track_packet(&packet);
        assert_eq!(Ok(None), sig);

        // Now simulate a txn matching the signature mask
        let mut tx = system_transaction::transfer(
            &Keypair::new(),
            &solana_sdk::pubkey::new_rand(),
            1,
            Hash::new_unique(),
        );
        tx.signatures[0] = Signature::from(signature_bytes_with_match_portion(*TXN_MASK, 0));
        let mut packet = Packet::from_data(None, tx).unwrap();
        assert!(
            matches!(signature_if_should_track_packet(&packet), Ok(Some(_))),
            "Expected to get a matching signature!"
        );

        // Invalid signature length
        packet.buffer_mut()[0] = 0x0;
        let sig = signature_if_should_track_packet(&packet);
        assert_eq!(sig, Err(PacketError::InvalidSignatureLen));
    }
}