Add quic port for accepting transactions (#22753)
using quinn library streamer: Sign TLS cert with validator identity key Handle multiple incoming chunks
This commit is contained in:
parent
a47b76afcc
commit
5a230f418d
|
@ -275,6 +275,12 @@ version = "0.13.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
|
||||
|
||||
[[package]]
|
||||
name = "base64ct"
|
||||
version = "1.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "874f8444adcb4952a8bc51305c8be95c8ec8237bb0d2e78d2e039f771f8828a0"
|
||||
|
||||
[[package]]
|
||||
name = "bincode"
|
||||
version = "1.3.3"
|
||||
|
@ -813,6 +819,12 @@ dependencies = [
|
|||
"web-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "const-oid"
|
||||
version = "0.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e4c78c047431fee22c1a7bb92e00ad095a02a983affe4d8a72e2a2c62c1b94f3"
|
||||
|
||||
[[package]]
|
||||
name = "const_fn"
|
||||
version = "0.4.8"
|
||||
|
@ -1079,6 +1091,15 @@ dependencies = [
|
|||
"rayon",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "der"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6919815d73839e7ad218de758883aae3a257ba6759ce7a9992501efbb53d705c"
|
||||
dependencies = [
|
||||
"const-oid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "derivation-path"
|
||||
version = "0.1.3"
|
||||
|
@ -1655,6 +1676,15 @@ dependencies = [
|
|||
"slab",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fxhash"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "gag"
|
||||
version = "1.0.0"
|
||||
|
@ -3026,6 +3056,15 @@ version = "0.1.2"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
|
||||
|
||||
[[package]]
|
||||
name = "pem"
|
||||
version = "1.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e9a3b09a20e374558580a4914d3b7d89bd61b954a5a5e1dcbea98753addb1947"
|
||||
dependencies = [
|
||||
"base64 0.13.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "percent-encoding"
|
||||
version = "1.0.1"
|
||||
|
@ -3136,6 +3175,17 @@ version = "0.1.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
|
||||
|
||||
[[package]]
|
||||
name = "pkcs8"
|
||||
version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7cabda3fb821068a9a4fab19a683eac3af12edf0f34b94a8be53c4972b8149d0"
|
||||
dependencies = [
|
||||
"der",
|
||||
"spki",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pkg-config"
|
||||
version = "0.3.22"
|
||||
|
@ -3360,6 +3410,60 @@ version = "2.0.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3"
|
||||
|
||||
[[package]]
|
||||
name = "quinn"
|
||||
version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "61a84d97630b137463c8e6802adc1dfe9de81457b41bb1ac59189e6761ab9255"
|
||||
dependencies = [
|
||||
"bytes 1.1.0",
|
||||
"futures-channel",
|
||||
"futures-util",
|
||||
"fxhash",
|
||||
"quinn-proto",
|
||||
"quinn-udp",
|
||||
"rustls 0.20.2",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"webpki 0.22.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quinn-proto"
|
||||
version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "063dedf7983c8d57db474218f258daa85b627de6f2dbc458b690a93b1de790e8"
|
||||
dependencies = [
|
||||
"bytes 1.1.0",
|
||||
"fxhash",
|
||||
"rand 0.8.4",
|
||||
"ring",
|
||||
"rustls 0.20.2",
|
||||
"rustls-native-certs",
|
||||
"rustls-pemfile",
|
||||
"slab",
|
||||
"thiserror",
|
||||
"tinyvec",
|
||||
"tracing",
|
||||
"webpki 0.22.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quinn-udp"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5f7996776e9ee3fc0e5c14476c1a640a17e993c847ae9c81191c2c102fbef903"
|
||||
dependencies = [
|
||||
"futures-util",
|
||||
"libc",
|
||||
"mio 0.7.14",
|
||||
"quinn-proto",
|
||||
"socket2",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quote"
|
||||
version = "0.6.13"
|
||||
|
@ -3648,6 +3752,18 @@ dependencies = [
|
|||
"solana_rbpf",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rcgen"
|
||||
version = "0.8.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5911d1403f4143c9d56a702069d593e8d0f3fab880a85e103604d0893ea31ba7"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"pem",
|
||||
"ring",
|
||||
"yasna",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rdrand"
|
||||
version = "0.4.0"
|
||||
|
@ -3876,9 +3992,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.20.0"
|
||||
version = "0.20.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9b5ac6078ca424dc1d3ae2328526a76787fecc7f8011f520e3276730e711fc95"
|
||||
checksum = "d37e5e2290f3e040b594b1a9e04377c2c671f1a1cfd9bfdef82106ac1c113f84"
|
||||
dependencies = [
|
||||
"log 0.4.14",
|
||||
"ring",
|
||||
|
@ -3886,6 +4002,27 @@ dependencies = [
|
|||
"webpki 0.22.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-native-certs"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5ca9ebdfa27d3fc180e42879037b5338ab1c040c06affd00d8338598e7800943"
|
||||
dependencies = [
|
||||
"openssl-probe",
|
||||
"rustls-pemfile",
|
||||
"schannel",
|
||||
"security-framework",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-pemfile"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9"
|
||||
dependencies = [
|
||||
"base64 0.13.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustversion"
|
||||
version = "1.0.6"
|
||||
|
@ -5959,16 +6096,24 @@ name = "solana-streamer"
|
|||
version = "1.10.0"
|
||||
dependencies = [
|
||||
"crossbeam-channel",
|
||||
"futures-util",
|
||||
"histogram",
|
||||
"itertools 0.10.3",
|
||||
"libc",
|
||||
"log 0.4.14",
|
||||
"nix",
|
||||
"pem",
|
||||
"pkcs8",
|
||||
"quinn",
|
||||
"rand 0.7.3",
|
||||
"rcgen",
|
||||
"rustls 0.20.2",
|
||||
"solana-logger 1.10.0",
|
||||
"solana-metrics",
|
||||
"solana-perf",
|
||||
"solana-sdk",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -6273,6 +6418,16 @@ version = "0.9.2"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "511254be0c5bcf062b019a6c89c01a664aa359ded62f78aa72c6fc137c0590e5"
|
||||
|
||||
[[package]]
|
||||
name = "spki"
|
||||
version = "0.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "44d01ac02a6ccf3e07db148d2be087da624fea0221a16152ed01f0496a6b0a27"
|
||||
dependencies = [
|
||||
"base64ct",
|
||||
"der",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "spl-associated-token-account"
|
||||
version = "1.0.3"
|
||||
|
@ -6847,7 +7002,7 @@ version = "0.23.2"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a27d5f2b839802bd8267fa19b0530f5a08b9c08cd417976be2a65d130fe1c11b"
|
||||
dependencies = [
|
||||
"rustls 0.20.0",
|
||||
"rustls 0.20.2",
|
||||
"tokio",
|
||||
"webpki 0.22.0",
|
||||
]
|
||||
|
@ -6922,7 +7077,7 @@ checksum = "e80b39df6afcc12cdf752398ade96a6b9e99c903dfdc36e53ad10b9c366bca72"
|
|||
dependencies = [
|
||||
"futures-util",
|
||||
"log 0.4.14",
|
||||
"rustls 0.20.0",
|
||||
"rustls 0.20.2",
|
||||
"tokio",
|
||||
"tokio-rustls 0.23.2",
|
||||
"tungstenite",
|
||||
|
@ -7129,7 +7284,7 @@ dependencies = [
|
|||
"httparse",
|
||||
"log 0.4.14",
|
||||
"rand 0.8.4",
|
||||
"rustls 0.20.0",
|
||||
"rustls 0.20.2",
|
||||
"sha-1 0.9.8",
|
||||
"thiserror",
|
||||
"url 2.2.2",
|
||||
|
@ -7678,6 +7833,15 @@ dependencies = [
|
|||
"linked-hash-map",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "yasna"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e262a29d0e61ccf2b6190d7050d4b237535fc76ce4c1210d9caa316f71dffa75"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zeroize"
|
||||
version = "1.3.0"
|
||||
|
|
|
@ -26,6 +26,7 @@ use {
|
|||
cost_model::CostModel,
|
||||
vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
|
||||
},
|
||||
solana_sdk::signature::Keypair,
|
||||
std::{
|
||||
net::UdpSocket,
|
||||
sync::{atomic::AtomicBool, Arc, Mutex, RwLock},
|
||||
|
@ -40,6 +41,7 @@ pub struct TpuSockets {
|
|||
pub transaction_forwards: Vec<UdpSocket>,
|
||||
pub vote: Vec<UdpSocket>,
|
||||
pub broadcast: Vec<UdpSocket>,
|
||||
pub transactions_quic: UdpSocket,
|
||||
}
|
||||
|
||||
pub struct Tpu {
|
||||
|
@ -49,6 +51,7 @@ pub struct Tpu {
|
|||
banking_stage: BankingStage,
|
||||
cluster_info_vote_listener: ClusterInfoVoteListener,
|
||||
broadcast_stage: BroadcastStage,
|
||||
tpu_quic_t: thread::JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl Tpu {
|
||||
|
@ -75,12 +78,14 @@ impl Tpu {
|
|||
tpu_coalesce_ms: u64,
|
||||
cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender,
|
||||
cost_model: &Arc<RwLock<CostModel>>,
|
||||
keypair: &Keypair,
|
||||
) -> Self {
|
||||
let TpuSockets {
|
||||
transactions: transactions_sockets,
|
||||
transaction_forwards: tpu_forwards_sockets,
|
||||
vote: tpu_vote_sockets,
|
||||
broadcast: broadcast_sockets,
|
||||
transactions_quic: transactions_quic_sockets,
|
||||
} = sockets;
|
||||
|
||||
let (packet_sender, packet_receiver) = unbounded();
|
||||
|
@ -97,6 +102,15 @@ impl Tpu {
|
|||
);
|
||||
let (verified_sender, verified_receiver) = unbounded();
|
||||
|
||||
let tpu_quic_t = solana_streamer::quic::spawn_server(
|
||||
transactions_quic_sockets,
|
||||
keypair,
|
||||
cluster_info.my_contact_info().tpu.ip(),
|
||||
packet_sender,
|
||||
exit.clone(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let sigverify_stage = {
|
||||
let verifier = TransactionSigVerifier::default();
|
||||
SigVerifyStage::new(packet_receiver, verified_sender, verifier)
|
||||
|
@ -160,6 +174,7 @@ impl Tpu {
|
|||
banking_stage,
|
||||
cluster_info_vote_listener,
|
||||
broadcast_stage,
|
||||
tpu_quic_t,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -171,6 +186,7 @@ impl Tpu {
|
|||
self.cluster_info_vote_listener.join(),
|
||||
self.banking_stage.join(),
|
||||
];
|
||||
self.tpu_quic_t.join()?;
|
||||
let broadcast_result = self.broadcast_stage.join();
|
||||
for result in results {
|
||||
result?;
|
||||
|
|
|
@ -540,8 +540,11 @@ impl Validator {
|
|||
}
|
||||
}
|
||||
|
||||
let mut cluster_info =
|
||||
ClusterInfo::new(node.info.clone(), identity_keypair, socket_addr_space);
|
||||
let mut cluster_info = ClusterInfo::new(
|
||||
node.info.clone(),
|
||||
identity_keypair.clone(),
|
||||
socket_addr_space,
|
||||
);
|
||||
cluster_info.set_contact_debug_interval(config.contact_debug_interval);
|
||||
cluster_info.set_entrypoints(cluster_entrypoints);
|
||||
cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
|
||||
|
@ -886,6 +889,7 @@ impl Validator {
|
|||
transaction_forwards: node.sockets.tpu_forwards,
|
||||
vote: node.sockets.tpu_vote,
|
||||
broadcast: node.sockets.broadcast,
|
||||
transactions_quic: node.sockets.tpu_quic,
|
||||
},
|
||||
&rpc_subscriptions,
|
||||
transaction_status_sender,
|
||||
|
@ -903,6 +907,7 @@ impl Validator {
|
|||
config.tpu_coalesce_ms,
|
||||
cluster_confirmed_slot_sender,
|
||||
&cost_model,
|
||||
&identity_keypair,
|
||||
);
|
||||
|
||||
datapoint_info!("validator-new", ("id", id.to_string(), String));
|
||||
|
|
|
@ -59,6 +59,7 @@ use {
|
|||
feature_set::FeatureSet,
|
||||
hash::Hash,
|
||||
pubkey::Pubkey,
|
||||
quic::QUIC_PORT_OFFSET,
|
||||
sanitize::{Sanitize, SanitizeError},
|
||||
signature::{Keypair, Signable, Signature, Signer},
|
||||
timing::timestamp,
|
||||
|
@ -92,7 +93,7 @@ use {
|
|||
};
|
||||
|
||||
pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000);
|
||||
pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 11; // VALIDATOR_PORT_RANGE must be at least this wide
|
||||
pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 12; // VALIDATOR_PORT_RANGE must be at least this wide
|
||||
|
||||
/// The Data plane fanout size, also used as the neighborhood size
|
||||
pub const DATA_PLANE_FANOUT: usize = 200;
|
||||
|
@ -2741,6 +2742,7 @@ pub struct Sockets {
|
|||
pub retransmit_sockets: Vec<UdpSocket>,
|
||||
pub serve_repair: UdpSocket,
|
||||
pub ancestor_hashes_requests: UdpSocket,
|
||||
pub tpu_quic: UdpSocket,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -2757,6 +2759,8 @@ impl Node {
|
|||
pub fn new_localhost_with_pubkey(pubkey: &Pubkey) -> Self {
|
||||
let bind_ip_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
|
||||
let tpu = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let tpu_quic_port = tpu.local_addr().unwrap().port() + QUIC_PORT_OFFSET;
|
||||
let tpu_quic = UdpSocket::bind(format!("127.0.0.1:{}", tpu_quic_port)).unwrap();
|
||||
let (gossip_port, (gossip, ip_echo)) =
|
||||
bind_common_in_range(bind_ip_addr, (1024, 65535)).unwrap();
|
||||
let gossip_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), gossip_port);
|
||||
|
@ -2806,6 +2810,7 @@ impl Node {
|
|||
retransmit_sockets: vec![retransmit_socket],
|
||||
serve_repair,
|
||||
ancestor_hashes_requests,
|
||||
tpu_quic,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -2841,6 +2846,10 @@ impl Node {
|
|||
let (tvu_port, tvu) = Self::bind(bind_ip_addr, port_range);
|
||||
let (tvu_forwards_port, tvu_forwards) = Self::bind(bind_ip_addr, port_range);
|
||||
let (tpu_port, tpu) = Self::bind(bind_ip_addr, port_range);
|
||||
let (_tpu_port_quic, tpu_quic) = Self::bind(
|
||||
bind_ip_addr,
|
||||
(tpu_port + QUIC_PORT_OFFSET, tpu_port + QUIC_PORT_OFFSET + 1),
|
||||
);
|
||||
let (tpu_forwards_port, tpu_forwards) = Self::bind(bind_ip_addr, port_range);
|
||||
let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range);
|
||||
let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range);
|
||||
|
@ -2884,6 +2893,7 @@ impl Node {
|
|||
retransmit_sockets: vec![retransmit_socket],
|
||||
serve_repair,
|
||||
ancestor_hashes_requests,
|
||||
tpu_quic,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -2906,6 +2916,11 @@ impl Node {
|
|||
let (tpu_port, tpu_sockets) =
|
||||
multi_bind_in_range(bind_ip_addr, port_range, 32).expect("tpu multi_bind");
|
||||
|
||||
let (_tpu_port_quic, tpu_quic) = Self::bind(
|
||||
bind_ip_addr,
|
||||
(tpu_port + QUIC_PORT_OFFSET, tpu_port + QUIC_PORT_OFFSET + 1),
|
||||
);
|
||||
|
||||
let (tpu_forwards_port, tpu_forwards_sockets) =
|
||||
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind");
|
||||
|
||||
|
@ -2955,6 +2970,7 @@ impl Node {
|
|||
serve_repair,
|
||||
ip_echo: Some(ip_echo),
|
||||
ancestor_hashes_requests,
|
||||
tpu_quic,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -369,9 +369,9 @@ checksum = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1"
|
|||
|
||||
[[package]]
|
||||
name = "bytes"
|
||||
version = "1.0.1"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040"
|
||||
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
|
||||
|
||||
[[package]]
|
||||
name = "bzip2"
|
||||
|
@ -1195,7 +1195,7 @@ version = "0.3.3"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "825343c4eef0b63f541f8903f395dc5beb362a979b5799a84062527ef1e37726"
|
||||
dependencies = [
|
||||
"bytes 1.0.1",
|
||||
"bytes 1.1.0",
|
||||
"fnv",
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
|
@ -1298,7 +1298,7 @@ version = "0.4.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2861bd27ee074e5ee891e8b539837a9430012e249d7f0ca2d795650f579c1994"
|
||||
dependencies = [
|
||||
"bytes 1.0.1",
|
||||
"bytes 1.1.0",
|
||||
"http",
|
||||
]
|
||||
|
||||
|
@ -1326,7 +1326,7 @@ version = "0.14.11"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0b61cf2d1aebcf6e6352c97b81dc2244ca29194be1b276f5d8ad5c6330fffb11"
|
||||
dependencies = [
|
||||
"bytes 1.0.1",
|
||||
"bytes 1.1.0",
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
|
@ -2239,7 +2239,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "66d2927ca2f685faf0fc620ac4834690d29e7abb153add10f5812eef20b5e280"
|
||||
dependencies = [
|
||||
"base64 0.13.0",
|
||||
"bytes 1.0.1",
|
||||
"bytes 1.1.0",
|
||||
"encoding_rs",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
|
@ -2338,9 +2338,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.20.0"
|
||||
version = "0.20.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9b5ac6078ca424dc1d3ae2328526a76787fecc7f8011f520e3276730e711fc95"
|
||||
checksum = "d37e5e2290f3e040b594b1a9e04377c2c671f1a1cfd9bfdef82106ac1c113f84"
|
||||
dependencies = [
|
||||
"log",
|
||||
"ring",
|
||||
|
@ -4053,7 +4053,7 @@ version = "1.15.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fbbf1c778ec206785635ce8ad57fe52b3009ae9e0c9f574a728f3049d3e55838"
|
||||
dependencies = [
|
||||
"bytes 1.0.1",
|
||||
"bytes 1.1.0",
|
||||
"libc",
|
||||
"memchr",
|
||||
"mio",
|
||||
|
@ -4094,7 +4094,7 @@ version = "0.23.2"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a27d5f2b839802bd8267fa19b0530f5a08b9c08cd417976be2a65d130fe1c11b"
|
||||
dependencies = [
|
||||
"rustls 0.20.0",
|
||||
"rustls 0.20.2",
|
||||
"tokio",
|
||||
"webpki 0.22.0",
|
||||
]
|
||||
|
@ -4106,7 +4106,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "911a61637386b789af998ee23f50aa30d5fd7edcec8d6d3dedae5e5815205466"
|
||||
dependencies = [
|
||||
"bincode",
|
||||
"bytes 1.0.1",
|
||||
"bytes 1.1.0",
|
||||
"educe",
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
|
@ -4134,7 +4134,7 @@ checksum = "e80b39df6afcc12cdf752398ade96a6b9e99c903dfdc36e53ad10b9c366bca72"
|
|||
dependencies = [
|
||||
"futures-util",
|
||||
"log",
|
||||
"rustls 0.20.0",
|
||||
"rustls 0.20.2",
|
||||
"tokio",
|
||||
"tokio-rustls 0.23.2",
|
||||
"tungstenite",
|
||||
|
@ -4148,7 +4148,7 @@ version = "0.6.4"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ec31e5cc6b46e653cf57762f36f71d5e6386391d88a72fd6db4508f8f676fb29"
|
||||
dependencies = [
|
||||
"bytes 1.0.1",
|
||||
"bytes 1.1.0",
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
"log",
|
||||
|
@ -4242,12 +4242,12 @@ checksum = "6ad3713a14ae247f22a728a0456a545df14acf3867f905adff84be99e23b3ad1"
|
|||
dependencies = [
|
||||
"base64 0.13.0",
|
||||
"byteorder 1.4.3",
|
||||
"bytes 1.0.1",
|
||||
"bytes 1.1.0",
|
||||
"http",
|
||||
"httparse",
|
||||
"log",
|
||||
"rand 0.8.2",
|
||||
"rustls 0.20.0",
|
||||
"rustls 0.20.2",
|
||||
"sha-1",
|
||||
"thiserror",
|
||||
"url",
|
||||
|
|
|
@ -38,6 +38,7 @@ pub mod poh_config;
|
|||
pub mod precompiles;
|
||||
pub mod program_utils;
|
||||
pub mod pubkey;
|
||||
pub mod quic;
|
||||
pub mod recent_blockhashes_account;
|
||||
pub mod rpc_port;
|
||||
pub mod secp256k1_instruction;
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
pub const QUIC_PORT_OFFSET: u16 = 1;
|
|
@ -11,15 +11,24 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
crossbeam-channel = "0.5"
|
||||
futures-util = "0.3.19"
|
||||
histogram = "0.6.9"
|
||||
itertools = "0.10.3"
|
||||
libc = "0.2.115"
|
||||
log = "0.4.14"
|
||||
nix = "0.23.1"
|
||||
quinn = "0.8.0"
|
||||
rand = "0.7.0"
|
||||
rcgen = "0.8.14"
|
||||
rustls = { version = "0.20.2", features = ["dangerous_configuration"] }
|
||||
pem = "1.0.2"
|
||||
pkcs8 = { version = "0.8.0", features = ["alloc"] }
|
||||
solana-metrics = { path = "../metrics", version = "=1.10.0" }
|
||||
solana-sdk = { path = "../sdk", version = "=1.10.0" }
|
||||
thiserror = "1.0"
|
||||
libc = "0.2.115"
|
||||
nix = "0.23.1"
|
||||
solana-perf = { path = "../perf", version = "=1.10.0" }
|
||||
thiserror = "1.0"
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
|
||||
|
||||
[dev-dependencies]
|
||||
solana-logger = { path = "../logger", version = "=1.10.0" }
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
#![allow(clippy::integer_arithmetic)]
|
||||
pub mod packet;
|
||||
pub mod quic;
|
||||
pub mod recvmmsg;
|
||||
pub mod sendmmsg;
|
||||
pub mod socket;
|
||||
|
|
|
@ -0,0 +1,421 @@
|
|||
use {
|
||||
crossbeam_channel::Sender,
|
||||
futures_util::stream::StreamExt,
|
||||
pem::Pem,
|
||||
pkcs8::{der::Document, AlgorithmIdentifier, ObjectIdentifier},
|
||||
quinn::{Endpoint, EndpointConfig, ServerConfig},
|
||||
rcgen::{CertificateParams, DistinguishedName, DnType, SanType},
|
||||
solana_perf::packet::PacketBatch,
|
||||
solana_sdk::{
|
||||
packet::{Packet, PACKET_DATA_SIZE},
|
||||
signature::Keypair,
|
||||
},
|
||||
std::{
|
||||
error::Error,
|
||||
net::{IpAddr, SocketAddr, UdpSocket},
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
thread,
|
||||
time::Duration,
|
||||
},
|
||||
tokio::{
|
||||
runtime::{Builder, Runtime},
|
||||
time::timeout,
|
||||
},
|
||||
};
|
||||
|
||||
/// Returns default server configuration along with its PEM certificate chain.
|
||||
#[allow(clippy::field_reassign_with_default)] // https://github.com/rust-lang/rust-clippy/issues/6527
|
||||
fn configure_server(
|
||||
identity_keypair: &Keypair,
|
||||
gossip_host: IpAddr,
|
||||
) -> Result<(ServerConfig, String), QuicServerError> {
|
||||
let (cert_chain, priv_key) =
|
||||
new_cert(identity_keypair, gossip_host).map_err(|_e| QuicServerError::ConfigureFailed)?;
|
||||
let cert_chain_pem_parts: Vec<Pem> = cert_chain
|
||||
.iter()
|
||||
.map(|cert| Pem {
|
||||
tag: "CERTIFICATE".to_string(),
|
||||
contents: cert.0.clone(),
|
||||
})
|
||||
.collect();
|
||||
let cert_chain_pem = pem::encode_many(&cert_chain_pem_parts);
|
||||
|
||||
let mut server_config = ServerConfig::with_single_cert(cert_chain, priv_key)
|
||||
.map_err(|_e| QuicServerError::ConfigureFailed)?;
|
||||
let config = Arc::get_mut(&mut server_config.transport).unwrap();
|
||||
|
||||
const MAX_CONCURRENT_UNI_STREAMS: u32 = 1;
|
||||
config.max_concurrent_uni_streams(MAX_CONCURRENT_UNI_STREAMS.into());
|
||||
config.stream_receive_window((PACKET_DATA_SIZE as u32).into());
|
||||
config.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into());
|
||||
|
||||
// disable bidi & datagrams
|
||||
const MAX_CONCURRENT_BIDI_STREAMS: u32 = 0;
|
||||
config.max_concurrent_bidi_streams(MAX_CONCURRENT_BIDI_STREAMS.into());
|
||||
config.datagram_receive_buffer_size(None);
|
||||
|
||||
Ok((server_config, cert_chain_pem))
|
||||
}
|
||||
|
||||
fn new_cert(
|
||||
identity_keypair: &Keypair,
|
||||
san: IpAddr,
|
||||
) -> Result<(Vec<rustls::Certificate>, rustls::PrivateKey), Box<dyn Error>> {
|
||||
// Generate a self-signed cert from validator identity key
|
||||
let cert_params = new_cert_params(identity_keypair, san);
|
||||
let cert = rcgen::Certificate::from_params(cert_params)?;
|
||||
let cert_der = cert.serialize_der().unwrap();
|
||||
let priv_key = cert.serialize_private_key_der();
|
||||
let priv_key = rustls::PrivateKey(priv_key);
|
||||
let cert_chain = vec![rustls::Certificate(cert_der)];
|
||||
Ok((cert_chain, priv_key))
|
||||
}
|
||||
|
||||
fn convert_to_rcgen_keypair(identity_keypair: &Keypair) -> rcgen::KeyPair {
|
||||
// from https://datatracker.ietf.org/doc/html/rfc8410#section-3
|
||||
const ED25519_IDENTIFIER: [u32; 4] = [1, 3, 101, 112];
|
||||
let mut private_key = Vec::<u8>::with_capacity(34);
|
||||
private_key.extend_from_slice(&[0x04, 0x20]); // ASN.1 OCTET STRING
|
||||
private_key.extend_from_slice(identity_keypair.secret().as_bytes());
|
||||
let key_pkcs8 = pkcs8::PrivateKeyInfo {
|
||||
algorithm: AlgorithmIdentifier {
|
||||
oid: ObjectIdentifier::from_arcs(&ED25519_IDENTIFIER).unwrap(),
|
||||
parameters: None,
|
||||
},
|
||||
private_key: &private_key,
|
||||
public_key: None,
|
||||
};
|
||||
let key_pkcs8_der = key_pkcs8
|
||||
.to_der()
|
||||
.expect("Failed to convert keypair to DER")
|
||||
.to_der();
|
||||
|
||||
// Parse private key into rcgen::KeyPair struct.
|
||||
rcgen::KeyPair::from_der(&key_pkcs8_der).expect("Failed to parse keypair from DER")
|
||||
}
|
||||
|
||||
fn new_cert_params(identity_keypair: &Keypair, san: IpAddr) -> CertificateParams {
|
||||
// TODO(terorie): Is it safe to sign the TLS cert with the identity private key?
|
||||
|
||||
// Unfortunately, rcgen does not accept a "raw" Ed25519 key.
|
||||
// We have to convert it to DER and pass it to the library.
|
||||
|
||||
// Convert private key into PKCS#8 v1 object.
|
||||
// RFC 8410, Section 7: Private Key Format
|
||||
// https://datatracker.ietf.org/doc/html/rfc8410#section-
|
||||
|
||||
let keypair = convert_to_rcgen_keypair(identity_keypair);
|
||||
|
||||
let mut cert_params = CertificateParams::default();
|
||||
cert_params.subject_alt_names = vec![SanType::IpAddress(san)];
|
||||
cert_params.alg = &rcgen::PKCS_ED25519;
|
||||
cert_params.key_pair = Some(keypair);
|
||||
cert_params.distinguished_name = DistinguishedName::new();
|
||||
cert_params
|
||||
.distinguished_name
|
||||
.push(DnType::CommonName, "Solana node");
|
||||
cert_params
|
||||
}
|
||||
|
||||
pub fn rt() -> Runtime {
|
||||
Builder::new_current_thread().enable_all().build().unwrap()
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum QuicServerError {
|
||||
#[error("Server configure failed")]
|
||||
ConfigureFailed,
|
||||
|
||||
#[error("Endpoint creation failed")]
|
||||
EndpointFailed,
|
||||
}
|
||||
|
||||
// Return true if the server should drop the stream
|
||||
fn handle_chunk(
|
||||
chunk: &Result<Option<quinn::Chunk>, quinn::ReadError>,
|
||||
maybe_batch: &mut Option<PacketBatch>,
|
||||
remote_addr: &SocketAddr,
|
||||
packet_sender: &Sender<PacketBatch>,
|
||||
) -> bool {
|
||||
match chunk {
|
||||
Ok(maybe_chunk) => {
|
||||
if let Some(chunk) = maybe_chunk {
|
||||
trace!("got chunk: {:?}", chunk);
|
||||
let chunk_len = chunk.bytes.len() as u64;
|
||||
|
||||
// shouldn't happen, but sanity check the size and offsets
|
||||
if chunk.offset > PACKET_DATA_SIZE as u64 || chunk_len > PACKET_DATA_SIZE as u64 {
|
||||
return true;
|
||||
}
|
||||
if chunk.offset + chunk_len > PACKET_DATA_SIZE as u64 {
|
||||
return true;
|
||||
}
|
||||
|
||||
// chunk looks valid
|
||||
if maybe_batch.is_none() {
|
||||
let mut batch = PacketBatch::with_capacity(1);
|
||||
let mut packet = Packet::default();
|
||||
packet.meta.set_addr(remote_addr);
|
||||
batch.packets.push(packet);
|
||||
*maybe_batch = Some(batch);
|
||||
}
|
||||
|
||||
if let Some(batch) = maybe_batch.as_mut() {
|
||||
let end = chunk.offset as usize + chunk.bytes.len();
|
||||
batch.packets[0].data[chunk.offset as usize..end].copy_from_slice(&chunk.bytes);
|
||||
batch.packets[0].meta.size = std::cmp::max(batch.packets[0].meta.size, end);
|
||||
}
|
||||
} else {
|
||||
trace!("chunk is none");
|
||||
// done receiving chunks
|
||||
if let Some(batch) = maybe_batch.take() {
|
||||
let len = batch.packets[0].meta.size;
|
||||
if let Err(e) = packet_sender.send(batch) {
|
||||
info!("send error: {}", e);
|
||||
} else {
|
||||
trace!("sent {} byte packet", len);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Received stream error: {:?}", e);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
pub fn spawn_server(
|
||||
sock: UdpSocket,
|
||||
keypair: &Keypair,
|
||||
gossip_host: IpAddr,
|
||||
packet_sender: Sender<PacketBatch>,
|
||||
exit: Arc<AtomicBool>,
|
||||
) -> Result<thread::JoinHandle<()>, QuicServerError> {
|
||||
let (config, _cert) = configure_server(keypair, gossip_host)?;
|
||||
|
||||
let runtime = rt();
|
||||
let (_, mut incoming) = {
|
||||
let _guard = runtime.enter();
|
||||
Endpoint::new(EndpointConfig::default(), Some(config), sock)
|
||||
.map_err(|_e| QuicServerError::EndpointFailed)?
|
||||
};
|
||||
|
||||
let handle = thread::spawn(move || {
|
||||
let handle = runtime.spawn(async move {
|
||||
while !exit.load(Ordering::Relaxed) {
|
||||
const WAIT_FOR_CONNECTION_TIMEOUT_MS: u64 = 1000;
|
||||
let timeout_connection = timeout(
|
||||
Duration::from_millis(WAIT_FOR_CONNECTION_TIMEOUT_MS),
|
||||
incoming.next(),
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Ok(Some(connection)) = timeout_connection {
|
||||
if let Ok(new_connection) = connection.await {
|
||||
let exit = exit.clone();
|
||||
let quinn::NewConnection {
|
||||
connection,
|
||||
mut uni_streams,
|
||||
..
|
||||
} = new_connection;
|
||||
|
||||
let remote_addr = connection.remote_address();
|
||||
let packet_sender = packet_sender.clone();
|
||||
tokio::spawn(async move {
|
||||
debug!("new connection {}", remote_addr);
|
||||
while let Some(Ok(mut stream)) = uni_streams.next().await {
|
||||
let mut maybe_batch = None;
|
||||
while !exit.load(Ordering::Relaxed) {
|
||||
if handle_chunk(
|
||||
&stream.read_chunk(PACKET_DATA_SIZE, false).await,
|
||||
&mut maybe_batch,
|
||||
&remote_addr,
|
||||
&packet_sender,
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
if let Err(e) = runtime.block_on(handle) {
|
||||
warn!("error from runtime.block_on: {:?}", e);
|
||||
}
|
||||
});
|
||||
Ok(handle)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use crossbeam_channel::unbounded;
|
||||
use quinn::{ClientConfig, NewConnection};
|
||||
use std::{net::SocketAddr, time::Instant};
|
||||
|
||||
struct SkipServerVerification;
|
||||
|
||||
impl SkipServerVerification {
|
||||
fn new() -> Arc<Self> {
|
||||
Arc::new(Self)
|
||||
}
|
||||
}
|
||||
|
||||
impl rustls::client::ServerCertVerifier for SkipServerVerification {
|
||||
fn verify_server_cert(
|
||||
&self,
|
||||
_end_entity: &rustls::Certificate,
|
||||
_intermediates: &[rustls::Certificate],
|
||||
_server_name: &rustls::ServerName,
|
||||
_scts: &mut dyn Iterator<Item = &[u8]>,
|
||||
_ocsp_response: &[u8],
|
||||
_now: std::time::SystemTime,
|
||||
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
|
||||
Ok(rustls::client::ServerCertVerified::assertion())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_client_config() -> quinn::ClientConfig {
|
||||
let crypto = rustls::ClientConfig::builder()
|
||||
.with_safe_defaults()
|
||||
.with_custom_certificate_verifier(SkipServerVerification::new())
|
||||
.with_no_client_auth();
|
||||
ClientConfig::new(Arc::new(crypto))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_quic_server_exit() {
|
||||
let s = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let (sender, _receiver) = unbounded();
|
||||
let keypair = Keypair::new();
|
||||
let ip = "127.0.0.1".parse().unwrap();
|
||||
let t = spawn_server(s, &keypair, ip, sender, exit.clone()).unwrap();
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
t.join().unwrap();
|
||||
}
|
||||
|
||||
fn make_client_endpoint(runtime: &Runtime, addr: &SocketAddr) -> NewConnection {
|
||||
let client_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let mut endpoint = quinn::Endpoint::new(EndpointConfig::default(), None, client_socket)
|
||||
.unwrap()
|
||||
.0;
|
||||
endpoint.set_default_client_config(get_client_config());
|
||||
runtime
|
||||
.block_on(endpoint.connect(*addr, "localhost").unwrap())
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_quic_server_multiple_streams() {
|
||||
solana_logger::setup();
|
||||
let s = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let (sender, receiver) = unbounded();
|
||||
let keypair = Keypair::new();
|
||||
let ip = "127.0.0.1".parse().unwrap();
|
||||
let server_address = s.local_addr().unwrap();
|
||||
let t = spawn_server(s, &keypair, ip, sender, exit.clone()).unwrap();
|
||||
|
||||
let runtime = rt();
|
||||
let _rt_guard = runtime.enter();
|
||||
let conn1 = Arc::new(make_client_endpoint(&runtime, &server_address));
|
||||
let conn2 = Arc::new(make_client_endpoint(&runtime, &server_address));
|
||||
let mut num_expected_packets = 0;
|
||||
for i in 0..10 {
|
||||
info!("sending: {}", i);
|
||||
let c1 = conn1.clone();
|
||||
let c2 = conn2.clone();
|
||||
let handle = runtime.spawn(async move {
|
||||
let mut s1 = c1.connection.open_uni().await.unwrap();
|
||||
let mut s2 = c2.connection.open_uni().await.unwrap();
|
||||
s1.write_all(&[0u8]).await.unwrap();
|
||||
s1.finish().await.unwrap();
|
||||
s2.write_all(&[0u8]).await.unwrap();
|
||||
s2.finish().await.unwrap();
|
||||
});
|
||||
runtime.block_on(handle).unwrap();
|
||||
num_expected_packets += 2;
|
||||
thread::sleep(Duration::from_millis(200));
|
||||
}
|
||||
let mut all_packets = vec![];
|
||||
let now = Instant::now();
|
||||
let mut total_packets = 0;
|
||||
while now.elapsed().as_secs() < 10 {
|
||||
if let Ok(packets) = receiver.recv_timeout(Duration::from_secs(1)) {
|
||||
total_packets += packets.packets.len();
|
||||
all_packets.push(packets)
|
||||
}
|
||||
if total_packets == num_expected_packets {
|
||||
break;
|
||||
}
|
||||
}
|
||||
for batch in all_packets {
|
||||
for p in &batch.packets {
|
||||
assert_eq!(p.meta.size, 1);
|
||||
}
|
||||
}
|
||||
assert_eq!(total_packets, num_expected_packets);
|
||||
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
t.join().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_quic_server_multiple_writes() {
|
||||
solana_logger::setup();
|
||||
let s = UdpSocket::bind("127.0.0.1:0").unwrap();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let (sender, receiver) = unbounded();
|
||||
let keypair = Keypair::new();
|
||||
let ip = "127.0.0.1".parse().unwrap();
|
||||
let server_address = s.local_addr().unwrap();
|
||||
let t = spawn_server(s, &keypair, ip, sender, exit.clone()).unwrap();
|
||||
|
||||
let runtime = rt();
|
||||
let _rt_guard = runtime.enter();
|
||||
let conn1 = Arc::new(make_client_endpoint(&runtime, &server_address));
|
||||
|
||||
// Send a full size packet with single byte writes.
|
||||
let num_bytes = PACKET_DATA_SIZE;
|
||||
let num_expected_packets = 1;
|
||||
let handle = runtime.spawn(async move {
|
||||
let mut s1 = conn1.connection.open_uni().await.unwrap();
|
||||
for _ in 0..num_bytes {
|
||||
s1.write_all(&[0u8]).await.unwrap();
|
||||
}
|
||||
s1.finish().await.unwrap();
|
||||
});
|
||||
runtime.block_on(handle).unwrap();
|
||||
|
||||
let mut all_packets = vec![];
|
||||
let now = Instant::now();
|
||||
let mut total_packets = 0;
|
||||
while now.elapsed().as_secs() < 5 {
|
||||
if let Ok(packets) = receiver.recv_timeout(Duration::from_secs(1)) {
|
||||
total_packets += packets.packets.len();
|
||||
all_packets.push(packets)
|
||||
}
|
||||
if total_packets > num_expected_packets {
|
||||
break;
|
||||
}
|
||||
}
|
||||
for batch in all_packets {
|
||||
for p in &batch.packets {
|
||||
assert_eq!(p.meta.size, num_bytes);
|
||||
}
|
||||
}
|
||||
assert_eq!(total_packets, num_expected_packets);
|
||||
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
t.join().unwrap();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue