Merge pull request #6 from dboures/reliable-scraping

Reliable scraping
dboures 2023-06-14 22:30:52 -05:00 committed by GitHub
commit c7c760ae72
16 changed files with 436 additions and 372 deletions

.gitignore
View File

@@ -1,2 +1,4 @@
 /target
 .env
+*.cer*
+*.pks*

View File

@@ -53,7 +53,7 @@ async fn main() -> anyhow::Result<()> {
 async fn save_candles(candles: Vec<Candle>, client: Object) -> anyhow::Result<()> {
     if !candles.is_empty() {
-        let upsert_statement = build_candles_upsert_statement(candles);
+        let upsert_statement = build_candles_upsert_statement(&candles);
         client
             .execute(&upsert_statement, &[])
             .await

View File

@@ -1,23 +1,24 @@
 use anchor_lang::prelude::Pubkey;
 use chrono::{DateTime, Duration, NaiveDateTime, Utc};
-use futures::future::join_all;
+use deadpool_postgres::Pool;
+use log::debug;
 use openbook_candles::{
-    database::{initialize::connect_to_database, insert::persist_fill_events},
+    database::{
+        initialize::{connect_to_database, setup_database},
+        insert::build_transactions_insert_statement,
+    },
     structs::{
         markets::{fetch_market_infos, load_markets},
-        openbook::OpenBookFillEvent,
+        transaction::{PgTransaction, NUM_TRANSACTION_PARTITIONS},
     },
-    utils::Config,
-    worker::trade_fetching::parsing::parse_trades_from_openbook_txns,
+    utils::{AnyhowWrap, Config, OPENBOOK_KEY},
+    worker::trade_fetching::scrape::scrape_fills,
 };
 use solana_client::{
     nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
-    rpc_config::RpcTransactionConfig, rpc_response::RpcConfirmedTransactionStatusWithSignature,
 };
 use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
-use solana_transaction_status::UiTransactionEncoding;
 use std::{collections::HashMap, env, str::FromStr};
-use tokio::sync::mpsc::{self, Sender};
 
 #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
 async fn main() -> anyhow::Result<()> {
@@ -26,6 +27,8 @@ async fn main() -> anyhow::Result<()> {
     assert!(args.len() == 2);
     let path_to_markets_json = &args[1];
+    // let num_days = args[2].parse::<i64>().unwrap(); // TODO: implement
+    let num_days = 1;
     let rpc_url: String = dotenv::var("RPC_URL").unwrap();
 
     let config = Config {
@@ -40,145 +43,93 @@ async fn main() -> anyhow::Result<()> {
     println!("{:?}", target_markets);
 
     let pool = connect_to_database().await?;
-    let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(1000);
-
-    tokio::spawn(async move {
-        loop {
-            persist_fill_events(&pool, &mut fill_receiver)
-                .await
-                .unwrap();
-        }
-    });
-
-    backfill(rpc_url, &fill_sender, &target_markets).await?;
-    Ok(())
-}
-
-pub async fn backfill(
-    rpc_url: String,
-    fill_sender: &Sender<OpenBookFillEvent>,
-    target_markets: &HashMap<Pubkey, String>,
-) -> anyhow::Result<()> {
-    println!("backfill started");
-    let mut before_sig: Option<Signature> = None;
-    let mut now_time = Utc::now().timestamp();
-    let end_time = (Utc::now() - Duration::days(1)).timestamp();
+    setup_database(&pool).await?;
     let mut handles = vec![];
 
-    while now_time > end_time {
-        let rpc_client =
-            RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
-        let maybe_r = get_signatures(&rpc_client, before_sig).await;
-
-        match maybe_r {
-            Some((last, time, sigs)) => {
-                now_time = time;
-                before_sig = Some(last);
-                let time_left = backfill_time_left(now_time, end_time);
-                println!(
-                    "{} minutes ~ {} days remaining in the backfill\n",
-                    time_left.num_minutes(),
-                    time_left.num_days()
-                );
-
-                let cloned_markets = target_markets.clone();
-                let cloned_sender = fill_sender.clone();
-                let handle = tokio::spawn(async move {
-                    get_transactions(&rpc_client, sigs, &cloned_sender, &cloned_markets).await;
-                });
-                handles.push(handle);
-            }
-            None => {}
-        }
+    let rpc_clone = rpc_url.clone();
+    let pool_clone = pool.clone();
+    handles.push(tokio::spawn(async move {
+        fetch_signatures(rpc_clone, &pool_clone, num_days)
+            .await
+            .unwrap();
+    }));
+
+    // Low priority improvement: batch fills into 1000's per worker
+    for id in 0..NUM_TRANSACTION_PARTITIONS {
+        let rpc_clone = rpc_url.clone();
+        let pool_clone = pool.clone();
+        let markets_clone = target_markets.clone();
+        handles.push(tokio::spawn(async move {
+            scrape_fills(id as i32, rpc_clone, &pool_clone, &markets_clone)
+                .await
+                .unwrap();
+        }));
     }
 
-    futures::future::join_all(handles).await;
-    println!("Backfill complete \n");
+    // TODO: spawn status thread
+    futures::future::join_all(handles).await;
     Ok(())
 }
 
-pub async fn get_signatures(
-    rpc_client: &RpcClient,
-    before_sig: Option<Signature>,
-) -> Option<(
-    Signature,
-    i64,
-    Vec<RpcConfirmedTransactionStatusWithSignature>,
-)> {
-    let rpc_config = GetConfirmedSignaturesForAddress2Config {
-        before: before_sig,
-        until: None,
-        limit: None,
-        commitment: Some(CommitmentConfig::confirmed()),
-    };
-
-    let sigs = match rpc_client
-        .get_signatures_for_address_with_config(
-            &Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(),
-            rpc_config,
-        )
-        .await
-    {
-        Ok(s) => s,
-        Err(e) => {
-            println!("Error in get_signatures_for_address_with_config: {}", e);
-            return None;
-        }
-    };
-
-    if sigs.is_empty() {
-        println!("No signatures found");
-        return None;
-    }
-    let last = sigs.last().unwrap();
-    // println!("{:?}", last.block_time.unwrap());
-    Some((
-        Signature::from_str(&last.signature).unwrap(),
-        last.block_time.unwrap(),
-        sigs,
-    ))
-}
-
-pub async fn get_transactions(
-    rpc_client: &RpcClient,
-    mut sigs: Vec<RpcConfirmedTransactionStatusWithSignature>,
-    fill_sender: &Sender<OpenBookFillEvent>,
-    target_markets: &HashMap<Pubkey, String>,
-) {
-    sigs.retain(|sig| sig.err.is_none());
-    if sigs.last().is_none() {
-        return;
-    }
-
-    let txn_config = RpcTransactionConfig {
-        encoding: Some(UiTransactionEncoding::Json),
-        commitment: Some(CommitmentConfig::confirmed()),
-        max_supported_transaction_version: Some(0),
-    };
-
-    let signatures: Vec<_> = sigs
-        .into_iter()
-        .map(|sig| sig.signature.parse::<Signature>().unwrap())
-        .collect();
-
-    let txn_futs: Vec<_> = signatures
-        .iter()
-        .map(|s| rpc_client.get_transaction_with_config(s, txn_config))
-        .collect();
-
-    let mut txns = join_all(txn_futs).await;
-
-    let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
-    if !fills.is_empty() {
-        for fill in fills.into_iter() {
-            // println!("Sending fill {:?}", fill);
-            if let Err(_) = fill_sender.send(fill).await {
-                panic!("receiver dropped");
-            }
-        }
-    }
-}
+pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> anyhow::Result<()> {
+    let mut before_sig: Option<Signature> = None;
+    let mut now_time = Utc::now().timestamp();
+    let end_time = (Utc::now() - Duration::days(num_days)).timestamp();
+    let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
+
+    while now_time > end_time {
+        let rpc_config = GetConfirmedSignaturesForAddress2Config {
+            before: before_sig,
+            until: None,
+            limit: None,
+            commitment: Some(CommitmentConfig::confirmed()),
+        };
+
+        let sigs = match rpc_client
+            .get_signatures_for_address_with_config(&OPENBOOK_KEY, rpc_config)
+            .await
+        {
+            Ok(sigs) => sigs,
+            Err(e) => {
+                println!("Error fetching signatures: {}", e);
+                continue;
+            }
+        };
+
+        if sigs.is_empty() {
+            println!("No signatures found, trying again");
+            continue;
+        }
+
+        let last = sigs.last().unwrap();
+        let last_time = last.block_time.unwrap();
+        let last_signature = last.signature.clone();
+        let transactions = sigs
+            .into_iter()
+            .map(PgTransaction::from_rpc_confirmed_transaction)
+            .collect::<Vec<PgTransaction>>();
+
+        if transactions.is_empty() {
+            println!("No transactions found, trying again");
+        }
+
+        debug!("writing: {:?} txns to DB\n", transactions.len());
+        let upsert_statement = build_transactions_insert_statement(transactions);
+        let client = pool.get().await?;
+        client
+            .execute(&upsert_statement, &[])
+            .await
+            .map_err_anyhow()?;
+
+        now_time = last_time;
+        before_sig = Some(Signature::from_str(&last_signature)?);
+        let time_left = backfill_time_left(now_time, end_time);
+        println!(
+            "{} minutes ~ {} days remaining in the backfill\n",
+            time_left.num_minutes(),
+            time_left.num_days()
+        );
+    }
+    Ok(())
+}
 
 fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration {

View File

@@ -4,6 +4,7 @@ use crate::structs::{
     openbook::PgOpenBookFill,
     resolution::Resolution,
     trader::PgTrader,
+    transaction::PgTransaction,
 };
 use chrono::{DateTime, Utc};
 use deadpool_postgres::{GenericClient, Pool};
@@ -320,3 +321,23 @@ pub async fn fetch_coingecko_24h_high_low(
         .map(PgCoinGecko24HighLow::from_row)
         .collect())
 }
+
+/// Fetches unprocessed, non-error transactions for the specified worker partition.
+/// Pulls at most 50 transactions at a time.
+pub async fn fetch_worker_transactions(
+    worker_id: i32,
+    pool: &Pool,
+) -> anyhow::Result<Vec<PgTransaction>> {
+    let client = pool.get().await?;
+
+    let stmt = r#"SELECT signature, program_pk, block_datetime, slot, err, "processed", worker_partition
+        FROM transactions
+        where worker_partition = $1
+        and err = false
+        and processed = false
+        LIMIT 50"#;
+
+    let rows = client.query(stmt, &[&worker_id]).await?;
+
+    Ok(rows.into_iter().map(PgTransaction::from_row).collect())
+}

View File

@@ -67,8 +67,9 @@ pub async fn connect_to_database() -> anyhow::Result<Pool> {
 pub async fn setup_database(pool: &Pool) -> anyhow::Result<()> {
     let candles_table_fut = create_candles_table(pool);
+    let transactions_table_fut = create_transactions_table(pool);
     let fills_table_fut = create_fills_table(pool);
-    let result = tokio::try_join!(candles_table_fut, fills_table_fut);
+    let result = tokio::try_join!(candles_table_fut, transactions_table_fut, fills_table_fut);
     match result {
         Ok(_) => {
             println!("Successfully configured database");
@@ -126,7 +127,7 @@ pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> {
     client
         .execute(
             "CREATE TABLE IF NOT EXISTS fills (
-            id numeric PRIMARY KEY,
+            signature text not null,
             time timestamptz not null,
             market text not null,
             open_orders text not null,
@@ -137,19 +138,14 @@ pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> {
             native_qty_received double precision not null,
             native_fee_or_rebate double precision not null,
             fee_tier text not null,
-            order_id text not null
+            order_id text not null,
+            log_index int4 not null,
+            CONSTRAINT fills_pk PRIMARY KEY (signature, log_index)
         )",
             &[],
         )
         .await?;
 
-    client
-        .execute(
-            "CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)",
-            &[],
-        )
-        .await?;
-
     client
         .execute(
             "CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)",
@@ -158,3 +154,41 @@ pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> {
         .await?;
     Ok(())
 }
+
+pub async fn create_transactions_table(pool: &Pool) -> anyhow::Result<()> {
+    let client = pool.get().await?;
+
+    client
+        .execute(
+            "CREATE TABLE IF NOT EXISTS transactions (
+                signature text NOT NULL,
+                program_pk text NOT NULL,
+                block_datetime timestamptz NOT NULL,
+                slot bigint NOT NULL,
+                err bool NOT NULL,
+                processed bool NOT NULL,
+                worker_partition int4 NOT NULL,
+                CONSTRAINT transactions_pk PRIMARY KEY (signature, worker_partition)
+            ) PARTITION BY LIST (worker_partition);",
+            &[],
+        )
+        .await?;
+
+    client.batch_execute(
+        "CREATE INDEX IF NOT EXISTS transactions_processed_err_idx ON ONLY transactions (signature) WHERE processed IS NOT TRUE and err IS NOT TRUE;
+        CREATE INDEX IF NOT EXISTS transactions_program_pk_idx ON ONLY transactions USING btree (program_pk, slot DESC);
+
+        CREATE TABLE IF NOT EXISTS transactions_0 PARTITION OF transactions FOR VALUES IN (0);
+        CREATE TABLE IF NOT EXISTS transactions_1 PARTITION OF transactions FOR VALUES IN (1);
+        CREATE TABLE IF NOT EXISTS transactions_2 PARTITION OF transactions FOR VALUES IN (2);
+        CREATE TABLE IF NOT EXISTS transactions_3 PARTITION OF transactions FOR VALUES IN (3);
+        CREATE TABLE IF NOT EXISTS transactions_4 PARTITION OF transactions FOR VALUES IN (4);
+        CREATE TABLE IF NOT EXISTS transactions_5 PARTITION OF transactions FOR VALUES IN (5);
+        CREATE TABLE IF NOT EXISTS transactions_6 PARTITION OF transactions FOR VALUES IN (6);
+        CREATE TABLE IF NOT EXISTS transactions_7 PARTITION OF transactions FOR VALUES IN (7);
+        CREATE TABLE IF NOT EXISTS transactions_8 PARTITION OF transactions FOR VALUES IN (8);
+        CREATE TABLE IF NOT EXISTS transactions_9 PARTITION OF transactions FOR VALUES IN (9);"
+    ).await?;
+
+    Ok(())
+}
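
Since transactions is LIST-partitioned on worker_partition, each worker's polling query only touches one physical child table (transactions_0 through transactions_9), and the partial index stays small because it only covers unprocessed, non-error rows. A hypothetical sanity check, not part of this change, showing where a row physically lives:

    // tableoid::regclass names the child partition storing a row,
    // e.g. "transactions_3" for a row with worker_partition = 3.
    async fn partition_of(pool: &deadpool_postgres::Pool, sig: &str) -> anyhow::Result<String> {
        let client = pool.get().await?;
        let row = client
            .query_one(
                "SELECT tableoid::regclass::text FROM transactions WHERE signature = $1",
                &[&sig],
            )
            .await?;
        Ok(row.get(0))
    }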

View File

@@ -1,73 +1,62 @@
 use deadpool_postgres::Pool;
-use log::debug;
-use std::{
-    collections::{hash_map::DefaultHasher, HashMap},
-    hash::{Hash, Hasher},
-};
-use tokio::sync::mpsc::{error::TryRecvError, Receiver};
 
 use crate::{
-    structs::{candle::Candle, openbook::OpenBookFillEvent},
+    structs::{candle::Candle, openbook::OpenBookFillEvent, transaction::PgTransaction},
     utils::{to_timestampz, AnyhowWrap},
 };
 
-pub async fn persist_fill_events(
+pub async fn insert_fills_atomically(
     pool: &Pool,
-    fill_receiver: &mut Receiver<OpenBookFillEvent>,
+    worker_id: i32,
+    fills: Vec<OpenBookFillEvent>,
+    signatures: Vec<String>,
 ) -> anyhow::Result<()> {
-    loop {
-        let mut write_batch = HashMap::new();
-        while write_batch.len() < 10 {
-            match fill_receiver.try_recv() {
-                Ok(event) => {
-                    write_batch.entry(event).or_insert(0);
-                }
-                Err(TryRecvError::Empty) => {
-                    if !write_batch.is_empty() {
-                        break;
-                    } else {
-                        continue;
-                    }
-                }
-                Err(TryRecvError::Disconnected) => {
-                    panic!("Fills sender must stay alive")
-                }
-            };
-        }
-
-        if !write_batch.is_empty() {
-            debug!("writing: {:?} events to DB\n", write_batch.len());
-            let upsert_statement = build_fills_upsert_statement(write_batch);
-            let client = pool.get().await?;
-            client
-                .execute(&upsert_statement, &[])
-                .await
-                .map_err_anyhow()
-                .unwrap();
-        }
-    }
+    let mut client = pool.get().await?;
+
+    let db_txn = client.build_transaction().start().await?;
+
+    // 1. Insert fills
+    if !fills.is_empty() {
+        let fills_statement = build_fills_upsert_statement(fills);
+        db_txn
+            .execute(&fills_statement, &[])
+            .await
+            .map_err_anyhow()
+            .unwrap();
+    }
+
+    // 2. Update txns table as processed
+    let transactions_statement =
+        build_transactions_processed_update_statement(worker_id, signatures);
+    db_txn
+        .execute(&transactions_statement, &[])
+        .await
+        .map_err_anyhow()
+        .unwrap();
+
+    db_txn.commit().await?;
+
+    Ok(())
 }
 
-#[allow(deprecated)]
-fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> String {
-    let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES");
-    for (idx, event) in events.keys().enumerate() {
-        let mut hasher = DefaultHasher::new();
-        event.hash(&mut hasher);
+fn build_fills_upsert_statement(fills: Vec<OpenBookFillEvent>) -> String {
+    let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES");
+    for (idx, fill) in fills.iter().enumerate() {
         let val_str = format!(
-            "({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {})",
-            hasher.finish(),
-            to_timestampz(event.block_time as u64).to_rfc3339(),
-            event.market,
-            event.open_orders,
-            event.open_orders_owner,
-            event.bid,
-            event.maker,
-            event.native_qty_paid,
-            event.native_qty_received,
-            event.native_fee_or_rebate,
-            event.fee_tier,
-            event.order_id,
+            "(\'{}\', \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})",
+            fill.signature,
+            to_timestampz(fill.block_time as u64).to_rfc3339(),
+            fill.market,
+            fill.open_orders,
+            fill.open_orders_owner,
+            fill.bid,
+            fill.maker,
+            fill.native_qty_paid,
+            fill.native_qty_received,
+            fill.native_fee_or_rebate,
+            fill.fee_tier,
+            fill.order_id,
+            fill.log_index,
         );
 
         if idx == 0 {
@@ -77,7 +66,7 @@ fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> String {
         }
     }
 
-    let handle_conflict = "ON CONFLICT (id) DO UPDATE SET market=excluded.market";
+    let handle_conflict = "ON CONFLICT DO NOTHING";
     stmt = format!("{} {}", stmt, handle_conflict);
     stmt
@@ -121,56 +110,53 @@ pub fn build_candles_upsert_statement(candles: &Vec<Candle>) -> String {
     stmt
 }
 
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use solana_sdk::pubkey::Pubkey;
-    use std::str::FromStr;
-
-    #[test]
-    fn test_event_hashing() {
-        let event_1 = OpenBookFillEvent {
-            market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(),
-            open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(),
-            open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw")
-                .unwrap(),
-            bid: false,
-            maker: false,
-            native_qty_paid: 200000000,
-            native_qty_received: 4204317,
-            native_fee_or_rebate: 1683,
-            order_id: 387898134381964481824213,
-            owner_slot: 0,
-            fee_tier: 0,
-            client_order_id: None,
-            referrer_rebate: Some(841),
-            block_time: 0,
-        };
-        let event_2 = OpenBookFillEvent {
-            market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(),
-            open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(),
-            open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw")
-                .unwrap(),
-            bid: false,
-            maker: false,
-            native_qty_paid: 200000001,
-            native_qty_received: 4204317,
-            native_fee_or_rebate: 1683,
-            order_id: 387898134381964481824213,
-            owner_slot: 0,
-            fee_tier: 0,
-            client_order_id: None,
-            referrer_rebate: Some(841),
-            block_time: 0,
-        };
-
-        let mut h1 = DefaultHasher::new();
-        event_1.hash(&mut h1);
-        let mut h2 = DefaultHasher::new();
-        event_2.hash(&mut h2);
-
-        assert_ne!(h1.finish(), h2.finish());
+pub fn build_transactions_insert_statement(transactions: Vec<PgTransaction>) -> String {
+    let mut stmt = String::from("INSERT INTO transactions (signature, program_pk, block_datetime, slot, err, processed, worker_partition) VALUES");
+    for (idx, txn) in transactions.iter().enumerate() {
+        let val_str = format!(
+            "(\'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {})",
+            txn.signature,
+            txn.program_pk,
+            txn.block_datetime.to_rfc3339(),
+            txn.slot,
+            txn.err,
+            txn.processed,
+            txn.worker_partition,
+        );
+
+        if idx == 0 {
+            stmt = format!("{} {}", &stmt, val_str);
+        } else {
+            stmt = format!("{}, {}", &stmt, val_str);
+        }
     }
+
+    let handle_conflict = "ON CONFLICT DO NOTHING";
+    stmt = format!("{} {}", stmt, handle_conflict);
+    stmt
+}
+
+pub fn build_transactions_processed_update_statement(
+    worker_id: i32,
+    processed_signatures: Vec<String>,
+) -> String {
+    let mut stmt = String::from(
+        "UPDATE transactions
+        SET processed = true
+        WHERE transactions.signature IN (",
+    );
+    for (idx, sig) in processed_signatures.iter().enumerate() {
+        let val_str = if idx == processed_signatures.len() - 1 {
+            format!("\'{}\'", sig,)
+        } else {
+            format!("\'{}\',", sig,)
+        };
+        stmt = format!("{} {}", &stmt, val_str);
+    }
+
+    let worker_stmt = format!(") AND worker_partition = {} ", worker_id);
+    stmt = format!("{} {}", stmt, worker_stmt);
+    stmt
 }
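
Both builders splice values into one SQL string, so their output shape is easiest to see in a quick test. A sketch with made-up signatures, pinning down the format produced by the code above:

    #[cfg(test)]
    mod statement_shape {
        use super::*;

        #[test]
        fn processed_update_shape() {
            // Roughly: UPDATE transactions SET processed = true
            //          WHERE transactions.signature IN ( 'sigA', 'sigB' ) AND worker_partition = 3
            let stmt = build_transactions_processed_update_statement(
                3,
                vec!["sigA".to_string(), "sigB".to_string()],
            );
            assert!(stmt.starts_with("UPDATE transactions"));
            assert!(stmt.contains("'sigA', 'sigB'"));
            assert!(stmt.contains("worker_partition = 3"));
        }
    }

Note that an empty signature list would produce IN ( ), which Postgres rejects, so callers should only pass batches that completed at least one signature.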

View File

@@ -55,7 +55,7 @@ async fn main() -> std::io::Result<()> {
         .unwrap();
     // For collecting metrics on the public api, excluding 404s
     let public_metrics = PrometheusMetricsBuilder::new("openbook_candles_server")
-        .registry(registry.clone())
+        .registry(registry)
         .exclude_status(StatusCode::NOT_FOUND)
         .build()
         .unwrap();

View File

@@ -6,3 +6,4 @@ pub mod resolution;
 pub mod slab;
 pub mod trader;
 pub mod tradingview;
+pub mod transaction;

View File

@@ -22,8 +22,14 @@ pub struct OpenBookFillEventRaw {
     pub referrer_rebate: Option<u64>,
 }
 
 impl OpenBookFillEventRaw {
-    pub fn with_time(self, block_time: i64) -> OpenBookFillEvent {
+    pub fn into_event(
+        self,
+        signature: String,
+        block_time: i64,
+        log_index: usize,
+    ) -> OpenBookFillEvent {
         OpenBookFillEvent {
+            signature,
             market: self.market,
             open_orders: self.open_orders,
             open_orders_owner: self.open_orders_owner,
@@ -38,6 +44,7 @@ impl OpenBookFillEventRaw {
             client_order_id: self.client_order_id,
             referrer_rebate: self.referrer_rebate,
             block_time,
+            log_index,
         }
     }
 }
@@ -45,6 +52,7 @@ impl OpenBookFillEventRaw {
 #[event]
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct OpenBookFillEvent {
+    pub signature: String,
     pub market: Pubkey,
     pub open_orders: Pubkey,
     pub open_orders_owner: Pubkey,
@@ -59,6 +67,7 @@ pub struct OpenBookFillEvent {
     pub client_order_id: Option<u64>,
     pub referrer_rebate: Option<u64>,
     pub block_time: i64,
+    pub log_index: usize,
 }
 
 #[derive(Copy, Clone, Debug, PartialEq)]

View File

@@ -0,0 +1,52 @@
+use chrono::{DateTime, Utc};
+use solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature;
+use tokio_postgres::Row;
+
+use crate::utils::{to_timestampz, OPENBOOK_KEY};
+
+#[derive(Clone, Debug, PartialEq)]
+pub struct PgTransaction {
+    pub signature: String,
+    pub program_pk: String,
+    pub block_datetime: DateTime<Utc>,
+    pub slot: u64,
+    pub err: bool,
+    pub processed: bool,
+    pub worker_partition: i32,
+}
+
+pub const NUM_TRANSACTION_PARTITIONS: u64 = 10;
+
+impl PgTransaction {
+    pub fn from_rpc_confirmed_transaction(
+        rpc_confirmed_transaction: RpcConfirmedTransactionStatusWithSignature,
+    ) -> Self {
+        PgTransaction {
+            signature: rpc_confirmed_transaction.signature,
+            program_pk: OPENBOOK_KEY.to_string(),
+            block_datetime: to_timestampz(rpc_confirmed_transaction.block_time.unwrap() as u64),
+            slot: rpc_confirmed_transaction.slot,
+            err: rpc_confirmed_transaction.err.is_some(),
+            processed: false,
+            worker_partition: (rpc_confirmed_transaction.slot % NUM_TRANSACTION_PARTITIONS) as i32,
+        }
+    }
+
+    pub fn from_row(row: Row) -> Self {
+        let slot_raw = row.get::<usize, i64>(3);
+        PgTransaction {
+            signature: row.get(0),
+            program_pk: row.get(1),
+            block_datetime: row.get(2),
+            slot: slot_raw as u64,
+            err: row.get(4),
+            processed: row.get(5),
+            worker_partition: row.get(6),
+        }
+    }
+}
+
+pub enum ProcessState {
+    Processed,
+    Unprocessed,
+}
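
The worker_partition assignment is just slot % NUM_TRANSACTION_PARTITIONS, so every signature from a given slot lands with the same worker and the assignment is stable across restarts. An illustration with made-up values:

    use openbook_candles::structs::transaction::PgTransaction;
    use solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature;

    fn main() {
        // Made-up signature, slot, and block time.
        let status = RpcConfirmedTransactionStatusWithSignature {
            signature: "ExampleSignature111".to_string(),
            slot: 201_547_893,
            err: None,
            memo: None,
            block_time: Some(1_686_787_852),
            confirmation_status: None,
        };
        let txn = PgTransaction::from_rpc_confirmed_transaction(status);
        // 201_547_893 % 10 == 3, so this row is routed to transactions_3.
        assert_eq!(txn.worker_partition, 3);
    }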

View File

@@ -1,9 +1,13 @@
+use anchor_lang::prelude::Pubkey;
 use chrono::{NaiveDateTime, Utc};
 use deadpool_postgres::Pool;
 use serde_derive::Deserialize;
+use solana_sdk::pubkey;
 
 use crate::structs::markets::MarketInfo;
 
+pub const OPENBOOK_KEY: Pubkey = pubkey!("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX");
+
 pub trait AnyhowWrap {
     type Value;
     fn map_err_anyhow(self) -> anyhow::Result<Self::Value>;

View File

@@ -21,7 +21,7 @@ pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
     let market_clone = market.clone();
     loop {
-        sleep(Duration::milliseconds(2000).to_std()?).await;
+        sleep(Duration::milliseconds(5000).to_std()?).await;
         match batch_inner(pool, &market_clone).await {
             Ok(_) => {}
             Err(e) => {
@@ -53,7 +53,7 @@ async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
 }
 
 async fn save_candles(pool: &Pool, candles: Vec<Candle>) -> anyhow::Result<()> {
-    if candles.len() == 0 {
+    if candles.is_empty() {
         return Ok(());
     }
     let upsert_statement = build_candles_upsert_statement(&candles);

View File

@@ -1,22 +1,18 @@
 use log::{error, info};
 use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
-use openbook_candles::structs::openbook::OpenBookFillEvent;
+use openbook_candles::structs::transaction::NUM_TRANSACTION_PARTITIONS;
 use openbook_candles::utils::Config;
 use openbook_candles::worker::metrics::{
-    serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE, METRIC_FILLS_QUEUE_LENGTH,
+    serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE,
 };
-use openbook_candles::worker::trade_fetching::scrape::scrape;
+use openbook_candles::worker::trade_fetching::scrape::{scrape_fills, scrape_signatures};
 use openbook_candles::{
-    database::{
-        initialize::{connect_to_database, setup_database},
-        insert::{persist_fill_events},
-    },
+    database::initialize::{connect_to_database, setup_database},
     worker::candle_batching::batch_for_market,
 };
 use solana_sdk::pubkey::Pubkey;
 use std::env;
 use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
-use tokio::sync::mpsc;
 
 #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
 async fn main() -> anyhow::Result<()> {
@@ -32,8 +28,6 @@ async fn main() -> anyhow::Result<()> {
         rpc_url: rpc_url.clone(),
     };
 
-    let fills_queue_max_size = 10000;
-
     let markets = load_markets(path_to_markets_json);
     let market_infos = fetch_market_infos(&config, markets.clone()).await?;
     let mut target_markets = HashMap::new();
@@ -46,21 +40,26 @@ async fn main() -> anyhow::Result<()> {
     setup_database(&pool).await?;
     let mut handles = vec![];
 
-    let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(fills_queue_max_size);
-    let scrape_fill_sender = fill_sender.clone();
+    // signature scraping
+    let rpc_clone = rpc_url.clone();
+    let pool_clone = pool.clone();
     handles.push(tokio::spawn(async move {
-        scrape(&config, &scrape_fill_sender, &target_markets).await;
+        scrape_signatures(rpc_clone, &pool_clone).await.unwrap();
     }));
 
-    let fills_pool = pool.clone();
-    handles.push(tokio::spawn(async move {
-        loop {
-            persist_fill_events(&fills_pool, &mut fill_receiver)
-                .await
-                .unwrap();
-        }
-    }));
+    // transaction/fill scraping
+    for id in 0..NUM_TRANSACTION_PARTITIONS {
+        let rpc_clone = rpc_url.clone();
+        let pool_clone = pool.clone();
+        let markets_clone = target_markets.clone();
+        handles.push(tokio::spawn(async move {
+            scrape_fills(id as i32, rpc_clone, &pool_clone, &markets_clone)
+                .await
+                .unwrap();
+        }));
+    }
 
+    // candle batching
     for market in market_infos.into_iter() {
         let batch_pool = pool.clone();
         handles.push(tokio::spawn(async move {
@@ -70,7 +69,6 @@ async fn main() -> anyhow::Result<()> {
     }
 
     let monitor_pool = pool.clone();
-    let monitor_fill_channel = fill_sender.clone();
     handles.push(tokio::spawn(async move {
         // TODO: maybe break this out into a new function
         loop {
@@ -78,9 +76,6 @@ async fn main() -> anyhow::Result<()> {
             METRIC_DB_POOL_AVAILABLE.set(pool_status.available as i64);
             METRIC_DB_POOL_SIZE.set(pool_status.size as i64);
 
-            METRIC_FILLS_QUEUE_LENGTH
-                .set((fills_queue_max_size - monitor_fill_channel.capacity()) as i64);
-
             tokio::time::sleep(WaitDuration::from_secs(10)).await;
         }
     }));

View File

@@ -2,8 +2,8 @@ use actix_web::{dev::Server, http::StatusCode, App, HttpServer};
 use actix_web_prom::PrometheusMetricsBuilder;
 use lazy_static::lazy_static;
 use prometheus::{
-    register_int_counter_vec_with_registry, register_int_gauge_with_registry, IntCounterVec,
-    IntGauge, Registry,
+    register_int_counter_vec_with_registry, register_int_counter_with_registry,
+    register_int_gauge_with_registry, IntCounter, IntCounterVec, IntGauge, Registry,
 };
 
 lazy_static! {
@@ -30,9 +30,9 @@ lazy_static! {
         METRIC_REGISTRY
     )
     .unwrap();
-    pub static ref METRIC_FILLS_QUEUE_LENGTH: IntGauge = register_int_gauge_with_registry!(
-        "fills_queue_length",
-        "Current length of the fills write queue",
+    pub static ref METRIC_TRANSACTIONS_TOTAL: IntCounter = register_int_counter_with_registry!(
+        "transactions_total",
+        "Total number of transaction signatures scraped",
         METRIC_REGISTRY
     )
     .unwrap();

View File

@@ -15,10 +15,12 @@ const PROGRAM_DATA: &str = "Program data: ";
 pub fn parse_trades_from_openbook_txns(
     txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
+    mut sig_strings: Vec<String>,
     target_markets: &HashMap<Pubkey, String>,
-) -> Vec<OpenBookFillEvent> {
+) -> (Vec<OpenBookFillEvent>, Vec<String>) {
     let mut fills_vector = Vec::<OpenBookFillEvent>::new();
-    for txn in txns.iter_mut() {
+    let mut failed_sigs = vec![];
+    for (idx, txn) in txns.iter_mut().enumerate() {
         match txn {
             Ok(t) => {
                 if let Some(m) = &t.transaction.meta {
@@ -27,6 +29,7 @@ pub fn parse_trades_from_openbook_txns(
                         match parse_openbook_fills_from_logs(
                             logs,
                             target_markets,
+                            sig_strings[idx].clone(),
                             t.block_time.unwrap(),
                         ) {
                             Some(mut events) => fills_vector.append(&mut events),
@@ -40,22 +43,25 @@ pub fn parse_trades_from_openbook_txns(
             }
             Err(e) => {
                 warn!("rpc error in get_transaction {}", e);
+                failed_sigs.push(sig_strings[idx].clone());
                 METRIC_RPC_ERRORS_TOTAL
                     .with_label_values(&["getTransaction"])
                     .inc();
             }
         }
     }
-    fills_vector
+    sig_strings.retain(|s| !failed_sigs.contains(s));
+    (fills_vector, sig_strings)
 }
 
 fn parse_openbook_fills_from_logs(
     logs: &Vec<String>,
     target_markets: &HashMap<Pubkey, String>,
+    signature: String,
     block_time: i64,
 ) -> Option<Vec<OpenBookFillEvent>> {
     let mut fills_vector = Vec::<OpenBookFillEvent>::new();
-    for l in logs {
+    for (idx, l) in logs.iter().enumerate() {
         match l.strip_prefix(PROGRAM_DATA) {
             Some(log) => {
                 let borsh_bytes = match anchor_lang::__private::base64::decode(log) {
@@ -68,7 +74,7 @@ fn parse_openbook_fills_from_logs(
                 match event {
                     Ok(e) => {
-                        let fill_event = e.with_time(block_time);
+                        let fill_event = e.into_event(signature.clone(), block_time, idx);
                         if target_markets.contains_key(&fill_event.market) {
                             fills_vector.push(fill_event);
                         }

View File

@@ -1,3 +1,4 @@
+use deadpool_postgres::Pool;
 use futures::future::join_all;
 use log::{debug, warn};
 use solana_client::{
@@ -6,111 +7,113 @@ use solana_client::{
 };
 use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature};
 use solana_transaction_status::UiTransactionEncoding;
-use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
-use tokio::sync::mpsc::Sender;
+use std::{collections::HashMap, time::Duration as WaitDuration};
 
 use crate::{
-    structs::openbook::OpenBookFillEvent,
-    utils::Config,
-    worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL},
+    database::{
+        fetch::fetch_worker_transactions,
+        insert::{build_transactions_insert_statement, insert_fills_atomically},
+    },
+    structs::transaction::PgTransaction,
+    utils::{AnyhowWrap, OPENBOOK_KEY},
+    worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL, METRIC_TRANSACTIONS_TOTAL},
 };
 
 use super::parsing::parse_trades_from_openbook_txns;
 
-pub async fn scrape(
-    config: &Config,
-    fill_sender: &Sender<OpenBookFillEvent>,
-    target_markets: &HashMap<Pubkey, String>,
-) {
-    let rpc_client =
-        RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::processed());
-    let before_slot = None;
+pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<()> {
+    let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
+
     loop {
-        scrape_transactions(
-            &rpc_client,
-            before_slot,
-            Some(150),
-            fill_sender,
-            target_markets,
-        )
-        .await;
-        tokio::time::sleep(WaitDuration::from_millis(250)).await;
+        let rpc_config = GetConfirmedSignaturesForAddress2Config {
+            before: None,
+            until: None,
+            limit: None,
+            commitment: Some(CommitmentConfig::confirmed()),
+        };
+
+        let sigs = match rpc_client
+            .get_signatures_for_address_with_config(&OPENBOOK_KEY, rpc_config)
+            .await
+        {
+            Ok(sigs) => sigs,
+            Err(e) => {
+                warn!("rpc error in get_signatures_for_address_with_config: {}", e);
+                METRIC_RPC_ERRORS_TOTAL
+                    .with_label_values(&["getSignaturesForAddress"])
+                    .inc();
+                continue;
+            }
+        };
+
+        if sigs.is_empty() {
+            debug!("No signatures found, trying again");
+            continue;
+        }
+
+        let transactions: Vec<PgTransaction> = sigs
+            .into_iter()
+            .map(PgTransaction::from_rpc_confirmed_transaction)
+            .collect();
+
+        debug!("Scraper writing: {:?} txns to DB\n", transactions.len());
+        let upsert_statement = build_transactions_insert_statement(transactions);
+        let client = pool.get().await?;
+        let num_txns = client
+            .execute(&upsert_statement, &[])
+            .await
+            .map_err_anyhow()?;
+        METRIC_TRANSACTIONS_TOTAL.inc_by(num_txns);
     }
+    // TODO: graceful shutdown
 }
 
-pub async fn scrape_transactions(
-    rpc_client: &RpcClient,
-    before_sig: Option<Signature>,
-    limit: Option<usize>,
-    fill_sender: &Sender<OpenBookFillEvent>,
+pub async fn scrape_fills(
+    worker_id: i32,
+    rpc_url: String,
+    pool: &Pool,
     target_markets: &HashMap<Pubkey, String>,
-) -> Option<Signature> {
-    let rpc_config = GetConfirmedSignaturesForAddress2Config {
-        before: before_sig,
-        until: None,
-        limit,
-        commitment: Some(CommitmentConfig::confirmed()),
-    };
-
-    let mut sigs = match rpc_client
-        .get_signatures_for_address_with_config(
-            &Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(),
-            rpc_config,
-        )
-        .await
-    {
-        Ok(s) => s,
-        Err(e) => {
-            warn!("rpc error in get_signatures_for_address_with_config: {}", e);
-            METRIC_RPC_ERRORS_TOTAL
-                .with_label_values(&["getSignaturesForAddress"])
-                .inc();
-            return before_sig;
-        }
-    };
-
-    if sigs.is_empty() {
-        debug!("No signatures found");
-        return before_sig;
-    }
-
-    let last = sigs.last().unwrap();
-    let request_last_sig = Signature::from_str(&last.signature).unwrap();
-
-    sigs.retain(|sig| sig.err.is_none());
-    if sigs.last().is_none() {
-        return Some(request_last_sig);
-    }
-
-    let txn_config = RpcTransactionConfig {
-        encoding: Some(UiTransactionEncoding::Json),
-        commitment: Some(CommitmentConfig::confirmed()),
-        max_supported_transaction_version: Some(0),
-    };
-
-    let signatures: Vec<_> = sigs
-        .into_iter()
-        .map(|sig| sig.signature.parse::<Signature>().unwrap())
-        .collect();
-
-    let txn_futs: Vec<_> = signatures
-        .iter()
-        .map(|s| rpc_client.get_transaction_with_config(s, txn_config))
-        .collect();
-
-    let mut txns = join_all(txn_futs).await;
-
-    let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
-    if !fills.is_empty() {
-        for fill in fills.into_iter() {
+) -> anyhow::Result<()> {
+    debug!("Worker {} started \n", worker_id);
+    let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed());
+
+    loop {
+        let transactions = fetch_worker_transactions(worker_id, pool).await?;
+        if transactions.is_empty() {
+            debug!("No signatures found by worker {}", worker_id);
+            tokio::time::sleep(WaitDuration::from_secs(1)).await;
+            continue;
+        };
+
+        // for each signature, fetch the transaction
+        let txn_config = RpcTransactionConfig {
+            encoding: Some(UiTransactionEncoding::Json),
+            commitment: Some(CommitmentConfig::confirmed()),
+            max_supported_transaction_version: Some(0),
+        };
+
+        let sig_strings = transactions
+            .iter()
+            .map(|t| t.signature.clone())
+            .collect::<Vec<String>>();
+
+        let signatures: Vec<_> = transactions
+            .into_iter()
+            .map(|t| t.signature.parse::<Signature>().unwrap())
+            .collect();
+
+        let txn_futs: Vec<_> = signatures
+            .iter()
+            .map(|s| rpc_client.get_transaction_with_config(s, txn_config))
+            .collect();
+
+        let mut txns = join_all(txn_futs).await;
+
+        let (fills, completed_sigs) =
+            parse_trades_from_openbook_txns(&mut txns, sig_strings, target_markets);
+        for fill in fills.iter() {
             let market_name = target_markets.get(&fill.market).unwrap();
-            if let Err(_) = fill_sender.send(fill).await {
-                panic!("receiver dropped");
-            }
             METRIC_FILLS_TOTAL.with_label_values(&[market_name]).inc();
         }
+
+        // Write fills to the database, and update properly fetched transactions as processed
+        insert_fills_atomically(pool, worker_id, fills, completed_sigs).await?;
     }
-
-    Some(request_last_sig)
 }
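
Taken together, the pipeline is at-least-once but idempotent: a signature only flips to processed = true inside the same database transaction that inserts its fills, a failed getTransaction keeps its signature out of completed_sigs so it is retried on a later poll, and replaying a batch is harmless because the fills primary key (signature, log_index) plus ON CONFLICT DO NOTHING de-duplicates the inserts. A sketch of that invariant, with a made-up fill assumed to belong to partition 3:

    use deadpool_postgres::Pool;
    use openbook_candles::database::insert::insert_fills_atomically;
    use openbook_candles::structs::openbook::OpenBookFillEvent;

    // Writing the same batch twice leaves exactly one fills row behind:
    // the second INSERT collides on (signature, log_index) and is dropped.
    async fn replay_is_a_noop(pool: &Pool, fill: OpenBookFillEvent) -> anyhow::Result<()> {
        let sigs = vec![fill.signature.clone()];
        insert_fills_atomically(pool, 3, vec![fill.clone()], sigs.clone()).await?;
        insert_fills_atomically(pool, 3, vec![fill], sigs).await?;
        Ok(())
    }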