forward packets by prioritization in desc order (#25406)

- Forward packets in descending priority order
- Add support for cost tracking by transaction-requested compute units
- Hook up account buckets to the forwarder
- Add metrics for forwardable batches count
- Remove the redundant invalid-packet filtering at end of slot, since the forwarder performs the same filtering when batching forwardable packets
- Add a bench test for forwarding
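In sketch form, the change makes the forwarder drain the buffer in descending priority order and pack each packet into the first "account bucket" whose compute-unit limits it still fits, tracking cost by the transaction's requested compute units. The sketch below is illustrative only: `Pkt`, `Bucket`, and the limits are simplified stand-ins for `UnprocessedPacketBatches`, `ForwardPacketBatchesByAccounts`, and the halved block limits used in this commit.

use std::collections::{BinaryHeap, HashMap};

const ACCOUNT_LIMIT: u64 = 6_000_000; // illustrative per-account CU limit per bucket
const BLOCK_LIMIT: u64 = 24_000_000; // illustrative total CU limit per bucket

struct Pkt {
    priority: u64,
    requested_cus: u64, // cost is tracked by requested, not executed, compute units
    write_accounts: Vec<&'static str>,
}
impl PartialEq for Pkt {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}
impl Eq for Pkt {}
impl PartialOrd for Pkt {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for Pkt {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.priority.cmp(&other.priority)
    }
}

#[derive(Default)]
struct Bucket {
    block_cost: u64,
    per_account: HashMap<&'static str, u64>,
    packets: Vec<Pkt>,
}
impl Bucket {
    // Add the packet only if it fits both the bucket-wide and per-account limits.
    fn try_add(&mut self, pkt: Pkt) -> Result<(), Pkt> {
        let cus = pkt.requested_cus;
        let fits = self.block_cost + cus <= BLOCK_LIMIT
            && pkt
                .write_accounts
                .iter()
                .all(|a| self.per_account.get(a).copied().unwrap_or(0) + cus <= ACCOUNT_LIMIT);
        if !fits {
            return Err(pkt);
        }
        self.block_cost += cus;
        for a in &pkt.write_accounts {
            *self.per_account.entry(*a).or_insert(0) += cus;
        }
        self.packets.push(pkt);
        Ok(())
    }
}

fn main() {
    // Max-first drain: the moral equivalent of `UnprocessedPacketBatches::iter_desc`.
    let mut buffer = BinaryHeap::from(vec![
        Pkt { priority: 10, requested_cus: 6_000_000, write_accounts: vec!["hot"] },
        Pkt { priority: 9, requested_cus: 6_000_000, write_accounts: vec!["hot"] },
        Pkt { priority: 1, requested_cus: 200_000, write_accounts: vec!["cold"] },
    ]);
    let mut buckets = vec![Bucket::default(), Bucket::default()];
    while let Some(pkt) = buffer.pop() {
        let mut pkt = Some(pkt);
        for bucket in buckets.iter_mut() {
            match bucket.try_add(pkt.take().unwrap()) {
                Ok(()) => break,
                Err(p) => pkt = Some(p),
            }
        }
        // A packet still in `pkt` here fit no bucket and is dropped from forwarding.
    }
    // The low-priority "cold" packet shares bucket 0 with the top "hot" packet
    // instead of being starved behind the hot account's backlog.
    assert_eq!(buckets[0].packets.len(), 2);
    assert_eq!(buckets[1].packets.len(), 1);
}

The real implementation applies the same idea with a `CostTracker` per `ForwardBatch`, falling through up to `DEFAULT_NUMBER_OF_BATCHES` batches before giving up on a packet.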
Tao Zhu 2022-07-05 23:24:58 -05:00 committed by GitHub
parent 38216aa781
commit c1d89ad749
11 changed files with 952 additions and 460 deletions

View File

@ -249,8 +249,8 @@ fn main() {
let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let bank0 = Bank::new_for_benches(&genesis_config);
let mut bank_forks = BankForks::new(bank0);
let mut bank = bank_forks.working_bank();
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank0)));
let mut bank = bank_forks.read().unwrap().working_bank();
// set cost tracker limits to MAX so it will not filter out TXs
bank.write_cost_tracker()
@ -357,6 +357,7 @@ fn main() {
replay_vote_sender,
Arc::new(RwLock::new(CostModel::default())),
Arc::new(connection_cache),
bank_forks.clone(),
);
poh_recorder.write().unwrap().set_bank(&bank, false);
@ -428,8 +429,8 @@ fn main() {
new_bank_time.stop();
let mut insert_time = Measure::start("insert_time");
bank_forks.insert(new_bank);
bank = bank_forks.working_bank();
bank_forks.write().unwrap().insert(new_bank);
bank = bank_forks.read().unwrap().working_bank();
insert_time.stop();
// set cost tracker limits to MAX so it will not filter out TXs
@ -443,7 +444,10 @@ fn main() {
assert!(poh_recorder.read().unwrap().bank().is_some());
if bank.slot() > 32 {
leader_schedule_cache.set_root(&bank);
bank_forks.set_root(root, &AbsRequestSender::default(), None);
bank_forks
.write()
.unwrap()
.set_root(root, &AbsRequestSender::default(), None);
root += 1;
}
debug!(
@ -476,7 +480,11 @@ fn main() {
}
}
}
let txs_processed = bank_forks.working_bank().transaction_count();
let txs_processed = bank_forks
.read()
.unwrap()
.working_bank()
.transaction_count();
debug!("processed: {} base: {}", txs_processed, base_tx_count);
eprintln!(
"{{'name': 'banking_bench_total', 'median': '{:.2}'}}",

View File

@ -25,7 +25,7 @@ use {
},
solana_perf::{packet::to_packet_batches, test_tx::test_tx},
solana_poh::poh_recorder::{create_test_recorder, WorkingBankEntry},
solana_runtime::{bank::Bank, cost_model::CostModel},
solana_runtime::{bank::Bank, bank_forks::BankForks, cost_model::CostModel},
solana_sdk::{
genesis_config::GenesisConfig,
hash::Hash,
@ -170,7 +170,8 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
let mut bank = Bank::new_for_benches(&genesis_config);
// Allow arbitrary transaction processing time for the purposes of this bench
bank.ns_per_slot = u128::MAX;
let bank = Arc::new(bank);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank = bank_forks.read().unwrap().get(0).unwrap();
// set cost tracker limits to MAX so it will not filter out TXs
bank.write_cost_tracker()
@ -232,6 +233,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
s,
Arc::new(RwLock::new(CostModel::default())),
Arc::new(ConnectionCache::default()),
bank_forks,
);
poh_recorder.write().unwrap().set_bank(&bank, false);

View File

@ -5,10 +5,19 @@ extern crate test;
use {
rand::distributions::{Distribution, Uniform},
solana_core::unprocessed_packet_batches::*,
solana_core::{
banking_stage::*, forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts,
unprocessed_packet_batches::*,
},
solana_measure::measure::Measure,
solana_perf::packet::{Packet, PacketBatch},
solana_runtime::{
bank::Bank,
bank_forks::BankForks,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
},
solana_sdk::{hash::Hash, signature::Keypair, system_transaction},
std::sync::{Arc, RwLock},
test::Bencher,
};
@ -174,3 +183,78 @@ fn bench_unprocessed_packet_batches_randomized_beyond_limit(bencher: &mut Benche
insert_packet_batches(buffer_capacity, batch_count, packet_per_batch_count, true);
});
}
fn build_bank_forks_for_test() -> Arc<RwLock<BankForks>> {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = BankForks::new(bank);
Arc::new(RwLock::new(bank_forks))
}
fn buffer_iter_desc_and_forward(
buffer_max_size: usize,
batch_count: usize,
packet_per_batch_count: usize,
randomize: bool,
) {
solana_logger::setup();
let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(buffer_max_size);
// fill buffer
{
let mut timer = Measure::start("fill_buffer");
(0..batch_count).for_each(|_| {
let (packet_batch, packet_indexes) = if randomize {
build_randomized_packet_batch(packet_per_batch_count)
} else {
build_packet_batch(packet_per_batch_count)
};
let deserialized_packets = deserialize_packets(&packet_batch, &packet_indexes);
unprocessed_packet_batches.insert_batch(deserialized_packets);
});
timer.stop();
log::info!(
"inserted {} batch, elapsed {}",
buffer_max_size,
timer.as_us()
);
}
// forward whole buffer
{
let mut timer = Measure::start("forward_time");
let mut forward_packet_batches_by_accounts =
ForwardPacketBatchesByAccounts::new_with_default_batch_limits(
build_bank_forks_for_test().read().unwrap().root_bank(),
);
// iterate buffer in descending priority order, filling the forward batches
let filter_forwarding_results = BankingStage::filter_valid_packets_for_forwarding(
&mut unprocessed_packet_batches,
&mut forward_packet_batches_by_accounts,
);
timer.stop();
let batched_filter_forwarding_results: usize = forward_packet_batches_by_accounts
.iter_batches()
.map(|forward_batch| forward_batch.len())
.sum();
log::info!(
"filter_forwarding_results {:?}, batched_forwardable packets {}, elapsed {}",
filter_forwarding_results,
batched_filter_forwarding_results,
timer.as_us()
);
}
}
#[bench]
#[ignore]
fn bench_forwarding_unprocessed_packet_batches(bencher: &mut Bencher) {
let batch_count = 1_000;
let packet_per_batch_count = 64;
let buffer_capacity = batch_count * packet_per_batch_count;
bencher.iter(|| {
buffer_iter_desc_and_forward(buffer_capacity, batch_count, packet_per_batch_count, true);
});
}

View File

@ -3,6 +3,7 @@
//! can do its processing in parallel with signature verification on the GPU.
use {
crate::{
forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts,
leader_slot_banking_stage_metrics::{LeaderSlotMetricsTracker, ProcessTransactionsSummary},
leader_slot_banking_stage_timing_metrics::{
LeaderExecuteAndCommitTimings, RecordTransactionsTimings,
@ -37,6 +38,7 @@ use {
Bank, CommitTransactionCounts, LoadAndExecuteTransactionsOutput,
TransactionBalancesSet, TransactionCheckResult,
},
bank_forks::BankForks,
bank_utils,
cost_model::{CostModel, TransactionCost},
transaction_batch::TransactionBatch,
@ -48,13 +50,10 @@ use {
Slot, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE, MAX_TRANSACTION_FORWARDING_DELAY,
MAX_TRANSACTION_FORWARDING_DELAY_GPU,
},
feature_set,
pubkey::Pubkey,
saturating_add_assign,
timing::{duration_as_ms, timestamp, AtomicInterval},
transaction::{
self, AddressLoader, SanitizedTransaction, TransactionError, VersionedTransaction,
},
transaction::{self, SanitizedTransaction, TransactionError, VersionedTransaction},
transport::TransportError,
},
solana_streamer::sendmmsg::batch_send,
@ -151,7 +150,6 @@ pub struct BankingStageStats {
current_buffered_packet_batches_count: AtomicUsize,
rebuffered_packets_count: AtomicUsize,
consumed_buffered_packets_count: AtomicUsize,
end_of_slot_filtered_invalid_count: AtomicUsize,
forwarded_transaction_count: AtomicUsize,
forwarded_vote_count: AtomicUsize,
batch_packet_indexes_len: Histogram,
@ -162,7 +160,6 @@ pub struct BankingStageStats {
handle_retryable_packets_elapsed: AtomicU64,
filter_pending_packets_elapsed: AtomicU64,
packet_conversion_elapsed: AtomicU64,
unprocessed_packet_conversion_elapsed: AtomicU64,
transaction_processing_elapsed: AtomicU64,
}
@ -204,9 +201,6 @@ impl BankingStageStats {
.load(Ordering::Relaxed)
+ self.filter_pending_packets_elapsed.load(Ordering::Relaxed)
+ self.packet_conversion_elapsed.load(Ordering::Relaxed)
+ self
.unprocessed_packet_conversion_elapsed
.load(Ordering::Relaxed)
+ self.transaction_processing_elapsed.load(Ordering::Relaxed)
+ self.forwarded_transaction_count.load(Ordering::Relaxed) as u64
+ self.forwarded_vote_count.load(Ordering::Relaxed) as u64
@ -267,12 +261,6 @@ impl BankingStageStats {
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"end_of_slot_filtered_invalid_count",
self.end_of_slot_filtered_invalid_count
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"forwarded_transaction_count",
self.forwarded_transaction_count.swap(0, Ordering::Relaxed) as i64,
@ -312,12 +300,6 @@ impl BankingStageStats {
self.packet_conversion_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"unprocessed_packet_conversion_elapsed",
self.unprocessed_packet_conversion_elapsed
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"transaction_processing_elapsed",
self.transaction_processing_elapsed
@ -374,12 +356,6 @@ pub struct BatchedTransactionErrorDetails {
pub batched_dropped_txs_per_account_data_total_limit_count: u64,
}
#[derive(Debug, Default)]
struct EndOfSlot {
next_slot_leader: Option<Pubkey>,
working_bank: Option<Arc<Bank>>,
}
/// Stores the stage's thread handle and output receiver.
pub struct BankingStage {
bank_thread_hdls: Vec<JoinHandle<()>>,
@ -400,8 +376,9 @@ pub enum ForwardOption {
ForwardTransaction,
}
struct FilterForwardingResults<'a> {
forwardable_packets: Vec<&'a Packet>,
#[derive(Debug)]
pub struct FilterForwardingResults {
total_forwardable_packets: usize,
total_tracer_packets_in_buffer: usize,
total_forwardable_tracer_packets: usize,
}
@ -409,6 +386,7 @@ struct FilterForwardingResults<'a> {
impl BankingStage {
/// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
#[allow(clippy::new_ret_no_self)]
#[allow(clippy::too_many_arguments)]
pub fn new(
cluster_info: &Arc<ClusterInfo>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
@ -419,6 +397,7 @@ impl BankingStage {
gossip_vote_sender: ReplayVoteSender,
cost_model: Arc<RwLock<CostModel>>,
connection_cache: Arc<ConnectionCache>,
bank_forks: Arc<RwLock<BankForks>>,
) -> Self {
Self::new_num_threads(
cluster_info,
@ -431,6 +410,7 @@ impl BankingStage {
gossip_vote_sender,
cost_model,
connection_cache,
bank_forks,
)
}
@ -446,6 +426,7 @@ impl BankingStage {
gossip_vote_sender: ReplayVoteSender,
cost_model: Arc<RwLock<CostModel>>,
connection_cache: Arc<ConnectionCache>,
bank_forks: Arc<RwLock<BankForks>>,
) -> Self {
assert!(num_threads >= MIN_TOTAL_THREADS);
// Single thread to generate entries from many banks.
@ -478,6 +459,7 @@ impl BankingStage {
let data_budget = data_budget.clone();
let cost_model = cost_model.clone();
let connection_cache = connection_cache.clone();
let bank_forks = bank_forks.clone();
Builder::new()
.name(format!("solana-banking-stage-tx-{}", i))
.spawn(move || {
@ -494,6 +476,7 @@ impl BankingStage {
&data_budget,
cost_model,
connection_cache,
&bank_forks,
);
})
.unwrap()
@ -502,32 +485,53 @@ impl BankingStage {
Self { bank_thread_hdls }
}
fn filter_valid_packets_for_forwarding<'a>(
deserialized_packets: impl Iterator<Item = &'a DeserializedPacket>,
) -> FilterForwardingResults<'a> {
let mut total_forwardable_tracer_packets = 0;
let mut total_tracer_packets_in_buffer = 0;
// Filter forwardable `Rc<ImmutableDeserializedPacket>`s that:
// 1. have not been forwarded,
// 2. are taken in priority order, from max to min, and
// 3. do not exceed the account bucket limits.
// Returns the counts of forwardable packets in `FilterForwardingResults`.
pub fn filter_valid_packets_for_forwarding(
buffered_packet_batches: &mut UnprocessedPacketBatches,
forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts,
) -> FilterForwardingResults {
let mut total_forwardable_tracer_packets: usize = 0;
let mut total_tracer_packets_in_buffer: usize = 0;
let mut total_forwardable_packets: usize = 0;
let mut dropped_tx_before_forwarding_count: usize = 0;
let filter_forwardable_packet = |deserialized_packet: &mut DeserializedPacket| -> bool {
let mut result = true;
let is_tracer_packet = deserialized_packet
.immutable_section()
.original_packet()
.meta
.is_tracer_packet();
if is_tracer_packet {
saturating_add_assign!(total_tracer_packets_in_buffer, 1);
}
if !deserialized_packet.forwarded {
saturating_add_assign!(total_forwardable_packets, 1);
if is_tracer_packet {
saturating_add_assign!(total_forwardable_tracer_packets, 1);
}
result = forward_packet_batches_by_accounts
.add_packet(deserialized_packet.immutable_section().clone());
if !result {
saturating_add_assign!(dropped_tx_before_forwarding_count, 1);
}
}
result
};
buffered_packet_batches.iter_desc(filter_forwardable_packet);
inc_new_counter_info!(
"banking_stage-dropped_tx_before_forwarding",
dropped_tx_before_forwarding_count
);
FilterForwardingResults {
forwardable_packets: deserialized_packets
.filter_map(|deserialized_packet| {
let is_tracer_packet = deserialized_packet
.immutable_section()
.original_packet()
.meta
.is_tracer_packet();
if is_tracer_packet {
total_tracer_packets_in_buffer += 1;
}
if !deserialized_packet.forwarded {
if is_tracer_packet {
total_forwardable_tracer_packets += 1;
}
Some(deserialized_packet.immutable_section().original_packet())
} else {
None
}
})
.collect(),
total_forwardable_packets,
total_tracer_packets_in_buffer,
total_forwardable_tracer_packets,
}
@ -535,19 +539,22 @@ impl BankingStage {
/// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns
/// the number of successfully forwarded packets as the second element of the tuple,
/// and the current leader's pubkey, if known, as the third.
fn forward_buffered_packets(
fn forward_buffered_packets<'a>(
connection_cache: &ConnectionCache,
forward_option: &ForwardOption,
cluster_info: &ClusterInfo,
poh_recorder: &Arc<RwLock<PohRecorder>>,
socket: &UdpSocket,
filter_forwarding_results: &FilterForwardingResults,
forwardable_packets: impl Iterator<Item = &'a Packet>,
data_budget: &DataBudget,
banking_stage_stats: &BankingStageStats,
tracer_packet_stats: &mut TracerPacketStats,
) -> (std::result::Result<(), TransportError>, usize) {
) -> (
std::result::Result<(), TransportError>,
usize,
Option<Pubkey>,
) {
let leader_and_addr = match forward_option {
ForwardOption::NotForward => return (Ok(()), 0),
ForwardOption::NotForward => return (Ok(()), 0, None),
ForwardOption::ForwardTransaction => {
next_leader_tpu_forwards(cluster_info, poh_recorder)
}
@ -556,20 +563,9 @@ impl BankingStage {
};
let (leader_pubkey, addr) = match leader_and_addr {
Some(leader_and_addr) => leader_and_addr,
None => return (Ok(()), 0),
None => return (Ok(()), 0, None),
};
let FilterForwardingResults {
forwardable_packets,
total_forwardable_tracer_packets,
..
} = filter_forwarding_results;
tracer_packet_stats.increment_total_forwardable_tracer_packets(
*total_forwardable_tracer_packets,
leader_pubkey,
);
const INTERVAL_MS: u64 = 100;
const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200;
const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
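// A quick check of these constants: 10_000 packets/s at 1_200 bytes each is
// 12_000_000 bytes/s, so each 100 ms interval budgets
// 12_000_000 * 100 / 1000 = 1_200_000 bytes of forwarded data.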
@ -582,7 +578,6 @@ impl BankingStage {
});
let packet_vec: Vec<_> = forwardable_packets
.iter()
.filter_map(|p| {
if !p.meta.forwarded() && data_budget.take(p.meta.size) {
Some(p.data(..)?.to_vec())
@ -629,16 +624,16 @@ impl BankingStage {
if let Err(err) = res {
inc_new_counter_info!("banking_stage-forward_packets-failed-batches", 1);
return (Err(err), 0);
return (Err(err), 0, Some(leader_pubkey));
}
}
(Ok(()), packet_vec_len)
(Ok(()), packet_vec_len, Some(leader_pubkey))
}
#[allow(clippy::too_many_arguments)]
pub fn consume_buffered_packets(
my_pubkey: &Pubkey,
_my_pubkey: &Pubkey,
max_tx_ingestion_ns: u128,
poh_recorder: &Arc<RwLock<PohRecorder>>,
buffered_packet_batches: &mut UnprocessedPacketBatches,
@ -655,7 +650,7 @@ impl BankingStage {
let mut consumed_buffered_packets_count = 0;
let buffered_packets_len = buffered_packet_batches.len();
let mut proc_start = Measure::start("consume_buffered_process");
let mut reached_end_of_slot: Option<EndOfSlot> = None;
let mut reached_end_of_slot = false;
let mut retryable_packets = MinMaxHeap::with_capacity(buffered_packet_batches.capacity());
std::mem::swap(
@ -717,13 +712,10 @@ impl BankingStage {
)
{
let poh_recorder_lock_time = {
let (poh_recorder_locked, poh_recorder_lock_time) =
let (_poh_recorder_locked, poh_recorder_lock_time) =
measure!(poh_recorder.read().unwrap(), "poh_recorder.read");
reached_end_of_slot = Some(EndOfSlot {
next_slot_leader: poh_recorder_locked.next_slot_leader(),
working_bank: Some(working_bank),
});
reached_end_of_slot = true;
poh_recorder_lock_time
};
@ -776,19 +768,16 @@ impl BankingStage {
);
result
} else if reached_end_of_slot.is_some() {
} else if reached_end_of_slot {
packets_to_process
} else {
// mark as end-of-slot to avoid aggressively locking poh for the remaining
// packet batches in the buffer
let poh_recorder_lock_time = {
let (poh_recorder_locked, poh_recorder_lock_time) =
let (_poh_recorder_locked, poh_recorder_lock_time) =
measure!(poh_recorder.read().unwrap(), "poh_recorder.read");
reached_end_of_slot = Some(EndOfSlot {
next_slot_leader: poh_recorder_locked.next_slot_leader(),
working_bank: None,
});
reached_end_of_slot = true;
poh_recorder_lock_time
};
slot_metrics_tracker.increment_consume_buffered_packets_poh_recorder_lock_us(
@ -805,43 +794,12 @@ impl BankingStage {
&mut buffered_packet_batches.packet_priority_queue,
);
if let Some(end_of_slot) = &reached_end_of_slot {
if reached_end_of_slot {
slot_metrics_tracker
.set_end_of_slot_unprocessed_buffer_len(buffered_packet_batches.len() as u64);
// We've hit the end of this slot, no need to perform more processing,
// just filter the remaining packets for the invalid (e.g. too old) ones
// if the working_bank is available
let mut end_of_slot_filtering_time = Measure::start("end_of_slot_filtering");
// TODO: This doesn't have to be done at the end of every slot, can instead
// hold multiple unbuffered queues without merging them
// TODO: update this here to filter the rest of the packets remaining
// TODO: this needs to be done even if there is no end_of_slot.working_bank
// to put retryable packets back in buffer
let end_of_slot_filtered_invalid_count =
Self::filter_unprocessed_packets_at_end_of_slot(
&end_of_slot.working_bank,
buffered_packet_batches,
my_pubkey,
end_of_slot.next_slot_leader,
banking_stage_stats,
);
inc_new_counter_info!(
"banking_stage-dropped_tx_before_forwarding",
end_of_slot_filtered_invalid_count
);
slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count(
end_of_slot_filtered_invalid_count as u64,
);
banking_stage_stats
.end_of_slot_filtered_invalid_count
.fetch_add(end_of_slot_filtered_invalid_count, Ordering::Relaxed);
end_of_slot_filtering_time.stop();
slot_metrics_tracker
.increment_end_of_slot_filtering_us(end_of_slot_filtering_time.as_us());
// Packet filtering will be done at `forward_packet_batches_by_accounts.add_packet()`
}
proc_start.stop();
@ -920,6 +878,7 @@ impl BankingStage {
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
connection_cache: &ConnectionCache,
tracer_packet_stats: &mut TracerPacketStats,
bank_forks: &Arc<RwLock<BankForks>>,
) {
let ((metrics_action, decision), make_decision_time) = measure!(
{
@ -999,6 +958,7 @@ impl BankingStage {
banking_stage_stats,
connection_cache,
tracer_packet_stats,
bank_forks,
),
"forward",
);
@ -1021,6 +981,7 @@ impl BankingStage {
banking_stage_stats,
connection_cache,
tracer_packet_stats,
bank_forks,
),
"forward_and_hold",
);
@ -1045,6 +1006,7 @@ impl BankingStage {
banking_stage_stats: &BankingStageStats,
connection_cache: &ConnectionCache,
tracer_packet_stats: &mut TracerPacketStats,
bank_forks: &Arc<RwLock<BankForks>>,
) {
if let ForwardOption::NotForward = forward_option {
if !hold {
@ -1053,43 +1015,65 @@ impl BankingStage {
return;
}
let filter_forwarding_result =
Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter());
let forwardable_packets_len = filter_forwarding_result.forwardable_packets.len();
let (_forward_result, sucessful_forwarded_packets_count) = Self::forward_buffered_packets(
connection_cache,
forward_option,
cluster_info,
poh_recorder,
socket,
&filter_forwarding_result,
data_budget,
banking_stage_stats,
tracer_packet_stats,
// get the current root bank from bank_forks; use it to sanitize transactions and
// load all accounts through the address loader
let current_bank = bank_forks.read().unwrap().root_bank();
let mut forward_packet_batches_by_accounts =
ForwardPacketBatchesByAccounts::new_with_default_batch_limits(current_bank);
let filter_forwarding_result = Self::filter_valid_packets_for_forwarding(
buffered_packet_batches,
&mut forward_packet_batches_by_accounts,
);
let failed_forwarded_packets_count =
forwardable_packets_len.saturating_sub(sucessful_forwarded_packets_count);
forward_packet_batches_by_accounts
.iter_batches()
.filter(|&batch| !batch.is_empty())
.for_each(|forward_batch| {
slot_metrics_tracker.increment_forwardable_batches_count(1);
if failed_forwarded_packets_count > 0 {
slot_metrics_tracker
.increment_failed_forwarded_packets_count(failed_forwarded_packets_count as u64);
slot_metrics_tracker.increment_packet_batch_forward_failure_count(1);
}
let batched_forwardable_packets_count = forward_batch.len();
let (_forward_result, sucessful_forwarded_packets_count, leader_pubkey) =
Self::forward_buffered_packets(
connection_cache,
forward_option,
cluster_info,
poh_recorder,
socket,
forward_batch.get_forwardable_packets(),
data_budget,
banking_stage_stats,
);
if sucessful_forwarded_packets_count > 0 {
slot_metrics_tracker.increment_successful_forwarded_packets_count(
sucessful_forwarded_packets_count as u64,
);
}
if let Some(leader_pubkey) = leader_pubkey {
tracer_packet_stats.increment_total_forwardable_tracer_packets(
filter_forwarding_result.total_forwardable_tracer_packets,
leader_pubkey,
);
}
let failed_forwarded_packets_count = batched_forwardable_packets_count
.saturating_sub(sucessful_forwarded_packets_count);
if failed_forwarded_packets_count > 0 {
slot_metrics_tracker.increment_failed_forwarded_packets_count(
failed_forwarded_packets_count as u64,
);
slot_metrics_tracker.increment_packet_batch_forward_failure_count(1);
}
if sucessful_forwarded_packets_count > 0 {
slot_metrics_tracker.increment_successful_forwarded_packets_count(
sucessful_forwarded_packets_count as u64,
);
}
});
if hold {
for deserialized_packet in buffered_packet_batches.iter_mut() {
deserialized_packet.forwarded = true;
}
} else {
slot_metrics_tracker
.increment_cleared_from_buffer_after_forward_count(forwardable_packets_len as u64);
slot_metrics_tracker.increment_cleared_from_buffer_after_forward_count(
filter_forwarding_result.total_forwardable_packets as u64,
);
tracer_packet_stats.increment_total_cleared_from_buffer_after_forward(
filter_forwarding_result.total_tracer_packets_in_buffer,
);
@ -1111,6 +1095,7 @@ impl BankingStage {
data_budget: &DataBudget,
cost_model: Arc<RwLock<CostModel>>,
connection_cache: Arc<ConnectionCache>,
bank_forks: &Arc<RwLock<BankForks>>,
) {
let recorder = poh_recorder.read().unwrap().recorder();
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
@ -1144,6 +1129,7 @@ impl BankingStage {
&mut slot_metrics_tracker,
&connection_cache,
&mut tracer_packet_stats,
bank_forks,
),
"process_buffered_packets",
);
@ -1831,31 +1817,6 @@ impl BankingStage {
.collect_vec()
}
// This function deserializes packets into transactions, computes the blake3 hash of transaction
// messages, and verifies secp256k1 instructions. A list of sanitized transactions are returned
// with their packet indexes.
#[allow(clippy::needless_collect)]
fn transaction_from_deserialized_packet(
deserialized_packet: &ImmutableDeserializedPacket,
feature_set: &Arc<feature_set::FeatureSet>,
votes_only: bool,
address_loader: impl AddressLoader,
) -> Option<SanitizedTransaction> {
if votes_only && !deserialized_packet.is_simple_vote() {
return None;
}
let tx = SanitizedTransaction::try_new(
deserialized_packet.transaction().clone(),
*deserialized_packet.message_hash(),
deserialized_packet.is_simple_vote(),
address_loader,
)
.ok()?;
tx.verify_precompiles(feature_set).ok()?;
Some(tx)
}
/// This function filters pending packets that are still valid
/// # Arguments
/// * `transactions` - a batch of transactions deserialized from packets
@ -1934,7 +1895,7 @@ impl BankingStage {
deserialized_packets
.enumerate()
.filter_map(|(i, deserialized_packet)| {
Self::transaction_from_deserialized_packet(
unprocessed_packet_batches::transaction_from_deserialized_packet(
deserialized_packet,
&bank.feature_set,
bank.vote_only_bank(),
@ -2019,54 +1980,6 @@ impl BankingStage {
process_transactions_summary
}
// Returns the number of packets that were filtered out for
// no longer being valid (could be too old, a duplicate of something
// already processed, etc.)
fn filter_unprocessed_packets_at_end_of_slot(
bank: &Option<Arc<Bank>>,
unprocessed_packets: &mut UnprocessedPacketBatches,
my_pubkey: &Pubkey,
next_leader: Option<Pubkey>,
banking_stage_stats: &BankingStageStats,
) -> usize {
// Check if we are the next leader. If so, let's not filter the packets
// as we'll filter it again while processing the packets.
// Filtering helps if we were going to forward the packets to some other node
let will_still_be_leader = next_leader
.map(|next_leader| next_leader == *my_pubkey)
.unwrap_or(false);
let should_filter_unprocessed_packets = !will_still_be_leader && bank.is_some();
let original_unprocessed_packets_len = unprocessed_packets.len();
if should_filter_unprocessed_packets {
// If `should_filter_unprocessed_packets` is true, then the bank
// must be `Some`
let bank = bank.as_ref().unwrap();
let mut unprocessed_packet_conversion_time =
Measure::start("unprocessed_packet_conversion");
let should_retain = |deserialized_packet: &mut DeserializedPacket| {
Self::transaction_from_deserialized_packet(
deserialized_packet.immutable_section(),
&bank.feature_set,
bank.vote_only_bank(),
bank.as_ref(),
)
.is_some()
};
unprocessed_packets.retain(should_retain);
unprocessed_packet_conversion_time.stop();
banking_stage_stats
.unprocessed_packet_conversion_elapsed
.fetch_add(
unprocessed_packet_conversion_time.as_us(),
Ordering::Relaxed,
);
}
original_unprocessed_packets_len.saturating_sub(unprocessed_packets.len())
}
fn generate_packet_indexes(vers: &PacketBatch) -> Vec<usize> {
vers.iter()
.enumerate()
@ -2298,6 +2211,7 @@ mod tests {
},
solana_program_runtime::timings::ProgramTiming,
solana_rpc::transaction_status_service::TransactionStatusService,
solana_runtime::bank_forks::BankForks,
solana_sdk::{
account::AccountSharedData,
hash::Hash,
@ -2309,14 +2223,10 @@ mod tests {
poh_config::PohConfig,
signature::{Keypair, Signer},
system_transaction,
transaction::{
MessageHash, SimpleAddressLoader, Transaction, TransactionError,
VersionedTransaction,
},
transaction::{MessageHash, Transaction, TransactionError, VersionedTransaction},
},
solana_streamer::{recvmmsg::recv_mmsg, socket::SocketAddrSpace},
solana_transaction_status::{TransactionStatusMeta, VersionedTransactionWithStatusMeta},
solana_vote_program::vote_transaction,
std::{
borrow::Cow,
collections::HashSet,
@ -2338,7 +2248,9 @@ mod tests {
#[test]
fn test_banking_stage_shutdown1() {
let genesis_config = create_genesis_config(2).genesis_config;
let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config));
let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap());
let (verified_sender, verified_receiver) = unbounded();
let (gossip_verified_vote_sender, gossip_verified_vote_receiver) = unbounded();
let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
@ -2364,6 +2276,7 @@ mod tests {
gossip_vote_sender,
Arc::new(RwLock::new(CostModel::default())),
Arc::new(ConnectionCache::default()),
bank_forks,
);
drop(verified_sender);
drop(gossip_verified_vote_sender);
@ -2383,7 +2296,9 @@ mod tests {
} = create_genesis_config(2);
genesis_config.ticks_per_slot = 4;
let num_extra_ticks = 2;
let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config));
let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap());
let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = unbounded();
let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
@ -2414,6 +2329,7 @@ mod tests {
gossip_vote_sender,
Arc::new(RwLock::new(CostModel::default())),
Arc::new(ConnectionCache::default()),
bank_forks,
);
trace!("sending bank");
drop(verified_sender);
@ -2456,7 +2372,9 @@ mod tests {
mint_keypair,
..
} = create_slow_genesis_config(10);
let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config));
let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap());
let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = unbounded();
let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
@ -2489,6 +2407,7 @@ mod tests {
gossip_vote_sender,
Arc::new(RwLock::new(CostModel::default())),
Arc::new(ConnectionCache::default()),
bank_forks,
);
// fund another account so we can send 2 good transactions in a single batch.
@ -2615,7 +2534,9 @@ mod tests {
let entry_receiver = {
// start a banking_stage to eat verified receiver
let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config));
let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap());
let blockstore = Arc::new(
Blockstore::open(ledger_path.path())
.expect("Expected to be able to open database ledger"),
@ -2641,6 +2562,7 @@ mod tests {
gossip_vote_sender,
Arc::new(RwLock::new(CostModel::default())),
Arc::new(ConnectionCache::default()),
bank_forks,
);
// wait for banking_stage to eat the packets
@ -3337,6 +3259,11 @@ mod tests {
#[test]
fn test_filter_valid_packets() {
solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_slow_genesis_config(10);
let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let current_bank = bank_forks.read().unwrap().root_bank();
let mut packets: Vec<DeserializedPacket> = (0..256)
.map(|packets_id| {
// packets are deserialized upon receiving, failed packets will not be
@ -3352,43 +3279,69 @@ mod tests {
})
.collect_vec();
let FilterForwardingResults {
forwardable_packets,
total_tracer_packets_in_buffer,
total_forwardable_tracer_packets,
} = BankingStage::filter_valid_packets_for_forwarding(packets.iter());
assert_eq!(forwardable_packets.len(), 256);
assert_eq!(total_tracer_packets_in_buffer, 256);
assert_eq!(total_forwardable_tracer_packets, 256);
// no packets have been forwarded yet, so all of them are forwardable
{
let mut buffered_packet_batches: UnprocessedPacketBatches =
UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len());
let mut forward_packet_batches_by_accounts =
ForwardPacketBatchesByAccounts::new(current_bank.clone(), 1, 2);
// packets in a batch are forwarded in arbitrary order; verify the ports match after
// sorting
let expected_ports: Vec<_> = (0..256).collect();
let mut forwarded_ports: Vec<_> = forwardable_packets
.into_iter()
.map(|p| p.meta.port)
.collect();
forwarded_ports.sort_unstable();
assert_eq!(expected_ports, forwarded_ports);
let FilterForwardingResults {
total_forwardable_packets,
total_tracer_packets_in_buffer,
total_forwardable_tracer_packets,
} = BankingStage::filter_valid_packets_for_forwarding(
&mut buffered_packet_batches,
&mut forward_packet_batches_by_accounts,
);
assert_eq!(total_forwardable_packets, 256);
assert_eq!(total_tracer_packets_in_buffer, 256);
assert_eq!(total_forwardable_tracer_packets, 256);
let num_already_forwarded = 16;
for packet in &mut packets[0..num_already_forwarded] {
packet.forwarded = true;
// packets in a batch are forwarded in arbitrary order; verify the ports match after
// sorting
let expected_ports: Vec<_> = (0..256).collect();
let mut forwarded_ports: Vec<_> = forward_packet_batches_by_accounts
.iter_batches()
.flat_map(|batch| {
batch
.get_forwardable_packets()
.into_iter()
.map(|p| p.meta.port)
})
.collect();
forwarded_ports.sort_unstable();
assert_eq!(expected_ports, forwarded_ports);
}
// some packets have already been forwarded
{
let num_already_forwarded = 16;
for packet in &mut packets[0..num_already_forwarded] {
packet.forwarded = true;
}
let mut buffered_packet_batches: UnprocessedPacketBatches =
UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len());
let mut forward_packet_batches_by_accounts =
ForwardPacketBatchesByAccounts::new(current_bank, 1, 2);
let FilterForwardingResults {
total_forwardable_packets,
total_tracer_packets_in_buffer,
total_forwardable_tracer_packets,
} = BankingStage::filter_valid_packets_for_forwarding(
&mut buffered_packet_batches,
&mut forward_packet_batches_by_accounts,
);
assert_eq!(
total_forwardable_packets,
packets.len() - num_already_forwarded
);
assert_eq!(total_tracer_packets_in_buffer, packets.len());
assert_eq!(
total_forwardable_tracer_packets,
packets.len() - num_already_forwarded
);
}
let FilterForwardingResults {
forwardable_packets,
total_tracer_packets_in_buffer,
total_forwardable_tracer_packets,
} = BankingStage::filter_valid_packets_for_forwarding(packets.iter());
assert_eq!(
forwardable_packets.len(),
packets.len() - num_already_forwarded
);
assert_eq!(total_tracer_packets_in_buffer, packets.len());
assert_eq!(
total_forwardable_tracer_packets,
packets.len() - num_already_forwarded
);
}
#[test]
@ -4164,7 +4117,9 @@ mod tests {
..
} = &genesis_config_info;
let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(genesis_config));
let bank = Bank::new_no_wallclock_throttle_for_tests(genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap());
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let blockstore = Arc::new(
@ -4211,6 +4166,7 @@ mod tests {
&stats,
&connection_cache,
&mut TracerPacketStats::new(0),
&bank_forks,
);
recv_socket
@ -4263,7 +4219,9 @@ mod tests {
validator_pubkey,
..
} = &genesis_config_info;
let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(genesis_config));
let bank = Bank::new_no_wallclock_throttle_for_tests(genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap());
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let blockstore = Arc::new(
@ -4325,6 +4283,7 @@ mod tests {
&stats,
&connection_cache,
&mut TracerPacketStats::new(0),
&bank_forks,
);
recv_socket
@ -4360,134 +4319,6 @@ mod tests {
Blockstore::destroy(ledger_path.path()).unwrap();
}
#[cfg(test)]
fn make_test_packets(
transactions: Vec<Transaction>,
vote_indexes: Vec<usize>,
) -> Vec<DeserializedPacket> {
let capacity = transactions.len();
let mut packet_vector = Vec::with_capacity(capacity);
for tx in transactions.iter() {
packet_vector.push(Packet::from_data(None, &tx).unwrap());
}
for index in vote_indexes.iter() {
packet_vector[*index].meta.flags |= PacketFlags::SIMPLE_VOTE_TX;
}
packet_vector
.into_iter()
.map(|p| DeserializedPacket::new(p).unwrap())
.collect()
}
#[test]
fn test_transaction_from_deserialized_packet() {
use solana_sdk::feature_set::FeatureSet;
let keypair = Keypair::new();
let transfer_tx =
system_transaction::transfer(&keypair, &keypair.pubkey(), 1, Hash::default());
let vote_tx = vote_transaction::new_vote_transaction(
vec![42],
Hash::default(),
Hash::default(),
&keypair,
&keypair,
&keypair,
None,
);
// packets with no votes
{
let vote_indexes = vec![];
let packet_vector =
make_test_packets(vec![transfer_tx.clone(), transfer_tx.clone()], vote_indexes);
let mut votes_only = false;
let txs = packet_vector.iter().filter_map(|tx| {
BankingStage::transaction_from_deserialized_packet(
tx.immutable_section(),
&Arc::new(FeatureSet::default()),
votes_only,
SimpleAddressLoader::Disabled,
)
});
assert_eq!(2, txs.count());
votes_only = true;
let txs = packet_vector.iter().filter_map(|tx| {
BankingStage::transaction_from_deserialized_packet(
tx.immutable_section(),
&Arc::new(FeatureSet::default()),
votes_only,
SimpleAddressLoader::Disabled,
)
});
assert_eq!(0, txs.count());
}
// packets with some votes
{
let vote_indexes = vec![0, 2];
let packet_vector = make_test_packets(
vec![vote_tx.clone(), transfer_tx, vote_tx.clone()],
vote_indexes,
);
let mut votes_only = false;
let txs = packet_vector.iter().filter_map(|tx| {
BankingStage::transaction_from_deserialized_packet(
tx.immutable_section(),
&Arc::new(FeatureSet::default()),
votes_only,
SimpleAddressLoader::Disabled,
)
});
assert_eq!(3, txs.count());
votes_only = true;
let txs = packet_vector.iter().filter_map(|tx| {
BankingStage::transaction_from_deserialized_packet(
tx.immutable_section(),
&Arc::new(FeatureSet::default()),
votes_only,
SimpleAddressLoader::Disabled,
)
});
assert_eq!(2, txs.count());
}
// packets with all votes
{
let vote_indexes = vec![0, 1, 2];
let packet_vector = make_test_packets(
vec![vote_tx.clone(), vote_tx.clone(), vote_tx],
vote_indexes,
);
let mut votes_only = false;
let txs = packet_vector.iter().filter_map(|tx| {
BankingStage::transaction_from_deserialized_packet(
tx.immutable_section(),
&Arc::new(FeatureSet::default()),
votes_only,
SimpleAddressLoader::Disabled,
)
});
assert_eq!(3, txs.count());
votes_only = true;
let txs = packet_vector.iter().filter_map(|tx| {
BankingStage::transaction_from_deserialized_packet(
tx.immutable_section(),
&Arc::new(FeatureSet::default()),
votes_only,
SimpleAddressLoader::Disabled,
)
});
assert_eq!(3, txs.count());
}
}
#[test]
fn test_accumulate_batched_transaction_costs() {
let signature_cost = 1;

View File

@ -0,0 +1,343 @@
use {
crate::unprocessed_packet_batches::{self, ImmutableDeserializedPacket},
solana_perf::packet::Packet,
solana_runtime::{
bank::Bank,
block_cost_limits,
cost_tracker::{CostTracker, CostTrackerError},
},
solana_sdk::pubkey::Pubkey,
std::{rc::Rc, sync::Arc},
};
/// `ForwardBatch` uses half of the default cost_tracker limits, as a smaller batch
/// allows better granularity in composing forwarding transactions; e.g.,
/// transactions in each batch are potentially more evenly distributed across accounts.
const FORWARDED_BLOCK_COMPUTE_RATIO: u32 = 2;
/// This number divided by `FORWARDED_BLOCK_COMPUTE_RATIO` is the total number of
/// blocks to forward. To accommodate transactions without a `compute_budget`
/// instruction, which default to 200_000 compute units each, 100 batches are kept
/// by default, enough to forward up to 12_000 such transactions (120 such
/// transactions fill up a batch; 100 batches allow 12_000 transactions).
const DEFAULT_NUMBER_OF_BATCHES: u32 = 100;
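// A worked check of these defaults, assuming `block_cost_limits::MAX_BLOCK_UNITS`
// is 48_000_000 CUs (its value around this commit):
//   per-batch block limit:  48_000_000 / FORWARDED_BLOCK_COMPUTE_RATIO = 24_000_000 CUs
//   default transaction:    200_000 CUs
//   transactions per batch: 24_000_000 / 200_000 = 120
//   across all batches:     120 * DEFAULT_NUMBER_OF_BATCHES = 12_000 transactions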
/// `ForwardBatch` represents one forwardable batch of transactions with a
/// limited total of compute units
#[derive(Debug)]
pub struct ForwardBatch {
cost_tracker: CostTracker,
// `forwardable_packets` keeps forwardable packets in a vector in its
// original fee prioritized order
forwardable_packets: Vec<Rc<ImmutableDeserializedPacket>>,
}
impl Default for ForwardBatch {
/// default ForwardBatch has cost_tracker with default limits
fn default() -> Self {
Self::new(1)
}
}
impl ForwardBatch {
/// `ForwardBatch` keeps forwardable packets in a vector in their original
/// fee-prioritized order. The number of packets is limited by the `cost_tracker`,
/// whose customized `limit_ratio` lowers (when `limit_ratio` > 1) the
/// `cost_tracker`'s default limits. Lower limits yield smaller batches for forwarding.
fn new(limit_ratio: u32) -> Self {
let mut cost_tracker = CostTracker::default();
cost_tracker.set_limits(
block_cost_limits::MAX_WRITABLE_ACCOUNT_UNITS.saturating_div(limit_ratio as u64),
block_cost_limits::MAX_BLOCK_UNITS.saturating_div(limit_ratio as u64),
block_cost_limits::MAX_VOTE_UNITS.saturating_div(limit_ratio as u64),
);
Self {
cost_tracker,
forwardable_packets: Vec::default(),
}
}
fn try_add(
&mut self,
write_lock_accounts: &[Pubkey],
compute_units: u64,
immutable_packet: Rc<ImmutableDeserializedPacket>,
) -> Result<u64, CostTrackerError> {
let res = self.cost_tracker.try_add_requested_cus(
write_lock_accounts,
compute_units,
immutable_packet.is_simple_vote(),
);
if res.is_ok() {
self.forwardable_packets.push(immutable_packet);
}
res
}
pub fn get_forwardable_packets(&self) -> impl Iterator<Item = &Packet> {
self.forwardable_packets
.iter()
.map(|immutable_packet| immutable_packet.original_packet())
}
pub fn len(&self) -> usize {
self.forwardable_packets.len()
}
pub fn is_empty(&self) -> bool {
self.forwardable_packets.is_empty()
}
}
/// To avoid the forward queue being saturated by transactions for a single hot
/// account, the forwarder groups and sends prioritized transactions by account
/// limit, allowing transactions on non-congested accounts to be forwarded
/// alongside the higher-fee transactions that saturate those highly demanded accounts.
#[derive(Debug)]
pub struct ForwardPacketBatchesByAccounts {
// Need a `bank` to load all accounts for `VersionedTransaction`. Currently
// the rooted bank is used for this.
current_bank: Arc<Bank>,
// Forwardable packets are staged in a number of batches; each batch is limited
// by its cost_tracker on both account and block limits. Those limits are set to
// a `limit_ratio` fraction of the regular block limits, to facilitate quicker iteration.
forward_batches: Vec<ForwardBatch>,
}
impl ForwardPacketBatchesByAccounts {
pub fn new_with_default_batch_limits(current_bank: Arc<Bank>) -> Self {
Self::new(
current_bank,
FORWARDED_BLOCK_COMPUTE_RATIO,
DEFAULT_NUMBER_OF_BATCHES,
)
}
pub fn new(current_bank: Arc<Bank>, limit_ratio: u32, number_of_batches: u32) -> Self {
let forward_batches = (0..number_of_batches)
.map(|_| ForwardBatch::new(limit_ratio))
.collect();
Self {
current_bank,
forward_batches,
}
}
pub fn add_packet(&mut self, packet: Rc<ImmutableDeserializedPacket>) -> bool {
// do not forward a packet that cannot be sanitized
if let Some(sanitized_transaction) =
unprocessed_packet_batches::transaction_from_deserialized_packet(
&packet,
&self.current_bank.feature_set,
self.current_bank.vote_only_bank(),
self.current_bank.as_ref(),
)
{
// get write_lock_accounts
let message = sanitized_transaction.message();
let write_lock_accounts: Vec<_> = message
.account_keys()
.iter()
.enumerate()
.filter_map(|(i, account_key)| {
if message.is_writable(i) {
Some(*account_key)
} else {
None
}
})
.collect();
// get requested CUs
let requested_cu = packet.compute_unit_limit();
// try to fill into forward batches
self.add_packet_to_batches(&write_lock_accounts, requested_cu, packet)
} else {
false
}
}
pub fn iter_batches(&self) -> impl Iterator<Item = &ForwardBatch> {
self.forward_batches.iter()
}
/// A transaction is tried against the batches in order: if it can't fit into the
/// first batch due to the cost_tracker (e.g., exceeding the account limit or block
/// limit), it tries the next batch, until it is either added to one of the batches
/// or not forwarded at all.
fn add_packet_to_batches(
&mut self,
write_lock_accounts: &[Pubkey],
compute_units: u64,
immutable_packet: Rc<ImmutableDeserializedPacket>,
) -> bool {
for forward_batch in self.forward_batches.iter_mut() {
if forward_batch
.try_add(write_lock_accounts, compute_units, immutable_packet.clone())
.is_ok()
{
return true;
}
}
false
}
}
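// A hypothetical usage sketch (not part of this commit): stage a priority-ordered
// list of packets into the account-bucketed batches, then count how many packets
// would actually be forwarded across the non-empty batches.
fn _example_stage_and_count(
    root_bank: Arc<Bank>,
    prioritized_packets: Vec<Rc<ImmutableDeserializedPacket>>,
) -> usize {
    let mut batches = ForwardPacketBatchesByAccounts::new_with_default_batch_limits(root_bank);
    for packet in prioritized_packets {
        // `add_packet` sanitizes the transaction, derives its write locks and
        // requested CUs, and tries each batch in turn; a packet that fits no
        // batch is simply not forwarded.
        let _ = batches.add_packet(packet);
    }
    batches
        .iter_batches()
        .filter(|batch| !batch.is_empty())
        .map(|batch| batch.len())
        .sum()
}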
#[cfg(test)]
mod tests {
use {
super::*,
crate::unprocessed_packet_batches::{DeserializedPacket, TransactionPriorityDetails},
solana_runtime::{
bank::Bank,
bank_forks::BankForks,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
},
solana_sdk::{hash::Hash, signature::Keypair, system_transaction},
std::sync::RwLock,
};
fn build_bank_forks_for_test() -> Arc<RwLock<BankForks>> {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = BankForks::new(bank);
Arc::new(RwLock::new(bank_forks))
}
fn build_deserialized_packet_for_test(
priority: u64,
compute_unit_limit: u64,
) -> DeserializedPacket {
let tx = system_transaction::transfer(
&Keypair::new(),
&solana_sdk::pubkey::new_rand(),
1,
Hash::new_unique(),
);
let packet = Packet::from_data(None, &tx).unwrap();
DeserializedPacket::new_with_priority_details(
packet,
TransactionPriorityDetails {
priority,
compute_unit_limit,
},
)
.unwrap()
}
#[test]
fn test_try_add_to_forward_batch() {
// set test batch limit to be 1 millionth of regular block limit
let limit_ratio = 1_000_000u32;
// set requested_cu equal to the batch's per-account limit
let requested_cu =
block_cost_limits::MAX_WRITABLE_ACCOUNT_UNITS.saturating_div(limit_ratio as u64);
let mut forward_batch = ForwardBatch::new(limit_ratio);
let write_lock_accounts = vec![Pubkey::new_unique(), Pubkey::new_unique()];
let packet = build_deserialized_packet_for_test(10, requested_cu);
// first packet will be successful
assert!(forward_batch
.try_add(
&write_lock_accounts,
requested_cu,
packet.immutable_section().clone()
)
.is_ok());
assert_eq!(1, forward_batch.forwardable_packets.len());
// second packet will hit account limit, therefore not added
assert!(forward_batch
.try_add(
&write_lock_accounts,
requested_cu,
packet.immutable_section().clone()
)
.is_err());
assert_eq!(1, forward_batch.forwardable_packets.len());
}
#[test]
fn test_add_packet_to_batches() {
solana_logger::setup();
// set test batch limit to be 1 millionth of regular block limit
let limit_ratio = 1_000_000u32;
let number_of_batches = 2;
// set requested_cu equal to the batch's per-account limit
let requested_cu =
block_cost_limits::MAX_WRITABLE_ACCOUNT_UNITS.saturating_div(limit_ratio as u64);
let mut forward_packet_batches_by_accounts = ForwardPacketBatchesByAccounts::new(
build_bank_forks_for_test().read().unwrap().root_bank(),
limit_ratio,
number_of_batches,
);
// initially both batches are empty
{
let mut batches = forward_packet_batches_by_accounts.iter_batches();
assert_eq!(0, batches.next().unwrap().len());
assert_eq!(0, batches.next().unwrap().len());
assert!(batches.next().is_none());
}
let hot_account = solana_sdk::pubkey::new_rand();
let other_account = solana_sdk::pubkey::new_rand();
let packet_high_priority = build_deserialized_packet_for_test(10, requested_cu);
let packet_low_priority = build_deserialized_packet_for_test(0, requested_cu);
// with 4 packets, the first 3 write to the same hot_account with higher priority,
// the 4th writes to other_account with lower priority;
// assert that the 1st and 4th fit in the first batch, the 2nd goes into the 2nd
// batch, and the 3rd is dropped.
// 1st high-priority packet added to 1st batch
{
forward_packet_batches_by_accounts.add_packet_to_batches(
&[hot_account],
requested_cu,
packet_high_priority.immutable_section().clone(),
);
let mut batches = forward_packet_batches_by_accounts.iter_batches();
assert_eq!(1, batches.next().unwrap().len());
assert_eq!(0, batches.next().unwrap().len());
assert!(batches.next().is_none());
}
// 2nd high-priority packet added to 2nd batch
{
forward_packet_batches_by_accounts.add_packet_to_batches(
&[hot_account],
requested_cu,
packet_high_priority.immutable_section().clone(),
);
let mut batches = forward_packet_batches_by_accounts.iter_batches();
assert_eq!(1, batches.next().unwrap().len());
assert_eq!(1, batches.next().unwrap().len());
}
// 3rd high-priority packet not included in forwarding
{
forward_packet_batches_by_accounts.add_packet_to_batches(
&[hot_account],
requested_cu,
packet_high_priority.immutable_section().clone(),
);
let mut batches = forward_packet_batches_by_accounts.iter_batches();
assert_eq!(1, batches.next().unwrap().len());
assert_eq!(1, batches.next().unwrap().len());
assert!(batches.next().is_none());
}
// 4th lower-priority packet added to 1st batch for the non-contended account
{
forward_packet_batches_by_accounts.add_packet_to_batches(
&[other_account],
requested_cu,
packet_low_priority.immutable_section().clone(),
);
let mut batches = forward_packet_batches_by_accounts.iter_batches();
assert_eq!(2, batches.next().unwrap().len());
assert_eq!(1, batches.next().unwrap().len());
assert!(batches.next().is_none());
}
}
}

View File

@ -122,8 +122,9 @@ struct LeaderSlotPacketCountMetrics {
// total number of valid unprocessed packets in the buffer that were removed after being forwarded
cleared_from_buffer_after_forward_count: u64,
// total number of packets removed at the end of the slot due to being too old, duplicate, etc.
end_of_slot_filtered_invalid_count: u64,
// total number of forwardable batches that were attempted for forwarding. A forwardable batch
// is defined in `ForwardPacketBatchesByAccounts` in `forward_packet_batches_by_accounts.rs`
forwardable_batches_count: u64,
}
impl LeaderSlotPacketCountMetrics {
@ -222,8 +223,8 @@ impl LeaderSlotPacketCountMetrics {
i64
),
(
"end_of_slot_filtered_invalid_count",
self.end_of_slot_filtered_invalid_count as i64,
"forwardable_batches_count",
self.forwardable_batches_count as i64,
i64
),
(
@ -573,6 +574,17 @@ impl LeaderSlotMetricsTracker {
}
}
pub(crate) fn increment_forwardable_batches_count(&mut self, count: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.packet_count_metrics
.forwardable_batches_count,
count
);
}
}
pub(crate) fn increment_retryable_packets_count(&mut self, count: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
@ -584,17 +596,6 @@ impl LeaderSlotMetricsTracker {
}
}
pub(crate) fn increment_end_of_slot_filtered_invalid_count(&mut self, count: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.packet_count_metrics
.end_of_slot_filtered_invalid_count,
count
);
}
}
pub(crate) fn set_end_of_slot_unprocessed_buffer_len(&mut self, len: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
leader_slot_metrics
@ -684,19 +685,6 @@ impl LeaderSlotMetricsTracker {
}
}
// Consuming buffered packets timing metrics
pub(crate) fn increment_end_of_slot_filtering_us(&mut self, us: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.timing_metrics
.consume_buffered_packets_timings
.end_of_slot_filtering_us,
us
);
}
}
pub(crate) fn increment_consume_buffered_packets_poh_recorder_lock_us(&mut self, us: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(

View File

@ -227,9 +227,6 @@ pub(crate) struct ConsumeBufferedPacketsTimings {
// Time spent grabbing poh recorder lock
pub poh_recorder_lock_us: u64,
// Time spent filtering invalid packets after leader slot has ended
pub end_of_slot_filtering_us: u64,
// Time spent processing transactions
pub process_packets_transactions_us: u64,
}
@ -245,11 +242,6 @@ impl ConsumeBufferedPacketsTimings {
self.poh_recorder_lock_us as i64,
i64
),
(
"end_of_slot_filtering_us",
self.end_of_slot_filtering_us as i64,
i64
),
(
"process_packets_transactions_us",
self.process_packets_transactions_us as i64,

View File

@ -27,6 +27,7 @@ pub mod duplicate_repair_status;
pub mod fetch_stage;
pub mod find_packet_sender_stake_stage;
pub mod fork_choice;
pub mod forward_packet_batches_by_accounts;
pub mod gen_keys;
pub mod heaviest_subtree_fork_choice;
pub mod latest_validator_votes_for_frozen_banks;

View File

@ -230,6 +230,7 @@ impl Tpu {
replay_vote_sender,
cost_model.clone(),
connection_cache.clone(),
bank_forks.clone(),
);
let broadcast_stage = broadcast_type.new_broadcast_stage(

View File

@ -3,18 +3,23 @@ use {
solana_perf::packet::{Packet, PacketBatch},
solana_program_runtime::compute_budget::ComputeBudget,
solana_sdk::{
feature_set,
hash::Hash,
message::{Message, SanitizedVersionedMessage},
sanitize::SanitizeError,
short_vec::decode_shortu16_len,
signature::Signature,
transaction::{SanitizedVersionedTransaction, Transaction, VersionedTransaction},
transaction::{
AddressLoader, SanitizedTransaction, SanitizedVersionedTransaction, Transaction,
VersionedTransaction,
},
},
std::{
cmp::Ordering,
collections::{hash_map::Entry, HashMap},
mem::size_of,
rc::Rc,
sync::Arc,
},
thiserror::Error,
};
@ -36,8 +41,8 @@ pub enum DeserializedPacketError {
#[derive(Debug, PartialEq, Eq)]
pub struct TransactionPriorityDetails {
priority: u64,
compute_unit_limit: u64,
pub priority: u64,
pub compute_unit_limit: u64,
}
#[derive(Debug, PartialEq, Eq)]
@ -93,7 +98,7 @@ impl DeserializedPacket {
}
#[cfg(test)]
fn new_with_priority_details(
pub fn new_with_priority_details(
packet: Packet,
priority_details: TransactionPriorityDetails,
) -> Result<Self, DeserializedPacketError> {
@ -254,12 +259,40 @@ impl UnprocessedPacketBatches {
self.message_hash_to_transaction.iter_mut().map(|(_k, v)| v)
}
/// Iterates DeserializedPackets in descending priority (max-first) order and
/// calls the FnMut on each DeserializedPacket; iteration stops early once the
/// callback returns false.
pub fn iter_desc<F>(&mut self, mut f: F)
where
F: FnMut(&mut DeserializedPacket) -> bool,
{
let mut packet_priority_queue_clone = self.packet_priority_queue.clone();
for immutable_packet in packet_priority_queue_clone.drain_desc() {
match self
.message_hash_to_transaction
.entry(*immutable_packet.message_hash())
{
Entry::Vacant(_vacant_entry) => {
panic!(
"entry {} must exist to be consistent with `packet_priority_queue`",
immutable_packet.message_hash()
);
}
Entry::Occupied(mut occupied_entry) => {
if !f(occupied_entry.get_mut()) {
return;
}
}
}
}
}
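// A hypothetical usage sketch: visit packets max-priority-first and stop the
// iteration early by returning `false` from the closure, e.g. to take only
// the top 10 packets:
//
//     let mut taken = 0;
//     unprocessed_packet_batches.iter_desc(|_packet| {
//         taken += 1;
//         taken < 10
//     });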
pub fn retain<F>(&mut self, mut f: F)
where
F: FnMut(&mut DeserializedPacket) -> bool,
{
// TODO: optimize this only when number of packets
// with oudated blockhash is high
// with outdated blockhash is high
let new_packet_priority_queue: MinMaxHeap<Rc<ImmutableDeserializedPacket>> = self
.packet_priority_queue
.drain()
@ -415,14 +448,45 @@ pub fn transactions_to_deserialized_packets(
.collect()
}
// This function deserializes packets into transactions, computes the blake3 hash of
// the transaction messages, and verifies secp256k1 instructions. A list of sanitized
// transactions is returned with their packet indexes.
#[allow(clippy::needless_collect)]
pub fn transaction_from_deserialized_packet(
deserialized_packet: &ImmutableDeserializedPacket,
feature_set: &Arc<feature_set::FeatureSet>,
votes_only: bool,
address_loader: impl AddressLoader,
) -> Option<SanitizedTransaction> {
if votes_only && !deserialized_packet.is_simple_vote() {
return None;
}
let tx = SanitizedTransaction::try_new(
deserialized_packet.transaction().clone(),
*deserialized_packet.message_hash(),
deserialized_packet.is_simple_vote(),
address_loader,
)
.ok()?;
tx.verify_precompiles(feature_set).ok()?;
Some(tx)
}
#[cfg(test)]
mod tests {
use {
super::*,
solana_perf::packet::PacketFlags,
solana_sdk::{
compute_budget::ComputeBudgetInstruction, message::VersionedMessage, pubkey::Pubkey,
signature::Keypair, system_instruction, system_transaction,
compute_budget::ComputeBudgetInstruction,
message::VersionedMessage,
pubkey::Pubkey,
signature::{Keypair, Signer},
system_instruction, system_transaction,
transaction::{SimpleAddressLoader, Transaction},
},
solana_vote_program::vote_transaction,
std::net::IpAddr,
};
@ -622,4 +686,132 @@ mod tests {
})
);
}
#[cfg(test)]
fn make_test_packets(
transactions: Vec<Transaction>,
vote_indexes: Vec<usize>,
) -> Vec<DeserializedPacket> {
let capacity = transactions.len();
let mut packet_vector = Vec::with_capacity(capacity);
for tx in transactions.iter() {
packet_vector.push(Packet::from_data(None, &tx).unwrap());
}
for index in vote_indexes.iter() {
packet_vector[*index].meta.flags |= PacketFlags::SIMPLE_VOTE_TX;
}
packet_vector
.into_iter()
.map(|p| DeserializedPacket::new(p).unwrap())
.collect()
}
#[test]
fn test_transaction_from_deserialized_packet() {
use solana_sdk::feature_set::FeatureSet;
let keypair = Keypair::new();
let transfer_tx =
system_transaction::transfer(&keypair, &keypair.pubkey(), 1, Hash::default());
let vote_tx = vote_transaction::new_vote_transaction(
vec![42],
Hash::default(),
Hash::default(),
&keypair,
&keypair,
&keypair,
None,
);
// packets with no votes
{
let vote_indexes = vec![];
let packet_vector =
make_test_packets(vec![transfer_tx.clone(), transfer_tx.clone()], vote_indexes);
let mut votes_only = false;
let txs = packet_vector.iter().filter_map(|tx| {
transaction_from_deserialized_packet(
tx.immutable_section(),
&Arc::new(FeatureSet::default()),
votes_only,
SimpleAddressLoader::Disabled,
)
});
assert_eq!(2, txs.count());
votes_only = true;
let txs = packet_vector.iter().filter_map(|tx| {
transaction_from_deserialized_packet(
tx.immutable_section(),
&Arc::new(FeatureSet::default()),
votes_only,
SimpleAddressLoader::Disabled,
)
});
assert_eq!(0, txs.count());
}
// packets with some votes
{
let vote_indexes = vec![0, 2];
let packet_vector = make_test_packets(
vec![vote_tx.clone(), transfer_tx, vote_tx.clone()],
vote_indexes,
);
let mut votes_only = false;
let txs = packet_vector.iter().filter_map(|tx| {
transaction_from_deserialized_packet(
tx.immutable_section(),
&Arc::new(FeatureSet::default()),
votes_only,
SimpleAddressLoader::Disabled,
)
});
assert_eq!(3, txs.count());
votes_only = true;
let txs = packet_vector.iter().filter_map(|tx| {
transaction_from_deserialized_packet(
tx.immutable_section(),
&Arc::new(FeatureSet::default()),
votes_only,
SimpleAddressLoader::Disabled,
)
});
assert_eq!(2, txs.count());
}
// packets with all votes
{
let vote_indexes = vec![0, 1, 2];
let packet_vector = make_test_packets(
vec![vote_tx.clone(), vote_tx.clone(), vote_tx],
vote_indexes,
);
let mut votes_only = false;
let txs = packet_vector.iter().filter_map(|tx| {
transaction_from_deserialized_packet(
tx.immutable_section(),
&Arc::new(FeatureSet::default()),
votes_only,
SimpleAddressLoader::Disabled,
)
});
assert_eq!(3, txs.count());
votes_only = true;
let txs = packet_vector.iter().filter_map(|tx| {
transaction_from_deserialized_packet(
tx.immutable_section(),
&Arc::new(FeatureSet::default()),
votes_only,
SimpleAddressLoader::Disabled,
)
});
assert_eq!(3, txs.count());
}
}
}

View File

@ -77,7 +77,7 @@ impl CostTracker {
}
}
// bench tests needs to reset limits
/// allows adjusting the limits initialized during construction
pub fn set_limits(
&mut self,
account_cost_limit: u64,
@ -95,6 +95,18 @@ impl CostTracker {
Ok(self.block_cost)
}
/// Track cost using the user-requested compute units.
pub fn try_add_requested_cus(
&mut self,
write_lock_accounts: &[Pubkey],
requested_cus: u64,
is_vote: bool,
) -> Result<u64, CostTrackerError> {
self.would_fit_internal(write_lock_accounts.iter(), requested_cus, is_vote, 0)?;
self.add_transaction_cost_internal(write_lock_accounts.iter(), requested_cus, is_vote, 0);
Ok(self.block_cost)
}
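// A hypothetical usage sketch: reserve requested CUs for a transaction that
// write-locks two accounts. `would_fit_internal` runs before any state is
// mutated, so a reservation that exceeds a limit returns Err and leaves the
// tracker unchanged:
//
//     let mut tracker = CostTracker::default();
//     tracker.set_limits(account_limit, block_limit, vote_limit);
//     tracker.try_add_requested_cus(&[pubkey_a, pubkey_b], requested_cus, false)?;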
pub fn update_execution_cost(
&mut self,
estimated_tx_cost: &TransactionCost,
@ -165,9 +177,22 @@ impl CostTracker {
}
fn would_fit(&self, tx_cost: &TransactionCost) -> Result<(), CostTrackerError> {
let writable_accounts = &tx_cost.writable_accounts;
let cost = tx_cost.sum();
let vote_cost = if tx_cost.is_simple_vote { cost } else { 0 };
self.would_fit_internal(
tx_cost.writable_accounts.iter(),
tx_cost.sum(),
tx_cost.is_simple_vote,
tx_cost.account_data_size,
)
}
fn would_fit_internal<'a>(
&self,
write_lock_accounts: impl Iterator<Item = &'a Pubkey>,
cost: u64,
is_vote: bool,
account_data_size: u64,
) -> Result<(), CostTrackerError> {
let vote_cost = if is_vote { cost } else { 0 };
// check against the total package cost
if self.block_cost.saturating_add(cost) > self.block_cost_limit {
@ -186,9 +211,7 @@ impl CostTracker {
// NOTE: Check if the total accounts data size is exceeded *before* the block accounts data
// size. This way, transactions are not unnecessarily retried.
let account_data_size = self
.account_data_size
.saturating_add(tx_cost.account_data_size);
let account_data_size = self.account_data_size.saturating_add(account_data_size);
if let Some(account_data_size_limit) = self.account_data_size_limit {
if account_data_size > account_data_size_limit {
return Err(CostTrackerError::WouldExceedAccountDataTotalLimit);
@ -200,7 +223,7 @@ impl CostTracker {
}
// check each account against account_cost_limit,
for account_key in writable_accounts.iter() {
for account_key in write_lock_accounts {
match self.cost_by_writable_accounts.get(account_key) {
Some(chained_cost) => {
if chained_cost.saturating_add(cost) > self.account_cost_limit {
@ -217,9 +240,23 @@ impl CostTracker {
}
fn add_transaction_cost(&mut self, tx_cost: &TransactionCost) {
let cost = tx_cost.sum();
self.add_transaction_execution_cost(tx_cost, cost);
saturating_add_assign!(self.account_data_size, tx_cost.account_data_size);
self.add_transaction_cost_internal(
tx_cost.writable_accounts.iter(),
tx_cost.sum(),
tx_cost.is_simple_vote,
tx_cost.account_data_size,
)
}
fn add_transaction_cost_internal<'a>(
&mut self,
write_lock_accounts: impl Iterator<Item = &'a Pubkey>,
cost: u64,
is_vote: bool,
account_data_size: u64,
) {
self.add_transaction_execution_cost_internal(write_lock_accounts, is_vote, cost);
saturating_add_assign!(self.account_data_size, account_data_size);
saturating_add_assign!(self.transaction_count, 1);
}
@ -234,7 +271,20 @@ impl CostTracker {
/// Apply additional actual execution units to cost_tracker
fn add_transaction_execution_cost(&mut self, tx_cost: &TransactionCost, adjustment: u64) {
for account_key in tx_cost.writable_accounts.iter() {
self.add_transaction_execution_cost_internal(
tx_cost.writable_accounts.iter(),
tx_cost.is_simple_vote,
adjustment,
)
}
fn add_transaction_execution_cost_internal<'a>(
&mut self,
write_lock_accounts: impl Iterator<Item = &'a Pubkey>,
is_vote: bool,
adjustment: u64,
) {
for account_key in write_lock_accounts {
let account_cost = self
.cost_by_writable_accounts
.entry(*account_key)
@ -242,7 +292,7 @@ impl CostTracker {
*account_cost = account_cost.saturating_add(adjustment);
}
self.block_cost = self.block_cost.saturating_add(adjustment);
if tx_cost.is_simple_vote {
if is_vote {
self.vote_cost = self.vote_cost.saturating_add(adjustment);
}
}