Merge pull request #112 from blockworks-foundation/custom_tpu_client
moving some messages to trace, to make logs less verbose
This commit is contained in:
commit
4caf43e867
|
@ -31,6 +31,8 @@ lazy_static::lazy_static! {
|
|||
register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap();
|
||||
static ref NB_QUIC_TASKS: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_nb_quic_tasks", "Number quic tasks that are running")).unwrap();
|
||||
static ref NB_CONNECTIONS_TO_KEEP: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_connections_to_keep", "Number of connections to keep asked by tpu service")).unwrap();
|
||||
}
|
||||
|
||||
struct ActiveConnection {
|
||||
|
@ -77,7 +79,6 @@ impl ActiveConnection {
|
|||
{
|
||||
connection
|
||||
} else {
|
||||
error!("timeout while connecting");
|
||||
return Err(ConnectionError::TimedOut.into());
|
||||
}
|
||||
}
|
||||
|
@ -90,7 +91,6 @@ impl ActiveConnection {
|
|||
{
|
||||
connecting_result?
|
||||
} else {
|
||||
error!("timeout while connecting");
|
||||
return Err(ConnectionError::TimedOut.into());
|
||||
}
|
||||
}
|
||||
|
@ -150,13 +150,14 @@ impl ActiveConnection {
|
|||
Ok(stream) => stream,
|
||||
Err(e) => {
|
||||
warn!("error opening a unistream for {} error {}", identity, e);
|
||||
continue;
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
warn!("Could not reconnect to {} because of error {}", identity, e);
|
||||
continue;
|
||||
// validator no longer accepting connection
|
||||
trace!("Could not reconnect to {} because of error {}", identity, e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -186,7 +187,7 @@ impl ActiveConnection {
|
|||
},
|
||||
Err(e) => {
|
||||
warn!("Could not connect to {} because of error {}", identity, e);
|
||||
continue;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -195,13 +196,13 @@ impl ActiveConnection {
|
|||
|
||||
trace!("Sending {} transaction", identity);
|
||||
if let Err(e) = send_stream.write_all(tx.as_slice()).await {
|
||||
error!(
|
||||
warn!(
|
||||
"Error while writing transaction for {} error {}",
|
||||
identity, e
|
||||
);
|
||||
}
|
||||
if let Err(e) = send_stream.finish().await {
|
||||
error!(
|
||||
warn!(
|
||||
"Error finishing for {}, error {}",
|
||||
identity, e,
|
||||
)
|
||||
|
@ -217,11 +218,6 @@ impl ActiveConnection {
|
|||
}
|
||||
},
|
||||
_ = exit_oneshot_channel.recv() => {
|
||||
if let Some(_) = &mut connection {
|
||||
NB_QUIC_CONNECTIONS.dec();
|
||||
connection = None;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
@ -258,7 +254,7 @@ impl ActiveConnection {
|
|||
|
||||
struct ActiveConnectionWithExitChannel {
|
||||
pub active_connection: ActiveConnection,
|
||||
pub exit_channel: tokio::sync::mpsc::Sender<()>,
|
||||
pub exit_stream: tokio::sync::mpsc::Sender<()>,
|
||||
}
|
||||
|
||||
pub struct TpuConnectionManager {
|
||||
|
@ -315,9 +311,10 @@ impl TpuConnectionManager {
|
|||
transaction_sender: Arc<Sender<Vec<u8>>>,
|
||||
connections_to_keep: HashMap<Pubkey, SocketAddr>,
|
||||
) {
|
||||
NB_CONNECTIONS_TO_KEEP.set(connections_to_keep.len() as i64);
|
||||
for (identity, socket_addr) in &connections_to_keep {
|
||||
if self.identity_to_active_connection.get(&identity).is_none() {
|
||||
info!("added a connection for {}, {}", identity, socket_addr);
|
||||
trace!("added a connection for {}, {}", identity, socket_addr);
|
||||
let endpoint = self.endpoints.get();
|
||||
let active_connection =
|
||||
ActiveConnection::new(endpoint, socket_addr.clone(), identity.clone());
|
||||
|
@ -330,7 +327,7 @@ impl TpuConnectionManager {
|
|||
identity.clone(),
|
||||
Arc::new(ActiveConnectionWithExitChannel {
|
||||
active_connection,
|
||||
exit_channel: sx,
|
||||
exit_stream: sx,
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
@ -344,13 +341,13 @@ impl TpuConnectionManager {
|
|||
.collect::<Vec<_>>();
|
||||
for (identity, value) in collect_current_active_connections.iter() {
|
||||
if !connections_to_keep.contains_key(identity) {
|
||||
info!("removing a connection for {}", identity);
|
||||
trace!("removing a connection for {}", identity);
|
||||
// ignore error for exit channel
|
||||
value
|
||||
.active_connection
|
||||
.exit_signal
|
||||
.store(true, Ordering::Relaxed);
|
||||
let _ = value.exit_channel.send(()).await;
|
||||
let _ = value.exit_stream.send(()).await;
|
||||
self.identity_to_active_connection.remove(identity);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -138,7 +138,7 @@ impl TxSender {
|
|||
MESSAGES_IN_POSTGRES_CHANNEL.inc();
|
||||
}
|
||||
histo_timer.observe_duration();
|
||||
info!(
|
||||
trace!(
|
||||
"It took {} ms to send a batch of {} transaction(s)",
|
||||
start.elapsed().as_millis(),
|
||||
sigs_and_slots.len()
|
||||
|
|
Loading…
Reference in New Issue