Compare commits

...

7 Commits

Author SHA1 Message Date
Riordan Panayides cbb89a3219 Update markets 2023-06-30 19:39:14 +01:00
Riordan Panayides 252cccf74e Merge remote-tracking branch 'upstream/main' 2023-06-30 19:33:21 +01:00
dboures c7c760ae72
Merge pull request #6 from dboures/reliable-scraping
Reliable scraping
2023-06-14 22:30:52 -05:00
dboures 8b3e4b95a7
increase candle cooldown to 5 seconds 2023-06-09 07:55:41 -05:00
dboures d4afc71c04
reenable total fills metric 2023-06-08 23:50:48 -05:00
dboures c7ee74b872
style: clippy and dead code cleanup 2023-06-08 22:56:49 -05:00
dboures 4ba8fd5ec5
fix: don't mark unfetched txns as processed 2023-06-08 22:19:53 -05:00
8 changed files with 50 additions and 101 deletions

2
.gitignore vendored
View File

@@ -1,2 +1,4 @@
/target
.env
*.cer*
*.pks*

View File

@@ -34,5 +34,25 @@
{
"name": "RAY/USDC",
"address": "DZjbn4XC8qoHKikZqzmhemykVzmossoayV9ffbsUqxVj"
},
{
"name": "mSOL/SOL",
"address": "AYhLYoDr6QCtVb5n1M5hsWLG74oB8VEz378brxGTnjjn"
},
{
"name": "stSOL/USDC",
"address": "JCKa72xFYGWBEVJZ7AKZ2ofugWPBfrrouQviaGaohi3R"
},
{
"name": "LDO/USDC",
"address": "BqApFW7DwXThCDZAbK13nbHksEsv6YJMCdj58sJmRLdy"
},
{
"name": "stSOL/SOL",
"address": "GoXhYTpRF4vs4gx48S7XhbaukVbJXVycXimhGfzWNGLF"
},
{
"name": "JitoSOL/USDC",
"address": "DkbVbMhFxswS32xnn1K2UY4aoBugXooBTxdzkWWDWRkH"
}
]

View File

@@ -102,11 +102,11 @@ pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> an
continue;
}
let last = sigs.last().unwrap();
let last_time = last.block_time.unwrap().clone();
let last_time = last.block_time.unwrap();
let last_signature = last.signature.clone();
let transactions = sigs
.into_iter()
.map(|s| PgTransaction::from_rpc_confirmed_transaction(s))
.map(PgTransaction::from_rpc_confirmed_transaction)
.collect::<Vec<PgTransaction>>();
if transactions.is_empty() {

View File

@@ -1,7 +1,4 @@
use deadpool_postgres::Pool;
use log::debug;
use std::collections::HashMap;
use tokio::sync::mpsc::{error::TryRecvError, Receiver};
use crate::{
structs::{candle::Candle, openbook::OpenBookFillEvent, transaction::PgTransaction},
@@ -19,8 +16,8 @@ pub async fn insert_fills_atomically(
let db_txn = client.build_transaction().start().await?;
// 1. Insert fills
if fills.len() > 0 {
let fills_statement = build_fills_upsert_statement_not_crazy(fills);
if !fills.is_empty() {
let fills_statement = build_fills_upsert_statement(fills);
db_txn
.execute(&fills_statement, &[])
.await
@@ -42,78 +39,7 @@ pub async fn insert_fills_atomically(
Ok(())
}
pub async fn persist_fill_events(
pool: &Pool,
fill_receiver: &mut Receiver<OpenBookFillEvent>,
) -> anyhow::Result<()> {
loop {
let mut write_batch = HashMap::new();
while write_batch.len() < 10 {
match fill_receiver.try_recv() {
Ok(event) => {
write_batch.entry(event).or_insert(0);
}
Err(TryRecvError::Empty) => {
if !write_batch.is_empty() {
break;
} else {
continue;
}
}
Err(TryRecvError::Disconnected) => {
panic!("Fills sender must stay alive")
}
};
}
if !write_batch.is_empty() {
debug!("writing: {:?} events to DB\n", write_batch.len());
let upsert_statement = build_fills_upsert_statement(write_batch);
let client = pool.get().await?;
client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()
.unwrap();
}
}
}
#[allow(deprecated)]
fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> String {
let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES");
for (idx, event) in events.keys().enumerate() {
let val_str = format!(
"({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})",
event.signature,
to_timestampz(event.block_time as u64).to_rfc3339(),
event.market,
event.open_orders,
event.open_orders_owner,
event.bid,
event.maker,
event.native_qty_paid,
event.native_qty_received,
event.native_fee_or_rebate,
event.fee_tier,
event.order_id,
event.log_index,
);
if idx == 0 {
stmt = format!("{} {}", &stmt, val_str);
} else {
stmt = format!("{}, {}", &stmt, val_str);
}
}
let handle_conflict = "ON CONFLICT DO NOTHING";
stmt = format!("{} {}", stmt, handle_conflict);
stmt
}
fn build_fills_upsert_statement_not_crazy(fills: Vec<OpenBookFillEvent>) -> String {
fn build_fills_upsert_statement(fills: Vec<OpenBookFillEvent>) -> String {
let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES");
for (idx, fill) in fills.iter().enumerate() {
let val_str = format!(

View File

@@ -55,7 +55,7 @@ async fn main() -> std::io::Result<()> {
.unwrap();
// For collecting metrics on the public api, excluding 404s
let public_metrics = PrometheusMetricsBuilder::new("openbook_candles_server")
.registry(registry.clone())
.registry(registry)
.exclude_status(StatusCode::NOT_FOUND)
.build()
.unwrap();

View File

@@ -23,7 +23,7 @@ pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
let market_clone = market.clone();
loop {
sleep(Duration::milliseconds(2000).to_std()?).await;
sleep(Duration::milliseconds(5000).to_std()?).await;
match batch_inner(pool, &market_clone).await {
Ok(_) => {}
Err(e) => {
@@ -61,7 +61,7 @@ async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
}
async fn save_candles(pool: &Pool, candles: Vec<Candle>) -> anyhow::Result<()> {
if candles.len() == 0 {
if candles.is_empty() {
return Ok(());
}
let upsert_statement = build_candles_upsert_statement(&candles);

View File

@@ -15,10 +15,11 @@ const PROGRAM_DATA: &str = "Program data: ";
pub fn parse_trades_from_openbook_txns(
txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
sig_strings: &Vec<String>,
mut sig_strings: Vec<String>,
target_markets: &HashMap<Pubkey, String>,
) -> Vec<OpenBookFillEvent> {
) -> (Vec<OpenBookFillEvent>, Vec<String>) {
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
let mut failed_sigs = vec![];
for (idx, txn) in txns.iter_mut().enumerate() {
match txn {
Ok(t) => {
@@ -42,13 +43,15 @@ pub fn parse_trades_from_openbook_txns(
}
Err(e) => {
warn!("rpc error in get_transaction {}", e);
failed_sigs.push(sig_strings[idx].clone());
METRIC_RPC_ERRORS_TOTAL
.with_label_values(&["getTransaction"])
.inc();
}
}
}
fills_vector
sig_strings.retain(|s| !failed_sigs.contains(s));
(fills_vector, sig_strings)
}
fn parse_openbook_fills_from_logs(

View File

@@ -1,28 +1,26 @@
use deadpool_postgres::Pool;
use futures::future::join_all;
use log::{debug, info, warn};
use log::{debug, warn};
use solana_client::{
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
rpc_config::RpcTransactionConfig,
};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature};
use solana_transaction_status::UiTransactionEncoding;
use std::{collections::HashMap, str::FromStr};
use tokio::sync::mpsc::Sender;
use std::{collections::HashMap, time::Duration as WaitDuration};
use crate::{
database::{
fetch::fetch_worker_transactions,
insert::{build_transactions_insert_statement, insert_fills_atomically},
},
structs::{openbook::OpenBookFillEvent, transaction::PgTransaction},
utils::{AnyhowWrap, Config, OPENBOOK_KEY},
structs::transaction::PgTransaction,
utils::{AnyhowWrap, OPENBOOK_KEY},
worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL, METRIC_TRANSACTIONS_TOTAL},
};
use super::parsing::parse_trades_from_openbook_txns;
pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<()> {
let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
@@ -53,7 +51,7 @@ pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<(
}
let transactions: Vec<PgTransaction> = sigs
.into_iter()
.map(|s| PgTransaction::from_rpc_confirmed_transaction(s))
.map(PgTransaction::from_rpc_confirmed_transaction)
.collect();
debug!("Scraper writing: {:?} txns to DB\n", transactions.len());
@@ -66,7 +64,6 @@ pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<(
METRIC_TRANSACTIONS_TOTAL.inc_by(num_txns);
}
// TODO: graceful shutdown
Ok(())
}
pub async fn scrape_fills(
@@ -80,7 +77,7 @@ pub async fn scrape_fills(
loop {
let transactions = fetch_worker_transactions(worker_id, pool).await?;
if transactions.len() == 0 {
if transactions.is_empty() {
debug!("No signatures found by worker {}", worker_id);
tokio::time::sleep(WaitDuration::from_secs(1)).await;
continue;
@@ -110,12 +107,13 @@ pub async fn scrape_fills(
let mut txns = join_all(txn_futs).await;
// TODO: reenable total fills metric
let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets);
// Write any fills to the database, and update the transactions as processed
insert_fills_atomically(pool, worker_id, fills, sig_strings).await?;
let (fills, completed_sigs) =
parse_trades_from_openbook_txns(&mut txns, sig_strings, target_markets);
for fill in fills.iter() {
let market_name = target_markets.get(&fill.market).unwrap();
METRIC_FILLS_TOTAL.with_label_values(&[market_name]).inc();
}
// Write fills to the database, and update properly fetched transactions as processed
insert_fills_atomically(pool, worker_id, fills, completed_sigs).await?;
}
Ok(())
}