make ping cache rate limit delay configurable (#27955)
This commit is contained in:
parent
cdb95a362f
commit
8b0f9b4917
|
@ -3,7 +3,7 @@ use {
|
|||
cluster_slots::ClusterSlots,
|
||||
duplicate_repair_status::ANCESTOR_HASH_REPAIR_SAMPLE_SIZE,
|
||||
repair_response,
|
||||
repair_service::{OutstandingShredRepairs, RepairStats},
|
||||
repair_service::{OutstandingShredRepairs, RepairStats, REPAIR_MS},
|
||||
request_response::RequestResponse,
|
||||
result::{Error, Result},
|
||||
},
|
||||
|
@ -76,6 +76,7 @@ pub const MAX_ANCESTOR_RESPONSES: usize =
|
|||
pub(crate) const REPAIR_PING_TOKEN_SIZE: usize = HASH_BYTES;
|
||||
pub const REPAIR_PING_CACHE_CAPACITY: usize = 65536;
|
||||
pub const REPAIR_PING_CACHE_TTL: Duration = Duration::from_secs(1280);
|
||||
const REPAIR_PING_CACHE_RATE_LIMIT_DELAY: Duration = Duration::from_secs(2);
|
||||
pub(crate) const REPAIR_RESPONSE_SERIALIZED_PING_BYTES: usize =
|
||||
4 /*enum discriminator*/ + PUBKEY_BYTES + REPAIR_PING_TOKEN_SIZE + SIGNATURE_BYTES;
|
||||
const SIGNED_REPAIR_TIME_WINDOW: Duration = Duration::from_secs(60 * 10); // 10 min
|
||||
|
@ -555,7 +556,14 @@ impl ServeRepair {
|
|||
const MAX_BYTES_PER_SECOND: usize = 12_000_000;
|
||||
const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
|
||||
|
||||
let mut ping_cache = PingCache::new(REPAIR_PING_CACHE_TTL, REPAIR_PING_CACHE_CAPACITY);
|
||||
// rate limit delay should be greater than the repair request iteration delay
|
||||
assert!(REPAIR_PING_CACHE_RATE_LIMIT_DELAY > Duration::from_millis(REPAIR_MS));
|
||||
|
||||
let mut ping_cache = PingCache::new(
|
||||
REPAIR_PING_CACHE_TTL,
|
||||
REPAIR_PING_CACHE_RATE_LIMIT_DELAY,
|
||||
REPAIR_PING_CACHE_CAPACITY,
|
||||
);
|
||||
|
||||
let recycler = PacketBatchRecycler::default();
|
||||
Builder::new()
|
||||
|
|
|
@ -127,6 +127,7 @@ const MAX_PRUNE_DATA_NODES: usize = 32;
|
|||
const GOSSIP_PING_TOKEN_SIZE: usize = 32;
|
||||
const GOSSIP_PING_CACHE_CAPACITY: usize = 65536;
|
||||
const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(1280);
|
||||
const GOSSIP_PING_CACHE_RATE_LIMIT_DELAY: Duration = Duration::from_secs(1280 / 64);
|
||||
pub const DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS: u64 = 10_000;
|
||||
pub const DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS: u64 = 60_000;
|
||||
/// Minimum serialized size of a Protocol::PullResponse packet.
|
||||
|
@ -412,6 +413,7 @@ impl ClusterInfo {
|
|||
my_contact_info: RwLock::new(contact_info),
|
||||
ping_cache: Mutex::new(PingCache::new(
|
||||
GOSSIP_PING_CACHE_TTL,
|
||||
GOSSIP_PING_CACHE_RATE_LIMIT_DELAY,
|
||||
GOSSIP_PING_CACHE_CAPACITY,
|
||||
)),
|
||||
stats: GossipStats::default(),
|
||||
|
|
|
@ -1011,6 +1011,7 @@ pub(crate) mod tests {
|
|||
let mut pings = Vec::new();
|
||||
let ping_cache = Mutex::new(PingCache::new(
|
||||
Duration::from_secs(20 * 60), // ttl
|
||||
Duration::from_secs(20 * 60) / 64, // rate_limit_delay
|
||||
128, // capacity
|
||||
));
|
||||
assert_eq!(
|
||||
|
@ -1109,6 +1110,7 @@ pub(crate) mod tests {
|
|||
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
|
||||
let mut ping_cache = PingCache::new(
|
||||
Duration::from_secs(20 * 60), // ttl
|
||||
Duration::from_secs(20 * 60) / 64, // rate_limit_delay
|
||||
128, // capacity
|
||||
);
|
||||
let mut crds = Crds::default();
|
||||
|
@ -1208,6 +1210,7 @@ pub(crate) mod tests {
|
|||
let mut node_crds = Crds::default();
|
||||
let mut ping_cache = PingCache::new(
|
||||
Duration::from_secs(20 * 60), // ttl
|
||||
Duration::from_secs(20 * 60) / 64, // rate_limit_delay
|
||||
128, // capacity
|
||||
);
|
||||
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
||||
|
@ -1320,6 +1323,7 @@ pub(crate) mod tests {
|
|||
.unwrap();
|
||||
let mut ping_cache = PingCache::new(
|
||||
Duration::from_secs(20 * 60), // ttl
|
||||
Duration::from_secs(20 * 60) / 64, // rate_limit_delay
|
||||
128, // capacity
|
||||
);
|
||||
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
|
||||
|
@ -1380,6 +1384,7 @@ pub(crate) mod tests {
|
|||
.unwrap();
|
||||
let mut ping_cache = PingCache::new(
|
||||
Duration::from_secs(20 * 60), // ttl
|
||||
Duration::from_secs(20 * 60) / 64, // rate_limit_delay
|
||||
128, // capacity
|
||||
);
|
||||
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 1);
|
||||
|
|
|
@ -38,6 +38,8 @@ pub struct Pong {
|
|||
pub struct PingCache {
|
||||
// Time-to-live of received pong messages.
|
||||
ttl: Duration,
|
||||
// Rate limit delay to generate pings for a given address
|
||||
rate_limit_delay: Duration,
|
||||
// Timestamp of last ping message sent to a remote node.
|
||||
// Used to rate limit pings to remote nodes.
|
||||
pings: LruCache<(Pubkey, SocketAddr), Instant>,
|
||||
|
@ -153,9 +155,12 @@ impl Signable for Pong {
|
|||
}
|
||||
|
||||
impl PingCache {
|
||||
pub fn new(ttl: Duration, cap: usize) -> Self {
|
||||
pub fn new(ttl: Duration, rate_limit_delay: Duration, cap: usize) -> Self {
|
||||
// Sanity check ttl/rate_limit_delay
|
||||
assert!(rate_limit_delay <= ttl / 2);
|
||||
Self {
|
||||
ttl,
|
||||
rate_limit_delay,
|
||||
pings: LruCache::new(cap),
|
||||
pongs: LruCache::new(cap),
|
||||
pending_cache: LruCache::new(cap),
|
||||
|
@ -192,10 +197,9 @@ impl PingCache {
|
|||
T: Serialize,
|
||||
F: FnMut() -> Option<Ping<T>>,
|
||||
{
|
||||
// Rate limit consecutive pings sent to a remote node.
|
||||
let delay = self.ttl / 64;
|
||||
match self.pings.peek(&node) {
|
||||
Some(t) if now.saturating_duration_since(*t) < delay => None,
|
||||
// Rate limit consecutive pings sent to a remote node.
|
||||
Some(t) if now.saturating_duration_since(*t) < self.rate_limit_delay => None,
|
||||
_ => {
|
||||
let ping = pingf()?;
|
||||
let token = serialize(&ping.token).ok()?;
|
||||
|
@ -255,6 +259,7 @@ impl PingCache {
|
|||
pub(crate) fn mock_clone(&self) -> Self {
|
||||
let mut clone = Self {
|
||||
ttl: self.ttl,
|
||||
rate_limit_delay: self.rate_limit_delay,
|
||||
pings: LruCache::new(self.pings.cap()),
|
||||
pongs: LruCache::new(self.pongs.cap()),
|
||||
pending_cache: LruCache::new(self.pending_cache.cap()),
|
||||
|
@ -317,7 +322,8 @@ mod tests {
|
|||
let now = Instant::now();
|
||||
let mut rng = rand::thread_rng();
|
||||
let ttl = Duration::from_millis(256);
|
||||
let mut cache = PingCache::new(ttl, /*cap=*/ 1000);
|
||||
let delay = ttl / 64;
|
||||
let mut cache = PingCache::new(ttl, delay, /*cap=*/ 1000);
|
||||
let this_node = Keypair::new();
|
||||
let keypairs: Vec<_> = repeat_with(Keypair::new).take(8).collect();
|
||||
let sockets: Vec<_> = repeat_with(|| {
|
||||
|
|
|
@ -54,6 +54,7 @@ impl Node {
|
|||
) -> Self {
|
||||
let ping_cache = Arc::new(Mutex::new(PingCache::new(
|
||||
Duration::from_secs(20 * 60), // ttl
|
||||
Duration::from_secs(20 * 60) / 64, // delay
|
||||
2048, // capacity
|
||||
)));
|
||||
Node {
|
||||
|
|
Loading…
Reference in New Issue