Merge pull request #3 from blockworks-foundation/remove-scraper

Remove scraper
This commit is contained in:
Lou-Kamades 2023-07-26 20:26:04 -05:00 committed by GitHub
commit a0b32621a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 396 additions and 992 deletions

36
Cargo.lock generated
View File

@ -2615,6 +2615,15 @@ dependencies = [
"either", "either",
] ]
[[package]]
name = "itertools"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57"
dependencies = [
"either",
]
[[package]] [[package]]
name = "itoa" name = "itoa"
version = "1.0.5" version = "1.0.5"
@ -3450,6 +3459,7 @@ dependencies = [
"dotenv", "dotenv",
"env_logger 0.10.0", "env_logger 0.10.0",
"futures 0.3.27", "futures 0.3.27",
"itertools 0.11.0",
"jsonrpc-core-client", "jsonrpc-core-client",
"lazy_static", "lazy_static",
"log 0.4.17", "log 0.4.17",
@ -4014,7 +4024,7 @@ checksum = "a3f8ad728fb08fe212df3c05169e940fbb6d9d16a877ddde14644a983ba2012e"
dependencies = [ dependencies = [
"bytes 1.3.0", "bytes 1.3.0",
"heck 0.4.0", "heck 0.4.0",
"itertools", "itertools 0.10.5",
"lazy_static", "lazy_static",
"log 0.4.17", "log 0.4.17",
"multimap", "multimap",
@ -4035,7 +4045,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bda8c0881ea9f722eb9629376db3d0b903b462477c1aafcb0566610ac28ac5d" checksum = "8bda8c0881ea9f722eb9629376db3d0b903b462477c1aafcb0566610ac28ac5d"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"itertools", "itertools 0.10.5",
"proc-macro2 1.0.56", "proc-macro2 1.0.56",
"quote 1.0.26", "quote 1.0.26",
"syn 1.0.109", "syn 1.0.109",
@ -4877,7 +4887,7 @@ dependencies = [
"default-env", "default-env",
"enumflags2", "enumflags2",
"field-offset", "field-offset",
"itertools", "itertools 0.10.5",
"num-traits", "num-traits",
"num_enum 0.5.7", "num_enum 0.5.7",
"safe-transmute", "safe-transmute",
@ -5252,7 +5262,7 @@ dependencies = [
"futures-util", "futures-util",
"indexmap", "indexmap",
"indicatif", "indicatif",
"itertools", "itertools 0.10.5",
"jsonrpc-core", "jsonrpc-core",
"lazy_static", "lazy_static",
"log 0.4.17", "log 0.4.17",
@ -5416,7 +5426,7 @@ dependencies = [
"crossbeam-channel", "crossbeam-channel",
"flate2", "flate2",
"indexmap", "indexmap",
"itertools", "itertools 0.10.5",
"log 0.4.17", "log 0.4.17",
"lru", "lru",
"matches", "matches",
@ -5465,7 +5475,7 @@ dependencies = [
"dashmap", "dashmap",
"fs_extra", "fs_extra",
"futures 0.3.27", "futures 0.3.27",
"itertools", "itertools 0.10.5",
"lazy_static", "lazy_static",
"libc", "libc",
"log 0.4.17", "log 0.4.17",
@ -5643,7 +5653,7 @@ dependencies = [
"console_log", "console_log",
"curve25519-dalek", "curve25519-dalek",
"getrandom 0.2.8", "getrandom 0.2.8",
"itertools", "itertools 0.10.5",
"js-sys", "js-sys",
"lazy_static", "lazy_static",
"libc", "libc",
@ -5682,7 +5692,7 @@ dependencies = [
"bincode", "bincode",
"eager", "eager",
"enum-iterator", "enum-iterator",
"itertools", "itertools 0.10.5",
"libc", "libc",
"libloading", "libloading",
"log 0.4.17", "log 0.4.17",
@ -5739,7 +5749,7 @@ dependencies = [
"bs58 0.4.0", "bs58 0.4.0",
"crossbeam-channel", "crossbeam-channel",
"dashmap", "dashmap",
"itertools", "itertools 0.10.5",
"jsonrpc-core", "jsonrpc-core",
"jsonrpc-core-client", "jsonrpc-core-client",
"jsonrpc-derive", "jsonrpc-derive",
@ -5801,7 +5811,7 @@ dependencies = [
"fnv", "fnv",
"im", "im",
"index_list", "index_list",
"itertools", "itertools 0.10.5",
"lazy_static", "lazy_static",
"log 0.4.17", "log 0.4.17",
"lru", "lru",
@ -5863,7 +5873,7 @@ dependencies = [
"ed25519-dalek-bip32", "ed25519-dalek-bip32",
"generic-array", "generic-array",
"hmac 0.12.1", "hmac 0.12.1",
"itertools", "itertools 0.10.5",
"js-sys", "js-sys",
"lazy_static", "lazy_static",
"libsecp256k1", "libsecp256k1",
@ -6011,7 +6021,7 @@ dependencies = [
"futures-util", "futures-util",
"histogram", "histogram",
"indexmap", "indexmap",
"itertools", "itertools 0.10.5",
"libc", "libc",
"log 0.4.17", "log 0.4.17",
"nix", "nix",
@ -6143,7 +6153,7 @@ dependencies = [
"cipher 0.4.3", "cipher 0.4.3",
"curve25519-dalek", "curve25519-dalek",
"getrandom 0.1.16", "getrandom 0.1.16",
"itertools", "itertools 0.10.5",
"lazy_static", "lazy_static",
"merlin", "merlin",
"num-derive", "num-derive",

View File

@ -15,10 +15,6 @@ path = "src/worker/main.rs"
name = "server" name = "server"
path = "src/server/main.rs" path = "src/server/main.rs"
[[bin]]
name = "backfill-trades"
path = "src/backfill-trades/main.rs"
[[bin]] [[bin]]
name = "backfill-candles" name = "backfill-candles"
path = "src/backfill-candles/main.rs" path = "src/backfill-candles/main.rs"
@ -71,3 +67,4 @@ num_enum = "0.6.1"
config = "0.13.1" config = "0.13.1"
prometheus = "0.13.3" prometheus = "0.13.3"
lazy_static = "1.4.0" lazy_static = "1.4.0"
itertools = "0.11.0"

View File

@ -296,7 +296,7 @@ Returns 24-hour pricing and volume information on each market available.
**Request:** **Request:**
`GET /api/coingecko/orderbook/?ticker_id={ticker_id}&depth={depth}` `GET /api/coingecko/orderbook?ticker_id={ticker_id}&depth={depth}`
Returns order book information with a specified depth for a given market. Returns order book information with a specified depth for a given market.

View File

@ -1,20 +1,17 @@
use deadpool_postgres::Object;
use openbook_candles::{ use openbook_candles::{
database::{initialize::connect_to_database, insert::build_candles_upsert_statement}, database::{initialize::connect_to_database},
structs::{ structs::{
candle::Candle,
markets::{fetch_market_infos, load_markets}, markets::{fetch_market_infos, load_markets},
resolution::Resolution,
}, },
utils::{AnyhowWrap, Config}, utils::{Config},
worker::candle_batching::{ worker::candle_batching::{
higher_order_candles::backfill_batch_higher_order_candles, higher_order_candles::backfill_batch_higher_order_candles, minute_candles::backfill_batch_1m_candles,
minute_candles::backfill_batch_1m_candles,
}, },
}; };
use std::env; use std::env;
use strum::IntoEnumIterator;
#[tokio::main(flavor = "multi_thread", worker_threads = 10)] #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
@ -33,31 +30,19 @@ async fn main() -> anyhow::Result<()> {
println!("Backfilling candles for {:?}", markets); println!("Backfilling candles for {:?}", markets);
let pool = connect_to_database().await?; let pool = connect_to_database().await?;
for market in market_infos.into_iter() { backfill_batch_1m_candles(&pool, market_infos.clone()).await?;
let client = pool.get().await?;
let minute_candles = backfill_batch_1m_candles(&pool, &market).await?;
save_candles(minute_candles, client).await?;
for resolution in Resolution::iter() { let mut handles = vec![];
if resolution == Resolution::R1m { let mi = market_infos.clone();
continue; for market in mi.into_iter() {
} let pc = pool.clone();
let higher_order_candles = handles.push(tokio::spawn(async move {
backfill_batch_higher_order_candles(&pool, &market.name, resolution).await?; backfill_batch_higher_order_candles(&pc, &market.name)
let client = pool.get().await?; .await
save_candles(higher_order_candles, client).await?; .unwrap();
} }));
}
Ok(())
}
async fn save_candles(candles: Vec<Candle>, client: Object) -> anyhow::Result<()> {
if !candles.is_empty() {
let upsert_statement = build_candles_upsert_statement(&candles);
client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()?;
} }
futures::future::join_all(handles).await;
Ok(()) Ok(())
} }

View File

@ -1,141 +0,0 @@
use anchor_lang::prelude::Pubkey;
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
use deadpool_postgres::Pool;
use log::debug;
use openbook_candles::{
database::{
initialize::{connect_to_database, setup_database},
insert::build_transactions_insert_statement,
},
structs::{
markets::{fetch_market_infos, load_markets},
transaction::{PgTransaction, NUM_TRANSACTION_PARTITIONS},
},
utils::{AnyhowWrap, Config, OPENBOOK_KEY},
worker::trade_fetching::scrape::scrape_fills,
};
use solana_client::{
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
};
use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
use std::{collections::HashMap, env, str::FromStr};
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();
let args: Vec<String> = env::args().collect();
assert!(args.len() == 2);
let path_to_markets_json = &args[1];
// let num_days = args[2].parse::<i64>().unwrap(); // TODO: implement
let num_days = 1;
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
let config = Config {
rpc_url: rpc_url.clone(),
};
let markets = load_markets(path_to_markets_json);
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
let mut target_markets = HashMap::new();
for m in market_infos.clone() {
target_markets.insert(Pubkey::from_str(&m.address)?, m.name);
}
println!("{:?}", target_markets);
let pool = connect_to_database().await?;
setup_database(&pool).await?;
let mut handles = vec![];
let rpc_clone = rpc_url.clone();
let pool_clone = pool.clone();
handles.push(tokio::spawn(async move {
fetch_signatures(rpc_clone, &pool_clone, num_days)
.await
.unwrap();
}));
// Low priority improvement: batch fills into 1000's per worker
for id in 0..NUM_TRANSACTION_PARTITIONS {
let rpc_clone = rpc_url.clone();
let pool_clone = pool.clone();
let markets_clone = target_markets.clone();
handles.push(tokio::spawn(async move {
scrape_fills(id as i32, rpc_clone, &pool_clone, &markets_clone)
.await
.unwrap();
}));
}
// TODO: spawn status thread
futures::future::join_all(handles).await;
Ok(())
}
pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> anyhow::Result<()> {
let mut before_sig: Option<Signature> = None;
let mut now_time = Utc::now().timestamp();
let end_time = (Utc::now() - Duration::days(num_days)).timestamp();
let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
while now_time > end_time {
let rpc_config = GetConfirmedSignaturesForAddress2Config {
before: before_sig,
until: None,
limit: None,
commitment: Some(CommitmentConfig::confirmed()),
};
let sigs = match rpc_client
.get_signatures_for_address_with_config(&OPENBOOK_KEY, rpc_config)
.await
{
Ok(sigs) => sigs,
Err(e) => {
println!("Error fetching signatures: {}", e);
continue;
}
};
if sigs.is_empty() {
println!("No signatures found, trying again");
continue;
}
let last = sigs.last().unwrap();
let last_time = last.block_time.unwrap();
let last_signature = last.signature.clone();
let transactions = sigs
.into_iter()
.map(PgTransaction::from_rpc_confirmed_transaction)
.collect::<Vec<PgTransaction>>();
if transactions.is_empty() {
println!("No transactions found, trying again");
}
debug!("writing: {:?} txns to DB\n", transactions.len());
let upsert_statement = build_transactions_insert_statement(transactions);
let client = pool.get().await?;
client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()?;
now_time = last_time;
before_sig = Some(Signature::from_str(&last_signature)?);
let time_left = backfill_time_left(now_time, end_time);
println!(
"{} minutes ~ {} days remaining in the backfill\n",
time_left.num_minutes(),
time_left.num_days()
);
}
Ok(())
}
fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration {
let naive_cur = NaiveDateTime::from_timestamp_millis(current_time * 1000).unwrap();
let naive_bf = NaiveDateTime::from_timestamp_millis(backfill_end * 1000).unwrap();
let cur_date = DateTime::<Utc>::from_utc(naive_cur, Utc);
let bf_date = DateTime::<Utc>::from_utc(naive_bf, Utc);
cur_date - bf_date
}

82
src/database/backfill.rs Normal file
View File

@ -0,0 +1,82 @@
use crate::structs::{candle::Candle, openbook::PgOpenBookFill};
use chrono::{DateTime, Utc};
use deadpool_postgres::{GenericClient, Object};
pub async fn fetch_earliest_fill_multiple_markets(
conn_object: &Object,
market_address_strings: &Vec<String>,
) -> anyhow::Result<Option<PgOpenBookFill>> {
let stmt = r#"SELECT
block_datetime as "time",
market as "market_key",
bid as "bid",
maker as "maker",
price as "price",
size as "size"
from openbook.openbook_fill_events
where market = ANY($1)
and maker = true
ORDER BY time asc LIMIT 1"#;
let row = conn_object
.query_opt(stmt, &[&market_address_strings])
.await?;
match row {
Some(r) => Ok(Some(PgOpenBookFill::from_row(r))),
None => Ok(None),
}
}
pub async fn fetch_fills_multiple_markets_from(
conn_object: &Object,
market_address_strings: &Vec<String>,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
) -> anyhow::Result<Vec<PgOpenBookFill>> {
let stmt = r#"SELECT
block_datetime as "time",
market as "market_key",
bid as "bid",
maker as "maker",
price as "price",
size as "size"
from openbook.openbook_fill_events
where market = ANY($1)
and block_datetime >= $2::timestamptz
and block_datetime < $3::timestamptz
and maker = true
ORDER BY time asc"#;
let rows = conn_object
.query(stmt, &[&market_address_strings, &start_time, &end_time])
.await?;
Ok(rows.into_iter().map(PgOpenBookFill::from_row).collect())
}
pub async fn fetch_last_minute_candles(conn_object: &Object) -> anyhow::Result<Vec<Candle>> {
let stmt = r#"SELECT
c.market_name as "market_name",
c.start_time as "start_time",
c.end_time as "end_time",
c.resolution as "resolution",
c.open as "open",
c.close as "close",
c.high as "high",
c.low as "low",
c.volume as "volume",
c.complete as "complete"
from
(
select market_name, max(start_time) as max_start_time from openbook.candles
where resolution = '1M'
group by market_name
) mkts
left join openbook.candles c
on mkts.market_name = c.market_name
and mkts.max_start_time = c.start_time
where c.resolution ='1M'"#;
let rows = conn_object.query(stmt, &[]).await?;
Ok(rows.into_iter().map(Candle::from_row).collect())
}

View File

@ -4,7 +4,6 @@ use crate::structs::{
openbook::PgOpenBookFill, openbook::PgOpenBookFill,
resolution::Resolution, resolution::Resolution,
trader::PgTrader, trader::PgTrader,
transaction::PgTransaction,
}; };
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use deadpool_postgres::{GenericClient, Pool}; use deadpool_postgres::{GenericClient, Pool};
@ -16,13 +15,13 @@ pub async fn fetch_earliest_fill(
let client = pool.get().await?; let client = pool.get().await?;
let stmt = r#"SELECT let stmt = r#"SELECT
time as "time!", block_datetime as "time",
bid as "bid!", market as "market_key",
maker as "maker!", bid as "bid",
native_qty_paid as "native_qty_paid!", maker as "maker",
native_qty_received as "native_qty_received!", price as "price",
native_fee_or_rebate as "native_fee_or_rebate!" size as "size"
from fills from openbook.openbook_fill_events
where market = $1 where market = $1
and maker = true and maker = true
ORDER BY time asc LIMIT 1"#; ORDER BY time asc LIMIT 1"#;
@ -44,16 +43,16 @@ pub async fn fetch_fills_from(
let client = pool.get().await?; let client = pool.get().await?;
let stmt = r#"SELECT let stmt = r#"SELECT
time as "time!", block_datetime as "time",
bid as "bid!", market as "market_key",
maker as "maker!", bid as "bid",
native_qty_paid as "native_qty_paid!", maker as "maker",
native_qty_received as "native_qty_received!", price as "price",
native_fee_or_rebate as "native_fee_or_rebate!" size as "size"
from fills from openbook.openbook_fill_events
where market = $1 where market = $1
and time >= $2::timestamptz and block_datetime >= $2::timestamptz
and time < $3::timestamptz and block_datetime < $3::timestamptz
and maker = true and maker = true
ORDER BY time asc"#; ORDER BY time asc"#;
@ -71,17 +70,17 @@ pub async fn fetch_latest_finished_candle(
let client = pool.get().await?; let client = pool.get().await?;
let stmt = r#"SELECT let stmt = r#"SELECT
market_name as "market_name!", market_name as "market_name",
start_time as "start_time!", start_time as "start_time",
end_time as "end_time!", end_time as "end_time",
resolution as "resolution!", resolution as "resolution",
open as "open!", open as "open",
close as "close!", close as "close",
high as "high!", high as "high",
low as "low!", low as "low",
volume as "volume!", volume as "volume",
complete as "complete!" complete as "complete"
from candles from openbook.candles
where market_name = $1 where market_name = $1
and resolution = $2 and resolution = $2
and complete = true and complete = true
@ -97,8 +96,8 @@ pub async fn fetch_latest_finished_candle(
} }
} }
/// Fetches all of the candles for the given market and resoultion, starting from the earliest. /// Fetches all of the candles for the given market and resolution, starting from the earliest.
/// Note that this function will fetch ALL candles. /// Note that this function will fetch at most 2000 candles.
pub async fn fetch_earliest_candles( pub async fn fetch_earliest_candles(
pool: &Pool, pool: &Pool,
market_name: &str, market_name: &str,
@ -107,20 +106,21 @@ pub async fn fetch_earliest_candles(
let client = pool.get().await?; let client = pool.get().await?;
let stmt = r#"SELECT let stmt = r#"SELECT
market_name as "market_name!", market_name as "market_name",
start_time as "start_time!", start_time as "start_time",
end_time as "end_time!", end_time as "end_time",
resolution as "resolution!", resolution as "resolution!",
open as "open!", open as "open",
close as "close!", close as "close",
high as "high!", high as "high",
low as "low!", low as "low",
volume as "volume!", volume as "volume",
complete as "complete!" complete as "complete"
from candles from openbook.candles
where market_name = $1 where market_name = $1
and resolution = $2 and resolution = $2
ORDER BY start_time asc"#; ORDER BY start_time asc
LIMIT 2000"#;
let rows = client let rows = client
.query(stmt, &[&market_name, &resolution.to_string()]) .query(stmt, &[&market_name, &resolution.to_string()])
@ -139,17 +139,17 @@ pub async fn fetch_candles_from(
let client = pool.get().await?; let client = pool.get().await?;
let stmt = r#"SELECT let stmt = r#"SELECT
market_name as "market_name!", market_name as "market_name",
start_time as "start_time!", start_time as "start_time",
end_time as "end_time!", end_time as "end_time",
resolution as "resolution!", resolution as "resolution",
open as "open!", open as "open",
close as "close!", close as "close",
high as "high!", high as "high",
low as "low!", low as "low",
volume as "volume!", volume as "volume",
complete as "complete!" complete as "complete"
from candles from openbook.candles
where market_name = $1 where market_name = $1
and resolution = $2 and resolution = $2
and start_time >= $3 and start_time >= $3
@ -182,20 +182,20 @@ pub async fn fetch_top_traders_by_base_volume_from(
let stmt = r#"SELECT let stmt = r#"SELECT
open_orders_owner, open_orders_owner,
sum( sum(
native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END native_quantity_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
) as "raw_ask_size!", ) as "raw_ask_size",
sum( sum(
native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END native_quantity_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
) as "raw_bid_size!" ) as "raw_bid_size"
FROM fills FROM openbook.openbook_fill_events
WHERE market = $1 WHERE market = $1
AND time >= $2 AND time >= $2
AND time < $3 AND time < $3
GROUP BY open_orders_owner GROUP BY open_orders_owner
ORDER BY ORDER BY
sum(native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END) sum(native_quantity_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
+ +
sum(native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END) sum(native_quantity_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
DESC DESC
LIMIT 10000"#; LIMIT 10000"#;
@ -217,20 +217,20 @@ pub async fn fetch_top_traders_by_quote_volume_from(
let stmt = r#"SELECT let stmt = r#"SELECT
open_orders_owner, open_orders_owner,
sum( sum(
native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END native_quantity_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
) as "raw_ask_size!", ) as "raw_ask_size",
sum( sum(
native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END native_quantity_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
) as "raw_bid_size!" ) as "raw_bid_size"
FROM fills FROM openbook.openbook_fill_events
WHERE market = $1 WHERE market = $1
AND time >= $2 AND time >= $2
AND time < $3 AND time < $3
GROUP BY open_orders_owner GROUP BY open_orders_owner
ORDER BY ORDER BY
sum(native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END) sum(native_quantity_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
+ +
sum(native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END) sum(native_quantity_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
DESC DESC
LIMIT 10000"#; LIMIT 10000"#;
@ -249,24 +249,27 @@ pub async fn fetch_coingecko_24h_volume(
let stmt = r#"SELECT let stmt = r#"SELECT
t1.market, t1.market,
COALESCE(t2.native_qty_received, 0) as "raw_base_size!", COALESCE(t2.base_size, 0) as "base_size",
COALESCE(t2.native_qty_paid, 0) as "raw_quote_size!" COALESCE(t3.quote_size, 0) as "quote_size"
FROM ( FROM (
SELECT distinct on (market) * SELECT unnest($1::text[]) as market
FROM fills f
where bid = true
and market = any($1)
order by market, "time" desc
) t1 ) t1
LEFT JOIN ( LEFT JOIN (
select market, select market,
sum(native_qty_received) as "native_qty_received", sum("size") as "base_size"
sum(native_qty_paid) as "native_qty_paid" from openbook.openbook_fill_events
from fills where block_datetime >= current_timestamp - interval '1 day'
where "time" >= current_timestamp - interval '1 day'
and bid = true and bid = true
group by market group by market
) t2 ON t1.market = t2.market"#; ) t2 ON t1.market = t2.market
LEFT JOIN (
select market,
sum("size" * price) as "quote_size"
from openbook.openbook_fill_events
where block_datetime >= current_timestamp - interval '1 day'
and bid = true
group by market
) t3 ON t1.market = t3.market"#;
let rows = client.query(stmt, &[&market_address_strings]).await?; let rows = client.query(stmt, &[&market_address_strings]).await?;
@ -291,10 +294,10 @@ pub async fn fetch_coingecko_24h_high_low(
( (
SELECT * SELECT *
from from
candles openbook.candles
where (market_name, start_time, resolution) in ( where (market_name, start_time, resolution) in (
select market_name, max(start_time), resolution select market_name, max(start_time), resolution
from candles from openbook.candles
where "resolution" = '1M' where "resolution" = '1M'
and market_name = any($1) and market_name = any($1)
group by market_name, resolution group by market_name, resolution
@ -307,7 +310,7 @@ pub async fn fetch_coingecko_24h_high_low(
max(high) as "high", max(high) as "high",
min(low) as "low" min(low) as "low"
from from
candles openbook.candles
where where
"resolution" = '1M' "resolution" = '1M'
and "start_time" >= current_timestamp - interval '1 day' and "start_time" >= current_timestamp - interval '1 day'
@ -321,23 +324,3 @@ pub async fn fetch_coingecko_24h_high_low(
.map(PgCoinGecko24HighLow::from_row) .map(PgCoinGecko24HighLow::from_row)
.collect()) .collect())
} }
/// Fetches unprocessed, non-error transactions for the specified worker partition.
/// Pulls at most 50 transactions at a time.
pub async fn fetch_worker_transactions(
worker_id: i32,
pool: &Pool,
) -> anyhow::Result<Vec<PgTransaction>> {
let client = pool.get().await?;
let stmt = r#"SELECT signature, program_pk, block_datetime, slot, err, "processed", worker_partition
FROM transactions
where worker_partition = $1
and err = false
and processed = false
LIMIT 50"#;
let rows = client.query(stmt, &[&worker_id]).await?;
Ok(rows.into_iter().map(PgTransaction::from_row).collect())
}

View File

@ -21,8 +21,8 @@ pub async fn connect_to_database() -> anyhow::Result<Pool> {
// openssl pkcs12 -export -in client.cer -inkey client-key.cer -out client.pks // openssl pkcs12 -export -in client.cer -inkey client-key.cer -out client.pks
// base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64 // base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64
// fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a mango-fills // fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a APP-NAME
// fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills // fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a APP-NAME
let tls = if pg_config.pg_use_ssl { let tls = if pg_config.pg_use_ssl {
pg_config.pg.ssl_mode = Some(SslMode::Require); pg_config.pg.ssl_mode = Some(SslMode::Require);
let ca_cert = fs::read(pg_config.pg_ca_cert_path.expect("reading ca cert from env")) let ca_cert = fs::read(pg_config.pg_ca_cert_path.expect("reading ca cert from env"))
@ -66,11 +66,7 @@ pub async fn connect_to_database() -> anyhow::Result<Pool> {
} }
pub async fn setup_database(pool: &Pool) -> anyhow::Result<()> { pub async fn setup_database(pool: &Pool) -> anyhow::Result<()> {
let candles_table_fut = create_candles_table(pool); match create_candles_table(pool).await {
let transactions_table_fut = create_transactions_table(pool);
let fills_table_fut = create_fills_table(pool);
let result = tokio::try_join!(candles_table_fut, transactions_table_fut, fills_table_fut);
match result {
Ok(_) => { Ok(_) => {
println!("Successfully configured database"); println!("Successfully configured database");
Ok(()) Ok(())
@ -87,7 +83,7 @@ pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> {
client client
.execute( .execute(
"CREATE TABLE IF NOT EXISTS candles ( "CREATE TABLE IF NOT EXISTS openbook.candles (
id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY, id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
market_name text, market_name text,
start_time timestamptz, start_time timestamptz,
@ -105,90 +101,9 @@ pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> {
.await?; .await?;
client.execute( client.execute(
"CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market_name, start_time, resolution)", "CREATE UNIQUE INDEX IF NOT EXISTS idx_market_time_resolution ON openbook.candles USING btree (market_name, start_time, resolution);",
&[] &[]
).await?; ).await?;
client.execute(
"DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'unique_candles') THEN
ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market_name, start_time, resolution);
END IF;
END $$", &[]
).await?;
Ok(())
}
pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> {
let client = pool.get().await?;
client
.execute(
"CREATE TABLE IF NOT EXISTS fills (
signature text not null,
time timestamptz not null,
market text not null,
open_orders text not null,
open_orders_owner text not null,
bid bool not null,
maker bool not null,
native_qty_paid double precision not null,
native_qty_received double precision not null,
native_fee_or_rebate double precision not null,
fee_tier text not null,
order_id text not null,
log_index int4 not null,
CONSTRAINT fills_pk PRIMARY KEY (signature, log_index)
)",
&[],
)
.await?;
client
.execute(
"CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)",
&[],
)
.await?;
Ok(())
}
pub async fn create_transactions_table(pool: &Pool) -> anyhow::Result<()> {
let client = pool.get().await?;
client
.execute(
"CREATE TABLE IF NOT EXISTS transactions (
signature text NOT NULL,
program_pk text NOT NULL,
block_datetime timestamptz NOT NULL,
slot bigint NOT NULL,
err bool NOT NULL,
processed bool NOT NULL,
worker_partition int4 NOT NULL,
CONSTRAINT transactions_pk PRIMARY KEY (signature, worker_partition)
) PARTITION BY LIST (worker_partition);",
&[],
)
.await?;
client.batch_execute(
"CREATE INDEX IF NOT EXISTS transactions_processed_err_idx ON ONLY transactions (signature) WHERE processed IS NOT TRUE and err IS NOT TRUE;
CREATE INDEX IF NOT EXISTS transactions_program_pk_idx ON ONLY transactions USING btree (program_pk, slot DESC);
CREATE TABLE IF NOT EXISTS transactions_0 PARTITION OF transactions FOR VALUES IN (0);
CREATE TABLE IF NOT EXISTS transactions_1 PARTITION OF transactions FOR VALUES IN (1);
CREATE TABLE IF NOT EXISTS transactions_2 PARTITION OF transactions FOR VALUES IN (2);
CREATE TABLE IF NOT EXISTS transactions_3 PARTITION OF transactions FOR VALUES IN (3);
CREATE TABLE IF NOT EXISTS transactions_4 PARTITION OF transactions FOR VALUES IN (4);
CREATE TABLE IF NOT EXISTS transactions_5 PARTITION OF transactions FOR VALUES IN (5);
CREATE TABLE IF NOT EXISTS transactions_6 PARTITION OF transactions FOR VALUES IN (6);
CREATE TABLE IF NOT EXISTS transactions_7 PARTITION OF transactions FOR VALUES IN (7);
CREATE TABLE IF NOT EXISTS transactions_8 PARTITION OF transactions FOR VALUES IN (8);
CREATE TABLE IF NOT EXISTS transactions_9 PARTITION OF transactions FOR VALUES IN (9);"
).await?;
Ok(()) Ok(())
} }

View File

@ -1,79 +1,7 @@
use deadpool_postgres::Pool; use crate::structs::candle::Candle;
use crate::{
structs::{candle::Candle, openbook::OpenBookFillEvent, transaction::PgTransaction},
utils::{to_timestampz, AnyhowWrap},
};
pub async fn insert_fills_atomically(
pool: &Pool,
worker_id: i32,
fills: Vec<OpenBookFillEvent>,
signatures: Vec<String>,
) -> anyhow::Result<()> {
let mut client = pool.get().await?;
let db_txn = client.build_transaction().start().await?;
// 1. Insert fills
if !fills.is_empty() {
let fills_statement = build_fills_upsert_statement(fills);
db_txn
.execute(&fills_statement, &[])
.await
.map_err_anyhow()
.unwrap();
}
// 2. Update txns table as processed
let transactions_statement =
build_transactions_processed_update_statement(worker_id, signatures);
db_txn
.execute(&transactions_statement, &[])
.await
.map_err_anyhow()
.unwrap();
db_txn.commit().await?;
Ok(())
}
fn build_fills_upsert_statement(fills: Vec<OpenBookFillEvent>) -> String {
let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES");
for (idx, fill) in fills.iter().enumerate() {
let val_str = format!(
"(\'{}\', \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})",
fill.signature,
to_timestampz(fill.block_time as u64).to_rfc3339(),
fill.market,
fill.open_orders,
fill.open_orders_owner,
fill.bid,
fill.maker,
fill.native_qty_paid,
fill.native_qty_received,
fill.native_fee_or_rebate,
fill.fee_tier,
fill.order_id,
fill.log_index,
);
if idx == 0 {
stmt = format!("{} {}", &stmt, val_str);
} else {
stmt = format!("{}, {}", &stmt, val_str);
}
}
let handle_conflict = "ON CONFLICT DO NOTHING";
stmt = format!("{} {}", stmt, handle_conflict);
stmt
}
pub fn build_candles_upsert_statement(candles: &Vec<Candle>) -> String { pub fn build_candles_upsert_statement(candles: &Vec<Candle>) -> String {
let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES"); let mut stmt = String::from("INSERT INTO openbook.candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES");
for (idx, candle) in candles.iter().enumerate() { for (idx, candle) in candles.iter().enumerate() {
let val_str = format!( let val_str = format!(
"(\'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {})", "(\'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {})",
@ -109,54 +37,3 @@ pub fn build_candles_upsert_statement(candles: &Vec<Candle>) -> String {
stmt = format!("{} {}", stmt, handle_conflict); stmt = format!("{} {}", stmt, handle_conflict);
stmt stmt
} }
pub fn build_transactions_insert_statement(transactions: Vec<PgTransaction>) -> String {
let mut stmt = String::from("INSERT INTO transactions (signature, program_pk, block_datetime, slot, err, processed, worker_partition) VALUES");
for (idx, txn) in transactions.iter().enumerate() {
let val_str = format!(
"(\'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {})",
txn.signature,
txn.program_pk,
txn.block_datetime.to_rfc3339(),
txn.slot,
txn.err,
txn.processed,
txn.worker_partition,
);
if idx == 0 {
stmt = format!("{} {}", &stmt, val_str);
} else {
stmt = format!("{}, {}", &stmt, val_str);
}
}
let handle_conflict = "ON CONFLICT DO NOTHING";
stmt = format!("{} {}", stmt, handle_conflict);
stmt
}
pub fn build_transactions_processed_update_statement(
worker_id: i32,
processed_signatures: Vec<String>,
) -> String {
let mut stmt = String::from(
"UPDATE transactions
SET processed = true
WHERE transactions.signature IN (",
);
for (idx, sig) in processed_signatures.iter().enumerate() {
let val_str = if idx == processed_signatures.len() - 1 {
format!("\'{}\'", sig,)
} else {
format!("\'{}\',", sig,)
};
stmt = format!("{} {}", &stmt, val_str);
}
let worker_stmt = format!(") AND worker_partition = {} ", worker_id);
stmt = format!("{} {}", stmt, worker_stmt);
stmt
}

View File

@ -1,3 +1,4 @@
pub mod backfill;
pub mod fetch; pub mod fetch;
pub mod initialize; pub mod initialize;
pub mod insert; pub mod insert;

View File

@ -1,7 +1,7 @@
use serde::Serialize; use serde::Serialize;
use tokio_postgres::Row; use tokio_postgres::Row;
use super::{markets::MarketInfo, openbook::token_factor}; use super::markets::MarketInfo;
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
pub struct CoinGeckoOrderBook { pub struct CoinGeckoOrderBook {
@ -35,26 +35,24 @@ pub struct CoinGeckoTicker {
pub struct PgCoinGecko24HourVolume { pub struct PgCoinGecko24HourVolume {
pub address: String, pub address: String,
pub raw_base_size: f64, pub base_size: f64,
pub raw_quote_size: f64, pub quote_size: f64,
} }
impl PgCoinGecko24HourVolume { impl PgCoinGecko24HourVolume {
pub fn convert_to_readable(&self, markets: &Vec<MarketInfo>) -> CoinGecko24HourVolume { pub fn convert_to_readable(&self, markets: &Vec<MarketInfo>) -> CoinGecko24HourVolume {
let market = markets.iter().find(|m| m.address == self.address).unwrap(); let market = markets.iter().find(|m| m.address == self.address).unwrap();
let base_volume = self.raw_base_size / token_factor(market.base_decimals);
let target_volume = self.raw_quote_size / token_factor(market.quote_decimals);
CoinGecko24HourVolume { CoinGecko24HourVolume {
market_name: market.name.clone(), market_name: market.name.clone(),
base_volume, base_volume: self.base_size,
target_volume, target_volume: self.quote_size,
} }
} }
pub fn from_row(row: Row) -> Self { pub fn from_row(row: Row) -> Self {
PgCoinGecko24HourVolume { PgCoinGecko24HourVolume {
address: row.get(0), address: row.get(0),
raw_base_size: row.get(1), base_size: row.get(1),
raw_quote_size: row.get(2), quote_size: row.get(2),
} }
} }
} }

View File

@ -6,4 +6,3 @@ pub mod resolution;
pub mod slab; pub mod slab;
pub mod trader; pub mod trader;
pub mod tradingview; pub mod tradingview;
pub mod transaction;

View File

@ -1,93 +1,26 @@
use anchor_lang::{event, AnchorDeserialize, AnchorSerialize}; use anchor_lang::AnchorDeserialize;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use num_traits::Pow; use num_traits::Pow;
use solana_sdk::pubkey::Pubkey;
use tokio_postgres::Row; use tokio_postgres::Row;
#[event] #[derive(Clone, Debug, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OpenBookFillEventRaw {
pub market: Pubkey,
pub open_orders: Pubkey,
pub open_orders_owner: Pubkey,
pub bid: bool,
pub maker: bool,
pub native_qty_paid: u64,
pub native_qty_received: u64,
pub native_fee_or_rebate: u64,
pub order_id: u128,
pub owner_slot: u8,
pub fee_tier: u8,
pub client_order_id: Option<u64>,
pub referrer_rebate: Option<u64>,
}
impl OpenBookFillEventRaw {
pub fn into_event(
self,
signature: String,
block_time: i64,
log_index: usize,
) -> OpenBookFillEvent {
OpenBookFillEvent {
signature,
market: self.market,
open_orders: self.open_orders,
open_orders_owner: self.open_orders_owner,
bid: self.bid,
maker: self.maker,
native_qty_paid: self.native_qty_paid,
native_qty_received: self.native_qty_received,
native_fee_or_rebate: self.native_fee_or_rebate,
order_id: self.order_id,
owner_slot: self.owner_slot,
fee_tier: self.fee_tier,
client_order_id: self.client_order_id,
referrer_rebate: self.referrer_rebate,
block_time,
log_index,
}
}
}
#[event]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OpenBookFillEvent {
pub signature: String,
pub market: Pubkey,
pub open_orders: Pubkey,
pub open_orders_owner: Pubkey,
pub bid: bool,
pub maker: bool,
pub native_qty_paid: u64,
pub native_qty_received: u64,
pub native_fee_or_rebate: u64,
pub order_id: u128,
pub owner_slot: u8,
pub fee_tier: u8,
pub client_order_id: Option<u64>,
pub referrer_rebate: Option<u64>,
pub block_time: i64,
pub log_index: usize,
}
#[derive(Copy, Clone, Debug, PartialEq)]
pub struct PgOpenBookFill { pub struct PgOpenBookFill {
pub time: DateTime<Utc>, pub time: DateTime<Utc>,
pub market_key: String,
pub bid: bool, pub bid: bool,
pub maker: bool, pub maker: bool,
pub native_qty_paid: f64, pub price: f64,
pub native_qty_received: f64, pub size: f64,
pub native_fee_or_rebate: f64,
} }
impl PgOpenBookFill { impl PgOpenBookFill {
pub fn from_row(row: Row) -> Self { pub fn from_row(row: Row) -> Self {
PgOpenBookFill { PgOpenBookFill {
time: row.get(0), time: row.get(0),
bid: row.get(1), market_key: row.get(1),
maker: row.get(2), bid: row.get(2),
native_qty_paid: row.get(3), maker: row.get(3),
native_qty_received: row.get(4), price: row.get(4),
native_fee_or_rebate: row.get(5), size: row.get(5),
} }
} }
} }
@ -147,34 +80,6 @@ pub struct MarketState {
pub referrer_rebates_accrued: u64, pub referrer_rebates_accrued: u64,
} }
pub fn calculate_fill_price_and_size(
fill: PgOpenBookFill,
base_decimals: u8,
quote_decimals: u8,
) -> (f64, f64) {
if fill.bid {
let price_before_fees = if fill.maker {
fill.native_qty_paid + fill.native_fee_or_rebate
} else {
fill.native_qty_paid - fill.native_fee_or_rebate
};
let price = (price_before_fees * token_factor(base_decimals))
/ (token_factor(quote_decimals) * fill.native_qty_received);
let size = fill.native_qty_received / token_factor(base_decimals);
(price, size)
} else {
let price_before_fees = if fill.maker {
fill.native_qty_received - fill.native_fee_or_rebate
} else {
fill.native_qty_received + fill.native_fee_or_rebate
};
let price = (price_before_fees * token_factor(base_decimals))
/ (token_factor(quote_decimals) * fill.native_qty_paid);
let size = fill.native_qty_paid / token_factor(base_decimals);
(price, size)
}
}
pub fn token_factor(decimals: u8) -> f64 { pub fn token_factor(decimals: u8) -> f64 {
10f64.pow(decimals as f64) 10f64.pow(decimals as f64)
} }

View File

@ -9,8 +9,8 @@ use super::openbook::token_factor;
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
pub struct PgTrader { pub struct PgTrader {
pub open_orders_owner: String, pub open_orders_owner: String,
pub raw_ask_size: f64, pub raw_ask_size: i64,
pub raw_bid_size: f64, pub raw_bid_size: i64,
} }
impl PgTrader { impl PgTrader {
pub fn from_row(row: Row) -> Self { pub fn from_row(row: Row) -> Self {
@ -52,8 +52,8 @@ pub struct TraderResponse {
// Note that the Postgres queries only return volumes in base or quote // Note that the Postgres queries only return volumes in base or quote
pub fn calculate_trader_volume(trader: PgTrader, decimals: u8) -> Trader { pub fn calculate_trader_volume(trader: PgTrader, decimals: u8) -> Trader {
let bid_size = trader.raw_bid_size / token_factor(decimals); let bid_size = (trader.raw_bid_size as f64) / token_factor(decimals);
let ask_size = trader.raw_ask_size / token_factor(decimals); let ask_size = (trader.raw_ask_size as f64) / token_factor(decimals);
Trader { Trader {
pubkey: trader.open_orders_owner, pubkey: trader.open_orders_owner,

View File

@ -1,52 +0,0 @@
use chrono::{DateTime, Utc};
use solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature;
use tokio_postgres::Row;
use crate::utils::{to_timestampz, OPENBOOK_KEY};
#[derive(Clone, Debug, PartialEq)]
pub struct PgTransaction {
pub signature: String,
pub program_pk: String,
pub block_datetime: DateTime<Utc>,
pub slot: u64,
pub err: bool,
pub processed: bool,
pub worker_partition: i32,
}
pub const NUM_TRANSACTION_PARTITIONS: u64 = 10;
impl PgTransaction {
pub fn from_rpc_confirmed_transaction(
rpc_confirmed_transaction: RpcConfirmedTransactionStatusWithSignature,
) -> Self {
PgTransaction {
signature: rpc_confirmed_transaction.signature,
program_pk: OPENBOOK_KEY.to_string(),
block_datetime: to_timestampz(rpc_confirmed_transaction.block_time.unwrap() as u64),
slot: rpc_confirmed_transaction.slot,
err: rpc_confirmed_transaction.err.is_some(),
processed: false,
worker_partition: (rpc_confirmed_transaction.slot % NUM_TRANSACTION_PARTITIONS) as i32,
}
}
pub fn from_row(row: Row) -> Self {
let slot_raw = row.get::<usize, i64>(3);
PgTransaction {
signature: row.get(0),
program_pk: row.get(1),
block_datetime: row.get(2),
slot: slot_raw as u64,
err: row.get(4),
processed: row.get(5),
worker_partition: row.get(6),
}
}
}
pub enum ProcessState {
Processed,
Unprocessed,
}

View File

@ -1,15 +1,22 @@
use chrono::{DateTime, Duration, DurationRound, Utc}; use chrono::{DateTime, Duration, DurationRound, Utc};
use deadpool_postgres::Pool; use deadpool_postgres::Pool;
use log::debug; use log::debug;
use std::cmp::max; use std::cmp::{max, min};
use strum::IntoEnumIterator;
use crate::{ use crate::{
database::fetch::{fetch_candles_from, fetch_earliest_candles, fetch_latest_finished_candle}, database::{
fetch::{
fetch_candles_from, fetch_earliest_candles,
fetch_latest_finished_candle,
},
insert::build_candles_upsert_statement,
},
structs::{ structs::{
candle::Candle, candle::Candle,
resolution::{day, Resolution}, resolution::{day, Resolution},
}, },
utils::{f64_max, f64_min}, utils::{f64_max, f64_min, AnyhowWrap},
}; };
pub async fn batch_higher_order_candles( pub async fn batch_higher_order_candles(
@ -34,12 +41,8 @@ pub async fn batch_higher_order_candles(
if constituent_candles.is_empty() { if constituent_candles.is_empty() {
return Ok(Vec::new()); return Ok(Vec::new());
} }
let combined_candles = combine_into_higher_order_candles( let combined_candles =
&mut constituent_candles, combine_into_higher_order_candles(&mut constituent_candles, resolution, start_time);
resolution,
start_time,
candle,
);
Ok(combined_candles) Ok(combined_candles)
} }
None => { None => {
@ -61,13 +64,8 @@ pub async fn batch_higher_order_candles(
return Ok(Vec::new()); return Ok(Vec::new());
} }
let seed_candle = constituent_candles[0].clone(); let combined_candles =
let combined_candles = combine_into_higher_order_candles( combine_into_higher_order_candles(&mut constituent_candles, resolution, start_time);
&mut constituent_candles,
resolution,
start_time,
seed_candle,
);
Ok(trim_candles( Ok(trim_candles(
combined_candles, combined_candles,
@ -78,10 +76,9 @@ pub async fn batch_higher_order_candles(
} }
fn combine_into_higher_order_candles( fn combine_into_higher_order_candles(
constituent_candles: &mut Vec<Candle>, constituent_candles: &Vec<Candle>,
target_resolution: Resolution, target_resolution: Resolution,
st: DateTime<Utc>, st: DateTime<Utc>,
seed_candle: Candle,
) -> Vec<Candle> { ) -> Vec<Candle> {
debug!("combining for target_resolution: {}", target_resolution); debug!("combining for target_resolution: {}", target_resolution);
@ -92,7 +89,7 @@ fn combine_into_higher_order_candles(
target_resolution, target_resolution,
); );
let now = Utc::now().duration_trunc(Duration::minutes(1)).unwrap(); let now = Utc::now().duration_trunc(Duration::minutes(1)).unwrap();
let candle_window = now - st; let candle_window = min(now - st, day());
let num_candles = max( let num_candles = max(
1, 1,
(candle_window.num_minutes() / duration.num_minutes()) as usize + 1, (candle_window.num_minutes() / duration.num_minutes()) as usize + 1,
@ -100,17 +97,16 @@ fn combine_into_higher_order_candles(
let mut combined_candles = vec![empty_candle; num_candles]; let mut combined_candles = vec![empty_candle; num_candles];
let mut con_iter = constituent_candles.iter_mut().peekable(); let mut last_close = constituent_candles[0].close;
let mut con_iter = constituent_candles.iter().peekable();
let mut start_time = st; let mut start_time = st;
let mut end_time = start_time + duration; let mut end_time = start_time + duration;
let mut last_candle = seed_candle;
for i in 0..combined_candles.len() { for i in 0..combined_candles.len() {
combined_candles[i].open = last_candle.close; combined_candles[i].open = last_close;
combined_candles[i].low = last_candle.close; combined_candles[i].low = last_close;
combined_candles[i].close = last_candle.close; combined_candles[i].close = last_close;
combined_candles[i].high = last_candle.close; combined_candles[i].high = last_close;
while matches!(con_iter.peek(), Some(c) if c.end_time <= end_time) { while matches!(con_iter.peek(), Some(c) if c.end_time <= end_time) {
let unit_candle = con_iter.next().unwrap(); let unit_candle = con_iter.next().unwrap();
@ -128,7 +124,7 @@ fn combine_into_higher_order_candles(
start_time = end_time; start_time = end_time;
end_time += duration; end_time += duration;
last_candle = combined_candles[i].clone(); last_close = combined_candles[i].close;
} }
combined_candles combined_candles
@ -149,25 +145,38 @@ fn trim_candles(mut c: Vec<Candle>, start_time: DateTime<Utc>) -> Vec<Candle> {
pub async fn backfill_batch_higher_order_candles( pub async fn backfill_batch_higher_order_candles(
pool: &Pool, pool: &Pool,
market_name: &str, market_name: &str,
resolution: Resolution, ) -> anyhow::Result<()> {
) -> anyhow::Result<Vec<Candle>> { let earliest_candles = fetch_earliest_candles(pool, market_name, Resolution::R1m).await?;
let mut constituent_candles = let mut start_time = earliest_candles[0].start_time.duration_trunc(day())?;
fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution()).await?; while start_time < Utc::now() {
if constituent_candles.is_empty() { let mut candles = vec![];
return Ok(vec![]); let mut constituent_candles = fetch_candles_from(
pool,
market_name,
Resolution::R1m,
start_time,
start_time + day(),
)
.await?;
for resolution in Resolution::iter() {
if resolution == Resolution::R1m {
continue;
}
let mut combined_candles =
combine_into_higher_order_candles(&mut constituent_candles, resolution, start_time);
candles.append(&mut combined_candles);
}
let upsert_statement = build_candles_upsert_statement(&candles);
let client = pool.get().await.unwrap();
client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()?;
// println!("{:?} {:?} done", market_name, start_time);
start_time += day();
} }
let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
let seed_candle = constituent_candles[0].clone(); Ok(())
let combined_candles = combine_into_higher_order_candles(
&mut constituent_candles,
resolution,
start_time,
seed_candle,
);
Ok(trim_candles(
combined_candles,
constituent_candles[0].start_time,
))
} }

View File

@ -1,18 +1,26 @@
use std::cmp::min; use std::{cmp::min, collections::HashMap};
use chrono::{DateTime, Duration, DurationRound, Utc}; use chrono::{DateTime, Duration, DurationRound, Utc};
use deadpool_postgres::Pool; use deadpool_postgres::Pool;
use itertools::Itertools;
use log::debug; use log::debug;
use crate::database::backfill::{
fetch_earliest_fill_multiple_markets, fetch_fills_multiple_markets_from,
fetch_last_minute_candles,
};
use crate::{ use crate::{
database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle}, database::{
fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
insert::build_candles_upsert_statement,
},
structs::{ structs::{
candle::Candle, candle::{Candle},
markets::MarketInfo, markets::MarketInfo,
openbook::{calculate_fill_price_and_size, PgOpenBookFill}, openbook::PgOpenBookFill,
resolution::{day, Resolution}, resolution::{day, Resolution},
}, },
utils::{f64_max, f64_min}, utils::{f64_max, f64_min, AnyhowWrap},
}; };
pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Result<Vec<Candle>> { pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Result<Vec<Candle>> {
@ -86,9 +94,7 @@ fn combine_fills_into_1m_candles(
Some(p) => p, Some(p) => p,
None => { None => {
let first = fills_iter.peek().unwrap(); let first = fills_iter.peek().unwrap();
let (price, _) = first.price
calculate_fill_price_and_size(**first, market.base_decimals, market.quote_decimals);
price
} }
}; };
@ -100,15 +106,13 @@ fn combine_fills_into_1m_candles(
while matches!(fills_iter.peek(), Some(f) if f.time < end_time) { while matches!(fills_iter.peek(), Some(f) if f.time < end_time) {
let fill = fills_iter.next().unwrap(); let fill = fills_iter.next().unwrap();
let (price, volume) =
calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals);
candles[i].close = price; candles[i].close = fill.price;
candles[i].low = f64_min(price, candles[i].low); candles[i].low = f64_min(fill.price, candles[i].low);
candles[i].high = f64_max(price, candles[i].high); candles[i].high = f64_max(fill.price, candles[i].high);
candles[i].volume += volume; candles[i].volume += fill.size;
last_price = price; last_price = fill.price;
} }
candles[i].start_time = start_time; candles[i].start_time = start_time;
@ -125,17 +129,19 @@ fn combine_fills_into_1m_candles(
/// Goes from the earliest fill to the most recent. Will mark candles as complete if there are missing gaps of fills between the start and end. /// Goes from the earliest fill to the most recent. Will mark candles as complete if there are missing gaps of fills between the start and end.
pub async fn backfill_batch_1m_candles( pub async fn backfill_batch_1m_candles(
pool: &Pool, pool: &Pool,
market: &MarketInfo, markets: Vec<MarketInfo>,
) -> anyhow::Result<Vec<Candle>> { ) -> anyhow::Result<()> {
let market_name = &market.name; let market_address_strings: Vec<String> = markets.iter().map(|m| m.address.clone()).collect();
let market_address = &market.address; let mut candle_container = HashMap::new();
let mut candles = vec![]; let client = pool.get().await?;
let earliest_fill = fetch_earliest_fill(pool, &market.address).await?; let earliest_fill =
fetch_earliest_fill_multiple_markets(&client, &market_address_strings).await?;
if earliest_fill.is_none() { if earliest_fill.is_none() {
debug!("No fills found for: {:?}", &market_name); println!("No fills found for backfill");
return Ok(candles); return Ok(());
} }
println!("Found earliset fill for backfill");
let mut start_time = earliest_fill let mut start_time = earliest_fill
.unwrap() .unwrap()
@ -146,13 +152,78 @@ pub async fn backfill_batch_1m_candles(
start_time + day(), start_time + day(),
Utc::now().duration_trunc(Duration::minutes(1))?, Utc::now().duration_trunc(Duration::minutes(1))?,
); );
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?; let last_candles = fetch_last_minute_candles(&client).await?;
if !fills.is_empty() { let all_fills = fetch_fills_multiple_markets_from(
let mut minute_candles = &client,
&market_address_strings,
start_time,
end_time,
)
.await?;
// println!("{:?} {:?}", start_time, end_time);
// println!("all fills len : {:?}", all_fills.len());
println!("{:?}", all_fills[0]);
// println!("{:?}", all_fills[1]);
// println!("Fetched multiple fills for backfill");
let fills_groups = all_fills
.into_iter()
.sorted_by(|a, b| Ord::cmp(&a.market_key, &b.market_key))
.group_by(|f| f.market_key.clone());
let fills_by_market: Vec<(String, Vec<PgOpenBookFill>)> = fills_groups
.into_iter()
.map(|(m, group)| (m, group.collect()))
.collect();
println!("fbm len : {:?}", fills_by_market.len());
// sort fills by market, make candles
for (_, mut fills) in fills_by_market {
let market = markets
.iter()
.find(|m| m.address == fills[0].market_key)
.unwrap();
let minute_candles =
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None); combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
candles.append(&mut minute_candles); candle_container.insert(&market.address, minute_candles);
} }
start_time += day()
// where no candles, make empty ones
for (k, v) in candle_container.iter_mut() {
if v.is_empty() {
let market = markets.iter().find(|m| &m.address == *k).unwrap();
let last_candle = last_candles
.iter()
.find(|c| c.market_name == market.name)
.unwrap();
let empty_candles = combine_fills_into_1m_candles(
&mut vec![],
market,
start_time,
end_time,
Some(last_candle.close),
);
*v = empty_candles;
}
}
// insert candles in batches
for candles in candle_container.values() {
let candle_chunks: Vec<Vec<Candle>> =
candles.chunks(1500).map(|chunk| chunk.to_vec()).collect(); // 1440 minutes in a day
for c in candle_chunks {
let upsert_statement = build_candles_upsert_statement(&c);
client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()?;
}
}
// reset entries but keep markets we've seen for blank candles
for (_, v) in candle_container.iter_mut() {
*v = vec![];
}
println!("day done");
start_time += day();
} }
Ok(candles) Ok(())
} }

View File

@ -21,7 +21,6 @@ use super::metrics::METRIC_CANDLES_TOTAL;
pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> { pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
loop { loop {
let market_clone = market.clone(); let market_clone = market.clone();
loop { loop {
sleep(Duration::milliseconds(5000).to_std()?).await; sleep(Duration::milliseconds(5000).to_std()?).await;
match batch_inner(pool, &market_clone).await { match batch_inner(pool, &market_clone).await {
@ -43,6 +42,9 @@ pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> { async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
let market_name = &market.name.clone(); let market_name = &market.name.clone();
let candles = batch_1m_candles(pool, market).await?; let candles = batch_1m_candles(pool, market).await?;
if candles.is_empty() {
return Ok(());
}
METRIC_CANDLES_TOTAL METRIC_CANDLES_TOTAL
.with_label_values(&[market.name.as_str()]) .with_label_values(&[market.name.as_str()])
.inc_by(candles.clone().len() as u64); .inc_by(candles.clone().len() as u64);
@ -56,7 +58,7 @@ async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
.with_label_values(&[market.name.as_str()]) .with_label_values(&[market.name.as_str()])
.inc_by(candles.clone().len() as u64); .inc_by(candles.clone().len() as u64);
save_candles(pool, candles).await?; save_candles(pool, candles).await?;
} }
Ok(()) Ok(())
} }

View File

@ -1,11 +1,9 @@
use log::{error, info}; use log::{error, info};
use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
use openbook_candles::structs::transaction::NUM_TRANSACTION_PARTITIONS;
use openbook_candles::utils::Config; use openbook_candles::utils::Config;
use openbook_candles::worker::metrics::{ use openbook_candles::worker::metrics::{
serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE, serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE,
}; };
use openbook_candles::worker::trade_fetching::scrape::{scrape_fills, scrape_signatures};
use openbook_candles::{ use openbook_candles::{
database::initialize::{connect_to_database, setup_database}, database::initialize::{connect_to_database, setup_database},
worker::candle_batching::batch_for_market, worker::candle_batching::batch_for_market,
@ -40,25 +38,6 @@ async fn main() -> anyhow::Result<()> {
setup_database(&pool).await?; setup_database(&pool).await?;
let mut handles = vec![]; let mut handles = vec![];
// signature scraping
let rpc_clone = rpc_url.clone();
let pool_clone = pool.clone();
handles.push(tokio::spawn(async move {
scrape_signatures(rpc_clone, &pool_clone).await.unwrap();
}));
// transaction/fill scraping
for id in 0..NUM_TRANSACTION_PARTITIONS {
let rpc_clone = rpc_url.clone();
let pool_clone = pool.clone();
let markets_clone = target_markets.clone();
handles.push(tokio::spawn(async move {
scrape_fills(id as i32, rpc_clone, &pool_clone, &markets_clone)
.await
.unwrap();
}));
}
// candle batching // candle batching
for market in market_infos.into_iter() { for market in market_infos.into_iter() {
let batch_pool = pool.clone(); let batch_pool = pool.clone();

View File

@ -1,3 +1,2 @@
pub mod candle_batching; pub mod candle_batching;
pub mod metrics; pub mod metrics;
pub mod trade_fetching;

View File

@ -1,2 +0,0 @@
pub mod parsing;
pub mod scrape;

View File

@ -1,94 +0,0 @@
use log::warn;
use solana_client::client_error::Result as ClientResult;
use solana_sdk::pubkey::Pubkey;
use solana_transaction_status::{
option_serializer::OptionSerializer, EncodedConfirmedTransactionWithStatusMeta,
};
use std::{collections::HashMap, io::Error};
use crate::{
structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw},
worker::metrics::METRIC_RPC_ERRORS_TOTAL,
};
const PROGRAM_DATA: &str = "Program data: ";
pub fn parse_trades_from_openbook_txns(
txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
mut sig_strings: Vec<String>,
target_markets: &HashMap<Pubkey, String>,
) -> (Vec<OpenBookFillEvent>, Vec<String>) {
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
let mut failed_sigs = vec![];
for (idx, txn) in txns.iter_mut().enumerate() {
match txn {
Ok(t) => {
if let Some(m) = &t.transaction.meta {
match &m.log_messages {
OptionSerializer::Some(logs) => {
match parse_openbook_fills_from_logs(
logs,
target_markets,
sig_strings[idx].clone(),
t.block_time.unwrap(),
) {
Some(mut events) => fills_vector.append(&mut events),
None => {}
}
}
OptionSerializer::None => {}
OptionSerializer::Skip => {}
}
}
}
Err(e) => {
warn!("rpc error in get_transaction {}", e);
failed_sigs.push(sig_strings[idx].clone());
METRIC_RPC_ERRORS_TOTAL
.with_label_values(&["getTransaction"])
.inc();
}
}
}
sig_strings.retain(|s| !failed_sigs.contains(s));
(fills_vector, sig_strings)
}
fn parse_openbook_fills_from_logs(
logs: &Vec<String>,
target_markets: &HashMap<Pubkey, String>,
signature: String,
block_time: i64,
) -> Option<Vec<OpenBookFillEvent>> {
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
for (idx, l) in logs.iter().enumerate() {
match l.strip_prefix(PROGRAM_DATA) {
Some(log) => {
let borsh_bytes = match anchor_lang::__private::base64::decode(log) {
Ok(borsh_bytes) => borsh_bytes,
_ => continue,
};
let mut slice: &[u8] = &borsh_bytes[8..];
let event: Result<OpenBookFillEventRaw, Error> =
anchor_lang::AnchorDeserialize::deserialize(&mut slice);
match event {
Ok(e) => {
let fill_event = e.into_event(signature.clone(), block_time, idx);
if target_markets.contains_key(&fill_event.market) {
fills_vector.push(fill_event);
}
}
_ => continue,
}
}
_ => (),
}
}
if !fills_vector.is_empty() {
Some(fills_vector)
} else {
None
}
}

View File

@ -1,119 +0,0 @@
use deadpool_postgres::Pool;
use futures::future::join_all;
use log::{debug, warn};
use solana_client::{
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
rpc_config::RpcTransactionConfig,
};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature};
use solana_transaction_status::UiTransactionEncoding;
use std::{collections::HashMap, time::Duration as WaitDuration};
use crate::{
database::{
fetch::fetch_worker_transactions,
insert::{build_transactions_insert_statement, insert_fills_atomically},
},
structs::transaction::PgTransaction,
utils::{AnyhowWrap, OPENBOOK_KEY},
worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL, METRIC_TRANSACTIONS_TOTAL},
};
use super::parsing::parse_trades_from_openbook_txns;
pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<()> {
let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
loop {
let rpc_config = GetConfirmedSignaturesForAddress2Config {
before: None,
until: None,
limit: None,
commitment: Some(CommitmentConfig::confirmed()),
};
let sigs = match rpc_client
.get_signatures_for_address_with_config(&OPENBOOK_KEY, rpc_config)
.await
{
Ok(sigs) => sigs,
Err(e) => {
warn!("rpc error in get_signatures_for_address_with_config: {}", e);
METRIC_RPC_ERRORS_TOTAL
.with_label_values(&["getSignaturesForAddress"])
.inc();
continue;
}
};
if sigs.is_empty() {
debug!("No signatures found, trying again");
continue;
}
let transactions: Vec<PgTransaction> = sigs
.into_iter()
.map(PgTransaction::from_rpc_confirmed_transaction)
.collect();
debug!("Scraper writing: {:?} txns to DB\n", transactions.len());
let upsert_statement = build_transactions_insert_statement(transactions);
let client = pool.get().await?;
let num_txns = client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()?;
METRIC_TRANSACTIONS_TOTAL.inc_by(num_txns);
}
// TODO: graceful shutdown
}
pub async fn scrape_fills(
worker_id: i32,
rpc_url: String,
pool: &Pool,
target_markets: &HashMap<Pubkey, String>,
) -> anyhow::Result<()> {
debug!("Worker {} started \n", worker_id);
let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed());
loop {
let transactions = fetch_worker_transactions(worker_id, pool).await?;
if transactions.is_empty() {
debug!("No signatures found by worker {}", worker_id);
tokio::time::sleep(WaitDuration::from_secs(1)).await;
continue;
};
// for each signature, fetch the transaction
let txn_config = RpcTransactionConfig {
encoding: Some(UiTransactionEncoding::Json),
commitment: Some(CommitmentConfig::confirmed()),
max_supported_transaction_version: Some(0),
};
let sig_strings = transactions
.iter()
.map(|t| t.signature.clone())
.collect::<Vec<String>>();
let signatures: Vec<_> = transactions
.into_iter()
.map(|t| t.signature.parse::<Signature>().unwrap())
.collect();
let txn_futs: Vec<_> = signatures
.iter()
.map(|s| rpc_client.get_transaction_with_config(s, txn_config))
.collect();
let mut txns = join_all(txn_futs).await;
let (fills, completed_sigs) =
parse_trades_from_openbook_txns(&mut txns, sig_strings, target_markets);
for fill in fills.iter() {
let market_name = target_markets.get(&fill.market).unwrap();
METRIC_FILLS_TOTAL.with_label_values(&[market_name]).inc();
}
// Write fills to the database, and update properly fetched transactions as processed
insert_fills_atomically(pool, worker_id, fills, completed_sigs).await?;
}
}