use crate::postgres_config::PostgresConfig;
use chrono::{TimeZone, Utc};
use log::*;
use mango_feeds_connector::metrics::{MetricType, MetricU64, Metrics};
use postgres_query::Caching;
use service_mango_fills::*;
use services_mango_lib::postgres_configuration::PostgresConfiguration;
use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    time::Duration,
};
use tokio_postgres::Client;

/// Opens a postgres connection and spawns a task that keeps it alive,
/// reconnecting with a delay whenever it drops. The returned channel yields
/// `Some(client)` on every (re)connect and `None` when the connection is lost.
async fn postgres_connection(
    config: &PostgresConfig,
    metric_retries: MetricU64,
    metric_live: MetricU64,
    exit: Arc<AtomicBool>,
) -> anyhow::Result<async_channel::Receiver<Option<Client>>> {
    let (tx, rx) = async_channel::unbounded();

    let config = config.clone();
    let lib_config = PostgresConfiguration {
        connection_string: config.connection_string.clone(),
        allow_invalid_certs: config.allow_invalid_certs,
        tls: config.tls.clone(),
    };

    // Fail fast on startup: the first connection attempt propagates its error.
    let mut initial = Some(services_mango_lib::postgres_connection::connect(&lib_config).await?);
    let mut metric_retries = metric_retries;
    let mut metric_live = metric_live;

    tokio::spawn(async move {
        loop {
            // don't acquire a new connection if we're shutting down
            if exit.load(Ordering::Relaxed) {
                warn!("shutting down fill_event_postgres_target...");
                break;
            }
            let (client, connection) = match initial.take() {
                Some(v) => v,
                None => {
                    let result =
                        services_mango_lib::postgres_connection::connect(&lib_config).await;
                    match result {
                        Ok(v) => v,
                        Err(err) => {
                            warn!("could not connect to postgres: {:?}", err);
                            tokio::time::sleep(Duration::from_secs(
                                config.retry_connection_sleep_secs,
                            ))
                            .await;
                            continue;
                        }
                    }
                }
            };

            tx.send(Some(client)).await.expect("send success");
            metric_live.increment();

            // The connection future resolves only once the connection closes.
            let result = connection.await;

            metric_retries.increment();
            metric_live.decrement();

            tx.send(None).await.expect("send success");
            warn!("postgres connection error: {:?}", result);
            tokio::time::sleep(Duration::from_secs(config.retry_connection_sleep_secs)).await;
        }
    });

    Ok(rx)
}

/// Returns the most recent postgres client, waiting for a reconnect if the
/// connection is currently down. Exits the process if no client arrives
/// within `fatal_connection_timeout_secs`.
async fn update_postgres_client<'a>(
    client: &'a mut Option<postgres_query::Caching<Client>>,
    rx: &async_channel::Receiver<Option<Client>>,
    config: &PostgresConfig,
) -> &'a postgres_query::Caching<Client> {
    // get the most recent client, waiting if there's a disconnect
    while !rx.is_empty() || client.is_none() {
        tokio::select! {
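            // race the next client update against a fatal timeout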
            client_raw_opt = rx.recv() => {
                *client = client_raw_opt.expect("not closed").map(postgres_query::Caching::new);
            },
            _ = tokio::time::sleep(Duration::from_secs(config.fatal_connection_timeout_secs)) => {
                error!("waited too long for new postgres client");
                std::process::exit(1);
            },
        }
    }
    client.as_ref().expect("must contain value")
}

/// Writes a single fill update to postgres: new events are inserted
/// (idempotently), revoked events are deleted.
async fn process_update(client: &Caching<Client>, update: &FillUpdate) -> anyhow::Result<()> {
    let market = &update.market_key;
    let seq_num = update.event.seq_num as i64;
    let fill_timestamp = Utc.timestamp_opt(update.event.timestamp as i64, 0).unwrap();
    let price = update.event.price;
    let quantity = update.event.quantity;
    let slot = update.slot as i64;
    let write_version = update.write_version as i64;

    if update.status == FillUpdateStatus::New {
        // insert new events
        let query = postgres_query::query!(
            "INSERT INTO transactions_v4.perp_fills_feed_events
                (market, seq_num, fill_timestamp, price, quantity, slot, write_version)
             VALUES
                ($market, $seq_num, $fill_timestamp, $price, $quantity, $slot, $write_version)
             ON CONFLICT (market, seq_num) DO NOTHING",
            market,
            seq_num,
            fill_timestamp,
            price,
            quantity,
            slot,
            write_version,
        );
        let _ = query.execute(&client).await?;
    } else {
        // delete revoked events
        let query = postgres_query::query!(
            "DELETE FROM transactions_v4.perp_fills_feed_events
             WHERE market=$market AND seq_num=$seq_num",
            market,
            seq_num,
        );
        let _ = query.execute(&client).await?;
    }
    Ok(())
}

/// Sets up the postgres fill-event target: spawns `connection_count` worker
/// tasks that drain the returned queue in batches and write them to postgres.
pub async fn init(
    config: &PostgresConfig,
    metrics_sender: Metrics,
    exit: Arc<AtomicBool>,
) -> anyhow::Result<async_channel::Sender<FillUpdate>> {
    // The actual message may want to also contain a retry count, if it self-reinserts on failure?
    let (fill_update_queue_sender, fill_update_queue_receiver) =
        async_channel::bounded::<FillUpdate>(config.max_queue_size);

    let metric_con_retries = metrics_sender.register_u64(
        "fills_postgres_connection_retries".into(),
        MetricType::Counter,
    );
    let metric_con_live =
        metrics_sender.register_u64("fills_postgres_connections_alive".into(), MetricType::Gauge);

    // postgres fill update sending worker threads
    for _ in 0..config.connection_count {
        let postgres_account_writes = postgres_connection(
            config,
            metric_con_retries.clone(),
            metric_con_live.clone(),
            exit.clone(),
        )
        .await?;
        let fill_update_queue_receiver_c = fill_update_queue_receiver.clone();
        let config = config.clone();
        let mut metric_retries =
            metrics_sender.register_u64("fills_postgres_retries".into(), MetricType::Counter);
        tokio::spawn(async move {
            let mut client_opt = None;
            loop {
                // Retrieve up to max_batch_size updates, blocking only for the first one
                let mut batch = Vec::new();
                batch.push(
                    fill_update_queue_receiver_c
                        .recv()
                        .await
                        .expect("sender must stay alive"),
                );
                while batch.len() < config.max_batch_size {
                    match fill_update_queue_receiver_c.try_recv() {
                        Ok(update) => batch.push(update),
                        Err(async_channel::TryRecvError::Empty) => break,
                        Err(async_channel::TryRecvError::Closed) => {
                            panic!("sender must stay alive")
                        }
                    };
                }

                info!(
                    "updates, batch {}, channel size {}",
                    batch.len(),
                    fill_update_queue_receiver_c.len(),
                );

                let mut error_count = 0;
                loop {
                    let client =
                        update_postgres_client(&mut client_opt, &postgres_account_writes, &config)
                            .await;
                    let mut results = futures::future::join_all(
                        batch.iter().map(|update| process_update(client, update)),
                    )
                    .await;
                    // keep only the updates that failed, so they get retried
                    let mut iter = results.iter();
                    batch.retain(|_| iter.next().unwrap().is_err());
                    if !batch.is_empty() {
                        metric_retries.add(batch.len() as u64);
                        error_count += 1;
                        if error_count - 1 < config.retry_query_max_count {
                            results.retain(|r| r.is_err());
                            warn!("failed to process fill update, retrying: {:?}", results);
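                            // back off briefly before retrying the failed subset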
                            tokio::time::sleep(Duration::from_secs(config.retry_query_sleep_secs))
                                .await;
                            continue;
                        } else {
                            error!("failed to process fill update, exiting");
                            std::process::exit(1);
                        }
                    };
                    break;
                }
            }
        });
    }

    Ok(fill_update_queue_sender)
}