refactor: remove scraper and use existing db schema

This commit is contained in:
Lou-Kamades 2023-06-14 02:19:27 -05:00
parent 01b22cec38
commit f74cec5d13
No known key found for this signature in database
GPG Key ID: 87A166E4D7C01F30
17 changed files with 95 additions and 933 deletions

2
.gitignore vendored
View File

@ -1,2 +1,4 @@
/target
.env
*.cer*
*.pks*

View File

@ -15,10 +15,6 @@ path = "src/worker/main.rs"
name = "server"
path = "src/server/main.rs"
[[bin]]
name = "backfill-trades"
path = "src/backfill-trades/main.rs"
[[bin]]
name = "backfill-candles"
path = "src/backfill-candles/main.rs"

View File

@ -1,141 +0,0 @@
use anchor_lang::prelude::Pubkey;
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
use deadpool_postgres::Pool;
use log::debug;
use openbook_candles::{
database::{
initialize::{connect_to_database, setup_database},
insert::build_transactions_insert_statement,
},
structs::{
markets::{fetch_market_infos, load_markets},
transaction::{PgTransaction, NUM_TRANSACTION_PARTITIONS},
},
utils::{AnyhowWrap, Config, OPENBOOK_KEY},
worker::trade_fetching::scrape::scrape_fills,
};
use solana_client::{
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
};
use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
use std::{collections::HashMap, env, str::FromStr};
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();
let args: Vec<String> = env::args().collect();
assert!(args.len() == 2);
let path_to_markets_json = &args[1];
// let num_days = args[2].parse::<i64>().unwrap(); // TODO: implement
let num_days = 1;
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
let config = Config {
rpc_url: rpc_url.clone(),
};
let markets = load_markets(path_to_markets_json);
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
let mut target_markets = HashMap::new();
for m in market_infos.clone() {
target_markets.insert(Pubkey::from_str(&m.address)?, m.name);
}
println!("{:?}", target_markets);
let pool = connect_to_database().await?;
setup_database(&pool).await?;
let mut handles = vec![];
let rpc_clone = rpc_url.clone();
let pool_clone = pool.clone();
handles.push(tokio::spawn(async move {
fetch_signatures(rpc_clone, &pool_clone, num_days)
.await
.unwrap();
}));
// Low priority improvement: batch fills into 1000's per worker
for id in 0..NUM_TRANSACTION_PARTITIONS {
let rpc_clone = rpc_url.clone();
let pool_clone = pool.clone();
let markets_clone = target_markets.clone();
handles.push(tokio::spawn(async move {
scrape_fills(id as i32, rpc_clone, &pool_clone, &markets_clone)
.await
.unwrap();
}));
}
// TODO: spawn status thread
futures::future::join_all(handles).await;
Ok(())
}
pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> anyhow::Result<()> {
let mut before_sig: Option<Signature> = None;
let mut now_time = Utc::now().timestamp();
let end_time = (Utc::now() - Duration::days(num_days)).timestamp();
let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
while now_time > end_time {
let rpc_config = GetConfirmedSignaturesForAddress2Config {
before: before_sig,
until: None,
limit: None,
commitment: Some(CommitmentConfig::confirmed()),
};
let sigs = match rpc_client
.get_signatures_for_address_with_config(&OPENBOOK_KEY, rpc_config)
.await
{
Ok(sigs) => sigs,
Err(e) => {
println!("Error fetching signatures: {}", e);
continue;
}
};
if sigs.is_empty() {
println!("No signatures found, trying again");
continue;
}
let last = sigs.last().unwrap();
let last_time = last.block_time.unwrap().clone();
let last_signature = last.signature.clone();
let transactions = sigs
.into_iter()
.map(|s| PgTransaction::from_rpc_confirmed_transaction(s))
.collect::<Vec<PgTransaction>>();
if transactions.is_empty() {
println!("No transactions found, trying again");
}
debug!("writing: {:?} txns to DB\n", transactions.len());
let upsert_statement = build_transactions_insert_statement(transactions);
let client = pool.get().await?;
client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()?;
now_time = last_time;
before_sig = Some(Signature::from_str(&last_signature)?);
let time_left = backfill_time_left(now_time, end_time);
println!(
"{} minutes ~ {} days remaining in the backfill\n",
time_left.num_minutes(),
time_left.num_days()
);
}
Ok(())
}
fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration {
let naive_cur = NaiveDateTime::from_timestamp_millis(current_time * 1000).unwrap();
let naive_bf = NaiveDateTime::from_timestamp_millis(backfill_end * 1000).unwrap();
let cur_date = DateTime::<Utc>::from_utc(naive_cur, Utc);
let bf_date = DateTime::<Utc>::from_utc(naive_bf, Utc);
cur_date - bf_date
}

View File

@ -4,7 +4,6 @@ use crate::structs::{
openbook::PgOpenBookFill,
resolution::Resolution,
trader::PgTrader,
transaction::PgTransaction,
};
use chrono::{DateTime, Utc};
use deadpool_postgres::{GenericClient, Pool};
@ -16,13 +15,12 @@ pub async fn fetch_earliest_fill(
let client = pool.get().await?;
let stmt = r#"SELECT
time as "time!",
bid as "bid!",
maker as "maker!",
native_qty_paid as "native_qty_paid!",
native_qty_received as "native_qty_received!",
native_fee_or_rebate as "native_fee_or_rebate!"
from fills
block_datetime as "time",
bid as "bid",
maker as "maker",
price as "price",
size as "size"
from openbook.openbook_fill_events
where market = $1
and maker = true
ORDER BY time asc LIMIT 1"#;
@ -44,16 +42,15 @@ pub async fn fetch_fills_from(
let client = pool.get().await?;
let stmt = r#"SELECT
time as "time!",
bid as "bid!",
maker as "maker!",
native_qty_paid as "native_qty_paid!",
native_qty_received as "native_qty_received!",
native_fee_or_rebate as "native_fee_or_rebate!"
from fills
block_datetime as "time",
bid as "bid",
maker as "maker",
price as "price",
size as "size"
from openbook.openbook_fill_events
where market = $1
and time >= $2::timestamptz
and time < $3::timestamptz
and block_datetime >= $2::timestamptz
and block_datetime < $3::timestamptz
and maker = true
ORDER BY time asc"#;
@ -71,17 +68,17 @@ pub async fn fetch_latest_finished_candle(
let client = pool.get().await?;
let stmt = r#"SELECT
market_name as "market_name!",
start_time as "start_time!",
end_time as "end_time!",
resolution as "resolution!",
open as "open!",
close as "close!",
high as "high!",
low as "low!",
volume as "volume!",
complete as "complete!"
from candles
market_name as "market_name",
start_time as "start_time",
end_time as "end_time",
resolution as "resolution",
open as "open",
close as "close",
high as "high",
low as "low",
volume as "volume",
complete as "complete"
from openbook.candles
where market_name = $1
and resolution = $2
and complete = true
@ -107,17 +104,17 @@ pub async fn fetch_earliest_candles(
let client = pool.get().await?;
let stmt = r#"SELECT
market_name as "market_name!",
start_time as "start_time!",
end_time as "end_time!",
market_name as "market_name",
start_time as "start_time",
end_time as "end_time",
resolution as "resolution!",
open as "open!",
close as "close!",
high as "high!",
low as "low!",
volume as "volume!",
complete as "complete!"
from candles
open as "open",
close as "close",
high as "high",
low as "low",
volume as "volume",
complete as "complete"
from openbook.candles
where market_name = $1
and resolution = $2
ORDER BY start_time asc"#;
@ -139,17 +136,17 @@ pub async fn fetch_candles_from(
let client = pool.get().await?;
let stmt = r#"SELECT
market_name as "market_name!",
start_time as "start_time!",
end_time as "end_time!",
resolution as "resolution!",
open as "open!",
close as "close!",
high as "high!",
low as "low!",
volume as "volume!",
complete as "complete!"
from candles
market_name as "market_name",
start_time as "start_time",
end_time as "end_time",
resolution as "resolution",
open as "open",
close as "close",
high as "high",
low as "low",
volume as "volume",
complete as "complete"
from openbook.candles
where market_name = $1
and resolution = $2
and start_time >= $3
@ -182,20 +179,20 @@ pub async fn fetch_top_traders_by_base_volume_from(
let stmt = r#"SELECT
open_orders_owner,
sum(
native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
) as "raw_ask_size!",
native_quantity_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
) as "raw_ask_size",
sum(
native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
) as "raw_bid_size!"
FROM fills
native_quantity_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
) as "raw_bid_size"
FROM openbook.openbook_fill_events
WHERE market = $1
AND time >= $2
AND time < $3
GROUP BY open_orders_owner
ORDER BY
sum(native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
sum(native_quantity_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
+
sum(native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
sum(native_quantity_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
DESC
LIMIT 10000"#;
@ -217,20 +214,20 @@ pub async fn fetch_top_traders_by_quote_volume_from(
let stmt = r#"SELECT
open_orders_owner,
sum(
native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
) as "raw_ask_size!",
native_quantity_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
) as "raw_ask_size",
sum(
native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
) as "raw_bid_size!"
FROM fills
native_quantity_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
) as "raw_bid_size"
FROM openbook.openbook_fill_events
WHERE market = $1
AND time >= $2
AND time < $3
GROUP BY open_orders_owner
ORDER BY
sum(native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
sum(native_quantity_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
+
sum(native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
sum(native_quantity_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
DESC
LIMIT 10000"#;
@ -249,20 +246,20 @@ pub async fn fetch_coingecko_24h_volume(
let stmt = r#"SELECT
t1.market,
COALESCE(t2.native_qty_received, 0) as "raw_base_size!",
COALESCE(t2.native_qty_paid, 0) as "raw_quote_size!"
COALESCE(t2.native_quantity_received, 0) as "raw_base_size",
COALESCE(t2.native_quantity_paid, 0) as "raw_quote_size"
FROM (
SELECT distinct on (market) *
FROM fills f
FROM openbook.openbook_fill_events f
where bid = true
and market = any($1)
order by market, "time" desc
) t1
LEFT JOIN (
select market,
sum(native_qty_received) as "native_qty_received",
sum(native_qty_paid) as "native_qty_paid"
from fills
sum(native_quantity_received) as "native_quantity_received",
sum(native_quantity_paid) as "native_quantity_paid"
from openbook.openbook_fill_events
where "time" >= current_timestamp - interval '1 day'
and bid = true
group by market
@ -291,10 +288,10 @@ pub async fn fetch_coingecko_24h_high_low(
(
SELECT *
from
candles
openbook.candles
where (market_name, start_time, resolution) in (
select market_name, max(start_time), resolution
from candles
from openbook.candles
where "resolution" = '1M'
and market_name = any($1)
group by market_name, resolution
@ -307,7 +304,7 @@ pub async fn fetch_coingecko_24h_high_low(
max(high) as "high",
min(low) as "low"
from
candles
openbook.candles
where
"resolution" = '1M'
and "start_time" >= current_timestamp - interval '1 day'
@ -321,23 +318,3 @@ pub async fn fetch_coingecko_24h_high_low(
.map(PgCoinGecko24HighLow::from_row)
.collect())
}
/// Fetches unprocessed, non-error transactions for the specified worker partition.
/// Pulls at most 50 transactions at a time.
pub async fn fetch_worker_transactions(
worker_id: i32,
pool: &Pool,
) -> anyhow::Result<Vec<PgTransaction>> {
let client = pool.get().await?;
let stmt = r#"SELECT signature, program_pk, block_datetime, slot, err, "processed", worker_partition
FROM transactions
where worker_partition = $1
and err = false
and processed = false
LIMIT 50"#;
let rows = client.query(stmt, &[&worker_id]).await?;
Ok(rows.into_iter().map(PgTransaction::from_row).collect())
}

View File

@ -21,8 +21,8 @@ pub async fn connect_to_database() -> anyhow::Result<Pool> {
// openssl pkcs12 -export -in client.cer -inkey client-key.cer -out client.pks
// base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64
// fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a mango-fills
// fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills
// fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a APP-NAME
// fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a APP-NAME
let tls = if pg_config.pg_use_ssl {
pg_config.pg.ssl_mode = Some(SslMode::Require);
let ca_cert = fs::read(pg_config.pg_ca_cert_path.expect("reading ca cert from env"))
@ -66,11 +66,7 @@ pub async fn connect_to_database() -> anyhow::Result<Pool> {
}
pub async fn setup_database(pool: &Pool) -> anyhow::Result<()> {
let candles_table_fut = create_candles_table(pool);
let transactions_table_fut = create_transactions_table(pool);
let fills_table_fut = create_fills_table(pool);
let result = tokio::try_join!(candles_table_fut, transactions_table_fut, fills_table_fut);
match result {
match create_candles_table(pool).await {
Ok(_) => {
println!("Successfully configured database");
Ok(())
@ -87,7 +83,7 @@ pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> {
client
.execute(
"CREATE TABLE IF NOT EXISTS candles (
"CREATE TABLE IF NOT EXISTS openbook.candles (
id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
market_name text,
start_time timestamptz,
@ -105,90 +101,9 @@ pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> {
.await?;
client.execute(
"CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market_name, start_time, resolution)",
"CREATE UNIQUE INDEX idx_market_time_resolution ON openbook.candles USING btree (market_name, start_time, resolution);",
&[]
).await?;
client.execute(
"DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'unique_candles') THEN
ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market_name, start_time, resolution);
END IF;
END $$", &[]
).await?;
Ok(())
}
pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> {
let client = pool.get().await?;
client
.execute(
"CREATE TABLE IF NOT EXISTS fills (
signature text not null,
time timestamptz not null,
market text not null,
open_orders text not null,
open_orders_owner text not null,
bid bool not null,
maker bool not null,
native_qty_paid double precision not null,
native_qty_received double precision not null,
native_fee_or_rebate double precision not null,
fee_tier text not null,
order_id text not null,
log_index int4 not null,
CONSTRAINT fills_pk PRIMARY KEY (signature, log_index)
)",
&[],
)
.await?;
client
.execute(
"CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)",
&[],
)
.await?;
Ok(())
}
pub async fn create_transactions_table(pool: &Pool) -> anyhow::Result<()> {
let client = pool.get().await?;
client
.execute(
"CREATE TABLE IF NOT EXISTS transactions (
signature text NOT NULL,
program_pk text NOT NULL,
block_datetime timestamptz NOT NULL,
slot bigint NOT NULL,
err bool NOT NULL,
processed bool NOT NULL,
worker_partition int4 NOT NULL,
CONSTRAINT transactions_pk PRIMARY KEY (signature, worker_partition)
) PARTITION BY LIST (worker_partition);",
&[],
)
.await?;
client.batch_execute(
"CREATE INDEX IF NOT EXISTS transactions_processed_err_idx ON ONLY transactions (signature) WHERE processed IS NOT TRUE and err IS NOT TRUE;
CREATE INDEX IF NOT EXISTS transactions_program_pk_idx ON ONLY transactions USING btree (program_pk, slot DESC);
CREATE TABLE IF NOT EXISTS transactions_0 PARTITION OF transactions FOR VALUES IN (0);
CREATE TABLE IF NOT EXISTS transactions_1 PARTITION OF transactions FOR VALUES IN (1);
CREATE TABLE IF NOT EXISTS transactions_2 PARTITION OF transactions FOR VALUES IN (2);
CREATE TABLE IF NOT EXISTS transactions_3 PARTITION OF transactions FOR VALUES IN (3);
CREATE TABLE IF NOT EXISTS transactions_4 PARTITION OF transactions FOR VALUES IN (4);
CREATE TABLE IF NOT EXISTS transactions_5 PARTITION OF transactions FOR VALUES IN (5);
CREATE TABLE IF NOT EXISTS transactions_6 PARTITION OF transactions FOR VALUES IN (6);
CREATE TABLE IF NOT EXISTS transactions_7 PARTITION OF transactions FOR VALUES IN (7);
CREATE TABLE IF NOT EXISTS transactions_8 PARTITION OF transactions FOR VALUES IN (8);
CREATE TABLE IF NOT EXISTS transactions_9 PARTITION OF transactions FOR VALUES IN (9);"
).await?;
Ok(())
}

View File

@ -1,153 +1,7 @@
use deadpool_postgres::Pool;
use log::debug;
use std::collections::HashMap;
use tokio::sync::mpsc::{error::TryRecvError, Receiver};
use crate::{
structs::{candle::Candle, openbook::OpenBookFillEvent, transaction::PgTransaction},
utils::{to_timestampz, AnyhowWrap},
};
pub async fn insert_fills_atomically(
pool: &Pool,
worker_id: i32,
fills: Vec<OpenBookFillEvent>,
signatures: Vec<String>,
) -> anyhow::Result<()> {
let mut client = pool.get().await?;
let db_txn = client.build_transaction().start().await?;
// 1. Insert fills
if fills.len() > 0 {
let fills_statement = build_fills_upsert_statement_not_crazy(fills);
db_txn
.execute(&fills_statement, &[])
.await
.map_err_anyhow()
.unwrap();
}
// 2. Update txns table as processed
let transactions_statement =
build_transactions_processed_update_statement(worker_id, signatures);
db_txn
.execute(&transactions_statement, &[])
.await
.map_err_anyhow()
.unwrap();
db_txn.commit().await?;
Ok(())
}
pub async fn persist_fill_events(
pool: &Pool,
fill_receiver: &mut Receiver<OpenBookFillEvent>,
) -> anyhow::Result<()> {
loop {
let mut write_batch = HashMap::new();
while write_batch.len() < 10 {
match fill_receiver.try_recv() {
Ok(event) => {
write_batch.entry(event).or_insert(0);
}
Err(TryRecvError::Empty) => {
if !write_batch.is_empty() {
break;
} else {
continue;
}
}
Err(TryRecvError::Disconnected) => {
panic!("Fills sender must stay alive")
}
};
}
if !write_batch.is_empty() {
debug!("writing: {:?} events to DB\n", write_batch.len());
let upsert_statement = build_fills_upsert_statement(write_batch);
let client = pool.get().await?;
client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()
.unwrap();
}
}
}
#[allow(deprecated)]
fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> String {
let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES");
for (idx, event) in events.keys().enumerate() {
let val_str = format!(
"({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})",
event.signature,
to_timestampz(event.block_time as u64).to_rfc3339(),
event.market,
event.open_orders,
event.open_orders_owner,
event.bid,
event.maker,
event.native_qty_paid,
event.native_qty_received,
event.native_fee_or_rebate,
event.fee_tier,
event.order_id,
event.log_index,
);
if idx == 0 {
stmt = format!("{} {}", &stmt, val_str);
} else {
stmt = format!("{}, {}", &stmt, val_str);
}
}
let handle_conflict = "ON CONFLICT DO NOTHING";
stmt = format!("{} {}", stmt, handle_conflict);
stmt
}
fn build_fills_upsert_statement_not_crazy(fills: Vec<OpenBookFillEvent>) -> String {
let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES");
for (idx, fill) in fills.iter().enumerate() {
let val_str = format!(
"(\'{}\', \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})",
fill.signature,
to_timestampz(fill.block_time as u64).to_rfc3339(),
fill.market,
fill.open_orders,
fill.open_orders_owner,
fill.bid,
fill.maker,
fill.native_qty_paid,
fill.native_qty_received,
fill.native_fee_or_rebate,
fill.fee_tier,
fill.order_id,
fill.log_index,
);
if idx == 0 {
stmt = format!("{} {}", &stmt, val_str);
} else {
stmt = format!("{}, {}", &stmt, val_str);
}
}
let handle_conflict = "ON CONFLICT DO NOTHING";
stmt = format!("{} {}", stmt, handle_conflict);
stmt
}
use crate::structs::candle::Candle;
pub fn build_candles_upsert_statement(candles: &Vec<Candle>) -> String {
let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES");
let mut stmt = String::from("INSERT INTO openbook.candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES");
for (idx, candle) in candles.iter().enumerate() {
let val_str = format!(
"(\'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {})",
@ -183,54 +37,3 @@ pub fn build_candles_upsert_statement(candles: &Vec<Candle>) -> String {
stmt = format!("{} {}", stmt, handle_conflict);
stmt
}
pub fn build_transactions_insert_statement(transactions: Vec<PgTransaction>) -> String {
let mut stmt = String::from("INSERT INTO transactions (signature, program_pk, block_datetime, slot, err, processed, worker_partition) VALUES");
for (idx, txn) in transactions.iter().enumerate() {
let val_str = format!(
"(\'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {})",
txn.signature,
txn.program_pk,
txn.block_datetime.to_rfc3339(),
txn.slot,
txn.err,
txn.processed,
txn.worker_partition,
);
if idx == 0 {
stmt = format!("{} {}", &stmt, val_str);
} else {
stmt = format!("{}, {}", &stmt, val_str);
}
}
let handle_conflict = "ON CONFLICT DO NOTHING";
stmt = format!("{} {}", stmt, handle_conflict);
stmt
}
pub fn build_transactions_processed_update_statement(
worker_id: i32,
processed_signatures: Vec<String>,
) -> String {
let mut stmt = String::from(
"UPDATE transactions
SET processed = true
WHERE transactions.signature IN (",
);
for (idx, sig) in processed_signatures.iter().enumerate() {
let val_str = if idx == processed_signatures.len() - 1 {
format!("\'{}\'", sig,)
} else {
format!("\'{}\',", sig,)
};
stmt = format!("{} {}", &stmt, val_str);
}
let worker_stmt = format!(") AND worker_partition = {} ", worker_id);
stmt = format!("{} {}", stmt, worker_stmt);
stmt
}

View File

@ -6,4 +6,3 @@ pub mod resolution;
pub mod slab;
pub mod trader;
pub mod tradingview;
pub mod transaction;

View File

@ -1,83 +1,15 @@
use anchor_lang::{event, AnchorDeserialize, AnchorSerialize};
use anchor_lang::AnchorDeserialize;
use chrono::{DateTime, Utc};
use num_traits::Pow;
use solana_sdk::pubkey::Pubkey;
use tokio_postgres::Row;
#[event]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OpenBookFillEventRaw {
pub market: Pubkey,
pub open_orders: Pubkey,
pub open_orders_owner: Pubkey,
pub bid: bool,
pub maker: bool,
pub native_qty_paid: u64,
pub native_qty_received: u64,
pub native_fee_or_rebate: u64,
pub order_id: u128,
pub owner_slot: u8,
pub fee_tier: u8,
pub client_order_id: Option<u64>,
pub referrer_rebate: Option<u64>,
}
impl OpenBookFillEventRaw {
pub fn into_event(
self,
signature: String,
block_time: i64,
log_index: usize,
) -> OpenBookFillEvent {
OpenBookFillEvent {
signature,
market: self.market,
open_orders: self.open_orders,
open_orders_owner: self.open_orders_owner,
bid: self.bid,
maker: self.maker,
native_qty_paid: self.native_qty_paid,
native_qty_received: self.native_qty_received,
native_fee_or_rebate: self.native_fee_or_rebate,
order_id: self.order_id,
owner_slot: self.owner_slot,
fee_tier: self.fee_tier,
client_order_id: self.client_order_id,
referrer_rebate: self.referrer_rebate,
block_time,
log_index,
}
}
}
#[event]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OpenBookFillEvent {
pub signature: String,
pub market: Pubkey,
pub open_orders: Pubkey,
pub open_orders_owner: Pubkey,
pub bid: bool,
pub maker: bool,
pub native_qty_paid: u64,
pub native_qty_received: u64,
pub native_fee_or_rebate: u64,
pub order_id: u128,
pub owner_slot: u8,
pub fee_tier: u8,
pub client_order_id: Option<u64>,
pub referrer_rebate: Option<u64>,
pub block_time: i64,
pub log_index: usize,
}
#[derive(Copy, Clone, Debug, PartialEq)]
pub struct PgOpenBookFill {
pub time: DateTime<Utc>,
pub bid: bool,
pub maker: bool,
pub native_qty_paid: f64,
pub native_qty_received: f64,
pub native_fee_or_rebate: f64,
pub price: f64,
pub size: f64,
}
impl PgOpenBookFill {
pub fn from_row(row: Row) -> Self {
@ -85,9 +17,8 @@ impl PgOpenBookFill {
time: row.get(0),
bid: row.get(1),
maker: row.get(2),
native_qty_paid: row.get(3),
native_qty_received: row.get(4),
native_fee_or_rebate: row.get(5),
price: row.get(3),
size: row.get(4),
}
}
}
@ -147,34 +78,6 @@ pub struct MarketState {
pub referrer_rebates_accrued: u64,
}
pub fn calculate_fill_price_and_size(
fill: PgOpenBookFill,
base_decimals: u8,
quote_decimals: u8,
) -> (f64, f64) {
if fill.bid {
let price_before_fees = if fill.maker {
fill.native_qty_paid + fill.native_fee_or_rebate
} else {
fill.native_qty_paid - fill.native_fee_or_rebate
};
let price = (price_before_fees * token_factor(base_decimals))
/ (token_factor(quote_decimals) * fill.native_qty_received);
let size = fill.native_qty_received / token_factor(base_decimals);
(price, size)
} else {
let price_before_fees = if fill.maker {
fill.native_qty_received - fill.native_fee_or_rebate
} else {
fill.native_qty_received + fill.native_fee_or_rebate
};
let price = (price_before_fees * token_factor(base_decimals))
/ (token_factor(quote_decimals) * fill.native_qty_paid);
let size = fill.native_qty_paid / token_factor(base_decimals);
(price, size)
}
}
pub fn token_factor(decimals: u8) -> f64 {
10f64.pow(decimals as f64)
}

View File

@ -9,8 +9,8 @@ use super::openbook::token_factor;
#[derive(Clone, Debug, PartialEq)]
pub struct PgTrader {
pub open_orders_owner: String,
pub raw_ask_size: f64,
pub raw_bid_size: f64,
pub raw_ask_size: i64,
pub raw_bid_size: i64,
}
impl PgTrader {
pub fn from_row(row: Row) -> Self {
@ -52,8 +52,8 @@ pub struct TraderResponse {
// Note that the Postgres queries only return volumes in base or quote
pub fn calculate_trader_volume(trader: PgTrader, decimals: u8) -> Trader {
let bid_size = trader.raw_bid_size / token_factor(decimals);
let ask_size = trader.raw_ask_size / token_factor(decimals);
let bid_size = (trader.raw_bid_size as f64) / token_factor(decimals);
let ask_size = (trader.raw_ask_size as f64) / token_factor(decimals);
Trader {
pubkey: trader.open_orders_owner,

View File

@ -1,52 +0,0 @@
use chrono::{DateTime, Utc};
use solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature;
use tokio_postgres::Row;
use crate::utils::{to_timestampz, OPENBOOK_KEY};
#[derive(Clone, Debug, PartialEq)]
pub struct PgTransaction {
pub signature: String,
pub program_pk: String,
pub block_datetime: DateTime<Utc>,
pub slot: u64,
pub err: bool,
pub processed: bool,
pub worker_partition: i32,
}
pub const NUM_TRANSACTION_PARTITIONS: u64 = 10;
impl PgTransaction {
pub fn from_rpc_confirmed_transaction(
rpc_confirmed_transaction: RpcConfirmedTransactionStatusWithSignature,
) -> Self {
PgTransaction {
signature: rpc_confirmed_transaction.signature,
program_pk: OPENBOOK_KEY.to_string(),
block_datetime: to_timestampz(rpc_confirmed_transaction.block_time.unwrap() as u64),
slot: rpc_confirmed_transaction.slot,
err: rpc_confirmed_transaction.err.is_some(),
processed: false,
worker_partition: (rpc_confirmed_transaction.slot % NUM_TRANSACTION_PARTITIONS) as i32,
}
}
pub fn from_row(row: Row) -> Self {
let slot_raw = row.get::<usize, i64>(3);
PgTransaction {
signature: row.get(0),
program_pk: row.get(1),
block_datetime: row.get(2),
slot: slot_raw as u64,
err: row.get(4),
processed: row.get(5),
worker_partition: row.get(6),
}
}
}
pub enum ProcessState {
Processed,
Unprocessed,
}

View File

@ -9,7 +9,7 @@ use crate::{
structs::{
candle::Candle,
markets::MarketInfo,
openbook::{calculate_fill_price_and_size, PgOpenBookFill},
openbook::PgOpenBookFill,
resolution::{day, Resolution},
},
utils::{f64_max, f64_min},
@ -86,9 +86,7 @@ fn combine_fills_into_1m_candles(
Some(p) => p,
None => {
let first = fills_iter.peek().unwrap();
let (price, _) =
calculate_fill_price_and_size(**first, market.base_decimals, market.quote_decimals);
price
first.price
}
};
@ -100,15 +98,13 @@ fn combine_fills_into_1m_candles(
while matches!(fills_iter.peek(), Some(f) if f.time < end_time) {
let fill = fills_iter.next().unwrap();
let (price, volume) =
calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals);
candles[i].close = price;
candles[i].low = f64_min(price, candles[i].low);
candles[i].high = f64_max(price, candles[i].high);
candles[i].volume += volume;
candles[i].close = fill.price;
candles[i].low = f64_min(fill.price, candles[i].low);
candles[i].high = f64_max(fill.price, candles[i].high);
candles[i].volume += fill.size;
last_price = price;
last_price = fill.price;
}
candles[i].start_time = start_time;

View File

@ -56,7 +56,7 @@ async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
.with_label_values(&[market.name.as_str()])
.inc_by(candles.clone().len() as u64);
save_candles(pool, candles).await?;
}
}
Ok(())
}

View File

@ -1,11 +1,9 @@
use log::{error, info};
use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
use openbook_candles::structs::transaction::NUM_TRANSACTION_PARTITIONS;
use openbook_candles::utils::Config;
use openbook_candles::worker::metrics::{
serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE,
};
use openbook_candles::worker::trade_fetching::scrape::{scrape_fills, scrape_signatures};
use openbook_candles::{
database::initialize::{connect_to_database, setup_database},
worker::candle_batching::batch_for_market,
@ -40,25 +38,6 @@ async fn main() -> anyhow::Result<()> {
setup_database(&pool).await?;
let mut handles = vec![];
// signature scraping
let rpc_clone = rpc_url.clone();
let pool_clone = pool.clone();
handles.push(tokio::spawn(async move {
scrape_signatures(rpc_clone, &pool_clone).await.unwrap();
}));
// transaction/fill scraping
for id in 0..NUM_TRANSACTION_PARTITIONS {
let rpc_clone = rpc_url.clone();
let pool_clone = pool.clone();
let markets_clone = target_markets.clone();
handles.push(tokio::spawn(async move {
scrape_fills(id as i32, rpc_clone, &pool_clone, &markets_clone)
.await
.unwrap();
}));
}
// candle batching
for market in market_infos.into_iter() {
let batch_pool = pool.clone();

View File

@ -1,3 +1,2 @@
pub mod candle_batching;
pub mod metrics;
pub mod trade_fetching;

View File

@ -1,2 +0,0 @@
pub mod parsing;
pub mod scrape;

View File

@ -1,91 +0,0 @@
use log::warn;
use solana_client::client_error::Result as ClientResult;
use solana_sdk::pubkey::Pubkey;
use solana_transaction_status::{
option_serializer::OptionSerializer, EncodedConfirmedTransactionWithStatusMeta,
};
use std::{collections::HashMap, io::Error};
use crate::{
structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw},
worker::metrics::METRIC_RPC_ERRORS_TOTAL,
};
const PROGRAM_DATA: &str = "Program data: ";
pub fn parse_trades_from_openbook_txns(
txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
sig_strings: &Vec<String>,
target_markets: &HashMap<Pubkey, String>,
) -> Vec<OpenBookFillEvent> {
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
for (idx, txn) in txns.iter_mut().enumerate() {
match txn {
Ok(t) => {
if let Some(m) = &t.transaction.meta {
match &m.log_messages {
OptionSerializer::Some(logs) => {
match parse_openbook_fills_from_logs(
logs,
target_markets,
sig_strings[idx].clone(),
t.block_time.unwrap(),
) {
Some(mut events) => fills_vector.append(&mut events),
None => {}
}
}
OptionSerializer::None => {}
OptionSerializer::Skip => {}
}
}
}
Err(e) => {
warn!("rpc error in get_transaction {}", e);
METRIC_RPC_ERRORS_TOTAL
.with_label_values(&["getTransaction"])
.inc();
}
}
}
fills_vector
}
fn parse_openbook_fills_from_logs(
logs: &Vec<String>,
target_markets: &HashMap<Pubkey, String>,
signature: String,
block_time: i64,
) -> Option<Vec<OpenBookFillEvent>> {
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
for (idx, l) in logs.iter().enumerate() {
match l.strip_prefix(PROGRAM_DATA) {
Some(log) => {
let borsh_bytes = match anchor_lang::__private::base64::decode(log) {
Ok(borsh_bytes) => borsh_bytes,
_ => continue,
};
let mut slice: &[u8] = &borsh_bytes[8..];
let event: Result<OpenBookFillEventRaw, Error> =
anchor_lang::AnchorDeserialize::deserialize(&mut slice);
match event {
Ok(e) => {
let fill_event = e.into_event(signature.clone(), block_time, idx);
if target_markets.contains_key(&fill_event.market) {
fills_vector.push(fill_event);
}
}
_ => continue,
}
}
_ => (),
}
}
if !fills_vector.is_empty() {
Some(fills_vector)
} else {
None
}
}

View File

@ -1,121 +0,0 @@
use deadpool_postgres::Pool;
use futures::future::join_all;
use log::{debug, info, warn};
use solana_client::{
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
rpc_config::RpcTransactionConfig,
};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature};
use solana_transaction_status::UiTransactionEncoding;
use std::{collections::HashMap, str::FromStr};
use tokio::sync::mpsc::Sender;
use crate::{
database::{
fetch::fetch_worker_transactions,
insert::{build_transactions_insert_statement, insert_fills_atomically},
},
structs::{openbook::OpenBookFillEvent, transaction::PgTransaction},
utils::{AnyhowWrap, Config, OPENBOOK_KEY},
worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL, METRIC_TRANSACTIONS_TOTAL},
};
use super::parsing::parse_trades_from_openbook_txns;
pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<()> {
let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
loop {
let rpc_config = GetConfirmedSignaturesForAddress2Config {
before: None,
until: None,
limit: None,
commitment: Some(CommitmentConfig::confirmed()),
};
let sigs = match rpc_client
.get_signatures_for_address_with_config(&OPENBOOK_KEY, rpc_config)
.await
{
Ok(sigs) => sigs,
Err(e) => {
warn!("rpc error in get_signatures_for_address_with_config: {}", e);
METRIC_RPC_ERRORS_TOTAL
.with_label_values(&["getSignaturesForAddress"])
.inc();
continue;
}
};
if sigs.is_empty() {
debug!("No signatures found, trying again");
continue;
}
let transactions: Vec<PgTransaction> = sigs
.into_iter()
.map(|s| PgTransaction::from_rpc_confirmed_transaction(s))
.collect();
debug!("Scraper writing: {:?} txns to DB\n", transactions.len());
let upsert_statement = build_transactions_insert_statement(transactions);
let client = pool.get().await?;
let num_txns = client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()?;
METRIC_TRANSACTIONS_TOTAL.inc_by(num_txns);
}
// TODO: graceful shutdown
Ok(())
}
pub async fn scrape_fills(
worker_id: i32,
rpc_url: String,
pool: &Pool,
target_markets: &HashMap<Pubkey, String>,
) -> anyhow::Result<()> {
debug!("Worker {} started \n", worker_id);
let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed());
loop {
let transactions = fetch_worker_transactions(worker_id, pool).await?;
if transactions.len() == 0 {
debug!("No signatures found by worker {}", worker_id);
tokio::time::sleep(WaitDuration::from_secs(1)).await;
continue;
};
// for each signature, fetch the transaction
let txn_config = RpcTransactionConfig {
encoding: Some(UiTransactionEncoding::Json),
commitment: Some(CommitmentConfig::confirmed()),
max_supported_transaction_version: Some(0),
};
let sig_strings = transactions
.iter()
.map(|t| t.signature.clone())
.collect::<Vec<String>>();
let signatures: Vec<_> = transactions
.into_iter()
.map(|t| t.signature.parse::<Signature>().unwrap())
.collect();
let txn_futs: Vec<_> = signatures
.iter()
.map(|s| rpc_client.get_transaction_with_config(s, txn_config))
.collect();
let mut txns = join_all(txn_futs).await;
// TODO: reenable total fills metric
let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets);
// Write any fills to the database, and update the transactions as processed
insert_fills_atomically(pool, worker_id, fills, sig_strings).await?;
}
Ok(())
}