From 5a230f418d34a5dd2ea0c8ea8f460379ff2713ee Mon Sep 17 00:00:00 2001 From: sakridge Date: Fri, 4 Feb 2022 15:27:09 +0100 Subject: [PATCH] Add quic port for accepting transactions (#22753) using quinn library streamer: Sign TLS cert with validator identity key Handle multiple incoming chunks --- Cargo.lock | 174 ++++++++++++++- core/src/tpu.rs | 16 ++ core/src/validator.rs | 9 +- gossip/src/cluster_info.rs | 18 +- programs/bpf/Cargo.lock | 30 +-- sdk/src/lib.rs | 1 + sdk/src/quic.rs | 1 + streamer/Cargo.toml | 15 +- streamer/src/lib.rs | 1 + streamer/src/quic.rs | 421 +++++++++++++++++++++++++++++++++++++ 10 files changed, 660 insertions(+), 26 deletions(-) create mode 100644 sdk/src/quic.rs create mode 100644 streamer/src/quic.rs diff --git a/Cargo.lock b/Cargo.lock index 3de221b21f..dde26d3ec6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -275,6 +275,12 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" +[[package]] +name = "base64ct" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "874f8444adcb4952a8bc51305c8be95c8ec8237bb0d2e78d2e039f771f8828a0" + [[package]] name = "bincode" version = "1.3.3" @@ -813,6 +819,12 @@ dependencies = [ "web-sys", ] +[[package]] +name = "const-oid" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4c78c047431fee22c1a7bb92e00ad095a02a983affe4d8a72e2a2c62c1b94f3" + [[package]] name = "const_fn" version = "0.4.8" @@ -1079,6 +1091,15 @@ dependencies = [ "rayon", ] +[[package]] +name = "der" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6919815d73839e7ad218de758883aae3a257ba6759ce7a9992501efbb53d705c" +dependencies = [ + "const-oid", +] + [[package]] name = "derivation-path" version = "0.1.3" @@ -1655,6 +1676,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "gag" version = "1.0.0" @@ -3026,6 +3056,15 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" +[[package]] +name = "pem" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9a3b09a20e374558580a4914d3b7d89bd61b954a5a5e1dcbea98753addb1947" +dependencies = [ + "base64 0.13.0", +] + [[package]] name = "percent-encoding" version = "1.0.1" @@ -3136,6 +3175,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkcs8" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cabda3fb821068a9a4fab19a683eac3af12edf0f34b94a8be53c4972b8149d0" +dependencies = [ + "der", + "spki", + "zeroize", +] + [[package]] name = "pkg-config" version = "0.3.22" @@ -3360,6 +3410,60 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" +[[package]] +name = "quinn" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61a84d97630b137463c8e6802adc1dfe9de81457b41bb1ac59189e6761ab9255" +dependencies = [ + "bytes 1.1.0", + "futures-channel", + "futures-util", + "fxhash", + "quinn-proto", + "quinn-udp", + "rustls 0.20.2", + "thiserror", + "tokio", + "tracing", + "webpki 0.22.0", +] + +[[package]] +name = "quinn-proto" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "063dedf7983c8d57db474218f258daa85b627de6f2dbc458b690a93b1de790e8" +dependencies = [ + "bytes 1.1.0", + "fxhash", + "rand 0.8.4", + "ring", + "rustls 0.20.2", + "rustls-native-certs", + "rustls-pemfile", + "slab", + "thiserror", + "tinyvec", + "tracing", + "webpki 0.22.0", +] + +[[package]] +name = "quinn-udp" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f7996776e9ee3fc0e5c14476c1a640a17e993c847ae9c81191c2c102fbef903" +dependencies = [ + "futures-util", + "libc", + "mio 0.7.14", + "quinn-proto", + "socket2", + "tokio", + "tracing", +] + [[package]] name = "quote" version = "0.6.13" @@ -3648,6 +3752,18 @@ dependencies = [ "solana_rbpf", ] +[[package]] +name = "rcgen" +version = "0.8.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5911d1403f4143c9d56a702069d593e8d0f3fab880a85e103604d0893ea31ba7" +dependencies = [ + "chrono", + "pem", + "ring", + "yasna", +] + [[package]] name = "rdrand" version = "0.4.0" @@ -3876,9 +3992,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.20.0" +version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b5ac6078ca424dc1d3ae2328526a76787fecc7f8011f520e3276730e711fc95" +checksum = "d37e5e2290f3e040b594b1a9e04377c2c671f1a1cfd9bfdef82106ac1c113f84" dependencies = [ "log 0.4.14", "ring", @@ -3886,6 +4002,27 @@ dependencies = [ "webpki 0.22.0", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ca9ebdfa27d3fc180e42879037b5338ab1c040c06affd00d8338598e7800943" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9" +dependencies = [ + "base64 0.13.0", +] + [[package]] name = "rustversion" version = "1.0.6" @@ -5959,16 +6096,24 @@ name = "solana-streamer" version = "1.10.0" dependencies = [ "crossbeam-channel", + "futures-util", "histogram", "itertools 0.10.3", "libc", "log 0.4.14", "nix", + "pem", + "pkcs8", + "quinn", + "rand 0.7.3", + "rcgen", + "rustls 0.20.2", "solana-logger 1.10.0", "solana-metrics", "solana-perf", "solana-sdk", "thiserror", + "tokio", ] [[package]] @@ -6273,6 +6418,16 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "511254be0c5bcf062b019a6c89c01a664aa359ded62f78aa72c6fc137c0590e5" +[[package]] +name = "spki" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d01ac02a6ccf3e07db148d2be087da624fea0221a16152ed01f0496a6b0a27" +dependencies = [ + "base64ct", + "der", +] + [[package]] name = "spl-associated-token-account" version = "1.0.3" @@ -6847,7 +7002,7 @@ version = "0.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a27d5f2b839802bd8267fa19b0530f5a08b9c08cd417976be2a65d130fe1c11b" dependencies = [ - "rustls 0.20.0", + "rustls 0.20.2", "tokio", "webpki 0.22.0", ] @@ -6922,7 +7077,7 @@ checksum = "e80b39df6afcc12cdf752398ade96a6b9e99c903dfdc36e53ad10b9c366bca72" dependencies = [ "futures-util", "log 0.4.14", - "rustls 0.20.0", + "rustls 0.20.2", "tokio", "tokio-rustls 0.23.2", "tungstenite", @@ -7129,7 +7284,7 @@ dependencies = [ "httparse", "log 0.4.14", "rand 0.8.4", - "rustls 0.20.0", + "rustls 0.20.2", "sha-1 0.9.8", "thiserror", "url 2.2.2", @@ -7678,6 +7833,15 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "yasna" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e262a29d0e61ccf2b6190d7050d4b237535fc76ce4c1210d9caa316f71dffa75" +dependencies = [ + "chrono", +] + [[package]] name = "zeroize" version = "1.3.0" diff --git a/core/src/tpu.rs b/core/src/tpu.rs index d8eb58da70..a058075d08 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -26,6 +26,7 @@ use { cost_model::CostModel, vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender}, }, + solana_sdk::signature::Keypair, std::{ net::UdpSocket, sync::{atomic::AtomicBool, Arc, Mutex, RwLock}, @@ -40,6 +41,7 @@ pub struct TpuSockets { pub transaction_forwards: Vec, pub vote: Vec, pub broadcast: Vec, + pub transactions_quic: UdpSocket, } pub struct Tpu { @@ -49,6 +51,7 @@ pub struct Tpu { banking_stage: BankingStage, cluster_info_vote_listener: ClusterInfoVoteListener, broadcast_stage: BroadcastStage, + tpu_quic_t: thread::JoinHandle<()>, } impl Tpu { @@ -75,12 +78,14 @@ impl Tpu { tpu_coalesce_ms: u64, cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender, cost_model: &Arc>, + keypair: &Keypair, ) -> Self { let TpuSockets { transactions: transactions_sockets, transaction_forwards: tpu_forwards_sockets, vote: tpu_vote_sockets, broadcast: broadcast_sockets, + transactions_quic: transactions_quic_sockets, } = sockets; let (packet_sender, packet_receiver) = unbounded(); @@ -97,6 +102,15 @@ impl Tpu { ); let (verified_sender, verified_receiver) = unbounded(); + let tpu_quic_t = solana_streamer::quic::spawn_server( + transactions_quic_sockets, + keypair, + cluster_info.my_contact_info().tpu.ip(), + packet_sender, + exit.clone(), + ) + .unwrap(); + let sigverify_stage = { let verifier = TransactionSigVerifier::default(); SigVerifyStage::new(packet_receiver, verified_sender, verifier) @@ -160,6 +174,7 @@ impl Tpu { banking_stage, cluster_info_vote_listener, broadcast_stage, + tpu_quic_t, } } @@ -171,6 +186,7 @@ impl Tpu { self.cluster_info_vote_listener.join(), self.banking_stage.join(), ]; + self.tpu_quic_t.join()?; let broadcast_result = self.broadcast_stage.join(); for result in results { result?; diff --git a/core/src/validator.rs b/core/src/validator.rs index f047d014fc..df07030905 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -540,8 +540,11 @@ impl Validator { } } - let mut cluster_info = - ClusterInfo::new(node.info.clone(), identity_keypair, socket_addr_space); + let mut cluster_info = ClusterInfo::new( + node.info.clone(), + identity_keypair.clone(), + socket_addr_space, + ); cluster_info.set_contact_debug_interval(config.contact_debug_interval); cluster_info.set_entrypoints(cluster_entrypoints); cluster_info.restore_contact_info(ledger_path, config.contact_save_interval); @@ -886,6 +889,7 @@ impl Validator { transaction_forwards: node.sockets.tpu_forwards, vote: node.sockets.tpu_vote, broadcast: node.sockets.broadcast, + transactions_quic: node.sockets.tpu_quic, }, &rpc_subscriptions, transaction_status_sender, @@ -903,6 +907,7 @@ impl Validator { config.tpu_coalesce_ms, cluster_confirmed_slot_sender, &cost_model, + &identity_keypair, ); datapoint_info!("validator-new", ("id", id.to_string(), String)); diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 0847fdbe93..26aa6bb4fd 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -59,6 +59,7 @@ use { feature_set::FeatureSet, hash::Hash, pubkey::Pubkey, + quic::QUIC_PORT_OFFSET, sanitize::{Sanitize, SanitizeError}, signature::{Keypair, Signable, Signature, Signer}, timing::timestamp, @@ -92,7 +93,7 @@ use { }; pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000); -pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 11; // VALIDATOR_PORT_RANGE must be at least this wide +pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 12; // VALIDATOR_PORT_RANGE must be at least this wide /// The Data plane fanout size, also used as the neighborhood size pub const DATA_PLANE_FANOUT: usize = 200; @@ -2741,6 +2742,7 @@ pub struct Sockets { pub retransmit_sockets: Vec, pub serve_repair: UdpSocket, pub ancestor_hashes_requests: UdpSocket, + pub tpu_quic: UdpSocket, } #[derive(Debug)] @@ -2757,6 +2759,8 @@ impl Node { pub fn new_localhost_with_pubkey(pubkey: &Pubkey) -> Self { let bind_ip_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); let tpu = UdpSocket::bind("127.0.0.1:0").unwrap(); + let tpu_quic_port = tpu.local_addr().unwrap().port() + QUIC_PORT_OFFSET; + let tpu_quic = UdpSocket::bind(format!("127.0.0.1:{}", tpu_quic_port)).unwrap(); let (gossip_port, (gossip, ip_echo)) = bind_common_in_range(bind_ip_addr, (1024, 65535)).unwrap(); let gossip_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), gossip_port); @@ -2806,6 +2810,7 @@ impl Node { retransmit_sockets: vec![retransmit_socket], serve_repair, ancestor_hashes_requests, + tpu_quic, }, } } @@ -2841,6 +2846,10 @@ impl Node { let (tvu_port, tvu) = Self::bind(bind_ip_addr, port_range); let (tvu_forwards_port, tvu_forwards) = Self::bind(bind_ip_addr, port_range); let (tpu_port, tpu) = Self::bind(bind_ip_addr, port_range); + let (_tpu_port_quic, tpu_quic) = Self::bind( + bind_ip_addr, + (tpu_port + QUIC_PORT_OFFSET, tpu_port + QUIC_PORT_OFFSET + 1), + ); let (tpu_forwards_port, tpu_forwards) = Self::bind(bind_ip_addr, port_range); let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range); let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range); @@ -2884,6 +2893,7 @@ impl Node { retransmit_sockets: vec![retransmit_socket], serve_repair, ancestor_hashes_requests, + tpu_quic, }, } } @@ -2906,6 +2916,11 @@ impl Node { let (tpu_port, tpu_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 32).expect("tpu multi_bind"); + let (_tpu_port_quic, tpu_quic) = Self::bind( + bind_ip_addr, + (tpu_port + QUIC_PORT_OFFSET, tpu_port + QUIC_PORT_OFFSET + 1), + ); + let (tpu_forwards_port, tpu_forwards_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind"); @@ -2955,6 +2970,7 @@ impl Node { serve_repair, ip_echo: Some(ip_echo), ancestor_hashes_requests, + tpu_quic, }, } } diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index c4c80ee66c..6471c0c291 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -369,9 +369,9 @@ checksum = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" [[package]] name = "bytes" -version = "1.0.1" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040" +checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" [[package]] name = "bzip2" @@ -1195,7 +1195,7 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "825343c4eef0b63f541f8903f395dc5beb362a979b5799a84062527ef1e37726" dependencies = [ - "bytes 1.0.1", + "bytes 1.1.0", "fnv", "futures-core", "futures-sink", @@ -1298,7 +1298,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2861bd27ee074e5ee891e8b539837a9430012e249d7f0ca2d795650f579c1994" dependencies = [ - "bytes 1.0.1", + "bytes 1.1.0", "http", ] @@ -1326,7 +1326,7 @@ version = "0.14.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b61cf2d1aebcf6e6352c97b81dc2244ca29194be1b276f5d8ad5c6330fffb11" dependencies = [ - "bytes 1.0.1", + "bytes 1.1.0", "futures-channel", "futures-core", "futures-util", @@ -2239,7 +2239,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "66d2927ca2f685faf0fc620ac4834690d29e7abb153add10f5812eef20b5e280" dependencies = [ "base64 0.13.0", - "bytes 1.0.1", + "bytes 1.1.0", "encoding_rs", "futures-core", "futures-util", @@ -2338,9 +2338,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.20.0" +version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b5ac6078ca424dc1d3ae2328526a76787fecc7f8011f520e3276730e711fc95" +checksum = "d37e5e2290f3e040b594b1a9e04377c2c671f1a1cfd9bfdef82106ac1c113f84" dependencies = [ "log", "ring", @@ -4053,7 +4053,7 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fbbf1c778ec206785635ce8ad57fe52b3009ae9e0c9f574a728f3049d3e55838" dependencies = [ - "bytes 1.0.1", + "bytes 1.1.0", "libc", "memchr", "mio", @@ -4094,7 +4094,7 @@ version = "0.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a27d5f2b839802bd8267fa19b0530f5a08b9c08cd417976be2a65d130fe1c11b" dependencies = [ - "rustls 0.20.0", + "rustls 0.20.2", "tokio", "webpki 0.22.0", ] @@ -4106,7 +4106,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "911a61637386b789af998ee23f50aa30d5fd7edcec8d6d3dedae5e5815205466" dependencies = [ "bincode", - "bytes 1.0.1", + "bytes 1.1.0", "educe", "futures-core", "futures-sink", @@ -4134,7 +4134,7 @@ checksum = "e80b39df6afcc12cdf752398ade96a6b9e99c903dfdc36e53ad10b9c366bca72" dependencies = [ "futures-util", "log", - "rustls 0.20.0", + "rustls 0.20.2", "tokio", "tokio-rustls 0.23.2", "tungstenite", @@ -4148,7 +4148,7 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec31e5cc6b46e653cf57762f36f71d5e6386391d88a72fd6db4508f8f676fb29" dependencies = [ - "bytes 1.0.1", + "bytes 1.1.0", "futures-core", "futures-sink", "log", @@ -4242,12 +4242,12 @@ checksum = "6ad3713a14ae247f22a728a0456a545df14acf3867f905adff84be99e23b3ad1" dependencies = [ "base64 0.13.0", "byteorder 1.4.3", - "bytes 1.0.1", + "bytes 1.1.0", "http", "httparse", "log", "rand 0.8.2", - "rustls 0.20.0", + "rustls 0.20.2", "sha-1", "thiserror", "url", diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index 38cf07db86..d51f03fb10 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -38,6 +38,7 @@ pub mod poh_config; pub mod precompiles; pub mod program_utils; pub mod pubkey; +pub mod quic; pub mod recent_blockhashes_account; pub mod rpc_port; pub mod secp256k1_instruction; diff --git a/sdk/src/quic.rs b/sdk/src/quic.rs new file mode 100644 index 0000000000..2938c3536a --- /dev/null +++ b/sdk/src/quic.rs @@ -0,0 +1 @@ +pub const QUIC_PORT_OFFSET: u16 = 1; diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index d0ca98f272..a2bbf9b310 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -11,15 +11,24 @@ edition = "2021" [dependencies] crossbeam-channel = "0.5" +futures-util = "0.3.19" histogram = "0.6.9" itertools = "0.10.3" +libc = "0.2.115" log = "0.4.14" +nix = "0.23.1" +quinn = "0.8.0" +rand = "0.7.0" +rcgen = "0.8.14" +rustls = { version = "0.20.2", features = ["dangerous_configuration"] } +pem = "1.0.2" +pkcs8 = { version = "0.8.0", features = ["alloc"] } solana-metrics = { path = "../metrics", version = "=1.10.0" } solana-sdk = { path = "../sdk", version = "=1.10.0" } -thiserror = "1.0" -libc = "0.2.115" -nix = "0.23.1" solana-perf = { path = "../perf", version = "=1.10.0" } +thiserror = "1.0" +tokio = { version = "1", features = ["full"] } + [dev-dependencies] solana-logger = { path = "../logger", version = "=1.10.0" } diff --git a/streamer/src/lib.rs b/streamer/src/lib.rs index c8ba5f6d4b..3fbad9a2df 100644 --- a/streamer/src/lib.rs +++ b/streamer/src/lib.rs @@ -1,5 +1,6 @@ #![allow(clippy::integer_arithmetic)] pub mod packet; +pub mod quic; pub mod recvmmsg; pub mod sendmmsg; pub mod socket; diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs new file mode 100644 index 0000000000..b936a04218 --- /dev/null +++ b/streamer/src/quic.rs @@ -0,0 +1,421 @@ +use { + crossbeam_channel::Sender, + futures_util::stream::StreamExt, + pem::Pem, + pkcs8::{der::Document, AlgorithmIdentifier, ObjectIdentifier}, + quinn::{Endpoint, EndpointConfig, ServerConfig}, + rcgen::{CertificateParams, DistinguishedName, DnType, SanType}, + solana_perf::packet::PacketBatch, + solana_sdk::{ + packet::{Packet, PACKET_DATA_SIZE}, + signature::Keypair, + }, + std::{ + error::Error, + net::{IpAddr, SocketAddr, UdpSocket}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread, + time::Duration, + }, + tokio::{ + runtime::{Builder, Runtime}, + time::timeout, + }, +}; + +/// Returns default server configuration along with its PEM certificate chain. +#[allow(clippy::field_reassign_with_default)] // https://github.com/rust-lang/rust-clippy/issues/6527 +fn configure_server( + identity_keypair: &Keypair, + gossip_host: IpAddr, +) -> Result<(ServerConfig, String), QuicServerError> { + let (cert_chain, priv_key) = + new_cert(identity_keypair, gossip_host).map_err(|_e| QuicServerError::ConfigureFailed)?; + let cert_chain_pem_parts: Vec = cert_chain + .iter() + .map(|cert| Pem { + tag: "CERTIFICATE".to_string(), + contents: cert.0.clone(), + }) + .collect(); + let cert_chain_pem = pem::encode_many(&cert_chain_pem_parts); + + let mut server_config = ServerConfig::with_single_cert(cert_chain, priv_key) + .map_err(|_e| QuicServerError::ConfigureFailed)?; + let config = Arc::get_mut(&mut server_config.transport).unwrap(); + + const MAX_CONCURRENT_UNI_STREAMS: u32 = 1; + config.max_concurrent_uni_streams(MAX_CONCURRENT_UNI_STREAMS.into()); + config.stream_receive_window((PACKET_DATA_SIZE as u32).into()); + config.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into()); + + // disable bidi & datagrams + const MAX_CONCURRENT_BIDI_STREAMS: u32 = 0; + config.max_concurrent_bidi_streams(MAX_CONCURRENT_BIDI_STREAMS.into()); + config.datagram_receive_buffer_size(None); + + Ok((server_config, cert_chain_pem)) +} + +fn new_cert( + identity_keypair: &Keypair, + san: IpAddr, +) -> Result<(Vec, rustls::PrivateKey), Box> { + // Generate a self-signed cert from validator identity key + let cert_params = new_cert_params(identity_keypair, san); + let cert = rcgen::Certificate::from_params(cert_params)?; + let cert_der = cert.serialize_der().unwrap(); + let priv_key = cert.serialize_private_key_der(); + let priv_key = rustls::PrivateKey(priv_key); + let cert_chain = vec![rustls::Certificate(cert_der)]; + Ok((cert_chain, priv_key)) +} + +fn convert_to_rcgen_keypair(identity_keypair: &Keypair) -> rcgen::KeyPair { + // from https://datatracker.ietf.org/doc/html/rfc8410#section-3 + const ED25519_IDENTIFIER: [u32; 4] = [1, 3, 101, 112]; + let mut private_key = Vec::::with_capacity(34); + private_key.extend_from_slice(&[0x04, 0x20]); // ASN.1 OCTET STRING + private_key.extend_from_slice(identity_keypair.secret().as_bytes()); + let key_pkcs8 = pkcs8::PrivateKeyInfo { + algorithm: AlgorithmIdentifier { + oid: ObjectIdentifier::from_arcs(&ED25519_IDENTIFIER).unwrap(), + parameters: None, + }, + private_key: &private_key, + public_key: None, + }; + let key_pkcs8_der = key_pkcs8 + .to_der() + .expect("Failed to convert keypair to DER") + .to_der(); + + // Parse private key into rcgen::KeyPair struct. + rcgen::KeyPair::from_der(&key_pkcs8_der).expect("Failed to parse keypair from DER") +} + +fn new_cert_params(identity_keypair: &Keypair, san: IpAddr) -> CertificateParams { + // TODO(terorie): Is it safe to sign the TLS cert with the identity private key? + + // Unfortunately, rcgen does not accept a "raw" Ed25519 key. + // We have to convert it to DER and pass it to the library. + + // Convert private key into PKCS#8 v1 object. + // RFC 8410, Section 7: Private Key Format + // https://datatracker.ietf.org/doc/html/rfc8410#section- + + let keypair = convert_to_rcgen_keypair(identity_keypair); + + let mut cert_params = CertificateParams::default(); + cert_params.subject_alt_names = vec![SanType::IpAddress(san)]; + cert_params.alg = &rcgen::PKCS_ED25519; + cert_params.key_pair = Some(keypair); + cert_params.distinguished_name = DistinguishedName::new(); + cert_params + .distinguished_name + .push(DnType::CommonName, "Solana node"); + cert_params +} + +pub fn rt() -> Runtime { + Builder::new_current_thread().enable_all().build().unwrap() +} + +#[derive(thiserror::Error, Debug)] +pub enum QuicServerError { + #[error("Server configure failed")] + ConfigureFailed, + + #[error("Endpoint creation failed")] + EndpointFailed, +} + +// Return true if the server should drop the stream +fn handle_chunk( + chunk: &Result, quinn::ReadError>, + maybe_batch: &mut Option, + remote_addr: &SocketAddr, + packet_sender: &Sender, +) -> bool { + match chunk { + Ok(maybe_chunk) => { + if let Some(chunk) = maybe_chunk { + trace!("got chunk: {:?}", chunk); + let chunk_len = chunk.bytes.len() as u64; + + // shouldn't happen, but sanity check the size and offsets + if chunk.offset > PACKET_DATA_SIZE as u64 || chunk_len > PACKET_DATA_SIZE as u64 { + return true; + } + if chunk.offset + chunk_len > PACKET_DATA_SIZE as u64 { + return true; + } + + // chunk looks valid + if maybe_batch.is_none() { + let mut batch = PacketBatch::with_capacity(1); + let mut packet = Packet::default(); + packet.meta.set_addr(remote_addr); + batch.packets.push(packet); + *maybe_batch = Some(batch); + } + + if let Some(batch) = maybe_batch.as_mut() { + let end = chunk.offset as usize + chunk.bytes.len(); + batch.packets[0].data[chunk.offset as usize..end].copy_from_slice(&chunk.bytes); + batch.packets[0].meta.size = std::cmp::max(batch.packets[0].meta.size, end); + } + } else { + trace!("chunk is none"); + // done receiving chunks + if let Some(batch) = maybe_batch.take() { + let len = batch.packets[0].meta.size; + if let Err(e) = packet_sender.send(batch) { + info!("send error: {}", e); + } else { + trace!("sent {} byte packet", len); + } + } + return true; + } + } + Err(e) => { + debug!("Received stream error: {:?}", e); + return true; + } + } + false +} + +pub fn spawn_server( + sock: UdpSocket, + keypair: &Keypair, + gossip_host: IpAddr, + packet_sender: Sender, + exit: Arc, +) -> Result, QuicServerError> { + let (config, _cert) = configure_server(keypair, gossip_host)?; + + let runtime = rt(); + let (_, mut incoming) = { + let _guard = runtime.enter(); + Endpoint::new(EndpointConfig::default(), Some(config), sock) + .map_err(|_e| QuicServerError::EndpointFailed)? + }; + + let handle = thread::spawn(move || { + let handle = runtime.spawn(async move { + while !exit.load(Ordering::Relaxed) { + const WAIT_FOR_CONNECTION_TIMEOUT_MS: u64 = 1000; + let timeout_connection = timeout( + Duration::from_millis(WAIT_FOR_CONNECTION_TIMEOUT_MS), + incoming.next(), + ) + .await; + + if let Ok(Some(connection)) = timeout_connection { + if let Ok(new_connection) = connection.await { + let exit = exit.clone(); + let quinn::NewConnection { + connection, + mut uni_streams, + .. + } = new_connection; + + let remote_addr = connection.remote_address(); + let packet_sender = packet_sender.clone(); + tokio::spawn(async move { + debug!("new connection {}", remote_addr); + while let Some(Ok(mut stream)) = uni_streams.next().await { + let mut maybe_batch = None; + while !exit.load(Ordering::Relaxed) { + if handle_chunk( + &stream.read_chunk(PACKET_DATA_SIZE, false).await, + &mut maybe_batch, + &remote_addr, + &packet_sender, + ) { + break; + } + } + } + }); + } + } + } + }); + if let Err(e) = runtime.block_on(handle) { + warn!("error from runtime.block_on: {:?}", e); + } + }); + Ok(handle) +} + +#[cfg(test)] +mod test { + use super::*; + use crossbeam_channel::unbounded; + use quinn::{ClientConfig, NewConnection}; + use std::{net::SocketAddr, time::Instant}; + + struct SkipServerVerification; + + impl SkipServerVerification { + fn new() -> Arc { + Arc::new(Self) + } + } + + impl rustls::client::ServerCertVerifier for SkipServerVerification { + fn verify_server_cert( + &self, + _end_entity: &rustls::Certificate, + _intermediates: &[rustls::Certificate], + _server_name: &rustls::ServerName, + _scts: &mut dyn Iterator, + _ocsp_response: &[u8], + _now: std::time::SystemTime, + ) -> Result { + Ok(rustls::client::ServerCertVerified::assertion()) + } + } + + pub fn get_client_config() -> quinn::ClientConfig { + let crypto = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_custom_certificate_verifier(SkipServerVerification::new()) + .with_no_client_auth(); + ClientConfig::new(Arc::new(crypto)) + } + + #[test] + fn test_quic_server_exit() { + let s = UdpSocket::bind("127.0.0.1:0").unwrap(); + let exit = Arc::new(AtomicBool::new(false)); + let (sender, _receiver) = unbounded(); + let keypair = Keypair::new(); + let ip = "127.0.0.1".parse().unwrap(); + let t = spawn_server(s, &keypair, ip, sender, exit.clone()).unwrap(); + exit.store(true, Ordering::Relaxed); + t.join().unwrap(); + } + + fn make_client_endpoint(runtime: &Runtime, addr: &SocketAddr) -> NewConnection { + let client_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); + let mut endpoint = quinn::Endpoint::new(EndpointConfig::default(), None, client_socket) + .unwrap() + .0; + endpoint.set_default_client_config(get_client_config()); + runtime + .block_on(endpoint.connect(*addr, "localhost").unwrap()) + .unwrap() + } + + #[test] + fn test_quic_server_multiple_streams() { + solana_logger::setup(); + let s = UdpSocket::bind("127.0.0.1:0").unwrap(); + let exit = Arc::new(AtomicBool::new(false)); + let (sender, receiver) = unbounded(); + let keypair = Keypair::new(); + let ip = "127.0.0.1".parse().unwrap(); + let server_address = s.local_addr().unwrap(); + let t = spawn_server(s, &keypair, ip, sender, exit.clone()).unwrap(); + + let runtime = rt(); + let _rt_guard = runtime.enter(); + let conn1 = Arc::new(make_client_endpoint(&runtime, &server_address)); + let conn2 = Arc::new(make_client_endpoint(&runtime, &server_address)); + let mut num_expected_packets = 0; + for i in 0..10 { + info!("sending: {}", i); + let c1 = conn1.clone(); + let c2 = conn2.clone(); + let handle = runtime.spawn(async move { + let mut s1 = c1.connection.open_uni().await.unwrap(); + let mut s2 = c2.connection.open_uni().await.unwrap(); + s1.write_all(&[0u8]).await.unwrap(); + s1.finish().await.unwrap(); + s2.write_all(&[0u8]).await.unwrap(); + s2.finish().await.unwrap(); + }); + runtime.block_on(handle).unwrap(); + num_expected_packets += 2; + thread::sleep(Duration::from_millis(200)); + } + let mut all_packets = vec![]; + let now = Instant::now(); + let mut total_packets = 0; + while now.elapsed().as_secs() < 10 { + if let Ok(packets) = receiver.recv_timeout(Duration::from_secs(1)) { + total_packets += packets.packets.len(); + all_packets.push(packets) + } + if total_packets == num_expected_packets { + break; + } + } + for batch in all_packets { + for p in &batch.packets { + assert_eq!(p.meta.size, 1); + } + } + assert_eq!(total_packets, num_expected_packets); + + exit.store(true, Ordering::Relaxed); + t.join().unwrap(); + } + + #[test] + fn test_quic_server_multiple_writes() { + solana_logger::setup(); + let s = UdpSocket::bind("127.0.0.1:0").unwrap(); + let exit = Arc::new(AtomicBool::new(false)); + let (sender, receiver) = unbounded(); + let keypair = Keypair::new(); + let ip = "127.0.0.1".parse().unwrap(); + let server_address = s.local_addr().unwrap(); + let t = spawn_server(s, &keypair, ip, sender, exit.clone()).unwrap(); + + let runtime = rt(); + let _rt_guard = runtime.enter(); + let conn1 = Arc::new(make_client_endpoint(&runtime, &server_address)); + + // Send a full size packet with single byte writes. + let num_bytes = PACKET_DATA_SIZE; + let num_expected_packets = 1; + let handle = runtime.spawn(async move { + let mut s1 = conn1.connection.open_uni().await.unwrap(); + for _ in 0..num_bytes { + s1.write_all(&[0u8]).await.unwrap(); + } + s1.finish().await.unwrap(); + }); + runtime.block_on(handle).unwrap(); + + let mut all_packets = vec![]; + let now = Instant::now(); + let mut total_packets = 0; + while now.elapsed().as_secs() < 5 { + if let Ok(packets) = receiver.recv_timeout(Duration::from_secs(1)) { + total_packets += packets.packets.len(); + all_packets.push(packets) + } + if total_packets > num_expected_packets { + break; + } + } + for batch in all_packets { + for p in &batch.packets { + assert_eq!(p.meta.size, num_bytes); + } + } + assert_eq!(total_packets, num_expected_packets); + + exit.store(true, Ordering::Relaxed); + t.join().unwrap(); + } +}