Do not use UdpTpuConnection to forward votes (#26082)
* Do not use UdpTpuConnection to forward votes * fix tests
This commit is contained in:
parent
a9069244f5
commit
e344c8476f
|
@ -12,16 +12,14 @@ use {
|
||||||
tracer_packet_stats::TracerPacketStats,
|
tracer_packet_stats::TracerPacketStats,
|
||||||
unprocessed_packet_batches::{self, *},
|
unprocessed_packet_batches::{self, *},
|
||||||
},
|
},
|
||||||
|
core::iter::repeat,
|
||||||
crossbeam_channel::{
|
crossbeam_channel::{
|
||||||
Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender,
|
Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender,
|
||||||
},
|
},
|
||||||
histogram::Histogram,
|
histogram::Histogram,
|
||||||
itertools::Itertools,
|
itertools::Itertools,
|
||||||
min_max_heap::MinMaxHeap,
|
min_max_heap::MinMaxHeap,
|
||||||
solana_client::{
|
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
|
||||||
connection_cache::ConnectionCache, tpu_connection::TpuConnection,
|
|
||||||
udp_client::UdpTpuConnection,
|
|
||||||
},
|
|
||||||
solana_entry::entry::hash_transactions,
|
solana_entry::entry::hash_transactions,
|
||||||
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
|
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
|
||||||
solana_ledger::blockstore_processor::TransactionStatusSender,
|
solana_ledger::blockstore_processor::TransactionStatusSender,
|
||||||
|
@ -59,6 +57,7 @@ use {
|
||||||
},
|
},
|
||||||
transport::TransportError,
|
transport::TransportError,
|
||||||
},
|
},
|
||||||
|
solana_streamer::sendmmsg::batch_send,
|
||||||
solana_transaction_status::token_balances::{
|
solana_transaction_status::token_balances::{
|
||||||
collect_token_balances, TransactionTokenBalancesSet,
|
collect_token_balances, TransactionTokenBalancesSet,
|
||||||
},
|
},
|
||||||
|
@ -66,7 +65,7 @@ use {
|
||||||
cmp,
|
cmp,
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
env,
|
env,
|
||||||
net::SocketAddr,
|
net::{SocketAddr, UdpSocket},
|
||||||
rc::Rc,
|
rc::Rc,
|
||||||
sync::{
|
sync::{
|
||||||
atomic::{AtomicU64, AtomicUsize, Ordering},
|
atomic::{AtomicU64, AtomicUsize, Ordering},
|
||||||
|
@ -539,6 +538,7 @@ impl BankingStage {
|
||||||
forward_option: &ForwardOption,
|
forward_option: &ForwardOption,
|
||||||
cluster_info: &ClusterInfo,
|
cluster_info: &ClusterInfo,
|
||||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||||
|
socket: &UdpSocket,
|
||||||
filter_forwarding_results: &FilterForwardingResults,
|
filter_forwarding_results: &FilterForwardingResults,
|
||||||
data_budget: &DataBudget,
|
data_budget: &DataBudget,
|
||||||
banking_stage_stats: &BankingStageStats,
|
banking_stage_stats: &BankingStageStats,
|
||||||
|
@ -600,21 +600,22 @@ impl BankingStage {
|
||||||
|
|
||||||
let mut measure = Measure::start("banking_stage-forward-us");
|
let mut measure = Measure::start("banking_stage-forward-us");
|
||||||
|
|
||||||
let conn = if let ForwardOption::ForwardTpuVote = forward_option {
|
let res = if let ForwardOption::ForwardTpuVote = forward_option {
|
||||||
// The vote must be forwarded using only UDP. Let's get the UDP connection.
|
// The vote must be forwarded using only UDP.
|
||||||
banking_stage_stats
|
banking_stage_stats
|
||||||
.forwarded_vote_count
|
.forwarded_vote_count
|
||||||
.fetch_add(packet_vec_len, Ordering::Relaxed);
|
.fetch_add(packet_vec_len, Ordering::Relaxed);
|
||||||
Arc::new(UdpTpuConnection::new_from_addr(addr).into())
|
let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(addr)).collect();
|
||||||
|
batch_send(socket, &pkts).map_err(|err| err.into())
|
||||||
} else {
|
} else {
|
||||||
// All other transactions can be forwarded using QUIC, get_connection() will use
|
// All other transactions can be forwarded using QUIC, get_connection() will use
|
||||||
// system wide setting to pick the correct connection object.
|
// system wide setting to pick the correct connection object.
|
||||||
banking_stage_stats
|
banking_stage_stats
|
||||||
.forwarded_transaction_count
|
.forwarded_transaction_count
|
||||||
.fetch_add(packet_vec_len, Ordering::Relaxed);
|
.fetch_add(packet_vec_len, Ordering::Relaxed);
|
||||||
connection_cache.get_connection(&addr)
|
let conn = connection_cache.get_connection(&addr);
|
||||||
|
conn.send_wire_transaction_batch_async(packet_vec)
|
||||||
};
|
};
|
||||||
let res = conn.send_wire_transaction_batch_async(packet_vec);
|
|
||||||
|
|
||||||
measure.stop();
|
measure.stop();
|
||||||
inc_new_counter_info!(
|
inc_new_counter_info!(
|
||||||
|
@ -903,6 +904,7 @@ impl BankingStage {
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
fn process_buffered_packets(
|
fn process_buffered_packets(
|
||||||
my_pubkey: &Pubkey,
|
my_pubkey: &Pubkey,
|
||||||
|
socket: &UdpSocket,
|
||||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||||
cluster_info: &ClusterInfo,
|
cluster_info: &ClusterInfo,
|
||||||
buffered_packet_batches: &mut UnprocessedPacketBatches,
|
buffered_packet_batches: &mut UnprocessedPacketBatches,
|
||||||
|
@ -988,6 +990,7 @@ impl BankingStage {
|
||||||
cluster_info,
|
cluster_info,
|
||||||
buffered_packet_batches,
|
buffered_packet_batches,
|
||||||
poh_recorder,
|
poh_recorder,
|
||||||
|
socket,
|
||||||
false,
|
false,
|
||||||
data_budget,
|
data_budget,
|
||||||
slot_metrics_tracker,
|
slot_metrics_tracker,
|
||||||
|
@ -1009,6 +1012,7 @@ impl BankingStage {
|
||||||
cluster_info,
|
cluster_info,
|
||||||
buffered_packet_batches,
|
buffered_packet_batches,
|
||||||
poh_recorder,
|
poh_recorder,
|
||||||
|
socket,
|
||||||
true,
|
true,
|
||||||
data_budget,
|
data_budget,
|
||||||
slot_metrics_tracker,
|
slot_metrics_tracker,
|
||||||
|
@ -1032,6 +1036,7 @@ impl BankingStage {
|
||||||
cluster_info: &ClusterInfo,
|
cluster_info: &ClusterInfo,
|
||||||
buffered_packet_batches: &mut UnprocessedPacketBatches,
|
buffered_packet_batches: &mut UnprocessedPacketBatches,
|
||||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||||
|
socket: &UdpSocket,
|
||||||
hold: bool,
|
hold: bool,
|
||||||
data_budget: &DataBudget,
|
data_budget: &DataBudget,
|
||||||
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
|
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
|
||||||
|
@ -1055,6 +1060,7 @@ impl BankingStage {
|
||||||
forward_option,
|
forward_option,
|
||||||
cluster_info,
|
cluster_info,
|
||||||
poh_recorder,
|
poh_recorder,
|
||||||
|
socket,
|
||||||
&filter_forwarding_result,
|
&filter_forwarding_result,
|
||||||
data_budget,
|
data_budget,
|
||||||
banking_stage_stats,
|
banking_stage_stats,
|
||||||
|
@ -1105,6 +1111,7 @@ impl BankingStage {
|
||||||
connection_cache: Arc<ConnectionCache>,
|
connection_cache: Arc<ConnectionCache>,
|
||||||
) {
|
) {
|
||||||
let recorder = poh_recorder.lock().unwrap().recorder();
|
let recorder = poh_recorder.lock().unwrap().recorder();
|
||||||
|
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let mut buffered_packet_batches = UnprocessedPacketBatches::with_capacity(batch_limit);
|
let mut buffered_packet_batches = UnprocessedPacketBatches::with_capacity(batch_limit);
|
||||||
let mut banking_stage_stats = BankingStageStats::new(id);
|
let mut banking_stage_stats = BankingStageStats::new(id);
|
||||||
let mut tracer_packet_stats = TracerPacketStats::new(id);
|
let mut tracer_packet_stats = TracerPacketStats::new(id);
|
||||||
|
@ -1121,6 +1128,7 @@ impl BankingStage {
|
||||||
let (_, process_buffered_packets_time) = measure!(
|
let (_, process_buffered_packets_time) = measure!(
|
||||||
Self::process_buffered_packets(
|
Self::process_buffered_packets(
|
||||||
&my_pubkey,
|
&my_pubkey,
|
||||||
|
&socket,
|
||||||
poh_recorder,
|
poh_recorder,
|
||||||
cluster_info,
|
cluster_info,
|
||||||
&mut buffered_packet_batches,
|
&mut buffered_packet_batches,
|
||||||
|
@ -4160,6 +4168,7 @@ mod tests {
|
||||||
];
|
];
|
||||||
|
|
||||||
let connection_cache = ConnectionCache::default();
|
let connection_cache = ConnectionCache::default();
|
||||||
|
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
for (name, data_budget, expected_num_forwarded) in test_cases {
|
for (name, data_budget, expected_num_forwarded) in test_cases {
|
||||||
let mut unprocessed_packet_batches: UnprocessedPacketBatches =
|
let mut unprocessed_packet_batches: UnprocessedPacketBatches =
|
||||||
UnprocessedPacketBatches::from_iter(
|
UnprocessedPacketBatches::from_iter(
|
||||||
|
@ -4172,6 +4181,7 @@ mod tests {
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
&mut unprocessed_packet_batches,
|
&mut unprocessed_packet_batches,
|
||||||
&poh_recorder,
|
&poh_recorder,
|
||||||
|
&socket,
|
||||||
true,
|
true,
|
||||||
&data_budget,
|
&data_budget,
|
||||||
&mut LeaderSlotMetricsTracker::new(0),
|
&mut LeaderSlotMetricsTracker::new(0),
|
||||||
|
@ -4277,6 +4287,7 @@ mod tests {
|
||||||
),
|
),
|
||||||
];
|
];
|
||||||
|
|
||||||
|
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
for (name, forward_option, hold, expected_ids, expected_num_unprocessed) in test_cases {
|
for (name, forward_option, hold, expected_ids, expected_num_unprocessed) in test_cases {
|
||||||
let stats = BankingStageStats::default();
|
let stats = BankingStageStats::default();
|
||||||
BankingStage::handle_forwarding(
|
BankingStage::handle_forwarding(
|
||||||
|
@ -4284,6 +4295,7 @@ mod tests {
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
&mut unprocessed_packet_batches,
|
&mut unprocessed_packet_batches,
|
||||||
&poh_recorder,
|
&poh_recorder,
|
||||||
|
&socket,
|
||||||
hold,
|
hold,
|
||||||
&DataBudget::default(),
|
&DataBudget::default(),
|
||||||
&mut LeaderSlotMetricsTracker::new(0),
|
&mut LeaderSlotMetricsTracker::new(0),
|
||||||
|
|
Loading…
Reference in New Issue