Compare commits
7 Commits
01b22cec38
...
cbb89a3219
Author | SHA1 | Date |
---|---|---|
Riordan Panayides | cbb89a3219 | |
Riordan Panayides | 252cccf74e | |
dboures | c7c760ae72 | |
dboures | 8b3e4b95a7 | |
dboures | d4afc71c04 | |
dboures | c7ee74b872 | |
dboures | 4ba8fd5ec5 |
|
@ -1,2 +1,4 @@
|
|||
/target
|
||||
.env
|
||||
*.cer*
|
||||
*.pks*
|
20
markets.json
20
markets.json
|
@ -34,5 +34,25 @@
|
|||
{
|
||||
"name": "RAY/USDC",
|
||||
"address": "DZjbn4XC8qoHKikZqzmhemykVzmossoayV9ffbsUqxVj"
|
||||
},
|
||||
{
|
||||
"name": "mSOL/SOL",
|
||||
"address": "AYhLYoDr6QCtVb5n1M5hsWLG74oB8VEz378brxGTnjjn"
|
||||
},
|
||||
{
|
||||
"name": "stSOL/USDC",
|
||||
"address": "JCKa72xFYGWBEVJZ7AKZ2ofugWPBfrrouQviaGaohi3R"
|
||||
},
|
||||
{
|
||||
"name": "LDO/USDC",
|
||||
"address": "BqApFW7DwXThCDZAbK13nbHksEsv6YJMCdj58sJmRLdy"
|
||||
},
|
||||
{
|
||||
"name": "stSOL/SOL",
|
||||
"address": "GoXhYTpRF4vs4gx48S7XhbaukVbJXVycXimhGfzWNGLF"
|
||||
},
|
||||
{
|
||||
"name": "JitoSOL/USDC",
|
||||
"address": "DkbVbMhFxswS32xnn1K2UY4aoBugXooBTxdzkWWDWRkH"
|
||||
}
|
||||
]
|
|
@ -102,11 +102,11 @@ pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> an
|
|||
continue;
|
||||
}
|
||||
let last = sigs.last().unwrap();
|
||||
let last_time = last.block_time.unwrap().clone();
|
||||
let last_time = last.block_time.unwrap();
|
||||
let last_signature = last.signature.clone();
|
||||
let transactions = sigs
|
||||
.into_iter()
|
||||
.map(|s| PgTransaction::from_rpc_confirmed_transaction(s))
|
||||
.map(PgTransaction::from_rpc_confirmed_transaction)
|
||||
.collect::<Vec<PgTransaction>>();
|
||||
|
||||
if transactions.is_empty() {
|
||||
|
|
|
@ -1,7 +1,4 @@
|
|||
use deadpool_postgres::Pool;
|
||||
use log::debug;
|
||||
use std::collections::HashMap;
|
||||
use tokio::sync::mpsc::{error::TryRecvError, Receiver};
|
||||
|
||||
use crate::{
|
||||
structs::{candle::Candle, openbook::OpenBookFillEvent, transaction::PgTransaction},
|
||||
|
@ -19,8 +16,8 @@ pub async fn insert_fills_atomically(
|
|||
let db_txn = client.build_transaction().start().await?;
|
||||
|
||||
// 1. Insert fills
|
||||
if fills.len() > 0 {
|
||||
let fills_statement = build_fills_upsert_statement_not_crazy(fills);
|
||||
if !fills.is_empty() {
|
||||
let fills_statement = build_fills_upsert_statement(fills);
|
||||
db_txn
|
||||
.execute(&fills_statement, &[])
|
||||
.await
|
||||
|
@ -42,78 +39,7 @@ pub async fn insert_fills_atomically(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn persist_fill_events(
|
||||
pool: &Pool,
|
||||
fill_receiver: &mut Receiver<OpenBookFillEvent>,
|
||||
) -> anyhow::Result<()> {
|
||||
loop {
|
||||
let mut write_batch = HashMap::new();
|
||||
while write_batch.len() < 10 {
|
||||
match fill_receiver.try_recv() {
|
||||
Ok(event) => {
|
||||
write_batch.entry(event).or_insert(0);
|
||||
}
|
||||
Err(TryRecvError::Empty) => {
|
||||
if !write_batch.is_empty() {
|
||||
break;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Err(TryRecvError::Disconnected) => {
|
||||
panic!("Fills sender must stay alive")
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
if !write_batch.is_empty() {
|
||||
debug!("writing: {:?} events to DB\n", write_batch.len());
|
||||
let upsert_statement = build_fills_upsert_statement(write_batch);
|
||||
let client = pool.get().await?;
|
||||
client
|
||||
.execute(&upsert_statement, &[])
|
||||
.await
|
||||
.map_err_anyhow()
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> String {
|
||||
let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES");
|
||||
for (idx, event) in events.keys().enumerate() {
|
||||
let val_str = format!(
|
||||
"({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})",
|
||||
event.signature,
|
||||
to_timestampz(event.block_time as u64).to_rfc3339(),
|
||||
event.market,
|
||||
event.open_orders,
|
||||
event.open_orders_owner,
|
||||
event.bid,
|
||||
event.maker,
|
||||
event.native_qty_paid,
|
||||
event.native_qty_received,
|
||||
event.native_fee_or_rebate,
|
||||
event.fee_tier,
|
||||
event.order_id,
|
||||
event.log_index,
|
||||
);
|
||||
|
||||
if idx == 0 {
|
||||
stmt = format!("{} {}", &stmt, val_str);
|
||||
} else {
|
||||
stmt = format!("{}, {}", &stmt, val_str);
|
||||
}
|
||||
}
|
||||
|
||||
let handle_conflict = "ON CONFLICT DO NOTHING";
|
||||
|
||||
stmt = format!("{} {}", stmt, handle_conflict);
|
||||
stmt
|
||||
}
|
||||
|
||||
fn build_fills_upsert_statement_not_crazy(fills: Vec<OpenBookFillEvent>) -> String {
|
||||
fn build_fills_upsert_statement(fills: Vec<OpenBookFillEvent>) -> String {
|
||||
let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES");
|
||||
for (idx, fill) in fills.iter().enumerate() {
|
||||
let val_str = format!(
|
||||
|
|
|
@ -55,7 +55,7 @@ async fn main() -> std::io::Result<()> {
|
|||
.unwrap();
|
||||
// For collecting metrics on the public api, excluding 404s
|
||||
let public_metrics = PrometheusMetricsBuilder::new("openbook_candles_server")
|
||||
.registry(registry.clone())
|
||||
.registry(registry)
|
||||
.exclude_status(StatusCode::NOT_FOUND)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
|
|
@ -23,7 +23,7 @@ pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
|
|||
let market_clone = market.clone();
|
||||
|
||||
loop {
|
||||
sleep(Duration::milliseconds(2000).to_std()?).await;
|
||||
sleep(Duration::milliseconds(5000).to_std()?).await;
|
||||
match batch_inner(pool, &market_clone).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
|
@ -61,7 +61,7 @@ async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
|
|||
}
|
||||
|
||||
async fn save_candles(pool: &Pool, candles: Vec<Candle>) -> anyhow::Result<()> {
|
||||
if candles.len() == 0 {
|
||||
if candles.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let upsert_statement = build_candles_upsert_statement(&candles);
|
||||
|
|
|
@ -15,10 +15,11 @@ const PROGRAM_DATA: &str = "Program data: ";
|
|||
|
||||
pub fn parse_trades_from_openbook_txns(
|
||||
txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
|
||||
sig_strings: &Vec<String>,
|
||||
mut sig_strings: Vec<String>,
|
||||
target_markets: &HashMap<Pubkey, String>,
|
||||
) -> Vec<OpenBookFillEvent> {
|
||||
) -> (Vec<OpenBookFillEvent>, Vec<String>) {
|
||||
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
|
||||
let mut failed_sigs = vec![];
|
||||
for (idx, txn) in txns.iter_mut().enumerate() {
|
||||
match txn {
|
||||
Ok(t) => {
|
||||
|
@ -42,13 +43,15 @@ pub fn parse_trades_from_openbook_txns(
|
|||
}
|
||||
Err(e) => {
|
||||
warn!("rpc error in get_transaction {}", e);
|
||||
failed_sigs.push(sig_strings[idx].clone());
|
||||
METRIC_RPC_ERRORS_TOTAL
|
||||
.with_label_values(&["getTransaction"])
|
||||
.inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
fills_vector
|
||||
sig_strings.retain(|s| !failed_sigs.contains(s));
|
||||
(fills_vector, sig_strings)
|
||||
}
|
||||
|
||||
fn parse_openbook_fills_from_logs(
|
||||
|
|
|
@ -1,28 +1,26 @@
|
|||
use deadpool_postgres::Pool;
|
||||
use futures::future::join_all;
|
||||
use log::{debug, info, warn};
|
||||
use log::{debug, warn};
|
||||
use solana_client::{
|
||||
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
|
||||
rpc_config::RpcTransactionConfig,
|
||||
};
|
||||
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature};
|
||||
use solana_transaction_status::UiTransactionEncoding;
|
||||
use std::{collections::HashMap, str::FromStr};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use std::{collections::HashMap, time::Duration as WaitDuration};
|
||||
|
||||
use crate::{
|
||||
database::{
|
||||
fetch::fetch_worker_transactions,
|
||||
insert::{build_transactions_insert_statement, insert_fills_atomically},
|
||||
},
|
||||
structs::{openbook::OpenBookFillEvent, transaction::PgTransaction},
|
||||
utils::{AnyhowWrap, Config, OPENBOOK_KEY},
|
||||
structs::transaction::PgTransaction,
|
||||
utils::{AnyhowWrap, OPENBOOK_KEY},
|
||||
worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL, METRIC_TRANSACTIONS_TOTAL},
|
||||
};
|
||||
|
||||
use super::parsing::parse_trades_from_openbook_txns;
|
||||
|
||||
|
||||
pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<()> {
|
||||
let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
|
||||
|
||||
|
@ -53,7 +51,7 @@ pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<(
|
|||
}
|
||||
let transactions: Vec<PgTransaction> = sigs
|
||||
.into_iter()
|
||||
.map(|s| PgTransaction::from_rpc_confirmed_transaction(s))
|
||||
.map(PgTransaction::from_rpc_confirmed_transaction)
|
||||
.collect();
|
||||
|
||||
debug!("Scraper writing: {:?} txns to DB\n", transactions.len());
|
||||
|
@ -66,7 +64,6 @@ pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<(
|
|||
METRIC_TRANSACTIONS_TOTAL.inc_by(num_txns);
|
||||
}
|
||||
// TODO: graceful shutdown
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn scrape_fills(
|
||||
|
@ -80,7 +77,7 @@ pub async fn scrape_fills(
|
|||
|
||||
loop {
|
||||
let transactions = fetch_worker_transactions(worker_id, pool).await?;
|
||||
if transactions.len() == 0 {
|
||||
if transactions.is_empty() {
|
||||
debug!("No signatures found by worker {}", worker_id);
|
||||
tokio::time::sleep(WaitDuration::from_secs(1)).await;
|
||||
continue;
|
||||
|
@ -110,12 +107,13 @@ pub async fn scrape_fills(
|
|||
|
||||
let mut txns = join_all(txn_futs).await;
|
||||
|
||||
// TODO: reenable total fills metric
|
||||
let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets);
|
||||
|
||||
// Write any fills to the database, and update the transactions as processed
|
||||
insert_fills_atomically(pool, worker_id, fills, sig_strings).await?;
|
||||
let (fills, completed_sigs) =
|
||||
parse_trades_from_openbook_txns(&mut txns, sig_strings, target_markets);
|
||||
for fill in fills.iter() {
|
||||
let market_name = target_markets.get(&fill.market).unwrap();
|
||||
METRIC_FILLS_TOTAL.with_label_values(&[market_name]).inc();
|
||||
}
|
||||
// Write fills to the database, and update properly fetched transactions as processed
|
||||
insert_fills_atomically(pool, worker_id, fills, completed_sigs).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue