diff --git a/src/workers/tpu_utils/tpu_connection_manager.rs b/src/workers/tpu_utils/tpu_connection_manager.rs index 9da87b3b..f7583e60 100644 --- a/src/workers/tpu_utils/tpu_connection_manager.rs +++ b/src/workers/tpu_utils/tpu_connection_manager.rs @@ -31,6 +31,8 @@ lazy_static::lazy_static! { register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap(); static ref NB_QUIC_TASKS: GenericGauge = register_int_gauge!(opts!("literpc_nb_quic_tasks", "Number quic tasks that are running")).unwrap(); + static ref NB_CONNECTIONS_TO_KEEP: GenericGauge = + register_int_gauge!(opts!("literpc_connections_to_keep", "Number of connections to keep asked by tpu service")).unwrap(); } struct ActiveConnection { @@ -77,7 +79,6 @@ impl ActiveConnection { { connection } else { - error!("timeout while connecting"); return Err(ConnectionError::TimedOut.into()); } } @@ -90,7 +91,6 @@ impl ActiveConnection { { connecting_result? } else { - error!("timeout while connecting"); return Err(ConnectionError::TimedOut.into()); } } @@ -155,7 +155,8 @@ impl ActiveConnection { } }, Err(e) => { - warn!("Could not reconnect to {} because of error {}", identity, e); + // validator no longer accepting connection + trace!("Could not reconnect to {} because of error {}", identity, e); continue; } } @@ -195,13 +196,13 @@ impl ActiveConnection { trace!("Sending {} transaction", identity); if let Err(e) = send_stream.write_all(tx.as_slice()).await { - error!( + warn!( "Error while writing transaction for {} error {}", identity, e ); } if let Err(e) = send_stream.finish().await { - error!( + warn!( "Error finishing for {}, error {}", identity, e, ) @@ -217,11 +218,6 @@ impl ActiveConnection { } }, _ = exit_oneshot_channel.recv() => { - if let Some(_) = &mut connection { - NB_QUIC_CONNECTIONS.dec(); - connection = None; - } - break; } }; @@ -258,7 +254,7 @@ impl ActiveConnection { struct ActiveConnectionWithExitChannel { pub active_connection: ActiveConnection, - pub exit_channel: tokio::sync::mpsc::Sender<()>, + pub exit_stream: tokio::sync::mpsc::Sender<()>, } pub struct TpuConnectionManager { @@ -315,9 +311,10 @@ impl TpuConnectionManager { transaction_sender: Arc>>, connections_to_keep: HashMap, ) { + NB_CONNECTIONS_TO_KEEP.set(connections_to_keep.len() as i64); for (identity, socket_addr) in &connections_to_keep { if self.identity_to_active_connection.get(&identity).is_none() { - info!("added a connection for {}, {}", identity, socket_addr); + trace!("added a connection for {}, {}", identity, socket_addr); let endpoint = self.endpoints.get(); let active_connection = ActiveConnection::new(endpoint, socket_addr.clone(), identity.clone()); @@ -330,7 +327,7 @@ impl TpuConnectionManager { identity.clone(), Arc::new(ActiveConnectionWithExitChannel { active_connection, - exit_channel: sx, + exit_stream: sx, }), ); } @@ -344,13 +341,13 @@ impl TpuConnectionManager { .collect::>(); for (identity, value) in collect_current_active_connections.iter() { if !connections_to_keep.contains_key(identity) { - info!("removing a connection for {}", identity); + trace!("removing a connection for {}", identity); // ignore error for exit channel value .active_connection .exit_signal .store(true, Ordering::Relaxed); - let _ = value.exit_channel.send(()).await; + let _ = value.exit_stream.send(()).await; self.identity_to_active_connection.remove(identity); } } diff --git a/src/workers/tx_sender.rs b/src/workers/tx_sender.rs index a7071bca..59ff8ea7 100644 --- a/src/workers/tx_sender.rs +++ b/src/workers/tx_sender.rs @@ -138,7 +138,7 @@ impl TxSender { MESSAGES_IN_POSTGRES_CHANNEL.inc(); } histo_timer.observe_duration(); - info!( + trace!( "It took {} ms to send a batch of {} transaction(s)", start.elapsed().as_millis(), sigs_and_slots.len()