Merge remote-tracking branch 'upstream/main'

This commit is contained in:
Riordan Panayides 2023-06-30 19:33:21 +01:00
commit 252cccf74e
7 changed files with 30 additions and 101 deletions

2
.gitignore vendored
View File

@ -1,2 +1,4 @@
/target /target
.env .env
*.cer*
*.pks*

View File

@ -102,11 +102,11 @@ pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> an
continue; continue;
} }
let last = sigs.last().unwrap(); let last = sigs.last().unwrap();
let last_time = last.block_time.unwrap().clone(); let last_time = last.block_time.unwrap();
let last_signature = last.signature.clone(); let last_signature = last.signature.clone();
let transactions = sigs let transactions = sigs
.into_iter() .into_iter()
.map(|s| PgTransaction::from_rpc_confirmed_transaction(s)) .map(PgTransaction::from_rpc_confirmed_transaction)
.collect::<Vec<PgTransaction>>(); .collect::<Vec<PgTransaction>>();
if transactions.is_empty() { if transactions.is_empty() {

View File

@ -1,7 +1,4 @@
use deadpool_postgres::Pool; use deadpool_postgres::Pool;
use log::debug;
use std::collections::HashMap;
use tokio::sync::mpsc::{error::TryRecvError, Receiver};
use crate::{ use crate::{
structs::{candle::Candle, openbook::OpenBookFillEvent, transaction::PgTransaction}, structs::{candle::Candle, openbook::OpenBookFillEvent, transaction::PgTransaction},
@ -19,8 +16,8 @@ pub async fn insert_fills_atomically(
let db_txn = client.build_transaction().start().await?; let db_txn = client.build_transaction().start().await?;
// 1. Insert fills // 1. Insert fills
if fills.len() > 0 { if !fills.is_empty() {
let fills_statement = build_fills_upsert_statement_not_crazy(fills); let fills_statement = build_fills_upsert_statement(fills);
db_txn db_txn
.execute(&fills_statement, &[]) .execute(&fills_statement, &[])
.await .await
@ -42,78 +39,7 @@ pub async fn insert_fills_atomically(
Ok(()) Ok(())
} }
pub async fn persist_fill_events( fn build_fills_upsert_statement(fills: Vec<OpenBookFillEvent>) -> String {
pool: &Pool,
fill_receiver: &mut Receiver<OpenBookFillEvent>,
) -> anyhow::Result<()> {
loop {
let mut write_batch = HashMap::new();
while write_batch.len() < 10 {
match fill_receiver.try_recv() {
Ok(event) => {
write_batch.entry(event).or_insert(0);
}
Err(TryRecvError::Empty) => {
if !write_batch.is_empty() {
break;
} else {
continue;
}
}
Err(TryRecvError::Disconnected) => {
panic!("Fills sender must stay alive")
}
};
}
if !write_batch.is_empty() {
debug!("writing: {:?} events to DB\n", write_batch.len());
let upsert_statement = build_fills_upsert_statement(write_batch);
let client = pool.get().await?;
client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()
.unwrap();
}
}
}
#[allow(deprecated)]
fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> String {
let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES");
for (idx, event) in events.keys().enumerate() {
let val_str = format!(
"({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})",
event.signature,
to_timestampz(event.block_time as u64).to_rfc3339(),
event.market,
event.open_orders,
event.open_orders_owner,
event.bid,
event.maker,
event.native_qty_paid,
event.native_qty_received,
event.native_fee_or_rebate,
event.fee_tier,
event.order_id,
event.log_index,
);
if idx == 0 {
stmt = format!("{} {}", &stmt, val_str);
} else {
stmt = format!("{}, {}", &stmt, val_str);
}
}
let handle_conflict = "ON CONFLICT DO NOTHING";
stmt = format!("{} {}", stmt, handle_conflict);
stmt
}
fn build_fills_upsert_statement_not_crazy(fills: Vec<OpenBookFillEvent>) -> String {
let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES"); let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES");
for (idx, fill) in fills.iter().enumerate() { for (idx, fill) in fills.iter().enumerate() {
let val_str = format!( let val_str = format!(

View File

@ -55,7 +55,7 @@ async fn main() -> std::io::Result<()> {
.unwrap(); .unwrap();
// For collecting metrics on the public api, excluding 404s // For collecting metrics on the public api, excluding 404s
let public_metrics = PrometheusMetricsBuilder::new("openbook_candles_server") let public_metrics = PrometheusMetricsBuilder::new("openbook_candles_server")
.registry(registry.clone()) .registry(registry)
.exclude_status(StatusCode::NOT_FOUND) .exclude_status(StatusCode::NOT_FOUND)
.build() .build()
.unwrap(); .unwrap();

View File

@ -23,7 +23,7 @@ pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
let market_clone = market.clone(); let market_clone = market.clone();
loop { loop {
sleep(Duration::milliseconds(2000).to_std()?).await; sleep(Duration::milliseconds(5000).to_std()?).await;
match batch_inner(pool, &market_clone).await { match batch_inner(pool, &market_clone).await {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
@ -61,7 +61,7 @@ async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
} }
async fn save_candles(pool: &Pool, candles: Vec<Candle>) -> anyhow::Result<()> { async fn save_candles(pool: &Pool, candles: Vec<Candle>) -> anyhow::Result<()> {
if candles.len() == 0 { if candles.is_empty() {
return Ok(()); return Ok(());
} }
let upsert_statement = build_candles_upsert_statement(&candles); let upsert_statement = build_candles_upsert_statement(&candles);

View File

@ -15,10 +15,11 @@ const PROGRAM_DATA: &str = "Program data: ";
pub fn parse_trades_from_openbook_txns( pub fn parse_trades_from_openbook_txns(
txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>, txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
sig_strings: &Vec<String>, mut sig_strings: Vec<String>,
target_markets: &HashMap<Pubkey, String>, target_markets: &HashMap<Pubkey, String>,
) -> Vec<OpenBookFillEvent> { ) -> (Vec<OpenBookFillEvent>, Vec<String>) {
let mut fills_vector = Vec::<OpenBookFillEvent>::new(); let mut fills_vector = Vec::<OpenBookFillEvent>::new();
let mut failed_sigs = vec![];
for (idx, txn) in txns.iter_mut().enumerate() { for (idx, txn) in txns.iter_mut().enumerate() {
match txn { match txn {
Ok(t) => { Ok(t) => {
@ -42,13 +43,15 @@ pub fn parse_trades_from_openbook_txns(
} }
Err(e) => { Err(e) => {
warn!("rpc error in get_transaction {}", e); warn!("rpc error in get_transaction {}", e);
failed_sigs.push(sig_strings[idx].clone());
METRIC_RPC_ERRORS_TOTAL METRIC_RPC_ERRORS_TOTAL
.with_label_values(&["getTransaction"]) .with_label_values(&["getTransaction"])
.inc(); .inc();
} }
} }
} }
fills_vector sig_strings.retain(|s| !failed_sigs.contains(s));
(fills_vector, sig_strings)
} }
fn parse_openbook_fills_from_logs( fn parse_openbook_fills_from_logs(

View File

@ -1,28 +1,26 @@
use deadpool_postgres::Pool; use deadpool_postgres::Pool;
use futures::future::join_all; use futures::future::join_all;
use log::{debug, info, warn}; use log::{debug, warn};
use solana_client::{ use solana_client::{
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config, nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
rpc_config::RpcTransactionConfig, rpc_config::RpcTransactionConfig,
}; };
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature}; use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature};
use solana_transaction_status::UiTransactionEncoding; use solana_transaction_status::UiTransactionEncoding;
use std::{collections::HashMap, str::FromStr}; use std::{collections::HashMap, time::Duration as WaitDuration};
use tokio::sync::mpsc::Sender;
use crate::{ use crate::{
database::{ database::{
fetch::fetch_worker_transactions, fetch::fetch_worker_transactions,
insert::{build_transactions_insert_statement, insert_fills_atomically}, insert::{build_transactions_insert_statement, insert_fills_atomically},
}, },
structs::{openbook::OpenBookFillEvent, transaction::PgTransaction}, structs::transaction::PgTransaction,
utils::{AnyhowWrap, Config, OPENBOOK_KEY}, utils::{AnyhowWrap, OPENBOOK_KEY},
worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL, METRIC_TRANSACTIONS_TOTAL}, worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL, METRIC_TRANSACTIONS_TOTAL},
}; };
use super::parsing::parse_trades_from_openbook_txns; use super::parsing::parse_trades_from_openbook_txns;
pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<()> { pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<()> {
let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed()); let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
@ -53,7 +51,7 @@ pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<(
} }
let transactions: Vec<PgTransaction> = sigs let transactions: Vec<PgTransaction> = sigs
.into_iter() .into_iter()
.map(|s| PgTransaction::from_rpc_confirmed_transaction(s)) .map(PgTransaction::from_rpc_confirmed_transaction)
.collect(); .collect();
debug!("Scraper writing: {:?} txns to DB\n", transactions.len()); debug!("Scraper writing: {:?} txns to DB\n", transactions.len());
@ -66,7 +64,6 @@ pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<(
METRIC_TRANSACTIONS_TOTAL.inc_by(num_txns); METRIC_TRANSACTIONS_TOTAL.inc_by(num_txns);
} }
// TODO: graceful shutdown // TODO: graceful shutdown
Ok(())
} }
pub async fn scrape_fills( pub async fn scrape_fills(
@ -80,7 +77,7 @@ pub async fn scrape_fills(
loop { loop {
let transactions = fetch_worker_transactions(worker_id, pool).await?; let transactions = fetch_worker_transactions(worker_id, pool).await?;
if transactions.len() == 0 { if transactions.is_empty() {
debug!("No signatures found by worker {}", worker_id); debug!("No signatures found by worker {}", worker_id);
tokio::time::sleep(WaitDuration::from_secs(1)).await; tokio::time::sleep(WaitDuration::from_secs(1)).await;
continue; continue;
@ -110,12 +107,13 @@ pub async fn scrape_fills(
let mut txns = join_all(txn_futs).await; let mut txns = join_all(txn_futs).await;
// TODO: reenable total fills metric let (fills, completed_sigs) =
let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets); parse_trades_from_openbook_txns(&mut txns, sig_strings, target_markets);
for fill in fills.iter() {
// Write any fills to the database, and update the transactions as processed let market_name = target_markets.get(&fill.market).unwrap();
insert_fills_atomically(pool, worker_id, fills, sig_strings).await?; METRIC_FILLS_TOTAL.with_label_values(&[market_name]).inc();
}
// Write fills to the database, and update properly fetched transactions as processed
insert_fills_atomically(pool, worker_id, fills, completed_sigs).await?;
} }
Ok(())
} }