log obfuscated url
This commit is contained in:
parent
342303a9ec
commit
40bebb07b5
|
@ -1255,6 +1255,7 @@ dependencies = [
|
|||
"tokio",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"url",
|
||||
"yellowstone-grpc-client",
|
||||
"yellowstone-grpc-proto",
|
||||
]
|
||||
|
|
|
@ -9,6 +9,7 @@ yellowstone-grpc-proto = "1.11.0"
|
|||
|
||||
solana-sdk = "~1.16.17"
|
||||
|
||||
url = "2.5.0"
|
||||
async-stream = "0.3.5"
|
||||
tokio = { version = "1.28" , features = ["rt"] }
|
||||
futures = "0.3.28"
|
||||
|
|
|
@ -3,6 +3,7 @@ use futures::{Stream, StreamExt};
|
|||
use log::{debug, info, log, trace, warn, Level};
|
||||
use solana_sdk::commitment_config::CommitmentConfig;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Display;
|
||||
use std::pin::Pin;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::task::JoinHandle;
|
||||
|
@ -34,6 +35,12 @@ pub struct GrpcSourceConfig {
|
|||
timeouts: Option<GrpcConnectionTimeouts>,
|
||||
}
|
||||
|
||||
impl Display for GrpcSourceConfig {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "grpc_addr: {}", crate::obfuscate::url_obfuscate_api_token(&self.grpc_addr))
|
||||
}
|
||||
}
|
||||
|
||||
impl GrpcSourceConfig {
|
||||
/// Create a grpc source without tls and timeouts
|
||||
pub fn new_simple(grpc_addr: String) -> Self {
|
||||
|
@ -198,7 +205,7 @@ pub fn create_geyser_reconnecting_stream(
|
|||
Ok(Ok(subscribed_stream)) => (ConnectionState::Ready(attempt, subscribed_stream), Message::Connecting(attempt)),
|
||||
Ok(Err(geyser_error)) => {
|
||||
// TODO identify non-recoverable errors and cancel stream
|
||||
warn!("Subscribe failed - retrying: {:?}", geyser_error);
|
||||
warn!("Subscribe failed on {} - retrying: {:?}", grpc_source, geyser_error);
|
||||
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
|
||||
},
|
||||
Err(geyser_grpc_task_error) => {
|
||||
|
@ -212,17 +219,17 @@ pub fn create_geyser_reconnecting_stream(
|
|||
|
||||
match geyser_stream.next().await {
|
||||
Some(Ok(update_message)) => {
|
||||
trace!("> recv update message");
|
||||
trace!("> recv update message from {}", grpc_source);
|
||||
(ConnectionState::Ready(attempt, geyser_stream), Message::GeyserSubscribeUpdate(update_message))
|
||||
}
|
||||
Some(Err(tonic_status)) => {
|
||||
// TODO identify non-recoverable errors and cancel stream
|
||||
debug!("! error - retrying: {:?}", tonic_status);
|
||||
debug!("! error on {} - retrying: {:?}", grpc_source, tonic_status);
|
||||
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
|
||||
}
|
||||
None => {
|
||||
//TODO should not arrive. Mean the stream close.
|
||||
warn!("! geyser stream closed - retrying");
|
||||
warn!("! geyser stream closed on {} - retrying", grpc_source);
|
||||
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
|
||||
}
|
||||
}
|
||||
|
@ -231,7 +238,7 @@ pub fn create_geyser_reconnecting_stream(
|
|||
|
||||
ConnectionState::WaitReconnect(attempt) => {
|
||||
let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0);
|
||||
info!("Waiting {} seconds, then reconnect", backoff_secs);
|
||||
info!("Waiting {} seconds, then reconnect to {}", backoff_secs, grpc_source);
|
||||
sleep(Duration::from_secs_f32(backoff_secs)).await;
|
||||
(ConnectionState::NotConnected(attempt), Message::Connecting(attempt))
|
||||
}
|
||||
|
|
|
@ -3,3 +3,4 @@ pub mod grpc_subscription;
|
|||
pub mod grpc_subscription_autoreconnect;
|
||||
pub mod grpcmultiplex_fastestwins;
|
||||
mod grpc_stream_utils;
|
||||
mod obfuscate;
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
use std::borrow::Cow;
|
||||
use url::Url;
|
||||
|
||||
/// obfuscate urls with api token like http://mango.rpcpool.com/a991fba00fagbad
|
||||
pub fn url_obfuscate_api_token(url: &str) -> Cow<str> {
|
||||
if let Ok(mut parsed) = Url::parse(url) {
|
||||
if parsed.path() == "/" {
|
||||
return Cow::Borrowed(url);
|
||||
} else {
|
||||
parsed.set_path("omitted-secret");
|
||||
Cow::Owned(parsed.to_string())
|
||||
}
|
||||
} else {
|
||||
Cow::Borrowed(url)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[test]
|
||||
fn test_obfuscate_path() {
|
||||
let url_mango = "http://mango.rpcpool.com/121sdfsdf21";
|
||||
let obfuscated = url_obfuscate_api_token(url_mango);
|
||||
assert_eq!(obfuscated, "http://mango.rpcpool.com/omitted-secret");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_obfuscate_nopath() {
|
||||
let url_localhost = "http://127.0.0.1";
|
||||
let obfuscated = url_obfuscate_api_token(url_localhost);
|
||||
assert_eq!(obfuscated, "http://127.0.0.1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_obfuscate_invalid() {
|
||||
let url_localhost = "::::invalid";
|
||||
let obfuscated = url_obfuscate_api_token(url_localhost);
|
||||
assert_eq!(obfuscated, "::::invalid");
|
||||
}
|
Loading…
Reference in New Issue