Compare commits

...

5 Commits

Author        SHA1        Message                                                Date
Lou-Kamades   a0b32621a1  Merge pull request #3 from blockworks-foundation/remove-scraper (Remove scraper)   2023-07-26 20:26:04 -05:00
Lou-Kamades   7bba73b7f3  refactor: make backfilling faster                      2023-07-26 20:25:19 -05:00
Lou-Kamades   34dc341059  Merge branch 'main' into remove-scraper                2023-07-26 10:42:41 -05:00
Lou-Kamades   89915bf249  bugfixes and performance improvements                  2023-06-15 01:50:24 -05:00
Lou-Kamades   f74cec5d13  refactor: remove scraper and use existing db schema    2023-06-14 02:19:27 -05:00
23 changed files with 396 additions and 992 deletions

36
Cargo.lock generated
View File

@@ -2615,6 +2615,15 @@ dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "1.0.5"
@@ -3450,6 +3459,7 @@ dependencies = [
"dotenv",
"env_logger 0.10.0",
"futures 0.3.27",
"itertools 0.11.0",
"jsonrpc-core-client",
"lazy_static",
"log 0.4.17",
@@ -4014,7 +4024,7 @@ checksum = "a3f8ad728fb08fe212df3c05169e940fbb6d9d16a877ddde14644a983ba2012e"
dependencies = [
"bytes 1.3.0",
"heck 0.4.0",
"itertools",
"itertools 0.10.5",
"lazy_static",
"log 0.4.17",
"multimap",
@@ -4035,7 +4045,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bda8c0881ea9f722eb9629376db3d0b903b462477c1aafcb0566610ac28ac5d"
dependencies = [
"anyhow",
"itertools",
"itertools 0.10.5",
"proc-macro2 1.0.56",
"quote 1.0.26",
"syn 1.0.109",
@@ -4877,7 +4887,7 @@ dependencies = [
"default-env",
"enumflags2",
"field-offset",
"itertools",
"itertools 0.10.5",
"num-traits",
"num_enum 0.5.7",
"safe-transmute",
@@ -5252,7 +5262,7 @@ dependencies = [
"futures-util",
"indexmap",
"indicatif",
"itertools",
"itertools 0.10.5",
"jsonrpc-core",
"lazy_static",
"log 0.4.17",
@@ -5416,7 +5426,7 @@ dependencies = [
"crossbeam-channel",
"flate2",
"indexmap",
"itertools",
"itertools 0.10.5",
"log 0.4.17",
"lru",
"matches",
@@ -5465,7 +5475,7 @@ dependencies = [
"dashmap",
"fs_extra",
"futures 0.3.27",
"itertools",
"itertools 0.10.5",
"lazy_static",
"libc",
"log 0.4.17",
@@ -5643,7 +5653,7 @@ dependencies = [
"console_log",
"curve25519-dalek",
"getrandom 0.2.8",
"itertools",
"itertools 0.10.5",
"js-sys",
"lazy_static",
"libc",
@@ -5682,7 +5692,7 @@ dependencies = [
"bincode",
"eager",
"enum-iterator",
"itertools",
"itertools 0.10.5",
"libc",
"libloading",
"log 0.4.17",
@@ -5739,7 +5749,7 @@ dependencies = [
"bs58 0.4.0",
"crossbeam-channel",
"dashmap",
"itertools",
"itertools 0.10.5",
"jsonrpc-core",
"jsonrpc-core-client",
"jsonrpc-derive",
@@ -5801,7 +5811,7 @@ dependencies = [
"fnv",
"im",
"index_list",
"itertools",
"itertools 0.10.5",
"lazy_static",
"log 0.4.17",
"lru",
@@ -5863,7 +5873,7 @@ dependencies = [
"ed25519-dalek-bip32",
"generic-array",
"hmac 0.12.1",
"itertools",
"itertools 0.10.5",
"js-sys",
"lazy_static",
"libsecp256k1",
@@ -6011,7 +6021,7 @@ dependencies = [
"futures-util",
"histogram",
"indexmap",
"itertools",
"itertools 0.10.5",
"libc",
"log 0.4.17",
"nix",
@@ -6143,7 +6153,7 @@ dependencies = [
"cipher 0.4.3",
"curve25519-dalek",
"getrandom 0.1.16",
"itertools",
"itertools 0.10.5",
"lazy_static",
"merlin",
"num-derive",

View File

@@ -15,10 +15,6 @@ path = "src/worker/main.rs"
name = "server"
path = "src/server/main.rs"
[[bin]]
name = "backfill-trades"
path = "src/backfill-trades/main.rs"
[[bin]]
name = "backfill-candles"
path = "src/backfill-candles/main.rs"
@@ -71,3 +67,4 @@ num_enum = "0.6.1"
config = "0.13.1"
prometheus = "0.13.3"
lazy_static = "1.4.0"
itertools = "0.11.0"

View File

@@ -296,7 +296,7 @@ Returns 24-hour pricing and volume information on each market available.
**Request:**
`GET /api/coingecko/orderbook/?ticker_id={ticker_id}&depth={depth}`
`GET /api/coingecko/orderbook?ticker_id={ticker_id}&depth={depth}`
Returns order book information with a specified depth for a given market.
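For illustration, a hedged sketch of calling this endpoint from Rust with reqwest; the host, port, and `ticker_id` below are placeholders, not values defined by this repo:

```rust
// Hypothetical client for the endpoint documented above.
// Assumes reqwest = "0.11" and tokio with the "macros" and "rt" features.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Placeholder deployment and market; substitute real values.
    let url = "http://localhost:8080/api/coingecko/orderbook?ticker_id=SOL_USDC&depth=10";
    let body = reqwest::get(url).await?.text().await?;
    println!("{body}");
    Ok(())
}
```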

View File

@@ -1,20 +1,17 @@
use deadpool_postgres::Object;
use openbook_candles::{
database::{initialize::connect_to_database, insert::build_candles_upsert_statement},
database::{initialize::connect_to_database},
structs::{
candle::Candle,
markets::{fetch_market_infos, load_markets},
resolution::Resolution,
},
utils::{AnyhowWrap, Config},
utils::{Config},
worker::candle_batching::{
higher_order_candles::backfill_batch_higher_order_candles,
minute_candles::backfill_batch_1m_candles,
higher_order_candles::backfill_batch_higher_order_candles, minute_candles::backfill_batch_1m_candles,
},
};
use std::env;
use strum::IntoEnumIterator;
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> anyhow::Result<()> {
@@ -33,31 +30,19 @@ async fn main() -> anyhow::Result<()> {
println!("Backfilling candles for {:?}", markets);
let pool = connect_to_database().await?;
for market in market_infos.into_iter() {
let client = pool.get().await?;
let minute_candles = backfill_batch_1m_candles(&pool, &market).await?;
save_candles(minute_candles, client).await?;
backfill_batch_1m_candles(&pool, market_infos.clone()).await?;
for resolution in Resolution::iter() {
if resolution == Resolution::R1m {
continue;
}
let higher_order_candles =
backfill_batch_higher_order_candles(&pool, &market.name, resolution).await?;
let client = pool.get().await?;
save_candles(higher_order_candles, client).await?;
}
}
Ok(())
}
async fn save_candles(candles: Vec<Candle>, client: Object) -> anyhow::Result<()> {
if !candles.is_empty() {
let upsert_statement = build_candles_upsert_statement(&candles);
client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()?;
let mut handles = vec![];
let mi = market_infos.clone();
for market in mi.into_iter() {
let pc = pool.clone();
handles.push(tokio::spawn(async move {
backfill_batch_higher_order_candles(&pc, &market.name)
.await
.unwrap();
}));
}
futures::future::join_all(handles).await;
Ok(())
}
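The rewritten binary fans the higher-order backfill out across tasks: each market gets its own tokio task over a cloned pool handle, and `join_all` awaits them all. A self-contained sketch of that pattern (the market list and task body are placeholders):

```rust
// Placeholder task body standing in for backfill_batch_higher_order_candles.
async fn backfill_market(name: String) {
    println!("backfilling {name}");
}

#[tokio::main]
async fn main() {
    let markets = vec!["SOL/USDC".to_string(), "ETH/USDC".to_string()];
    let mut handles = vec![];
    for name in markets {
        handles.push(tokio::spawn(backfill_market(name)));
    }
    // Wait for every per-market backfill to finish before exiting.
    futures::future::join_all(handles).await;
}
```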

View File

@@ -1,141 +0,0 @@
use anchor_lang::prelude::Pubkey;
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
use deadpool_postgres::Pool;
use log::debug;
use openbook_candles::{
database::{
initialize::{connect_to_database, setup_database},
insert::build_transactions_insert_statement,
},
structs::{
markets::{fetch_market_infos, load_markets},
transaction::{PgTransaction, NUM_TRANSACTION_PARTITIONS},
},
utils::{AnyhowWrap, Config, OPENBOOK_KEY},
worker::trade_fetching::scrape::scrape_fills,
};
use solana_client::{
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
};
use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
use std::{collections::HashMap, env, str::FromStr};
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();
let args: Vec<String> = env::args().collect();
assert!(args.len() == 2);
let path_to_markets_json = &args[1];
// let num_days = args[2].parse::<i64>().unwrap(); // TODO: implement
let num_days = 1;
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
let config = Config {
rpc_url: rpc_url.clone(),
};
let markets = load_markets(path_to_markets_json);
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
let mut target_markets = HashMap::new();
for m in market_infos.clone() {
target_markets.insert(Pubkey::from_str(&m.address)?, m.name);
}
println!("{:?}", target_markets);
let pool = connect_to_database().await?;
setup_database(&pool).await?;
let mut handles = vec![];
let rpc_clone = rpc_url.clone();
let pool_clone = pool.clone();
handles.push(tokio::spawn(async move {
fetch_signatures(rpc_clone, &pool_clone, num_days)
.await
.unwrap();
}));
// Low priority improvement: batch fills into 1000's per worker
for id in 0..NUM_TRANSACTION_PARTITIONS {
let rpc_clone = rpc_url.clone();
let pool_clone = pool.clone();
let markets_clone = target_markets.clone();
handles.push(tokio::spawn(async move {
scrape_fills(id as i32, rpc_clone, &pool_clone, &markets_clone)
.await
.unwrap();
}));
}
// TODO: spawn status thread
futures::future::join_all(handles).await;
Ok(())
}
pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> anyhow::Result<()> {
let mut before_sig: Option<Signature> = None;
let mut now_time = Utc::now().timestamp();
let end_time = (Utc::now() - Duration::days(num_days)).timestamp();
let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
while now_time > end_time {
let rpc_config = GetConfirmedSignaturesForAddress2Config {
before: before_sig,
until: None,
limit: None,
commitment: Some(CommitmentConfig::confirmed()),
};
let sigs = match rpc_client
.get_signatures_for_address_with_config(&OPENBOOK_KEY, rpc_config)
.await
{
Ok(sigs) => sigs,
Err(e) => {
println!("Error fetching signatures: {}", e);
continue;
}
};
if sigs.is_empty() {
println!("No signatures found, trying again");
continue;
}
let last = sigs.last().unwrap();
let last_time = last.block_time.unwrap();
let last_signature = last.signature.clone();
let transactions = sigs
.into_iter()
.map(PgTransaction::from_rpc_confirmed_transaction)
.collect::<Vec<PgTransaction>>();
if transactions.is_empty() {
println!("No transactions found, trying again");
}
debug!("writing: {:?} txns to DB\n", transactions.len());
let upsert_statement = build_transactions_insert_statement(transactions);
let client = pool.get().await?;
client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()?;
now_time = last_time;
before_sig = Some(Signature::from_str(&last_signature)?);
let time_left = backfill_time_left(now_time, end_time);
println!(
"{} minutes ~ {} days remaining in the backfill\n",
time_left.num_minutes(),
time_left.num_days()
);
}
Ok(())
}
fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration {
let naive_cur = NaiveDateTime::from_timestamp_millis(current_time * 1000).unwrap();
let naive_bf = NaiveDateTime::from_timestamp_millis(backfill_end * 1000).unwrap();
let cur_date = DateTime::<Utc>::from_utc(naive_cur, Utc);
let bf_date = DateTime::<Utc>::from_utc(naive_bf, Utc);
cur_date - bf_date
}

82
src/database/backfill.rs Normal file
View File

@@ -0,0 +1,82 @@
use crate::structs::{candle::Candle, openbook::PgOpenBookFill};
use chrono::{DateTime, Utc};
use deadpool_postgres::{GenericClient, Object};
pub async fn fetch_earliest_fill_multiple_markets(
conn_object: &Object,
market_address_strings: &Vec<String>,
) -> anyhow::Result<Option<PgOpenBookFill>> {
let stmt = r#"SELECT
block_datetime as "time",
market as "market_key",
bid as "bid",
maker as "maker",
price as "price",
size as "size"
from openbook.openbook_fill_events
where market = ANY($1)
and maker = true
ORDER BY time asc LIMIT 1"#;
let row = conn_object
.query_opt(stmt, &[&market_address_strings])
.await?;
match row {
Some(r) => Ok(Some(PgOpenBookFill::from_row(r))),
None => Ok(None),
}
}
pub async fn fetch_fills_multiple_markets_from(
conn_object: &Object,
market_address_strings: &Vec<String>,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
) -> anyhow::Result<Vec<PgOpenBookFill>> {
let stmt = r#"SELECT
block_datetime as "time",
market as "market_key",
bid as "bid",
maker as "maker",
price as "price",
size as "size"
from openbook.openbook_fill_events
where market = ANY($1)
and block_datetime >= $2::timestamptz
and block_datetime < $3::timestamptz
and maker = true
ORDER BY time asc"#;
let rows = conn_object
.query(stmt, &[&market_address_strings, &start_time, &end_time])
.await?;
Ok(rows.into_iter().map(PgOpenBookFill::from_row).collect())
}
pub async fn fetch_last_minute_candles(conn_object: &Object) -> anyhow::Result<Vec<Candle>> {
let stmt = r#"SELECT
c.market_name as "market_name",
c.start_time as "start_time",
c.end_time as "end_time",
c.resolution as "resolution",
c.open as "open",
c.close as "close",
c.high as "high",
c.low as "low",
c.volume as "volume",
c.complete as "complete"
from
(
select market_name, max(start_time) as max_start_time from openbook.candles
where resolution = '1M'
group by market_name
) mkts
left join openbook.candles c
on mkts.market_name = c.market_name
and mkts.max_start_time = c.start_time
where c.resolution ='1M'"#;
let rows = conn_object.query(stmt, &[]).await?;
Ok(rows.into_iter().map(Candle::from_row).collect())
}
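These helpers lean on Postgres array binding: a `Vec<String>` of market addresses is passed as one `$1` parameter and matched with `= ANY($1)`. A minimal compiling sketch of that binding with deadpool_postgres/tokio-postgres; the query here is illustrative, not one of the statements above:

```rust
use deadpool_postgres::Object;

// Illustrative only: count fills for a set of markets, binding a
// Vec<String> to a single array parameter via ANY($1).
pub async fn count_fills(conn: &Object, markets: &Vec<String>) -> anyhow::Result<i64> {
    let row = conn
        .query_one(
            "SELECT count(*) FROM openbook.openbook_fill_events WHERE market = ANY($1)",
            &[&markets],
        )
        .await?;
    Ok(row.get(0))
}
```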

View File

@@ -4,7 +4,6 @@ use crate::structs::{
openbook::PgOpenBookFill,
resolution::Resolution,
trader::PgTrader,
transaction::PgTransaction,
};
use chrono::{DateTime, Utc};
use deadpool_postgres::{GenericClient, Pool};
@@ -16,13 +15,13 @@ pub async fn fetch_earliest_fill(
let client = pool.get().await?;
let stmt = r#"SELECT
time as "time!",
bid as "bid!",
maker as "maker!",
native_qty_paid as "native_qty_paid!",
native_qty_received as "native_qty_received!",
native_fee_or_rebate as "native_fee_or_rebate!"
from fills
block_datetime as "time",
market as "market_key",
bid as "bid",
maker as "maker",
price as "price",
size as "size"
from openbook.openbook_fill_events
where market = $1
and maker = true
ORDER BY time asc LIMIT 1"#;
@@ -44,16 +43,16 @@ pub async fn fetch_fills_from(
let client = pool.get().await?;
let stmt = r#"SELECT
time as "time!",
bid as "bid!",
maker as "maker!",
native_qty_paid as "native_qty_paid!",
native_qty_received as "native_qty_received!",
native_fee_or_rebate as "native_fee_or_rebate!"
from fills
block_datetime as "time",
market as "market_key",
bid as "bid",
maker as "maker",
price as "price",
size as "size"
from openbook.openbook_fill_events
where market = $1
and time >= $2::timestamptz
and time < $3::timestamptz
and block_datetime >= $2::timestamptz
and block_datetime < $3::timestamptz
and maker = true
ORDER BY time asc"#;
@@ -71,17 +70,17 @@ pub async fn fetch_latest_finished_candle(
let client = pool.get().await?;
let stmt = r#"SELECT
market_name as "market_name!",
start_time as "start_time!",
end_time as "end_time!",
resolution as "resolution!",
open as "open!",
close as "close!",
high as "high!",
low as "low!",
volume as "volume!",
complete as "complete!"
from candles
market_name as "market_name",
start_time as "start_time",
end_time as "end_time",
resolution as "resolution",
open as "open",
close as "close",
high as "high",
low as "low",
volume as "volume",
complete as "complete"
from openbook.candles
where market_name = $1
and resolution = $2
and complete = true
@@ -97,8 +96,8 @@ pub async fn fetch_latest_finished_candle(
}
}
/// Fetches all of the candles for the given market and resoultion, starting from the earliest.
/// Note that this function will fetch ALL candles.
/// Fetches all of the candles for the given market and resolution, starting from the earliest.
/// Note that this function will fetch at most 2000 candles.
pub async fn fetch_earliest_candles(
pool: &Pool,
market_name: &str,
@@ -107,20 +106,21 @@ pub async fn fetch_earliest_candles(
let client = pool.get().await?;
let stmt = r#"SELECT
market_name as "market_name!",
start_time as "start_time!",
end_time as "end_time!",
market_name as "market_name",
start_time as "start_time",
end_time as "end_time",
resolution as "resolution!",
open as "open!",
close as "close!",
high as "high!",
low as "low!",
volume as "volume!",
complete as "complete!"
from candles
open as "open",
close as "close",
high as "high",
low as "low",
volume as "volume",
complete as "complete"
from openbook.candles
where market_name = $1
and resolution = $2
ORDER BY start_time asc"#;
ORDER BY start_time asc
LIMIT 2000"#;
let rows = client
.query(stmt, &[&market_name, &resolution.to_string()])
@@ -139,17 +139,17 @@ pub async fn fetch_candles_from(
let client = pool.get().await?;
let stmt = r#"SELECT
market_name as "market_name!",
start_time as "start_time!",
end_time as "end_time!",
resolution as "resolution!",
open as "open!",
close as "close!",
high as "high!",
low as "low!",
volume as "volume!",
complete as "complete!"
from candles
market_name as "market_name",
start_time as "start_time",
end_time as "end_time",
resolution as "resolution",
open as "open",
close as "close",
high as "high",
low as "low",
volume as "volume",
complete as "complete"
from openbook.candles
where market_name = $1
and resolution = $2
and start_time >= $3
@@ -182,20 +182,20 @@ pub async fn fetch_top_traders_by_base_volume_from(
let stmt = r#"SELECT
open_orders_owner,
sum(
native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
) as "raw_ask_size!",
native_quantity_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
) as "raw_ask_size",
sum(
native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
) as "raw_bid_size!"
FROM fills
native_quantity_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
) as "raw_bid_size"
FROM openbook.openbook_fill_events
WHERE market = $1
AND time >= $2
AND time < $3
GROUP BY open_orders_owner
ORDER BY
sum(native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
sum(native_quantity_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
+
sum(native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
sum(native_quantity_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
DESC
LIMIT 10000"#;
@@ -217,20 +217,20 @@ pub async fn fetch_top_traders_by_quote_volume_from(
let stmt = r#"SELECT
open_orders_owner,
sum(
native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
) as "raw_ask_size!",
native_quantity_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
) as "raw_ask_size",
sum(
native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
) as "raw_bid_size!"
FROM fills
native_quantity_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
) as "raw_bid_size"
FROM openbook.openbook_fill_events
WHERE market = $1
AND time >= $2
AND time < $3
GROUP BY open_orders_owner
ORDER BY
sum(native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
sum(native_quantity_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
+
sum(native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
sum(native_quantity_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
DESC
LIMIT 10000"#;
@@ -249,24 +249,27 @@ pub async fn fetch_coingecko_24h_volume(
let stmt = r#"SELECT
t1.market,
COALESCE(t2.native_qty_received, 0) as "raw_base_size!",
COALESCE(t2.native_qty_paid, 0) as "raw_quote_size!"
COALESCE(t2.base_size, 0) as "base_size",
COALESCE(t3.quote_size, 0) as "quote_size"
FROM (
SELECT distinct on (market) *
FROM fills f
where bid = true
and market = any($1)
order by market, "time" desc
SELECT unnest($1::text[]) as market
) t1
LEFT JOIN (
select market,
sum(native_qty_received) as "native_qty_received",
sum(native_qty_paid) as "native_qty_paid"
from fills
where "time" >= current_timestamp - interval '1 day'
sum("size") as "base_size"
from openbook.openbook_fill_events
where block_datetime >= current_timestamp - interval '1 day'
and bid = true
group by market
) t2 ON t1.market = t2.market"#;
) t2 ON t1.market = t2.market
LEFT JOIN (
select market,
sum("size" * price) as "quote_size"
from openbook.openbook_fill_events
where block_datetime >= current_timestamp - interval '1 day'
and bid = true
group by market
) t3 ON t1.market = t3.market"#;
let rows = client.query(stmt, &[&market_address_strings]).await?;
@@ -291,10 +294,10 @@ pub async fn fetch_coingecko_24h_high_low(
(
SELECT *
from
candles
openbook.candles
where (market_name, start_time, resolution) in (
select market_name, max(start_time), resolution
from candles
from openbook.candles
where "resolution" = '1M'
and market_name = any($1)
group by market_name, resolution
@@ -307,7 +310,7 @@ pub async fn fetch_coingecko_24h_high_low(
max(high) as "high",
min(low) as "low"
from
candles
openbook.candles
where
"resolution" = '1M'
and "start_time" >= current_timestamp - interval '1 day'
@@ -321,23 +324,3 @@ pub async fn fetch_coingecko_24h_high_low(
.map(PgCoinGecko24HighLow::from_row)
.collect())
}
/// Fetches unprocessed, non-error transactions for the specified worker partition.
/// Pulls at most 50 transactions at a time.
pub async fn fetch_worker_transactions(
worker_id: i32,
pool: &Pool,
) -> anyhow::Result<Vec<PgTransaction>> {
let client = pool.get().await?;
let stmt = r#"SELECT signature, program_pk, block_datetime, slot, err, "processed", worker_partition
FROM transactions
where worker_partition = $1
and err = false
and processed = false
LIMIT 50"#;
let rows = client.query(stmt, &[&worker_id]).await?;
Ok(rows.into_iter().map(PgTransaction::from_row).collect())
}

View File

@@ -21,8 +21,8 @@ pub async fn connect_to_database() -> anyhow::Result<Pool> {
// openssl pkcs12 -export -in client.cer -inkey client-key.cer -out client.pks
// base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64
// fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a mango-fills
// fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills
// fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a APP-NAME
// fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a APP-NAME
let tls = if pg_config.pg_use_ssl {
pg_config.pg.ssl_mode = Some(SslMode::Require);
let ca_cert = fs::read(pg_config.pg_ca_cert_path.expect("reading ca cert from env"))
@@ -66,11 +66,7 @@ pub async fn connect_to_database() -> anyhow::Result<Pool> {
}
pub async fn setup_database(pool: &Pool) -> anyhow::Result<()> {
let candles_table_fut = create_candles_table(pool);
let transactions_table_fut = create_transactions_table(pool);
let fills_table_fut = create_fills_table(pool);
let result = tokio::try_join!(candles_table_fut, transactions_table_fut, fills_table_fut);
match result {
match create_candles_table(pool).await {
Ok(_) => {
println!("Successfully configured database");
Ok(())
@@ -87,7 +83,7 @@ pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> {
client
.execute(
"CREATE TABLE IF NOT EXISTS candles (
"CREATE TABLE IF NOT EXISTS openbook.candles (
id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
market_name text,
start_time timestamptz,
@@ -105,90 +101,9 @@ pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> {
.await?;
client.execute(
"CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market_name, start_time, resolution)",
"CREATE UNIQUE INDEX IF NOT EXISTS idx_market_time_resolution ON openbook.candles USING btree (market_name, start_time, resolution);",
&[]
).await?;
client.execute(
"DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'unique_candles') THEN
ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market_name, start_time, resolution);
END IF;
END $$", &[]
).await?;
Ok(())
}
pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> {
let client = pool.get().await?;
client
.execute(
"CREATE TABLE IF NOT EXISTS fills (
signature text not null,
time timestamptz not null,
market text not null,
open_orders text not null,
open_orders_owner text not null,
bid bool not null,
maker bool not null,
native_qty_paid double precision not null,
native_qty_received double precision not null,
native_fee_or_rebate double precision not null,
fee_tier text not null,
order_id text not null,
log_index int4 not null,
CONSTRAINT fills_pk PRIMARY KEY (signature, log_index)
)",
&[],
)
.await?;
client
.execute(
"CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)",
&[],
)
.await?;
Ok(())
}
pub async fn create_transactions_table(pool: &Pool) -> anyhow::Result<()> {
let client = pool.get().await?;
client
.execute(
"CREATE TABLE IF NOT EXISTS transactions (
signature text NOT NULL,
program_pk text NOT NULL,
block_datetime timestamptz NOT NULL,
slot bigint NOT NULL,
err bool NOT NULL,
processed bool NOT NULL,
worker_partition int4 NOT NULL,
CONSTRAINT transactions_pk PRIMARY KEY (signature, worker_partition)
) PARTITION BY LIST (worker_partition);",
&[],
)
.await?;
client.batch_execute(
"CREATE INDEX IF NOT EXISTS transactions_processed_err_idx ON ONLY transactions (signature) WHERE processed IS NOT TRUE and err IS NOT TRUE;
CREATE INDEX IF NOT EXISTS transactions_program_pk_idx ON ONLY transactions USING btree (program_pk, slot DESC);
CREATE TABLE IF NOT EXISTS transactions_0 PARTITION OF transactions FOR VALUES IN (0);
CREATE TABLE IF NOT EXISTS transactions_1 PARTITION OF transactions FOR VALUES IN (1);
CREATE TABLE IF NOT EXISTS transactions_2 PARTITION OF transactions FOR VALUES IN (2);
CREATE TABLE IF NOT EXISTS transactions_3 PARTITION OF transactions FOR VALUES IN (3);
CREATE TABLE IF NOT EXISTS transactions_4 PARTITION OF transactions FOR VALUES IN (4);
CREATE TABLE IF NOT EXISTS transactions_5 PARTITION OF transactions FOR VALUES IN (5);
CREATE TABLE IF NOT EXISTS transactions_6 PARTITION OF transactions FOR VALUES IN (6);
CREATE TABLE IF NOT EXISTS transactions_7 PARTITION OF transactions FOR VALUES IN (7);
CREATE TABLE IF NOT EXISTS transactions_8 PARTITION OF transactions FOR VALUES IN (8);
CREATE TABLE IF NOT EXISTS transactions_9 PARTITION OF transactions FOR VALUES IN (9);"
).await?;
Ok(())
}

View File

@@ -1,79 +1,7 @@
use deadpool_postgres::Pool;
use crate::{
structs::{candle::Candle, openbook::OpenBookFillEvent, transaction::PgTransaction},
utils::{to_timestampz, AnyhowWrap},
};
pub async fn insert_fills_atomically(
pool: &Pool,
worker_id: i32,
fills: Vec<OpenBookFillEvent>,
signatures: Vec<String>,
) -> anyhow::Result<()> {
let mut client = pool.get().await?;
let db_txn = client.build_transaction().start().await?;
// 1. Insert fills
if !fills.is_empty() {
let fills_statement = build_fills_upsert_statement(fills);
db_txn
.execute(&fills_statement, &[])
.await
.map_err_anyhow()
.unwrap();
}
// 2. Update txns table as processed
let transactions_statement =
build_transactions_processed_update_statement(worker_id, signatures);
db_txn
.execute(&transactions_statement, &[])
.await
.map_err_anyhow()
.unwrap();
db_txn.commit().await?;
Ok(())
}
fn build_fills_upsert_statement(fills: Vec<OpenBookFillEvent>) -> String {
let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES");
for (idx, fill) in fills.iter().enumerate() {
let val_str = format!(
"(\'{}\', \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})",
fill.signature,
to_timestampz(fill.block_time as u64).to_rfc3339(),
fill.market,
fill.open_orders,
fill.open_orders_owner,
fill.bid,
fill.maker,
fill.native_qty_paid,
fill.native_qty_received,
fill.native_fee_or_rebate,
fill.fee_tier,
fill.order_id,
fill.log_index,
);
if idx == 0 {
stmt = format!("{} {}", &stmt, val_str);
} else {
stmt = format!("{}, {}", &stmt, val_str);
}
}
let handle_conflict = "ON CONFLICT DO NOTHING";
stmt = format!("{} {}", stmt, handle_conflict);
stmt
}
use crate::structs::candle::Candle;
pub fn build_candles_upsert_statement(candles: &Vec<Candle>) -> String {
let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES");
let mut stmt = String::from("INSERT INTO openbook.candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES");
for (idx, candle) in candles.iter().enumerate() {
let val_str = format!(
"(\'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {})",
@@ -109,54 +37,3 @@ pub fn build_candles_upsert_statement(candles: &Vec<Candle>) -> String {
stmt = format!("{} {}", stmt, handle_conflict);
stmt
}
pub fn build_transactions_insert_statement(transactions: Vec<PgTransaction>) -> String {
let mut stmt = String::from("INSERT INTO transactions (signature, program_pk, block_datetime, slot, err, processed, worker_partition) VALUES");
for (idx, txn) in transactions.iter().enumerate() {
let val_str = format!(
"(\'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {})",
txn.signature,
txn.program_pk,
txn.block_datetime.to_rfc3339(),
txn.slot,
txn.err,
txn.processed,
txn.worker_partition,
);
if idx == 0 {
stmt = format!("{} {}", &stmt, val_str);
} else {
stmt = format!("{}, {}", &stmt, val_str);
}
}
let handle_conflict = "ON CONFLICT DO NOTHING";
stmt = format!("{} {}", stmt, handle_conflict);
stmt
}
pub fn build_transactions_processed_update_statement(
worker_id: i32,
processed_signatures: Vec<String>,
) -> String {
let mut stmt = String::from(
"UPDATE transactions
SET processed = true
WHERE transactions.signature IN (",
);
for (idx, sig) in processed_signatures.iter().enumerate() {
let val_str = if idx == processed_signatures.len() - 1 {
format!("\'{}\'", sig,)
} else {
format!("\'{}\',", sig,)
};
stmt = format!("{} {}", &stmt, val_str);
}
let worker_stmt = format!(") AND worker_partition = {} ", worker_id);
stmt = format!("{} {}", stmt, worker_stmt);
stmt
}

View File

@@ -1,3 +1,4 @@
pub mod backfill;
pub mod fetch;
pub mod initialize;
pub mod insert;

View File

@@ -1,7 +1,7 @@
use serde::Serialize;
use tokio_postgres::Row;
use super::{markets::MarketInfo, openbook::token_factor};
use super::markets::MarketInfo;
#[derive(Debug, Clone, Serialize)]
pub struct CoinGeckoOrderBook {
@@ -35,26 +35,24 @@ pub struct CoinGeckoTicker {
pub struct PgCoinGecko24HourVolume {
pub address: String,
pub raw_base_size: f64,
pub raw_quote_size: f64,
pub base_size: f64,
pub quote_size: f64,
}
impl PgCoinGecko24HourVolume {
pub fn convert_to_readable(&self, markets: &Vec<MarketInfo>) -> CoinGecko24HourVolume {
let market = markets.iter().find(|m| m.address == self.address).unwrap();
let base_volume = self.raw_base_size / token_factor(market.base_decimals);
let target_volume = self.raw_quote_size / token_factor(market.quote_decimals);
CoinGecko24HourVolume {
market_name: market.name.clone(),
base_volume,
target_volume,
base_volume: self.base_size,
target_volume: self.quote_size,
}
}
pub fn from_row(row: Row) -> Self {
PgCoinGecko24HourVolume {
address: row.get(0),
raw_base_size: row.get(1),
raw_quote_size: row.get(2),
base_size: row.get(1),
quote_size: row.get(2),
}
}
}

View File

@@ -6,4 +6,3 @@ pub mod resolution;
pub mod slab;
pub mod trader;
pub mod tradingview;
pub mod transaction;

View File

@@ -1,93 +1,26 @@
use anchor_lang::{event, AnchorDeserialize, AnchorSerialize};
use anchor_lang::AnchorDeserialize;
use chrono::{DateTime, Utc};
use num_traits::Pow;
use solana_sdk::pubkey::Pubkey;
use tokio_postgres::Row;
#[event]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OpenBookFillEventRaw {
pub market: Pubkey,
pub open_orders: Pubkey,
pub open_orders_owner: Pubkey,
pub bid: bool,
pub maker: bool,
pub native_qty_paid: u64,
pub native_qty_received: u64,
pub native_fee_or_rebate: u64,
pub order_id: u128,
pub owner_slot: u8,
pub fee_tier: u8,
pub client_order_id: Option<u64>,
pub referrer_rebate: Option<u64>,
}
impl OpenBookFillEventRaw {
pub fn into_event(
self,
signature: String,
block_time: i64,
log_index: usize,
) -> OpenBookFillEvent {
OpenBookFillEvent {
signature,
market: self.market,
open_orders: self.open_orders,
open_orders_owner: self.open_orders_owner,
bid: self.bid,
maker: self.maker,
native_qty_paid: self.native_qty_paid,
native_qty_received: self.native_qty_received,
native_fee_or_rebate: self.native_fee_or_rebate,
order_id: self.order_id,
owner_slot: self.owner_slot,
fee_tier: self.fee_tier,
client_order_id: self.client_order_id,
referrer_rebate: self.referrer_rebate,
block_time,
log_index,
}
}
}
#[event]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OpenBookFillEvent {
pub signature: String,
pub market: Pubkey,
pub open_orders: Pubkey,
pub open_orders_owner: Pubkey,
pub bid: bool,
pub maker: bool,
pub native_qty_paid: u64,
pub native_qty_received: u64,
pub native_fee_or_rebate: u64,
pub order_id: u128,
pub owner_slot: u8,
pub fee_tier: u8,
pub client_order_id: Option<u64>,
pub referrer_rebate: Option<u64>,
pub block_time: i64,
pub log_index: usize,
}
#[derive(Copy, Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq)]
pub struct PgOpenBookFill {
pub time: DateTime<Utc>,
pub market_key: String,
pub bid: bool,
pub maker: bool,
pub native_qty_paid: f64,
pub native_qty_received: f64,
pub native_fee_or_rebate: f64,
pub price: f64,
pub size: f64,
}
impl PgOpenBookFill {
pub fn from_row(row: Row) -> Self {
PgOpenBookFill {
time: row.get(0),
bid: row.get(1),
maker: row.get(2),
native_qty_paid: row.get(3),
native_qty_received: row.get(4),
native_fee_or_rebate: row.get(5),
market_key: row.get(1),
bid: row.get(2),
maker: row.get(3),
price: row.get(4),
size: row.get(5),
}
}
}
@@ -147,34 +80,6 @@ pub struct MarketState {
pub referrer_rebates_accrued: u64,
}
pub fn calculate_fill_price_and_size(
fill: PgOpenBookFill,
base_decimals: u8,
quote_decimals: u8,
) -> (f64, f64) {
if fill.bid {
let price_before_fees = if fill.maker {
fill.native_qty_paid + fill.native_fee_or_rebate
} else {
fill.native_qty_paid - fill.native_fee_or_rebate
};
let price = (price_before_fees * token_factor(base_decimals))
/ (token_factor(quote_decimals) * fill.native_qty_received);
let size = fill.native_qty_received / token_factor(base_decimals);
(price, size)
} else {
let price_before_fees = if fill.maker {
fill.native_qty_received - fill.native_fee_or_rebate
} else {
fill.native_qty_received + fill.native_fee_or_rebate
};
let price = (price_before_fees * token_factor(base_decimals))
/ (token_factor(quote_decimals) * fill.native_qty_paid);
let size = fill.native_qty_paid / token_factor(base_decimals);
(price, size)
}
}
pub fn token_factor(decimals: u8) -> f64 {
10f64.pow(decimals as f64)
}

View File

@@ -9,8 +9,8 @@ use super::openbook::token_factor;
#[derive(Clone, Debug, PartialEq)]
pub struct PgTrader {
pub open_orders_owner: String,
pub raw_ask_size: f64,
pub raw_bid_size: f64,
pub raw_ask_size: i64,
pub raw_bid_size: i64,
}
impl PgTrader {
pub fn from_row(row: Row) -> Self {
@@ -52,8 +52,8 @@ pub struct TraderResponse {
// Note that the Postgres queries only return volumes in base or quote
pub fn calculate_trader_volume(trader: PgTrader, decimals: u8) -> Trader {
let bid_size = trader.raw_bid_size / token_factor(decimals);
let ask_size = trader.raw_ask_size / token_factor(decimals);
let bid_size = (trader.raw_bid_size as f64) / token_factor(decimals);
let ask_size = (trader.raw_ask_size as f64) / token_factor(decimals);
Trader {
pubkey: trader.open_orders_owner,
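The raw volume sums now come back from Postgres as `i64` (the struct fields above changed from `f64`), so `calculate_trader_volume` casts them to `f64` before dividing by `token_factor`, i.e. 10^decimals. A quick worked check of that arithmetic with hypothetical values; `powf` stands in for the `num_traits::Pow` call the repo uses:

```rust
// token_factor(decimals) = 10^decimals, as defined in structs/openbook.rs.
fn token_factor(decimals: u8) -> f64 {
    10f64.powf(decimals as f64)
}

fn main() {
    // Hypothetical raw bid volume for a token with 6 decimals:
    let raw_bid_size: i64 = 2_500_000;
    let bid_size = (raw_bid_size as f64) / token_factor(6);
    assert_eq!(bid_size, 2.5);
}
```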

View File

@@ -1,52 +0,0 @@
use chrono::{DateTime, Utc};
use solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature;
use tokio_postgres::Row;
use crate::utils::{to_timestampz, OPENBOOK_KEY};
#[derive(Clone, Debug, PartialEq)]
pub struct PgTransaction {
pub signature: String,
pub program_pk: String,
pub block_datetime: DateTime<Utc>,
pub slot: u64,
pub err: bool,
pub processed: bool,
pub worker_partition: i32,
}
pub const NUM_TRANSACTION_PARTITIONS: u64 = 10;
impl PgTransaction {
pub fn from_rpc_confirmed_transaction(
rpc_confirmed_transaction: RpcConfirmedTransactionStatusWithSignature,
) -> Self {
PgTransaction {
signature: rpc_confirmed_transaction.signature,
program_pk: OPENBOOK_KEY.to_string(),
block_datetime: to_timestampz(rpc_confirmed_transaction.block_time.unwrap() as u64),
slot: rpc_confirmed_transaction.slot,
err: rpc_confirmed_transaction.err.is_some(),
processed: false,
worker_partition: (rpc_confirmed_transaction.slot % NUM_TRANSACTION_PARTITIONS) as i32,
}
}
pub fn from_row(row: Row) -> Self {
let slot_raw = row.get::<usize, i64>(3);
PgTransaction {
signature: row.get(0),
program_pk: row.get(1),
block_datetime: row.get(2),
slot: slot_raw as u64,
err: row.get(4),
processed: row.get(5),
worker_partition: row.get(6),
}
}
}
pub enum ProcessState {
Processed,
Unprocessed,
}

View File

@@ -1,15 +1,22 @@
use chrono::{DateTime, Duration, DurationRound, Utc};
use deadpool_postgres::Pool;
use log::debug;
use std::cmp::max;
use std::cmp::{max, min};
use strum::IntoEnumIterator;
use crate::{
database::fetch::{fetch_candles_from, fetch_earliest_candles, fetch_latest_finished_candle},
database::{
fetch::{
fetch_candles_from, fetch_earliest_candles,
fetch_latest_finished_candle,
},
insert::build_candles_upsert_statement,
},
structs::{
candle::Candle,
resolution::{day, Resolution},
},
utils::{f64_max, f64_min},
utils::{f64_max, f64_min, AnyhowWrap},
};
pub async fn batch_higher_order_candles(
@@ -34,12 +41,8 @@ pub async fn batch_higher_order_candles(
if constituent_candles.is_empty() {
return Ok(Vec::new());
}
let combined_candles = combine_into_higher_order_candles(
&mut constituent_candles,
resolution,
start_time,
candle,
);
let combined_candles =
combine_into_higher_order_candles(&mut constituent_candles, resolution, start_time);
Ok(combined_candles)
}
None => {
@@ -61,13 +64,8 @@ pub async fn batch_higher_order_candles(
return Ok(Vec::new());
}
let seed_candle = constituent_candles[0].clone();
let combined_candles = combine_into_higher_order_candles(
&mut constituent_candles,
resolution,
start_time,
seed_candle,
);
let combined_candles =
combine_into_higher_order_candles(&mut constituent_candles, resolution, start_time);
Ok(trim_candles(
combined_candles,
@@ -78,10 +76,9 @@
}
fn combine_into_higher_order_candles(
constituent_candles: &mut Vec<Candle>,
constituent_candles: &Vec<Candle>,
target_resolution: Resolution,
st: DateTime<Utc>,
seed_candle: Candle,
) -> Vec<Candle> {
debug!("combining for target_resolution: {}", target_resolution);
@@ -92,7 +89,7 @@ fn combine_into_higher_order_candles(
target_resolution,
);
let now = Utc::now().duration_trunc(Duration::minutes(1)).unwrap();
let candle_window = now - st;
let candle_window = min(now - st, day());
let num_candles = max(
1,
(candle_window.num_minutes() / duration.num_minutes()) as usize + 1,
@@ -100,17 +97,16 @@ fn combine_into_higher_order_candles(
let mut combined_candles = vec![empty_candle; num_candles];
let mut con_iter = constituent_candles.iter_mut().peekable();
let mut last_close = constituent_candles[0].close;
let mut con_iter = constituent_candles.iter().peekable();
let mut start_time = st;
let mut end_time = start_time + duration;
let mut last_candle = seed_candle;
for i in 0..combined_candles.len() {
combined_candles[i].open = last_candle.close;
combined_candles[i].low = last_candle.close;
combined_candles[i].close = last_candle.close;
combined_candles[i].high = last_candle.close;
combined_candles[i].open = last_close;
combined_candles[i].low = last_close;
combined_candles[i].close = last_close;
combined_candles[i].high = last_close;
while matches!(con_iter.peek(), Some(c) if c.end_time <= end_time) {
let unit_candle = con_iter.next().unwrap();
@@ -128,7 +124,7 @@ fn combine_into_higher_order_candles(
start_time = end_time;
end_time += duration;
last_candle = combined_candles[i].clone();
last_close = combined_candles[i].close;
}
combined_candles
@@ -149,25 +145,38 @@ fn trim_candles(mut c: Vec<Candle>, start_time: DateTime<Utc>) -> Vec<Candle> {
pub async fn backfill_batch_higher_order_candles(
pool: &Pool,
market_name: &str,
resolution: Resolution,
) -> anyhow::Result<Vec<Candle>> {
let mut constituent_candles =
fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution()).await?;
if constituent_candles.is_empty() {
return Ok(vec![]);
) -> anyhow::Result<()> {
let earliest_candles = fetch_earliest_candles(pool, market_name, Resolution::R1m).await?;
let mut start_time = earliest_candles[0].start_time.duration_trunc(day())?;
while start_time < Utc::now() {
let mut candles = vec![];
let mut constituent_candles = fetch_candles_from(
pool,
market_name,
Resolution::R1m,
start_time,
start_time + day(),
)
.await?;
for resolution in Resolution::iter() {
if resolution == Resolution::R1m {
continue;
}
let mut combined_candles =
combine_into_higher_order_candles(&mut constituent_candles, resolution, start_time);
candles.append(&mut combined_candles);
}
let upsert_statement = build_candles_upsert_statement(&candles);
let client = pool.get().await.unwrap();
client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()?;
// println!("{:?} {:?} done", market_name, start_time);
start_time += day();
}
let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
let seed_candle = constituent_candles[0].clone();
let combined_candles = combine_into_higher_order_candles(
&mut constituent_candles,
resolution,
start_time,
seed_candle,
);
Ok(trim_candles(
combined_candles,
constituent_candles[0].start_time,
))
Ok(())
}
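With the new `min(now - st, day())` cap, each combining pass covers at most one day of constituent candles, and the buffer it allocates holds `window_minutes / duration_minutes + 1` entries. A worked check of that count, assuming a 30-minute resolution over a full day for illustration:

```rust
use std::cmp::{max, min};

fn main() {
    // candle_window = min(now - st, day()): capped at one day (1440 minutes).
    let window_minutes: i64 = min(3 * 1440, 1440);
    let duration_minutes: i64 = 30; // e.g. a 30-minute target resolution
    let num_candles = max(1, (window_minutes / duration_minutes) as usize + 1);
    assert_eq!(num_candles, 49); // 48 half-hour buckets, plus one
}
```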

View File

@@ -1,18 +1,26 @@
use std::cmp::min;
use std::{cmp::min, collections::HashMap};
use chrono::{DateTime, Duration, DurationRound, Utc};
use deadpool_postgres::Pool;
use itertools::Itertools;
use log::debug;
use crate::database::backfill::{
fetch_earliest_fill_multiple_markets, fetch_fills_multiple_markets_from,
fetch_last_minute_candles,
};
use crate::{
database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
database::{
fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
insert::build_candles_upsert_statement,
},
structs::{
candle::Candle,
candle::{Candle},
markets::MarketInfo,
openbook::{calculate_fill_price_and_size, PgOpenBookFill},
openbook::PgOpenBookFill,
resolution::{day, Resolution},
},
utils::{f64_max, f64_min},
utils::{f64_max, f64_min, AnyhowWrap},
};
pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Result<Vec<Candle>> {
@@ -86,9 +94,7 @@ fn combine_fills_into_1m_candles(
Some(p) => p,
None => {
let first = fills_iter.peek().unwrap();
let (price, _) =
calculate_fill_price_and_size(**first, market.base_decimals, market.quote_decimals);
price
first.price
}
};
@@ -100,15 +106,13 @@ fn combine_fills_into_1m_candles(
while matches!(fills_iter.peek(), Some(f) if f.time < end_time) {
let fill = fills_iter.next().unwrap();
let (price, volume) =
calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals);
candles[i].close = price;
candles[i].low = f64_min(price, candles[i].low);
candles[i].high = f64_max(price, candles[i].high);
candles[i].volume += volume;
candles[i].close = fill.price;
candles[i].low = f64_min(fill.price, candles[i].low);
candles[i].high = f64_max(fill.price, candles[i].high);
candles[i].volume += fill.size;
last_price = price;
last_price = fill.price;
}
candles[i].start_time = start_time;
@@ -125,17 +129,19 @@ fn combine_fills_into_1m_candles(
/// Goes from the earliest fill to the most recent. Will mark candles as complete if there are missing gaps of fills between the start and end.
pub async fn backfill_batch_1m_candles(
pool: &Pool,
market: &MarketInfo,
) -> anyhow::Result<Vec<Candle>> {
let market_name = &market.name;
let market_address = &market.address;
let mut candles = vec![];
markets: Vec<MarketInfo>,
) -> anyhow::Result<()> {
let market_address_strings: Vec<String> = markets.iter().map(|m| m.address.clone()).collect();
let mut candle_container = HashMap::new();
let client = pool.get().await?;
let earliest_fill = fetch_earliest_fill(pool, &market.address).await?;
let earliest_fill =
fetch_earliest_fill_multiple_markets(&client, &market_address_strings).await?;
if earliest_fill.is_none() {
debug!("No fills found for: {:?}", &market_name);
return Ok(candles);
println!("No fills found for backfill");
return Ok(());
}
println!("Found earliset fill for backfill");
let mut start_time = earliest_fill
.unwrap()
@@ -146,13 +152,78 @@ pub async fn backfill_batch_1m_candles(
start_time + day(),
Utc::now().duration_trunc(Duration::minutes(1))?,
);
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
if !fills.is_empty() {
let mut minute_candles =
let last_candles = fetch_last_minute_candles(&client).await?;
let all_fills = fetch_fills_multiple_markets_from(
&client,
&market_address_strings,
start_time,
end_time,
)
.await?;
// println!("{:?} {:?}", start_time, end_time);
// println!("all fills len : {:?}", all_fills.len());
println!("{:?}", all_fills[0]);
// println!("{:?}", all_fills[1]);
// println!("Fetched multiple fills for backfill");
let fills_groups = all_fills
.into_iter()
.sorted_by(|a, b| Ord::cmp(&a.market_key, &b.market_key))
.group_by(|f| f.market_key.clone());
let fills_by_market: Vec<(String, Vec<PgOpenBookFill>)> = fills_groups
.into_iter()
.map(|(m, group)| (m, group.collect()))
.collect();
println!("fbm len : {:?}", fills_by_market.len());
// sort fills by market, make candles
for (_, mut fills) in fills_by_market {
let market = markets
.iter()
.find(|m| m.address == fills[0].market_key)
.unwrap();
let minute_candles =
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
candles.append(&mut minute_candles);
candle_container.insert(&market.address, minute_candles);
}
start_time += day()
// where no candles, make empty ones
for (k, v) in candle_container.iter_mut() {
if v.is_empty() {
let market = markets.iter().find(|m| &m.address == *k).unwrap();
let last_candle = last_candles
.iter()
.find(|c| c.market_name == market.name)
.unwrap();
let empty_candles = combine_fills_into_1m_candles(
&mut vec![],
market,
start_time,
end_time,
Some(last_candle.close),
);
*v = empty_candles;
}
}
// insert candles in batches
for candles in candle_container.values() {
let candle_chunks: Vec<Vec<Candle>> =
candles.chunks(1500).map(|chunk| chunk.to_vec()).collect(); // 1440 minutes in a day
for c in candle_chunks {
let upsert_statement = build_candles_upsert_statement(&c);
client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()?;
}
}
// reset entries but keep markets we've seen for blank candles
for (_, v) in candle_container.iter_mut() {
*v = vec![];
}
println!("day done");
start_time += day();
}
Ok(candles)
Ok(())
}
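The 1500-row chunk size is picked against the 1440 one-minute candles in a day, so each market's daily batch normally fits in a single upsert statement. A minimal sketch of the chunking step, with a stand-in element type:

```rust
fn main() {
    // Stand-in for Vec<Candle>: one entry per minute of one day.
    let candles: Vec<u32> = (0..1440).collect();
    let chunks: Vec<Vec<u32>> = candles.chunks(1500).map(|c| c.to_vec()).collect();
    assert_eq!(chunks.len(), 1); // one upsert per market per backfilled day
}
```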

View File

@@ -21,7 +21,6 @@ use super::metrics::METRIC_CANDLES_TOTAL;
pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
loop {
let market_clone = market.clone();
loop {
sleep(Duration::milliseconds(5000).to_std()?).await;
match batch_inner(pool, &market_clone).await {
@@ -43,6 +42,9 @@ pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
let market_name = &market.name.clone();
let candles = batch_1m_candles(pool, market).await?;
if candles.is_empty() {
return Ok(());
}
METRIC_CANDLES_TOTAL
.with_label_values(&[market.name.as_str()])
.inc_by(candles.clone().len() as u64);
@@ -56,7 +58,7 @@ async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
.with_label_values(&[market.name.as_str()])
.inc_by(candles.clone().len() as u64);
save_candles(pool, candles).await?;
}
}
Ok(())
}

View File

@@ -1,11 +1,9 @@
use log::{error, info};
use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
use openbook_candles::structs::transaction::NUM_TRANSACTION_PARTITIONS;
use openbook_candles::utils::Config;
use openbook_candles::worker::metrics::{
serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE,
};
use openbook_candles::worker::trade_fetching::scrape::{scrape_fills, scrape_signatures};
use openbook_candles::{
database::initialize::{connect_to_database, setup_database},
worker::candle_batching::batch_for_market,
@@ -40,25 +38,6 @@ async fn main() -> anyhow::Result<()> {
setup_database(&pool).await?;
let mut handles = vec![];
// signature scraping
let rpc_clone = rpc_url.clone();
let pool_clone = pool.clone();
handles.push(tokio::spawn(async move {
scrape_signatures(rpc_clone, &pool_clone).await.unwrap();
}));
// transaction/fill scraping
for id in 0..NUM_TRANSACTION_PARTITIONS {
let rpc_clone = rpc_url.clone();
let pool_clone = pool.clone();
let markets_clone = target_markets.clone();
handles.push(tokio::spawn(async move {
scrape_fills(id as i32, rpc_clone, &pool_clone, &markets_clone)
.await
.unwrap();
}));
}
// candle batching
for market in market_infos.into_iter() {
let batch_pool = pool.clone();

View File

@@ -1,3 +1,2 @@
pub mod candle_batching;
pub mod metrics;
pub mod trade_fetching;

View File

@@ -1,2 +0,0 @@
pub mod parsing;
pub mod scrape;

View File

@@ -1,94 +0,0 @@
use log::warn;
use solana_client::client_error::Result as ClientResult;
use solana_sdk::pubkey::Pubkey;
use solana_transaction_status::{
option_serializer::OptionSerializer, EncodedConfirmedTransactionWithStatusMeta,
};
use std::{collections::HashMap, io::Error};
use crate::{
structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw},
worker::metrics::METRIC_RPC_ERRORS_TOTAL,
};
const PROGRAM_DATA: &str = "Program data: ";
pub fn parse_trades_from_openbook_txns(
txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
mut sig_strings: Vec<String>,
target_markets: &HashMap<Pubkey, String>,
) -> (Vec<OpenBookFillEvent>, Vec<String>) {
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
let mut failed_sigs = vec![];
for (idx, txn) in txns.iter_mut().enumerate() {
match txn {
Ok(t) => {
if let Some(m) = &t.transaction.meta {
match &m.log_messages {
OptionSerializer::Some(logs) => {
match parse_openbook_fills_from_logs(
logs,
target_markets,
sig_strings[idx].clone(),
t.block_time.unwrap(),
) {
Some(mut events) => fills_vector.append(&mut events),
None => {}
}
}
OptionSerializer::None => {}
OptionSerializer::Skip => {}
}
}
}
Err(e) => {
warn!("rpc error in get_transaction {}", e);
failed_sigs.push(sig_strings[idx].clone());
METRIC_RPC_ERRORS_TOTAL
.with_label_values(&["getTransaction"])
.inc();
}
}
}
sig_strings.retain(|s| !failed_sigs.contains(s));
(fills_vector, sig_strings)
}
fn parse_openbook_fills_from_logs(
logs: &Vec<String>,
target_markets: &HashMap<Pubkey, String>,
signature: String,
block_time: i64,
) -> Option<Vec<OpenBookFillEvent>> {
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
for (idx, l) in logs.iter().enumerate() {
match l.strip_prefix(PROGRAM_DATA) {
Some(log) => {
let borsh_bytes = match anchor_lang::__private::base64::decode(log) {
Ok(borsh_bytes) => borsh_bytes,
_ => continue,
};
let mut slice: &[u8] = &borsh_bytes[8..];
let event: Result<OpenBookFillEventRaw, Error> =
anchor_lang::AnchorDeserialize::deserialize(&mut slice);
match event {
Ok(e) => {
let fill_event = e.into_event(signature.clone(), block_time, idx);
if target_markets.contains_key(&fill_event.market) {
fills_vector.push(fill_event);
}
}
_ => continue,
}
}
_ => (),
}
}
if !fills_vector.is_empty() {
Some(fills_vector)
} else {
None
}
}

View File

@@ -1,119 +0,0 @@
use deadpool_postgres::Pool;
use futures::future::join_all;
use log::{debug, warn};
use solana_client::{
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
rpc_config::RpcTransactionConfig,
};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature};
use solana_transaction_status::UiTransactionEncoding;
use std::{collections::HashMap, time::Duration as WaitDuration};
use crate::{
database::{
fetch::fetch_worker_transactions,
insert::{build_transactions_insert_statement, insert_fills_atomically},
},
structs::transaction::PgTransaction,
utils::{AnyhowWrap, OPENBOOK_KEY},
worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL, METRIC_TRANSACTIONS_TOTAL},
};
use super::parsing::parse_trades_from_openbook_txns;
pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<()> {
let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
loop {
let rpc_config = GetConfirmedSignaturesForAddress2Config {
before: None,
until: None,
limit: None,
commitment: Some(CommitmentConfig::confirmed()),
};
let sigs = match rpc_client
.get_signatures_for_address_with_config(&OPENBOOK_KEY, rpc_config)
.await
{
Ok(sigs) => sigs,
Err(e) => {
warn!("rpc error in get_signatures_for_address_with_config: {}", e);
METRIC_RPC_ERRORS_TOTAL
.with_label_values(&["getSignaturesForAddress"])
.inc();
continue;
}
};
if sigs.is_empty() {
debug!("No signatures found, trying again");
continue;
}
let transactions: Vec<PgTransaction> = sigs
.into_iter()
.map(PgTransaction::from_rpc_confirmed_transaction)
.collect();
debug!("Scraper writing: {:?} txns to DB\n", transactions.len());
let upsert_statement = build_transactions_insert_statement(transactions);
let client = pool.get().await?;
let num_txns = client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()?;
METRIC_TRANSACTIONS_TOTAL.inc_by(num_txns);
}
// TODO: graceful shutdown
}
pub async fn scrape_fills(
worker_id: i32,
rpc_url: String,
pool: &Pool,
target_markets: &HashMap<Pubkey, String>,
) -> anyhow::Result<()> {
debug!("Worker {} started \n", worker_id);
let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed());
loop {
let transactions = fetch_worker_transactions(worker_id, pool).await?;
if transactions.is_empty() {
debug!("No signatures found by worker {}", worker_id);
tokio::time::sleep(WaitDuration::from_secs(1)).await;
continue;
};
// for each signature, fetch the transaction
let txn_config = RpcTransactionConfig {
encoding: Some(UiTransactionEncoding::Json),
commitment: Some(CommitmentConfig::confirmed()),
max_supported_transaction_version: Some(0),
};
let sig_strings = transactions
.iter()
.map(|t| t.signature.clone())
.collect::<Vec<String>>();
let signatures: Vec<_> = transactions
.into_iter()
.map(|t| t.signature.parse::<Signature>().unwrap())
.collect();
let txn_futs: Vec<_> = signatures
.iter()
.map(|s| rpc_client.get_transaction_with_config(s, txn_config))
.collect();
let mut txns = join_all(txn_futs).await;
let (fills, completed_sigs) =
parse_trades_from_openbook_txns(&mut txns, sig_strings, target_markets);
for fill in fills.iter() {
let market_name = target_markets.get(&fill.market).unwrap();
METRIC_FILLS_TOTAL.with_label_values(&[market_name]).inc();
}
// Write fills to the database, and update properly fetched transactions as processed
insert_fills_atomically(pool, worker_id, fills, completed_sigs).await?;
}
}