experiment with tokio tracing
This commit is contained in:
parent
0d1c95c327
commit
067c1a10f7
|
@ -27,6 +27,7 @@ log = { workspace = true }
|
|||
clap = { workspace = true }
|
||||
dashmap = { workspace = true }
|
||||
itertools = { workspace = true }
|
||||
tracing = "0.1.37"
|
||||
tracing-subscriber = { workspace = true }
|
||||
native-tls = { workspace = true }
|
||||
prometheus = { workspace = true }
|
||||
|
|
|
@ -4,7 +4,8 @@ use std::sync::Arc;
|
|||
use std::sync::atomic::{AtomicBool, AtomicU64};
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
use anyhow::{anyhow, bail};
|
||||
use tracing::{debug_span, instrument, Instrument, span};
|
||||
use anyhow::{anyhow, bail, Context};
|
||||
use itertools::{any, Itertools};
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use quinn::{Connecting, Connection, Endpoint, SendStream, ServerConfig, VarInt};
|
||||
|
@ -56,12 +57,15 @@ impl QuicForwardProxy {
|
|||
let quic_proxy: AnyhowJoinHandle = tokio::spawn(async move {
|
||||
info!("TPU Quic Proxy server start on {}", endpoint.local_addr()?);
|
||||
|
||||
while let Some(conn) = endpoint.accept().await {
|
||||
while let Some(connecting) = endpoint.accept().await {
|
||||
|
||||
let exit_signal = exit_signal.clone();
|
||||
let validator_identity_copy = self.validator_identity.clone();
|
||||
tokio::spawn(async move {
|
||||
match setup_connection(conn, exit_signal, validator_identity_copy).await {
|
||||
|
||||
let connection = connecting.await.context("accept connection").unwrap();
|
||||
match setup_connection(connection, exit_signal, validator_identity_copy)
|
||||
.instrument(debug_span!("inbound_conn")).await {
|
||||
Ok(()) => {}
|
||||
Err(err) => {
|
||||
error!("setup connection failed: {reason}", reason = err);
|
||||
|
@ -84,9 +88,8 @@ impl QuicForwardProxy {
|
|||
}
|
||||
|
||||
|
||||
async fn setup_connection(connecting: Connecting, exit_signal: Arc<AtomicBool>, validator_identity: Arc<Keypair>) -> anyhow::Result<()> {
|
||||
let connection = connecting.await?;
|
||||
debug!("inbound connection established, remote {connection}", connection = connection.remote_address());
|
||||
async fn setup_connection(connection: Connection, exit_signal: Arc<AtomicBool>, validator_identity: Arc<Keypair>) -> anyhow::Result<()> {
|
||||
debug!("inbound connection established, remote {}", connection.remote_address());
|
||||
|
||||
let active_tpu_connection =
|
||||
TpuQuicConnection::new_with_validator_identity(validator_identity.as_ref());
|
||||
|
@ -125,7 +128,7 @@ async fn setup_connection(connecting: Connecting, exit_signal: Arc<AtomicBool>,
|
|||
|
||||
active_tpu_connection_copy.send_txs_to_tpu(exit_signal_copy, validator_identity_copy, tpu_identity, tpu_address, &txs).await;
|
||||
|
||||
});
|
||||
}).instrument(debug_span!("send_txs_to_tpu")).await.unwrap();
|
||||
|
||||
} // -- loop
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue