From d343713f6166be2dcbf22aaffc5f1b4930db115d Mon Sep 17 00:00:00 2001 From: anatoly yakovenko Date: Wed, 19 Jan 2022 13:58:20 -0800 Subject: [PATCH] Optimize packet dedup (#22571) * Use bloom filter to dedup packets * dedup first * Update bloom/src/bloom.rs Co-authored-by: Trent Nelson * Update core/src/sigverify_stage.rs Co-authored-by: Trent Nelson * Update core/src/sigverify_stage.rs Co-authored-by: Trent Nelson * Update core/src/sigverify_stage.rs Co-authored-by: Trent Nelson * fixup * fixup * fixup Co-authored-by: Trent Nelson --- Cargo.lock | 23 ++++++ Cargo.toml | 1 + banking-bench/src/main.rs | 5 +- bloom/Cargo.toml | 32 ++++++++ {runtime => bloom}/benches/bloom.rs | 2 +- bloom/build.rs | 1 + {runtime => bloom}/src/bloom.rs | 10 +-- bloom/src/lib.rs | 5 ++ core/Cargo.toml | 1 + core/benches/banking_stage.rs | 4 - core/src/banking_stage.rs | 46 +----------- core/src/lib.rs | 1 - core/src/packet_deduper.rs | 63 ---------------- core/src/sigverify_stage.rs | 111 ++++++++++++++++++++++------ core/src/tpu.rs | 2 - gossip/Cargo.toml | 1 + gossip/src/cluster_info.rs | 2 +- gossip/src/crds_gossip_pull.rs | 2 +- gossip/src/crds_gossip_push.rs | 2 +- perf/Cargo.toml | 3 + perf/benches/dedup.rs | 45 +++++++++++ perf/src/sigverify.rs | 63 +++++++++++++++- programs/bpf/Cargo.lock | 21 ++++++ runtime/Cargo.toml | 1 + runtime/src/lib.rs | 1 - 25 files changed, 296 insertions(+), 152 deletions(-) create mode 100644 bloom/Cargo.toml rename {runtime => bloom}/benches/bloom.rs (98%) create mode 120000 bloom/build.rs rename {runtime => bloom}/src/bloom.rs (97%) create mode 100644 bloom/src/lib.rs delete mode 100644 core/src/packet_deduper.rs create mode 100644 perf/benches/dedup.rs diff --git a/Cargo.lock b/Cargo.lock index db8c0de5c..66cfc94ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4437,6 +4437,23 @@ dependencies = [ "solana-version", ] +[[package]] +name = "solana-bloom" +version = "1.10.0" +dependencies = [ + "bv", + "fnv", + "log 0.4.14", + "rand 0.7.3", + "rayon", + "rustc_version 0.4.0", + "serde", + "serde_derive", + "solana-frozen-abi 1.10.0", + "solana-frozen-abi-macro 1.10.0", + "solana-sdk", +] + [[package]] name = "solana-bpf-loader-program" version = "1.10.0" @@ -4707,6 +4724,7 @@ dependencies = [ "serial_test", "solana-accountsdb-plugin-manager", "solana-address-lookup-table-program", + "solana-bloom", "solana-client", "solana-entry", "solana-frozen-abi 1.10.0", @@ -4930,6 +4948,7 @@ dependencies = [ "serde_bytes", "serde_derive", "serial_test", + "solana-bloom", "solana-clap-utils", "solana-client", "solana-entry", @@ -5241,10 +5260,12 @@ name = "solana-perf" version = "1.10.0" dependencies = [ "bincode", + "bv", "caps", "curve25519-dalek 3.2.0", "dlopen", "dlopen_derive", + "fnv", "lazy_static", "libc", "log 0.4.14", @@ -5253,6 +5274,7 @@ dependencies = [ "rand 0.7.3", "rayon", "serde", + "solana-bloom", "solana-logger 1.10.0", "solana-metrics", "solana-rayon-threadlimit", @@ -5609,6 +5631,7 @@ dependencies = [ "serde", "serde_derive", "solana-address-lookup-table-program", + "solana-bloom", "solana-bucket-map", "solana-compute-budget-program", "solana-config-program", diff --git a/Cargo.toml b/Cargo.toml index 513470201..f1ac04fb5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "banks-interface", "banks-server", "bucket_map", + "bloom", "clap-utils", "cli-config", "cli-output", diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 24181c666..2ba1fe46d 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -5,7 +5,7 @@ use { log::*, rand::{thread_rng, Rng}, rayon::prelude::*, - solana_core::{banking_stage::BankingStage, packet_deduper::PacketDeduper}, + solana_core::banking_stage::BankingStage, solana_gossip::cluster_info::{ClusterInfo, Node}, solana_ledger::{ blockstore::Blockstore, @@ -226,7 +226,6 @@ fn main() { SocketAddrSpace::Unspecified, ); let cluster_info = Arc::new(cluster_info); - let packet_deduper = PacketDeduper::default(); let banking_stage = BankingStage::new( &cluster_info, &poh_recorder, @@ -236,7 +235,6 @@ fn main() { None, replay_vote_sender, Arc::new(RwLock::new(CostModel::default())), - packet_deduper.clone(), ); poh_recorder.lock().unwrap().set_bank(&bank); @@ -351,7 +349,6 @@ fn main() { // in this chunk, but since we rotate between CHUNKS then // we should clear them by the time we come around again to re-use that chunk. bank.clear_signatures(); - packet_deduper.reset(); total_us += duration_as_us(&now.elapsed()); debug!( "time: {} us checked: {} sent: {}", diff --git a/bloom/Cargo.toml b/bloom/Cargo.toml new file mode 100644 index 000000000..0d17d2224 --- /dev/null +++ b/bloom/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "solana-bloom" +version = "1.10.0" +description = "Solana bloom filter" +authors = ["Solana Maintainers "] +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +documentation = "https://docs.rs/solana-bloom" +edition = "2021" + +[dependencies] +bv = { version = "0.11.1", features = ["serde"] } +fnv = "1.0.7" +rand = "0.7.0" +serde = { version = "1.0.133", features = ["rc"] } +rayon = "1.5.1" +serde_derive = "1.0.103" +solana-frozen-abi = { path = "../frozen-abi", version = "=1.10.0" } +solana-frozen-abi-macro = { path = "../frozen-abi/macro", version = "=1.10.0" } +solana-sdk = { path = "../sdk", version = "=1.10.0" } +log = "0.4.14" + +[lib] +crate-type = ["lib"] +name = "solana_bloom" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] + +[build-dependencies] +rustc_version = "0.4" diff --git a/runtime/benches/bloom.rs b/bloom/benches/bloom.rs similarity index 98% rename from runtime/benches/bloom.rs rename to bloom/benches/bloom.rs index ca2ed9de7..925c18fea 100644 --- a/runtime/benches/bloom.rs +++ b/bloom/benches/bloom.rs @@ -5,7 +5,7 @@ use { bv::BitVec, fnv::FnvHasher, rand::Rng, - solana_runtime::bloom::{AtomicBloom, Bloom, BloomHashIndex}, + solana_bloom::bloom::{AtomicBloom, Bloom, BloomHashIndex}, solana_sdk::{ hash::{hash, Hash}, signature::Signature, diff --git a/bloom/build.rs b/bloom/build.rs new file mode 120000 index 000000000..ae66c237c --- /dev/null +++ b/bloom/build.rs @@ -0,0 +1 @@ +../frozen-abi/build.rs \ No newline at end of file diff --git a/runtime/src/bloom.rs b/bloom/src/bloom.rs similarity index 97% rename from runtime/src/bloom.rs rename to bloom/src/bloom.rs index c3bbf1f4b..152c387e1 100644 --- a/runtime/src/bloom.rs +++ b/bloom/src/bloom.rs @@ -101,7 +101,7 @@ impl Bloom { } } fn pos(&self, key: &T, k: u64) -> u64 { - key.hash_at_index(k) % self.bits.len() + key.hash_at_index(k).wrapping_rem(self.bits.len()) } pub fn clear(&mut self) { self.bits = BitVec::new_fill(false, self.bits.len()); @@ -111,7 +111,7 @@ impl Bloom { for k in &self.keys { let pos = self.pos(key, *k); if !self.bits.get(pos) { - self.num_bits_set += 1; + self.num_bits_set = self.num_bits_set.saturating_add(1); self.bits.set(pos, true); } } @@ -164,13 +164,13 @@ impl From> for AtomicBloom { impl AtomicBloom { fn pos(&self, key: &T, hash_index: u64) -> (usize, u64) { - let pos = key.hash_at_index(hash_index) % self.num_bits; + let pos = key.hash_at_index(hash_index).wrapping_rem(self.num_bits); // Divide by 64 to figure out which of the // AtomicU64 bit chunks we need to modify. - let index = pos >> 6; + let index = pos.wrapping_shr(6); // (pos & 63) is equivalent to mod 64 so that we can find // the index of the bit within the AtomicU64 to modify. - let mask = 1u64 << (pos & 63); + let mask = 1u64.wrapping_shl(u32::try_from(pos & 63).unwrap()); (index as usize, mask) } diff --git a/bloom/src/lib.rs b/bloom/src/lib.rs new file mode 100644 index 000000000..9a78bdcd9 --- /dev/null +++ b/bloom/src/lib.rs @@ -0,0 +1,5 @@ +#![cfg_attr(RUSTC_WITH_SPECIALIZATION, feature(min_specialization))] +pub mod bloom; + +#[macro_use] +extern crate solana_frozen_abi_macro; diff --git a/core/Cargo.toml b/core/Cargo.toml index 4c732bd25..1d9fb5bd0 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -34,6 +34,7 @@ retain_mut = "0.1.5" serde = "1.0.133" serde_derive = "1.0.103" solana-address-lookup-table-program = { path = "../programs/address-lookup-table", version = "=1.10.0" } +solana-bloom = { path = "../bloom", version = "=1.10.0" } solana-accountsdb-plugin-manager = { path = "../accountsdb-plugin-manager", version = "=1.10.0" } solana-client = { path = "../client", version = "=1.10.0" } solana-entry = { path = "../entry", version = "=1.10.0" } diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index fa398c250..50ec5cbef 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -10,7 +10,6 @@ use { rayon::prelude::*, solana_core::{ banking_stage::{BankingStage, BankingStageStats}, - packet_deduper::PacketDeduper, qos_service::QosService, }, solana_entry::entry::{next_hash, Entry}, @@ -222,7 +221,6 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { ); let cluster_info = Arc::new(cluster_info); let (s, _r) = unbounded(); - let packet_deduper = PacketDeduper::default(); let _banking_stage = BankingStage::new( &cluster_info, &poh_recorder, @@ -232,7 +230,6 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { None, s, Arc::new(RwLock::new(CostModel::default())), - packet_deduper.clone(), ); poh_recorder.lock().unwrap().set_bank(&bank); @@ -267,7 +264,6 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { // in this chunk, but since we rotate between CHUNKS then // we should clear them by the time we come around again to re-use that chunk. bank.clear_signatures(); - packet_deduper.reset(); trace!( "time: {} checked: {} sent: {}", duration_as_us(&now.elapsed()), diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index c2e439c17..c5180a157 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -2,7 +2,7 @@ //! to contruct a software pipeline. The stage uses all available CPU cores and //! can do its processing in parallel with signature verification on the GPU. use { - crate::{packet_deduper::PacketDeduper, qos_service::QosService}, + crate::qos_service::QosService, crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}, histogram::Histogram, itertools::Itertools, @@ -328,7 +328,6 @@ impl BankingStage { transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, cost_model: Arc>, - packet_deduper: PacketDeduper, ) -> Self { Self::new_num_threads( cluster_info, @@ -340,7 +339,6 @@ impl BankingStage { transaction_status_sender, gossip_vote_sender, cost_model, - packet_deduper, ) } @@ -355,7 +353,6 @@ impl BankingStage { transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, cost_model: Arc>, - packet_deduper: PacketDeduper, ) -> Self { let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH); // Single thread to generate entries from many banks. @@ -384,7 +381,6 @@ impl BankingStage { let mut recv_start = Instant::now(); let transaction_status_sender = transaction_status_sender.clone(); let gossip_vote_sender = gossip_vote_sender.clone(); - let packet_deduper = packet_deduper.clone(); let data_budget = data_budget.clone(); let cost_model = cost_model.clone(); Builder::new() @@ -400,7 +396,6 @@ impl BankingStage { batch_limit, transaction_status_sender, gossip_vote_sender, - &packet_deduper, &data_budget, cost_model, ); @@ -755,7 +750,6 @@ impl BankingStage { batch_limit: usize, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, - packet_deduper: &PacketDeduper, data_budget: &DataBudget, cost_model: Arc>, ) { @@ -808,7 +802,6 @@ impl BankingStage { batch_limit, &mut buffered_packet_batches, &mut banking_stage_stats, - packet_deduper, ) { Ok(()) | Err(RecvTimeoutError::Timeout) => (), Err(RecvTimeoutError::Disconnected) => break, @@ -1413,7 +1406,6 @@ impl BankingStage { batch_limit: usize, buffered_packet_batches: &mut UnprocessedPacketBatches, banking_stage_stats: &mut BankingStageStats, - packet_deduper: &PacketDeduper, ) -> Result<(), RecvTimeoutError> { let mut recv_time = Measure::start("receive_and_buffer_packets_recv"); let packet_batches = verified_receiver.recv_timeout(recv_timeout)?; @@ -1444,7 +1436,6 @@ impl BankingStage { &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, - packet_deduper, banking_stage_stats, ); } @@ -1490,15 +1481,13 @@ impl BankingStage { fn push_unprocessed( unprocessed_packet_batches: &mut UnprocessedPacketBatches, packet_batch: PacketBatch, - mut packet_indexes: Vec, + packet_indexes: Vec, dropped_packet_batches_count: &mut usize, dropped_packets_count: &mut usize, newly_buffered_packets_count: &mut usize, batch_limit: usize, - packet_deduper: &PacketDeduper, banking_stage_stats: &mut BankingStageStats, ) { - packet_deduper.dedupe_packets(&packet_batch, &mut packet_indexes, banking_stage_stats); if Self::packet_has_more_unprocessed_transactions(&packet_indexes) { if unprocessed_packet_batches.len() >= batch_limit { *dropped_packet_batches_count += 1; @@ -1658,7 +1647,6 @@ mod tests { None, gossip_vote_sender, Arc::new(RwLock::new(CostModel::default())), - PacketDeduper::default(), ); drop(verified_sender); drop(gossip_verified_vote_sender); @@ -1708,7 +1696,6 @@ mod tests { None, gossip_vote_sender, Arc::new(RwLock::new(CostModel::default())), - PacketDeduper::default(), ); trace!("sending bank"); drop(verified_sender); @@ -1784,7 +1771,6 @@ mod tests { None, gossip_vote_sender, Arc::new(RwLock::new(CostModel::default())), - PacketDeduper::default(), ); // fund another account so we can send 2 good transactions in a single batch. @@ -1936,7 +1922,6 @@ mod tests { None, gossip_vote_sender, Arc::new(RwLock::new(CostModel::default())), - PacketDeduper::default(), ); // wait for banking_stage to eat the packets @@ -3237,7 +3222,6 @@ mod tests { let new_packet_batch = PacketBatch::new(vec![Packet::default()]); let packet_indexes = vec![]; - let packet_deduper = PacketDeduper::default(); let mut dropped_packet_batches_count = 0; let mut dropped_packets_count = 0; let mut newly_buffered_packets_count = 0; @@ -3252,7 +3236,6 @@ mod tests { &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, - &packet_deduper, &mut banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 1); @@ -3271,7 +3254,6 @@ mod tests { &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, - &packet_deduper, &mut banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); @@ -3287,27 +3269,6 @@ mod tests { ) .unwrap()]); assert_eq!(unprocessed_packets.len(), batch_limit); - BankingStage::push_unprocessed( - &mut unprocessed_packets, - new_packet_batch.clone(), - packet_indexes.clone(), - &mut dropped_packet_batches_count, - &mut dropped_packets_count, - &mut newly_buffered_packets_count, - batch_limit, - &packet_deduper, - &mut banking_stage_stats, - ); - assert_eq!(unprocessed_packets.len(), 2); - assert_eq!( - unprocessed_packets[1].0.packets[0], - new_packet_batch.packets[0] - ); - assert_eq!(dropped_packet_batches_count, 1); - assert_eq!(dropped_packets_count, 2); - assert_eq!(newly_buffered_packets_count, 2); - - // Check duplicates are dropped (newly buffered shouldn't change) BankingStage::push_unprocessed( &mut unprocessed_packets, new_packet_batch.clone(), @@ -3315,8 +3276,7 @@ mod tests { &mut dropped_packet_batches_count, &mut dropped_packets_count, &mut newly_buffered_packets_count, - 3, - &packet_deduper, + batch_limit, &mut banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); diff --git a/core/src/lib.rs b/core/src/lib.rs index 7db3c2591..ec1be4e61 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -31,7 +31,6 @@ pub mod latest_validator_votes_for_frozen_banks; pub mod ledger_cleanup_service; pub mod optimistic_confirmation_verifier; pub mod outstanding_requests; -pub mod packet_deduper; pub mod packet_hasher; pub mod progress_map; pub mod qos_service; diff --git a/core/src/packet_deduper.rs b/core/src/packet_deduper.rs deleted file mode 100644 index 24c3aea10..000000000 --- a/core/src/packet_deduper.rs +++ /dev/null @@ -1,63 +0,0 @@ -use { - crate::{banking_stage::BankingStageStats, packet_hasher::PacketHasher}, - lru::LruCache, - solana_measure::measure::Measure, - solana_perf::packet::PacketBatch, - std::{ - ops::DerefMut, - sync::{atomic::Ordering, Arc, Mutex}, - }, -}; - -const DEFAULT_LRU_SIZE: usize = 200_000; - -#[derive(Clone)] -pub struct PacketDeduper(Arc, PacketHasher)>>); - -impl Default for PacketDeduper { - fn default() -> Self { - Self(Arc::new(Mutex::new(( - LruCache::new(DEFAULT_LRU_SIZE), - PacketHasher::default(), - )))) - } -} - -impl PacketDeduper { - pub fn dedupe_packets( - &self, - packet_batch: &PacketBatch, - packet_indexes: &mut Vec, - banking_stage_stats: &BankingStageStats, - ) { - let original_packets_count = packet_indexes.len(); - let mut packet_duplicate_check_time = Measure::start("packet_duplicate_check"); - let mut duplicates = self.0.lock().unwrap(); - let (cache, hasher) = duplicates.deref_mut(); - packet_indexes.retain(|i| { - let packet_hash = hasher.hash_packet(&packet_batch.packets[*i]); - match cache.get_mut(&packet_hash) { - Some(_hash) => false, - None => { - cache.put(packet_hash, ()); - true - } - } - }); - packet_duplicate_check_time.stop(); - banking_stage_stats - .packet_duplicate_check_elapsed - .fetch_add(packet_duplicate_check_time.as_us(), Ordering::Relaxed); - banking_stage_stats - .dropped_duplicated_packets_count - .fetch_add( - original_packets_count.saturating_sub(packet_indexes.len()), - Ordering::Relaxed, - ); - } - - pub fn reset(&self) { - let mut duplicates = self.0.lock().unwrap(); - duplicates.0.clear(); - } -} diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 885bb7c8d..1d3101da4 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -7,10 +7,13 @@ use { crate::sigverify, + core::time::Duration, crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender}, itertools::Itertools, + solana_bloom::bloom::{AtomicBloom, Bloom}, solana_measure::measure::Measure, solana_perf::packet::PacketBatch, + solana_perf::sigverify::dedup_packets, solana_sdk::timing, solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError}, std::{ @@ -49,10 +52,13 @@ struct SigVerifierStats { recv_batches_us_hist: histogram::Histogram, // time to call recv_batch verify_batches_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch discard_packets_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch + dedup_packets_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch batches_hist: histogram::Histogram, // number of packet batches per verify call packets_hist: histogram::Histogram, // number of packets per verify call total_batches: usize, total_packets: usize, + total_dedup: usize, + total_excess_fail: usize, } impl SigVerifierStats { @@ -121,6 +127,26 @@ impl SigVerifierStats { self.discard_packets_pp_us_hist.mean().unwrap_or(0), i64 ), + ( + "dedup_packets_pp_us_90pct", + self.dedup_packets_pp_us_hist.percentile(90.0).unwrap_or(0), + i64 + ), + ( + "dedup_packets_pp_us_min", + self.dedup_packets_pp_us_hist.minimum().unwrap_or(0), + i64 + ), + ( + "dedup_packets_pp_us_max", + self.dedup_packets_pp_us_hist.maximum().unwrap_or(0), + i64 + ), + ( + "dedup_packets_pp_us_mean", + self.dedup_packets_pp_us_hist.mean().unwrap_or(0), + i64 + ), ( "batches_90pct", self.batches_hist.percentile(90.0).unwrap_or(0), @@ -139,6 +165,8 @@ impl SigVerifierStats { ("packets_mean", self.packets_hist.mean().unwrap_or(0), i64), ("total_batches", self.total_batches, i64), ("total_packets", self.total_packets, i64), + ("total_dedup", self.total_dedup, i64), + ("total_excess_fail", self.total_excess_fail, i64), ); } } @@ -186,6 +214,7 @@ impl SigVerifyStage { } fn verifier( + bloom: &AtomicBloom<&[u8]>, recvr: &PacketBatchReceiver, sendr: &Sender>, verifier: &T, @@ -199,13 +228,22 @@ impl SigVerifyStage { timing::timestamp(), num_packets, ); + + let mut dedup_time = Measure::start("sigverify_dedup_time"); + let dedup_fail = dedup_packets(bloom, &mut batches) as usize; + dedup_time.stop(); + let valid_packets = num_packets.saturating_sub(dedup_fail); + let mut discard_time = Measure::start("sigverify_discard_time"); - if num_packets > MAX_SIGVERIFY_BATCH { - Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH); - } + if valid_packets > MAX_SIGVERIFY_BATCH { + Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH) + }; + let excess_fail = valid_packets.saturating_sub(MAX_SIGVERIFY_BATCH); discard_time.stop(); + let mut verify_batch_time = Measure::start("sigverify_batch_time"); - sendr.send(verifier.verify_batches(batches))?; + let batches = verifier.verify_batches(batches); + sendr.send(batches)?; verify_batch_time.stop(); debug!( @@ -229,10 +267,16 @@ impl SigVerifyStage { .discard_packets_pp_us_hist .increment(discard_time.as_us() / (num_packets as u64)) .unwrap(); + stats + .dedup_packets_pp_us_hist + .increment(dedup_time.as_us() / (num_packets as u64)) + .unwrap(); stats.batches_hist.increment(batches_len as u64).unwrap(); stats.packets_hist.increment(num_packets as u64).unwrap(); stats.total_batches += batches_len; stats.total_packets += num_packets; + stats.total_dedup += dedup_fail; + stats.total_excess_fail += excess_fail; Ok(()) } @@ -245,29 +289,48 @@ impl SigVerifyStage { let verifier = verifier.clone(); let mut stats = SigVerifierStats::default(); let mut last_print = Instant::now(); + const MAX_BLOOM_AGE: Duration = Duration::from_millis(2_000); + const MAX_BLOOM_ITEMS: usize = 1_000_000; + const MAX_BLOOM_FAIL: f64 = 0.0001; + const MAX_BLOOM_BITS: usize = 8 << 22; Builder::new() .name("solana-verifier".to_string()) - .spawn(move || loop { - if let Err(e) = - Self::verifier(&packet_receiver, &verified_sender, &verifier, &mut stats) - { - match e { - SigVerifyServiceError::Streamer(StreamerError::RecvTimeout( - RecvTimeoutError::Disconnected, - )) => break, - SigVerifyServiceError::Streamer(StreamerError::RecvTimeout( - RecvTimeoutError::Timeout, - )) => (), - SigVerifyServiceError::Send(_) => { - break; - } - _ => error!("{:?}", e), + .spawn(move || { + let mut bloom = + Bloom::random(MAX_BLOOM_ITEMS, MAX_BLOOM_FAIL, MAX_BLOOM_BITS).into(); + let mut bloom_age = Instant::now(); + loop { + let now = Instant::now(); + if now.duration_since(bloom_age) > MAX_BLOOM_AGE { + bloom = + Bloom::random(MAX_BLOOM_ITEMS, MAX_BLOOM_FAIL, MAX_BLOOM_BITS).into(); + bloom_age = now; + } + if let Err(e) = Self::verifier( + &bloom, + &packet_receiver, + &verified_sender, + &verifier, + &mut stats, + ) { + match e { + SigVerifyServiceError::Streamer(StreamerError::RecvTimeout( + RecvTimeoutError::Disconnected, + )) => break, + SigVerifyServiceError::Streamer(StreamerError::RecvTimeout( + RecvTimeoutError::Timeout, + )) => (), + SigVerifyServiceError::Send(_) => { + break; + } + _ => error!("{:?}", e), + } + } + if last_print.elapsed().as_secs() > 2 { + stats.report(); + stats = SigVerifierStats::default(); + last_print = Instant::now(); } - } - if last_print.elapsed().as_secs() > 2 { - stats.report(); - stats = SigVerifierStats::default(); - last_print = Instant::now(); } }) .unwrap() diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 0309212f6..d8eb58da7 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -10,7 +10,6 @@ use { GossipVerifiedVoteHashSender, VerifiedVoteSender, VoteTracker, }, fetch_stage::FetchStage, - packet_deduper::PacketDeduper, sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage, }, @@ -141,7 +140,6 @@ impl Tpu { transaction_status_sender, replay_vote_sender, cost_model.clone(), - PacketDeduper::default(), ); let broadcast_stage = broadcast_type.new_broadcast_stage( diff --git a/gossip/Cargo.toml b/gossip/Cargo.toml index 2aadb98f7..e5c7a3cc3 100644 --- a/gossip/Cargo.toml +++ b/gossip/Cargo.toml @@ -39,6 +39,7 @@ solana-metrics = { path = "../metrics", version = "=1.10.0" } solana-net-utils = { path = "../net-utils", version = "=1.10.0" } solana-perf = { path = "../perf", version = "=1.10.0" } solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "=1.10.0" } +solana-bloom = { path = "../bloom", version = "=1.10.0" } solana-runtime = { path = "../runtime", version = "=1.10.0" } solana-streamer = { path = "../streamer", version = "=1.10.0" } solana-sdk = { path = "../sdk", version = "=1.10.0" } diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 99372a5b8..0268c73ee 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -261,7 +261,7 @@ pub fn make_accounts_hashes_message( pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; // TODO These messages should go through the gpu pipeline for spam filtering -#[frozen_abi(digest = "4qB65g6HSnHFxkhZuvMEBCLHARBda1HBwJ8qeQ5RZ6Pk")] +#[frozen_abi(digest = "C1nR7B7CgMyUYo6h3z2KXcS38JSwF6y8jmZ6Y9Cz7XEd")] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[allow(clippy::large_enum_variant)] pub(crate) enum Protocol { diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index 5ce1559f1..4e0d8bcd6 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -25,7 +25,7 @@ use { lru::LruCache, rand::Rng, rayon::{prelude::*, ThreadPool}, - solana_runtime::bloom::{AtomicBloom, Bloom}, + solana_bloom::bloom::{AtomicBloom, Bloom}, solana_sdk::{ hash::{hash, Hash}, pubkey::Pubkey, diff --git a/gossip/src/crds_gossip_push.rs b/gossip/src/crds_gossip_push.rs index 9f3ba781a..98a618b17 100644 --- a/gossip/src/crds_gossip_push.rs +++ b/gossip/src/crds_gossip_push.rs @@ -26,7 +26,7 @@ use { itertools::Itertools, lru::LruCache, rand::{seq::SliceRandom, Rng}, - solana_runtime::bloom::{AtomicBloom, Bloom}, + solana_bloom::bloom::{AtomicBloom, Bloom}, solana_sdk::{packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::timestamp}, solana_streamer::socket::SocketAddrSpace, std::{ diff --git a/perf/Cargo.toml b/perf/Cargo.toml index e5c790624..9420e20d9 100644 --- a/perf/Cargo.toml +++ b/perf/Cargo.toml @@ -15,10 +15,13 @@ curve25519-dalek = { version = "3" } dlopen = "0.1.8" dlopen_derive = "0.1.4" lazy_static = "1.4.0" +bv = { version = "0.11.1", features = ["serde"] } +fnv = "1.0.7" log = "0.4.14" rand = "0.7.0" rayon = "1.5.1" serde = "1.0.133" +solana-bloom = { path = "../bloom", version = "=1.10.0" } solana-metrics = { path = "../metrics", version = "=1.10.0" } solana-sdk = { path = "../sdk", version = "=1.10.0" } solana-vote-program = { path = "../programs/vote", version = "=1.10.0" } diff --git a/perf/benches/dedup.rs b/perf/benches/dedup.rs new file mode 100644 index 000000000..e8590a67e --- /dev/null +++ b/perf/benches/dedup.rs @@ -0,0 +1,45 @@ +#![feature(test)] + +extern crate test; + +use { + solana_bloom::bloom::{AtomicBloom, Bloom}, + solana_perf::{packet::to_packet_batches, sigverify, test_tx::test_tx}, + test::Bencher, +}; + +#[bench] +fn bench_dedup_same(bencher: &mut Bencher) { + let tx = test_tx(); + + // generate packet vector + let mut batches = to_packet_batches( + &std::iter::repeat(tx).take(64 * 1024).collect::>(), + 128, + ); + let packet_count = sigverify::count_packets_in_batches(&batches); + let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into(); + + println!("packet_count {} {}", packet_count, batches.len()); + + // verify packets + bencher.iter(|| { + let _ans = sigverify::dedup_packets(&bloom, &mut batches); + }) +} + +#[bench] +fn bench_dedup_diff(bencher: &mut Bencher) { + // generate packet vector + let mut batches = + to_packet_batches(&(0..64 * 1024).map(|_| test_tx()).collect::>(), 128); + let packet_count = sigverify::count_packets_in_batches(&batches); + let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into(); + + println!("packet_count {} {}", packet_count, batches.len()); + + // verify packets + bencher.iter(|| { + let _ans = sigverify::dedup_packets(&bloom, &mut batches); + }) +} diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index 8396bb529..e6977430d 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -4,6 +4,7 @@ //! to the GPU. //! +use solana_bloom::bloom::AtomicBloom; #[cfg(test)] use solana_sdk::transaction::Transaction; use { @@ -23,6 +24,7 @@ use { short_vec::decode_shortu16_len, signature::Signature, }, + std::sync::atomic::{AtomicU64, Ordering}, std::{convert::TryFrom, mem::size_of}, }; @@ -418,6 +420,37 @@ pub fn generate_offsets( ) } +fn dedup_packet(count: &AtomicU64, packet: &mut Packet, bloom: &AtomicBloom<&[u8]>) { + // If this packet was already marked as discard, drop it + if packet.meta.discard() { + return; + } + + if bloom.contains(&packet.data.as_slice()) { + packet.meta.set_discard(true); + count.fetch_add(1, Ordering::Relaxed); + return; + } + bloom.add(&packet.data.as_slice()); +} + +pub fn dedup_packets(bloom: &AtomicBloom<&[u8]>, batches: &mut [PacketBatch]) -> u64 { + use rayon::prelude::*; + let packet_count = count_packets_in_batches(batches); + // machine specific random offset to read the u64 from the packet signature + let count = AtomicU64::new(0); + PAR_THREAD_POOL.install(|| { + batches.into_par_iter().for_each(|batch| { + batch + .packets + .par_iter_mut() + .for_each(|p| dedup_packet(&count, p, bloom)) + }) + }); + inc_new_counter_debug!("dedup_packets_total", packet_count); + count.load(Ordering::Relaxed) +} + pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool) { use rayon::prelude::*; let packet_count = count_packets_in_batches(batches); @@ -597,11 +630,12 @@ mod tests { use { super::*, crate::{ - packet::{Packet, PacketBatch}, + packet::{to_packet_batches, Packet, PacketBatch}, sigverify::{self, PacketOffsets}, test_tx::{new_test_vote_tx, test_multisig_tx, test_tx}, }, bincode::{deserialize, serialize}, + solana_bloom::bloom::{AtomicBloom, Bloom}, solana_sdk::{ instruction::CompiledInstruction, message::{Message, MessageHeader}, @@ -1261,4 +1295,31 @@ mod tests { current_offset = current_offset.saturating_add(size_of::()); }); } + + #[test] + fn test_dedup_same() { + let tx = test_tx(); + + // generate packet vector + let mut batches = + to_packet_batches(&std::iter::repeat(tx).take(1024).collect::>(), 128); + let packet_count = sigverify::count_packets_in_batches(&batches); + let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 20).into(); + let discard = sigverify::dedup_packets(&bloom, &mut batches) as usize; + // because dedup uses a threadpool, there maybe up to N threads of txs that go through + let n = get_thread_count(); + assert!(packet_count < discard + n * 2); + } + + #[test] + fn test_dedup_diff() { + // generate packet vector + let mut batches = to_packet_batches(&(0..1024).map(|_| test_tx()).collect::>(), 128); + + let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 20).into(); + let discard = sigverify::dedup_packets(&bloom, &mut batches) as usize; + // because dedup uses a threadpool, there maybe up to N threads of txs that go through + let n = get_thread_count(); + assert!(discard < n * 2); + } } diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index bc13e1adc..5e883be8e 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -2686,6 +2686,23 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "solana-bloom" +version = "1.10.0" +dependencies = [ + "bv", + "fnv", + "log", + "rand 0.7.3", + "rayon", + "rustc_version 0.4.0", + "serde", + "serde_derive", + "solana-frozen-abi 1.10.0", + "solana-frozen-abi-macro 1.10.0", + "solana-sdk", +] + [[package]] name = "solana-bpf-loader-program" version = "1.10.0" @@ -3309,10 +3326,12 @@ name = "solana-perf" version = "1.10.0" dependencies = [ "bincode", + "bv", "caps", "curve25519-dalek 3.2.0", "dlopen", "dlopen_derive", + "fnv", "lazy_static", "libc", "log", @@ -3320,6 +3339,7 @@ dependencies = [ "rand 0.7.3", "rayon", "serde", + "solana-bloom", "solana-metrics", "solana-rayon-threadlimit", "solana-sdk", @@ -3501,6 +3521,7 @@ dependencies = [ "serde", "serde_derive", "solana-address-lookup-table-program", + "solana-bloom", "solana-bucket-map", "solana-compute-budget-program", "solana-config-program", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 8759b7af8..0d3196e63 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -38,6 +38,7 @@ serde = { version = "1.0.133", features = ["rc"] } serde_derive = "1.0.103" solana-address-lookup-table-program = { path = "../programs/address-lookup-table", version = "=1.10.0" } solana-bucket-map = { path = "../bucket_map", version = "=1.10.0" } +solana-bloom = { path = "../bloom", version = "=1.10.0" } solana-compute-budget-program = { path = "../programs/compute-budget", version = "=1.10.0" } solana-config-program = { path = "../programs/config", version = "=1.10.0" } solana-frozen-abi = { path = "../frozen-abi", version = "=1.10.0" } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index ab6be2d24..50039a8b4 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -18,7 +18,6 @@ pub mod bank_forks; pub mod bank_utils; pub mod block_cost_limits; pub mod blockhash_queue; -pub mod bloom; pub mod bucket_map_holder; pub mod bucket_map_holder_stats; pub mod builtins;