use {
    crate::{
        quic::{configure_server, QuicServerError, StreamStats},
        streamer::StakedNodes,
        tls_certificates::get_pubkey_from_tls_certificate,
    },
    async_channel::{
        unbounded as async_unbounded, Receiver as AsyncReceiver, Sender as AsyncSender,
    },
    bytes::Bytes,
    crossbeam_channel::Sender,
    indexmap::map::{Entry, IndexMap},
    percentage::Percentage,
    quinn::{Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt},
    quinn_proto::VarIntBoundsExceeded,
    rand::{thread_rng, Rng},
    solana_perf::packet::{PacketBatch, PACKETS_PER_BATCH},
    solana_sdk::{
        packet::{Meta, PACKET_DATA_SIZE},
        pubkey::Pubkey,
        quic::{
            QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_STAKED_CONCURRENT_STREAMS,
            QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
            QUIC_MIN_STAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO,
            QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO,
        },
        signature::Keypair,
        timing,
    },
    std::{
        net::{IpAddr, SocketAddr, UdpSocket},
        sync::{
            atomic::{AtomicBool, AtomicU64, Ordering},
            Arc, Mutex, MutexGuard, RwLock,
        },
        time::{Duration, Instant},
    },
    tokio::{
        task::JoinHandle,
        time::{sleep, timeout},
    },
};

const WAIT_FOR_STREAM_TIMEOUT_MS: u64 = 100;
pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS: u64 = 10000;

pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";

const CONNECTION_CLOSE_CODE_DROPPED_ENTRY: u32 = 1;
const CONNECTION_CLOSE_REASON_DROPPED_ENTRY: &[u8] = b"dropped";

const CONNECTION_CLOSE_CODE_DISALLOWED: u32 = 2;
const CONNECTION_CLOSE_REASON_DISALLOWED: &[u8] = b"disallowed";

const CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT: u32 = 3;
const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = b"exceed_max_stream_count";

const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4;
const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";

// A sequence of bytes that is part of a packet
// along with where in the packet it is
struct PacketChunk {
    pub bytes: Bytes,
    // The offset of these bytes in the Quic stream
    // and thus the beginning offset in the slice of the
    // Packet data array into which the bytes will be copied
    pub offset: usize,
    // The end offset of these bytes in the Quic stream
    // and thus the end of the slice in the Packet data array
    // into which the bytes will be copied
    pub end_of_chunk: usize,
}

// A struct to accumulate the bytes making up
// a packet, along with their offsets, and the
// packet metadata. We use this accumulator to avoid
// multiple copies of the Bytes (when building up
// the Packet and then when copying the Packet into a PacketBatch)
struct PacketAccumulator {
    pub meta: Meta,
    pub chunks: Vec<PacketChunk>,
}
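
// Illustrative example (an assumption for clarity, not upstream code): a
// packet whose bytes arrive as two stream chunks covering offsets [0, 512)
// and [512, 600) is accumulated as
//     PacketChunk { offset: 0, end_of_chunk: 512, .. }
//     PacketChunk { offset: 512, end_of_chunk: 600, .. }
// handle_chunk() records meta.size = 600, and packet_batch_sender() later
// copies both slices into a single Packet buffer.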

#[allow(clippy::too_many_arguments)]
pub fn spawn_server(
    sock: UdpSocket,
    keypair: &Keypair,
    gossip_host: IpAddr,
    packet_sender: Sender<PacketBatch>,
    exit: Arc<AtomicBool>,
    max_connections_per_peer: usize,
    staked_nodes: Arc<RwLock<StakedNodes>>,
    max_staked_connections: usize,
    max_unstaked_connections: usize,
    stats: Arc<StreamStats>,
    wait_for_chunk_timeout_ms: u64,
    coalesce_ms: u64,
) -> Result<(Endpoint, JoinHandle<()>), QuicServerError> {
    info!("Start quic server on {:?}", sock);
    let (config, _cert) = configure_server(keypair, gossip_host)?;

    let endpoint = {
        Endpoint::new(EndpointConfig::default(), Some(config), sock, TokioRuntime)
            .map_err(|_e| QuicServerError::EndpointFailed)?
    };

    let handle = tokio::spawn(run_server(
        endpoint.clone(),
        packet_sender,
        exit,
        max_connections_per_peer,
        staked_nodes,
        max_staked_connections,
        max_unstaked_connections,
        stats,
        wait_for_chunk_timeout_ms,
        coalesce_ms,
    ));
    Ok((endpoint, handle))
}
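
// Minimal usage sketch for spawn_server (illustrative only; the socket
// address and limits below are assumptions taken from the tests, not
// required values):
//
//     let sock = UdpSocket::bind("127.0.0.1:0").unwrap();
//     let (sender, _receiver) = crossbeam_channel::unbounded();
//     let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
//     let (endpoint, handle) = spawn_server(
//         sock,
//         &Keypair::new(),
//         IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
//         sender,
//         Arc::new(AtomicBool::new(false)),
//         1, // max_connections_per_peer
//         staked_nodes,
//         crate::quic::MAX_STAKED_CONNECTIONS,
//         crate::quic::MAX_UNSTAKED_CONNECTIONS,
//         Arc::new(StreamStats::default()),
//         DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
//         solana_sdk::net::DEFAULT_TPU_COALESCE_MS,
//     )
//     .unwrap();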

#[allow(clippy::too_many_arguments)]
pub async fn run_server(
    incoming: Endpoint,
    packet_sender: Sender<PacketBatch>,
    exit: Arc<AtomicBool>,
    max_connections_per_peer: usize,
    staked_nodes: Arc<RwLock<StakedNodes>>,
    max_staked_connections: usize,
    max_unstaked_connections: usize,
    stats: Arc<StreamStats>,
    wait_for_chunk_timeout_ms: u64,
    coalesce_ms: u64,
) {
    debug!("spawn quic server");
    let mut last_datapoint = Instant::now();
    let unstaked_connection_table: Arc<Mutex<ConnectionTable>> = Arc::new(Mutex::new(
        ConnectionTable::new(ConnectionPeerType::Unstaked),
    ));
    let staked_connection_table: Arc<Mutex<ConnectionTable>> =
        Arc::new(Mutex::new(ConnectionTable::new(ConnectionPeerType::Staked)));
    let (sender, receiver) = async_unbounded();
    tokio::spawn(packet_batch_sender(
        packet_sender,
        receiver,
        exit.clone(),
        stats.clone(),
        coalesce_ms,
    ));
    while !exit.load(Ordering::Relaxed) {
        const WAIT_FOR_CONNECTION_TIMEOUT_MS: u64 = 1000;
        const WAIT_BETWEEN_NEW_CONNECTIONS_US: u64 = 1000;
        let timeout_connection = timeout(
            Duration::from_millis(WAIT_FOR_CONNECTION_TIMEOUT_MS),
            incoming.accept(),
        )
        .await;

        if last_datapoint.elapsed().as_secs() >= 5 {
            stats.report();
            last_datapoint = Instant::now();
        }

        if let Ok(Some(connection)) = timeout_connection {
            info!("Got a connection {:?}", connection.remote_address());
            tokio::spawn(setup_connection(
                connection,
                unstaked_connection_table.clone(),
                staked_connection_table.clone(),
                sender.clone(),
                max_connections_per_peer,
                staked_nodes.clone(),
                max_staked_connections,
                max_unstaked_connections,
                stats.clone(),
                wait_for_chunk_timeout_ms,
            ));
            sleep(Duration::from_micros(WAIT_BETWEEN_NEW_CONNECTIONS_US)).await;
        } else {
            debug!("accept(): Timed out waiting for connection");
        }
    }
}

fn prune_unstaked_connection_table(
    unstaked_connection_table: &mut MutexGuard<ConnectionTable>,
    max_unstaked_connections: usize,
    stats: Arc<StreamStats>,
) {
    if unstaked_connection_table.total_size >= max_unstaked_connections {
        const PRUNE_TABLE_TO_PERCENTAGE: u8 = 90;
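        // For example, with max_unstaked_connections = 500 this prunes the
        // table down to at most 450 entries before admitting the new peer.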
        let max_percentage_full = Percentage::from(PRUNE_TABLE_TO_PERCENTAGE);

        let max_connections = max_percentage_full.apply_to(max_unstaked_connections);
        let num_pruned = unstaked_connection_table.prune_oldest(max_connections);
        stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
    }
}
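
// Looks up the peer's identity pubkey from its TLS certificate and, if that
// pubkey is a staked node, returns (pubkey, stake, total_stake, max_stake,
// min_stake).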
fn get_connection_stake(
    connection: &Connection,
    staked_nodes: Arc<RwLock<StakedNodes>>,
) -> Option<(Pubkey, u64, u64, u64, u64)> {
    connection
        .peer_identity()
        .and_then(|der_cert_any| der_cert_any.downcast::<Vec<rustls::Certificate>>().ok())
        .and_then(|der_certs| {
            if der_certs.len() == 1 {
                // Use the client cert only if it is self-signed and the chain length is 1
                get_pubkey_from_tls_certificate(&der_certs[0]).and_then(|pubkey| {
                    debug!("Peer public key is {:?}", pubkey);

                    let staked_nodes = staked_nodes.read().unwrap();
                    let total_stake = staked_nodes.total_stake;
                    let max_stake = staked_nodes.max_stake;
                    let min_stake = staked_nodes.min_stake;
                    staked_nodes
                        .pubkey_stake_map
                        .get(&pubkey)
                        .map(|stake| (pubkey, *stake, total_stake, max_stake, min_stake))
                })
            } else {
                None
            }
        })
}
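
/// For example, a staked peer holding 1% of `total_stake` is allowed roughly
/// `QUIC_MIN_STAKED_CONCURRENT_STREAMS + 0.01 * (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS
/// - QUIC_MIN_STAKED_CONCURRENT_STREAMS)` concurrent unidirectional streams,
/// clamped to `[QUIC_MIN_STAKED_CONCURRENT_STREAMS, QUIC_MAX_STAKED_CONCURRENT_STREAMS]`.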
pub fn compute_max_allowed_uni_streams(
    peer_type: ConnectionPeerType,
    peer_stake: u64,
    total_stake: u64,
) -> usize {
    // Treat stake = 0 as unstaked
    if peer_stake == 0 {
        QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
    } else {
        match peer_type {
            ConnectionPeerType::Staked => {
                // There is no checked math for f64, so explicitly check for 0 here
                if total_stake == 0 || peer_stake > total_stake {
                    warn!(
                        "Invalid stake values: peer_stake: {:?}, total_stake: {:?}",
                        peer_stake, total_stake,
                    );

                    QUIC_MIN_STAKED_CONCURRENT_STREAMS
                } else {
                    let delta = (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS
                        - QUIC_MIN_STAKED_CONCURRENT_STREAMS)
                        as f64;

                    (((peer_stake as f64 / total_stake as f64) * delta) as usize
                        + QUIC_MIN_STAKED_CONCURRENT_STREAMS)
                        .clamp(
                            QUIC_MIN_STAKED_CONCURRENT_STREAMS,
                            QUIC_MAX_STAKED_CONCURRENT_STREAMS,
                        )
                }
            }
            _ => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
        }
    }
}

enum ConnectionHandlerError {
    ConnectionAddError,
    MaxStreamError,
}

struct NewConnectionHandlerParams {
    // In principle, the code should work as-is if we replaced this with a crossbeam channel,
    // as the server code never does a blocking send (the channel is unbounded)
    // or a blocking recv (we always use try_recv), but I've found that it's
    // simply too easy to accidentally block in async code when using the
    // crossbeam channel, so for the sake of maintainability we're sticking
    // with an async channel.
    packet_sender: AsyncSender<PacketAccumulator>,
    remote_pubkey: Option<Pubkey>,
    stake: u64,
    total_stake: u64,
    max_connections_per_peer: usize,
    stats: Arc<StreamStats>,
    max_stake: u64,
    min_stake: u64,
}

impl NewConnectionHandlerParams {
    fn new_unstaked(
        packet_sender: AsyncSender<PacketAccumulator>,
        max_connections_per_peer: usize,
        stats: Arc<StreamStats>,
    ) -> NewConnectionHandlerParams {
        NewConnectionHandlerParams {
            packet_sender,
            remote_pubkey: None,
            stake: 0,
            total_stake: 0,
            max_connections_per_peer,
            stats,
            max_stake: 0,
            min_stake: 0,
        }
    }
}

fn handle_and_cache_new_connection(
    connection: Connection,
    mut connection_table_l: MutexGuard<ConnectionTable>,
    connection_table: Arc<Mutex<ConnectionTable>>,
    params: &NewConnectionHandlerParams,
    wait_for_chunk_timeout_ms: u64,
) -> Result<(), ConnectionHandlerError> {
    if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams(
        connection_table_l.peer_type,
        params.stake,
        params.total_stake,
    ) as u64)
    {
        connection.set_max_concurrent_uni_streams(max_uni_streams);
        let receive_window = compute_receive_window(
            params.max_stake,
            params.min_stake,
            connection_table_l.peer_type,
            params.stake,
        );

        if let Ok(receive_window) = receive_window {
            connection.set_receive_window(receive_window);
        }

        let remote_addr = connection.remote_address();

        debug!(
            "Peer type: {:?}, stake {}, total stake {}, max streams {} receive_window {:?} from peer {}",
            connection_table_l.peer_type,
            params.stake,
            params.total_stake,
            max_uni_streams.into_inner(),
            receive_window,
            remote_addr,
        );

        if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection(
            ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey),
            remote_addr.port(),
            Some(connection.clone()),
            params.stake,
            timing::timestamp(),
            params.max_connections_per_peer,
        ) {
            let peer_type = connection_table_l.peer_type;
            drop(connection_table_l);
            tokio::spawn(handle_connection(
                connection,
                params.packet_sender.clone(),
                remote_addr,
                params.remote_pubkey,
                last_update,
                connection_table,
                stream_exit,
                params.stats.clone(),
                params.stake,
                peer_type,
                wait_for_chunk_timeout_ms,
            ));
            Ok(())
        } else {
            params
                .stats
                .connection_add_failed
                .fetch_add(1, Ordering::Relaxed);
            Err(ConnectionHandlerError::ConnectionAddError)
        }
    } else {
        connection.close(
            CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT.into(),
            CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT,
        );
        params
            .stats
            .connection_add_failed_invalid_stream_count
            .fetch_add(1, Ordering::Relaxed);
        Err(ConnectionHandlerError::MaxStreamError)
    }
}

fn prune_unstaked_connections_and_add_new_connection(
    connection: Connection,
    mut connection_table_l: MutexGuard<ConnectionTable>,
    connection_table: Arc<Mutex<ConnectionTable>>,
    max_connections: usize,
    params: &NewConnectionHandlerParams,
    wait_for_chunk_timeout_ms: u64,
) -> Result<(), ConnectionHandlerError> {
    let stats = params.stats.clone();
    if max_connections > 0 {
        prune_unstaked_connection_table(&mut connection_table_l, max_connections, stats);
        handle_and_cache_new_connection(
            connection,
            connection_table_l,
            connection_table,
            params,
            wait_for_chunk_timeout_ms,
        )
    } else {
        connection.close(
            CONNECTION_CLOSE_CODE_DISALLOWED.into(),
            CONNECTION_CLOSE_REASON_DISALLOWED,
        );
        Err(ConnectionHandlerError::ConnectionAddError)
    }
}

/// Calculate the ratio for per connection receive window from a staked peer
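/// The mapping is linear in stake: r(s) = a * s + b, with
/// a = (max_ratio - min_ratio) / (max_stake - min_stake) and
/// b = max_ratio - max_stake * a; for example, a node at the midpoint stake
/// (min_stake + max_stake) / 2 gets the midpoint ratio
/// (min_ratio + max_ratio) / 2 (rounded).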
fn compute_receive_window_ratio_for_staked_node(max_stake: u64, min_stake: u64, stake: u64) -> u64 {
    // Testing shows the maximum throughput from a connection is achieved at receive_window =
    // PACKET_DATA_SIZE * 10. Beyond that, there is not much gain. We linearly map the
    // stake to the ratio range from QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO to
    // QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO. The ratio 'r' for stake 's' is
    // r(s) = a * s + b; given max_stake, min_stake, max_ratio, and min_ratio,
    // we can solve for a and b.

    if stake > max_stake {
        return QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
    }

    let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
    let min_ratio = QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO;
    if max_stake > min_stake {
        let a = (max_ratio - min_ratio) as f64 / (max_stake - min_stake) as f64;
        let b = max_ratio as f64 - ((max_stake as f64) * a);
        let ratio = (a * stake as f64) + b;
        ratio.round() as u64
    } else {
        QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO
    }
}

fn compute_receive_window(
    max_stake: u64,
    min_stake: u64,
    peer_type: ConnectionPeerType,
    peer_stake: u64,
) -> Result<VarInt, VarIntBoundsExceeded> {
    match peer_type {
        ConnectionPeerType::Unstaked => {
            VarInt::from_u64(PACKET_DATA_SIZE as u64 * QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO)
        }
        ConnectionPeerType::Staked => {
            let ratio =
                compute_receive_window_ratio_for_staked_node(max_stake, min_stake, peer_stake);
            VarInt::from_u64(PACKET_DATA_SIZE as u64 * ratio)
        }
    }
}

#[allow(clippy::too_many_arguments)]
async fn setup_connection(
    connecting: Connecting,
    unstaked_connection_table: Arc<Mutex<ConnectionTable>>,
    staked_connection_table: Arc<Mutex<ConnectionTable>>,
    packet_sender: AsyncSender<PacketAccumulator>,
    max_connections_per_peer: usize,
    staked_nodes: Arc<RwLock<StakedNodes>>,
    max_staked_connections: usize,
    max_unstaked_connections: usize,
    stats: Arc<StreamStats>,
    wait_for_chunk_timeout_ms: u64,
) {
    if let Ok(connecting_result) = timeout(
        Duration::from_millis(QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS),
        connecting,
    )
    .await
    {
        if let Ok(new_connection) = connecting_result {
            stats.total_new_connections.fetch_add(1, Ordering::Relaxed);

            let params = get_connection_stake(&new_connection, staked_nodes.clone()).map_or(
                NewConnectionHandlerParams::new_unstaked(
                    packet_sender.clone(),
                    max_connections_per_peer,
                    stats.clone(),
                ),
                |(pubkey, stake, total_stake, max_stake, min_stake)| NewConnectionHandlerParams {
                    packet_sender,
                    remote_pubkey: Some(pubkey),
                    stake,
                    total_stake,
                    max_connections_per_peer,
                    stats: stats.clone(),
                    max_stake,
                    min_stake,
                },
            );

            if params.stake > 0 {
                let mut connection_table_l = staked_connection_table.lock().unwrap();
                if connection_table_l.total_size >= max_staked_connections {
                    let num_pruned = connection_table_l.prune_random(params.stake);
                    stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
                }

                if connection_table_l.total_size < max_staked_connections {
                    if let Ok(()) = handle_and_cache_new_connection(
                        new_connection,
                        connection_table_l,
                        staked_connection_table.clone(),
                        &params,
                        wait_for_chunk_timeout_ms,
                    ) {
                        stats
                            .connection_added_from_staked_peer
                            .fetch_add(1, Ordering::Relaxed);
                    }
                } else {
                    // If we couldn't prune a connection in the staked connection table, let's
                    // put this connection in the unstaked connection table. If needed, prune a
                    // connection from the unstaked connection table.
                    if let Ok(()) = prune_unstaked_connections_and_add_new_connection(
                        new_connection,
                        unstaked_connection_table.lock().unwrap(),
                        unstaked_connection_table.clone(),
                        max_unstaked_connections,
                        &params,
                        wait_for_chunk_timeout_ms,
                    ) {
                        stats
                            .connection_added_from_staked_peer
                            .fetch_add(1, Ordering::Relaxed);
                    } else {
                        stats
                            .connection_add_failed_on_pruning
                            .fetch_add(1, Ordering::Relaxed);
                        stats
                            .connection_add_failed_staked_node
                            .fetch_add(1, Ordering::Relaxed);
                    }
                }
            } else if let Ok(()) = prune_unstaked_connections_and_add_new_connection(
                new_connection,
                unstaked_connection_table.lock().unwrap(),
                unstaked_connection_table.clone(),
                max_unstaked_connections,
                &params,
                wait_for_chunk_timeout_ms,
            ) {
                stats
                    .connection_added_from_unstaked_peer
                    .fetch_add(1, Ordering::Relaxed);
            } else {
                stats
                    .connection_add_failed_unstaked_node
                    .fetch_add(1, Ordering::Relaxed);
            }
        } else {
            stats.connection_setup_error.fetch_add(1, Ordering::Relaxed);
        }
    } else {
        stats
            .connection_setup_timeout
            .fetch_add(1, Ordering::Relaxed);
    }
}
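
// Batching policy, as implemented below: packets received from
// `packet_receiver` accumulate into one PacketBatch, which is flushed to
// `packet_sender` as soon as it holds PACKETS_PER_BATCH packets, or once it
// is non-empty and `coalesce_ms` has elapsed since its first packet arrived.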
async fn packet_batch_sender(
    packet_sender: Sender<PacketBatch>,
    packet_receiver: AsyncReceiver<PacketAccumulator>,
    exit: Arc<AtomicBool>,
    stats: Arc<StreamStats>,
    coalesce_ms: u64,
) {
    trace!("enter packet_batch_sender");
    let coalesce_ms = coalesce_ms as u128;
    let mut batch_start_time = Instant::now();
    loop {
        let mut packet_batch = PacketBatch::with_capacity(PACKETS_PER_BATCH);

        stats
            .total_packets_allocated
            .fetch_add(PACKETS_PER_BATCH, Ordering::Relaxed);
        loop {
            if exit.load(Ordering::Relaxed) {
                return;
            }
            let elapsed = batch_start_time.elapsed();
            if packet_batch.len() >= PACKETS_PER_BATCH
                || (!packet_batch.is_empty() && elapsed.as_millis() >= coalesce_ms)
            {
                let len = packet_batch.len();
                if let Err(e) = packet_sender.send(packet_batch) {
                    stats
                        .total_packet_batch_send_err
                        .fetch_add(1, Ordering::Relaxed);
                    trace!("Send error: {}", e);
                } else {
                    stats
                        .total_packet_batches_sent
                        .fetch_add(1, Ordering::Relaxed);

                    stats
                        .total_packets_sent_to_consumer
                        .fetch_add(len, Ordering::Relaxed);

                    trace!("Sent {} packet batch", len);
                }
                break;
            }

            let timeout_res = timeout(Duration::from_micros(250), packet_receiver.recv()).await;

            if let Ok(Ok(packet_accumulator)) = timeout_res {
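                // Grow the batch by one packet; the new slot's meta and data
                // are fully overwritten below. The flush check at the top of
                // this loop guarantees len() stays within the
                // PACKETS_PER_BATCH capacity the batch was allocated with.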
                unsafe {
                    packet_batch.set_len(packet_batch.len() + 1);
                }

                let i = packet_batch.len() - 1;
                *packet_batch[i].meta_mut() = packet_accumulator.meta;
                for chunk in packet_accumulator.chunks {
                    packet_batch[i].buffer_mut()[chunk.offset..chunk.end_of_chunk]
                        .copy_from_slice(&chunk.bytes);
                }

                if packet_batch.len() == 1 {
                    batch_start_time = Instant::now();
                }
            }
        }
    }
}

#[allow(clippy::too_many_arguments)]
async fn handle_connection(
    connection: Connection,
    packet_sender: AsyncSender<PacketAccumulator>,
    remote_addr: SocketAddr,
    remote_pubkey: Option<Pubkey>,
    last_update: Arc<AtomicU64>,
    connection_table: Arc<Mutex<ConnectionTable>>,
    stream_exit: Arc<AtomicBool>,
    stats: Arc<StreamStats>,
    stake: u64,
    peer_type: ConnectionPeerType,
    wait_for_chunk_timeout_ms: u64,
) {
    debug!(
        "quic new connection {} streams: {} connections: {}",
        remote_addr,
        stats.total_streams.load(Ordering::Relaxed),
        stats.total_connections.load(Ordering::Relaxed),
    );
    let stable_id = connection.stable_id();
    stats.total_connections.fetch_add(1, Ordering::Relaxed);
    while !stream_exit.load(Ordering::Relaxed) {
        if let Ok(stream) = tokio::time::timeout(
            Duration::from_millis(WAIT_FOR_STREAM_TIMEOUT_MS),
            connection.accept_uni(),
        )
        .await
        {
            match stream {
                Ok(mut stream) => {
                    stats.total_streams.fetch_add(1, Ordering::Relaxed);
                    stats.total_new_streams.fetch_add(1, Ordering::Relaxed);
                    let stream_exit = stream_exit.clone();
                    let stats = stats.clone();
                    let packet_sender = packet_sender.clone();
                    let last_update = last_update.clone();
                    tokio::spawn(async move {
                        let mut maybe_batch = None;
                        // The min guards against a value so small that we wake up
                        // unnecessarily often and waste CPU cycles; the max guards
                        // against waiting too long, which delays exit and causes
                        // some test failures when the timeout value is large.
                        // Within these bounds, the heuristic is to wake up 10 times
                        // over the configured timeout to check for exit when there
                        // is no data.
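                        // (With the DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS of 10_000 ms,
                        // the interval works out to 1_000 ms.)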
                        let exit_check_interval = (wait_for_chunk_timeout_ms / 10).clamp(10, 1000);
                        let mut start = Instant::now();
                        while !stream_exit.load(Ordering::Relaxed) {
                            if let Ok(chunk) = tokio::time::timeout(
                                Duration::from_millis(exit_check_interval),
                                stream.read_chunk(PACKET_DATA_SIZE, false),
                            )
                            .await
                            {
                                if handle_chunk(
                                    chunk,
                                    &mut maybe_batch,
                                    &remote_addr,
                                    &packet_sender,
                                    stats.clone(),
                                    stake,
                                    peer_type,
                                )
                                .await
                                {
                                    last_update.store(timing::timestamp(), Ordering::Relaxed);
                                    break;
                                }
                                start = Instant::now();
                            } else {
                                let elapsed = Instant::now() - start;
                                if elapsed.as_millis() as u64 > wait_for_chunk_timeout_ms {
                                    debug!("Timeout in receiving on stream");
                                    stats
                                        .total_stream_read_timeouts
                                        .fetch_add(1, Ordering::Relaxed);
                                    break;
                                }
                            }
                        }
                        stats.total_streams.fetch_sub(1, Ordering::Relaxed);
                    });
                }
                Err(e) => {
                    debug!("stream error: {:?}", e);
                    break;
                }
            }
        }
    }

    let removed_connection_count = connection_table.lock().unwrap().remove_connection(
        ConnectionTableKey::new(remote_addr.ip(), remote_pubkey),
        remote_addr.port(),
        stable_id,
    );
    if removed_connection_count > 0 {
        stats
            .connection_removed
            .fetch_add(removed_connection_count, Ordering::Relaxed);
    } else {
        stats
            .connection_remove_failed
            .fetch_add(1, Ordering::Relaxed);
    }
    stats.total_connections.fetch_sub(1, Ordering::Relaxed);
}

// Returns true if the server should drop the stream.
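// - Ok(Some(chunk)): validate and buffer the chunk into `packet_accum`;
//   returns false so the caller keeps reading, or true if the chunk is invalid.
// - Ok(None): the stream is finished; the accumulated packet is forwarded to
//   the packet batcher and true is returned.
// - Err(_): a stream read error; returns true.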
async fn handle_chunk(
    chunk: Result<Option<quinn::Chunk>, quinn::ReadError>,
    packet_accum: &mut Option<PacketAccumulator>,
    remote_addr: &SocketAddr,
    packet_sender: &AsyncSender<PacketAccumulator>,
    stats: Arc<StreamStats>,
    stake: u64,
    peer_type: ConnectionPeerType,
) -> bool {
    match chunk {
        Ok(maybe_chunk) => {
            if let Some(chunk) = maybe_chunk {
                trace!("got chunk: {:?}", chunk);
                let chunk_len = chunk.bytes.len() as u64;

                // shouldn't happen, but sanity check the size and offsets
                if chunk.offset > PACKET_DATA_SIZE as u64 || chunk_len > PACKET_DATA_SIZE as u64 {
                    stats.total_invalid_chunks.fetch_add(1, Ordering::Relaxed);
                    return true;
                }
                let end_of_chunk = match chunk.offset.checked_add(chunk_len) {
                    Some(end) => end,
                    None => return true,
                };
                if end_of_chunk > PACKET_DATA_SIZE as u64 {
                    stats
                        .total_invalid_chunk_size
                        .fetch_add(1, Ordering::Relaxed);
                    return true;
                }

                // chunk looks valid
                if packet_accum.is_none() {
                    let mut meta = Meta::default();
                    meta.set_socket_addr(remote_addr);
                    meta.sender_stake = stake;
                    *packet_accum = Some(PacketAccumulator {
                        meta,
                        chunks: Vec::new(),
                    });
                }

                if let Some(accum) = packet_accum.as_mut() {
                    let offset = chunk.offset;
                    let end_of_chunk = match (chunk.offset as usize).checked_add(chunk.bytes.len())
                    {
                        Some(end) => end,
                        None => return true,
                    };
                    accum.chunks.push(PacketChunk {
                        bytes: chunk.bytes,
                        offset: offset as usize,
                        end_of_chunk,
                    });

                    accum.meta.size = std::cmp::max(accum.meta.size, end_of_chunk);
                }

                match peer_type {
                    ConnectionPeerType::Staked => {
                        stats
                            .total_staked_chunks_received
                            .fetch_add(1, Ordering::Relaxed);
                    }
                    ConnectionPeerType::Unstaked => {
                        stats
                            .total_unstaked_chunks_received
                            .fetch_add(1, Ordering::Relaxed);
                    }
                }
            } else {
                // done receiving chunks
                trace!("chunk is none");
                if let Some(accum) = packet_accum.take() {
                    let len = accum
                        .chunks
                        .iter()
                        .map(|packet_bytes| packet_bytes.bytes.len())
                        .sum::<usize>();
                    if let Err(err) = packet_sender.send(accum).await {
                        stats
                            .total_handle_chunk_to_packet_batcher_send_err
                            .fetch_add(1, Ordering::Relaxed);
                        trace!("packet batch send error {:?}", err);
                    } else {
                        stats
                            .total_packets_sent_for_batching
                            .fetch_add(1, Ordering::Relaxed);
                        trace!("sent {} byte packet for batching", len);
                    }
                } else {
                    stats
                        .total_packet_batches_none
                        .fetch_add(1, Ordering::Relaxed);
                }
                return true;
            }
        }
        Err(e) => {
            debug!("Received stream error: {:?}", e);
            stats
                .total_stream_read_errors
                .fetch_add(1, Ordering::Relaxed);
            return true;
        }
    }
    false
}

#[derive(Debug)]
struct ConnectionEntry {
    exit: Arc<AtomicBool>,
    stake: u64,
    last_update: Arc<AtomicU64>,
    port: u16,
    connection: Option<Connection>,
}

impl ConnectionEntry {
    fn new(
        exit: Arc<AtomicBool>,
        stake: u64,
        last_update: Arc<AtomicU64>,
        port: u16,
        connection: Option<Connection>,
    ) -> Self {
        Self {
            exit,
            stake,
            last_update,
            port,
            connection,
        }
    }

    fn last_update(&self) -> u64 {
        self.last_update.load(Ordering::Relaxed)
    }
}

impl Drop for ConnectionEntry {
    fn drop(&mut self) {
        if let Some(conn) = self.connection.take() {
            conn.close(
                CONNECTION_CLOSE_CODE_DROPPED_ENTRY.into(),
                CONNECTION_CLOSE_REASON_DROPPED_ENTRY,
            );
        }
        self.exit.store(true, Ordering::Relaxed);
    }
}

#[derive(Copy, Clone, Debug)]
pub enum ConnectionPeerType {
    Unstaked,
    Staked,
}

#[derive(Copy, Clone, Eq, Hash, PartialEq)]
enum ConnectionTableKey {
    IP(IpAddr),
    Pubkey(Pubkey),
}

impl ConnectionTableKey {
    fn new(ip: IpAddr, maybe_pubkey: Option<Pubkey>) -> Self {
        maybe_pubkey.map_or(ConnectionTableKey::IP(ip), |pubkey| {
            ConnectionTableKey::Pubkey(pubkey)
        })
    }
}

// Map of connection table key (an IP address, or a pubkey for identified
// staked peers) to a list of connection entries
struct ConnectionTable {
    table: IndexMap<ConnectionTableKey, Vec<ConnectionEntry>>,
    total_size: usize,
    peer_type: ConnectionPeerType,
}

impl ConnectionTable {
    fn new(peer_type: ConnectionPeerType) -> Self {
        Self {
            table: IndexMap::default(),
            total_size: 0,
            peer_type,
        }
    }

    // Prune the connections with the oldest last updates until the table
    // holds at most `max_size` entries. Returns the number pruned.
    fn prune_oldest(&mut self, max_size: usize) -> usize {
        let mut num_pruned = 0;
        while self.total_size > max_size {
            let mut oldest = std::u64::MAX;
            let mut oldest_index = None;
            for (index, (_key, connections)) in self.table.iter().enumerate() {
                for entry in connections {
                    let last_update = entry.last_update();
                    if last_update < oldest {
                        oldest = last_update;
                        oldest_index = Some(index);
                    }
                }
            }
            if let Some(oldest_index) = oldest_index {
                if let Some((_, removed)) = self.table.swap_remove_index(oldest_index) {
                    self.total_size -= removed.len();
                    num_pruned += removed.len();
                }
            } else {
                // No valid entries in the table. Continuing the loop will cause
                // infinite looping.
                break;
            }
        }
        num_pruned
    }

    fn connection_stake(&self, index: usize) -> Option<u64> {
        self.table
            .get_index(index)
            .and_then(|(_, connection_vec)| connection_vec.first())
            .map(|connection| connection.stake)
    }

    // Randomly select two connections, and evict the one with lower stake. If the stakes of both
    // the connections are higher than the threshold_stake, reject the pruning attempt, and return 0.
    fn prune_random(&mut self, threshold_stake: u64) -> usize {
        let mut num_pruned = 0;
        let mut rng = thread_rng();
        // candidate1 and candidate2 could potentially be the same. If so, the stake of the
        // candidate will be compared just against the threshold_stake.
        let candidate1 = rng.gen_range(0, self.table.len());
        let candidate2 = rng.gen_range(0, self.table.len());

        let candidate1_stake = self.connection_stake(candidate1).unwrap_or(0);
        let candidate2_stake = self.connection_stake(candidate2).unwrap_or(0);

        if candidate1_stake < threshold_stake || candidate2_stake < threshold_stake {
            let removed = if candidate1_stake < candidate2_stake {
                self.table.swap_remove_index(candidate1)
            } else {
                self.table.swap_remove_index(candidate2)
            };

            if let Some((_, removed_value)) = removed {
                self.total_size -= removed_value.len();
                num_pruned += removed_value.len();
            }
        }

        num_pruned
    }

    fn try_add_connection(
        &mut self,
        key: ConnectionTableKey,
        port: u16,
        connection: Option<Connection>,
        stake: u64,
        last_update: u64,
        max_connections_per_peer: usize,
    ) -> Option<(Arc<AtomicU64>, Arc<AtomicBool>)> {
        let connection_entry = self.table.entry(key).or_insert_with(Vec::new);
        let has_connection_capacity = connection_entry
            .len()
            .checked_add(1)
            .map(|c| c <= max_connections_per_peer)
            .unwrap_or(false);
        if has_connection_capacity {
            let exit = Arc::new(AtomicBool::new(false));
            let last_update = Arc::new(AtomicU64::new(last_update));
            connection_entry.push(ConnectionEntry::new(
                exit.clone(),
                stake,
                last_update.clone(),
                port,
                connection,
            ));
            self.total_size += 1;
            Some((last_update, exit))
        } else {
            if let Some(connection) = connection {
                connection.close(
                    CONNECTION_CLOSE_CODE_TOO_MANY.into(),
                    CONNECTION_CLOSE_REASON_TOO_MANY,
                );
            }
            None
        }
    }

    // Returns the number of connections that were removed
    fn remove_connection(&mut self, key: ConnectionTableKey, port: u16, stable_id: usize) -> usize {
        if let Entry::Occupied(mut e) = self.table.entry(key) {
            let e_ref = e.get_mut();
            let old_size = e_ref.len();

            e_ref.retain(|connection_entry| {
                // Retain the connection entry if the port is different, or if the connection's
                // stable_id doesn't match the provided stable_id.
                // (Some unit tests do not fill in a valid connection in the table. To support
                // that, if the connection is none, the stable_id check is ignored, i.e. if the
                // port matches, the connection gets removed.)
                connection_entry.port != port
                    || connection_entry
                        .connection
                        .as_ref()
                        .and_then(|connection| (connection.stable_id() != stable_id).then_some(0))
                        .is_some()
            });
            let new_size = e_ref.len();
            if e_ref.is_empty() {
                e.remove_entry();
            }
            let connections_removed = old_size.saturating_sub(new_size);
            self.total_size = self.total_size.saturating_sub(connections_removed);
            connections_removed
        } else {
            0
        }
    }
}

#[cfg(test)]
pub mod test {
    use {
        super::*,
        crate::{
            nonblocking::quic::compute_max_allowed_uni_streams,
            quic::{MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
            tls_certificates::new_self_signed_tls_certificate,
        },
        async_channel::unbounded as async_unbounded,
        crossbeam_channel::{unbounded, Receiver},
        quinn::{ClientConfig, IdleTimeout, TransportConfig, VarInt},
        solana_sdk::{
            net::DEFAULT_TPU_COALESCE_MS,
            quic::{QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS},
            signature::Keypair,
            signer::Signer,
        },
        std::net::Ipv4Addr,
        tokio::time::sleep,
    };

    struct SkipServerVerification;

    impl SkipServerVerification {
        fn new() -> Arc<Self> {
            Arc::new(Self)
        }
    }

    impl rustls::client::ServerCertVerifier for SkipServerVerification {
        fn verify_server_cert(
            &self,
            _end_entity: &rustls::Certificate,
            _intermediates: &[rustls::Certificate],
            _server_name: &rustls::ServerName,
            _scts: &mut dyn Iterator<Item = &[u8]>,
            _ocsp_response: &[u8],
            _now: std::time::SystemTime,
        ) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
            Ok(rustls::client::ServerCertVerified::assertion())
        }
    }

    pub fn get_client_config(keypair: &Keypair) -> ClientConfig {
        let ipaddr = IpAddr::V4(Ipv4Addr::LOCALHOST);
        let (cert, key) = new_self_signed_tls_certificate(keypair, ipaddr)
            .expect("Failed to generate client certificate");

        let mut crypto = rustls::ClientConfig::builder()
            .with_safe_defaults()
            .with_custom_certificate_verifier(SkipServerVerification::new())
            .with_single_cert(vec![cert], key)
            .expect("Failed to use client certificate");

        crypto.enable_early_data = true;
        crypto.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()];

        let mut config = ClientConfig::new(Arc::new(crypto));

        let mut transport_config = TransportConfig::default();
        let timeout = IdleTimeout::from(VarInt::from_u32(QUIC_MAX_TIMEOUT_MS));
        transport_config.max_idle_timeout(Some(timeout));
        transport_config.keep_alive_interval(Some(Duration::from_millis(QUIC_KEEP_ALIVE_MS)));
        config.transport_config(Arc::new(transport_config));

        config
    }

    fn setup_quic_server(
        option_staked_nodes: Option<StakedNodes>,
        max_connections_per_peer: usize,
    ) -> (
        JoinHandle<()>,
        Arc<AtomicBool>,
        crossbeam_channel::Receiver<PacketBatch>,
        SocketAddr,
        Arc<StreamStats>,
    ) {
        let s = UdpSocket::bind("127.0.0.1:0").unwrap();
        let exit = Arc::new(AtomicBool::new(false));
        let (sender, receiver) = unbounded();
        let keypair = Keypair::new();
        let ip = "127.0.0.1".parse().unwrap();
        let server_address = s.local_addr().unwrap();
        let staked_nodes = Arc::new(RwLock::new(option_staked_nodes.unwrap_or_default()));
        let stats = Arc::new(StreamStats::default());
        let (_, t) = spawn_server(
            s,
            &keypair,
            ip,
            sender,
            exit.clone(),
            max_connections_per_peer,
            staked_nodes,
            MAX_STAKED_CONNECTIONS,
            MAX_UNSTAKED_CONNECTIONS,
            stats.clone(),
            2000,
            DEFAULT_TPU_COALESCE_MS,
        )
        .unwrap();
        (t, exit, receiver, server_address, stats)
    }

    pub async fn make_client_endpoint(
        addr: &SocketAddr,
        client_keypair: Option<&Keypair>,
    ) -> Connection {
        let client_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
        let mut endpoint =
            quinn::Endpoint::new(EndpointConfig::default(), None, client_socket, TokioRuntime)
                .unwrap();
        let default_keypair = Keypair::new();
        endpoint.set_default_client_config(get_client_config(
            client_keypair.unwrap_or(&default_keypair),
        ));
        endpoint
            .connect(*addr, "localhost")
            .expect("Failed in connecting")
            .await
            .expect("Failed in waiting")
    }

    pub async fn check_timeout(receiver: Receiver<PacketBatch>, server_address: SocketAddr) {
        let conn1 = make_client_endpoint(&server_address, None).await;
        let total = 30;
        for i in 0..total {
            let mut s1 = conn1.open_uni().await.unwrap();
            s1.write_all(&[0u8]).await.unwrap();
            s1.finish().await.unwrap();
            info!("done {}", i);
            sleep(Duration::from_millis(1000)).await;
        }
        let mut received = 0;
        loop {
            if let Ok(_x) = receiver.try_recv() {
                received += 1;
                info!("got {}", received);
            } else {
                sleep(Duration::from_millis(500)).await;
            }
            if received >= total {
                break;
            }
        }
    }

    pub async fn check_block_multiple_connections(server_address: SocketAddr) {
        let conn1 = make_client_endpoint(&server_address, None).await;
        let conn2 = make_client_endpoint(&server_address, None).await;
        let mut s1 = conn1.open_uni().await.unwrap();
        let mut s2 = conn2.open_uni().await.unwrap();
        s1.write_all(&[0u8]).await.unwrap();
        s1.finish().await.unwrap();
        // Send enough data to create more than 1 chunk.
        // The first will try to open the connection (which should fail).
        // The following chunks will enable the detection of connection failure.
        let data = vec![1u8; PACKET_DATA_SIZE * 2];
        s2.write_all(&data)
            .await
            .expect_err("shouldn't be able to open 2 connections");
        s2.finish()
            .await
            .expect_err("shouldn't be able to open 2 connections");
    }

    pub async fn check_multiple_streams(
        receiver: Receiver<PacketBatch>,
        server_address: SocketAddr,
    ) {
        let conn1 = Arc::new(make_client_endpoint(&server_address, None).await);
        let conn2 = Arc::new(make_client_endpoint(&server_address, None).await);
        let mut num_expected_packets = 0;
        for i in 0..10 {
            info!("sending: {}", i);
            let c1 = conn1.clone();
            let c2 = conn2.clone();
            let mut s1 = c1.open_uni().await.unwrap();
            let mut s2 = c2.open_uni().await.unwrap();
            s1.write_all(&[0u8]).await.unwrap();
            s1.finish().await.unwrap();
            s2.write_all(&[0u8]).await.unwrap();
            s2.finish().await.unwrap();
            num_expected_packets += 2;
            sleep(Duration::from_millis(200)).await;
        }
        let mut all_packets = vec![];
        let now = Instant::now();
        let mut total_packets = 0;
        while now.elapsed().as_secs() < 10 {
            if let Ok(packets) = receiver.try_recv() {
                total_packets += packets.len();
                all_packets.push(packets)
            } else {
                sleep(Duration::from_secs(1)).await;
            }
            if total_packets == num_expected_packets {
                break;
            }
        }
        for batch in all_packets {
            for p in batch.iter() {
                assert_eq!(p.meta().size, 1);
            }
        }
        assert_eq!(total_packets, num_expected_packets);
    }

    pub async fn check_multiple_writes(
        receiver: Receiver<PacketBatch>,
        server_address: SocketAddr,
        client_keypair: Option<&Keypair>,
    ) {
        let conn1 = Arc::new(make_client_endpoint(&server_address, client_keypair).await);

        // Send a full size packet with single byte writes.
        let num_bytes = PACKET_DATA_SIZE;
        let num_expected_packets = 1;
        let mut s1 = conn1.open_uni().await.unwrap();
        for _ in 0..num_bytes {
            s1.write_all(&[0u8]).await.unwrap();
        }
        s1.finish().await.unwrap();

        let mut all_packets = vec![];
        let now = Instant::now();
        let mut total_packets = 0;
        while now.elapsed().as_secs() < 5 {
            // We're running in an async environment, we (almost) never
            // want to block
            if let Ok(packets) = receiver.try_recv() {
                total_packets += packets.len();
                all_packets.push(packets)
            } else {
                sleep(Duration::from_secs(1)).await;
            }
            if total_packets >= num_expected_packets {
                break;
            }
        }
        for batch in all_packets {
            for p in batch.iter() {
                assert_eq!(p.meta().size, num_bytes);
            }
        }
        assert_eq!(total_packets, num_expected_packets);
    }

    pub async fn check_unstaked_node_connect_failure(server_address: SocketAddr) {
        let conn1 = Arc::new(make_client_endpoint(&server_address, None).await);

        // Send a full size packet with single byte writes.
        if let Ok(mut s1) = conn1.open_uni().await {
            for _ in 0..PACKET_DATA_SIZE {
                // Ignoring any errors here. s1.finish() will test the error condition
                s1.write_all(&[0u8]).await.unwrap_or_default();
            }
            s1.finish().await.unwrap_err();
        }
    }
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
async fn test_quic_server_exit() {
|
2023-01-25 16:14:25 -08:00
|
|
|
let (t, exit, _receiver, _server_address, _stats) = setup_quic_server(None, 1);
|
2022-06-07 09:22:46 -07:00
|
|
|
exit.store(true, Ordering::Relaxed);
|
|
|
|
t.await.unwrap();
|
|
|
|
}
|
|
|
|
|
|
|
|
    #[tokio::test]
    async fn test_quic_timeout() {
        solana_logger::setup();
        let (t, exit, receiver, server_address, _stats) = setup_quic_server(None, 1);
        check_timeout(receiver, server_address).await;
        exit.store(true, Ordering::Relaxed);
        t.await.unwrap();
    }
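    // Feed single-chunk PacketAccumulators into packet_batch_sender and
    // verify every packet is delivered through the coalesced batches.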
    #[tokio::test]
    async fn test_packet_batcher() {
        solana_logger::setup();
        let (pkt_batch_sender, pkt_batch_receiver) = unbounded();
        let (pkt_sender, pkt_receiver) = async_unbounded();
        let exit = Arc::new(AtomicBool::new(false));
        let stats = Arc::new(StreamStats::default());

        let handle = tokio::spawn(packet_batch_sender(
            pkt_batch_sender,
            pkt_receiver,
            exit.clone(),
            stats,
            DEFAULT_TPU_COALESCE_MS,
        ));

        let num_packets = 1000;

        for _i in 0..num_packets {
            let mut meta = Meta::default();
            let bytes = Bytes::from("Hello world");
            let offset = 0;
            let size = bytes.len();
            meta.size = size;
            let packet_accum = PacketAccumulator {
                meta,
                chunks: vec![PacketChunk {
                    bytes,
                    offset,
                    end_of_chunk: size,
                }],
            };
            pkt_sender.send(packet_accum).await.unwrap();
        }
        let mut i = 0;
        let start = Instant::now();
        while i < num_packets && start.elapsed().as_secs() < 2 {
            if let Ok(batch) = pkt_batch_receiver.try_recv() {
                i += batch.len();
            } else {
                sleep(Duration::from_millis(1)).await;
            }
        }
        assert_eq!(i, num_packets);
        exit.store(true, Ordering::Relaxed);
        handle.await.unwrap();
    }
    #[tokio::test]
    async fn test_quic_stream_timeout() {
        solana_logger::setup();
        let (t, exit, _receiver, server_address, stats) = setup_quic_server(None, 1);

        let conn1 = make_client_endpoint(&server_address, None).await;
        assert_eq!(stats.total_streams.load(Ordering::Relaxed), 0);
        assert_eq!(stats.total_stream_read_timeouts.load(Ordering::Relaxed), 0);

        // Send one byte to start the stream
        let mut s1 = conn1.open_uni().await.unwrap();
        s1.write_all(&[0u8]).await.unwrap_or_default();

        // Wait long enough for the stream to time out waiting for chunks
        let sleep_time = (WAIT_FOR_STREAM_TIMEOUT_MS * 1000).min(3000);
        sleep(Duration::from_millis(sleep_time)).await;

        // Test that the stream was created, but timed out in read
        assert_eq!(stats.total_streams.load(Ordering::Relaxed), 0);
        assert_ne!(stats.total_stream_read_timeouts.load(Ordering::Relaxed), 0);

        // Test that more writes to the stream will fail (i.e. the stream is
        // no longer writable after the timeouts)
        assert!(s1.write_all(&[0u8]).await.is_err());
        assert!(s1.finish().await.is_err());

        exit.store(true, Ordering::Relaxed);
        t.await.unwrap();
    }
    #[tokio::test]
    async fn test_quic_server_block_multiple_connections() {
        solana_logger::setup();
        let (t, exit, _receiver, server_address, _stats) = setup_quic_server(None, 1);
        check_block_multiple_connections(server_address).await;
        exit.store(true, Ordering::Relaxed);
        t.await.unwrap();
    }
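    // Open two server connections from one client endpoint; closing each
    // connection must bump connection_removed without disturbing the other.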
    #[tokio::test]
    async fn test_quic_server_multiple_connections_on_single_client_endpoint() {
        solana_logger::setup();
        let (t, exit, _receiver, server_address, stats) = setup_quic_server(None, 2);

        let client_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
        let mut endpoint =
            quinn::Endpoint::new(EndpointConfig::default(), None, client_socket, TokioRuntime)
                .unwrap();
        let default_keypair = Keypair::new();
        endpoint.set_default_client_config(get_client_config(&default_keypair));
        let conn1 = endpoint
            .connect(server_address, "localhost")
            .expect("Failed in connecting")
            .await
            .expect("Failed in waiting");

        let conn2 = endpoint
            .connect(server_address, "localhost")
            .expect("Failed in connecting")
            .await
            .expect("Failed in waiting");

        let mut s1 = conn1.open_uni().await.unwrap();
        s1.write_all(&[0u8]).await.unwrap();
        s1.finish().await.unwrap();

        let mut s2 = conn2.open_uni().await.unwrap();
        conn1.close(
            CONNECTION_CLOSE_CODE_DROPPED_ENTRY.into(),
            CONNECTION_CLOSE_REASON_DROPPED_ENTRY,
        );
        // Wait long enough for the server to notice the close and remove
        // the connection
        let sleep_time = (WAIT_FOR_STREAM_TIMEOUT_MS * 1000).min(1000);
        sleep(Duration::from_millis(sleep_time)).await;

        assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 1);

        s2.write_all(&[0u8]).await.unwrap();
        s2.finish().await.unwrap();

        conn2.close(
            CONNECTION_CLOSE_CODE_DROPPED_ENTRY.into(),
            CONNECTION_CLOSE_REASON_DROPPED_ENTRY,
        );
        // Wait long enough for the server to notice the close and remove
        // the connection
        let sleep_time = (WAIT_FOR_STREAM_TIMEOUT_MS * 1000).min(1000);
        sleep(Duration::from_millis(sleep_time)).await;

        assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 2);

        exit.store(true, Ordering::Relaxed);
        t.await.unwrap();
    }
    #[tokio::test]
    async fn test_quic_server_multiple_writes() {
        solana_logger::setup();
        let (t, exit, receiver, server_address, _stats) = setup_quic_server(None, 1);
        check_multiple_writes(receiver, server_address, None).await;
        exit.store(true, Ordering::Relaxed);
        t.await.unwrap();
    }
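    // A staked client (present in the stake map) should never be counted as
    // an unstaked peer, and its connection must be removed exactly once.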
    #[tokio::test]
    async fn test_quic_server_staked_connection_removal() {
        solana_logger::setup();

        let client_keypair = Keypair::new();
        let mut staked_nodes = StakedNodes::default();
        staked_nodes
            .pubkey_stake_map
            .insert(client_keypair.pubkey(), 100000);
        staked_nodes.total_stake = 100000;

        let (t, exit, receiver, server_address, stats) = setup_quic_server(Some(staked_nodes), 1);
        check_multiple_writes(receiver, server_address, Some(&client_keypair)).await;
        exit.store(true, Ordering::Relaxed);
        t.await.unwrap();
        sleep(Duration::from_millis(100)).await;
        assert_eq!(
            stats
                .connection_added_from_unstaked_peer
                .load(Ordering::Relaxed),
            0
        );
        assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 1);
        assert_eq!(stats.connection_remove_failed.load(Ordering::Relaxed), 0);
    }
    #[tokio::test]
    async fn test_quic_server_zero_staked_connection_removal() {
        // In this test, the client has a pubkey, but is not in the stake table.
        solana_logger::setup();

        let client_keypair = Keypair::new();
        let mut staked_nodes = StakedNodes::default();
        staked_nodes
            .pubkey_stake_map
            .insert(client_keypair.pubkey(), 0);
        staked_nodes.total_stake = 0;

        let (t, exit, receiver, server_address, stats) = setup_quic_server(Some(staked_nodes), 1);
        check_multiple_writes(receiver, server_address, Some(&client_keypair)).await;
        exit.store(true, Ordering::Relaxed);
        t.await.unwrap();
        sleep(Duration::from_millis(100)).await;
        assert_eq!(
            stats
                .connection_added_from_staked_peer
                .load(Ordering::Relaxed),
            0
        );
        assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 1);
        assert_eq!(stats.connection_remove_failed.load(Ordering::Relaxed), 0);
    }
    #[tokio::test]
    async fn test_quic_server_unstaked_connection_removal() {
        solana_logger::setup();
        let (t, exit, receiver, server_address, stats) = setup_quic_server(None, 1);
        check_multiple_writes(receiver, server_address, None).await;
        exit.store(true, Ordering::Relaxed);
        t.await.unwrap();
        sleep(Duration::from_millis(100)).await;
        assert_eq!(
            stats
                .connection_added_from_staked_peer
                .load(Ordering::Relaxed),
            0
        );
        assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 1);
        assert_eq!(stats.connection_remove_failed.load(Ordering::Relaxed), 0);
    }
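    // Spawn a server that allows zero unstaked connections and confirm an
    // unstaked client cannot complete a stream.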
    #[tokio::test]
    async fn test_quic_server_unstaked_node_connect_failure() {
        solana_logger::setup();
        let s = UdpSocket::bind("127.0.0.1:0").unwrap();
        let exit = Arc::new(AtomicBool::new(false));
        let (sender, _) = unbounded();
        let keypair = Keypair::new();
        let ip = "127.0.0.1".parse().unwrap();
        let server_address = s.local_addr().unwrap();
        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
        let stats = Arc::new(StreamStats::default());
        let (_, t) = spawn_server(
            s,
            &keypair,
            ip,
            sender,
            exit.clone(),
            1,
            staked_nodes,
            MAX_STAKED_CONNECTIONS,
            0, // Do not allow any connection from unstaked clients/nodes
            stats,
            DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
            DEFAULT_TPU_COALESCE_MS,
        )
        .unwrap();

        check_unstaked_node_connect_failure(server_address).await;
        exit.store(true, Ordering::Relaxed);
        t.await.unwrap();
    }
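    // Drive 20 streams across two connections and check the stream and
    // connection counters both while the server is live and after exit.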
    #[tokio::test]
    async fn test_quic_server_multiple_streams() {
        solana_logger::setup();
        let s = UdpSocket::bind("127.0.0.1:0").unwrap();
        let exit = Arc::new(AtomicBool::new(false));
        let (sender, receiver) = unbounded();
        let keypair = Keypair::new();
        let ip = "127.0.0.1".parse().unwrap();
        let server_address = s.local_addr().unwrap();
        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
        let stats = Arc::new(StreamStats::default());
        let (_, t) = spawn_server(
            s,
            &keypair,
            ip,
            sender,
            exit.clone(),
            2,
            staked_nodes,
            MAX_STAKED_CONNECTIONS,
            MAX_UNSTAKED_CONNECTIONS,
            stats.clone(),
            DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
            DEFAULT_TPU_COALESCE_MS,
        )
        .unwrap();

        check_multiple_streams(receiver, server_address).await;
        assert_eq!(stats.total_streams.load(Ordering::Relaxed), 0);
        assert_eq!(stats.total_new_streams.load(Ordering::Relaxed), 20);
        assert_eq!(stats.total_connections.load(Ordering::Relaxed), 2);
        assert_eq!(stats.total_new_connections.load(Ordering::Relaxed), 2);
        exit.store(true, Ordering::Relaxed);
        t.await.unwrap();
        assert_eq!(stats.total_connections.load(Ordering::Relaxed), 0);
        assert_eq!(stats.total_new_connections.load(Ordering::Relaxed), 2);
    }
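    // prune_oldest should evict the entries with the smallest last_update
    // stamps until the table shrinks to the requested size.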
    #[test]
    fn test_prune_table_with_ip() {
        use std::net::Ipv4Addr;
        solana_logger::setup();
        let mut table = ConnectionTable::new(ConnectionPeerType::Staked);
        let mut num_entries = 5;
        let max_connections_per_peer = 10;
        let sockets: Vec<_> = (0..num_entries)
            .map(|i| SocketAddr::new(IpAddr::V4(Ipv4Addr::new(i, 0, 0, 0)), 0))
            .collect();
        for (i, socket) in sockets.iter().enumerate() {
            table
                .try_add_connection(
                    ConnectionTableKey::IP(socket.ip()),
                    socket.port(),
                    None,
                    0,
                    i as u64,
                    max_connections_per_peer,
                )
                .unwrap();
        }
        num_entries += 1;
        table
            .try_add_connection(
                ConnectionTableKey::IP(sockets[0].ip()),
                sockets[0].port(),
                None,
                0,
                5,
                max_connections_per_peer,
            )
            .unwrap();

        let new_size = 3;
        let pruned = table.prune_oldest(new_size);
        assert_eq!(pruned, num_entries as usize - new_size);
        for v in table.table.values() {
            for x in v {
                assert!((x.last_update() + 1) >= (num_entries as u64 - new_size as u64));
            }
        }
        assert_eq!(table.table.len(), new_size);
        assert_eq!(table.total_size, new_size);
        for socket in sockets.iter().take(num_entries as usize).skip(new_size - 1) {
            table.remove_connection(ConnectionTableKey::IP(socket.ip()), socket.port(), 0);
        }
        assert_eq!(table.total_size, 0);
    }
    #[test]
    fn test_prune_table_with_unique_pubkeys() {
        solana_logger::setup();
        let mut table = ConnectionTable::new(ConnectionPeerType::Staked);

        // We should be able to add more entries than max_connections_per_peer, since each entry is
        // from a different peer pubkey.
        let num_entries = 15;
        let max_connections_per_peer = 10;

        let pubkeys: Vec<_> = (0..num_entries).map(|_| Pubkey::new_unique()).collect();
        for (i, pubkey) in pubkeys.iter().enumerate() {
            table
                .try_add_connection(
                    ConnectionTableKey::Pubkey(*pubkey),
                    0,
                    None,
                    0,
                    i as u64,
                    max_connections_per_peer,
                )
                .unwrap();
        }

        let new_size = 3;
        let pruned = table.prune_oldest(new_size);
        assert_eq!(pruned, num_entries as usize - new_size);
        assert_eq!(table.table.len(), new_size);
        assert_eq!(table.total_size, new_size);
        for pubkey in pubkeys.iter().take(num_entries as usize).skip(new_size - 1) {
            table.remove_connection(ConnectionTableKey::Pubkey(*pubkey), 0, 0);
        }
        assert_eq!(table.total_size, 0);
    }
    #[test]
    fn test_prune_table_with_non_unique_pubkeys() {
        solana_logger::setup();
        let mut table = ConnectionTable::new(ConnectionPeerType::Staked);

        let max_connections_per_peer = 10;
        let pubkey = Pubkey::new_unique();
        (0..max_connections_per_peer).for_each(|i| {
            table
                .try_add_connection(
                    ConnectionTableKey::Pubkey(pubkey),
                    0,
                    None,
                    0,
                    i as u64,
                    max_connections_per_peer,
                )
                .unwrap();
        });

        // We should NOT be able to add more entries than max_connections_per_peer, since we are
        // using the same peer pubkey.
        assert!(table
            .try_add_connection(
                ConnectionTableKey::Pubkey(pubkey),
                0,
                None,
                0,
                10,
                max_connections_per_peer,
            )
            .is_none());

        // We should be able to add an entry from another peer pubkey
        let num_entries = max_connections_per_peer + 1;
        let pubkey2 = Pubkey::new_unique();
        assert!(table
            .try_add_connection(
                ConnectionTableKey::Pubkey(pubkey2),
                0,
                None,
                0,
                10,
                max_connections_per_peer,
            )
            .is_some());

        assert_eq!(table.total_size, num_entries);

        let new_max_size = 3;
        let pruned = table.prune_oldest(new_max_size);
        assert!(pruned >= num_entries - new_max_size);
        assert!(table.table.len() <= new_max_size);
        assert!(table.total_size <= new_max_size);

        table.remove_connection(ConnectionTableKey::Pubkey(pubkey2), 0, 0);
        assert_eq!(table.total_size, 0);
    }
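    // prune_random(threshold) only evicts entries whose stake falls below
    // the threshold: 0 prunes nothing, while a value above every entry's
    // stake prunes one entry.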
    #[test]
    fn test_prune_table_random() {
        use std::net::Ipv4Addr;
        solana_logger::setup();
        let mut table = ConnectionTable::new(ConnectionPeerType::Staked);
        let num_entries = 5;
        let max_connections_per_peer = 10;
        let sockets: Vec<_> = (0..num_entries)
            .map(|i| SocketAddr::new(IpAddr::V4(Ipv4Addr::new(i, 0, 0, 0)), 0))
            .collect();
        for (i, socket) in sockets.iter().enumerate() {
            table
                .try_add_connection(
                    ConnectionTableKey::IP(socket.ip()),
                    socket.port(),
                    None,
                    (i + 1) as u64,
                    i as u64,
                    max_connections_per_peer,
                )
                .unwrap();
        }

        // Try pruning with a threshold stake less than all the entries in the table.
        // It should fail to prune (i.e. return 0 pruned entries).
        let pruned = table.prune_random(0);
        assert_eq!(pruned, 0);

        // Try pruning with a threshold stake higher than all the entries in the table.
        // It should succeed in pruning (i.e. return 1 pruned entry).
        let pruned = table.prune_random(num_entries as u64 + 1);
        assert_eq!(pruned, 1);
    }
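    // Removing every socket, including one that was never inserted, should
    // drain the table to zero without panicking.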
    #[test]
    fn test_remove_connections() {
        use std::net::Ipv4Addr;
        solana_logger::setup();
        let mut table = ConnectionTable::new(ConnectionPeerType::Staked);
        let num_ips = 5;
        let max_connections_per_peer = 10;
        let mut sockets: Vec<_> = (0..num_ips)
            .map(|i| SocketAddr::new(IpAddr::V4(Ipv4Addr::new(i, 0, 0, 0)), 0))
            .collect();
        for (i, socket) in sockets.iter().enumerate() {
            table
                .try_add_connection(
                    ConnectionTableKey::IP(socket.ip()),
                    socket.port(),
                    None,
                    0,
                    (i * 2) as u64,
                    max_connections_per_peer,
                )
                .unwrap();

            table
                .try_add_connection(
                    ConnectionTableKey::IP(socket.ip()),
                    socket.port(),
                    None,
                    0,
                    (i * 2 + 1) as u64,
                    max_connections_per_peer,
                )
                .unwrap();
        }

        let single_connection_addr =
            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(num_ips, 0, 0, 0)), 0);
        table
            .try_add_connection(
                ConnectionTableKey::IP(single_connection_addr.ip()),
                single_connection_addr.port(),
                None,
                0,
                (num_ips * 2) as u64,
                max_connections_per_peer,
            )
            .unwrap();

        let zero_connection_addr =
            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(num_ips + 1, 0, 0, 0)), 0);

        sockets.push(single_connection_addr);
        sockets.push(zero_connection_addr);

        for socket in sockets.iter() {
            table.remove_connection(ConnectionTableKey::IP(socket.ip()), socket.port(), 0);
        }
        assert_eq!(table.total_size, 0);
    }
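    // Unstaked peers always get QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS; staked
    // peers scale between the staked min and max with their share of stake.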
    #[test]
    fn test_max_allowed_uni_streams() {
        assert_eq!(
            compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0, 0),
            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
        );
        assert_eq!(
            compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 10, 0),
            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
        );
        assert_eq!(
            compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 0, 0),
            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
        );
        assert_eq!(
            compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 10, 0),
            QUIC_MIN_STAKED_CONCURRENT_STREAMS
        );
        let delta =
            (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64;
        assert_eq!(
            compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 1000, 10000),
            QUIC_MAX_STAKED_CONCURRENT_STREAMS,
        );
        assert_eq!(
            compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 100, 10000),
            (delta / (100_f64)) as usize + QUIC_MIN_STAKED_CONCURRENT_STREAMS
        );
        assert_eq!(
            compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 0, 10000),
            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
        );
        assert_eq!(
            compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 1000, 10000),
            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
        );
        assert_eq!(
            compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 1, 10000),
            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
        );
        assert_eq!(
            compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0, 10000),
            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
        );
    }
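    // The receive window ratio interpolates linearly between the min and max
    // staked ratios as stake moves from min_stake to max_stake, clamping at
    // the max ratio for stakes at or above max_stake.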
    #[test]
    fn test_calculate_receive_window_ratio_for_staked_node() {
        let mut max_stake = 10000;
        let mut min_stake = 0;
        let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, min_stake);
        assert_eq!(ratio, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO);

        let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake);
        let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
        assert_eq!(ratio, max_ratio);

        let ratio =
            compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake / 2);
        let average_ratio =
            (QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO + QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO) / 2;
        assert_eq!(ratio, average_ratio);

        max_stake = 10000;
        min_stake = 10000;
        let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake);
        assert_eq!(ratio, max_ratio);

        max_stake = 0;
        min_stake = 0;
        let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake);
        assert_eq!(ratio, max_ratio);

        max_stake = 1000;
        min_stake = 10;
        let ratio =
            compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake + 10);
        assert_eq!(ratio, max_ratio);
    }
}