diff --git a/src/worker/trade_fetching/parsing.rs b/src/worker/trade_fetching/parsing.rs index d5d1491..0ead868 100644 --- a/src/worker/trade_fetching/parsing.rs +++ b/src/worker/trade_fetching/parsing.rs @@ -15,10 +15,11 @@ const PROGRAM_DATA: &str = "Program data: "; pub fn parse_trades_from_openbook_txns( txns: &mut Vec>, - sig_strings: &Vec, + mut sig_strings: Vec, target_markets: &HashMap, -) -> Vec { +) -> (Vec, Vec) { let mut fills_vector = Vec::::new(); + let mut failed_sigs = vec![]; for (idx, txn) in txns.iter_mut().enumerate() { match txn { Ok(t) => { @@ -42,13 +43,15 @@ pub fn parse_trades_from_openbook_txns( } Err(e) => { warn!("rpc error in get_transaction {}", e); + failed_sigs.push(sig_strings[idx].clone()); METRIC_RPC_ERRORS_TOTAL .with_label_values(&["getTransaction"]) .inc(); } } } - fills_vector + sig_strings.retain(|s| !failed_sigs.contains(&s)); + (fills_vector, sig_strings) } fn parse_openbook_fills_from_logs( diff --git a/src/worker/trade_fetching/scrape.rs b/src/worker/trade_fetching/scrape.rs index fa1fd10..9f79a1e 100644 --- a/src/worker/trade_fetching/scrape.rs +++ b/src/worker/trade_fetching/scrape.rs @@ -111,10 +111,10 @@ pub async fn scrape_fills( let mut txns = join_all(txn_futs).await; // TODO: reenable total fills metric - let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets); + let (fills, completed_sigs) = parse_trades_from_openbook_txns(&mut txns, sig_strings, target_markets); - // Write any fills to the database, and update the transactions as processed - insert_fills_atomically(pool, worker_id, fills, sig_strings).await?; + // Write fills to the database, and update properly fetched transactions as processed + insert_fills_atomically(pool, worker_id, fills, completed_sigs).await?; } Ok(())