From 40bbf99c7421dd97a9ba85add78687194e5fba4d Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Wed, 25 Jan 2023 21:54:38 +0900 Subject: [PATCH] Add fully-reproducible online tracer for banking (#29196) * Add fully-reproducible online tracer for banking * Don't use eprintln!()... * Update programs/sbf/Cargo.lock... * Remove meaningless assert_eq * Group test-only code under aptly named mod * Remove needless overflow handling in receive_until * Delay stat aggregation as it's possible now * Use Cow to avoid needless heap allocs * Properly consume metrics action as soon as hold * Trace UnprocessedTransactionStorage::len() instead * Loosen joining api over type safety for replaystage * Introce hash event to override these when simulating * Use serde_with/serde_as instead of hacky workaround * Update another Cargo.lock... * Add detailed comment for Packet::buffer serialize * Rename sender_overhead_minimized_receiver_loop() * Use type interference for TraceError * Another minor rename * Retire now useless ForEach to simplify code * Use type alias as much as possible * Properly translate and propagate tracing errors * Clarify --enable-banking-trace with better naming * Consider unclean (signal-based) node restarts.. * Tweak logging and cli * Remove Bank events as it's not needed anymore * Make tpu own banking tracer thread * Reduce diff a bit.. * Use latest serde_with * Finally use the published rolling-file crate * Make test code change more consistent * Revive dead and non-terminating test code path... * Dispose batches early now that possible * Split off thread handle very early at ::new() * Tweak message for TooSmallDirByteLimitl * Remove too much of indirection * Remove needless pub from ::channel() * Clarify test comments * Avoid needless event creation if tracer is disabled * Write tests around file rotation and spill-over * Remove unneeded PathBuf::clone()s... * Introduce inner struct instead of tuple... * Remove unused enum BankStatus... * Avoid .unwrap() for the case of disabled tracer... --- Cargo.lock | 74 +++ banking-bench/src/main.rs | 30 +- core/Cargo.toml | 1 + core/benches/banking_stage.rs | 22 +- core/benches/banking_trace.rs | 169 +++++++ core/benches/sigverify_stage.rs | 47 +- core/src/banking_stage.rs | 76 +-- core/src/banking_trace.rs | 609 +++++++++++++++++++++++++ core/src/cluster_info_vote_listener.rs | 9 +- core/src/lib.rs | 1 + core/src/packet_deserializer.rs | 125 ++--- core/src/replay_stage.rs | 7 + core/src/sigverify.rs | 18 +- core/src/sigverify_stage.rs | 52 ++- core/src/tpu.rs | 20 +- core/src/tvu.rs | 4 + core/src/validator.rs | 22 + ledger/src/blockstore.rs | 5 +- local-cluster/src/validator_configs.rs | 1 + perf/src/cuda_runtime.rs | 4 +- perf/src/packet.rs | 4 +- programs/sbf/Cargo.lock | 78 +++- sdk/Cargo.toml | 1 + sdk/src/packet.rs | 35 +- validator/src/cli.rs | 22 + validator/src/main.rs | 18 + 26 files changed, 1291 insertions(+), 163 deletions(-) create mode 100644 core/benches/banking_trace.rs create mode 100644 core/src/banking_trace.rs diff --git a/Cargo.lock b/Cargo.lock index a6b82eecf0..e84841a2ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1198,6 +1198,41 @@ dependencies = [ "zeroize", ] +[[package]] +name = "darling" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0dd3cd20dc6b5a876612a6e5accfe7f3dd883db6d07acfbf14c128f61550dfa" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a784d2ccaf7c98501746bf0be29b2022ba41fd62a2e622af997a03e9f972859f" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2 1.0.41", + "quote 1.0.18", + "strsim 0.10.0", + "syn 1.0.98", +] + +[[package]] +name = "darling_macro" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7618812407e9402654622dd402b0a89dff9ba93badd6540781526117b92aab7e" +dependencies = [ + "darling_core", + "quote 1.0.18", + "syn 1.0.98", +] + [[package]] name = "dashmap" version = "4.0.2" @@ -2229,6 +2264,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.1.5" @@ -4081,6 +4122,15 @@ dependencies = [ "librocksdb-sys", ] +[[package]] +name = "rolling-file" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8395b4f860856b740f20a296ea2cd4d823e81a2658cf05ef61be22916026a906" +dependencies = [ + "chrono", +] + [[package]] name = "rpassword" version = "7.0.0" @@ -4390,6 +4440,28 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30d904179146de381af4c93d3af6ca4984b3152db687dacb9c3c35e86f39809c" +dependencies = [ + "serde", + "serde_with_macros", +] + +[[package]] +name = "serde_with_macros" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1966009f3c05f095697c537312f5415d1e3ed31ce0a56942bac4c771c5c335e" +dependencies = [ + "darling", + "proc-macro2 1.0.41", + "quote 1.0.18", + "syn 1.0.98", +] + [[package]] name = "serde_yaml" version = "0.8.26" @@ -5190,6 +5262,7 @@ dependencies = [ "rand_chacha 0.2.2", "raptorq", "rayon", + "rolling-file", "rustc_version 0.4.0", "serde", "serde_derive", @@ -6452,6 +6525,7 @@ dependencies = [ "serde_bytes", "serde_derive", "serde_json", + "serde_with", "sha2 0.10.5", "sha3 0.10.4", "solana-frozen-abi 1.15.0", diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index afeec1dfa5..4fbe37400b 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -6,7 +6,10 @@ use { rand::{thread_rng, Rng}, rayon::prelude::*, solana_client::connection_cache::ConnectionCache, - solana_core::banking_stage::BankingStage, + solana_core::{ + banking_stage::BankingStage, + banking_trace::{BankingPacketBatch, BankingTracer, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT}, + }, solana_gossip::cluster_info::{ClusterInfo, Node}, solana_ledger::{ blockstore::Blockstore, @@ -255,6 +258,12 @@ fn main() { .takes_value(false) .help("Skip transaction sanity execution"), ) + .arg( + Arg::new("trace_banking") + .long("trace-banking") + .takes_value(false) + .help("Enable banking tracing"), + ) .arg( Arg::new("write_lock_contention") .long("write-lock-contention") @@ -407,9 +416,17 @@ fn main() { let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank, &blockstore, None, Some(leader_schedule_cache)); - let (non_vote_sender, non_vote_receiver) = unbounded(); - let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); - let (gossip_vote_sender, gossip_vote_receiver) = unbounded(); + let (banking_tracer, tracer_thread) = + BankingTracer::new(matches.is_present("trace_banking").then_some(( + &blockstore.banking_trace_path(), + exit.clone(), + BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, + ))) + .unwrap(); + let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); + let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); + let (gossip_vote_sender, gossip_vote_receiver) = + banking_tracer.create_channel_gossip_vote(); let cluster_info = { let keypair = Arc::new(Keypair::new()); let node = Node::new_localhost_with_pubkey(&keypair.pubkey()); @@ -462,7 +479,7 @@ fn main() { timestamp(), ); non_vote_sender - .send((vec![packet_batch.clone()], None)) + .send(BankingPacketBatch::new((vec![packet_batch.clone()], None))) .unwrap(); } @@ -583,6 +600,9 @@ fn main() { poh_service.join().unwrap(); sleep(Duration::from_secs(1)); debug!("waited for poh_service"); + if let Some(tracer_thread) = tracer_thread { + tracer_thread.join().unwrap().unwrap(); + } } let _unused = Blockstore::destroy(&ledger_path); } diff --git a/core/Cargo.toml b/core/Cargo.toml index ef8bf57f2a..e6d0e00656 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -34,6 +34,7 @@ num_enum = "0.5.7" rand = "0.7.0" rand_chacha = "0.2.2" rayon = "1.5.3" +rolling-file = "0.2.0" serde = "1.0.144" serde_derive = "1.0.103" solana-address-lookup-table-program = { path = "../programs/address-lookup-table", version = "=1.15.0" } diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index bc4539c2d9..8f8b2d0a25 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -11,6 +11,7 @@ use { solana_client::connection_cache::ConnectionCache, solana_core::{ banking_stage::{BankingStage, BankingStageStats}, + banking_trace::{BankingPacketBatch, BankingTracer}, leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker, qos_service::QosService, unprocessed_packet_batches::*, @@ -197,9 +198,10 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { // during the benchmark genesis_config.ticks_per_slot = 10_000; - let (non_vote_sender, non_vote_receiver) = unbounded(); - let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); - let (gossip_vote_sender, gossip_vote_receiver) = unbounded(); + let banking_tracer = BankingTracer::new_disabled(); + let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); + let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); + let (gossip_vote_sender, gossip_vote_receiver) = banking_tracer.create_channel_gossip_vote(); let mut bank = Bank::new_for_benches(&genesis_config); // Allow arbitrary transaction processing time for the purposes of this bench @@ -304,10 +306,16 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { let mut sent = 0; if let Some(vote_packets) = &vote_packets { tpu_vote_sender - .send((vote_packets[start..start + chunk_len].to_vec(), None)) + .send(BankingPacketBatch::new(( + vote_packets[start..start + chunk_len].to_vec(), + None, + ))) .unwrap(); gossip_vote_sender - .send((vote_packets[start..start + chunk_len].to_vec(), None)) + .send(BankingPacketBatch::new(( + vote_packets[start..start + chunk_len].to_vec(), + None, + ))) .unwrap(); } for v in verified[start..start + chunk_len].chunks(chunk_len / num_threads) { @@ -321,7 +329,9 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { for xv in v { sent += xv.len(); } - non_vote_sender.send((v.to_vec(), None)).unwrap(); + non_vote_sender + .send(BankingPacketBatch::new((v.to_vec(), None))) + .unwrap(); } check_txs(&signal_receiver2, txes / CHUNKS); diff --git a/core/benches/banking_trace.rs b/core/benches/banking_trace.rs new file mode 100644 index 0000000000..fb93deebc1 --- /dev/null +++ b/core/benches/banking_trace.rs @@ -0,0 +1,169 @@ +#![feature(test)] + +extern crate test; + +use { + solana_core::banking_trace::{ + for_test::{ + drop_and_clean_temp_dir_unless_suppressed, sample_packet_batch, terminate_tracer, + }, + receiving_loop_with_minimized_sender_overhead, BankingPacketBatch, BankingTracer, + TraceError, TracerThreadResult, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, + }, + std::{ + path::PathBuf, + sync::{atomic::AtomicBool, Arc}, + thread, + }, + tempfile::TempDir, + test::Bencher, +}; + +fn ensure_fresh_setup_to_benchmark(path: &PathBuf) { + // make sure fresh setup; otherwise banking tracer appends and rotates + // trace files created by prior bench iterations, slightly skewing perf + // result... + BankingTracer::ensure_cleanup_path(path).unwrap(); +} + +fn black_box_packet_batch(packet_batch: BankingPacketBatch) -> TracerThreadResult { + test::black_box(packet_batch); + Ok(()) +} + +#[bench] +fn bench_banking_tracer_main_thread_overhead_noop_baseline(bencher: &mut Bencher) { + let exit = Arc::::default(); + let tracer = BankingTracer::new_disabled(); + let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote(); + + let exit_for_dummy_thread = exit.clone(); + let dummy_main_thread = thread::spawn(move || { + receiving_loop_with_minimized_sender_overhead::<_, TraceError, 0>( + exit_for_dummy_thread, + non_vote_receiver, + black_box_packet_batch, + ) + }); + + let packet_batch = sample_packet_batch(); + bencher.iter(|| { + non_vote_sender.send(packet_batch.clone()).unwrap(); + }); + terminate_tracer(tracer, None, dummy_main_thread, non_vote_sender, Some(exit)); +} + +#[bench] +fn bench_banking_tracer_main_thread_overhead_under_peak_write(bencher: &mut Bencher) { + let temp_dir = TempDir::new().unwrap(); + + let exit = Arc::::default(); + let (tracer, tracer_thread) = BankingTracer::new(Some(( + &temp_dir.path().join("banking-trace"), + exit.clone(), + BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, + ))) + .unwrap(); + let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote(); + + let exit_for_dummy_thread = exit.clone(); + let dummy_main_thread = thread::spawn(move || { + receiving_loop_with_minimized_sender_overhead::<_, TraceError, 0>( + exit_for_dummy_thread, + non_vote_receiver, + black_box_packet_batch, + ) + }); + + let packet_batch = sample_packet_batch(); + bencher.iter(|| { + non_vote_sender.send(packet_batch.clone()).unwrap(); + }); + + terminate_tracer( + tracer, + tracer_thread, + dummy_main_thread, + non_vote_sender, + Some(exit), + ); + drop_and_clean_temp_dir_unless_suppressed(temp_dir); +} + +#[bench] +fn bench_banking_tracer_main_thread_overhead_under_sustained_write(bencher: &mut Bencher) { + let temp_dir = TempDir::new().unwrap(); + + let exit = Arc::::default(); + let (tracer, tracer_thread) = BankingTracer::new(Some(( + &temp_dir.path().join("banking-trace"), + exit.clone(), + 1024 * 1024, // cause more frequent trace file rotation + ))) + .unwrap(); + let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote(); + + let exit_for_dummy_thread = exit.clone(); + let dummy_main_thread = thread::spawn(move || { + receiving_loop_with_minimized_sender_overhead::<_, TraceError, 0>( + exit_for_dummy_thread, + non_vote_receiver, + black_box_packet_batch, + ) + }); + + let packet_batch = sample_packet_batch(); + bencher.iter(|| { + non_vote_sender.send(packet_batch.clone()).unwrap(); + }); + + terminate_tracer( + tracer, + tracer_thread, + dummy_main_thread, + non_vote_sender, + Some(exit), + ); + drop_and_clean_temp_dir_unless_suppressed(temp_dir); +} + +#[bench] +fn bench_banking_tracer_background_thread_throughput(bencher: &mut Bencher) { + let temp_dir = TempDir::new().unwrap(); + let base_path = temp_dir.path(); + + let packet_batch = sample_packet_batch(); + + bencher.iter(move || { + let path = base_path.join("banking-trace"); + ensure_fresh_setup_to_benchmark(&path); + + let exit = Arc::::default(); + + let (tracer, tracer_thread) = + BankingTracer::new(Some((&path, exit.clone(), 50 * 1024 * 1024))).unwrap(); + let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote(); + + let dummy_main_thread = thread::spawn(move || { + receiving_loop_with_minimized_sender_overhead::<_, TraceError, 0>( + exit.clone(), + non_vote_receiver, + black_box_packet_batch, + ) + }); + + for _ in 0..1000 { + non_vote_sender.send(packet_batch.clone()).unwrap(); + } + + terminate_tracer( + tracer, + tracer_thread, + dummy_main_thread, + non_vote_sender, + None, + ); + }); + + drop_and_clean_temp_dir_unless_suppressed(temp_dir); +} diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index 9090388948..c5b6541488 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -12,6 +12,7 @@ use { thread_rng, Rng, }, solana_core::{ + banking_trace::BankingTracer, sigverify::TransactionSigVerifier, sigverify_stage::{SigVerifier, SigVerifyStage}, }, @@ -22,6 +23,7 @@ use { }, solana_sdk::{ hash::Hash, + packet::PacketFlags, signature::{Keypair, Signer}, system_transaction, timing::duration_as_ms, @@ -143,18 +145,26 @@ fn gen_batches(use_same_tx: bool) -> Vec { } #[bench] -fn bench_sigverify_stage(bencher: &mut Bencher) { +fn bench_sigverify_stage_with_same_tx(bencher: &mut Bencher) { + bench_sigverify_stage(bencher, true) +} + +#[bench] +fn bench_sigverify_stage_without_same_tx(bencher: &mut Bencher) { + bench_sigverify_stage(bencher, false) +} + +fn bench_sigverify_stage(bencher: &mut Bencher, use_same_tx: bool) { solana_logger::setup(); trace!("start"); let (packet_s, packet_r) = unbounded(); - let (verified_s, verified_r) = unbounded(); + let (verified_s, verified_r) = BankingTracer::channel_for_test(); let verifier = TransactionSigVerifier::new(verified_s); let stage = SigVerifyStage::new(packet_r, verifier, "bench"); - let use_same_tx = true; bencher.iter(move || { let now = Instant::now(); - let mut batches = gen_batches(use_same_tx); + let batches = gen_batches(use_same_tx); trace!( "starting... generation took: {} ms batches: {}", duration_as_ms(&now.elapsed()), @@ -162,21 +172,24 @@ fn bench_sigverify_stage(bencher: &mut Bencher) { ); let mut sent_len = 0; - for _ in 0..batches.len() { - if let Some(batch) = batches.pop() { - sent_len += batch.len(); - packet_s.send(vec![batch]).unwrap(); - } + for mut batch in batches.into_iter() { + sent_len += batch.len(); + batch + .iter_mut() + .for_each(|packet| packet.meta_mut().flags |= PacketFlags::TRACER_PACKET); + packet_s.send(vec![batch]).unwrap(); } let mut received = 0; + let mut total_tracer_packets_received_in_sigverify_stage = 0; trace!("sent: {}", sent_len); loop { - if let Ok((mut verifieds, _)) = verified_r.recv_timeout(Duration::from_millis(10)) { - while let Some(v) = verifieds.pop() { - received += v.len(); - batches.push(v); - } - if use_same_tx || received >= sent_len { + if let Ok(message) = verified_r.recv_timeout(Duration::from_millis(10)) { + let (verifieds, tracer_packet_stats) = (&message.0, message.1.as_ref().unwrap()); + received += verifieds.iter().map(|batch| batch.len()).sum::(); + total_tracer_packets_received_in_sigverify_stage += + tracer_packet_stats.total_tracer_packets_received_in_sigverify_stage; + test::black_box(message); + if total_tracer_packets_received_in_sigverify_stage >= sent_len { break; } } @@ -224,8 +237,8 @@ fn prepare_batches(discard_factor: i32) -> (Vec, usize) { fn bench_shrink_sigverify_stage_core(bencher: &mut Bencher, discard_factor: i32) { let (batches0, num_valid_packets) = prepare_batches(discard_factor); - let (_verified_s, _verified_r) = unbounded(); - let verifier = TransactionSigVerifier::new(_verified_s); + let (verified_s, _verified_r) = BankingTracer::channel_for_test(); + let verifier = TransactionSigVerifier::new(verified_s); let mut c = 0; let mut total_shrink_time = 0; diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 7ec397acb5..5f828d86cb 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -8,6 +8,7 @@ use { packet_receiver::PacketReceiver, }, crate::{ + banking_trace::BankingPacketReceiver, forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts, immutable_deserialized_packet::ImmutableDeserializedPacket, latest_unprocessed_votes::{LatestUnprocessedVotes, VoteSource}, @@ -18,7 +19,6 @@ use { next_leader::{next_leader_tpu_forwards, next_leader_tpu_vote}, packet_deserializer::PacketDeserializer, qos_service::QosService, - sigverify::SigverifyTracerPacketStats, tracer_packet_stats::TracerPacketStats, unprocessed_packet_batches::*, unprocessed_transaction_storage::{ @@ -26,9 +26,7 @@ use { }, }, core::iter::repeat, - crossbeam_channel::{ - Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender, - }, + crossbeam_channel::RecvTimeoutError, histogram::Histogram, itertools::Itertools, solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, @@ -41,7 +39,7 @@ use { solana_metrics::inc_new_counter_info, solana_perf::{ data_budget::DataBudget, - packet::{Packet, PacketBatch, PACKETS_PER_BATCH}, + packet::{Packet, PACKETS_PER_BATCH}, }, solana_poh::poh_recorder::{BankStart, PohRecorder, PohRecorderError, TransactionRecorder}, solana_program_runtime::timings::ExecuteTimings, @@ -100,9 +98,6 @@ const MIN_THREADS_BANKING: u32 = 1; const MIN_TOTAL_THREADS: u32 = NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING; const SLOT_BOUNDARY_CHECK_PERIOD: Duration = Duration::from_millis(10); -pub type BankingPacketBatch = (Vec, Option); -pub type BankingPacketSender = CrossbeamSender; -pub type BankingPacketReceiver = CrossbeamReceiver; pub struct ProcessTransactionBatchOutput { // The number of transactions filtered out by the cost model @@ -1748,7 +1743,10 @@ impl BankingStage { mod tests { use { super::*, - crate::unprocessed_packet_batches, + crate::{ + banking_trace::{BankingPacketBatch, BankingTracer}, + unprocessed_packet_batches, + }, crossbeam_channel::{unbounded, Receiver}, solana_address_lookup_table_program::state::{AddressLookupTable, LookupTableMeta}, solana_entry::entry::{next_entry, next_versioned_entry, Entry, EntrySlice}, @@ -1761,7 +1759,7 @@ mod tests { get_tmp_ledger_path_auto_delete, leader_schedule_cache::LeaderScheduleCache, }, - solana_perf::packet::{to_packet_batches, PacketFlags}, + solana_perf::packet::{to_packet_batches, PacketBatch, PacketFlags}, solana_poh::{ poh_recorder::{create_test_recorder, Record, WorkingBankEntry}, poh_service::PohService, @@ -1812,9 +1810,11 @@ mod tests { let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); - let (non_vote_sender, non_vote_receiver) = unbounded(); - let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); - let (gossip_vote_sender, gossip_vote_receiver) = unbounded(); + let banking_tracer = BankingTracer::new_disabled(); + let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); + let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); + let (gossip_vote_sender, gossip_vote_receiver) = + banking_tracer.create_channel_gossip_vote(); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( @@ -1861,9 +1861,11 @@ mod tests { let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); let start_hash = bank.last_blockhash(); - let (non_vote_sender, non_vote_receiver) = unbounded(); - let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); - let (gossip_vote_sender, gossip_vote_receiver) = unbounded(); + let banking_tracer = BankingTracer::new_disabled(); + let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); + let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); + let (gossip_vote_sender, gossip_vote_receiver) = + banking_tracer.create_channel_gossip_vote(); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( @@ -1937,9 +1939,11 @@ mod tests { let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); let start_hash = bank.last_blockhash(); - let (non_vote_sender, non_vote_receiver) = unbounded(); - let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); - let (gossip_vote_sender, gossip_vote_receiver) = unbounded(); + let banking_tracer = BankingTracer::new_disabled(); + let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); + let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); + let (gossip_vote_sender, gossip_vote_receiver) = + banking_tracer.create_channel_gossip_vote(); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( @@ -2002,7 +2006,7 @@ mod tests { .collect(); let packet_batches = convert_from_old_verified(packet_batches); non_vote_sender // no_ver, anf, tx - .send((packet_batches, None)) + .send(BankingPacketBatch::new((packet_batches, None))) .unwrap(); drop(non_vote_sender); @@ -2061,7 +2065,8 @@ mod tests { mint_keypair, .. } = create_slow_genesis_config(2); - let (non_vote_sender, non_vote_receiver) = unbounded(); + let banking_tracer = BankingTracer::new_disabled(); + let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); // Process a batch that includes a transaction that receives two lamports. let alice = Keypair::new(); @@ -2074,7 +2079,9 @@ mod tests { .map(|batch| (batch, vec![1u8])) .collect(); let packet_batches = convert_from_old_verified(packet_batches); - non_vote_sender.send((packet_batches, None)).unwrap(); + non_vote_sender + .send(BankingPacketBatch::new((packet_batches, None))) + .unwrap(); // Process a second batch that uses the same from account, so conflicts with above TX let tx = @@ -2085,10 +2092,13 @@ mod tests { .map(|batch| (batch, vec![1u8])) .collect(); let packet_batches = convert_from_old_verified(packet_batches); - non_vote_sender.send((packet_batches, None)).unwrap(); + non_vote_sender + .send(BankingPacketBatch::new((packet_batches, None))) + .unwrap(); - let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); - let (gossip_vote_sender, gossip_vote_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); + let (gossip_vote_sender, gossip_vote_receiver) = + banking_tracer.create_channel_gossip_vote(); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let (replay_vote_sender, _replay_vote_receiver) = unbounded(); @@ -2651,7 +2661,7 @@ mod tests { } fn simulate_poh( - record_receiver: CrossbeamReceiver, + record_receiver: Receiver, poh_recorder: &Arc>, ) -> JoinHandle<()> { let poh_recorder = poh_recorder.clone(); @@ -3780,9 +3790,11 @@ mod tests { let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); let start_hash = bank.last_blockhash(); - let (non_vote_sender, non_vote_receiver) = unbounded(); - let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); - let (gossip_vote_sender, gossip_vote_receiver) = unbounded(); + let banking_tracer = BankingTracer::new_disabled(); + let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); + let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); + let (gossip_vote_sender, gossip_vote_receiver) = + banking_tracer.create_channel_gossip_vote(); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( @@ -3885,7 +3897,11 @@ mod tests { .into_iter() .map(|(packet_batches, sender)| { Builder::new() - .spawn(move || sender.send((packet_batches, None)).unwrap()) + .spawn(move || { + sender + .send(BankingPacketBatch::new((packet_batches, None))) + .unwrap() + }) .unwrap() }) .for_each(|handle| handle.join().unwrap()); diff --git a/core/src/banking_trace.rs b/core/src/banking_trace.rs new file mode 100644 index 0000000000..b245e1fb59 --- /dev/null +++ b/core/src/banking_trace.rs @@ -0,0 +1,609 @@ +use { + crate::sigverify::SigverifyTracerPacketStats, + bincode::serialize_into, + chrono::{DateTime, Local}, + crossbeam_channel::{unbounded, Receiver, SendError, Sender, TryRecvError}, + rolling_file::{RollingCondition, RollingConditionBasic, RollingFileAppender}, + solana_perf::{ + packet::{to_packet_batches, PacketBatch}, + test_tx::test_tx, + }, + solana_sdk::{hash::Hash, slot_history::Slot}, + std::{ + fs::{create_dir_all, remove_dir_all}, + io::{self, Write}, + path::PathBuf, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, sleep, JoinHandle}, + time::{Duration, SystemTime}, + }, + tempfile::TempDir, + thiserror::Error, +}; + +pub type BankingPacketBatch = Arc<(Vec, Option)>; +pub type BankingPacketSender = TracedSender; +pub type BankingPacketReceiver = Receiver; +pub type TracerThreadResult = Result<(), TraceError>; +pub type TracerThread = Option>; +pub type DirByteLimit = u64; + +#[derive(Error, Debug)] +pub enum TraceError { + #[error("IO Error: {0}")] + IoError(#[from] std::io::Error), + + #[error("Serialization Error: {0}")] + SerializeError(#[from] bincode::Error), + + #[error("Integer Cast Error: {0}")] + IntegerCastError(#[from] std::num::TryFromIntError), + + #[error("Trace directory's byte limit is too small (must be larger than {1}): {0}")] + TooSmallDirByteLimit(DirByteLimit, DirByteLimit), +} + +const BASENAME: &str = "events"; +const TRACE_FILE_ROTATE_COUNT: u64 = 14; // target 2 weeks retention under normal load +const TRACE_FILE_WRITE_INTERVAL_MS: u64 = 100; +const BUF_WRITER_CAPACITY: usize = 10 * 1024 * 1024; +pub const TRACE_FILE_DEFAULT_ROTATE_BYTE_THRESHOLD: u64 = 1024 * 1024 * 1024; +pub const DISABLED_BAKING_TRACE_DIR: DirByteLimit = 0; +pub const BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT: DirByteLimit = + TRACE_FILE_DEFAULT_ROTATE_BYTE_THRESHOLD * TRACE_FILE_ROTATE_COUNT; + +#[derive(Clone, Debug)] +struct ActiveTracer { + trace_sender: Sender, + exit: Arc, +} + +#[derive(Debug)] +pub struct BankingTracer { + active_tracer: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct TimedTracedEvent(std::time::SystemTime, TracedEvent); + +#[derive(Serialize, Deserialize, Debug)] +enum TracedEvent { + PacketBatch(ChannelLabel, BankingPacketBatch), + BlockAndBankHash(Slot, Hash, Hash), +} + +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] +pub enum ChannelLabel { + NonVote, + TpuVote, + GossipVote, + Dummy, +} + +struct RollingConditionGrouped { + basic: RollingConditionBasic, + tried_rollover_after_opened: bool, + is_checked: bool, +} + +impl RollingConditionGrouped { + fn new(basic: RollingConditionBasic) -> Self { + Self { + basic, + tried_rollover_after_opened: bool::default(), + is_checked: bool::default(), + } + } + + fn reset(&mut self) { + self.is_checked = false; + } +} + +struct GroupedWriter<'a> { + now: DateTime, + underlying: &'a mut RollingFileAppender, +} + +impl<'a> GroupedWriter<'a> { + fn new(underlying: &'a mut RollingFileAppender) -> Self { + Self { + now: Local::now(), + underlying, + } + } +} + +impl RollingCondition for RollingConditionGrouped { + fn should_rollover(&mut self, now: &DateTime, current_filesize: u64) -> bool { + if !self.tried_rollover_after_opened { + self.tried_rollover_after_opened = true; + + // rollover normally if empty to reuse it if possible + if current_filesize > 0 { + // forcibly rollover anew, so that we always avoid to append + // to a possibly-damaged tracing file even after unclean + // restarts + return true; + } + } + + if !self.is_checked { + self.is_checked = true; + self.basic.should_rollover(now, current_filesize) + } else { + false + } + } +} + +impl<'a> Write for GroupedWriter<'a> { + fn write(&mut self, buf: &[u8]) -> std::result::Result { + self.underlying.write_with_datetime(buf, &self.now) + } + fn flush(&mut self) -> std::result::Result<(), io::Error> { + self.underlying.flush() + } +} + +pub fn receiving_loop_with_minimized_sender_overhead( + exit: Arc, + receiver: Receiver, + mut on_recv: impl FnMut(T) -> Result<(), E>, +) -> Result<(), E> { + 'outer: while !exit.load(Ordering::Relaxed) { + 'inner: loop { + // avoid futex-based blocking here, otherwise a sender would have to + // wake me up at a syscall cost... + match receiver.try_recv() { + Ok(message) => on_recv(message)?, + Err(TryRecvError::Empty) => break 'inner, + Err(TryRecvError::Disconnected) => { + break 'outer; + } + }; + if exit.load(Ordering::Relaxed) { + break 'outer; + } + } + sleep(Duration::from_millis(SLEEP_MS)); + } + + Ok(()) +} + +impl BankingTracer { + pub fn new( + maybe_config: Option<(&PathBuf, Arc, DirByteLimit)>, + ) -> Result<(Arc, TracerThread), TraceError> { + match maybe_config { + None => Ok((Self::new_disabled(), None)), + Some((path, exit, dir_byte_limit)) => { + let rotate_threshold_size = dir_byte_limit / TRACE_FILE_ROTATE_COUNT; + if rotate_threshold_size == 0 { + return Err(TraceError::TooSmallDirByteLimit( + dir_byte_limit, + TRACE_FILE_ROTATE_COUNT, + )); + } + + let (trace_sender, trace_receiver) = unbounded(); + + let file_appender = Self::create_file_appender(path, rotate_threshold_size)?; + + let tracer_thread = + Self::spawn_background_thread(trace_receiver, file_appender, exit.clone())?; + + Ok(( + Arc::new(Self { + active_tracer: Some(ActiveTracer { trace_sender, exit }), + }), + Some(tracer_thread), + )) + } + } + } + + pub fn new_disabled() -> Arc { + Arc::new(Self { + active_tracer: None, + }) + } + + pub fn is_enabled(&self) -> bool { + self.active_tracer.is_some() + } + + fn create_channel(&self, label: ChannelLabel) -> (BankingPacketSender, BankingPacketReceiver) { + Self::channel(label, self.active_tracer.as_ref().cloned()) + } + + pub fn create_channel_non_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) { + self.create_channel(ChannelLabel::NonVote) + } + + pub fn create_channel_tpu_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) { + self.create_channel(ChannelLabel::TpuVote) + } + + pub fn create_channel_gossip_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) { + self.create_channel(ChannelLabel::GossipVote) + } + + pub fn hash_event(&self, slot: Slot, blockhash: &Hash, bank_hash: &Hash) { + self.trace_event(|| { + TimedTracedEvent( + SystemTime::now(), + TracedEvent::BlockAndBankHash(slot, *blockhash, *bank_hash), + ) + }) + } + + fn trace_event(&self, on_trace: impl Fn() -> TimedTracedEvent) { + if let Some(ActiveTracer { trace_sender, exit }) = &self.active_tracer { + if !exit.load(Ordering::Relaxed) { + trace_sender + .send(on_trace()) + .expect("active tracer thread unless exited"); + } + } + } + + pub fn channel_for_test() -> (TracedSender, Receiver) { + Self::channel(ChannelLabel::Dummy, None) + } + + fn channel( + label: ChannelLabel, + active_tracer: Option, + ) -> (TracedSender, Receiver) { + let (sender, receiver) = unbounded(); + (TracedSender::new(label, sender, active_tracer), receiver) + } + + pub fn ensure_cleanup_path(path: &PathBuf) -> Result<(), io::Error> { + remove_dir_all(path).or_else(|err| { + if err.kind() == io::ErrorKind::NotFound { + Ok(()) + } else { + Err(err) + } + }) + } + + fn create_file_appender( + path: &PathBuf, + rotate_threshold_size: u64, + ) -> Result, TraceError> { + create_dir_all(path)?; + let grouped = RollingConditionGrouped::new( + RollingConditionBasic::new() + .daily() + .max_size(rotate_threshold_size), + ); + let appender = RollingFileAppender::new_with_buffer_capacity( + path.join(BASENAME), + grouped, + (TRACE_FILE_ROTATE_COUNT - 1).try_into()?, + BUF_WRITER_CAPACITY, + )?; + Ok(appender) + } + + fn spawn_background_thread( + trace_receiver: Receiver, + mut file_appender: RollingFileAppender, + exit: Arc, + ) -> Result, TraceError> { + let thread = thread::Builder::new().name("solBanknTracer".into()).spawn( + move || -> TracerThreadResult { + receiving_loop_with_minimized_sender_overhead::<_, _, TRACE_FILE_WRITE_INTERVAL_MS>( + exit, + trace_receiver, + |event| -> Result<(), TraceError> { + file_appender.condition_mut().reset(); + serialize_into(&mut GroupedWriter::new(&mut file_appender), &event)?; + Ok(()) + }, + )?; + file_appender.flush()?; + Ok(()) + }, + )?; + + Ok(thread) + } +} + +pub struct TracedSender { + label: ChannelLabel, + sender: Sender, + active_tracer: Option, +} + +impl TracedSender { + fn new( + label: ChannelLabel, + sender: Sender, + active_tracer: Option, + ) -> Self { + Self { + label, + sender, + active_tracer, + } + } + + pub fn send(&self, batch: BankingPacketBatch) -> Result<(), SendError> { + if let Some(ActiveTracer { trace_sender, exit }) = &self.active_tracer { + if !exit.load(Ordering::Relaxed) { + trace_sender + .send(TimedTracedEvent( + SystemTime::now(), + TracedEvent::PacketBatch(self.label, BankingPacketBatch::clone(&batch)), + )) + .map_err(|err| { + error!( + "unexpected error when tracing a banking event...: {:?}", + err + ); + SendError(BankingPacketBatch::clone(&batch)) + })?; + } + } + self.sender.send(batch) + } +} + +pub mod for_test { + use super::*; + + pub fn sample_packet_batch() -> BankingPacketBatch { + BankingPacketBatch::new((to_packet_batches(&vec![test_tx(); 4], 10), None)) + } + + pub fn drop_and_clean_temp_dir_unless_suppressed(temp_dir: TempDir) { + std::env::var("BANKING_TRACE_LEAVE_FILES").is_ok().then(|| { + warn!("prevented to remove {:?}", temp_dir.path()); + drop(temp_dir.into_path()); + }); + } + + pub fn terminate_tracer( + tracer: Arc, + tracer_thread: TracerThread, + main_thread: JoinHandle, + sender: TracedSender, + exit: Option>, + ) { + if let Some(exit) = exit { + exit.store(true, Ordering::Relaxed); + } + drop((sender, tracer)); + main_thread.join().unwrap().unwrap(); + if let Some(tracer_thread) = tracer_thread { + tracer_thread.join().unwrap().unwrap(); + } + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + bincode::ErrorKind::Io as BincodeIoError, + std::{ + fs::File, + io::{BufReader, ErrorKind::UnexpectedEof}, + str::FromStr, + }, + }; + + #[test] + fn test_new_disabled() { + let exit = Arc::::default(); + + let tracer = BankingTracer::new_disabled(); + let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote(); + + let dummy_main_thread = thread::spawn(move || { + receiving_loop_with_minimized_sender_overhead::<_, TraceError, 0>( + exit, + non_vote_receiver, + |_packet_batch| Ok(()), + ) + }); + + non_vote_sender + .send(BankingPacketBatch::new((vec![], None))) + .unwrap(); + for_test::terminate_tracer(tracer, None, dummy_main_thread, non_vote_sender, None); + } + + #[test] + fn test_send_after_exited() { + let temp_dir = TempDir::new().unwrap(); + let path = temp_dir.path().join("banking-trace"); + let exit = Arc::::default(); + let (tracer, tracer_thread) = + BankingTracer::new(Some((&path, exit.clone(), DirByteLimit::max_value()))).unwrap(); + let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote(); + + let exit_for_dummy_thread = Arc::::default(); + let exit_for_dummy_thread2 = exit_for_dummy_thread.clone(); + let dummy_main_thread = thread::spawn(move || { + receiving_loop_with_minimized_sender_overhead::<_, TraceError, 0>( + exit_for_dummy_thread, + non_vote_receiver, + |_packet_batch| Ok(()), + ) + }); + + // kill and join the tracer thread + exit.store(true, Ordering::Relaxed); + tracer_thread.unwrap().join().unwrap().unwrap(); + + // .hash_event() must succeed even after exit is already set to true + let blockhash = Hash::from_str("B1ockhash1111111111111111111111111111111111").unwrap(); + let bank_hash = Hash::from_str("BankHash11111111111111111111111111111111111").unwrap(); + tracer.hash_event(4, &blockhash, &bank_hash); + + drop(tracer); + + // .send() must succeed even after exit is already set to true and further tracer is + // already dropped + non_vote_sender + .send(for_test::sample_packet_batch()) + .unwrap(); + + // finally terminate and join the main thread + exit_for_dummy_thread2.store(true, Ordering::Relaxed); + dummy_main_thread.join().unwrap().unwrap(); + } + + #[test] + fn test_record_and_restore() { + let temp_dir = TempDir::new().unwrap(); + let path = temp_dir.path().join("banking-trace"); + let exit = Arc::::default(); + let (tracer, tracer_thread) = + BankingTracer::new(Some((&path, exit.clone(), DirByteLimit::max_value()))).unwrap(); + let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote(); + + let dummy_main_thread = thread::spawn(move || { + receiving_loop_with_minimized_sender_overhead::<_, TraceError, 0>( + exit, + non_vote_receiver, + |_packet_batch| Ok(()), + ) + }); + + non_vote_sender + .send(for_test::sample_packet_batch()) + .unwrap(); + let blockhash = Hash::from_str("B1ockhash1111111111111111111111111111111111").unwrap(); + let bank_hash = Hash::from_str("BankHash11111111111111111111111111111111111").unwrap(); + tracer.hash_event(4, &blockhash, &bank_hash); + + for_test::terminate_tracer( + tracer, + tracer_thread, + dummy_main_thread, + non_vote_sender, + None, + ); + + let mut stream = BufReader::new(File::open(path.join(BASENAME)).unwrap()); + let results = (0..=3) + .map(|_| bincode::deserialize_from::<_, TimedTracedEvent>(&mut stream)) + .collect::>(); + + let mut i = 0; + assert_matches!( + results[i], + Ok(TimedTracedEvent( + _, + TracedEvent::PacketBatch(ChannelLabel::NonVote, _) + )) + ); + i += 1; + assert_matches!( + results[i], + Ok(TimedTracedEvent( + _, + TracedEvent::BlockAndBankHash(4, actual_blockhash, actual_bank_hash) + )) if actual_blockhash == blockhash && actual_bank_hash == bank_hash + ); + i += 1; + assert_matches!( + results[i], + Err(ref err) if matches!( + **err, + BincodeIoError(ref error) if error.kind() == UnexpectedEof + ) + ); + + for_test::drop_and_clean_temp_dir_unless_suppressed(temp_dir); + } + + #[test] + fn test_spill_over_at_rotation() { + let temp_dir = TempDir::new().unwrap(); + let path = temp_dir.path().join("banking-trace"); + const REALLY_SMALL_ROTATION_THRESHOLD: u64 = 1; + + let mut file_appender = + BankingTracer::create_file_appender(&path, REALLY_SMALL_ROTATION_THRESHOLD).unwrap(); + file_appender.write_all(b"foo").unwrap(); + file_appender.condition_mut().reset(); + file_appender.write_all(b"bar").unwrap(); + file_appender.condition_mut().reset(); + file_appender.flush().unwrap(); + + assert_eq!( + [ + std::fs::read_to_string(path.join("events")).ok(), + std::fs::read_to_string(path.join("events.1")).ok(), + std::fs::read_to_string(path.join("events.2")).ok(), + ], + [Some("bar".into()), Some("foo".into()), None] + ); + + for_test::drop_and_clean_temp_dir_unless_suppressed(temp_dir); + } + + #[test] + fn test_reopen_with_blank_file() { + let temp_dir = TempDir::new().unwrap(); + + let path = temp_dir.path().join("banking-trace"); + + let mut file_appender = + BankingTracer::create_file_appender(&path, TRACE_FILE_DEFAULT_ROTATE_BYTE_THRESHOLD) + .unwrap(); + // assume this is unclean write + file_appender.write_all(b"f").unwrap(); + file_appender.flush().unwrap(); + + // reopen while shadow-dropping the old tracer + let mut file_appender = + BankingTracer::create_file_appender(&path, TRACE_FILE_DEFAULT_ROTATE_BYTE_THRESHOLD) + .unwrap(); + // new file won't be created as appender is lazy + assert_eq!( + [ + std::fs::read_to_string(path.join("events")).ok(), + std::fs::read_to_string(path.join("events.1")).ok(), + std::fs::read_to_string(path.join("events.2")).ok(), + ], + [Some("f".into()), None, None] + ); + + // initial write actually creates the new blank file + file_appender.write_all(b"bar").unwrap(); + assert_eq!( + [ + std::fs::read_to_string(path.join("events")).ok(), + std::fs::read_to_string(path.join("events.1")).ok(), + std::fs::read_to_string(path.join("events.2")).ok(), + ], + [Some("".into()), Some("f".into()), None] + ); + + // flush actually write the actual data + file_appender.flush().unwrap(); + assert_eq!( + [ + std::fs::read_to_string(path.join("events")).ok(), + std::fs::read_to_string(path.join("events.1")).ok(), + std::fs::read_to_string(path.join("events.2")).ok(), + ], + [Some("bar".into()), Some("f".into()), None] + ); + + for_test::drop_and_clean_temp_dir_unless_suppressed(temp_dir); + } +} diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 8e56a210ed..e250e59c49 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -1,6 +1,6 @@ use { crate::{ - banking_stage::BankingPacketSender, + banking_trace::{BankingPacketBatch, BankingPacketSender}, optimistic_confirmation_verifier::OptimisticConfirmationVerifier, replay_stage::DUPLICATE_THRESHOLD, result::{Error, Result}, @@ -472,7 +472,8 @@ impl ClusterInfoVoteListener { for single_validator_votes in gossip_votes_iterator { bank_send_votes_stats.num_votes_sent += single_validator_votes.len(); bank_send_votes_stats.num_batches_sent += 1; - verified_packets_sender.send((single_validator_votes, None))?; + verified_packets_sender + .send(BankingPacketBatch::new((single_validator_votes, None)))?; } filter_gossip_votes_timing.stop(); bank_send_votes_stats.total_elapsed += filter_gossip_votes_timing.as_us(); @@ -871,6 +872,7 @@ impl ClusterInfoVoteListener { mod tests { use { super::*, + crate::banking_trace::BankingTracer, solana_perf::packet, solana_rpc::optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, solana_runtime::{ @@ -1685,7 +1687,8 @@ mod tests { let current_leader_bank = Arc::new(Bank::new_for_tests(&genesis_config)); let mut bank_vote_sender_state_option: Option = None; let verified_vote_packets = VerifiedVotePackets::default(); - let (verified_packets_sender, _verified_packets_receiver) = unbounded(); + let (verified_packets_sender, _verified_packets_receiver) = + BankingTracer::channel_for_test(); // 1) If we hand over a `current_leader_bank`, vote sender state should be updated ClusterInfoVoteListener::check_for_leader_bank_and_send_votes( diff --git a/core/src/lib.rs b/core/src/lib.rs index ff0b259246..b74164b52e 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -11,6 +11,7 @@ pub mod accounts_hash_verifier; pub mod ancestor_hashes_service; pub mod banking_stage; +pub mod banking_trace; pub mod broadcast_stage; pub mod cache_block_meta_service; pub mod cluster_info_vote_listener; diff --git a/core/src/packet_deserializer.rs b/core/src/packet_deserializer.rs index 8ae302ee3c..094228dd07 100644 --- a/core/src/packet_deserializer.rs +++ b/core/src/packet_deserializer.rs @@ -2,17 +2,15 @@ use { crate::{ + banking_trace::{BankingPacketBatch, BankingPacketReceiver}, immutable_deserialized_packet::ImmutableDeserializedPacket, sigverify::SigverifyTracerPacketStats, }, - crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}, + crossbeam_channel::RecvTimeoutError, solana_perf::packet::PacketBatch, std::time::{Duration, Instant}, }; -pub type BankingPacketBatch = (Vec, Option); -pub type BankingPacketReceiver = CrossbeamReceiver; - /// Results from deserializing packet batches. pub struct ReceivePacketResults { /// Deserialized packets from all received packet batches @@ -43,84 +41,87 @@ impl PacketDeserializer { recv_timeout: Duration, capacity: usize, ) -> Result { - let (packet_batches, sigverify_tracer_stats_option) = - self.receive_until(recv_timeout, capacity)?; + let (packet_count, packet_batches) = self.receive_until(recv_timeout, capacity)?; Ok(Self::deserialize_and_collect_packets( + packet_count, &packet_batches, - sigverify_tracer_stats_option, )) } - /// Deserialize packet batches and collect them into ReceivePacketResults + /// Deserialize packet batches, aggregates tracer packet stats, and collect + /// them into ReceivePacketResults fn deserialize_and_collect_packets( - packet_batches: &[PacketBatch], - sigverify_tracer_stats_option: Option, + packet_count: usize, + banking_batches: &[BankingPacketBatch], ) -> ReceivePacketResults { - let packet_count: usize = packet_batches.iter().map(|x| x.len()).sum(); let mut passed_sigverify_count: usize = 0; let mut failed_sigverify_count: usize = 0; let mut deserialized_packets = Vec::with_capacity(packet_count); - for packet_batch in packet_batches { - let packet_indexes = Self::generate_packet_indexes(packet_batch); + let mut aggregated_tracer_packet_stats_option = None::; - passed_sigverify_count += packet_indexes.len(); - failed_sigverify_count += packet_batch.len().saturating_sub(packet_indexes.len()); + for banking_batch in banking_batches { + for packet_batch in &banking_batch.0 { + let packet_indexes = Self::generate_packet_indexes(packet_batch); - deserialized_packets.extend(Self::deserialize_packets(packet_batch, &packet_indexes)); - } + passed_sigverify_count += packet_indexes.len(); + failed_sigverify_count += packet_batch.len().saturating_sub(packet_indexes.len()); - ReceivePacketResults { - deserialized_packets, - new_tracer_stats_option: sigverify_tracer_stats_option, - passed_sigverify_count: passed_sigverify_count as u64, - failed_sigverify_count: failed_sigverify_count as u64, - } - } + deserialized_packets + .extend(Self::deserialize_packets(packet_batch, &packet_indexes)); + } - /// Receives packet batches from sigverify stage with a timeout, and aggregates tracer packet stats - fn receive_until( - &self, - recv_timeout: Duration, - packet_count_upperbound: usize, - ) -> Result<(Vec, Option), RecvTimeoutError> { - let start = Instant::now(); - - let (mut packet_batches_received_so_far, mut aggregated_tracer_packet_stats_option) = - self.packet_batch_receiver.recv_timeout(recv_timeout)?; - let mut num_packets_received = packet_batches_received_so_far - .iter() - .map(|batch| batch.len()) - .sum::(); - - while let Ok((packet_batches, tracer_packet_stats_option)) = - self.packet_batch_receiver.try_recv() - { - trace!("got more packet batches in packet deserializer"); - num_packets_received += packet_batches - .iter() - .map(|batch| batch.len()) - .sum::(); - packet_batches_received_so_far.extend(packet_batches); - - if let Some(tracer_packet_stats) = &tracer_packet_stats_option { + if let Some(tracer_packet_stats) = &banking_batch.1 { if let Some(aggregated_tracer_packet_stats) = &mut aggregated_tracer_packet_stats_option { aggregated_tracer_packet_stats.aggregate(tracer_packet_stats); } else { - aggregated_tracer_packet_stats_option = tracer_packet_stats_option; + // BankingPacketBatch is owned by Arc; so we have to clone its internal field + // (SigverifyTracerPacketStats). + aggregated_tracer_packet_stats_option = Some(tracer_packet_stats.clone()); } } + } + + ReceivePacketResults { + deserialized_packets, + new_tracer_stats_option: aggregated_tracer_packet_stats_option, + passed_sigverify_count: passed_sigverify_count as u64, + failed_sigverify_count: failed_sigverify_count as u64, + } + } + + /// Receives packet batches from sigverify stage with a timeout + fn receive_until( + &self, + recv_timeout: Duration, + packet_count_upperbound: usize, + ) -> Result<(usize, Vec), RecvTimeoutError> { + let start = Instant::now(); + + let message = self.packet_batch_receiver.recv_timeout(recv_timeout)?; + let packet_batches = &message.0; + let mut num_packets_received = packet_batches + .iter() + .map(|batch| batch.len()) + .sum::(); + let mut messages = vec![message]; + + while let Ok(message) = self.packet_batch_receiver.try_recv() { + let packet_batches = &message.0; + trace!("got more packet batches in packet deserializer"); + num_packets_received += packet_batches + .iter() + .map(|batch| batch.len()) + .sum::(); + messages.push(message); if start.elapsed() >= recv_timeout || num_packets_received >= packet_count_upperbound { break; } } - Ok(( - packet_batches_received_so_far, - aggregated_tracer_packet_stats_option, - )) + Ok((num_packets_received, messages)) } fn generate_packet_indexes(packet_batch: &PacketBatch) -> Vec { @@ -159,7 +160,7 @@ mod tests { #[test] fn test_deserialize_and_collect_packets_empty() { - let results = PacketDeserializer::deserialize_and_collect_packets(&[], None); + let results = PacketDeserializer::deserialize_and_collect_packets(0, &[]); assert_eq!(results.deserialized_packets.len(), 0); assert!(results.new_tracer_stats_option.is_none()); assert_eq!(results.passed_sigverify_count, 0); @@ -172,7 +173,11 @@ mod tests { let packet_batches = to_packet_batches(&transactions, 1); assert_eq!(packet_batches.len(), 2); - let results = PacketDeserializer::deserialize_and_collect_packets(&packet_batches, None); + let packet_count: usize = packet_batches.iter().map(|x| x.len()).sum(); + let results = PacketDeserializer::deserialize_and_collect_packets( + packet_count, + &[BankingPacketBatch::new((packet_batches, None))], + ); assert_eq!(results.deserialized_packets.len(), 2); assert!(results.new_tracer_stats_option.is_none()); assert_eq!(results.passed_sigverify_count, 2); @@ -186,7 +191,11 @@ mod tests { assert_eq!(packet_batches.len(), 2); packet_batches[0][0].meta_mut().set_discard(true); - let results = PacketDeserializer::deserialize_and_collect_packets(&packet_batches, None); + let packet_count: usize = packet_batches.iter().map(|x| x.len()).sum(); + let results = PacketDeserializer::deserialize_and_collect_packets( + packet_count, + &[BankingPacketBatch::new((packet_batches, None))], + ); assert_eq!(results.deserialized_packets.len(), 1); assert!(results.new_tracer_stats_option.is_none()); assert_eq!(results.passed_sigverify_count, 1); diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 67be683632..879ac05d4b 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -3,6 +3,7 @@ use { crate::{ ancestor_hashes_service::AncestorHashesReplayUpdateSender, + banking_trace::BankingTracer, broadcast_stage::RetransmitSlotsSender, cache_block_meta_service::CacheBlockMetaSender, cluster_info_vote_listener::{ @@ -402,6 +403,7 @@ impl ReplayStage { log_messages_bytes_limit: Option, prioritization_fee_cache: Arc, dumped_slots_sender: DumpedSlotsSender, + banking_tracer: Arc, ) -> Result { let mut tower = if let Some(process_blockstore) = maybe_process_blockstore { let tower = process_blockstore.process_to_create_tower()?; @@ -943,6 +945,7 @@ impl ReplayStage { &mut progress, &retransmit_slots_sender, &mut skipped_slots_info, + &banking_tracer, has_new_vote_been_rooted, transaction_status_sender.is_some(), ); @@ -1654,6 +1657,7 @@ impl ReplayStage { progress_map: &mut ProgressMap, retransmit_slots_sender: &RetransmitSlotsSender, skipped_slots_info: &mut SkippedSlotsInfo, + banking_tracer: &Arc, has_new_vote_been_rooted: bool, track_transaction_indexes: bool, ) { @@ -1774,6 +1778,9 @@ impl ReplayStage { rpc_subscriptions, NewBankOptions { vote_only_bank }, ); + // make sure parent is frozen for finalized hashes via the above + // new()-ing of its child bank + banking_tracer.hash_event(parent.slot(), &parent.last_blockhash(), &parent.hash()); let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank); poh_recorder diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index 573da5eef1..8140efac7e 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -9,15 +9,14 @@ pub use solana_perf::sigverify::{ }; use { crate::{ - banking_stage::BankingPacketBatch, + banking_trace::{BankingPacketBatch, BankingPacketSender}, sigverify_stage::{SigVerifier, SigVerifyServiceError}, }, - crossbeam_channel::Sender, solana_perf::{cuda_runtime::PinnedVec, packet::PacketBatch, recycler::Recycler, sigverify}, solana_sdk::{packet::Packet, saturating_add_assign}, }; -#[derive(Debug, Default, Clone, PartialEq, Eq)] +#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct SigverifyTracerPacketStats { pub total_removed_before_sigverify_stage: usize, pub total_tracer_packets_received_in_sigverify_stage: usize, @@ -55,9 +54,8 @@ impl SigverifyTracerPacketStats { } } -#[derive(Clone)] pub struct TransactionSigVerifier { - packet_sender: Sender<::SendType>, + packet_sender: BankingPacketSender, tracer_packet_stats: SigverifyTracerPacketStats, recycler: Recycler, recycler_out: Recycler>, @@ -65,13 +63,13 @@ pub struct TransactionSigVerifier { } impl TransactionSigVerifier { - pub fn new_reject_non_vote(packet_sender: Sender<::SendType>) -> Self { + pub fn new_reject_non_vote(packet_sender: BankingPacketSender) -> Self { let mut new_self = Self::new(packet_sender); new_self.reject_non_vote = true; new_self } - pub fn new(packet_sender: Sender<::SendType>) -> Self { + pub fn new(packet_sender: BankingPacketSender) -> Self { init(); Self { packet_sender, @@ -128,8 +126,10 @@ impl SigVerifier for TransactionSigVerifier { packet_batches: Vec, ) -> Result<(), SigVerifyServiceError> { let tracer_packet_stats_to_send = std::mem::take(&mut self.tracer_packet_stats); - self.packet_sender - .send((packet_batches, Some(tracer_packet_stats_to_send)))?; + self.packet_sender.send(BankingPacketBatch::new(( + packet_batches, + Some(tracer_packet_stats_to_send), + )))?; Ok(()) } diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 8edadebe2f..b636a89dcb 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -234,7 +234,7 @@ impl SigVerifier for DisabledSigVerifier { impl SigVerifyStage { #[allow(clippy::new_ret_no_self)] - pub fn new( + pub fn new( packet_receiver: FindPacketSenderStakeReceiver, verifier: T, name: &'static str, @@ -402,7 +402,7 @@ impl SigVerifyStage { Ok(()) } - fn verifier_service( + fn verifier_service( packet_receiver: FindPacketSenderStakeReceiver, mut verifier: T, name: &'static str, @@ -443,7 +443,7 @@ impl SigVerifyStage { .unwrap() } - fn verifier_services( + fn verifier_services( packet_receiver: FindPacketSenderStakeReceiver, verifier: T, name: &'static str, @@ -460,7 +460,10 @@ impl SigVerifyStage { mod tests { use { super::*, - crate::{sigverify::TransactionSigVerifier, sigverify_stage::timing::duration_as_ms}, + crate::{ + banking_trace::BankingTracer, sigverify::TransactionSigVerifier, + sigverify_stage::timing::duration_as_ms, + }, crossbeam_channel::unbounded, solana_perf::{ packet::{to_packet_batches, Packet}, @@ -528,22 +531,30 @@ mod tests { } #[test] - fn test_sigverify_stage() { + fn test_sigverify_stage_with_same_tx() { + test_sigverify_stage(true) + } + + #[test] + fn test_sigverify_stage_without_same_tx() { + test_sigverify_stage(false) + } + + fn test_sigverify_stage(use_same_tx: bool) { solana_logger::setup(); trace!("start"); let (packet_s, packet_r) = unbounded(); - let (verified_s, verified_r) = unbounded(); + let (verified_s, verified_r) = BankingTracer::channel_for_test(); let verifier = TransactionSigVerifier::new(verified_s); let stage = SigVerifyStage::new(packet_r, verifier, "test"); - let use_same_tx = true; let now = Instant::now(); let packets_per_batch = 128; let total_packets = 1920; // This is important so that we don't discard any packets and fail asserts below about // `total_excess_tracer_packets` assert!(total_packets < MAX_SIGVERIFY_BATCH); - let mut batches = gen_batches(use_same_tx, packets_per_batch, total_packets); + let batches = gen_batches(use_same_tx, packets_per_batch, total_packets); trace!( "starting... generation took: {} ms batches: {}", duration_as_ms(&now.elapsed()), @@ -551,22 +562,20 @@ mod tests { ); let mut sent_len = 0; - for _ in 0..batches.len() { - if let Some(mut batch) = batches.pop() { - sent_len += batch.len(); - batch - .iter_mut() - .for_each(|packet| packet.meta_mut().flags |= PacketFlags::TRACER_PACKET); - assert_eq!(batch.len(), packets_per_batch); - packet_s.send(vec![batch]).unwrap(); - } + for mut batch in batches.into_iter() { + sent_len += batch.len(); + batch + .iter_mut() + .for_each(|packet| packet.meta_mut().flags |= PacketFlags::TRACER_PACKET); + assert_eq!(batch.len(), packets_per_batch); + packet_s.send(vec![batch]).unwrap(); } let mut received = 0; let mut total_tracer_packets_received_in_sigverify_stage = 0; trace!("sent: {}", sent_len); loop { - if let Ok((mut verifieds, tracer_packet_stats_option)) = verified_r.recv() { - let tracer_packet_stats = tracer_packet_stats_option.unwrap(); + if let Ok(message) = verified_r.recv() { + let (verifieds, tracer_packet_stats) = (&message.0, message.1.as_ref().unwrap()); total_tracer_packets_received_in_sigverify_stage += tracer_packet_stats.total_tracer_packets_received_in_sigverify_stage; assert_eq!( @@ -603,10 +612,7 @@ mod tests { ); } assert_eq!(tracer_packet_stats.total_excess_tracer_packets, 0); - while let Some(v) = verifieds.pop() { - received += v.len(); - batches.push(v); - } + received += verifieds.iter().map(|batch| batch.len()).sum::(); } if total_tracer_packets_received_in_sigverify_stage >= sent_len { diff --git a/core/src/tpu.rs b/core/src/tpu.rs index fd8030703f..fca209c7d8 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -4,6 +4,7 @@ use { crate::{ banking_stage::BankingStage, + banking_trace::{BankingTracer, TracerThread}, broadcast_stage::{BroadcastStage, BroadcastStageType, RetransmitSlotsReceiver}, cluster_info_vote_listener::{ ClusterInfoVoteListener, GossipDuplicateConfirmedSlotsSender, @@ -68,6 +69,7 @@ pub struct Tpu { find_packet_sender_stake_stage: FindPacketSenderStakeStage, vote_find_packet_sender_stake_stage: FindPacketSenderStakeStage, staked_nodes_updater_service: StakedNodesUpdaterService, + tracer_thread_hdl: TracerThread, } impl Tpu { @@ -98,6 +100,8 @@ impl Tpu { log_messages_bytes_limit: Option, staked_nodes: &Arc>, shared_staked_nodes_overrides: Arc>>, + banking_tracer: Arc, + tracer_thread_hdl: TracerThread, tpu_enable_udp: bool, ) -> Self { let TpuSockets { @@ -154,7 +158,7 @@ impl Tpu { "Vote", ); - let (non_vote_sender, non_vote_receiver) = unbounded(); + let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); let stats = Arc::new(StreamStats::default()); let (_, tpu_quic_t) = spawn_server( @@ -192,7 +196,7 @@ impl Tpu { SigVerifyStage::new(find_packet_sender_stake_receiver, verifier, "tpu-verifier") }; - let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); let vote_sigverify_stage = { let verifier = TransactionSigVerifier::new_reject_non_vote(tpu_vote_sender); @@ -203,7 +207,8 @@ impl Tpu { ) }; - let (gossip_vote_sender, gossip_vote_receiver) = unbounded(); + let (gossip_vote_sender, gossip_vote_receiver) = + banking_tracer.create_channel_gossip_vote(); let cluster_info_vote_listener = ClusterInfoVoteListener::new( exit.clone(), cluster_info.clone(), @@ -256,6 +261,7 @@ impl Tpu { find_packet_sender_stake_stage, vote_find_packet_sender_stake_stage, staked_nodes_updater_service, + tracer_thread_hdl, } } @@ -277,6 +283,14 @@ impl Tpu { result?; } let _ = broadcast_result?; + if let Some(tracer_thread_hdl) = self.tracer_thread_hdl { + if let Err(tracer_result) = tracer_thread_hdl.join()? { + error!( + "banking tracer thread returned error after successful thread join: {:?}", + tracer_result + ); + } + } Ok(()) } } diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 5d44711604..d6dac79f8d 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -3,6 +3,7 @@ use { crate::{ + banking_trace::BankingTracer, broadcast_stage::RetransmitSlotsSender, cache_block_meta_service::CacheBlockMetaSender, cluster_info_vote_listener::{ @@ -135,6 +136,7 @@ impl Tvu { log_messages_bytes_limit: Option, connection_cache: &Arc, prioritization_fee_cache: &Arc, + banking_tracer: Arc, ) -> Result { let TvuSockets { repair: repair_socket, @@ -299,6 +301,7 @@ impl Tvu { log_messages_bytes_limit, prioritization_fee_cache.clone(), dumped_slots_sender, + banking_tracer, )?; let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| { @@ -468,6 +471,7 @@ pub mod tests { None, &Arc::new(ConnectionCache::default()), &_ignored_prioritization_fee_cache, + BankingTracer::new_disabled(), ) .expect("assume success"); exit.store(true, Ordering::Relaxed); diff --git a/core/src/validator.rs b/core/src/validator.rs index 7c73f4c3e0..f52dd56df0 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -4,6 +4,7 @@ pub use solana_perf::report_target_features; use { crate::{ accounts_hash_verifier::AccountsHashVerifier, + banking_trace::{self, BankingTracer}, broadcast_stage::BroadcastStageType, cache_block_meta_service::{CacheBlockMetaSender, CacheBlockMetaService}, cluster_info_vote_listener::VoteTracker, @@ -176,6 +177,7 @@ pub struct ValidatorConfig { pub ledger_column_options: LedgerColumnOptions, pub runtime_config: RuntimeConfig, pub replay_slots_concurrently: bool, + pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit, } impl Default for ValidatorConfig { @@ -238,6 +240,7 @@ impl Default for ValidatorConfig { ledger_column_options: LedgerColumnOptions::default(), runtime_config: RuntimeConfig::default(), replay_slots_concurrently: false, + banking_trace_dir_byte_limit: 0, } } } @@ -932,6 +935,22 @@ impl Validator { exit.clone(), ); + let (banking_tracer, tracer_thread) = + BankingTracer::new((config.banking_trace_dir_byte_limit > 0).then_some(( + &blockstore.banking_trace_path(), + exit.clone(), + config.banking_trace_dir_byte_limit, + ))) + .map_err(|err| format!("{} [{:?}]", &err, &err))?; + if banking_tracer.is_enabled() { + info!( + "Enabled banking tracer (dir_byte_limit: {})", + config.banking_trace_dir_byte_limit + ); + } else { + info!("Disabled banking tracer"); + } + let (replay_vote_sender, replay_vote_receiver) = unbounded(); let tvu = Tvu::new( vote_account, @@ -981,6 +1000,7 @@ impl Validator { config.runtime_config.log_messages_bytes_limit, &connection_cache, &prioritization_fee_cache, + banking_tracer.clone(), )?; let tpu = Tpu::new( @@ -1016,6 +1036,8 @@ impl Validator { config.runtime_config.log_messages_bytes_limit, &staked_nodes, config.staked_nodes_overrides.clone(), + banking_tracer, + tracer_thread, tpu_enable_udp, ); diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index c0d59c8e01..3d08feeaf4 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -221,11 +221,14 @@ impl Blockstore { self.db } - /// The path to the ledger store pub fn ledger_path(&self) -> &PathBuf { &self.ledger_path } + pub fn banking_trace_path(&self) -> PathBuf { + self.ledger_path.join("banking_trace") + } + /// Opens a Ledger in directory, provides "infinite" window of shreds pub fn open(ledger_path: &Path) -> Result { Self::do_open(ledger_path, BlockstoreOptions::default()) diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index a2b32fec36..0790d4f2a6 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -64,6 +64,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { ledger_column_options: config.ledger_column_options.clone(), runtime_config: config.runtime_config.clone(), replay_slots_concurrently: config.replay_slots_concurrently, + banking_trace_dir_byte_limit: config.banking_trace_dir_byte_limit, } } diff --git a/perf/src/cuda_runtime.rs b/perf/src/cuda_runtime.rs index 6c802caa9e..efc7545c6e 100644 --- a/perf/src/cuda_runtime.rs +++ b/perf/src/cuda_runtime.rs @@ -12,6 +12,7 @@ use { }, rand::{seq::SliceRandom, Rng}, rayon::prelude::*, + serde::{Deserialize, Serialize}, std::{ ops::{Index, IndexMut}, os::raw::c_int, @@ -53,11 +54,12 @@ fn unpin(_mem: *mut T) { // A vector wrapper where the underlying memory can be // page-pinned. Controlled by flags in case user only wants // to pin in certain circumstances. -#[derive(Debug, Default)] +#[derive(Debug, Default, Serialize, Deserialize)] pub struct PinnedVec { x: Vec, pinned: bool, pinnable: bool, + #[serde(skip)] recycler: Weak>>, } diff --git a/perf/src/packet.rs b/perf/src/packet.rs index 4e6b64fe15..2cb07b2b6f 100644 --- a/perf/src/packet.rs +++ b/perf/src/packet.rs @@ -4,7 +4,7 @@ use { crate::{cuda_runtime::PinnedVec, recycler::Recycler}, bincode::config::Options, rayon::prelude::{IntoParallelIterator, IntoParallelRefIterator, IntoParallelRefMutIterator}, - serde::{de::DeserializeOwned, Serialize}, + serde::{de::DeserializeOwned, Deserialize, Serialize}, std::{ io::Read, net::SocketAddr, @@ -18,7 +18,7 @@ pub const NUM_PACKETS: usize = 1024 * 8; pub const PACKETS_PER_BATCH: usize = 64; pub const NUM_RCVMMSGS: usize = 64; -#[derive(Debug, Default, Clone)] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct PacketBatch { packets: PinnedVec, } diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index a28285f7e8..251e366c0d 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -745,9 +745,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.22" +version = "0.4.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1" +checksum = "16b0a3d9ed01224b22057780a37bb8c5dbfe1be8ba48678e7bf57ec4b385411f" dependencies = [ "iana-time-zone", "js-sys", @@ -1034,6 +1034,41 @@ dependencies = [ "zeroize", ] +[[package]] +name = "darling" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0dd3cd20dc6b5a876612a6e5accfe7f3dd883db6d07acfbf14c128f61550dfa" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a784d2ccaf7c98501746bf0be29b2022ba41fd62a2e622af997a03e9f972859f" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2 1.0.41", + "quote 1.0.18", + "strsim 0.10.0", + "syn 1.0.98", +] + +[[package]] +name = "darling_macro" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7618812407e9402654622dd402b0a89dff9ba93badd6540781526117b92aab7e" +dependencies = [ + "darling_core", + "quote 1.0.18", + "syn 1.0.98", +] + [[package]] name = "dashmap" version = "4.0.2" @@ -1968,6 +2003,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.1.5" @@ -3694,6 +3735,15 @@ dependencies = [ "librocksdb-sys", ] +[[package]] +name = "rolling-file" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8395b4f860856b740f20a296ea2cd4d823e81a2658cf05ef61be22916026a906" +dependencies = [ + "chrono", +] + [[package]] name = "rpassword" version = "7.0.0" @@ -3988,6 +4038,28 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30d904179146de381af4c93d3af6ca4984b3152db687dacb9c3c35e86f39809c" +dependencies = [ + "serde", + "serde_with_macros", +] + +[[package]] +name = "serde_with_macros" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1966009f3c05f095697c537312f5415d1e3ed31ce0a56942bac4c771c5c335e" +dependencies = [ + "darling", + "proc-macro2 1.0.41", + "quote 1.0.18", + "syn 1.0.98", +] + [[package]] name = "serde_yaml" version = "0.9.13" @@ -4473,6 +4545,7 @@ dependencies = [ "rand 0.7.3", "rand_chacha 0.2.2", "rayon", + "rolling-file", "rustc_version 0.4.0", "serde", "serde_derive", @@ -5775,6 +5848,7 @@ dependencies = [ "serde_bytes", "serde_derive", "serde_json", + "serde_with", "sha2 0.10.5", "sha3 0.10.4", "solana-frozen-abi 1.15.0", diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index 290632634a..61ae764300 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -70,6 +70,7 @@ serde = "1.0.144" serde_bytes = "0.11" serde_derive = "1.0.103" serde_json = { version = "1.0.83", optional = true } +serde_with = { version = "2.2.0", default-features = false, features = ["macros"] } sha2 = "0.10.5" sha3 = { version = "0.10.4", optional = true } solana-frozen-abi = { path = "../frozen-abi", version = "=1.15.0" } diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index 3301b7d42e..df0c6bd4a0 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -3,7 +3,8 @@ use { bincode::{Options, Result}, bitflags::bitflags, - serde::Serialize, + serde::{Deserialize, Serialize}, + serde_with::{serde_as, Bytes}, std::{ fmt, io, net::{IpAddr, Ipv4Addr, SocketAddr}, @@ -21,6 +22,7 @@ pub const PACKET_DATA_SIZE: usize = 1280 - 40 - 8; bitflags! { #[repr(C)] + #[derive(Serialize, Deserialize)] pub struct PacketFlags: u8 { const DISCARD = 0b0000_0001; const FORWARDED = 0b0000_0010; @@ -30,7 +32,7 @@ bitflags! { } } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[repr(C)] pub struct Meta { pub size: usize, @@ -40,11 +42,38 @@ pub struct Meta { pub sender_stake: u64, } -#[derive(Clone, Eq)] +// serde_as is used as a work around because array isn't supported by serde +// (and serde_bytes). +// +// the root cause is of a historical special handling for [T; 0] in rust's +// `Default` and supposedly mirrored serde's `Serialize` (macro) impls, +// pre-dating stabilized const generics, meaning it'll take long time...: +// https://github.com/rust-lang/rust/issues/61415 +// https://github.com/rust-lang/rust/issues/88744#issuecomment-1138678928 +// +// Due to the nature of the root cause, the current situation is complicated. +// All in all, the serde_as solution is chosen for good perf and low maintenance +// need at the cost of another crate dependency.. +// +// For details, please refer to the below various links... +// +// relevant merged/published pr for this serde_as functionality used here: +// https://github.com/jonasbb/serde_with/pull/277 +// open pr at serde_bytes: +// https://github.com/serde-rs/bytes/pull/28 +// open issue at serde: +// https://github.com/serde-rs/serde/issues/1937 +// closed pr at serde (due to the above mentioned [N; 0] issue): +// https://github.com/serde-rs/serde/pull/1860 +// ryoqun's dirty experiments: +// https://github.com/ryoqun/serde-array-comparisons +#[serde_as] +#[derive(Clone, Eq, Serialize, Deserialize)] #[repr(C)] pub struct Packet { // Bytes past Packet.meta.size are not valid to read from. // Use Packet.data(index) to read from the buffer. + #[serde_as(as = "Bytes")] buffer: [u8; PACKET_DATA_SIZE], meta: Meta, } diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 1038099377..2f5be98a5d 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -10,6 +10,7 @@ use { }, keypair::SKIP_SEED_PHRASE_VALIDATION_ARG, }, + solana_core::banking_trace::{DirByteLimit, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT}, solana_faucet::faucet::{self, FAUCET_PORT}, solana_net_utils::{MINIMUM_VALIDATOR_PORT_RANGE_WIDTH, VALIDATOR_PORT_RANGE}, solana_rpc::{rpc::MAX_REQUEST_BODY_SIZE, rpc_pubsub_service::PubSubConfig}, @@ -1302,6 +1303,24 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .long("replay-slots-concurrently") .help("Allow concurrent replay of slots on different forks") ) + .arg( + Arg::with_name("banking_trace_dir_byte_limit") + // expose friendly alternative name to cli than internal + // implementation-oriented one + .long("enable-banking-trace") + .value_name("BYTES") + .validator(is_parsable::) + .takes_value(true) + // Firstly, zero limit value causes tracer to be disabled + // altogether, intuitively. On the other hand, this non-zero + // default doesn't enable banking tracer unless this flag is + // explicitly given, similar to --limit-ledger-size. + // see configure_banking_trace_dir_byte_limit() for this. + .default_value(&default_args.banking_trace_dir_byte_limit) + .help("Write trace files for simulate-leader-blocks, retaining \ + up to the default or specified total bytes in the \ + ledger") + ) .args(&get_deprecated_arguments()) .after_help("The default subcommand is run") .subcommand( @@ -1683,6 +1702,8 @@ pub struct DefaultArgs { // Wait subcommand pub wait_for_restart_window_min_idle_time: String, pub wait_for_restart_window_max_delinquent_stake: String, + + pub banking_trace_dir_byte_limit: String, } impl DefaultArgs { @@ -1761,6 +1782,7 @@ impl DefaultArgs { exit_max_delinquent_stake: "5".to_string(), wait_for_restart_window_min_idle_time: "10".to_string(), wait_for_restart_window_max_delinquent_stake: "5".to_string(), + banking_trace_dir_byte_limit: BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT.to_string(), } } } diff --git a/validator/src/main.rs b/validator/src/main.rs index 67f39206a8..f031caa697 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -8,6 +8,7 @@ use { rand::{seq::SliceRandom, thread_rng}, solana_clap_utils::input_parsers::{keypair_of, keypairs_of, pubkey_of, value_of}, solana_core::{ + banking_trace::DISABLED_BAKING_TRACE_DIR, ledger_cleanup_service::{DEFAULT_MAX_LEDGER_SHREDS, DEFAULT_MIN_MAX_LEDGER_SHREDS}, system_monitor_service::SystemMonitorService, tower_storage, @@ -427,6 +428,21 @@ fn get_cluster_shred_version(entrypoints: &[SocketAddr]) -> Option { None } +fn configure_banking_trace_dir_byte_limit( + validator_config: &mut ValidatorConfig, + matches: &ArgMatches, +) { + validator_config.banking_trace_dir_byte_limit = + if matches.occurrences_of("banking_trace_dir_byte_limit") == 0 { + // disable with no explicit flag; then, this effectively becomes `opt-in` even if we're + // specifying a default value in clap configuration. + DISABLED_BAKING_TRACE_DIR + } else { + // BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT or user-supplied override value + value_t_or_exit!(matches, "banking_trace_dir_byte_limit", u64) + }; +} + pub fn main() { let default_args = DefaultArgs::new(); let solana_version = solana_version::version!(); @@ -1427,6 +1443,8 @@ pub fn main() { validator_config.max_ledger_shreds = Some(limit_ledger_size); } + configure_banking_trace_dir_byte_limit(&mut validator_config, &matches); + validator_config.ledger_column_options = LedgerColumnOptions { compression_type: match matches.value_of("rocksdb_ledger_compression") { None => BlockstoreCompressionType::default(),