Fix forwarding of transactions over QUIC (#25674)
* Spawn QUIC server to receive forwarded txs * Update validator port range * forward votes using UDP * no forwarding from unstaked nodes * forwarding stats in banking stage * fix test builds * fix lifetime of forward sender
This commit is contained in:
parent
ed68c0b889
commit
1c2ae470c5
|
@ -19,7 +19,7 @@ pub struct UdpTpuConnection {
|
|||
}
|
||||
|
||||
impl UdpTpuConnection {
|
||||
pub fn new(tpu_addr: SocketAddr, _connection_stats: Arc<ConnectionCacheStats>) -> Self {
|
||||
pub fn new_from_addr(tpu_addr: SocketAddr) -> Self {
|
||||
let (_, client_socket) = solana_net_utils::bind_in_range(
|
||||
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
|
||||
VALIDATOR_PORT_RANGE,
|
||||
|
@ -31,6 +31,10 @@ impl UdpTpuConnection {
|
|||
addr: tpu_addr,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new(tpu_addr: SocketAddr, _connection_stats: Arc<ConnectionCacheStats>) -> Self {
|
||||
Self::new_from_addr(tpu_addr)
|
||||
}
|
||||
}
|
||||
|
||||
impl TpuConnection for UdpTpuConnection {
|
||||
|
|
|
@ -7,7 +7,7 @@ mod tests {
|
|||
tpu_connection::TpuConnection,
|
||||
},
|
||||
solana_sdk::{packet::PACKET_DATA_SIZE, quic::QUIC_PORT_OFFSET, signature::Keypair},
|
||||
solana_streamer::quic::spawn_server,
|
||||
solana_streamer::quic::{spawn_server, StreamStats},
|
||||
std::{
|
||||
collections::HashMap,
|
||||
net::{SocketAddr, UdpSocket},
|
||||
|
@ -28,6 +28,7 @@ mod tests {
|
|||
let keypair = Keypair::new();
|
||||
let ip = "127.0.0.1".parse().unwrap();
|
||||
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
|
||||
let stats = Arc::new(StreamStats::default());
|
||||
let t = spawn_server(
|
||||
s.try_clone().unwrap(),
|
||||
&keypair,
|
||||
|
@ -38,6 +39,7 @@ mod tests {
|
|||
staked_nodes,
|
||||
10,
|
||||
10,
|
||||
stats,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
|
|
@ -17,7 +17,10 @@ use {
|
|||
histogram::Histogram,
|
||||
itertools::Itertools,
|
||||
min_max_heap::MinMaxHeap,
|
||||
solana_client::{connection_cache::get_connection, tpu_connection::TpuConnection},
|
||||
solana_client::{
|
||||
connection_cache::get_connection, tpu_connection::TpuConnection,
|
||||
udp_client::UdpTpuConnection,
|
||||
},
|
||||
solana_entry::entry::hash_transactions,
|
||||
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
|
||||
solana_ledger::blockstore_processor::TransactionStatusSender,
|
||||
|
@ -147,6 +150,8 @@ pub struct BankingStageStats {
|
|||
rebuffered_packets_count: AtomicUsize,
|
||||
consumed_buffered_packets_count: AtomicUsize,
|
||||
end_of_slot_filtered_invalid_count: AtomicUsize,
|
||||
forwarded_transaction_count: AtomicUsize,
|
||||
forwarded_vote_count: AtomicUsize,
|
||||
batch_packet_indexes_len: Histogram,
|
||||
|
||||
// Timing
|
||||
|
@ -201,6 +206,8 @@ impl BankingStageStats {
|
|||
.unprocessed_packet_conversion_elapsed
|
||||
.load(Ordering::Relaxed)
|
||||
+ self.transaction_processing_elapsed.load(Ordering::Relaxed)
|
||||
+ self.forwarded_transaction_count.load(Ordering::Relaxed) as u64
|
||||
+ self.forwarded_vote_count.load(Ordering::Relaxed) as u64
|
||||
+ self.batch_packet_indexes_len.entries()
|
||||
}
|
||||
|
||||
|
@ -264,6 +271,16 @@ impl BankingStageStats {
|
|||
.swap(0, Ordering::Relaxed) as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"forwarded_transaction_count",
|
||||
self.forwarded_transaction_count.swap(0, Ordering::Relaxed) as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"forwarded_vote_count",
|
||||
self.forwarded_vote_count.swap(0, Ordering::Relaxed) as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"consume_buffered_packets_elapsed",
|
||||
self.consume_buffered_packets_elapsed
|
||||
|
@ -489,10 +506,26 @@ impl BankingStage {
|
|||
/// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns
|
||||
/// the number of successfully forwarded packets in second part of tuple
|
||||
fn forward_buffered_packets(
|
||||
tpu_forwards: &std::net::SocketAddr,
|
||||
forward_option: &ForwardOption,
|
||||
cluster_info: &ClusterInfo,
|
||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||
packets: Vec<&Packet>,
|
||||
data_budget: &DataBudget,
|
||||
banking_stage_stats: &BankingStageStats,
|
||||
) -> (std::result::Result<(), TransportError>, usize) {
|
||||
let addr = match forward_option {
|
||||
ForwardOption::NotForward => return (Ok(()), 0),
|
||||
ForwardOption::ForwardTransaction => {
|
||||
next_leader_tpu_forwards(cluster_info, poh_recorder)
|
||||
}
|
||||
|
||||
ForwardOption::ForwardTpuVote => next_leader_tpu_vote(cluster_info, poh_recorder),
|
||||
};
|
||||
let addr = match addr {
|
||||
Some(addr) => addr,
|
||||
None => return (Ok(()), 0),
|
||||
};
|
||||
|
||||
const INTERVAL_MS: u64 = 100;
|
||||
const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200;
|
||||
const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
|
||||
|
@ -525,7 +558,20 @@ impl BankingStage {
|
|||
|
||||
let mut measure = Measure::start("banking_stage-forward-us");
|
||||
|
||||
let conn = get_connection(tpu_forwards);
|
||||
let conn = if let ForwardOption::ForwardTpuVote = forward_option {
|
||||
// The vote must be forwarded using only UDP. Let's get the UDP connection.
|
||||
banking_stage_stats
|
||||
.forwarded_vote_count
|
||||
.fetch_add(packet_vec_len, Ordering::Relaxed);
|
||||
Arc::new(UdpTpuConnection::new_from_addr(addr).into())
|
||||
} else {
|
||||
// All other transactions can be forwarded using QUIC, get_connection() will use
|
||||
// system wide setting to pick the correct connection object.
|
||||
banking_stage_stats
|
||||
.forwarded_transaction_count
|
||||
.fetch_add(packet_vec_len, Ordering::Relaxed);
|
||||
get_connection(&addr)
|
||||
};
|
||||
let res = conn.send_wire_transaction_batch_async(packet_vec);
|
||||
|
||||
measure.stop();
|
||||
|
@ -908,6 +954,7 @@ impl BankingStage {
|
|||
false,
|
||||
data_budget,
|
||||
slot_metrics_tracker,
|
||||
banking_stage_stats,
|
||||
)
|
||||
},
|
||||
(),
|
||||
|
@ -926,6 +973,7 @@ impl BankingStage {
|
|||
true,
|
||||
data_budget,
|
||||
slot_metrics_tracker,
|
||||
banking_stage_stats,
|
||||
)
|
||||
},
|
||||
(),
|
||||
|
@ -945,29 +993,26 @@ impl BankingStage {
|
|||
hold: bool,
|
||||
data_budget: &DataBudget,
|
||||
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
|
||||
banking_stage_stats: &BankingStageStats,
|
||||
) {
|
||||
let addr = match forward_option {
|
||||
ForwardOption::NotForward => {
|
||||
if !hold {
|
||||
buffered_packet_batches.clear();
|
||||
}
|
||||
return;
|
||||
if let ForwardOption::NotForward = forward_option {
|
||||
if !hold {
|
||||
buffered_packet_batches.clear();
|
||||
}
|
||||
ForwardOption::ForwardTransaction => {
|
||||
next_leader_tpu_forwards(cluster_info, poh_recorder)
|
||||
}
|
||||
ForwardOption::ForwardTpuVote => next_leader_tpu_vote(cluster_info, poh_recorder),
|
||||
};
|
||||
let addr = match addr {
|
||||
Some(addr) => addr,
|
||||
None => return,
|
||||
};
|
||||
return;
|
||||
}
|
||||
|
||||
let forwardable_packets =
|
||||
Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter());
|
||||
let forwardable_packets_len = forwardable_packets.len();
|
||||
let (_forward_result, sucessful_forwarded_packets_count) =
|
||||
Self::forward_buffered_packets(&addr, forwardable_packets, data_budget);
|
||||
let (_forward_result, sucessful_forwarded_packets_count) = Self::forward_buffered_packets(
|
||||
forward_option,
|
||||
cluster_info,
|
||||
poh_recorder,
|
||||
forwardable_packets,
|
||||
data_budget,
|
||||
banking_stage_stats,
|
||||
);
|
||||
let failed_forwarded_packets_count =
|
||||
forwardable_packets_len.saturating_sub(sucessful_forwarded_packets_count);
|
||||
|
||||
|
@ -4072,6 +4117,7 @@ mod tests {
|
|||
vec![deserialized_packet.clone()].into_iter(),
|
||||
1,
|
||||
);
|
||||
let stats = BankingStageStats::default();
|
||||
BankingStage::handle_forwarding(
|
||||
&ForwardOption::ForwardTransaction,
|
||||
&cluster_info,
|
||||
|
@ -4080,6 +4126,7 @@ mod tests {
|
|||
true,
|
||||
&data_budget,
|
||||
&mut LeaderSlotMetricsTracker::new(0),
|
||||
&stats,
|
||||
);
|
||||
|
||||
recv_socket
|
||||
|
@ -4179,6 +4226,7 @@ mod tests {
|
|||
];
|
||||
|
||||
for (name, forward_option, hold, expected_ids, expected_num_unprocessed) in test_cases {
|
||||
let stats = BankingStageStats::default();
|
||||
BankingStage::handle_forwarding(
|
||||
&forward_option,
|
||||
&cluster_info,
|
||||
|
@ -4187,6 +4235,7 @@ mod tests {
|
|||
hold,
|
||||
&DataBudget::default(),
|
||||
&mut LeaderSlotMetricsTracker::new(0),
|
||||
&stats,
|
||||
);
|
||||
|
||||
recv_socket
|
||||
|
|
|
@ -43,6 +43,7 @@ impl FetchStage {
|
|||
) -> (Self, PacketBatchReceiver, PacketBatchReceiver) {
|
||||
let (sender, receiver) = unbounded();
|
||||
let (vote_sender, vote_receiver) = unbounded();
|
||||
let (forward_sender, forward_receiver) = unbounded();
|
||||
(
|
||||
Self::new_with_sender(
|
||||
sockets,
|
||||
|
@ -51,6 +52,8 @@ impl FetchStage {
|
|||
exit,
|
||||
&sender,
|
||||
&vote_sender,
|
||||
&forward_sender,
|
||||
forward_receiver,
|
||||
poh_recorder,
|
||||
coalesce_ms,
|
||||
None,
|
||||
|
@ -60,6 +63,7 @@ impl FetchStage {
|
|||
)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new_with_sender(
|
||||
sockets: Vec<UdpSocket>,
|
||||
tpu_forwards_sockets: Vec<UdpSocket>,
|
||||
|
@ -67,6 +71,8 @@ impl FetchStage {
|
|||
exit: &Arc<AtomicBool>,
|
||||
sender: &PacketBatchSender,
|
||||
vote_sender: &PacketBatchSender,
|
||||
forward_sender: &PacketBatchSender,
|
||||
forward_receiver: PacketBatchReceiver,
|
||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||
coalesce_ms: u64,
|
||||
in_vote_only_mode: Option<Arc<AtomicBool>>,
|
||||
|
@ -81,6 +87,8 @@ impl FetchStage {
|
|||
exit,
|
||||
sender,
|
||||
vote_sender,
|
||||
forward_sender,
|
||||
forward_receiver,
|
||||
poh_recorder,
|
||||
coalesce_ms,
|
||||
in_vote_only_mode,
|
||||
|
@ -129,6 +137,7 @@ impl FetchStage {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn new_multi_socket(
|
||||
tpu_sockets: Vec<Arc<UdpSocket>>,
|
||||
tpu_forwards_sockets: Vec<Arc<UdpSocket>>,
|
||||
|
@ -136,6 +145,8 @@ impl FetchStage {
|
|||
exit: &Arc<AtomicBool>,
|
||||
sender: &PacketBatchSender,
|
||||
vote_sender: &PacketBatchSender,
|
||||
forward_sender: &PacketBatchSender,
|
||||
forward_receiver: PacketBatchReceiver,
|
||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||
coalesce_ms: u64,
|
||||
in_vote_only_mode: Option<Arc<AtomicBool>>,
|
||||
|
@ -160,7 +171,6 @@ impl FetchStage {
|
|||
.collect();
|
||||
|
||||
let tpu_forward_stats = Arc::new(StreamerReceiveStats::new("tpu_forwards_receiver"));
|
||||
let (forward_sender, forward_receiver) = unbounded();
|
||||
let tpu_forwards_threads: Vec<_> = tpu_forwards_sockets
|
||||
.into_iter()
|
||||
.map(|socket| {
|
||||
|
|
|
@ -29,7 +29,9 @@ use {
|
|||
vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
|
||||
},
|
||||
solana_sdk::signature::Keypair,
|
||||
solana_streamer::quic::{spawn_server, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
|
||||
solana_streamer::quic::{
|
||||
spawn_server, StreamStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS,
|
||||
},
|
||||
std::{
|
||||
collections::HashMap,
|
||||
net::UdpSocket,
|
||||
|
@ -53,6 +55,7 @@ pub struct TpuSockets {
|
|||
pub vote: Vec<UdpSocket>,
|
||||
pub broadcast: Vec<UdpSocket>,
|
||||
pub transactions_quic: UdpSocket,
|
||||
pub transactions_forwards_quic: UdpSocket,
|
||||
}
|
||||
|
||||
pub struct Tpu {
|
||||
|
@ -63,6 +66,7 @@ pub struct Tpu {
|
|||
cluster_info_vote_listener: ClusterInfoVoteListener,
|
||||
broadcast_stage: BroadcastStage,
|
||||
tpu_quic_t: thread::JoinHandle<()>,
|
||||
tpu_forwards_quic_t: thread::JoinHandle<()>,
|
||||
find_packet_sender_stake_stage: FindPacketSenderStakeStage,
|
||||
vote_find_packet_sender_stake_stage: FindPacketSenderStakeStage,
|
||||
staked_nodes_updater_service: StakedNodesUpdaterService,
|
||||
|
@ -100,10 +104,12 @@ impl Tpu {
|
|||
vote: tpu_vote_sockets,
|
||||
broadcast: broadcast_sockets,
|
||||
transactions_quic: transactions_quic_sockets,
|
||||
transactions_forwards_quic: transactions_forwards_quic_sockets,
|
||||
} = sockets;
|
||||
|
||||
let (packet_sender, packet_receiver) = unbounded();
|
||||
let (vote_packet_sender, vote_packet_receiver) = unbounded();
|
||||
let (forwarded_packet_sender, forwarded_packet_receiver) = unbounded();
|
||||
let fetch_stage = FetchStage::new_with_sender(
|
||||
transactions_sockets,
|
||||
tpu_forwards_sockets,
|
||||
|
@ -111,6 +117,8 @@ impl Tpu {
|
|||
exit,
|
||||
&packet_sender,
|
||||
&vote_packet_sender,
|
||||
&forwarded_packet_sender,
|
||||
forwarded_packet_receiver,
|
||||
poh_recorder,
|
||||
tpu_coalesce_ms,
|
||||
Some(bank_forks.read().unwrap().get_vote_only_mode_signal()),
|
||||
|
@ -145,6 +153,7 @@ impl Tpu {
|
|||
|
||||
let (verified_sender, verified_receiver) = unbounded();
|
||||
|
||||
let stats = Arc::new(StreamStats::default());
|
||||
let tpu_quic_t = spawn_server(
|
||||
transactions_quic_sockets,
|
||||
keypair,
|
||||
|
@ -152,9 +161,24 @@ impl Tpu {
|
|||
packet_sender,
|
||||
exit.clone(),
|
||||
MAX_QUIC_CONNECTIONS_PER_IP,
|
||||
staked_nodes,
|
||||
staked_nodes.clone(),
|
||||
MAX_STAKED_CONNECTIONS,
|
||||
MAX_UNSTAKED_CONNECTIONS,
|
||||
stats.clone(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let tpu_forwards_quic_t = spawn_server(
|
||||
transactions_forwards_quic_sockets,
|
||||
keypair,
|
||||
cluster_info.my_contact_info().tpu_forwards.ip(),
|
||||
forwarded_packet_sender,
|
||||
exit.clone(),
|
||||
MAX_QUIC_CONNECTIONS_PER_IP,
|
||||
staked_nodes,
|
||||
MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS),
|
||||
0, // Prevent unstaked nodes from forwarding transactions
|
||||
stats,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
@ -223,6 +247,7 @@ impl Tpu {
|
|||
cluster_info_vote_listener,
|
||||
broadcast_stage,
|
||||
tpu_quic_t,
|
||||
tpu_forwards_quic_t,
|
||||
find_packet_sender_stake_stage,
|
||||
vote_find_packet_sender_stake_stage,
|
||||
staked_nodes_updater_service,
|
||||
|
@ -240,7 +265,7 @@ impl Tpu {
|
|||
// exit can deadlock. put an upper-bound on how long we wait for it
|
||||
let timeout = Duration::from_secs(TPU_THREADS_JOIN_TIMEOUT_SECONDS);
|
||||
if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) {
|
||||
error!("timeout for closing tvu");
|
||||
error!("timeout for closing tpu");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -257,6 +282,7 @@ impl Tpu {
|
|||
self.staked_nodes_updater_service.join(),
|
||||
];
|
||||
self.tpu_quic_t.join()?;
|
||||
self.tpu_forwards_quic_t.join()?;
|
||||
let broadcast_result = self.broadcast_stage.join();
|
||||
for result in results {
|
||||
result?;
|
||||
|
|
|
@ -983,6 +983,7 @@ impl Validator {
|
|||
vote: node.sockets.tpu_vote,
|
||||
broadcast: node.sockets.broadcast,
|
||||
transactions_quic: node.sockets.tpu_quic,
|
||||
transactions_forwards_quic: node.sockets.tpu_forwards_quic,
|
||||
},
|
||||
&rpc_subscriptions,
|
||||
transaction_status_sender,
|
||||
|
|
|
@ -2719,6 +2719,7 @@ pub struct Sockets {
|
|||
pub serve_repair: UdpSocket,
|
||||
pub ancestor_hashes_requests: UdpSocket,
|
||||
pub tpu_quic: UdpSocket,
|
||||
pub tpu_forwards_quic: UdpSocket,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -2741,7 +2742,8 @@ impl Node {
|
|||
let gossip_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), gossip_port);
|
||||
let tvu = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let tvu_forwards = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let tpu_forwards = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let ((_tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) =
|
||||
bind_two_consecutive_in_range(bind_ip_addr, (1024, 65535)).unwrap();
|
||||
let tpu_vote = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let repair = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let rpc_port = find_available_port_in_range(bind_ip_addr, (1024, 65535)).unwrap();
|
||||
|
@ -2786,6 +2788,7 @@ impl Node {
|
|||
serve_repair,
|
||||
ancestor_hashes_requests,
|
||||
tpu_quic,
|
||||
tpu_forwards_quic,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -2822,7 +2825,8 @@ impl Node {
|
|||
let (tvu_forwards_port, tvu_forwards) = Self::bind(bind_ip_addr, port_range);
|
||||
let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) =
|
||||
bind_two_consecutive_in_range(bind_ip_addr, port_range).unwrap();
|
||||
let (tpu_forwards_port, tpu_forwards) = Self::bind(bind_ip_addr, port_range);
|
||||
let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) =
|
||||
bind_two_consecutive_in_range(bind_ip_addr, port_range).unwrap();
|
||||
let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range);
|
||||
let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range);
|
||||
let (repair_port, repair) = Self::bind(bind_ip_addr, port_range);
|
||||
|
@ -2866,6 +2870,7 @@ impl Node {
|
|||
serve_repair,
|
||||
ancestor_hashes_requests,
|
||||
tpu_quic,
|
||||
tpu_forwards_quic,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -2897,6 +2902,14 @@ impl Node {
|
|||
let (tpu_forwards_port, tpu_forwards_sockets) =
|
||||
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind");
|
||||
|
||||
let (_tpu_forwards_port_quic, tpu_forwards_quic) = Self::bind(
|
||||
bind_ip_addr,
|
||||
(
|
||||
tpu_forwards_port + QUIC_PORT_OFFSET,
|
||||
tpu_forwards_port + QUIC_PORT_OFFSET + 1,
|
||||
),
|
||||
);
|
||||
|
||||
let (tpu_vote_port, tpu_vote_sockets) =
|
||||
multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind");
|
||||
|
||||
|
@ -2944,6 +2957,7 @@ impl Node {
|
|||
ip_echo: Some(ip_echo),
|
||||
ancestor_hashes_requests,
|
||||
tpu_quic,
|
||||
tpu_forwards_quic,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,7 +29,7 @@ pub struct UdpSocketPair {
|
|||
pub type PortRange = (u16, u16);
|
||||
|
||||
pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000);
|
||||
pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 12; // VALIDATOR_PORT_RANGE must be at least this wide
|
||||
pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 13; // VALIDATOR_PORT_RANGE must be at least this wide
|
||||
|
||||
pub(crate) const HEADER_LENGTH: usize = 4;
|
||||
pub(crate) const IP_ECHO_SERVER_RESPONSE_LENGTH: usize = HEADER_LENGTH + 23;
|
||||
|
|
|
@ -327,7 +327,7 @@ impl ConnectionTable {
|
|||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct StreamStats {
|
||||
pub struct StreamStats {
|
||||
total_connections: AtomicUsize,
|
||||
total_new_connections: AtomicUsize,
|
||||
total_streams: AtomicUsize,
|
||||
|
@ -342,6 +342,7 @@ struct StreamStats {
|
|||
total_stream_read_errors: AtomicUsize,
|
||||
num_evictions: AtomicUsize,
|
||||
connection_add_failed: AtomicUsize,
|
||||
connection_add_failed_unstaked_node: AtomicUsize,
|
||||
connection_setup_timeout: AtomicUsize,
|
||||
}
|
||||
|
||||
|
@ -379,6 +380,12 @@ impl StreamStats {
|
|||
self.connection_add_failed.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"connection_add_failed_unstaked_node",
|
||||
self.connection_add_failed_unstaked_node
|
||||
.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"connection_setup_timeout",
|
||||
self.connection_setup_timeout.swap(0, Ordering::Relaxed),
|
||||
|
@ -486,6 +493,7 @@ fn handle_connection(
|
|||
});
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn spawn_server(
|
||||
sock: UdpSocket,
|
||||
keypair: &Keypair,
|
||||
|
@ -496,6 +504,7 @@ pub fn spawn_server(
|
|||
staked_nodes: Arc<RwLock<HashMap<IpAddr, u64>>>,
|
||||
max_staked_connections: usize,
|
||||
max_unstaked_connections: usize,
|
||||
stats: Arc<StreamStats>,
|
||||
) -> Result<thread::JoinHandle<()>, QuicServerError> {
|
||||
let (config, _cert) = configure_server(keypair, gossip_host)?;
|
||||
|
||||
|
@ -506,7 +515,6 @@ pub fn spawn_server(
|
|||
.map_err(|_e| QuicServerError::EndpointFailed)?
|
||||
};
|
||||
|
||||
let stats = Arc::new(StreamStats::default());
|
||||
let handle = thread::spawn(move || {
|
||||
let handle = runtime.spawn(async move {
|
||||
debug!("spawn quic server");
|
||||
|
@ -561,29 +569,36 @@ pub fn spawn_server(
|
|||
}
|
||||
};
|
||||
|
||||
if let Some((last_update, stream_exit)) = connection_table_l
|
||||
.try_add_connection(
|
||||
&remote_addr,
|
||||
timing::timestamp(),
|
||||
max_connections_per_ip,
|
||||
)
|
||||
{
|
||||
drop(connection_table_l);
|
||||
let packet_sender = packet_sender.clone();
|
||||
let stats = stats.clone();
|
||||
let connection_table1 = connection_table.clone();
|
||||
handle_connection(
|
||||
uni_streams,
|
||||
packet_sender,
|
||||
remote_addr,
|
||||
last_update,
|
||||
connection_table1,
|
||||
stream_exit,
|
||||
stats,
|
||||
stake,
|
||||
);
|
||||
if stake != 0 || max_unstaked_connections > 0 {
|
||||
if let Some((last_update, stream_exit)) = connection_table_l
|
||||
.try_add_connection(
|
||||
&remote_addr,
|
||||
timing::timestamp(),
|
||||
max_connections_per_ip,
|
||||
)
|
||||
{
|
||||
drop(connection_table_l);
|
||||
let packet_sender = packet_sender.clone();
|
||||
let stats = stats.clone();
|
||||
let connection_table1 = connection_table.clone();
|
||||
handle_connection(
|
||||
uni_streams,
|
||||
packet_sender,
|
||||
remote_addr,
|
||||
last_update,
|
||||
connection_table1,
|
||||
stream_exit,
|
||||
stats,
|
||||
stake,
|
||||
);
|
||||
} else {
|
||||
stats.connection_add_failed.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
} else {
|
||||
stats.connection_add_failed.fetch_add(1, Ordering::Relaxed);
|
||||
connection.close(0u32.into(), &[0u8]);
|
||||
stats
|
||||
.connection_add_failed_unstaked_node
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
} else {
|
||||
stats
|
||||
|
@ -739,6 +754,7 @@ mod test {
|
|||
let ip = "127.0.0.1".parse().unwrap();
|
||||
let server_address = s.local_addr().unwrap();
|
||||
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
|
||||
let stats = Arc::new(StreamStats::default());
|
||||
let t = spawn_server(
|
||||
s,
|
||||
&keypair,
|
||||
|
@ -749,6 +765,7 @@ mod test {
|
|||
staked_nodes,
|
||||
10,
|
||||
10,
|
||||
stats,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
@ -809,6 +826,7 @@ mod test {
|
|||
let ip = "127.0.0.1".parse().unwrap();
|
||||
let server_address = s.local_addr().unwrap();
|
||||
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
|
||||
let stats = Arc::new(StreamStats::default());
|
||||
let t = spawn_server(
|
||||
s,
|
||||
&keypair,
|
||||
|
@ -819,6 +837,7 @@ mod test {
|
|||
staked_nodes,
|
||||
MAX_STAKED_CONNECTIONS,
|
||||
MAX_UNSTAKED_CONNECTIONS,
|
||||
stats,
|
||||
)
|
||||
.unwrap();
|
||||
(t, exit, receiver, server_address)
|
||||
|
@ -868,6 +887,52 @@ mod test {
|
|||
t.join().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_quic_server_unstaked_node_connect_failure() {
|
||||
solana_logger::setup();
|
||||
|
||||
let s = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let (sender, _receiver) = unbounded();
|
||||
let keypair = Keypair::new();
|
||||
let ip = "127.0.0.1".parse().unwrap();
|
||||
let server_address = s.local_addr().unwrap();
|
||||
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
|
||||
let stats = Arc::new(StreamStats::default());
|
||||
let t = spawn_server(
|
||||
s,
|
||||
&keypair,
|
||||
ip,
|
||||
sender,
|
||||
exit.clone(),
|
||||
1,
|
||||
staked_nodes,
|
||||
MAX_STAKED_CONNECTIONS,
|
||||
0, // Do not allow any connection from unstaked clients/nodes
|
||||
stats,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let runtime = rt();
|
||||
let _rt_guard = runtime.enter();
|
||||
let conn1 = Arc::new(make_client_endpoint(&runtime, &server_address));
|
||||
|
||||
// Send a full size packet with single byte writes.
|
||||
let handle = runtime.spawn(async move {
|
||||
if let Ok(mut s1) = conn1.connection.open_uni().await {
|
||||
for _ in 0..PACKET_DATA_SIZE {
|
||||
// Ignoring any errors here. s1.finish() will test the error condition
|
||||
s1.write_all(&[0u8]).await.unwrap_or_default();
|
||||
}
|
||||
s1.finish().await.unwrap_err();
|
||||
}
|
||||
});
|
||||
runtime.block_on(handle).unwrap();
|
||||
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
t.join().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_table() {
|
||||
use std::net::Ipv4Addr;
|
||||
|
|
|
@ -152,6 +152,7 @@ fn verify_reachable_ports(
|
|||
}
|
||||
if ContactInfo::is_valid_address(&node.info.tpu_forwards, socket_addr_space) {
|
||||
udp_sockets.extend(node.sockets.tpu_forwards.iter());
|
||||
udp_sockets.push(&node.sockets.tpu_forwards_quic);
|
||||
}
|
||||
if ContactInfo::is_valid_address(&node.info.tpu_vote, socket_addr_space) {
|
||||
udp_sockets.extend(node.sockets.tpu_vote.iter());
|
||||
|
|
Loading…
Reference in New Issue