From 47c95d549e24182158846bb13b7b17baf9206df7 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 9 Aug 2023 10:44:52 +0200 Subject: [PATCH] agent cleanup --- quic-forward-proxy/src/outbound/tx_forward.rs | 106 ++++++++++++++---- 1 file changed, 85 insertions(+), 21 deletions(-) diff --git a/quic-forward-proxy/src/outbound/tx_forward.rs b/quic-forward-proxy/src/outbound/tx_forward.rs index c51b9c31..2a8a5c5a 100644 --- a/quic-forward-proxy/src/outbound/tx_forward.rs +++ b/quic-forward-proxy/src/outbound/tx_forward.rs @@ -15,14 +15,30 @@ use solana_streamer::nonblocking::quic::ALPN_TPU_PROTOCOL_ID; use solana_streamer::tls_certificates::new_self_signed_tls_certificate; use std::collections::HashMap; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; +use itertools::Itertools; use tokio::sync::mpsc::Receiver; const MAX_PARALLEL_STREAMS: usize = 6; pub const PARALLEL_TPU_CONNECTION_COUNT: usize = 4; +struct AgentHandle { + pub tpu_address: SocketAddr, + pub agent_exit_signal: Arc, + pub created_at: Instant, + // relative to start + pub last_used_ms: AtomicU64, +} + +impl AgentHandle { + pub fn touch(&self) { + let last_used_ms = Instant::now().duration_since(self.created_at).as_millis() as u64; + self.last_used_ms.store(last_used_ms, Ordering::Relaxed); + } +} + // takes transactions from upstream clients and forwards them to the TPU pub async fn tx_forwarder( validator_identity: ValidatorIdentity, @@ -35,7 +51,7 @@ pub async fn tx_forwarder( let (broadcast_in, _) = tokio::sync::broadcast::channel::>(1000); - let mut agents: HashMap>> = HashMap::new(); + let mut agents: HashMap = HashMap::new(); loop { if exit_signal.load(Ordering::Relaxed) { @@ -51,37 +67,48 @@ pub async fn tx_forwarder( let tpu_address = forward_packet.tpu_address; agents.entry(tpu_address).or_insert_with(|| { - let mut agent_exit_signals = Vec::new(); + let agent_exit_signal = Arc::new(AtomicBool::new(false)); + for connection_idx in 0..PARALLEL_TPU_CONNECTION_COUNT { let sharder = Sharder::new(connection_idx as u32, PARALLEL_TPU_CONNECTION_COUNT as u32); let global_exit_signal = exit_signal.clone(); - let agent_exit_signal = Arc::new(AtomicBool::new(false)); let endpoint_copy = endpoint.clone(); let agent_exit_signal_copy = agent_exit_signal.clone(); - // by subscribing we expect to get a copy of each packet - let mut per_connection_receiver = broadcast_in.subscribe(); tokio::spawn(async move { debug!( "Start Quic forwarder agent #{} for TPU {}", connection_idx, tpu_address ); - if global_exit_signal.load(Ordering::Relaxed) { - warn!("Caught global exit signal - stopping agent thread"); - return; - } - if agent_exit_signal_copy.load(Ordering::Relaxed) { - warn!("Caught exit signal for this agent - stopping agent thread"); - return; - } - // get a copy of the packet from broadcast channel let auto_connection = AutoReconnect::new(endpoint_copy, tpu_address); // TODO check exit signal (using select! or maybe replace with oneshot) let _exit_signal_copy = global_exit_signal.clone(); - while let Ok(packet) = per_connection_receiver.recv().await { + 'tx_channel_loop: loop { + let timeout_result = timeout_fallback(per_connection_receiver.recv()).await; + + if global_exit_signal.load(Ordering::Relaxed) { + warn!("Caught global exit signal - stopping agent thread"); + return; + } + if agent_exit_signal_copy.load(Ordering::Relaxed) { + warn!("Caught exit signal for this agent ({} #{}) - stopping agent thread", tpu_address, connection_idx); + return; + } + + if let Err(_elapsed) = timeout_result { + continue; + } + let maybe_packet = timeout_result.unwrap(); + + if let Err(_recv_error) = maybe_packet { + break 'tx_channel_loop; + } + + let packet = maybe_packet.unwrap(); + if packet.tpu_address != tpu_address { continue; } @@ -136,14 +163,22 @@ pub async fn tx_forwarder( connection_idx, tpu_address ); }); // -- spawned thread for one connection to one TPU - agent_exit_signals.push(agent_exit_signal); } // -- for parallel connections to one TPU - // FanOut::new(senders) - agent_exit_signals + let now = Instant::now(); + AgentHandle { + tpu_address, + agent_exit_signal, + created_at: now, + last_used_ms: AtomicU64::new(0), + } }); // -- new agent - let _agent_channel = agents.get(&tpu_address).unwrap(); + let agent = agents.get(&tpu_address).unwrap(); + agent.touch(); + + // TODO only call from time to time + cleanup_agents(&mut agents, &tpu_address); if broadcast_in.len() > 5 { debug!("tx-forward queue len: {}", broadcast_in.len()) @@ -158,6 +193,35 @@ pub async fn tx_forwarder( // not reachable } +// ms +const AGENT_SHUTDOWN_IDLE: u64 = 5_000; + +fn cleanup_agents(agents: &mut HashMap, current_tpu_address: &SocketAddr) { + let mut to_shutdown = Vec::new(); + for (tpu_address, handle) in &*agents { + if tpu_address == current_tpu_address { + continue; + } + + let last_used_ms = handle.last_used_ms.load(Ordering::Relaxed); + + if last_used_ms > AGENT_SHUTDOWN_IDLE { + to_shutdown.push(tpu_address.to_owned()) + } + } + + for tpu_address in to_shutdown.iter() { + if let Some(removed_agent) = agents.remove(&tpu_address) { + if let Ok(_) = removed_agent.agent_exit_signal.compare_exchange( + false, true, Ordering::Relaxed, Ordering::Relaxed) { + let last_used_ms = removed_agent.last_used_ms.load(Ordering::Relaxed); + debug!("Agent for tpu node {} was IDLE for {}ms - sending exit signal", removed_agent.tpu_address, last_used_ms); + } + } + } + +} + /// takes a validator identity and creates a new QUIC client; appears as staked peer to TPU // note: ATM the provided identity might or might not be a valid validator keypair async fn new_endpoint_with_validator_identity(validator_identity: ValidatorIdentity) -> Endpoint {