BankingStage Refactor: Add state to Forwarder (#29403)
This commit is contained in:
parent
ae7803a55e
commit
8fa396a321
|
@ -35,7 +35,7 @@ use {
|
||||||
solana_ledger::{
|
solana_ledger::{
|
||||||
blockstore_processor::TransactionStatusSender, token_balances::collect_token_balances,
|
blockstore_processor::TransactionStatusSender, token_balances::collect_token_balances,
|
||||||
},
|
},
|
||||||
solana_measure::{measure, measure::Measure},
|
solana_measure::{measure, measure::Measure, measure_us},
|
||||||
solana_metrics::inc_new_counter_info,
|
solana_metrics::inc_new_counter_info,
|
||||||
solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
|
solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
|
||||||
solana_poh::poh_recorder::{BankStart, PohRecorder, PohRecorderError, TransactionRecorder},
|
solana_poh::poh_recorder::{BankStart, PohRecorder, PohRecorderError, TransactionRecorder},
|
||||||
|
@ -59,7 +59,6 @@ use {
|
||||||
cmp,
|
cmp,
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
env,
|
env,
|
||||||
net::UdpSocket,
|
|
||||||
sync::{
|
sync::{
|
||||||
atomic::{AtomicU64, AtomicUsize, Ordering},
|
atomic::{AtomicU64, AtomicUsize, Ordering},
|
||||||
Arc, RwLock,
|
Arc, RwLock,
|
||||||
|
@ -454,14 +453,17 @@ impl BankingStage {
|
||||||
|
|
||||||
let mut packet_deserializer = PacketDeserializer::new(packet_receiver);
|
let mut packet_deserializer = PacketDeserializer::new(packet_receiver);
|
||||||
let poh_recorder = poh_recorder.clone();
|
let poh_recorder = poh_recorder.clone();
|
||||||
let cluster_info = cluster_info.clone();
|
|
||||||
let transaction_status_sender = transaction_status_sender.clone();
|
let transaction_status_sender = transaction_status_sender.clone();
|
||||||
let replay_vote_sender = replay_vote_sender.clone();
|
let replay_vote_sender = replay_vote_sender.clone();
|
||||||
let data_budget = data_budget.clone();
|
|
||||||
let connection_cache = connection_cache.clone();
|
|
||||||
let bank_forks = bank_forks.clone();
|
|
||||||
|
|
||||||
let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
|
let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
|
||||||
|
let forwarder = Forwarder::new(
|
||||||
|
poh_recorder.clone(),
|
||||||
|
bank_forks.clone(),
|
||||||
|
cluster_info.clone(),
|
||||||
|
connection_cache.clone(),
|
||||||
|
data_budget.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.name(format!("solBanknStgTx{i:02}"))
|
.name(format!("solBanknStgTx{i:02}"))
|
||||||
|
@ -469,15 +471,12 @@ impl BankingStage {
|
||||||
Self::process_loop(
|
Self::process_loop(
|
||||||
&mut packet_deserializer,
|
&mut packet_deserializer,
|
||||||
&decision_maker,
|
&decision_maker,
|
||||||
|
&forwarder,
|
||||||
&poh_recorder,
|
&poh_recorder,
|
||||||
&cluster_info,
|
|
||||||
i,
|
i,
|
||||||
transaction_status_sender,
|
transaction_status_sender,
|
||||||
replay_vote_sender,
|
replay_vote_sender,
|
||||||
&data_budget,
|
|
||||||
log_messages_bytes_limit,
|
log_messages_bytes_limit,
|
||||||
connection_cache,
|
|
||||||
&bank_forks,
|
|
||||||
unprocessed_transaction_storage,
|
unprocessed_transaction_storage,
|
||||||
);
|
);
|
||||||
})
|
})
|
||||||
|
@ -634,21 +633,16 @@ impl BankingStage {
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
fn process_buffered_packets(
|
fn process_buffered_packets(
|
||||||
decision_maker: &DecisionMaker,
|
decision_maker: &DecisionMaker,
|
||||||
socket: &UdpSocket,
|
forwarder: &Forwarder,
|
||||||
poh_recorder: &Arc<RwLock<PohRecorder>>,
|
|
||||||
cluster_info: &ClusterInfo,
|
|
||||||
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
|
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
|
||||||
transaction_status_sender: &Option<TransactionStatusSender>,
|
transaction_status_sender: &Option<TransactionStatusSender>,
|
||||||
replay_vote_sender: &ReplayVoteSender,
|
replay_vote_sender: &ReplayVoteSender,
|
||||||
banking_stage_stats: &BankingStageStats,
|
banking_stage_stats: &BankingStageStats,
|
||||||
recorder: &TransactionRecorder,
|
recorder: &TransactionRecorder,
|
||||||
data_budget: &DataBudget,
|
|
||||||
qos_service: &QosService,
|
qos_service: &QosService,
|
||||||
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
|
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
|
||||||
log_messages_bytes_limit: Option<usize>,
|
log_messages_bytes_limit: Option<usize>,
|
||||||
connection_cache: &ConnectionCache,
|
|
||||||
tracer_packet_stats: &mut TracerPacketStats,
|
tracer_packet_stats: &mut TracerPacketStats,
|
||||||
bank_forks: &Arc<RwLock<BankForks>>,
|
|
||||||
) {
|
) {
|
||||||
if unprocessed_transaction_storage.should_not_process() {
|
if unprocessed_transaction_storage.should_not_process() {
|
||||||
return;
|
return;
|
||||||
|
@ -683,45 +677,27 @@ impl BankingStage {
|
||||||
.increment_consume_buffered_packets_us(consume_buffered_packets_time.as_us());
|
.increment_consume_buffered_packets_us(consume_buffered_packets_time.as_us());
|
||||||
}
|
}
|
||||||
BufferedPacketsDecision::Forward => {
|
BufferedPacketsDecision::Forward => {
|
||||||
let (_, forward_time) = measure!(
|
let ((), forward_us) = measure_us!(forwarder.handle_forwarding(
|
||||||
Forwarder::handle_forwarding(
|
|
||||||
cluster_info,
|
|
||||||
unprocessed_transaction_storage,
|
unprocessed_transaction_storage,
|
||||||
poh_recorder,
|
|
||||||
socket,
|
|
||||||
false,
|
false,
|
||||||
data_budget,
|
|
||||||
slot_metrics_tracker,
|
slot_metrics_tracker,
|
||||||
banking_stage_stats,
|
banking_stage_stats,
|
||||||
connection_cache,
|
|
||||||
tracer_packet_stats,
|
tracer_packet_stats,
|
||||||
bank_forks,
|
));
|
||||||
),
|
slot_metrics_tracker.increment_forward_us(forward_us);
|
||||||
"forward",
|
|
||||||
);
|
|
||||||
slot_metrics_tracker.increment_forward_us(forward_time.as_us());
|
|
||||||
// Take metrics action after forwarding packets to include forwarded
|
// Take metrics action after forwarding packets to include forwarded
|
||||||
// metrics into current slot
|
// metrics into current slot
|
||||||
slot_metrics_tracker.apply_action(metrics_action);
|
slot_metrics_tracker.apply_action(metrics_action);
|
||||||
}
|
}
|
||||||
BufferedPacketsDecision::ForwardAndHold => {
|
BufferedPacketsDecision::ForwardAndHold => {
|
||||||
let (_, forward_and_hold_time) = measure!(
|
let ((), forward_and_hold_us) = measure_us!(forwarder.handle_forwarding(
|
||||||
Forwarder::handle_forwarding(
|
|
||||||
cluster_info,
|
|
||||||
unprocessed_transaction_storage,
|
unprocessed_transaction_storage,
|
||||||
poh_recorder,
|
|
||||||
socket,
|
|
||||||
true,
|
true,
|
||||||
data_budget,
|
|
||||||
slot_metrics_tracker,
|
slot_metrics_tracker,
|
||||||
banking_stage_stats,
|
banking_stage_stats,
|
||||||
connection_cache,
|
|
||||||
tracer_packet_stats,
|
tracer_packet_stats,
|
||||||
bank_forks,
|
));
|
||||||
),
|
slot_metrics_tracker.increment_forward_and_hold_us(forward_and_hold_us);
|
||||||
"forward_and_hold",
|
|
||||||
);
|
|
||||||
slot_metrics_tracker.increment_forward_and_hold_us(forward_and_hold_time.as_us());
|
|
||||||
// Take metrics action after forwarding packets
|
// Take metrics action after forwarding packets
|
||||||
slot_metrics_tracker.apply_action(metrics_action);
|
slot_metrics_tracker.apply_action(metrics_action);
|
||||||
}
|
}
|
||||||
|
@ -733,19 +709,15 @@ impl BankingStage {
|
||||||
fn process_loop(
|
fn process_loop(
|
||||||
packet_deserializer: &mut PacketDeserializer,
|
packet_deserializer: &mut PacketDeserializer,
|
||||||
decision_maker: &DecisionMaker,
|
decision_maker: &DecisionMaker,
|
||||||
|
forwarder: &Forwarder,
|
||||||
poh_recorder: &Arc<RwLock<PohRecorder>>,
|
poh_recorder: &Arc<RwLock<PohRecorder>>,
|
||||||
cluster_info: &ClusterInfo,
|
|
||||||
id: u32,
|
id: u32,
|
||||||
transaction_status_sender: Option<TransactionStatusSender>,
|
transaction_status_sender: Option<TransactionStatusSender>,
|
||||||
replay_vote_sender: ReplayVoteSender,
|
replay_vote_sender: ReplayVoteSender,
|
||||||
data_budget: &DataBudget,
|
|
||||||
log_messages_bytes_limit: Option<usize>,
|
log_messages_bytes_limit: Option<usize>,
|
||||||
connection_cache: Arc<ConnectionCache>,
|
|
||||||
bank_forks: &Arc<RwLock<BankForks>>,
|
|
||||||
mut unprocessed_transaction_storage: UnprocessedTransactionStorage,
|
mut unprocessed_transaction_storage: UnprocessedTransactionStorage,
|
||||||
) {
|
) {
|
||||||
let recorder = poh_recorder.read().unwrap().recorder();
|
let recorder = poh_recorder.read().unwrap().recorder();
|
||||||
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
|
||||||
let mut banking_stage_stats = BankingStageStats::new(id);
|
let mut banking_stage_stats = BankingStageStats::new(id);
|
||||||
let mut tracer_packet_stats = TracerPacketStats::new(id);
|
let mut tracer_packet_stats = TracerPacketStats::new(id);
|
||||||
let qos_service = QosService::new(id);
|
let qos_service = QosService::new(id);
|
||||||
|
@ -760,21 +732,16 @@ impl BankingStage {
|
||||||
let (_, process_buffered_packets_time) = measure!(
|
let (_, process_buffered_packets_time) = measure!(
|
||||||
Self::process_buffered_packets(
|
Self::process_buffered_packets(
|
||||||
decision_maker,
|
decision_maker,
|
||||||
&socket,
|
forwarder,
|
||||||
poh_recorder,
|
|
||||||
cluster_info,
|
|
||||||
&mut unprocessed_transaction_storage,
|
&mut unprocessed_transaction_storage,
|
||||||
&transaction_status_sender,
|
&transaction_status_sender,
|
||||||
&replay_vote_sender,
|
&replay_vote_sender,
|
||||||
&banking_stage_stats,
|
&banking_stage_stats,
|
||||||
&recorder,
|
&recorder,
|
||||||
data_budget,
|
|
||||||
&qos_service,
|
&qos_service,
|
||||||
&mut slot_metrics_tracker,
|
&mut slot_metrics_tracker,
|
||||||
log_messages_bytes_limit,
|
log_messages_bytes_limit,
|
||||||
&connection_cache,
|
|
||||||
&mut tracer_packet_stats,
|
&mut tracer_packet_stats,
|
||||||
bank_forks,
|
|
||||||
),
|
),
|
||||||
"process_buffered_packets",
|
"process_buffered_packets",
|
||||||
);
|
);
|
||||||
|
|
|
@ -22,28 +22,46 @@ use {
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub struct Forwarder;
|
pub(crate) struct Forwarder {
|
||||||
|
poh_recorder: Arc<RwLock<PohRecorder>>,
|
||||||
|
bank_forks: Arc<RwLock<BankForks>>,
|
||||||
|
socket: UdpSocket,
|
||||||
|
cluster_info: Arc<ClusterInfo>,
|
||||||
|
connection_cache: Arc<ConnectionCache>,
|
||||||
|
data_budget: Arc<DataBudget>,
|
||||||
|
}
|
||||||
|
|
||||||
impl Forwarder {
|
impl Forwarder {
|
||||||
#[allow(clippy::too_many_arguments)]
|
pub(crate) fn new(
|
||||||
pub fn handle_forwarding(
|
poh_recorder: Arc<RwLock<PohRecorder>>,
|
||||||
cluster_info: &ClusterInfo,
|
bank_forks: Arc<RwLock<BankForks>>,
|
||||||
|
cluster_info: Arc<ClusterInfo>,
|
||||||
|
connection_cache: Arc<ConnectionCache>,
|
||||||
|
data_budget: Arc<DataBudget>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
poh_recorder,
|
||||||
|
bank_forks,
|
||||||
|
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
|
||||||
|
cluster_info,
|
||||||
|
connection_cache,
|
||||||
|
data_budget,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn handle_forwarding(
|
||||||
|
&self,
|
||||||
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
|
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
|
||||||
poh_recorder: &Arc<RwLock<PohRecorder>>,
|
|
||||||
socket: &UdpSocket,
|
|
||||||
hold: bool,
|
hold: bool,
|
||||||
data_budget: &DataBudget,
|
|
||||||
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
|
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
|
||||||
banking_stage_stats: &BankingStageStats,
|
banking_stage_stats: &BankingStageStats,
|
||||||
connection_cache: &ConnectionCache,
|
|
||||||
tracer_packet_stats: &mut TracerPacketStats,
|
tracer_packet_stats: &mut TracerPacketStats,
|
||||||
bank_forks: &Arc<RwLock<BankForks>>,
|
|
||||||
) {
|
) {
|
||||||
let forward_option = unprocessed_transaction_storage.forward_option();
|
let forward_option = unprocessed_transaction_storage.forward_option();
|
||||||
|
|
||||||
// get current root bank from bank_forks, use it to sanitize transaction and
|
// get current root bank from bank_forks, use it to sanitize transaction and
|
||||||
// load all accounts from address loader;
|
// load all accounts from address loader;
|
||||||
let current_bank = bank_forks.read().unwrap().root_bank();
|
let current_bank = self.bank_forks.read().unwrap().root_bank();
|
||||||
|
|
||||||
let mut forward_packet_batches_by_accounts =
|
let mut forward_packet_batches_by_accounts =
|
||||||
ForwardPacketBatchesByAccounts::new_with_default_batch_limits();
|
ForwardPacketBatchesByAccounts::new_with_default_batch_limits();
|
||||||
|
@ -76,15 +94,10 @@ impl Forwarder {
|
||||||
slot_metrics_tracker.increment_forwardable_batches_count(1);
|
slot_metrics_tracker.increment_forwardable_batches_count(1);
|
||||||
|
|
||||||
let batched_forwardable_packets_count = forward_batch.len();
|
let batched_forwardable_packets_count = forward_batch.len();
|
||||||
let (_forward_result, sucessful_forwarded_packets_count, leader_pubkey) =
|
let (_forward_result, sucessful_forwarded_packets_count, leader_pubkey) = self
|
||||||
Self::forward_buffered_packets(
|
.forward_buffered_packets(
|
||||||
connection_cache,
|
|
||||||
&forward_option,
|
&forward_option,
|
||||||
cluster_info,
|
|
||||||
poh_recorder,
|
|
||||||
socket,
|
|
||||||
forward_batch.get_forwardable_packets(),
|
forward_batch.get_forwardable_packets(),
|
||||||
data_budget,
|
|
||||||
banking_stage_stats,
|
banking_stage_stats,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -125,13 +138,9 @@ impl Forwarder {
|
||||||
/// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns
|
/// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns
|
||||||
/// the number of successfully forwarded packets in second part of tuple
|
/// the number of successfully forwarded packets in second part of tuple
|
||||||
fn forward_buffered_packets<'a>(
|
fn forward_buffered_packets<'a>(
|
||||||
connection_cache: &ConnectionCache,
|
&self,
|
||||||
forward_option: &ForwardOption,
|
forward_option: &ForwardOption,
|
||||||
cluster_info: &ClusterInfo,
|
|
||||||
poh_recorder: &Arc<RwLock<PohRecorder>>,
|
|
||||||
socket: &UdpSocket,
|
|
||||||
forwardable_packets: impl Iterator<Item = &'a Packet>,
|
forwardable_packets: impl Iterator<Item = &'a Packet>,
|
||||||
data_budget: &DataBudget,
|
|
||||||
banking_stage_stats: &BankingStageStats,
|
banking_stage_stats: &BankingStageStats,
|
||||||
) -> (
|
) -> (
|
||||||
std::result::Result<(), TransportError>,
|
std::result::Result<(), TransportError>,
|
||||||
|
@ -141,10 +150,12 @@ impl Forwarder {
|
||||||
let leader_and_addr = match forward_option {
|
let leader_and_addr = match forward_option {
|
||||||
ForwardOption::NotForward => return (Ok(()), 0, None),
|
ForwardOption::NotForward => return (Ok(()), 0, None),
|
||||||
ForwardOption::ForwardTransaction => {
|
ForwardOption::ForwardTransaction => {
|
||||||
next_leader_tpu_forwards(cluster_info, poh_recorder)
|
next_leader_tpu_forwards(&self.cluster_info, &self.poh_recorder)
|
||||||
}
|
}
|
||||||
|
|
||||||
ForwardOption::ForwardTpuVote => next_leader_tpu_vote(cluster_info, poh_recorder),
|
ForwardOption::ForwardTpuVote => {
|
||||||
|
next_leader_tpu_vote(&self.cluster_info, &self.poh_recorder)
|
||||||
|
}
|
||||||
};
|
};
|
||||||
let (leader_pubkey, addr) = match leader_and_addr {
|
let (leader_pubkey, addr) = match leader_and_addr {
|
||||||
Some(leader_and_addr) => leader_and_addr,
|
Some(leader_and_addr) => leader_and_addr,
|
||||||
|
@ -156,7 +167,7 @@ impl Forwarder {
|
||||||
const MAX_BYTES_PER_SECOND: usize = 12_000_000;
|
const MAX_BYTES_PER_SECOND: usize = 12_000_000;
|
||||||
const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
|
const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
|
||||||
const MAX_BYTES_BUDGET: usize = MAX_BYTES_PER_INTERVAL * 5;
|
const MAX_BYTES_BUDGET: usize = MAX_BYTES_PER_INTERVAL * 5;
|
||||||
data_budget.update(INTERVAL_MS, |bytes| {
|
self.data_budget.update(INTERVAL_MS, |bytes| {
|
||||||
std::cmp::min(
|
std::cmp::min(
|
||||||
bytes.saturating_add(MAX_BYTES_PER_INTERVAL),
|
bytes.saturating_add(MAX_BYTES_PER_INTERVAL),
|
||||||
MAX_BYTES_BUDGET,
|
MAX_BYTES_BUDGET,
|
||||||
|
@ -165,7 +176,7 @@ impl Forwarder {
|
||||||
|
|
||||||
let packet_vec: Vec<_> = forwardable_packets
|
let packet_vec: Vec<_> = forwardable_packets
|
||||||
.filter_map(|p| {
|
.filter_map(|p| {
|
||||||
if !p.meta().forwarded() && data_budget.take(p.meta().size) {
|
if !p.meta().forwarded() && self.data_budget.take(p.meta().size) {
|
||||||
Some(p.data(..)?.to_vec())
|
Some(p.data(..)?.to_vec())
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
|
@ -189,14 +200,14 @@ impl Forwarder {
|
||||||
.forwarded_vote_count
|
.forwarded_vote_count
|
||||||
.fetch_add(packet_vec_len, Ordering::Relaxed);
|
.fetch_add(packet_vec_len, Ordering::Relaxed);
|
||||||
let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(addr)).collect();
|
let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(addr)).collect();
|
||||||
batch_send(socket, &pkts).map_err(|err| err.into())
|
batch_send(&self.socket, &pkts).map_err(|err| err.into())
|
||||||
} else {
|
} else {
|
||||||
// All other transactions can be forwarded using QUIC, get_connection() will use
|
// All other transactions can be forwarded using QUIC, get_connection() will use
|
||||||
// system wide setting to pick the correct connection object.
|
// system wide setting to pick the correct connection object.
|
||||||
banking_stage_stats
|
banking_stage_stats
|
||||||
.forwarded_transaction_count
|
.forwarded_transaction_count
|
||||||
.fetch_add(packet_vec_len, Ordering::Relaxed);
|
.fetch_add(packet_vec_len, Ordering::Relaxed);
|
||||||
let conn = connection_cache.get_connection(&addr);
|
let conn = self.connection_cache.get_connection(&addr);
|
||||||
conn.send_data_batch_async(packet_vec)
|
conn.send_data_batch_async(packet_vec)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -280,37 +291,36 @@ mod tests {
|
||||||
create_test_recorder(&bank, &blockstore, Some(poh_config), None);
|
create_test_recorder(&bank, &blockstore, Some(poh_config), None);
|
||||||
|
|
||||||
let (local_node, cluster_info) = new_test_cluster_info(Some(validator_keypair));
|
let (local_node, cluster_info) = new_test_cluster_info(Some(validator_keypair));
|
||||||
|
let cluster_info = Arc::new(cluster_info);
|
||||||
let recv_socket = &local_node.sockets.tpu_forwards[0];
|
let recv_socket = &local_node.sockets.tpu_forwards[0];
|
||||||
|
|
||||||
let test_cases = vec![
|
let test_cases = vec![
|
||||||
("budget-restricted", DataBudget::restricted(), 0),
|
("budget-restricted", DataBudget::restricted(), 0),
|
||||||
("budget-available", DataBudget::default(), 1),
|
("budget-available", DataBudget::default(), 1),
|
||||||
];
|
];
|
||||||
|
|
||||||
let connection_cache = ConnectionCache::default();
|
|
||||||
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
|
||||||
for (name, data_budget, expected_num_forwarded) in test_cases {
|
for (name, data_budget, expected_num_forwarded) in test_cases {
|
||||||
|
let forwarder = Forwarder::new(
|
||||||
|
poh_recorder.clone(),
|
||||||
|
bank_forks.clone(),
|
||||||
|
cluster_info.clone(),
|
||||||
|
Arc::new(ConnectionCache::default()),
|
||||||
|
Arc::new(data_budget),
|
||||||
|
);
|
||||||
let unprocessed_packet_batches: UnprocessedPacketBatches =
|
let unprocessed_packet_batches: UnprocessedPacketBatches =
|
||||||
UnprocessedPacketBatches::from_iter(
|
UnprocessedPacketBatches::from_iter(
|
||||||
vec![deserialized_packet.clone()].into_iter(),
|
vec![deserialized_packet.clone()].into_iter(),
|
||||||
1,
|
1,
|
||||||
);
|
);
|
||||||
let stats = BankingStageStats::default();
|
let stats = BankingStageStats::default();
|
||||||
Forwarder::handle_forwarding(
|
forwarder.handle_forwarding(
|
||||||
&cluster_info,
|
|
||||||
&mut UnprocessedTransactionStorage::new_transaction_storage(
|
&mut UnprocessedTransactionStorage::new_transaction_storage(
|
||||||
unprocessed_packet_batches,
|
unprocessed_packet_batches,
|
||||||
ThreadType::Transactions,
|
ThreadType::Transactions,
|
||||||
),
|
),
|
||||||
&poh_recorder,
|
|
||||||
&socket,
|
|
||||||
true,
|
true,
|
||||||
&data_budget,
|
|
||||||
&mut LeaderSlotMetricsTracker::new(0),
|
&mut LeaderSlotMetricsTracker::new(0),
|
||||||
&stats,
|
&stats,
|
||||||
&connection_cache,
|
|
||||||
&mut TracerPacketStats::new(0),
|
&mut TracerPacketStats::new(0),
|
||||||
&bank_forks,
|
|
||||||
);
|
);
|
||||||
|
|
||||||
recv_socket
|
recv_socket
|
||||||
|
@ -393,21 +403,21 @@ mod tests {
|
||||||
("fwd-no-hold", false, vec![], 0),
|
("fwd-no-hold", false, vec![], 0),
|
||||||
];
|
];
|
||||||
|
|
||||||
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let forwarder = Forwarder::new(
|
||||||
|
poh_recorder,
|
||||||
|
bank_forks,
|
||||||
|
Arc::new(cluster_info),
|
||||||
|
Arc::new(connection_cache),
|
||||||
|
Arc::new(DataBudget::default()),
|
||||||
|
);
|
||||||
for (name, hold, expected_ids, expected_num_unprocessed) in test_cases {
|
for (name, hold, expected_ids, expected_num_unprocessed) in test_cases {
|
||||||
let stats = BankingStageStats::default();
|
let stats = BankingStageStats::default();
|
||||||
Forwarder::handle_forwarding(
|
forwarder.handle_forwarding(
|
||||||
&cluster_info,
|
|
||||||
&mut unprocessed_packet_batches,
|
&mut unprocessed_packet_batches,
|
||||||
&poh_recorder,
|
|
||||||
&socket,
|
|
||||||
hold,
|
hold,
|
||||||
&DataBudget::default(),
|
|
||||||
&mut LeaderSlotMetricsTracker::new(0),
|
&mut LeaderSlotMetricsTracker::new(0),
|
||||||
&stats,
|
&stats,
|
||||||
&connection_cache,
|
|
||||||
&mut TracerPacketStats::new(0),
|
&mut TracerPacketStats::new(0),
|
||||||
&bank_forks,
|
|
||||||
);
|
);
|
||||||
|
|
||||||
recv_socket
|
recv_socket
|
||||||
|
|
Loading…
Reference in New Issue