diff --git a/Cargo.lock b/Cargo.lock index be4037ed7..342ba205e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4553,8 +4553,11 @@ dependencies = [ "itertools 0.10.3", "jsonrpc-core", "jsonrpc-http-server", + "lazy_static", "log", "quinn", + "rand 0.7.3", + "rand_chacha 0.2.2", "rayon", "reqwest", "rustls 0.20.4", diff --git a/client/Cargo.toml b/client/Cargo.toml index 98b9aa597..e986f528a 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -23,8 +23,11 @@ futures-util = "0.3.21" indicatif = "0.16.2" itertools = "0.10.2" jsonrpc-core = "18.0.0" +lazy_static = "1.4.0" log = "0.4.14" quinn = "0.8.0" +rand = "0.7.0" +rand_chacha = "0.2.2" rayon = "1.5.1" reqwest = { version = "0.11.10", default-features = false, features = ["blocking", "rustls-tls", "json"] } rustls = { version = "0.20.2", features = ["dangerous_configuration"] } diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs new file mode 100644 index 000000000..cee564230 --- /dev/null +++ b/client/src/connection_cache.rs @@ -0,0 +1,155 @@ +use { + crate::{tpu_connection::TpuConnection, udp_client::UdpTpuConnection}, + lazy_static::lazy_static, + std::{ + collections::{hash_map::Entry, BTreeMap, HashMap}, + net::{SocketAddr, UdpSocket}, + sync::{Arc, Mutex}, + }, +}; + +// Should be non-zero +static MAX_CONNECTIONS: usize = 64; + +struct ConnMap { + // Keeps track of the connection associated with an addr and the last time it was used + map: HashMap, u64)>, + // Helps to find the least recently used connection. The search and inserts are O(log(n)) + // but since we're bounding the size of the collections, this should be constant + // (and hopefully negligible) time. In theory, we can do this in constant time + // with a queue implemented as a doubly-linked list (and all the + // HashMap entries holding a "pointer" to the corresponding linked-list node), + // so we can push, pop and bump a used connection back to the end of the queue in O(1) time, but + // that seems non-"Rust-y" and low bang/buck. This is still pretty terrible though... + last_used_times: BTreeMap, + ticks: u64, +} + +impl ConnMap { + pub fn new() -> Self { + Self { + map: HashMap::new(), + last_used_times: BTreeMap::new(), + ticks: 0, + } + } +} + +lazy_static! { + static ref CONNECTION_MAP: Mutex = Mutex::new(ConnMap::new()); +} + +#[allow(dead_code)] +// TODO: see https://github.com/solana-labs/solana/issues/23661 +// remove lazy_static and optimize and refactor this +pub fn get_connection(addr: &SocketAddr) -> Arc { + let mut map = (*CONNECTION_MAP).lock().unwrap(); + let ticks = map.ticks; + + let (conn, target_ticks) = match map.map.entry(*addr) { + Entry::Occupied(mut entry) => { + let mut pair = entry.get_mut(); + let old_ticks = pair.1; + pair.1 = ticks; + (pair.0.clone(), old_ticks) + } + Entry::Vacant(entry) => { + let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + // TODO: see https://github.com/solana-labs/solana/issues/23659 + // make it configurable (e.g. via the command line) whether to use UDP or Quic + let conn = Arc::new(UdpTpuConnection::new(send_socket, *addr)); + entry.insert((conn.clone(), ticks)); + ( + conn as Arc, + ticks, + ) + } + }; + + let num_connections = map.map.len(); + if num_connections > MAX_CONNECTIONS { + let (old_ticks, target_addr) = { + let (old_ticks, target_addr) = map.last_used_times.iter().next().unwrap(); + (*old_ticks, *target_addr) + }; + map.map.remove(&target_addr); + map.last_used_times.remove(&old_ticks); + } + + if target_ticks != ticks { + map.last_used_times.remove(&target_ticks); + } + map.last_used_times.insert(ticks, *addr); + + map.ticks += 1; + conn +} + +#[cfg(test)] +mod tests { + use { + crate::connection_cache::{get_connection, CONNECTION_MAP, MAX_CONNECTIONS}, + rand::{Rng, SeedableRng}, + rand_chacha::ChaChaRng, + std::net::SocketAddr, + }; + + fn get_addr(rng: &mut ChaChaRng) -> SocketAddr { + let a = rng.gen_range(1, 255); + let b = rng.gen_range(1, 255); + let c = rng.gen_range(1, 255); + let d = rng.gen_range(1, 255); + + let addr_str = format!("{}.{}.{}.{}:80", a, b, c, d); + + addr_str.parse().expect("Invalid address") + } + + #[test] + fn test_connection_cache() { + // Allow the test to run deterministically + // with the same pseudorandom sequence between runs + // and on different platforms - the cryptographic security + // property isn't important here but ChaChaRng provides a way + // to get the same pseudorandom sequence on different platforms + let mut rng = ChaChaRng::seed_from_u64(42); + + // Generate a bunch of random addresses and create TPUConnections to them + // Since TPUConnection::new is infallible, it should't matter whether or not + // we can actually connect to those addresses - TPUConnection implementations should either + // be lazy and not connect until first use or handle connection errors somehow + // (without crashing, as would be required in a real practical validator) + let first_addr = get_addr(&mut rng); + assert!(get_connection(&first_addr).tpu_addr().ip() == first_addr.ip()); + let addrs = (0..MAX_CONNECTIONS) + .into_iter() + .map(|_| { + let addr = get_addr(&mut rng); + get_connection(&addr); + addr + }) + .collect::>(); + { + let map = (*CONNECTION_MAP).lock().unwrap(); + addrs.iter().for_each(|a| { + let conn = map.map.get(a).expect("Address not found"); + assert!(a.ip() == conn.0.tpu_addr().ip()); + }); + + assert!(map.map.get(&first_addr).is_none()); + } + + // Test that get_connection updates which connection is next up for eviction + // when an existing connection is used. Initially, addrs[0] should be next up for eviction, since + // it was the earliest added. But we do get_connection(&addrs[0]), thereby using + // that connection, and bumping it back to the end of the queue. So addrs[1] should be + // the next up for eviction. So we add a new connection, and test that addrs[0] is not + // evicted but addrs[1] is. + get_connection(&addrs[0]); + get_connection(&get_addr(&mut rng)); + + let map = (*CONNECTION_MAP).lock().unwrap(); + assert!(map.map.get(&addrs[0]).is_some()); + assert!(map.map.get(&addrs[1]).is_none()); + } +} diff --git a/client/src/lib.rs b/client/src/lib.rs index 6395ef286..7dbd8b11f 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -4,6 +4,7 @@ extern crate serde_derive; pub mod blockhash_query; pub mod client_error; +pub mod connection_cache; pub(crate) mod http_sender; pub(crate) mod mock_sender; pub mod nonblocking; diff --git a/client/src/tpu_connection.rs b/client/src/tpu_connection.rs index 4b5d434d7..4c9295b33 100644 --- a/client/src/tpu_connection.rs +++ b/client/src/tpu_connection.rs @@ -4,7 +4,9 @@ use { }; pub trait TpuConnection { - fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self; + fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self + where + Self: Sized; fn tpu_addr(&self) -> &SocketAddr; diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index c465bedab..284425c54 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -3220,8 +3220,11 @@ dependencies = [ "indicatif", "itertools 0.10.3", "jsonrpc-core", + "lazy_static", "log", "quinn", + "rand 0.7.3", + "rand_chacha 0.2.2", "rayon", "reqwest", "rustls",