diff --git a/Cargo.lock b/Cargo.lock index 026238d21a..5fb1bd5df8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -134,6 +134,45 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eab1c04a571841102f5345a8fc0f6bb3d31c315dec879b5c6e42e40ce7ffa34e" +[[package]] +name = "asn1-rs" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf6690c370453db30743b373a60ba498fc0d6d83b11f4abfd87a84a075db5dd4" +dependencies = [ + "asn1-rs-derive", + "asn1-rs-impl", + "displaydoc", + "nom 7.0.0", + "num-traits", + "rusticata-macros", + "thiserror", + "time 0.3.9", +] + +[[package]] +name = "asn1-rs-derive" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "726535892e8eae7e70657b4c8ea93d26b8553afb1ce617caee529ef96d7dee6c" +dependencies = [ + "proc-macro2 1.0.38", + "quote 1.0.18", + "syn 1.0.93", + "synstructure", +] + +[[package]] +name = "asn1-rs-impl" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2777730b2039ac0f95f093556e61b6d26cebed5393ca6f152717777cec3a42ed" +dependencies = [ + "proc-macro2 1.0.38", + "quote 1.0.18", + "syn 1.0.93", +] + [[package]] name = "assert_cmd" version = "2.0.4" @@ -1080,6 +1119,12 @@ dependencies = [ "rayon", ] +[[package]] +name = "data-encoding" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57" + [[package]] name = "der" version = "0.5.1" @@ -1089,6 +1134,20 @@ dependencies = [ "const-oid", ] +[[package]] +name = "der-parser" +version = "8.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42d4bc9b0db0a0df9ae64634ac5bdefb7afcb534e182275ca0beadbe486701c1" +dependencies = [ + "asn1-rs", + "displaydoc", + "nom 7.0.0", + "num-bigint 0.4.3", + "num-traits", + "rusticata-macros", +] + [[package]] name = "derivation-path" version = "0.2.0" @@ -1184,6 +1243,17 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "displaydoc" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bf95dc3f046b9da4f2d51833c0d3547d8564ef6910f5c1ed130306a75b92886" +dependencies = [ + "proc-macro2 1.0.38", + "quote 1.0.18", + "syn 1.0.93", +] + [[package]] name = "dlopen" version = "0.1.8" @@ -2832,6 +2902,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" +[[package]] +name = "oid-registry" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d4bda43fd1b844cbc6e6e54b5444e2b1bc7838bce59ad205902cccbb26d6761" +dependencies = [ + "asn1-rs", +] + [[package]] name = "once_cell" version = "1.12.0" @@ -3910,6 +3989,15 @@ dependencies = [ "semver 1.0.10", ] +[[package]] +name = "rusticata-macros" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faf0c4a6ece9950b9abdb62b1cfcf2a68b3b67a10ba445b3bb85be2a293d0632" +dependencies = [ + "nom 7.0.0", +] + [[package]] name = "rustix" version = "0.34.3" @@ -4849,11 +4937,13 @@ dependencies = [ "jsonrpc-http-server", "lazy_static", "log", + "pkcs8", "quinn", "quinn-proto", "rand 0.7.3", "rand_chacha 0.2.2", "rayon", + "rcgen", "reqwest", "rustls 0.20.6", "semver 1.0.10", @@ -6203,6 +6293,7 @@ dependencies = [ "solana-sdk 1.11.3", "thiserror", "tokio", + "x509-parser", ] [[package]] @@ -6948,10 +7039,18 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2702e08a7a860f005826c6815dcac101b19b5eb330c27fe4a5928fec1d20ddd" dependencies = [ + "itoa 1.0.1", "libc", "num_threads", + "time-macros", ] +[[package]] +name = "time-macros" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42657b1a6f4d817cda8e7a0ace261fe0cc946cf3a80314390b22cc61ae080792" + [[package]] name = "tiny-bip39" version = "0.8.2" @@ -7858,6 +7957,24 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214" +[[package]] +name = "x509-parser" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0ecbeb7b67ce215e40e3cc7f2ff902f94a223acf44995934763467e7b1febc8" +dependencies = [ + "asn1-rs", + "base64 0.13.0", + "data-encoding", + "der-parser", + "lazy_static", + "nom 7.0.0", + "oid-registry", + "rusticata-macros", + "thiserror", + "time 0.3.9", +] + [[package]] name = "xattr" version = "0.2.2" diff --git a/client/Cargo.toml b/client/Cargo.toml index 3d6bcc8d2a..6a4a8f6da1 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -27,11 +27,13 @@ itertools = "0.10.2" jsonrpc-core = "18.0.0" lazy_static = "1.4.0" log = "0.4.17" +pkcs8 = { version = "0.8.0", features = ["alloc"] } quinn = "0.8.3" quinn-proto = "0.8.3" rand = "0.7.0" rand_chacha = "0.2.2" rayon = "1.5.3" +rcgen = "0.9.2" reqwest = { version = "0.11.11", default-features = false, features = ["blocking", "brotli", "deflate", "gzip", "rustls-tls", "json"] } rustls = { version = "0.20.6", features = ["dangerous_configuration"] } semver = "1.0.10" diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index 34a56cd24a..8dad17fc9a 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -1,16 +1,19 @@ use { crate::{ nonblocking::{ - quic_client::{QuicClient, QuicLazyInitializedEndpoint}, + quic_client::{QuicClient, QuicClientCertificate, QuicLazyInitializedEndpoint}, tpu_connection::NonblockingConnection, }, tpu_connection::{BlockingConnection, ClientStats}, }, indexmap::map::{Entry, IndexMap}, + pkcs8::{der::Document, AlgorithmIdentifier, ObjectIdentifier}, rand::{thread_rng, Rng}, + rcgen::{CertificateParams, DistinguishedName, DnType, SanType}, solana_measure::measure::Measure, - solana_sdk::{quic::QUIC_PORT_OFFSET, timing::AtomicInterval}, + solana_sdk::{quic::QUIC_PORT_OFFSET, signature::Keypair, timing::AtomicInterval}, std::{ + error::Error, net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, sync::{ atomic::{AtomicU64, Ordering}, @@ -224,6 +227,7 @@ pub struct ConnectionCache { last_stats: AtomicInterval, connection_pool_size: usize, tpu_udp_socket: Option>, + client_certificate: Arc, } /// Models the pool of connections @@ -251,6 +255,68 @@ impl ConnectionPool { } } +pub(crate) fn new_cert( + identity_keypair: &Keypair, + san: IpAddr, +) -> Result> { + // Generate a self-signed cert from client's identity key + let cert_params = new_cert_params(identity_keypair, san); + let cert = rcgen::Certificate::from_params(cert_params)?; + let cert_der = cert.serialize_der().unwrap(); + let priv_key = cert.serialize_private_key_der(); + let priv_key = rustls::PrivateKey(priv_key); + Ok(QuicClientCertificate { + certificates: vec![rustls::Certificate(cert_der)], + key: priv_key, + }) +} + +fn convert_to_rcgen_keypair(identity_keypair: &Keypair) -> rcgen::KeyPair { + // from https://datatracker.ietf.org/doc/html/rfc8410#section-3 + const ED25519_IDENTIFIER: [u32; 4] = [1, 3, 101, 112]; + let mut private_key = Vec::::with_capacity(34); + private_key.extend_from_slice(&[0x04, 0x20]); // ASN.1 OCTET STRING + private_key.extend_from_slice(identity_keypair.secret().as_bytes()); + let key_pkcs8 = pkcs8::PrivateKeyInfo { + algorithm: AlgorithmIdentifier { + oid: ObjectIdentifier::from_arcs(&ED25519_IDENTIFIER).unwrap(), + parameters: None, + }, + private_key: &private_key, + public_key: None, + }; + let key_pkcs8_der = key_pkcs8 + .to_der() + .expect("Failed to convert keypair to DER") + .to_der(); + + // Parse private key into rcgen::KeyPair struct. + rcgen::KeyPair::from_der(&key_pkcs8_der).expect("Failed to parse keypair from DER") +} + +fn new_cert_params(identity_keypair: &Keypair, san: IpAddr) -> CertificateParams { + // TODO(terorie): Is it safe to sign the TLS cert with the identity private key? + + // Unfortunately, rcgen does not accept a "raw" Ed25519 key. + // We have to convert it to DER and pass it to the library. + + // Convert private key into PKCS#8 v1 object. + // RFC 8410, Section 7: Private Key Format + // https://datatracker.ietf.org/doc/html/rfc8410#section- + + let keypair = convert_to_rcgen_keypair(identity_keypair); + + let mut cert_params = CertificateParams::default(); + cert_params.subject_alt_names = vec![SanType::IpAddress(san)]; + cert_params.alg = &rcgen::PKCS_ED25519; + cert_params.key_pair = Some(keypair); + cert_params.distinguished_name = DistinguishedName::new(); + cert_params + .distinguished_name + .push(DnType::CommonName, "Solana node"); + cert_params +} + impl ConnectionCache { pub fn new(connection_pool_size: usize) -> Self { // The minimum pool size is 1. @@ -262,6 +328,16 @@ impl ConnectionCache { } } + pub fn update_client_certificate( + &mut self, + keypair: &Keypair, + ipaddr: IpAddr, + ) -> Result<(), Box> { + let cert = new_cert(keypair, ipaddr)?; + self.client_certificate = Arc::new(cert); + Ok(()) + } + pub fn with_udp(connection_pool_size: usize) -> Self { // The minimum pool size is 1. let connection_pool_size = 1.max(connection_pool_size); @@ -277,7 +353,9 @@ impl ConnectionCache { fn create_endpoint(&self) -> Option> { if self.use_quic() { - Some(Arc::new(QuicLazyInitializedEndpoint::new())) + Some(Arc::new(QuicLazyInitializedEndpoint::new( + self.client_certificate.clone(), + ))) } else { None } @@ -499,6 +577,9 @@ impl Default for ConnectionCache { .expect("Unable to bind to UDP socket"), ) }), + client_certificate: new_cert(&Keypair::new(), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))) + .map(Arc::new) + .expect("Failed to initialize QUIC client certificates"), } } } diff --git a/client/src/nonblocking/quic_client.rs b/client/src/nonblocking/quic_client.rs index 0456f873c3..c9a8caeafb 100644 --- a/client/src/nonblocking/quic_client.rs +++ b/client/src/nonblocking/quic_client.rs @@ -3,8 +3,10 @@ //! server's flow control. use { crate::{ - client_error::ClientErrorKind, connection_cache::ConnectionCacheStats, - nonblocking::tpu_connection::TpuConnection, tpu_connection::ClientStats, + client_error::ClientErrorKind, + connection_cache::{new_cert, ConnectionCacheStats}, + nonblocking::tpu_connection::TpuConnection, + tpu_connection::ClientStats, }, async_mutex::Mutex, async_trait::async_trait, @@ -22,6 +24,7 @@ use { QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, }, + signature::Keypair, transport::Result as TransportResult, }, std::{ @@ -55,19 +58,26 @@ impl rustls::client::ServerCertVerifier for SkipServerVerification { } } +pub struct QuicClientCertificate { + pub certificates: Vec, + pub key: rustls::PrivateKey, +} + /// A lazy-initialized Quic Endpoint pub struct QuicLazyInitializedEndpoint { endpoint: RwLock>>, + client_certificate: Arc, } impl QuicLazyInitializedEndpoint { - pub fn new() -> Self { + pub fn new(client_certificate: Arc) -> Self { Self { endpoint: RwLock::new(None), + client_certificate, } } - fn create_endpoint() -> Endpoint { + fn create_endpoint(&self) -> Endpoint { let (_, client_socket) = solana_net_utils::bind_in_range( IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), VALIDATOR_PORT_RANGE, @@ -77,7 +87,11 @@ impl QuicLazyInitializedEndpoint { let mut crypto = rustls::ClientConfig::builder() .with_safe_defaults() .with_custom_certificate_verifier(SkipServerVerification::new()) - .with_no_client_auth(); + .with_single_cert( + self.client_certificate.certificates.clone(), + self.client_certificate.key.clone(), + ) + .expect("Failed to set QUIC client certificates"); crypto.enable_early_data = true; let mut endpoint = @@ -108,7 +122,7 @@ impl QuicLazyInitializedEndpoint { match endpoint { Some(endpoint) => endpoint.clone(), None => { - let connection = Arc::new(Self::create_endpoint()); + let connection = Arc::new(self.create_endpoint()); *lock = Some(connection.clone()); connection } @@ -120,7 +134,9 @@ impl QuicLazyInitializedEndpoint { impl Default for QuicLazyInitializedEndpoint { fn default() -> Self { - Self::new() + let certificate = new_cert(&Keypair::new(), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))) + .expect("Failed to create QUIC client certificate"); + Self::new(Arc::new(certificate)) } } diff --git a/core/src/find_packet_sender_stake_stage.rs b/core/src/find_packet_sender_stake_stage.rs index ec02d080c9..d62d6afe7c 100644 --- a/core/src/find_packet_sender_stake_stage.rs +++ b/core/src/find_packet_sender_stake_stage.rs @@ -105,7 +105,7 @@ impl FindPacketSenderStakeStage { Measure::start("apply_sender_stakes_time"); let mut apply_stake = || { let ip_to_stake = staked_nodes.read().unwrap(); - Self::apply_sender_stakes(&mut batches, &ip_to_stake.stake_map); + Self::apply_sender_stakes(&mut batches, &ip_to_stake.ip_stake_map); }; apply_stake(); apply_sender_stakes_time.stop(); diff --git a/core/src/staked_nodes_updater_service.rs b/core/src/staked_nodes_updater_service.rs index 23a3587b0d..a5bd351859 100644 --- a/core/src/staked_nodes_updater_service.rs +++ b/core/src/staked_nodes_updater_service.rs @@ -1,6 +1,7 @@ use { solana_gossip::cluster_info::ClusterInfo, solana_runtime::bank_forks::BankForks, + solana_sdk::pubkey::Pubkey, solana_streamer::streamer::StakedNodes, std::{ collections::HashMap, @@ -33,17 +34,20 @@ impl StakedNodesUpdaterService { let mut last_stakes = Instant::now(); while !exit.load(Ordering::Relaxed) { let mut new_ip_to_stake = HashMap::new(); + let mut new_id_to_stake = HashMap::new(); let mut total_stake = 0; - if Self::try_refresh_ip_to_stake( + if Self::try_refresh_stake_maps( &mut last_stakes, &mut new_ip_to_stake, + &mut new_id_to_stake, &mut total_stake, &bank_forks, &cluster_info, ) { let mut shared = shared_staked_nodes.write().unwrap(); shared.total_stake = total_stake; - shared.stake_map = new_ip_to_stake; + shared.ip_stake_map = new_ip_to_stake; + shared.pubkey_stake_map = new_id_to_stake; } } }) @@ -52,9 +56,10 @@ impl StakedNodesUpdaterService { Self { thread_hdl } } - fn try_refresh_ip_to_stake( + fn try_refresh_stake_maps( last_stakes: &mut Instant, ip_to_stake: &mut HashMap, + id_to_stake: &mut HashMap, total_stake: &mut u64, bank_forks: &RwLock, cluster_info: &ClusterInfo, @@ -66,6 +71,14 @@ impl StakedNodesUpdaterService { .iter() .map(|(_pubkey, stake)| stake) .sum::(); + *id_to_stake = cluster_info + .tvu_peers() + .into_iter() + .filter_map(|node| { + let stake = staked_nodes.get(&node.id)?; + Some((node.id, *stake)) + }) + .collect(); *ip_to_stake = cluster_info .tvu_peers() .into_iter() diff --git a/core/src/validator.rs b/core/src/validator.rs index fc4228cc11..d9cd778efe 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -758,7 +758,13 @@ impl Validator { let poh_recorder = Arc::new(RwLock::new(poh_recorder)); let connection_cache = match use_quic { - true => Arc::new(ConnectionCache::new(tpu_connection_pool_size)), + true => { + let mut connection_cache = ConnectionCache::new(tpu_connection_pool_size); + connection_cache + .update_client_certificate(&identity_keypair, node.info.gossip.ip()) + .expect("Failed to update QUIC client certificates"); + Arc::new(connection_cache) + } false => Arc::new(ConnectionCache::with_udp(tpu_connection_pool_size)), }; diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index 80e426fdc2..fd59110bad 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -134,6 +134,45 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eab1c04a571841102f5345a8fc0f6bb3d31c315dec879b5c6e42e40ce7ffa34e" +[[package]] +name = "asn1-rs" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf6690c370453db30743b373a60ba498fc0d6d83b11f4abfd87a84a075db5dd4" +dependencies = [ + "asn1-rs-derive", + "asn1-rs-impl", + "displaydoc", + "nom", + "num-traits", + "rusticata-macros", + "thiserror", + "time 0.3.9", +] + +[[package]] +name = "asn1-rs-derive" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "726535892e8eae7e70657b4c8ea93d26b8553afb1ce617caee529ef96d7dee6c" +dependencies = [ + "proc-macro2 1.0.38", + "quote 1.0.18", + "syn 1.0.93", + "synstructure", +] + +[[package]] +name = "asn1-rs-impl" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2777730b2039ac0f95f093556e61b6d26cebed5393ca6f152717777cec3a42ed" +dependencies = [ + "proc-macro2 1.0.38", + "quote 1.0.18", + "syn 1.0.93", +] + [[package]] name = "assert_matches" version = "1.5.0" @@ -902,6 +941,12 @@ dependencies = [ "rayon", ] +[[package]] +name = "data-encoding" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57" + [[package]] name = "der" version = "0.5.1" @@ -911,6 +956,20 @@ dependencies = [ "const-oid", ] +[[package]] +name = "der-parser" +version = "8.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42d4bc9b0db0a0df9ae64634ac5bdefb7afcb534e182275ca0beadbe486701c1" +dependencies = [ + "asn1-rs", + "displaydoc", + "nom", + "num-bigint 0.4.3", + "num-traits", + "rusticata-macros", +] + [[package]] name = "derivation-path" version = "0.2.0" @@ -1000,6 +1059,17 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "displaydoc" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bf95dc3f046b9da4f2d51833c0d3547d8564ef6910f5c1ed130306a75b92886" +dependencies = [ + "proc-macro2 1.0.38", + "quote 1.0.18", + "syn 1.0.93", +] + [[package]] name = "dlopen" version = "0.1.8" @@ -2560,6 +2630,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" +[[package]] +name = "oid-registry" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d4bda43fd1b844cbc6e6e54b5444e2b1bc7838bce59ad205902cccbb26d6761" +dependencies = [ + "asn1-rs", +] + [[package]] name = "once_cell" version = "1.12.0" @@ -3485,6 +3564,15 @@ dependencies = [ "semver", ] +[[package]] +name = "rusticata-macros" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faf0c4a6ece9950b9abdb62b1cfcf2a68b3b67a10ba445b3bb85be2a293d0632" +dependencies = [ + "nom", +] + [[package]] name = "rustix" version = "0.34.4" @@ -4505,11 +4593,13 @@ dependencies = [ "jsonrpc-core", "lazy_static", "log", + "pkcs8", "quinn", "quinn-proto", "rand 0.7.3", "rand_chacha 0.2.2", "rayon", + "rcgen", "reqwest", "rustls 0.20.6", "semver", @@ -5489,6 +5579,7 @@ dependencies = [ "solana-sdk 1.11.3", "thiserror", "tokio", + "x509-parser", ] [[package]] @@ -6096,10 +6187,18 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2702e08a7a860f005826c6815dcac101b19b5eb330c27fe4a5928fec1d20ddd" dependencies = [ + "itoa", "libc", "num_threads", + "time-macros", ] +[[package]] +name = "time-macros" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42657b1a6f4d817cda8e7a0ace261fe0cc946cf3a80314390b22cc61ae080792" + [[package]] name = "tiny-bip39" version = "0.8.2" @@ -6995,6 +7094,24 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "x509-parser" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0ecbeb7b67ce215e40e3cc7f2ff902f94a223acf44995934763467e7b1febc8" +dependencies = [ + "asn1-rs", + "base64 0.13.0", + "data-encoding", + "der-parser", + "lazy_static", + "nom", + "oid-registry", + "rusticata-macros", + "thiserror", + "time 0.3.9", +] + [[package]] name = "xattr" version = "0.2.2" diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index d1f3e78db9..c1e4961438 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -30,6 +30,7 @@ solana-perf = { path = "../perf", version = "=1.11.3" } solana-sdk = { path = "../sdk", version = "=1.11.3" } thiserror = "1.0" tokio = { version = "~1.14.1", features = ["full"] } +x509-parser = "0.14.0" [dev-dependencies] solana-logger = { path = "../logger", version = "=1.11.3" } diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index d495070cdb..679eb0d7c8 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -15,6 +15,7 @@ use { solana_perf::packet::PacketBatch, solana_sdk::{ packet::{Packet, PACKET_DATA_SIZE}, + pubkey::Pubkey, quic::{QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS}, signature::Keypair, timing, @@ -31,6 +32,7 @@ use { task::JoinHandle, time::{sleep, timeout}, }, + x509_parser::{prelude::*, public_key::PublicKey}, }; const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: f64 = 100_000f64; @@ -132,6 +134,31 @@ fn prune_unstaked_connection_table( } } +fn get_connection_stake(connection: &Connection, staked_nodes: Arc>) -> u64 { + connection + .peer_identity() + .and_then(|der_cert_any| der_cert_any.downcast::>().ok()) + .and_then(|der_certs| { + der_certs.first().and_then(|der_cert| { + X509Certificate::from_der(der_cert.as_ref()) + .ok() + .and_then(|(_, cert)| { + cert.public_key().parsed().ok().and_then(|key| match key { + PublicKey::Unknown(inner_key) => { + let pubkey = Pubkey::new(inner_key); + debug!("Peer public key is {:?}", pubkey); + + let staked_nodes = staked_nodes.read().unwrap(); + staked_nodes.pubkey_stake_map.get(&pubkey).copied() + } + _ => None, + }) + }) + }) + }) + .unwrap_or(0) +} + async fn setup_connection( connecting: Connecting, unstaked_connection_table: Arc>, @@ -161,11 +188,8 @@ async fn setup_connection( let remote_addr = connection.remote_address(); let table_and_stake = { - let staked_nodes = staked_nodes.read().unwrap(); - if let Some(stake) = staked_nodes.stake_map.get(&remote_addr.ip()) { - let stake = *stake; - drop(staked_nodes); - + let stake = get_connection_stake(&connection, staked_nodes.clone()); + if stake > 0 { let mut connection_table_l = staked_connection_table.lock().unwrap(); if connection_table_l.total_size >= max_staked_connections { let num_pruned = connection_table_l.prune_random(stake); @@ -195,7 +219,6 @@ async fn setup_connection( Some((connection_table_l, stake)) } } else if max_unstaked_connections > 0 { - drop(staked_nodes); let mut connection_table_l = unstaked_connection_table.lock().unwrap(); prune_unstaked_connection_table( &mut connection_table_l, @@ -237,8 +260,18 @@ async fn setup_connection( drop(connection_table_l); let stats = stats.clone(); let connection_table = match table_type { - ConnectionPeerType::Unstaked => unstaked_connection_table.clone(), - ConnectionPeerType::Staked => staked_connection_table.clone(), + ConnectionPeerType::Unstaked => { + stats + .connection_added_from_unstaked_peer + .fetch_add(1, Ordering::Relaxed); + unstaked_connection_table.clone() + } + ConnectionPeerType::Staked => { + stats + .connection_added_from_staked_peer + .fetch_add(1, Ordering::Relaxed); + staked_connection_table.clone() + } }; tokio::spawn(handle_connection( uni_streams, @@ -624,12 +657,13 @@ impl ConnectionTable { pub mod test { use { super::*, - crate::quic::{MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS}, + crate::quic::{new_cert, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS}, crossbeam_channel::{unbounded, Receiver}, quinn::{ClientConfig, IdleTimeout, VarInt}, solana_sdk::{ quic::{QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS}, signature::Keypair, + signer::Signer, }, std::net::Ipv4Addr, tokio::time::sleep, @@ -657,11 +691,19 @@ pub mod test { } } - pub fn get_client_config() -> ClientConfig { - let crypto = rustls::ClientConfig::builder() + pub fn get_client_config(keypair: &Keypair) -> ClientConfig { + let ipaddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); + let (certs, key) = + new_cert(keypair, ipaddr).expect("Failed to generate client certificate"); + + let mut crypto = rustls::ClientConfig::builder() .with_safe_defaults() .with_custom_certificate_verifier(SkipServerVerification::new()) - .with_no_client_auth(); + .with_single_cert(certs, key) + .expect("Failed to use client certificate"); + + crypto.enable_early_data = true; + let mut config = ClientConfig::new(Arc::new(crypto)); let transport_config = Arc::get_mut(&mut config.transport).unwrap(); @@ -705,17 +747,27 @@ pub mod test { (t, exit, receiver, server_address, stats) } - pub async fn make_client_endpoint(addr: &SocketAddr) -> NewConnection { + pub async fn make_client_endpoint( + addr: &SocketAddr, + client_keypair: Option<&Keypair>, + ) -> NewConnection { let client_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); let mut endpoint = quinn::Endpoint::new(EndpointConfig::default(), None, client_socket) .unwrap() .0; - endpoint.set_default_client_config(get_client_config()); - endpoint.connect(*addr, "localhost").unwrap().await.unwrap() + let default_keypair = Keypair::new(); + endpoint.set_default_client_config(get_client_config( + client_keypair.unwrap_or(&default_keypair), + )); + endpoint + .connect(*addr, "localhost") + .expect("Failed in connecting") + .await + .expect("Failed in waiting") } pub async fn check_timeout(receiver: Receiver, server_address: SocketAddr) { - let conn1 = make_client_endpoint(&server_address).await; + let conn1 = make_client_endpoint(&server_address, None).await; let total = 30; for i in 0..total { let mut s1 = conn1.connection.open_uni().await.unwrap(); @@ -737,8 +789,8 @@ pub mod test { } pub async fn check_block_multiple_connections(server_address: SocketAddr) { - let conn1 = make_client_endpoint(&server_address).await; - let conn2 = make_client_endpoint(&server_address).await; + let conn1 = make_client_endpoint(&server_address, None).await; + let conn2 = make_client_endpoint(&server_address, None).await; let mut s1 = conn1.connection.open_uni().await.unwrap(); let mut s2 = conn2.connection.open_uni().await.unwrap(); s1.write_all(&[0u8]).await.unwrap(); @@ -759,8 +811,8 @@ pub mod test { receiver: Receiver, server_address: SocketAddr, ) { - let conn1 = Arc::new(make_client_endpoint(&server_address).await); - let conn2 = Arc::new(make_client_endpoint(&server_address).await); + let conn1 = Arc::new(make_client_endpoint(&server_address, None).await); + let conn2 = Arc::new(make_client_endpoint(&server_address, None).await); let mut num_expected_packets = 0; for i in 0..10 { info!("sending: {}", i); @@ -798,8 +850,9 @@ pub mod test { pub async fn check_multiple_writes( receiver: Receiver, server_address: SocketAddr, + client_keypair: Option<&Keypair>, ) { - let conn1 = Arc::new(make_client_endpoint(&server_address).await); + let conn1 = Arc::new(make_client_endpoint(&server_address, client_keypair).await); // Send a full size packet with single byte writes. let num_bytes = PACKET_DATA_SIZE; @@ -831,7 +884,7 @@ pub mod test { } pub async fn check_unstaked_node_connect_failure(server_address: SocketAddr) { - let conn1 = Arc::new(make_client_endpoint(&server_address).await); + let conn1 = Arc::new(make_client_endpoint(&server_address, None).await); // Send a full size packet with single byte writes. if let Ok(mut s1) = conn1.connection.open_uni().await { @@ -864,7 +917,7 @@ pub mod test { solana_logger::setup(); let (t, exit, _receiver, server_address, stats) = setup_quic_server(None); - let conn1 = make_client_endpoint(&server_address).await; + let conn1 = make_client_endpoint(&server_address, None).await; assert_eq!(stats.total_streams.load(Ordering::Relaxed), 0); assert_eq!(stats.total_stream_read_timeouts.load(Ordering::Relaxed), 0); @@ -904,7 +957,7 @@ pub mod test { async fn test_quic_server_multiple_writes() { solana_logger::setup(); let (t, exit, receiver, server_address, _stats) = setup_quic_server(None); - check_multiple_writes(receiver, server_address).await; + check_multiple_writes(receiver, server_address, None).await; exit.store(true, Ordering::Relaxed); t.await.unwrap(); } @@ -913,16 +966,51 @@ pub mod test { async fn test_quic_server_staked_connection_removal() { solana_logger::setup(); + let client_keypair = Keypair::new(); let mut staked_nodes = StakedNodes::default(); staked_nodes - .stake_map - .insert(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 100000); + .pubkey_stake_map + .insert(client_keypair.pubkey(), 100000); staked_nodes.total_stake = 100000; let (t, exit, receiver, server_address, stats) = setup_quic_server(Some(staked_nodes)); - check_multiple_writes(receiver, server_address).await; + check_multiple_writes(receiver, server_address, Some(&client_keypair)).await; exit.store(true, Ordering::Relaxed); t.await.unwrap(); + sleep(Duration::from_millis(100)).await; + assert_eq!( + stats + .connection_added_from_unstaked_peer + .load(Ordering::Relaxed), + 0 + ); + assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 1); + assert_eq!(stats.connection_remove_failed.load(Ordering::Relaxed), 0); + } + + #[tokio::test] + async fn test_quic_server_zero_staked_connection_removal() { + // In this test, the client has a pubkey, but is not in stake table. + solana_logger::setup(); + + let client_keypair = Keypair::new(); + let mut staked_nodes = StakedNodes::default(); + staked_nodes + .pubkey_stake_map + .insert(client_keypair.pubkey(), 0); + staked_nodes.total_stake = 0; + + let (t, exit, receiver, server_address, stats) = setup_quic_server(Some(staked_nodes)); + check_multiple_writes(receiver, server_address, Some(&client_keypair)).await; + exit.store(true, Ordering::Relaxed); + t.await.unwrap(); + sleep(Duration::from_millis(100)).await; + assert_eq!( + stats + .connection_added_from_staked_peer + .load(Ordering::Relaxed), + 0 + ); assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 1); assert_eq!(stats.connection_remove_failed.load(Ordering::Relaxed), 0); } @@ -931,9 +1019,16 @@ pub mod test { async fn test_quic_server_unstaked_connection_removal() { solana_logger::setup(); let (t, exit, receiver, server_address, stats) = setup_quic_server(None); - check_multiple_writes(receiver, server_address).await; + check_multiple_writes(receiver, server_address, None).await; exit.store(true, Ordering::Relaxed); t.await.unwrap(); + sleep(Duration::from_millis(100)).await; + assert_eq!( + stats + .connection_added_from_staked_peer + .load(Ordering::Relaxed), + 0 + ); assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 1); assert_eq!(stats.connection_remove_failed.load(Ordering::Relaxed), 0); } diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 1e20e10aa0..c5fef2a5e9 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -5,6 +5,7 @@ use { pkcs8::{der::Document, AlgorithmIdentifier, ObjectIdentifier}, quinn::{IdleTimeout, ServerConfig, VarInt}, rcgen::{CertificateParams, DistinguishedName, DnType, SanType}, + rustls::{server::ClientCertVerified, Certificate, DistinguishedNames}, solana_perf::packet::PacketBatch, solana_sdk::{ packet::PACKET_DATA_SIZE, @@ -19,6 +20,7 @@ use { Arc, RwLock, }, thread, + time::SystemTime, }, tokio::runtime::{Builder, Runtime}, }; @@ -27,6 +29,29 @@ pub const MAX_STAKED_CONNECTIONS: usize = 2000; pub const MAX_UNSTAKED_CONNECTIONS: usize = 500; const NUM_QUIC_STREAMER_WORKER_THREADS: usize = 4; +struct SkipClientVerification; + +impl SkipClientVerification { + pub fn new() -> Arc { + Arc::new(Self) + } +} + +impl rustls::server::ClientCertVerifier for SkipClientVerification { + fn client_auth_root_subjects(&self) -> Option { + Some(DistinguishedNames::new()) + } + + fn verify_client_cert( + &self, + _end_entity: &Certificate, + _intermediates: &[Certificate], + _now: SystemTime, + ) -> Result { + Ok(rustls::server::ClientCertVerified::assertion()) + } +} + /// Returns default server configuration along with its PEM certificate chain. #[allow(clippy::field_reassign_with_default)] // https://github.com/rust-lang/rust-clippy/issues/6527 pub(crate) fn configure_server( @@ -44,8 +69,13 @@ pub(crate) fn configure_server( .collect(); let cert_chain_pem = pem::encode_many(&cert_chain_pem_parts); - let mut server_config = ServerConfig::with_single_cert(cert_chain, priv_key) + let server_tls_config = rustls::ServerConfig::builder() + .with_safe_defaults() + .with_client_cert_verifier(SkipClientVerification::new()) + .with_single_cert(cert_chain, priv_key) .map_err(|_e| QuicServerError::ConfigureFailed)?; + + let mut server_config = ServerConfig::with_crypto(Arc::new(server_tls_config)); let config = Arc::get_mut(&mut server_config.transport).unwrap(); // QUIC_MAX_CONCURRENT_STREAMS doubled, which was found to improve reliability @@ -64,7 +94,7 @@ pub(crate) fn configure_server( Ok((server_config, cert_chain_pem)) } -fn new_cert( +pub(crate) fn new_cert( identity_keypair: &Keypair, san: IpAddr, ) -> Result<(Vec, rustls::PrivateKey), Box> { @@ -157,6 +187,8 @@ pub struct StreamStats { pub(crate) total_stream_read_errors: AtomicUsize, pub(crate) total_stream_read_timeouts: AtomicUsize, pub(crate) num_evictions: AtomicUsize, + pub(crate) connection_added_from_staked_peer: AtomicUsize, + pub(crate) connection_added_from_unstaked_peer: AtomicUsize, pub(crate) connection_add_failed: AtomicUsize, pub(crate) connection_add_failed_invalid_stream_count: AtomicUsize, pub(crate) connection_add_failed_unstaked_node: AtomicUsize, @@ -196,6 +228,18 @@ impl StreamStats { self.num_evictions.swap(0, Ordering::Relaxed), i64 ), + ( + "connection_added_from_staked_peer", + self.connection_added_from_staked_peer + .swap(0, Ordering::Relaxed), + i64 + ), + ( + "connection_added_from_unstaked_peer", + self.connection_added_from_unstaked_peer + .swap(0, Ordering::Relaxed), + i64 + ), ( "connection_add_failed", self.connection_add_failed.swap(0, Ordering::Relaxed), @@ -427,7 +471,7 @@ mod test { let (t, exit, receiver, server_address) = setup_quic_server(); let runtime = rt(); - runtime.block_on(check_multiple_writes(receiver, server_address)); + runtime.block_on(check_multiple_writes(receiver, server_address, None)); exit.store(true, Ordering::Relaxed); t.join().unwrap(); } diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 4dad613535..4d8ee2d1c0 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -9,7 +9,7 @@ use { }, crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender}, histogram::Histogram, - solana_sdk::{packet::Packet, timing::timestamp}, + solana_sdk::{packet::Packet, pubkey::Pubkey, timing::timestamp}, std::{ cmp::Reverse, collections::HashMap, @@ -28,7 +28,8 @@ use { #[derive(Default)] pub struct StakedNodes { pub total_stake: u64, - pub stake_map: HashMap, + pub ip_stake_map: HashMap, + pub pubkey_stake_map: HashMap, } pub type PacketBatchReceiver = Receiver;