refactor: worker uses a transactions table

This commit is contained in:
dboures 2023-06-08 02:47:05 -05:00
parent dc1726af43
commit 67b3583ba6
No known key found for this signature in database
GPG Key ID: AB3790129D478852
7 changed files with 146 additions and 205 deletions

View File

@ -1,28 +1,24 @@
use anchor_lang::prelude::Pubkey; use anchor_lang::prelude::Pubkey;
use chrono::{DateTime, Duration, NaiveDateTime, Utc}; use chrono::{DateTime, Duration, NaiveDateTime, Utc};
use deadpool_postgres::Pool; use deadpool_postgres::Pool;
use futures::future::join_all;
use log::debug; use log::debug;
use openbook_candles::{ use openbook_candles::{
database::{ database::{
fetch::fetch_worker_transactions,
initialize::{connect_to_database, setup_database}, initialize::{connect_to_database, setup_database},
insert::{add_fills_atomically, build_transactions_insert_statement}, insert::build_transactions_insert_statement,
}, },
structs::{ structs::{
markets::{fetch_market_infos, load_markets}, markets::{fetch_market_infos, load_markets},
transaction::{PgTransaction, NUM_TRANSACTION_PARTITIONS}, transaction::{PgTransaction, NUM_TRANSACTION_PARTITIONS},
}, },
utils::{AnyhowWrap, Config, OPENBOOK_KEY}, utils::{AnyhowWrap, Config, OPENBOOK_KEY},
worker::trade_fetching::parsing::parse_trades_from_openbook_txns, worker::trade_fetching::scrape::scrape_fills,
}; };
use solana_client::{ use solana_client::{
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config, nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
rpc_config::RpcTransactionConfig
}; };
use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature}; use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
use solana_transaction_status::UiTransactionEncoding; use std::{collections::HashMap, env, str::FromStr};
use std::{collections::HashMap, env, str::FromStr,time::Duration as WaitDuration };
#[tokio::main(flavor = "multi_thread", worker_threads = 10)] #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
@ -54,15 +50,18 @@ async fn main() -> anyhow::Result<()> {
let rpc_clone = rpc_url.clone(); let rpc_clone = rpc_url.clone();
let pool_clone = pool.clone(); let pool_clone = pool.clone();
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
fetch_signatures(rpc_clone, &pool_clone, num_days).await.unwrap(); fetch_signatures(rpc_clone, &pool_clone, num_days)
.await
.unwrap();
})); }));
// Low priority improvement: batch fills into 1000's per worker
for id in 0..NUM_TRANSACTION_PARTITIONS { for id in 0..NUM_TRANSACTION_PARTITIONS {
let rpc_clone = rpc_url.clone(); let rpc_clone = rpc_url.clone();
let pool_clone = pool.clone(); let pool_clone = pool.clone();
let markets_clone = target_markets.clone(); let markets_clone = target_markets.clone();
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
backfill(id as i32, rpc_clone, &pool_clone, &markets_clone) scrape_fills(id as i32, rpc_clone, &pool_clone, &markets_clone)
.await .await
.unwrap(); .unwrap();
})); }));
@ -89,10 +88,7 @@ pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> an
}; };
let sigs = match rpc_client let sigs = match rpc_client
.get_signatures_for_address_with_config( .get_signatures_for_address_with_config(&OPENBOOK_KEY, rpc_config)
&OPENBOOK_KEY,
rpc_config,
)
.await .await
{ {
Ok(sigs) => sigs, Ok(sigs) => sigs,
@ -136,59 +132,6 @@ pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> an
Ok(()) Ok(())
} }
pub async fn backfill(
worker_id: i32,
rpc_url: String,
pool: &Pool,
target_markets: &HashMap<Pubkey, String>,
) -> anyhow::Result<()> {
println!("Worker {} up \n", worker_id);
let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed());
loop {
let transactions = fetch_worker_transactions(worker_id, pool).await?;
if transactions.len() == 0 {
println!("No signatures found by worker {}", worker_id);
tokio::time::sleep(WaitDuration::from_secs(1)).await;
continue;
};
// for each signature, fetch the transaction
let txn_config = RpcTransactionConfig {
encoding: Some(UiTransactionEncoding::Json),
commitment: Some(CommitmentConfig::confirmed()),
max_supported_transaction_version: Some(0),
};
let sig_strings = transactions
.iter()
.map(|t| t.signature.clone())
.collect::<Vec<String>>();
let signatures: Vec<_> = transactions
.into_iter()
.map(|t| t.signature.parse::<Signature>().unwrap())
.collect();
let txn_futs: Vec<_> = signatures
.iter()
.map(|s| rpc_client.get_transaction_with_config(s, txn_config))
.collect();
let mut txns = join_all(txn_futs).await;
// TODO: batch fills into groups of 1000
let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets);
// Write any fills to the database, and mark the transactions as processed
add_fills_atomically(pool, worker_id, fills, sig_strings).await?;
}
// TODO: graceful shutdown
// println!("Worker {} down \n", worker_id);
Ok(())
}
fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration { fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration {
let naive_cur = NaiveDateTime::from_timestamp_millis(current_time * 1000).unwrap(); let naive_cur = NaiveDateTime::from_timestamp_millis(current_time * 1000).unwrap();
let naive_bf = NaiveDateTime::from_timestamp_millis(backfill_end * 1000).unwrap(); let naive_bf = NaiveDateTime::from_timestamp_millis(backfill_end * 1000).unwrap();

View File

@ -1,8 +1,6 @@
use deadpool_postgres::Pool; use deadpool_postgres::Pool;
use log::debug; use log::debug;
use std::{ use std::collections::HashMap;
collections::{HashMap}
};
use tokio::sync::mpsc::{error::TryRecvError, Receiver}; use tokio::sync::mpsc::{error::TryRecvError, Receiver};
use crate::{ use crate::{
@ -10,7 +8,7 @@ use crate::{
utils::{to_timestampz, AnyhowWrap}, utils::{to_timestampz, AnyhowWrap},
}; };
pub async fn add_fills_atomically( pub async fn insert_fills_atomically(
pool: &Pool, pool: &Pool,
worker_id: i32, worker_id: i32,
fills: Vec<OpenBookFillEvent>, fills: Vec<OpenBookFillEvent>,

View File

@ -22,7 +22,12 @@ pub struct OpenBookFillEventRaw {
pub referrer_rebate: Option<u64>, pub referrer_rebate: Option<u64>,
} }
impl OpenBookFillEventRaw { impl OpenBookFillEventRaw {
pub fn into_event(self, signature: String, block_time: i64, log_index: usize) -> OpenBookFillEvent { pub fn into_event(
self,
signature: String,
block_time: i64,
log_index: usize,
) -> OpenBookFillEvent {
OpenBookFillEvent { OpenBookFillEvent {
signature, signature,
market: self.market, market: self.market,
@ -62,7 +67,7 @@ pub struct OpenBookFillEvent {
pub client_order_id: Option<u64>, pub client_order_id: Option<u64>,
pub referrer_rebate: Option<u64>, pub referrer_rebate: Option<u64>,
pub block_time: i64, pub block_time: i64,
pub log_index: usize pub log_index: usize,
} }
#[derive(Copy, Clone, Debug, PartialEq)] #[derive(Copy, Clone, Debug, PartialEq)]

View File

@ -1,22 +1,18 @@
use log::{error, info}; use log::{error, info};
use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
use openbook_candles::structs::openbook::OpenBookFillEvent; use openbook_candles::structs::transaction::NUM_TRANSACTION_PARTITIONS;
use openbook_candles::utils::Config; use openbook_candles::utils::Config;
use openbook_candles::worker::metrics::{ use openbook_candles::worker::metrics::{
serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE, METRIC_FILLS_QUEUE_LENGTH, serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE,
}; };
use openbook_candles::worker::trade_fetching::scrape::scrape; use openbook_candles::worker::trade_fetching::scrape::{scrape_fills, scrape_signatures};
use openbook_candles::{ use openbook_candles::{
database::{ database::initialize::{connect_to_database, setup_database},
initialize::{connect_to_database, setup_database},
insert::persist_fill_events,
},
worker::candle_batching::batch_for_market, worker::candle_batching::batch_for_market,
}; };
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use std::env; use std::env;
use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration}; use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
use tokio::sync::mpsc;
#[tokio::main(flavor = "multi_thread", worker_threads = 10)] #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
@ -32,8 +28,6 @@ async fn main() -> anyhow::Result<()> {
rpc_url: rpc_url.clone(), rpc_url: rpc_url.clone(),
}; };
let fills_queue_max_size = 10000;
let markets = load_markets(path_to_markets_json); let markets = load_markets(path_to_markets_json);
let market_infos = fetch_market_infos(&config, markets.clone()).await?; let market_infos = fetch_market_infos(&config, markets.clone()).await?;
let mut target_markets = HashMap::new(); let mut target_markets = HashMap::new();
@ -46,21 +40,26 @@ async fn main() -> anyhow::Result<()> {
setup_database(&pool).await?; setup_database(&pool).await?;
let mut handles = vec![]; let mut handles = vec![];
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(fills_queue_max_size); // signature scraping
let scrape_fill_sender = fill_sender.clone(); let rpc_clone = rpc_url.clone();
let pool_clone = pool.clone();
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
scrape(&config, &scrape_fill_sender, &target_markets).await; scrape_signatures(rpc_clone, &pool_clone).await.unwrap();
})); }));
let fills_pool = pool.clone(); // transaction/fill scraping
for id in 0..NUM_TRANSACTION_PARTITIONS {
let rpc_clone = rpc_url.clone();
let pool_clone = pool.clone();
let markets_clone = target_markets.clone();
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
loop { scrape_fills(id as i32, rpc_clone, &pool_clone, &markets_clone)
persist_fill_events(&fills_pool, &mut fill_receiver)
.await .await
.unwrap(); .unwrap();
}
})); }));
}
// candle batching
for market in market_infos.into_iter() { for market in market_infos.into_iter() {
let batch_pool = pool.clone(); let batch_pool = pool.clone();
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
@ -70,7 +69,6 @@ async fn main() -> anyhow::Result<()> {
} }
let monitor_pool = pool.clone(); let monitor_pool = pool.clone();
let monitor_fill_channel = fill_sender.clone();
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
// TODO: maybe break this out into a new function // TODO: maybe break this out into a new function
loop { loop {
@ -78,9 +76,6 @@ async fn main() -> anyhow::Result<()> {
METRIC_DB_POOL_AVAILABLE.set(pool_status.available as i64); METRIC_DB_POOL_AVAILABLE.set(pool_status.available as i64);
METRIC_DB_POOL_SIZE.set(pool_status.size as i64); METRIC_DB_POOL_SIZE.set(pool_status.size as i64);
METRIC_FILLS_QUEUE_LENGTH
.set((fills_queue_max_size - monitor_fill_channel.capacity()) as i64);
tokio::time::sleep(WaitDuration::from_secs(10)).await; tokio::time::sleep(WaitDuration::from_secs(10)).await;
} }
})); }));

View File

@ -2,8 +2,8 @@ use actix_web::{dev::Server, http::StatusCode, App, HttpServer};
use actix_web_prom::PrometheusMetricsBuilder; use actix_web_prom::PrometheusMetricsBuilder;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use prometheus::{ use prometheus::{
register_int_counter_vec_with_registry, register_int_gauge_with_registry, IntCounterVec, register_int_counter_vec_with_registry, register_int_counter_with_registry,
IntGauge, Registry, register_int_gauge_with_registry, IntCounter, IntCounterVec, IntGauge, Registry,
}; };
lazy_static! { lazy_static! {
@ -30,9 +30,9 @@ lazy_static! {
METRIC_REGISTRY METRIC_REGISTRY
) )
.unwrap(); .unwrap();
pub static ref METRIC_FILLS_QUEUE_LENGTH: IntGauge = register_int_gauge_with_registry!( pub static ref METRIC_TRANSACTIONS_TOTAL: IntCounter = register_int_counter_with_registry!(
"fills_queue_length", "transactions_total",
"Current length of the fills write queue", "Total number of transaction signatures scraped",
METRIC_REGISTRY METRIC_REGISTRY
) )
.unwrap(); .unwrap();

View File

@ -1,5 +1,6 @@
use deadpool_postgres::Pool;
use futures::future::join_all; use futures::future::join_all;
use log::{debug, warn}; use log::{debug, info, warn};
use solana_client::{ use solana_client::{
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config, nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
rpc_config::RpcTransactionConfig, rpc_config::RpcTransactionConfig,
@ -10,93 +11,96 @@ use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use crate::{ use crate::{
structs::openbook::OpenBookFillEvent, database::{
utils::{Config, OPENBOOK_KEY}, fetch::fetch_worker_transactions,
worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL}, insert::{build_transactions_insert_statement, insert_fills_atomically},
},
structs::{openbook::OpenBookFillEvent, transaction::PgTransaction},
utils::{AnyhowWrap, Config, OPENBOOK_KEY},
worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL, METRIC_TRANSACTIONS_TOTAL},
}; };
use super::parsing::parse_trades_from_openbook_txns; use super::parsing::parse_trades_from_openbook_txns;
pub async fn scrape(
config: &Config,
fill_sender: &Sender<OpenBookFillEvent>,
target_markets: &HashMap<Pubkey, String>,
) {
let rpc_client =
RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::processed());
let before_slot = None; pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<()> {
let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
loop { loop {
scrape_transactions(
&rpc_client,
before_slot,
Some(150),
fill_sender,
target_markets,
)
.await;
tokio::time::sleep(WaitDuration::from_millis(250)).await;
}
}
pub async fn scrape_transactions(
rpc_client: &RpcClient,
before_sig: Option<Signature>,
limit: Option<usize>,
fill_sender: &Sender<OpenBookFillEvent>,
target_markets: &HashMap<Pubkey, String>,
) -> Option<Signature> {
let rpc_config = GetConfirmedSignaturesForAddress2Config { let rpc_config = GetConfirmedSignaturesForAddress2Config {
before: before_sig, before: None,
until: None, until: None,
limit, limit: None,
commitment: Some(CommitmentConfig::confirmed()), commitment: Some(CommitmentConfig::confirmed()),
}; };
let mut sigs = match rpc_client let sigs = match rpc_client
.get_signatures_for_address_with_config( .get_signatures_for_address_with_config(&OPENBOOK_KEY, rpc_config)
&OPENBOOK_KEY,
rpc_config,
)
.await .await
{ {
Ok(s) => s, Ok(sigs) => sigs,
Err(e) => { Err(e) => {
warn!("rpc error in get_signatures_for_address_with_config: {}", e); warn!("rpc error in get_signatures_for_address_with_config: {}", e);
METRIC_RPC_ERRORS_TOTAL METRIC_RPC_ERRORS_TOTAL
.with_label_values(&["getSignaturesForAddress"]) .with_label_values(&["getSignaturesForAddress"])
.inc(); .inc();
return before_sig; continue;
} }
}; };
if sigs.is_empty() { if sigs.is_empty() {
debug!("No signatures found"); debug!("No signatures found, trying again");
return before_sig; continue;
}
let transactions: Vec<PgTransaction> = sigs
.into_iter()
.map(|s| PgTransaction::from_rpc_confirmed_transaction(s))
.collect();
debug!("Scraper writing: {:?} txns to DB\n", transactions.len());
let upsert_statement = build_transactions_insert_statement(transactions);
let client = pool.get().await?;
let num_txns = client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()?;
METRIC_TRANSACTIONS_TOTAL.inc_by(num_txns);
}
// TODO: graceful shutdown
Ok(())
} }
let last = sigs.last().unwrap(); pub async fn scrape_fills(
let request_last_sig = Signature::from_str(&last.signature).unwrap(); worker_id: i32,
rpc_url: String,
pool: &Pool,
target_markets: &HashMap<Pubkey, String>,
) -> anyhow::Result<()> {
debug!("Worker {} started \n", worker_id);
let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed());
sigs.retain(|sig| sig.err.is_none()); loop {
if sigs.last().is_none() { let transactions = fetch_worker_transactions(worker_id, pool).await?;
return Some(request_last_sig); if transactions.len() == 0 {
} debug!("No signatures found by worker {}", worker_id);
tokio::time::sleep(WaitDuration::from_secs(1)).await;
continue;
};
// for each signature, fetch the transaction
let txn_config = RpcTransactionConfig { let txn_config = RpcTransactionConfig {
encoding: Some(UiTransactionEncoding::Json), encoding: Some(UiTransactionEncoding::Json),
commitment: Some(CommitmentConfig::confirmed()), commitment: Some(CommitmentConfig::confirmed()),
max_supported_transaction_version: Some(0), max_supported_transaction_version: Some(0),
}; };
let sig_strings = sigs let sig_strings = transactions
.iter() .iter()
.map(|t| t.signature.clone()) .map(|t| t.signature.clone())
.collect::<Vec<String>>(); .collect::<Vec<String>>();
let signatures: Vec<_> = sigs let signatures: Vec<_> = transactions
.into_iter() .into_iter()
.map(|sig| sig.signature.parse::<Signature>().unwrap()) .map(|t| t.signature.parse::<Signature>().unwrap())
.collect(); .collect();
let txn_futs: Vec<_> = signatures let txn_futs: Vec<_> = signatures
@ -106,16 +110,12 @@ pub async fn scrape_transactions(
let mut txns = join_all(txn_futs).await; let mut txns = join_all(txn_futs).await;
// TODO: reenable total fills metric
let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets); let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets);
if !fills.is_empty() {
for fill in fills.into_iter() { // Write any fills to the database, and update the transactions as processed
let market_name = target_markets.get(&fill.market).unwrap(); insert_fills_atomically(pool, worker_id, fills, sig_strings).await?;
if let Err(_) = fill_sender.send(fill).await {
panic!("receiver dropped");
}
METRIC_FILLS_TOTAL.with_label_values(&[market_name]).inc();
}
} }
Some(request_last_sig) Ok(())
} }