agent cleanup

This commit is contained in:
GroovieGermanikus 2023-08-09 10:44:52 +02:00
parent d776ed9dca
commit 47c95d549e
1 changed files with 85 additions and 21 deletions

View File

@ -15,14 +15,30 @@ use solana_streamer::nonblocking::quic::ALPN_TPU_PROTOCOL_ID;
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use itertools::Itertools;
use tokio::sync::mpsc::Receiver;
const MAX_PARALLEL_STREAMS: usize = 6;
pub const PARALLEL_TPU_CONNECTION_COUNT: usize = 4;
struct AgentHandle {
pub tpu_address: SocketAddr,
pub agent_exit_signal: Arc<AtomicBool>,
pub created_at: Instant,
// relative to start
pub last_used_ms: AtomicU64,
}
impl AgentHandle {
pub fn touch(&self) {
let last_used_ms = Instant::now().duration_since(self.created_at).as_millis() as u64;
self.last_used_ms.store(last_used_ms, Ordering::Relaxed);
}
}
// takes transactions from upstream clients and forwards them to the TPU
pub async fn tx_forwarder(
validator_identity: ValidatorIdentity,
@ -35,7 +51,7 @@ pub async fn tx_forwarder(
let (broadcast_in, _) = tokio::sync::broadcast::channel::<Arc<ForwardPacket>>(1000);
let mut agents: HashMap<SocketAddr, Vec<Arc<AtomicBool>>> = HashMap::new();
let mut agents: HashMap<SocketAddr, AgentHandle> = HashMap::new();
loop {
if exit_signal.load(Ordering::Relaxed) {
@ -51,37 +67,48 @@ pub async fn tx_forwarder(
let tpu_address = forward_packet.tpu_address;
agents.entry(tpu_address).or_insert_with(|| {
let mut agent_exit_signals = Vec::new();
let agent_exit_signal = Arc::new(AtomicBool::new(false));
for connection_idx in 0..PARALLEL_TPU_CONNECTION_COUNT {
let sharder =
Sharder::new(connection_idx as u32, PARALLEL_TPU_CONNECTION_COUNT as u32);
let global_exit_signal = exit_signal.clone();
let agent_exit_signal = Arc::new(AtomicBool::new(false));
let endpoint_copy = endpoint.clone();
let agent_exit_signal_copy = agent_exit_signal.clone();
// by subscribing we expect to get a copy of each packet
let mut per_connection_receiver = broadcast_in.subscribe();
tokio::spawn(async move {
debug!(
"Start Quic forwarder agent #{} for TPU {}",
connection_idx, tpu_address
);
if global_exit_signal.load(Ordering::Relaxed) {
warn!("Caught global exit signal - stopping agent thread");
return;
}
if agent_exit_signal_copy.load(Ordering::Relaxed) {
warn!("Caught exit signal for this agent - stopping agent thread");
return;
}
// get a copy of the packet from broadcast channel
let auto_connection = AutoReconnect::new(endpoint_copy, tpu_address);
// TODO check exit signal (using select! or maybe replace with oneshot)
let _exit_signal_copy = global_exit_signal.clone();
while let Ok(packet) = per_connection_receiver.recv().await {
'tx_channel_loop: loop {
let timeout_result = timeout_fallback(per_connection_receiver.recv()).await;
if global_exit_signal.load(Ordering::Relaxed) {
warn!("Caught global exit signal - stopping agent thread");
return;
}
if agent_exit_signal_copy.load(Ordering::Relaxed) {
warn!("Caught exit signal for this agent ({} #{}) - stopping agent thread", tpu_address, connection_idx);
return;
}
if let Err(_elapsed) = timeout_result {
continue;
}
let maybe_packet = timeout_result.unwrap();
if let Err(_recv_error) = maybe_packet {
break 'tx_channel_loop;
}
let packet = maybe_packet.unwrap();
if packet.tpu_address != tpu_address {
continue;
}
@ -136,14 +163,22 @@ pub async fn tx_forwarder(
connection_idx, tpu_address
);
}); // -- spawned thread for one connection to one TPU
agent_exit_signals.push(agent_exit_signal);
} // -- for parallel connections to one TPU
// FanOut::new(senders)
agent_exit_signals
let now = Instant::now();
AgentHandle {
tpu_address,
agent_exit_signal,
created_at: now,
last_used_ms: AtomicU64::new(0),
}
}); // -- new agent
let _agent_channel = agents.get(&tpu_address).unwrap();
let agent = agents.get(&tpu_address).unwrap();
agent.touch();
// TODO only call from time to time
cleanup_agents(&mut agents, &tpu_address);
if broadcast_in.len() > 5 {
debug!("tx-forward queue len: {}", broadcast_in.len())
@ -158,6 +193,35 @@ pub async fn tx_forwarder(
// not reachable
}
// ms
const AGENT_SHUTDOWN_IDLE: u64 = 5_000;
fn cleanup_agents(agents: &mut HashMap<SocketAddr, AgentHandle>, current_tpu_address: &SocketAddr) {
let mut to_shutdown = Vec::new();
for (tpu_address, handle) in &*agents {
if tpu_address == current_tpu_address {
continue;
}
let last_used_ms = handle.last_used_ms.load(Ordering::Relaxed);
if last_used_ms > AGENT_SHUTDOWN_IDLE {
to_shutdown.push(tpu_address.to_owned())
}
}
for tpu_address in to_shutdown.iter() {
if let Some(removed_agent) = agents.remove(&tpu_address) {
if let Ok(_) = removed_agent.agent_exit_signal.compare_exchange(
false, true, Ordering::Relaxed, Ordering::Relaxed) {
let last_used_ms = removed_agent.last_used_ms.load(Ordering::Relaxed);
debug!("Agent for tpu node {} was IDLE for {}ms - sending exit signal", removed_agent.tpu_address, last_used_ms);
}
}
}
}
/// takes a validator identity and creates a new QUIC client; appears as staked peer to TPU
// note: ATM the provided identity might or might not be a valid validator keypair
async fn new_endpoint_with_validator_identity(validator_identity: ValidatorIdentity) -> Endpoint {