Use client certs in QUIC to get peer's stake (#26477)

* Use client certs in QUIC to get peer's stake

* fixes to cert processing

* integrate the code

* clippy

* more cleanup

* sort cargo deps

* test fixes

* info -> debug
This commit is contained in:
Pankaj Garg 2022-07-11 11:06:40 -07:00 committed by GitHub
parent a3b094300b
commit ea7448c568
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 541 additions and 48 deletions

117
Cargo.lock generated
View File

@ -134,6 +134,45 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eab1c04a571841102f5345a8fc0f6bb3d31c315dec879b5c6e42e40ce7ffa34e"
[[package]]
name = "asn1-rs"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf6690c370453db30743b373a60ba498fc0d6d83b11f4abfd87a84a075db5dd4"
dependencies = [
"asn1-rs-derive",
"asn1-rs-impl",
"displaydoc",
"nom 7.0.0",
"num-traits",
"rusticata-macros",
"thiserror",
"time 0.3.9",
]
[[package]]
name = "asn1-rs-derive"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "726535892e8eae7e70657b4c8ea93d26b8553afb1ce617caee529ef96d7dee6c"
dependencies = [
"proc-macro2 1.0.38",
"quote 1.0.18",
"syn 1.0.93",
"synstructure",
]
[[package]]
name = "asn1-rs-impl"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2777730b2039ac0f95f093556e61b6d26cebed5393ca6f152717777cec3a42ed"
dependencies = [
"proc-macro2 1.0.38",
"quote 1.0.18",
"syn 1.0.93",
]
[[package]]
name = "assert_cmd"
version = "2.0.4"
@ -1080,6 +1119,12 @@ dependencies = [
"rayon",
]
[[package]]
name = "data-encoding"
version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57"
[[package]]
name = "der"
version = "0.5.1"
@ -1089,6 +1134,20 @@ dependencies = [
"const-oid",
]
[[package]]
name = "der-parser"
version = "8.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42d4bc9b0db0a0df9ae64634ac5bdefb7afcb534e182275ca0beadbe486701c1"
dependencies = [
"asn1-rs",
"displaydoc",
"nom 7.0.0",
"num-bigint 0.4.3",
"num-traits",
"rusticata-macros",
]
[[package]]
name = "derivation-path"
version = "0.2.0"
@ -1184,6 +1243,17 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "displaydoc"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3bf95dc3f046b9da4f2d51833c0d3547d8564ef6910f5c1ed130306a75b92886"
dependencies = [
"proc-macro2 1.0.38",
"quote 1.0.18",
"syn 1.0.93",
]
[[package]]
name = "dlopen"
version = "0.1.8"
@ -2832,6 +2902,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
[[package]]
name = "oid-registry"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d4bda43fd1b844cbc6e6e54b5444e2b1bc7838bce59ad205902cccbb26d6761"
dependencies = [
"asn1-rs",
]
[[package]]
name = "once_cell"
version = "1.12.0"
@ -3910,6 +3989,15 @@ dependencies = [
"semver 1.0.10",
]
[[package]]
name = "rusticata-macros"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "faf0c4a6ece9950b9abdb62b1cfcf2a68b3b67a10ba445b3bb85be2a293d0632"
dependencies = [
"nom 7.0.0",
]
[[package]]
name = "rustix"
version = "0.34.3"
@ -4849,11 +4937,13 @@ dependencies = [
"jsonrpc-http-server",
"lazy_static",
"log",
"pkcs8",
"quinn",
"quinn-proto",
"rand 0.7.3",
"rand_chacha 0.2.2",
"rayon",
"rcgen",
"reqwest",
"rustls 0.20.6",
"semver 1.0.10",
@ -6203,6 +6293,7 @@ dependencies = [
"solana-sdk 1.11.3",
"thiserror",
"tokio",
"x509-parser",
]
[[package]]
@ -6948,10 +7039,18 @@ version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2702e08a7a860f005826c6815dcac101b19b5eb330c27fe4a5928fec1d20ddd"
dependencies = [
"itoa 1.0.1",
"libc",
"num_threads",
"time-macros",
]
[[package]]
name = "time-macros"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42657b1a6f4d817cda8e7a0ace261fe0cc946cf3a80314390b22cc61ae080792"
[[package]]
name = "tiny-bip39"
version = "0.8.2"
@ -7858,6 +7957,24 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214"
[[package]]
name = "x509-parser"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0ecbeb7b67ce215e40e3cc7f2ff902f94a223acf44995934763467e7b1febc8"
dependencies = [
"asn1-rs",
"base64 0.13.0",
"data-encoding",
"der-parser",
"lazy_static",
"nom 7.0.0",
"oid-registry",
"rusticata-macros",
"thiserror",
"time 0.3.9",
]
[[package]]
name = "xattr"
version = "0.2.2"

View File

@ -27,11 +27,13 @@ itertools = "0.10.2"
jsonrpc-core = "18.0.0"
lazy_static = "1.4.0"
log = "0.4.17"
pkcs8 = { version = "0.8.0", features = ["alloc"] }
quinn = "0.8.3"
quinn-proto = "0.8.3"
rand = "0.7.0"
rand_chacha = "0.2.2"
rayon = "1.5.3"
rcgen = "0.9.2"
reqwest = { version = "0.11.11", default-features = false, features = ["blocking", "brotli", "deflate", "gzip", "rustls-tls", "json"] }
rustls = { version = "0.20.6", features = ["dangerous_configuration"] }
semver = "1.0.10"

View File

@ -1,16 +1,19 @@
use {
crate::{
nonblocking::{
quic_client::{QuicClient, QuicLazyInitializedEndpoint},
quic_client::{QuicClient, QuicClientCertificate, QuicLazyInitializedEndpoint},
tpu_connection::NonblockingConnection,
},
tpu_connection::{BlockingConnection, ClientStats},
},
indexmap::map::{Entry, IndexMap},
pkcs8::{der::Document, AlgorithmIdentifier, ObjectIdentifier},
rand::{thread_rng, Rng},
rcgen::{CertificateParams, DistinguishedName, DnType, SanType},
solana_measure::measure::Measure,
solana_sdk::{quic::QUIC_PORT_OFFSET, timing::AtomicInterval},
solana_sdk::{quic::QUIC_PORT_OFFSET, signature::Keypair, timing::AtomicInterval},
std::{
error::Error,
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{
atomic::{AtomicU64, Ordering},
@ -224,6 +227,7 @@ pub struct ConnectionCache {
last_stats: AtomicInterval,
connection_pool_size: usize,
tpu_udp_socket: Option<Arc<UdpSocket>>,
client_certificate: Arc<QuicClientCertificate>,
}
/// Models the pool of connections
@ -251,6 +255,68 @@ impl ConnectionPool {
}
}
pub(crate) fn new_cert(
identity_keypair: &Keypair,
san: IpAddr,
) -> Result<QuicClientCertificate, Box<dyn Error>> {
// Generate a self-signed cert from client's identity key
let cert_params = new_cert_params(identity_keypair, san);
let cert = rcgen::Certificate::from_params(cert_params)?;
let cert_der = cert.serialize_der().unwrap();
let priv_key = cert.serialize_private_key_der();
let priv_key = rustls::PrivateKey(priv_key);
Ok(QuicClientCertificate {
certificates: vec![rustls::Certificate(cert_der)],
key: priv_key,
})
}
fn convert_to_rcgen_keypair(identity_keypair: &Keypair) -> rcgen::KeyPair {
// from https://datatracker.ietf.org/doc/html/rfc8410#section-3
const ED25519_IDENTIFIER: [u32; 4] = [1, 3, 101, 112];
let mut private_key = Vec::<u8>::with_capacity(34);
private_key.extend_from_slice(&[0x04, 0x20]); // ASN.1 OCTET STRING
private_key.extend_from_slice(identity_keypair.secret().as_bytes());
let key_pkcs8 = pkcs8::PrivateKeyInfo {
algorithm: AlgorithmIdentifier {
oid: ObjectIdentifier::from_arcs(&ED25519_IDENTIFIER).unwrap(),
parameters: None,
},
private_key: &private_key,
public_key: None,
};
let key_pkcs8_der = key_pkcs8
.to_der()
.expect("Failed to convert keypair to DER")
.to_der();
// Parse private key into rcgen::KeyPair struct.
rcgen::KeyPair::from_der(&key_pkcs8_der).expect("Failed to parse keypair from DER")
}
fn new_cert_params(identity_keypair: &Keypair, san: IpAddr) -> CertificateParams {
// TODO(terorie): Is it safe to sign the TLS cert with the identity private key?
// Unfortunately, rcgen does not accept a "raw" Ed25519 key.
// We have to convert it to DER and pass it to the library.
// Convert private key into PKCS#8 v1 object.
// RFC 8410, Section 7: Private Key Format
// https://datatracker.ietf.org/doc/html/rfc8410#section-
let keypair = convert_to_rcgen_keypair(identity_keypair);
let mut cert_params = CertificateParams::default();
cert_params.subject_alt_names = vec![SanType::IpAddress(san)];
cert_params.alg = &rcgen::PKCS_ED25519;
cert_params.key_pair = Some(keypair);
cert_params.distinguished_name = DistinguishedName::new();
cert_params
.distinguished_name
.push(DnType::CommonName, "Solana node");
cert_params
}
impl ConnectionCache {
pub fn new(connection_pool_size: usize) -> Self {
// The minimum pool size is 1.
@ -262,6 +328,16 @@ impl ConnectionCache {
}
}
pub fn update_client_certificate(
&mut self,
keypair: &Keypair,
ipaddr: IpAddr,
) -> Result<(), Box<dyn Error>> {
let cert = new_cert(keypair, ipaddr)?;
self.client_certificate = Arc::new(cert);
Ok(())
}
pub fn with_udp(connection_pool_size: usize) -> Self {
// The minimum pool size is 1.
let connection_pool_size = 1.max(connection_pool_size);
@ -277,7 +353,9 @@ impl ConnectionCache {
fn create_endpoint(&self) -> Option<Arc<QuicLazyInitializedEndpoint>> {
if self.use_quic() {
Some(Arc::new(QuicLazyInitializedEndpoint::new()))
Some(Arc::new(QuicLazyInitializedEndpoint::new(
self.client_certificate.clone(),
)))
} else {
None
}
@ -499,6 +577,9 @@ impl Default for ConnectionCache {
.expect("Unable to bind to UDP socket"),
)
}),
client_certificate: new_cert(&Keypair::new(), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))
.map(Arc::new)
.expect("Failed to initialize QUIC client certificates"),
}
}
}

View File

@ -3,8 +3,10 @@
//! server's flow control.
use {
crate::{
client_error::ClientErrorKind, connection_cache::ConnectionCacheStats,
nonblocking::tpu_connection::TpuConnection, tpu_connection::ClientStats,
client_error::ClientErrorKind,
connection_cache::{new_cert, ConnectionCacheStats},
nonblocking::tpu_connection::TpuConnection,
tpu_connection::ClientStats,
},
async_mutex::Mutex,
async_trait::async_trait,
@ -22,6 +24,7 @@ use {
QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS,
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
},
signature::Keypair,
transport::Result as TransportResult,
},
std::{
@ -55,19 +58,26 @@ impl rustls::client::ServerCertVerifier for SkipServerVerification {
}
}
pub struct QuicClientCertificate {
pub certificates: Vec<rustls::Certificate>,
pub key: rustls::PrivateKey,
}
/// A lazy-initialized Quic Endpoint
pub struct QuicLazyInitializedEndpoint {
endpoint: RwLock<Option<Arc<Endpoint>>>,
client_certificate: Arc<QuicClientCertificate>,
}
impl QuicLazyInitializedEndpoint {
pub fn new() -> Self {
pub fn new(client_certificate: Arc<QuicClientCertificate>) -> Self {
Self {
endpoint: RwLock::new(None),
client_certificate,
}
}
fn create_endpoint() -> Endpoint {
fn create_endpoint(&self) -> Endpoint {
let (_, client_socket) = solana_net_utils::bind_in_range(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
VALIDATOR_PORT_RANGE,
@ -77,7 +87,11 @@ impl QuicLazyInitializedEndpoint {
let mut crypto = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_custom_certificate_verifier(SkipServerVerification::new())
.with_no_client_auth();
.with_single_cert(
self.client_certificate.certificates.clone(),
self.client_certificate.key.clone(),
)
.expect("Failed to set QUIC client certificates");
crypto.enable_early_data = true;
let mut endpoint =
@ -108,7 +122,7 @@ impl QuicLazyInitializedEndpoint {
match endpoint {
Some(endpoint) => endpoint.clone(),
None => {
let connection = Arc::new(Self::create_endpoint());
let connection = Arc::new(self.create_endpoint());
*lock = Some(connection.clone());
connection
}
@ -120,7 +134,9 @@ impl QuicLazyInitializedEndpoint {
impl Default for QuicLazyInitializedEndpoint {
fn default() -> Self {
Self::new()
let certificate = new_cert(&Keypair::new(), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))
.expect("Failed to create QUIC client certificate");
Self::new(Arc::new(certificate))
}
}

View File

@ -105,7 +105,7 @@ impl FindPacketSenderStakeStage {
Measure::start("apply_sender_stakes_time");
let mut apply_stake = || {
let ip_to_stake = staked_nodes.read().unwrap();
Self::apply_sender_stakes(&mut batches, &ip_to_stake.stake_map);
Self::apply_sender_stakes(&mut batches, &ip_to_stake.ip_stake_map);
};
apply_stake();
apply_sender_stakes_time.stop();

View File

@ -1,6 +1,7 @@
use {
solana_gossip::cluster_info::ClusterInfo,
solana_runtime::bank_forks::BankForks,
solana_sdk::pubkey::Pubkey,
solana_streamer::streamer::StakedNodes,
std::{
collections::HashMap,
@ -33,17 +34,20 @@ impl StakedNodesUpdaterService {
let mut last_stakes = Instant::now();
while !exit.load(Ordering::Relaxed) {
let mut new_ip_to_stake = HashMap::new();
let mut new_id_to_stake = HashMap::new();
let mut total_stake = 0;
if Self::try_refresh_ip_to_stake(
if Self::try_refresh_stake_maps(
&mut last_stakes,
&mut new_ip_to_stake,
&mut new_id_to_stake,
&mut total_stake,
&bank_forks,
&cluster_info,
) {
let mut shared = shared_staked_nodes.write().unwrap();
shared.total_stake = total_stake;
shared.stake_map = new_ip_to_stake;
shared.ip_stake_map = new_ip_to_stake;
shared.pubkey_stake_map = new_id_to_stake;
}
}
})
@ -52,9 +56,10 @@ impl StakedNodesUpdaterService {
Self { thread_hdl }
}
fn try_refresh_ip_to_stake(
fn try_refresh_stake_maps(
last_stakes: &mut Instant,
ip_to_stake: &mut HashMap<IpAddr, u64>,
id_to_stake: &mut HashMap<Pubkey, u64>,
total_stake: &mut u64,
bank_forks: &RwLock<BankForks>,
cluster_info: &ClusterInfo,
@ -66,6 +71,14 @@ impl StakedNodesUpdaterService {
.iter()
.map(|(_pubkey, stake)| stake)
.sum::<u64>();
*id_to_stake = cluster_info
.tvu_peers()
.into_iter()
.filter_map(|node| {
let stake = staked_nodes.get(&node.id)?;
Some((node.id, *stake))
})
.collect();
*ip_to_stake = cluster_info
.tvu_peers()
.into_iter()

View File

@ -758,7 +758,13 @@ impl Validator {
let poh_recorder = Arc::new(RwLock::new(poh_recorder));
let connection_cache = match use_quic {
true => Arc::new(ConnectionCache::new(tpu_connection_pool_size)),
true => {
let mut connection_cache = ConnectionCache::new(tpu_connection_pool_size);
connection_cache
.update_client_certificate(&identity_keypair, node.info.gossip.ip())
.expect("Failed to update QUIC client certificates");
Arc::new(connection_cache)
}
false => Arc::new(ConnectionCache::with_udp(tpu_connection_pool_size)),
};

117
programs/bpf/Cargo.lock generated
View File

@ -134,6 +134,45 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eab1c04a571841102f5345a8fc0f6bb3d31c315dec879b5c6e42e40ce7ffa34e"
[[package]]
name = "asn1-rs"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf6690c370453db30743b373a60ba498fc0d6d83b11f4abfd87a84a075db5dd4"
dependencies = [
"asn1-rs-derive",
"asn1-rs-impl",
"displaydoc",
"nom",
"num-traits",
"rusticata-macros",
"thiserror",
"time 0.3.9",
]
[[package]]
name = "asn1-rs-derive"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "726535892e8eae7e70657b4c8ea93d26b8553afb1ce617caee529ef96d7dee6c"
dependencies = [
"proc-macro2 1.0.38",
"quote 1.0.18",
"syn 1.0.93",
"synstructure",
]
[[package]]
name = "asn1-rs-impl"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2777730b2039ac0f95f093556e61b6d26cebed5393ca6f152717777cec3a42ed"
dependencies = [
"proc-macro2 1.0.38",
"quote 1.0.18",
"syn 1.0.93",
]
[[package]]
name = "assert_matches"
version = "1.5.0"
@ -902,6 +941,12 @@ dependencies = [
"rayon",
]
[[package]]
name = "data-encoding"
version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57"
[[package]]
name = "der"
version = "0.5.1"
@ -911,6 +956,20 @@ dependencies = [
"const-oid",
]
[[package]]
name = "der-parser"
version = "8.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42d4bc9b0db0a0df9ae64634ac5bdefb7afcb534e182275ca0beadbe486701c1"
dependencies = [
"asn1-rs",
"displaydoc",
"nom",
"num-bigint 0.4.3",
"num-traits",
"rusticata-macros",
]
[[package]]
name = "derivation-path"
version = "0.2.0"
@ -1000,6 +1059,17 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "displaydoc"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3bf95dc3f046b9da4f2d51833c0d3547d8564ef6910f5c1ed130306a75b92886"
dependencies = [
"proc-macro2 1.0.38",
"quote 1.0.18",
"syn 1.0.93",
]
[[package]]
name = "dlopen"
version = "0.1.8"
@ -2560,6 +2630,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
[[package]]
name = "oid-registry"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d4bda43fd1b844cbc6e6e54b5444e2b1bc7838bce59ad205902cccbb26d6761"
dependencies = [
"asn1-rs",
]
[[package]]
name = "once_cell"
version = "1.12.0"
@ -3485,6 +3564,15 @@ dependencies = [
"semver",
]
[[package]]
name = "rusticata-macros"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "faf0c4a6ece9950b9abdb62b1cfcf2a68b3b67a10ba445b3bb85be2a293d0632"
dependencies = [
"nom",
]
[[package]]
name = "rustix"
version = "0.34.4"
@ -4505,11 +4593,13 @@ dependencies = [
"jsonrpc-core",
"lazy_static",
"log",
"pkcs8",
"quinn",
"quinn-proto",
"rand 0.7.3",
"rand_chacha 0.2.2",
"rayon",
"rcgen",
"reqwest",
"rustls 0.20.6",
"semver",
@ -5489,6 +5579,7 @@ dependencies = [
"solana-sdk 1.11.3",
"thiserror",
"tokio",
"x509-parser",
]
[[package]]
@ -6096,10 +6187,18 @@ version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2702e08a7a860f005826c6815dcac101b19b5eb330c27fe4a5928fec1d20ddd"
dependencies = [
"itoa",
"libc",
"num_threads",
"time-macros",
]
[[package]]
name = "time-macros"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42657b1a6f4d817cda8e7a0ace261fe0cc946cf3a80314390b22cc61ae080792"
[[package]]
name = "tiny-bip39"
version = "0.8.2"
@ -6995,6 +7094,24 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "x509-parser"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0ecbeb7b67ce215e40e3cc7f2ff902f94a223acf44995934763467e7b1febc8"
dependencies = [
"asn1-rs",
"base64 0.13.0",
"data-encoding",
"der-parser",
"lazy_static",
"nom",
"oid-registry",
"rusticata-macros",
"thiserror",
"time 0.3.9",
]
[[package]]
name = "xattr"
version = "0.2.2"

View File

@ -30,6 +30,7 @@ solana-perf = { path = "../perf", version = "=1.11.3" }
solana-sdk = { path = "../sdk", version = "=1.11.3" }
thiserror = "1.0"
tokio = { version = "~1.14.1", features = ["full"] }
x509-parser = "0.14.0"
[dev-dependencies]
solana-logger = { path = "../logger", version = "=1.11.3" }

View File

@ -15,6 +15,7 @@ use {
solana_perf::packet::PacketBatch,
solana_sdk::{
packet::{Packet, PACKET_DATA_SIZE},
pubkey::Pubkey,
quic::{QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS},
signature::Keypair,
timing,
@ -31,6 +32,7 @@ use {
task::JoinHandle,
time::{sleep, timeout},
},
x509_parser::{prelude::*, public_key::PublicKey},
};
const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: f64 = 100_000f64;
@ -132,6 +134,31 @@ fn prune_unstaked_connection_table(
}
}
fn get_connection_stake(connection: &Connection, staked_nodes: Arc<RwLock<StakedNodes>>) -> u64 {
connection
.peer_identity()
.and_then(|der_cert_any| der_cert_any.downcast::<Vec<rustls::Certificate>>().ok())
.and_then(|der_certs| {
der_certs.first().and_then(|der_cert| {
X509Certificate::from_der(der_cert.as_ref())
.ok()
.and_then(|(_, cert)| {
cert.public_key().parsed().ok().and_then(|key| match key {
PublicKey::Unknown(inner_key) => {
let pubkey = Pubkey::new(inner_key);
debug!("Peer public key is {:?}", pubkey);
let staked_nodes = staked_nodes.read().unwrap();
staked_nodes.pubkey_stake_map.get(&pubkey).copied()
}
_ => None,
})
})
})
})
.unwrap_or(0)
}
async fn setup_connection(
connecting: Connecting,
unstaked_connection_table: Arc<Mutex<ConnectionTable>>,
@ -161,11 +188,8 @@ async fn setup_connection(
let remote_addr = connection.remote_address();
let table_and_stake = {
let staked_nodes = staked_nodes.read().unwrap();
if let Some(stake) = staked_nodes.stake_map.get(&remote_addr.ip()) {
let stake = *stake;
drop(staked_nodes);
let stake = get_connection_stake(&connection, staked_nodes.clone());
if stake > 0 {
let mut connection_table_l = staked_connection_table.lock().unwrap();
if connection_table_l.total_size >= max_staked_connections {
let num_pruned = connection_table_l.prune_random(stake);
@ -195,7 +219,6 @@ async fn setup_connection(
Some((connection_table_l, stake))
}
} else if max_unstaked_connections > 0 {
drop(staked_nodes);
let mut connection_table_l = unstaked_connection_table.lock().unwrap();
prune_unstaked_connection_table(
&mut connection_table_l,
@ -237,8 +260,18 @@ async fn setup_connection(
drop(connection_table_l);
let stats = stats.clone();
let connection_table = match table_type {
ConnectionPeerType::Unstaked => unstaked_connection_table.clone(),
ConnectionPeerType::Staked => staked_connection_table.clone(),
ConnectionPeerType::Unstaked => {
stats
.connection_added_from_unstaked_peer
.fetch_add(1, Ordering::Relaxed);
unstaked_connection_table.clone()
}
ConnectionPeerType::Staked => {
stats
.connection_added_from_staked_peer
.fetch_add(1, Ordering::Relaxed);
staked_connection_table.clone()
}
};
tokio::spawn(handle_connection(
uni_streams,
@ -624,12 +657,13 @@ impl ConnectionTable {
pub mod test {
use {
super::*,
crate::quic::{MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
crate::quic::{new_cert, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
crossbeam_channel::{unbounded, Receiver},
quinn::{ClientConfig, IdleTimeout, VarInt},
solana_sdk::{
quic::{QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS},
signature::Keypair,
signer::Signer,
},
std::net::Ipv4Addr,
tokio::time::sleep,
@ -657,11 +691,19 @@ pub mod test {
}
}
pub fn get_client_config() -> ClientConfig {
let crypto = rustls::ClientConfig::builder()
pub fn get_client_config(keypair: &Keypair) -> ClientConfig {
let ipaddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
let (certs, key) =
new_cert(keypair, ipaddr).expect("Failed to generate client certificate");
let mut crypto = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_custom_certificate_verifier(SkipServerVerification::new())
.with_no_client_auth();
.with_single_cert(certs, key)
.expect("Failed to use client certificate");
crypto.enable_early_data = true;
let mut config = ClientConfig::new(Arc::new(crypto));
let transport_config = Arc::get_mut(&mut config.transport).unwrap();
@ -705,17 +747,27 @@ pub mod test {
(t, exit, receiver, server_address, stats)
}
pub async fn make_client_endpoint(addr: &SocketAddr) -> NewConnection {
pub async fn make_client_endpoint(
addr: &SocketAddr,
client_keypair: Option<&Keypair>,
) -> NewConnection {
let client_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
let mut endpoint = quinn::Endpoint::new(EndpointConfig::default(), None, client_socket)
.unwrap()
.0;
endpoint.set_default_client_config(get_client_config());
endpoint.connect(*addr, "localhost").unwrap().await.unwrap()
let default_keypair = Keypair::new();
endpoint.set_default_client_config(get_client_config(
client_keypair.unwrap_or(&default_keypair),
));
endpoint
.connect(*addr, "localhost")
.expect("Failed in connecting")
.await
.expect("Failed in waiting")
}
pub async fn check_timeout(receiver: Receiver<PacketBatch>, server_address: SocketAddr) {
let conn1 = make_client_endpoint(&server_address).await;
let conn1 = make_client_endpoint(&server_address, None).await;
let total = 30;
for i in 0..total {
let mut s1 = conn1.connection.open_uni().await.unwrap();
@ -737,8 +789,8 @@ pub mod test {
}
pub async fn check_block_multiple_connections(server_address: SocketAddr) {
let conn1 = make_client_endpoint(&server_address).await;
let conn2 = make_client_endpoint(&server_address).await;
let conn1 = make_client_endpoint(&server_address, None).await;
let conn2 = make_client_endpoint(&server_address, None).await;
let mut s1 = conn1.connection.open_uni().await.unwrap();
let mut s2 = conn2.connection.open_uni().await.unwrap();
s1.write_all(&[0u8]).await.unwrap();
@ -759,8 +811,8 @@ pub mod test {
receiver: Receiver<PacketBatch>,
server_address: SocketAddr,
) {
let conn1 = Arc::new(make_client_endpoint(&server_address).await);
let conn2 = Arc::new(make_client_endpoint(&server_address).await);
let conn1 = Arc::new(make_client_endpoint(&server_address, None).await);
let conn2 = Arc::new(make_client_endpoint(&server_address, None).await);
let mut num_expected_packets = 0;
for i in 0..10 {
info!("sending: {}", i);
@ -798,8 +850,9 @@ pub mod test {
pub async fn check_multiple_writes(
receiver: Receiver<PacketBatch>,
server_address: SocketAddr,
client_keypair: Option<&Keypair>,
) {
let conn1 = Arc::new(make_client_endpoint(&server_address).await);
let conn1 = Arc::new(make_client_endpoint(&server_address, client_keypair).await);
// Send a full size packet with single byte writes.
let num_bytes = PACKET_DATA_SIZE;
@ -831,7 +884,7 @@ pub mod test {
}
pub async fn check_unstaked_node_connect_failure(server_address: SocketAddr) {
let conn1 = Arc::new(make_client_endpoint(&server_address).await);
let conn1 = Arc::new(make_client_endpoint(&server_address, None).await);
// Send a full size packet with single byte writes.
if let Ok(mut s1) = conn1.connection.open_uni().await {
@ -864,7 +917,7 @@ pub mod test {
solana_logger::setup();
let (t, exit, _receiver, server_address, stats) = setup_quic_server(None);
let conn1 = make_client_endpoint(&server_address).await;
let conn1 = make_client_endpoint(&server_address, None).await;
assert_eq!(stats.total_streams.load(Ordering::Relaxed), 0);
assert_eq!(stats.total_stream_read_timeouts.load(Ordering::Relaxed), 0);
@ -904,7 +957,7 @@ pub mod test {
async fn test_quic_server_multiple_writes() {
solana_logger::setup();
let (t, exit, receiver, server_address, _stats) = setup_quic_server(None);
check_multiple_writes(receiver, server_address).await;
check_multiple_writes(receiver, server_address, None).await;
exit.store(true, Ordering::Relaxed);
t.await.unwrap();
}
@ -913,16 +966,51 @@ pub mod test {
async fn test_quic_server_staked_connection_removal() {
solana_logger::setup();
let client_keypair = Keypair::new();
let mut staked_nodes = StakedNodes::default();
staked_nodes
.stake_map
.insert(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 100000);
.pubkey_stake_map
.insert(client_keypair.pubkey(), 100000);
staked_nodes.total_stake = 100000;
let (t, exit, receiver, server_address, stats) = setup_quic_server(Some(staked_nodes));
check_multiple_writes(receiver, server_address).await;
check_multiple_writes(receiver, server_address, Some(&client_keypair)).await;
exit.store(true, Ordering::Relaxed);
t.await.unwrap();
sleep(Duration::from_millis(100)).await;
assert_eq!(
stats
.connection_added_from_unstaked_peer
.load(Ordering::Relaxed),
0
);
assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 1);
assert_eq!(stats.connection_remove_failed.load(Ordering::Relaxed), 0);
}
#[tokio::test]
async fn test_quic_server_zero_staked_connection_removal() {
// In this test, the client has a pubkey, but is not in stake table.
solana_logger::setup();
let client_keypair = Keypair::new();
let mut staked_nodes = StakedNodes::default();
staked_nodes
.pubkey_stake_map
.insert(client_keypair.pubkey(), 0);
staked_nodes.total_stake = 0;
let (t, exit, receiver, server_address, stats) = setup_quic_server(Some(staked_nodes));
check_multiple_writes(receiver, server_address, Some(&client_keypair)).await;
exit.store(true, Ordering::Relaxed);
t.await.unwrap();
sleep(Duration::from_millis(100)).await;
assert_eq!(
stats
.connection_added_from_staked_peer
.load(Ordering::Relaxed),
0
);
assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 1);
assert_eq!(stats.connection_remove_failed.load(Ordering::Relaxed), 0);
}
@ -931,9 +1019,16 @@ pub mod test {
async fn test_quic_server_unstaked_connection_removal() {
solana_logger::setup();
let (t, exit, receiver, server_address, stats) = setup_quic_server(None);
check_multiple_writes(receiver, server_address).await;
check_multiple_writes(receiver, server_address, None).await;
exit.store(true, Ordering::Relaxed);
t.await.unwrap();
sleep(Duration::from_millis(100)).await;
assert_eq!(
stats
.connection_added_from_staked_peer
.load(Ordering::Relaxed),
0
);
assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 1);
assert_eq!(stats.connection_remove_failed.load(Ordering::Relaxed), 0);
}

View File

@ -5,6 +5,7 @@ use {
pkcs8::{der::Document, AlgorithmIdentifier, ObjectIdentifier},
quinn::{IdleTimeout, ServerConfig, VarInt},
rcgen::{CertificateParams, DistinguishedName, DnType, SanType},
rustls::{server::ClientCertVerified, Certificate, DistinguishedNames},
solana_perf::packet::PacketBatch,
solana_sdk::{
packet::PACKET_DATA_SIZE,
@ -19,6 +20,7 @@ use {
Arc, RwLock,
},
thread,
time::SystemTime,
},
tokio::runtime::{Builder, Runtime},
};
@ -27,6 +29,29 @@ pub const MAX_STAKED_CONNECTIONS: usize = 2000;
pub const MAX_UNSTAKED_CONNECTIONS: usize = 500;
const NUM_QUIC_STREAMER_WORKER_THREADS: usize = 4;
struct SkipClientVerification;
impl SkipClientVerification {
pub fn new() -> Arc<Self> {
Arc::new(Self)
}
}
impl rustls::server::ClientCertVerifier for SkipClientVerification {
fn client_auth_root_subjects(&self) -> Option<DistinguishedNames> {
Some(DistinguishedNames::new())
}
fn verify_client_cert(
&self,
_end_entity: &Certificate,
_intermediates: &[Certificate],
_now: SystemTime,
) -> Result<ClientCertVerified, rustls::Error> {
Ok(rustls::server::ClientCertVerified::assertion())
}
}
/// Returns default server configuration along with its PEM certificate chain.
#[allow(clippy::field_reassign_with_default)] // https://github.com/rust-lang/rust-clippy/issues/6527
pub(crate) fn configure_server(
@ -44,8 +69,13 @@ pub(crate) fn configure_server(
.collect();
let cert_chain_pem = pem::encode_many(&cert_chain_pem_parts);
let mut server_config = ServerConfig::with_single_cert(cert_chain, priv_key)
let server_tls_config = rustls::ServerConfig::builder()
.with_safe_defaults()
.with_client_cert_verifier(SkipClientVerification::new())
.with_single_cert(cert_chain, priv_key)
.map_err(|_e| QuicServerError::ConfigureFailed)?;
let mut server_config = ServerConfig::with_crypto(Arc::new(server_tls_config));
let config = Arc::get_mut(&mut server_config.transport).unwrap();
// QUIC_MAX_CONCURRENT_STREAMS doubled, which was found to improve reliability
@ -64,7 +94,7 @@ pub(crate) fn configure_server(
Ok((server_config, cert_chain_pem))
}
fn new_cert(
pub(crate) fn new_cert(
identity_keypair: &Keypair,
san: IpAddr,
) -> Result<(Vec<rustls::Certificate>, rustls::PrivateKey), Box<dyn Error>> {
@ -157,6 +187,8 @@ pub struct StreamStats {
pub(crate) total_stream_read_errors: AtomicUsize,
pub(crate) total_stream_read_timeouts: AtomicUsize,
pub(crate) num_evictions: AtomicUsize,
pub(crate) connection_added_from_staked_peer: AtomicUsize,
pub(crate) connection_added_from_unstaked_peer: AtomicUsize,
pub(crate) connection_add_failed: AtomicUsize,
pub(crate) connection_add_failed_invalid_stream_count: AtomicUsize,
pub(crate) connection_add_failed_unstaked_node: AtomicUsize,
@ -196,6 +228,18 @@ impl StreamStats {
self.num_evictions.swap(0, Ordering::Relaxed),
i64
),
(
"connection_added_from_staked_peer",
self.connection_added_from_staked_peer
.swap(0, Ordering::Relaxed),
i64
),
(
"connection_added_from_unstaked_peer",
self.connection_added_from_unstaked_peer
.swap(0, Ordering::Relaxed),
i64
),
(
"connection_add_failed",
self.connection_add_failed.swap(0, Ordering::Relaxed),
@ -427,7 +471,7 @@ mod test {
let (t, exit, receiver, server_address) = setup_quic_server();
let runtime = rt();
runtime.block_on(check_multiple_writes(receiver, server_address));
runtime.block_on(check_multiple_writes(receiver, server_address, None));
exit.store(true, Ordering::Relaxed);
t.join().unwrap();
}

View File

@ -9,7 +9,7 @@ use {
},
crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
histogram::Histogram,
solana_sdk::{packet::Packet, timing::timestamp},
solana_sdk::{packet::Packet, pubkey::Pubkey, timing::timestamp},
std::{
cmp::Reverse,
collections::HashMap,
@ -28,7 +28,8 @@ use {
#[derive(Default)]
pub struct StakedNodes {
pub total_stake: u64,
pub stake_map: HashMap<IpAddr, u64>,
pub ip_stake_map: HashMap<IpAddr, u64>,
pub pubkey_stake_map: HashMap<Pubkey, u64>,
}
pub type PacketBatchReceiver = Receiver<PacketBatch>;