Open/closed connections

This commit is contained in:
waterquarks 2022-09-23 21:02:40 +02:00
parent 1bebc2f6bc
commit 82564e117f
1 changed files with 17 additions and 1 deletions

View File

@ -13,6 +13,7 @@ use solana_geyser_connector_lib::{
fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage, MarketConfig},
grpc_plugin_source, metrics, websocket_source, SourceConfig,
};
use solana_geyser_connector_lib::metrics::MetricU64;
type CheckpointMap = Arc<Mutex<HashMap<String, FillCheckpoint>>>;
type PeerMap = Arc<Mutex<HashMap<SocketAddr, UnboundedSender<Message>>>>;
@ -22,11 +23,18 @@ async fn handle_connection_error(
peer_map: PeerMap,
raw_stream: TcpStream,
addr: SocketAddr,
metrics_opened_connections: MetricU64,
metrics_closed_connections: MetricU64
) {
metrics_opened_connections.clone().increment();
let result = handle_connection(checkpoint_map, peer_map.clone(), raw_stream, addr).await;
if result.is_err() {
error!("connection {} error {}", addr, result.unwrap_err());
};
metrics_closed_connections.clone().increment();
peer_map.lock().unwrap().remove(&addr);
}
@ -78,8 +86,10 @@ pub struct Config {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args: Vec<String> = std::env::args().collect();
if args.len() < 2 {
error!("requires a config file argument");
eprintln!("Please enter a config file path argument.");
return Ok(());
}
@ -94,6 +104,10 @@ async fn main() -> anyhow::Result<()> {
let metrics_tx = metrics::start();
let metrics_opened_connections = metrics_tx.register_u64("opened_connections".into());
let metrics_closed_connections = metrics_tx.register_u64("closed_connections".into());
let (account_write_queue_sender, slot_queue_sender, fill_receiver) =
fill_event_filter::init(
config.markets.clone(),
@ -148,6 +162,8 @@ async fn main() -> anyhow::Result<()> {
peers.clone(),
stream,
addr,
metrics_opened_connections.clone(),
metrics_closed_connections.clone(),
));
}
});