From 67b3583ba6efa1bce023e54ea23347a763d6c5a1 Mon Sep 17 00:00:00 2001 From: dboures Date: Thu, 8 Jun 2023 02:47:05 -0500 Subject: [PATCH] refactor: worker uses a transactions table --- src/backfill-trades/main.rs | 87 ++---------- src/database/insert.rs | 6 +- src/structs/openbook.rs | 9 +- src/worker/main.rs | 41 +++--- src/worker/metrics/mod.rs | 10 +- src/worker/trade_fetching/parsing.rs | 2 +- src/worker/trade_fetching/scrape.rs | 196 +++++++++++++-------------- 7 files changed, 146 insertions(+), 205 deletions(-) diff --git a/src/backfill-trades/main.rs b/src/backfill-trades/main.rs index ec515d6..c0a73b5 100644 --- a/src/backfill-trades/main.rs +++ b/src/backfill-trades/main.rs @@ -1,28 +1,24 @@ use anchor_lang::prelude::Pubkey; use chrono::{DateTime, Duration, NaiveDateTime, Utc}; use deadpool_postgres::Pool; -use futures::future::join_all; use log::debug; use openbook_candles::{ database::{ - fetch::fetch_worker_transactions, initialize::{connect_to_database, setup_database}, - insert::{add_fills_atomically, build_transactions_insert_statement}, + insert::build_transactions_insert_statement, }, structs::{ markets::{fetch_market_infos, load_markets}, transaction::{PgTransaction, NUM_TRANSACTION_PARTITIONS}, }, utils::{AnyhowWrap, Config, OPENBOOK_KEY}, - worker::trade_fetching::parsing::parse_trades_from_openbook_txns, + worker::trade_fetching::scrape::scrape_fills, }; use solana_client::{ nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config, - rpc_config::RpcTransactionConfig }; use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature}; -use solana_transaction_status::UiTransactionEncoding; -use std::{collections::HashMap, env, str::FromStr,time::Duration as WaitDuration }; +use std::{collections::HashMap, env, str::FromStr}; #[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() -> anyhow::Result<()> { @@ -54,15 +50,18 @@ async fn main() -> anyhow::Result<()> { let rpc_clone = rpc_url.clone(); let pool_clone = pool.clone(); handles.push(tokio::spawn(async move { - fetch_signatures(rpc_clone, &pool_clone, num_days).await.unwrap(); + fetch_signatures(rpc_clone, &pool_clone, num_days) + .await + .unwrap(); })); + // Low priority improvement: batch fills into 1000's per worker for id in 0..NUM_TRANSACTION_PARTITIONS { let rpc_clone = rpc_url.clone(); let pool_clone = pool.clone(); let markets_clone = target_markets.clone(); handles.push(tokio::spawn(async move { - backfill(id as i32, rpc_clone, &pool_clone, &markets_clone) + scrape_fills(id as i32, rpc_clone, &pool_clone, &markets_clone) .await .unwrap(); })); @@ -89,10 +88,7 @@ pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> an }; let sigs = match rpc_client - .get_signatures_for_address_with_config( - &OPENBOOK_KEY, - rpc_config, - ) + .get_signatures_for_address_with_config(&OPENBOOK_KEY, rpc_config) .await { Ok(sigs) => sigs, @@ -114,15 +110,15 @@ pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> an .collect::>(); if transactions.is_empty() { - println!("No transactions found, trying again"); + println!("No transactions found, trying again"); } debug!("writing: {:?} txns to DB\n", transactions.len()); let upsert_statement = build_transactions_insert_statement(transactions); - let client = pool.get().await?; - client - .execute(&upsert_statement, &[]) - .await - .map_err_anyhow()?; + let client = pool.get().await?; + client + .execute(&upsert_statement, &[]) + .await + .map_err_anyhow()?; now_time = last_time; before_sig = Some(Signature::from_str(&last_signature)?); @@ -136,59 +132,6 @@ pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> an Ok(()) } -pub async fn backfill( - worker_id: i32, - rpc_url: String, - pool: &Pool, - target_markets: &HashMap, -) -> anyhow::Result<()> { - println!("Worker {} up \n", worker_id); - let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed()); - - loop { - let transactions = fetch_worker_transactions(worker_id, pool).await?; - if transactions.len() == 0 { - println!("No signatures found by worker {}", worker_id); - tokio::time::sleep(WaitDuration::from_secs(1)).await; - continue; - }; - - // for each signature, fetch the transaction - let txn_config = RpcTransactionConfig { - encoding: Some(UiTransactionEncoding::Json), - commitment: Some(CommitmentConfig::confirmed()), - max_supported_transaction_version: Some(0), - }; - - let sig_strings = transactions - .iter() - .map(|t| t.signature.clone()) - .collect::>(); - - let signatures: Vec<_> = transactions - .into_iter() - .map(|t| t.signature.parse::().unwrap()) - .collect(); - - let txn_futs: Vec<_> = signatures - .iter() - .map(|s| rpc_client.get_transaction_with_config(s, txn_config)) - .collect(); - - let mut txns = join_all(txn_futs).await; - - // TODO: batch fills into groups of 1000 - let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets); - - // Write any fills to the database, and mark the transactions as processed - add_fills_atomically(pool, worker_id, fills, sig_strings).await?; - } - - // TODO: graceful shutdown - // println!("Worker {} down \n", worker_id); - Ok(()) -} - fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration { let naive_cur = NaiveDateTime::from_timestamp_millis(current_time * 1000).unwrap(); let naive_bf = NaiveDateTime::from_timestamp_millis(backfill_end * 1000).unwrap(); diff --git a/src/database/insert.rs b/src/database/insert.rs index e7eedaf..6fe01ff 100644 --- a/src/database/insert.rs +++ b/src/database/insert.rs @@ -1,8 +1,6 @@ use deadpool_postgres::Pool; use log::debug; -use std::{ - collections::{HashMap} -}; +use std::collections::HashMap; use tokio::sync::mpsc::{error::TryRecvError, Receiver}; use crate::{ @@ -10,7 +8,7 @@ use crate::{ utils::{to_timestampz, AnyhowWrap}, }; -pub async fn add_fills_atomically( +pub async fn insert_fills_atomically( pool: &Pool, worker_id: i32, fills: Vec, diff --git a/src/structs/openbook.rs b/src/structs/openbook.rs index 8629ce2..d3c0c0b 100644 --- a/src/structs/openbook.rs +++ b/src/structs/openbook.rs @@ -22,7 +22,12 @@ pub struct OpenBookFillEventRaw { pub referrer_rebate: Option, } impl OpenBookFillEventRaw { - pub fn into_event(self, signature: String, block_time: i64, log_index: usize) -> OpenBookFillEvent { + pub fn into_event( + self, + signature: String, + block_time: i64, + log_index: usize, + ) -> OpenBookFillEvent { OpenBookFillEvent { signature, market: self.market, @@ -62,7 +67,7 @@ pub struct OpenBookFillEvent { pub client_order_id: Option, pub referrer_rebate: Option, pub block_time: i64, - pub log_index: usize + pub log_index: usize, } #[derive(Copy, Clone, Debug, PartialEq)] diff --git a/src/worker/main.rs b/src/worker/main.rs index 77581f9..35b6161 100644 --- a/src/worker/main.rs +++ b/src/worker/main.rs @@ -1,22 +1,18 @@ use log::{error, info}; use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; -use openbook_candles::structs::openbook::OpenBookFillEvent; +use openbook_candles::structs::transaction::NUM_TRANSACTION_PARTITIONS; use openbook_candles::utils::Config; use openbook_candles::worker::metrics::{ - serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE, METRIC_FILLS_QUEUE_LENGTH, + serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE, }; -use openbook_candles::worker::trade_fetching::scrape::scrape; +use openbook_candles::worker::trade_fetching::scrape::{scrape_fills, scrape_signatures}; use openbook_candles::{ - database::{ - initialize::{connect_to_database, setup_database}, - insert::persist_fill_events, - }, + database::initialize::{connect_to_database, setup_database}, worker::candle_batching::batch_for_market, }; use solana_sdk::pubkey::Pubkey; use std::env; use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration}; -use tokio::sync::mpsc; #[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() -> anyhow::Result<()> { @@ -32,8 +28,6 @@ async fn main() -> anyhow::Result<()> { rpc_url: rpc_url.clone(), }; - let fills_queue_max_size = 10000; - let markets = load_markets(path_to_markets_json); let market_infos = fetch_market_infos(&config, markets.clone()).await?; let mut target_markets = HashMap::new(); @@ -46,21 +40,26 @@ async fn main() -> anyhow::Result<()> { setup_database(&pool).await?; let mut handles = vec![]; - let (fill_sender, mut fill_receiver) = mpsc::channel::(fills_queue_max_size); - let scrape_fill_sender = fill_sender.clone(); + // signature scraping + let rpc_clone = rpc_url.clone(); + let pool_clone = pool.clone(); handles.push(tokio::spawn(async move { - scrape(&config, &scrape_fill_sender, &target_markets).await; + scrape_signatures(rpc_clone, &pool_clone).await.unwrap(); })); - let fills_pool = pool.clone(); - handles.push(tokio::spawn(async move { - loop { - persist_fill_events(&fills_pool, &mut fill_receiver) + // transaction/fill scraping + for id in 0..NUM_TRANSACTION_PARTITIONS { + let rpc_clone = rpc_url.clone(); + let pool_clone = pool.clone(); + let markets_clone = target_markets.clone(); + handles.push(tokio::spawn(async move { + scrape_fills(id as i32, rpc_clone, &pool_clone, &markets_clone) .await .unwrap(); - } - })); + })); + } + // candle batching for market in market_infos.into_iter() { let batch_pool = pool.clone(); handles.push(tokio::spawn(async move { @@ -70,7 +69,6 @@ async fn main() -> anyhow::Result<()> { } let monitor_pool = pool.clone(); - let monitor_fill_channel = fill_sender.clone(); handles.push(tokio::spawn(async move { // TODO: maybe break this out into a new function loop { @@ -78,9 +76,6 @@ async fn main() -> anyhow::Result<()> { METRIC_DB_POOL_AVAILABLE.set(pool_status.available as i64); METRIC_DB_POOL_SIZE.set(pool_status.size as i64); - METRIC_FILLS_QUEUE_LENGTH - .set((fills_queue_max_size - monitor_fill_channel.capacity()) as i64); - tokio::time::sleep(WaitDuration::from_secs(10)).await; } })); diff --git a/src/worker/metrics/mod.rs b/src/worker/metrics/mod.rs index 09488f3..7f76add 100644 --- a/src/worker/metrics/mod.rs +++ b/src/worker/metrics/mod.rs @@ -2,8 +2,8 @@ use actix_web::{dev::Server, http::StatusCode, App, HttpServer}; use actix_web_prom::PrometheusMetricsBuilder; use lazy_static::lazy_static; use prometheus::{ - register_int_counter_vec_with_registry, register_int_gauge_with_registry, IntCounterVec, - IntGauge, Registry, + register_int_counter_vec_with_registry, register_int_counter_with_registry, + register_int_gauge_with_registry, IntCounter, IntCounterVec, IntGauge, Registry, }; lazy_static! { @@ -30,9 +30,9 @@ lazy_static! { METRIC_REGISTRY ) .unwrap(); - pub static ref METRIC_FILLS_QUEUE_LENGTH: IntGauge = register_int_gauge_with_registry!( - "fills_queue_length", - "Current length of the fills write queue", + pub static ref METRIC_TRANSACTIONS_TOTAL: IntCounter = register_int_counter_with_registry!( + "transactions_total", + "Total number of transaction signatures scraped", METRIC_REGISTRY ) .unwrap(); diff --git a/src/worker/trade_fetching/parsing.rs b/src/worker/trade_fetching/parsing.rs index 6add236..d5d1491 100644 --- a/src/worker/trade_fetching/parsing.rs +++ b/src/worker/trade_fetching/parsing.rs @@ -15,7 +15,7 @@ const PROGRAM_DATA: &str = "Program data: "; pub fn parse_trades_from_openbook_txns( txns: &mut Vec>, - sig_strings: &Vec, + sig_strings: &Vec, target_markets: &HashMap, ) -> Vec { let mut fills_vector = Vec::::new(); diff --git a/src/worker/trade_fetching/scrape.rs b/src/worker/trade_fetching/scrape.rs index 7957bb8..fa1fd10 100644 --- a/src/worker/trade_fetching/scrape.rs +++ b/src/worker/trade_fetching/scrape.rs @@ -1,5 +1,6 @@ +use deadpool_postgres::Pool; use futures::future::join_all; -use log::{debug, warn}; +use log::{debug, info, warn}; use solana_client::{ nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config, rpc_config::RpcTransactionConfig, @@ -10,112 +11,111 @@ use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration}; use tokio::sync::mpsc::Sender; use crate::{ - structs::openbook::OpenBookFillEvent, - utils::{Config, OPENBOOK_KEY}, - worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL}, + database::{ + fetch::fetch_worker_transactions, + insert::{build_transactions_insert_statement, insert_fills_atomically}, + }, + structs::{openbook::OpenBookFillEvent, transaction::PgTransaction}, + utils::{AnyhowWrap, Config, OPENBOOK_KEY}, + worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL, METRIC_TRANSACTIONS_TOTAL}, }; use super::parsing::parse_trades_from_openbook_txns; -pub async fn scrape( - config: &Config, - fill_sender: &Sender, - target_markets: &HashMap, -) { - let rpc_client = - RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::processed()); - let before_slot = None; +pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<()> { + let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed()); + loop { - scrape_transactions( - &rpc_client, - before_slot, - Some(150), - fill_sender, - target_markets, - ) - .await; - tokio::time::sleep(WaitDuration::from_millis(250)).await; - } -} + let rpc_config = GetConfirmedSignaturesForAddress2Config { + before: None, + until: None, + limit: None, + commitment: Some(CommitmentConfig::confirmed()), + }; -pub async fn scrape_transactions( - rpc_client: &RpcClient, - before_sig: Option, - limit: Option, - fill_sender: &Sender, - target_markets: &HashMap, -) -> Option { - let rpc_config = GetConfirmedSignaturesForAddress2Config { - before: before_sig, - until: None, - limit, - commitment: Some(CommitmentConfig::confirmed()), - }; - - let mut sigs = match rpc_client - .get_signatures_for_address_with_config( - &OPENBOOK_KEY, - rpc_config, - ) - .await - { - Ok(s) => s, - Err(e) => { - warn!("rpc error in get_signatures_for_address_with_config: {}", e); - METRIC_RPC_ERRORS_TOTAL - .with_label_values(&["getSignaturesForAddress"]) - .inc(); - return before_sig; - } - }; - - if sigs.is_empty() { - debug!("No signatures found"); - return before_sig; - } - - let last = sigs.last().unwrap(); - let request_last_sig = Signature::from_str(&last.signature).unwrap(); - - sigs.retain(|sig| sig.err.is_none()); - if sigs.last().is_none() { - return Some(request_last_sig); - } - - let txn_config = RpcTransactionConfig { - encoding: Some(UiTransactionEncoding::Json), - commitment: Some(CommitmentConfig::confirmed()), - max_supported_transaction_version: Some(0), - }; - - let sig_strings = sigs - .iter() - .map(|t| t.signature.clone()) - .collect::>(); - - let signatures: Vec<_> = sigs - .into_iter() - .map(|sig| sig.signature.parse::().unwrap()) - .collect(); - - let txn_futs: Vec<_> = signatures - .iter() - .map(|s| rpc_client.get_transaction_with_config(s, txn_config)) - .collect(); - - let mut txns = join_all(txn_futs).await; - - let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets); - if !fills.is_empty() { - for fill in fills.into_iter() { - let market_name = target_markets.get(&fill.market).unwrap(); - if let Err(_) = fill_sender.send(fill).await { - panic!("receiver dropped"); + let sigs = match rpc_client + .get_signatures_for_address_with_config(&OPENBOOK_KEY, rpc_config) + .await + { + Ok(sigs) => sigs, + Err(e) => { + warn!("rpc error in get_signatures_for_address_with_config: {}", e); + METRIC_RPC_ERRORS_TOTAL + .with_label_values(&["getSignaturesForAddress"]) + .inc(); + continue; } - METRIC_FILLS_TOTAL.with_label_values(&[market_name]).inc(); + }; + if sigs.is_empty() { + debug!("No signatures found, trying again"); + continue; } + let transactions: Vec = sigs + .into_iter() + .map(|s| PgTransaction::from_rpc_confirmed_transaction(s)) + .collect(); + + debug!("Scraper writing: {:?} txns to DB\n", transactions.len()); + let upsert_statement = build_transactions_insert_statement(transactions); + let client = pool.get().await?; + let num_txns = client + .execute(&upsert_statement, &[]) + .await + .map_err_anyhow()?; + METRIC_TRANSACTIONS_TOTAL.inc_by(num_txns); + } + // TODO: graceful shutdown + Ok(()) +} + +pub async fn scrape_fills( + worker_id: i32, + rpc_url: String, + pool: &Pool, + target_markets: &HashMap, +) -> anyhow::Result<()> { + debug!("Worker {} started \n", worker_id); + let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed()); + + loop { + let transactions = fetch_worker_transactions(worker_id, pool).await?; + if transactions.len() == 0 { + debug!("No signatures found by worker {}", worker_id); + tokio::time::sleep(WaitDuration::from_secs(1)).await; + continue; + }; + + // for each signature, fetch the transaction + let txn_config = RpcTransactionConfig { + encoding: Some(UiTransactionEncoding::Json), + commitment: Some(CommitmentConfig::confirmed()), + max_supported_transaction_version: Some(0), + }; + + let sig_strings = transactions + .iter() + .map(|t| t.signature.clone()) + .collect::>(); + + let signatures: Vec<_> = transactions + .into_iter() + .map(|t| t.signature.parse::().unwrap()) + .collect(); + + let txn_futs: Vec<_> = signatures + .iter() + .map(|s| rpc_client.get_transaction_with_config(s, txn_config)) + .collect(); + + let mut txns = join_all(txn_futs).await; + + // TODO: reenable total fills metric + let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets); + + // Write any fills to the database, and update the transactions as processed + insert_fills_atomically(pool, worker_id, fills, sig_strings).await?; } - Some(request_last_sig) + Ok(()) }