Quic Connection Cache (#23598)
Add a connection cache to allow add modules that send data to get or create connections (e.g. for quic) associated with a certain SocketAddr
This commit is contained in:
parent
2d3501dff9
commit
9b46f9b2da
|
@ -4553,8 +4553,11 @@ dependencies = [
|
|||
"itertools 0.10.3",
|
||||
"jsonrpc-core",
|
||||
"jsonrpc-http-server",
|
||||
"lazy_static",
|
||||
"log",
|
||||
"quinn",
|
||||
"rand 0.7.3",
|
||||
"rand_chacha 0.2.2",
|
||||
"rayon",
|
||||
"reqwest",
|
||||
"rustls 0.20.4",
|
||||
|
|
|
@ -23,8 +23,11 @@ futures-util = "0.3.21"
|
|||
indicatif = "0.16.2"
|
||||
itertools = "0.10.2"
|
||||
jsonrpc-core = "18.0.0"
|
||||
lazy_static = "1.4.0"
|
||||
log = "0.4.14"
|
||||
quinn = "0.8.0"
|
||||
rand = "0.7.0"
|
||||
rand_chacha = "0.2.2"
|
||||
rayon = "1.5.1"
|
||||
reqwest = { version = "0.11.10", default-features = false, features = ["blocking", "rustls-tls", "json"] }
|
||||
rustls = { version = "0.20.2", features = ["dangerous_configuration"] }
|
||||
|
|
|
@ -0,0 +1,155 @@
|
|||
use {
|
||||
crate::{tpu_connection::TpuConnection, udp_client::UdpTpuConnection},
|
||||
lazy_static::lazy_static,
|
||||
std::{
|
||||
collections::{hash_map::Entry, BTreeMap, HashMap},
|
||||
net::{SocketAddr, UdpSocket},
|
||||
sync::{Arc, Mutex},
|
||||
},
|
||||
};
|
||||
|
||||
// Should be non-zero
|
||||
static MAX_CONNECTIONS: usize = 64;
|
||||
|
||||
struct ConnMap {
|
||||
// Keeps track of the connection associated with an addr and the last time it was used
|
||||
map: HashMap<SocketAddr, (Arc<dyn TpuConnection + 'static + Sync + Send>, u64)>,
|
||||
// Helps to find the least recently used connection. The search and inserts are O(log(n))
|
||||
// but since we're bounding the size of the collections, this should be constant
|
||||
// (and hopefully negligible) time. In theory, we can do this in constant time
|
||||
// with a queue implemented as a doubly-linked list (and all the
|
||||
// HashMap entries holding a "pointer" to the corresponding linked-list node),
|
||||
// so we can push, pop and bump a used connection back to the end of the queue in O(1) time, but
|
||||
// that seems non-"Rust-y" and low bang/buck. This is still pretty terrible though...
|
||||
last_used_times: BTreeMap<u64, SocketAddr>,
|
||||
ticks: u64,
|
||||
}
|
||||
|
||||
impl ConnMap {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
map: HashMap::new(),
|
||||
last_used_times: BTreeMap::new(),
|
||||
ticks: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
static ref CONNECTION_MAP: Mutex<ConnMap> = Mutex::new(ConnMap::new());
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
// TODO: see https://github.com/solana-labs/solana/issues/23661
|
||||
// remove lazy_static and optimize and refactor this
|
||||
pub fn get_connection(addr: &SocketAddr) -> Arc<dyn TpuConnection + 'static + Sync + Send> {
|
||||
let mut map = (*CONNECTION_MAP).lock().unwrap();
|
||||
let ticks = map.ticks;
|
||||
|
||||
let (conn, target_ticks) = match map.map.entry(*addr) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
let mut pair = entry.get_mut();
|
||||
let old_ticks = pair.1;
|
||||
pair.1 = ticks;
|
||||
(pair.0.clone(), old_ticks)
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
// TODO: see https://github.com/solana-labs/solana/issues/23659
|
||||
// make it configurable (e.g. via the command line) whether to use UDP or Quic
|
||||
let conn = Arc::new(UdpTpuConnection::new(send_socket, *addr));
|
||||
entry.insert((conn.clone(), ticks));
|
||||
(
|
||||
conn as Arc<dyn TpuConnection + 'static + Sync + Send>,
|
||||
ticks,
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
let num_connections = map.map.len();
|
||||
if num_connections > MAX_CONNECTIONS {
|
||||
let (old_ticks, target_addr) = {
|
||||
let (old_ticks, target_addr) = map.last_used_times.iter().next().unwrap();
|
||||
(*old_ticks, *target_addr)
|
||||
};
|
||||
map.map.remove(&target_addr);
|
||||
map.last_used_times.remove(&old_ticks);
|
||||
}
|
||||
|
||||
if target_ticks != ticks {
|
||||
map.last_used_times.remove(&target_ticks);
|
||||
}
|
||||
map.last_used_times.insert(ticks, *addr);
|
||||
|
||||
map.ticks += 1;
|
||||
conn
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use {
|
||||
crate::connection_cache::{get_connection, CONNECTION_MAP, MAX_CONNECTIONS},
|
||||
rand::{Rng, SeedableRng},
|
||||
rand_chacha::ChaChaRng,
|
||||
std::net::SocketAddr,
|
||||
};
|
||||
|
||||
fn get_addr(rng: &mut ChaChaRng) -> SocketAddr {
|
||||
let a = rng.gen_range(1, 255);
|
||||
let b = rng.gen_range(1, 255);
|
||||
let c = rng.gen_range(1, 255);
|
||||
let d = rng.gen_range(1, 255);
|
||||
|
||||
let addr_str = format!("{}.{}.{}.{}:80", a, b, c, d);
|
||||
|
||||
addr_str.parse().expect("Invalid address")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_connection_cache() {
|
||||
// Allow the test to run deterministically
|
||||
// with the same pseudorandom sequence between runs
|
||||
// and on different platforms - the cryptographic security
|
||||
// property isn't important here but ChaChaRng provides a way
|
||||
// to get the same pseudorandom sequence on different platforms
|
||||
let mut rng = ChaChaRng::seed_from_u64(42);
|
||||
|
||||
// Generate a bunch of random addresses and create TPUConnections to them
|
||||
// Since TPUConnection::new is infallible, it should't matter whether or not
|
||||
// we can actually connect to those addresses - TPUConnection implementations should either
|
||||
// be lazy and not connect until first use or handle connection errors somehow
|
||||
// (without crashing, as would be required in a real practical validator)
|
||||
let first_addr = get_addr(&mut rng);
|
||||
assert!(get_connection(&first_addr).tpu_addr().ip() == first_addr.ip());
|
||||
let addrs = (0..MAX_CONNECTIONS)
|
||||
.into_iter()
|
||||
.map(|_| {
|
||||
let addr = get_addr(&mut rng);
|
||||
get_connection(&addr);
|
||||
addr
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
{
|
||||
let map = (*CONNECTION_MAP).lock().unwrap();
|
||||
addrs.iter().for_each(|a| {
|
||||
let conn = map.map.get(a).expect("Address not found");
|
||||
assert!(a.ip() == conn.0.tpu_addr().ip());
|
||||
});
|
||||
|
||||
assert!(map.map.get(&first_addr).is_none());
|
||||
}
|
||||
|
||||
// Test that get_connection updates which connection is next up for eviction
|
||||
// when an existing connection is used. Initially, addrs[0] should be next up for eviction, since
|
||||
// it was the earliest added. But we do get_connection(&addrs[0]), thereby using
|
||||
// that connection, and bumping it back to the end of the queue. So addrs[1] should be
|
||||
// the next up for eviction. So we add a new connection, and test that addrs[0] is not
|
||||
// evicted but addrs[1] is.
|
||||
get_connection(&addrs[0]);
|
||||
get_connection(&get_addr(&mut rng));
|
||||
|
||||
let map = (*CONNECTION_MAP).lock().unwrap();
|
||||
assert!(map.map.get(&addrs[0]).is_some());
|
||||
assert!(map.map.get(&addrs[1]).is_none());
|
||||
}
|
||||
}
|
|
@ -4,6 +4,7 @@ extern crate serde_derive;
|
|||
|
||||
pub mod blockhash_query;
|
||||
pub mod client_error;
|
||||
pub mod connection_cache;
|
||||
pub(crate) mod http_sender;
|
||||
pub(crate) mod mock_sender;
|
||||
pub mod nonblocking;
|
||||
|
|
|
@ -4,7 +4,9 @@ use {
|
|||
};
|
||||
|
||||
pub trait TpuConnection {
|
||||
fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self;
|
||||
fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self
|
||||
where
|
||||
Self: Sized;
|
||||
|
||||
fn tpu_addr(&self) -> &SocketAddr;
|
||||
|
||||
|
|
|
@ -3220,8 +3220,11 @@ dependencies = [
|
|||
"indicatif",
|
||||
"itertools 0.10.3",
|
||||
"jsonrpc-core",
|
||||
"lazy_static",
|
||||
"log",
|
||||
"quinn",
|
||||
"rand 0.7.3",
|
||||
"rand_chacha 0.2.2",
|
||||
"rayon",
|
||||
"reqwest",
|
||||
"rustls",
|
||||
|
|
Loading…
Reference in New Issue