Compare commits
5 Commits
cbb89a3219
...
a0b32621a1
Author | SHA1 | Date |
---|---|---|
Lou-Kamades | a0b32621a1 | |
Lou-Kamades | 7bba73b7f3 | |
Lou-Kamades | 34dc341059 | |
Lou-Kamades | 89915bf249 | |
Lou-Kamades | f74cec5d13 |
|
@ -2615,6 +2615,15 @@ dependencies = [
|
||||||
"either",
|
"either",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "itertools"
|
||||||
|
version = "0.11.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57"
|
||||||
|
dependencies = [
|
||||||
|
"either",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "itoa"
|
name = "itoa"
|
||||||
version = "1.0.5"
|
version = "1.0.5"
|
||||||
|
@ -3450,6 +3459,7 @@ dependencies = [
|
||||||
"dotenv",
|
"dotenv",
|
||||||
"env_logger 0.10.0",
|
"env_logger 0.10.0",
|
||||||
"futures 0.3.27",
|
"futures 0.3.27",
|
||||||
|
"itertools 0.11.0",
|
||||||
"jsonrpc-core-client",
|
"jsonrpc-core-client",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"log 0.4.17",
|
"log 0.4.17",
|
||||||
|
@ -4014,7 +4024,7 @@ checksum = "a3f8ad728fb08fe212df3c05169e940fbb6d9d16a877ddde14644a983ba2012e"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes 1.3.0",
|
"bytes 1.3.0",
|
||||||
"heck 0.4.0",
|
"heck 0.4.0",
|
||||||
"itertools",
|
"itertools 0.10.5",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"log 0.4.17",
|
"log 0.4.17",
|
||||||
"multimap",
|
"multimap",
|
||||||
|
@ -4035,7 +4045,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "8bda8c0881ea9f722eb9629376db3d0b903b462477c1aafcb0566610ac28ac5d"
|
checksum = "8bda8c0881ea9f722eb9629376db3d0b903b462477c1aafcb0566610ac28ac5d"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"itertools",
|
"itertools 0.10.5",
|
||||||
"proc-macro2 1.0.56",
|
"proc-macro2 1.0.56",
|
||||||
"quote 1.0.26",
|
"quote 1.0.26",
|
||||||
"syn 1.0.109",
|
"syn 1.0.109",
|
||||||
|
@ -4877,7 +4887,7 @@ dependencies = [
|
||||||
"default-env",
|
"default-env",
|
||||||
"enumflags2",
|
"enumflags2",
|
||||||
"field-offset",
|
"field-offset",
|
||||||
"itertools",
|
"itertools 0.10.5",
|
||||||
"num-traits",
|
"num-traits",
|
||||||
"num_enum 0.5.7",
|
"num_enum 0.5.7",
|
||||||
"safe-transmute",
|
"safe-transmute",
|
||||||
|
@ -5252,7 +5262,7 @@ dependencies = [
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"indexmap",
|
"indexmap",
|
||||||
"indicatif",
|
"indicatif",
|
||||||
"itertools",
|
"itertools 0.10.5",
|
||||||
"jsonrpc-core",
|
"jsonrpc-core",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"log 0.4.17",
|
"log 0.4.17",
|
||||||
|
@ -5416,7 +5426,7 @@ dependencies = [
|
||||||
"crossbeam-channel",
|
"crossbeam-channel",
|
||||||
"flate2",
|
"flate2",
|
||||||
"indexmap",
|
"indexmap",
|
||||||
"itertools",
|
"itertools 0.10.5",
|
||||||
"log 0.4.17",
|
"log 0.4.17",
|
||||||
"lru",
|
"lru",
|
||||||
"matches",
|
"matches",
|
||||||
|
@ -5465,7 +5475,7 @@ dependencies = [
|
||||||
"dashmap",
|
"dashmap",
|
||||||
"fs_extra",
|
"fs_extra",
|
||||||
"futures 0.3.27",
|
"futures 0.3.27",
|
||||||
"itertools",
|
"itertools 0.10.5",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"libc",
|
"libc",
|
||||||
"log 0.4.17",
|
"log 0.4.17",
|
||||||
|
@ -5643,7 +5653,7 @@ dependencies = [
|
||||||
"console_log",
|
"console_log",
|
||||||
"curve25519-dalek",
|
"curve25519-dalek",
|
||||||
"getrandom 0.2.8",
|
"getrandom 0.2.8",
|
||||||
"itertools",
|
"itertools 0.10.5",
|
||||||
"js-sys",
|
"js-sys",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"libc",
|
"libc",
|
||||||
|
@ -5682,7 +5692,7 @@ dependencies = [
|
||||||
"bincode",
|
"bincode",
|
||||||
"eager",
|
"eager",
|
||||||
"enum-iterator",
|
"enum-iterator",
|
||||||
"itertools",
|
"itertools 0.10.5",
|
||||||
"libc",
|
"libc",
|
||||||
"libloading",
|
"libloading",
|
||||||
"log 0.4.17",
|
"log 0.4.17",
|
||||||
|
@ -5739,7 +5749,7 @@ dependencies = [
|
||||||
"bs58 0.4.0",
|
"bs58 0.4.0",
|
||||||
"crossbeam-channel",
|
"crossbeam-channel",
|
||||||
"dashmap",
|
"dashmap",
|
||||||
"itertools",
|
"itertools 0.10.5",
|
||||||
"jsonrpc-core",
|
"jsonrpc-core",
|
||||||
"jsonrpc-core-client",
|
"jsonrpc-core-client",
|
||||||
"jsonrpc-derive",
|
"jsonrpc-derive",
|
||||||
|
@ -5801,7 +5811,7 @@ dependencies = [
|
||||||
"fnv",
|
"fnv",
|
||||||
"im",
|
"im",
|
||||||
"index_list",
|
"index_list",
|
||||||
"itertools",
|
"itertools 0.10.5",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"log 0.4.17",
|
"log 0.4.17",
|
||||||
"lru",
|
"lru",
|
||||||
|
@ -5863,7 +5873,7 @@ dependencies = [
|
||||||
"ed25519-dalek-bip32",
|
"ed25519-dalek-bip32",
|
||||||
"generic-array",
|
"generic-array",
|
||||||
"hmac 0.12.1",
|
"hmac 0.12.1",
|
||||||
"itertools",
|
"itertools 0.10.5",
|
||||||
"js-sys",
|
"js-sys",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"libsecp256k1",
|
"libsecp256k1",
|
||||||
|
@ -6011,7 +6021,7 @@ dependencies = [
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"histogram",
|
"histogram",
|
||||||
"indexmap",
|
"indexmap",
|
||||||
"itertools",
|
"itertools 0.10.5",
|
||||||
"libc",
|
"libc",
|
||||||
"log 0.4.17",
|
"log 0.4.17",
|
||||||
"nix",
|
"nix",
|
||||||
|
@ -6143,7 +6153,7 @@ dependencies = [
|
||||||
"cipher 0.4.3",
|
"cipher 0.4.3",
|
||||||
"curve25519-dalek",
|
"curve25519-dalek",
|
||||||
"getrandom 0.1.16",
|
"getrandom 0.1.16",
|
||||||
"itertools",
|
"itertools 0.10.5",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"merlin",
|
"merlin",
|
||||||
"num-derive",
|
"num-derive",
|
||||||
|
|
|
@ -15,10 +15,6 @@ path = "src/worker/main.rs"
|
||||||
name = "server"
|
name = "server"
|
||||||
path = "src/server/main.rs"
|
path = "src/server/main.rs"
|
||||||
|
|
||||||
[[bin]]
|
|
||||||
name = "backfill-trades"
|
|
||||||
path = "src/backfill-trades/main.rs"
|
|
||||||
|
|
||||||
[[bin]]
|
[[bin]]
|
||||||
name = "backfill-candles"
|
name = "backfill-candles"
|
||||||
path = "src/backfill-candles/main.rs"
|
path = "src/backfill-candles/main.rs"
|
||||||
|
@ -71,3 +67,4 @@ num_enum = "0.6.1"
|
||||||
config = "0.13.1"
|
config = "0.13.1"
|
||||||
prometheus = "0.13.3"
|
prometheus = "0.13.3"
|
||||||
lazy_static = "1.4.0"
|
lazy_static = "1.4.0"
|
||||||
|
itertools = "0.11.0"
|
||||||
|
|
|
@ -296,7 +296,7 @@ Returns 24-hour pricing and volume information on each market available.
|
||||||
|
|
||||||
**Request:**
|
**Request:**
|
||||||
|
|
||||||
`GET /api/coingecko/orderbook/?ticker_id={ticker_id}&depth={depth}`
|
`GET /api/coingecko/orderbook?ticker_id={ticker_id}&depth={depth}`
|
||||||
|
|
||||||
|
|
||||||
Returns order book information with a specified depth for a given market.
|
Returns order book information with a specified depth for a given market.
|
||||||
|
|
|
@ -1,20 +1,17 @@
|
||||||
use deadpool_postgres::Object;
|
|
||||||
|
|
||||||
use openbook_candles::{
|
use openbook_candles::{
|
||||||
database::{initialize::connect_to_database, insert::build_candles_upsert_statement},
|
database::{initialize::connect_to_database},
|
||||||
structs::{
|
structs::{
|
||||||
candle::Candle,
|
|
||||||
markets::{fetch_market_infos, load_markets},
|
markets::{fetch_market_infos, load_markets},
|
||||||
resolution::Resolution,
|
|
||||||
},
|
},
|
||||||
utils::{AnyhowWrap, Config},
|
utils::{Config},
|
||||||
worker::candle_batching::{
|
worker::candle_batching::{
|
||||||
higher_order_candles::backfill_batch_higher_order_candles,
|
higher_order_candles::backfill_batch_higher_order_candles, minute_candles::backfill_batch_1m_candles,
|
||||||
minute_candles::backfill_batch_1m_candles,
|
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
use std::env;
|
use std::env;
|
||||||
use strum::IntoEnumIterator;
|
|
||||||
|
|
||||||
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
@ -33,31 +30,19 @@ async fn main() -> anyhow::Result<()> {
|
||||||
println!("Backfilling candles for {:?}", markets);
|
println!("Backfilling candles for {:?}", markets);
|
||||||
|
|
||||||
let pool = connect_to_database().await?;
|
let pool = connect_to_database().await?;
|
||||||
for market in market_infos.into_iter() {
|
backfill_batch_1m_candles(&pool, market_infos.clone()).await?;
|
||||||
let client = pool.get().await?;
|
|
||||||
let minute_candles = backfill_batch_1m_candles(&pool, &market).await?;
|
|
||||||
save_candles(minute_candles, client).await?;
|
|
||||||
|
|
||||||
for resolution in Resolution::iter() {
|
let mut handles = vec![];
|
||||||
if resolution == Resolution::R1m {
|
let mi = market_infos.clone();
|
||||||
continue;
|
for market in mi.into_iter() {
|
||||||
}
|
let pc = pool.clone();
|
||||||
let higher_order_candles =
|
handles.push(tokio::spawn(async move {
|
||||||
backfill_batch_higher_order_candles(&pool, &market.name, resolution).await?;
|
backfill_batch_higher_order_candles(&pc, &market.name)
|
||||||
let client = pool.get().await?;
|
|
||||||
save_candles(higher_order_candles, client).await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn save_candles(candles: Vec<Candle>, client: Object) -> anyhow::Result<()> {
|
|
||||||
if !candles.is_empty() {
|
|
||||||
let upsert_statement = build_candles_upsert_statement(&candles);
|
|
||||||
client
|
|
||||||
.execute(&upsert_statement, &[])
|
|
||||||
.await
|
.await
|
||||||
.map_err_anyhow()?;
|
.unwrap();
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
futures::future::join_all(handles).await;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,141 +0,0 @@
|
||||||
use anchor_lang::prelude::Pubkey;
|
|
||||||
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
|
|
||||||
use deadpool_postgres::Pool;
|
|
||||||
use log::debug;
|
|
||||||
use openbook_candles::{
|
|
||||||
database::{
|
|
||||||
initialize::{connect_to_database, setup_database},
|
|
||||||
insert::build_transactions_insert_statement,
|
|
||||||
},
|
|
||||||
structs::{
|
|
||||||
markets::{fetch_market_infos, load_markets},
|
|
||||||
transaction::{PgTransaction, NUM_TRANSACTION_PARTITIONS},
|
|
||||||
},
|
|
||||||
utils::{AnyhowWrap, Config, OPENBOOK_KEY},
|
|
||||||
worker::trade_fetching::scrape::scrape_fills,
|
|
||||||
};
|
|
||||||
use solana_client::{
|
|
||||||
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
|
|
||||||
};
|
|
||||||
use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
|
|
||||||
use std::{collections::HashMap, env, str::FromStr};
|
|
||||||
|
|
||||||
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
|
||||||
async fn main() -> anyhow::Result<()> {
|
|
||||||
dotenv::dotenv().ok();
|
|
||||||
let args: Vec<String> = env::args().collect();
|
|
||||||
assert!(args.len() == 2);
|
|
||||||
|
|
||||||
let path_to_markets_json = &args[1];
|
|
||||||
// let num_days = args[2].parse::<i64>().unwrap(); // TODO: implement
|
|
||||||
let num_days = 1;
|
|
||||||
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
|
|
||||||
|
|
||||||
let config = Config {
|
|
||||||
rpc_url: rpc_url.clone(),
|
|
||||||
};
|
|
||||||
let markets = load_markets(path_to_markets_json);
|
|
||||||
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
|
|
||||||
let mut target_markets = HashMap::new();
|
|
||||||
for m in market_infos.clone() {
|
|
||||||
target_markets.insert(Pubkey::from_str(&m.address)?, m.name);
|
|
||||||
}
|
|
||||||
println!("{:?}", target_markets);
|
|
||||||
|
|
||||||
let pool = connect_to_database().await?;
|
|
||||||
setup_database(&pool).await?;
|
|
||||||
|
|
||||||
let mut handles = vec![];
|
|
||||||
|
|
||||||
let rpc_clone = rpc_url.clone();
|
|
||||||
let pool_clone = pool.clone();
|
|
||||||
handles.push(tokio::spawn(async move {
|
|
||||||
fetch_signatures(rpc_clone, &pool_clone, num_days)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
}));
|
|
||||||
|
|
||||||
// Low priority improvement: batch fills into 1000's per worker
|
|
||||||
for id in 0..NUM_TRANSACTION_PARTITIONS {
|
|
||||||
let rpc_clone = rpc_url.clone();
|
|
||||||
let pool_clone = pool.clone();
|
|
||||||
let markets_clone = target_markets.clone();
|
|
||||||
handles.push(tokio::spawn(async move {
|
|
||||||
scrape_fills(id as i32, rpc_clone, &pool_clone, &markets_clone)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: spawn status thread
|
|
||||||
|
|
||||||
futures::future::join_all(handles).await;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> anyhow::Result<()> {
|
|
||||||
let mut before_sig: Option<Signature> = None;
|
|
||||||
let mut now_time = Utc::now().timestamp();
|
|
||||||
let end_time = (Utc::now() - Duration::days(num_days)).timestamp();
|
|
||||||
let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
|
|
||||||
|
|
||||||
while now_time > end_time {
|
|
||||||
let rpc_config = GetConfirmedSignaturesForAddress2Config {
|
|
||||||
before: before_sig,
|
|
||||||
until: None,
|
|
||||||
limit: None,
|
|
||||||
commitment: Some(CommitmentConfig::confirmed()),
|
|
||||||
};
|
|
||||||
|
|
||||||
let sigs = match rpc_client
|
|
||||||
.get_signatures_for_address_with_config(&OPENBOOK_KEY, rpc_config)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(sigs) => sigs,
|
|
||||||
Err(e) => {
|
|
||||||
println!("Error fetching signatures: {}", e);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if sigs.is_empty() {
|
|
||||||
println!("No signatures found, trying again");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let last = sigs.last().unwrap();
|
|
||||||
let last_time = last.block_time.unwrap();
|
|
||||||
let last_signature = last.signature.clone();
|
|
||||||
let transactions = sigs
|
|
||||||
.into_iter()
|
|
||||||
.map(PgTransaction::from_rpc_confirmed_transaction)
|
|
||||||
.collect::<Vec<PgTransaction>>();
|
|
||||||
|
|
||||||
if transactions.is_empty() {
|
|
||||||
println!("No transactions found, trying again");
|
|
||||||
}
|
|
||||||
debug!("writing: {:?} txns to DB\n", transactions.len());
|
|
||||||
let upsert_statement = build_transactions_insert_statement(transactions);
|
|
||||||
let client = pool.get().await?;
|
|
||||||
client
|
|
||||||
.execute(&upsert_statement, &[])
|
|
||||||
.await
|
|
||||||
.map_err_anyhow()?;
|
|
||||||
|
|
||||||
now_time = last_time;
|
|
||||||
before_sig = Some(Signature::from_str(&last_signature)?);
|
|
||||||
let time_left = backfill_time_left(now_time, end_time);
|
|
||||||
println!(
|
|
||||||
"{} minutes ~ {} days remaining in the backfill\n",
|
|
||||||
time_left.num_minutes(),
|
|
||||||
time_left.num_days()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration {
|
|
||||||
let naive_cur = NaiveDateTime::from_timestamp_millis(current_time * 1000).unwrap();
|
|
||||||
let naive_bf = NaiveDateTime::from_timestamp_millis(backfill_end * 1000).unwrap();
|
|
||||||
let cur_date = DateTime::<Utc>::from_utc(naive_cur, Utc);
|
|
||||||
let bf_date = DateTime::<Utc>::from_utc(naive_bf, Utc);
|
|
||||||
cur_date - bf_date
|
|
||||||
}
|
|
|
@ -0,0 +1,82 @@
|
||||||
|
use crate::structs::{candle::Candle, openbook::PgOpenBookFill};
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use deadpool_postgres::{GenericClient, Object};
|
||||||
|
|
||||||
|
pub async fn fetch_earliest_fill_multiple_markets(
|
||||||
|
conn_object: &Object,
|
||||||
|
market_address_strings: &Vec<String>,
|
||||||
|
) -> anyhow::Result<Option<PgOpenBookFill>> {
|
||||||
|
let stmt = r#"SELECT
|
||||||
|
block_datetime as "time",
|
||||||
|
market as "market_key",
|
||||||
|
bid as "bid",
|
||||||
|
maker as "maker",
|
||||||
|
price as "price",
|
||||||
|
size as "size"
|
||||||
|
from openbook.openbook_fill_events
|
||||||
|
where market = ANY($1)
|
||||||
|
and maker = true
|
||||||
|
ORDER BY time asc LIMIT 1"#;
|
||||||
|
|
||||||
|
let row = conn_object
|
||||||
|
.query_opt(stmt, &[&market_address_strings])
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
match row {
|
||||||
|
Some(r) => Ok(Some(PgOpenBookFill::from_row(r))),
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn fetch_fills_multiple_markets_from(
|
||||||
|
conn_object: &Object,
|
||||||
|
market_address_strings: &Vec<String>,
|
||||||
|
start_time: DateTime<Utc>,
|
||||||
|
end_time: DateTime<Utc>,
|
||||||
|
) -> anyhow::Result<Vec<PgOpenBookFill>> {
|
||||||
|
let stmt = r#"SELECT
|
||||||
|
block_datetime as "time",
|
||||||
|
market as "market_key",
|
||||||
|
bid as "bid",
|
||||||
|
maker as "maker",
|
||||||
|
price as "price",
|
||||||
|
size as "size"
|
||||||
|
from openbook.openbook_fill_events
|
||||||
|
where market = ANY($1)
|
||||||
|
and block_datetime >= $2::timestamptz
|
||||||
|
and block_datetime < $3::timestamptz
|
||||||
|
and maker = true
|
||||||
|
ORDER BY time asc"#;
|
||||||
|
|
||||||
|
let rows = conn_object
|
||||||
|
.query(stmt, &[&market_address_strings, &start_time, &end_time])
|
||||||
|
.await?;
|
||||||
|
Ok(rows.into_iter().map(PgOpenBookFill::from_row).collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn fetch_last_minute_candles(conn_object: &Object) -> anyhow::Result<Vec<Candle>> {
|
||||||
|
let stmt = r#"SELECT
|
||||||
|
c.market_name as "market_name",
|
||||||
|
c.start_time as "start_time",
|
||||||
|
c.end_time as "end_time",
|
||||||
|
c.resolution as "resolution",
|
||||||
|
c.open as "open",
|
||||||
|
c.close as "close",
|
||||||
|
c.high as "high",
|
||||||
|
c.low as "low",
|
||||||
|
c.volume as "volume",
|
||||||
|
c.complete as "complete"
|
||||||
|
from
|
||||||
|
(
|
||||||
|
select market_name, max(start_time) as max_start_time from openbook.candles
|
||||||
|
where resolution = '1M'
|
||||||
|
group by market_name
|
||||||
|
) mkts
|
||||||
|
left join openbook.candles c
|
||||||
|
on mkts.market_name = c.market_name
|
||||||
|
and mkts.max_start_time = c.start_time
|
||||||
|
where c.resolution ='1M'"#;
|
||||||
|
|
||||||
|
let rows = conn_object.query(stmt, &[]).await?;
|
||||||
|
Ok(rows.into_iter().map(Candle::from_row).collect())
|
||||||
|
}
|
|
@ -4,7 +4,6 @@ use crate::structs::{
|
||||||
openbook::PgOpenBookFill,
|
openbook::PgOpenBookFill,
|
||||||
resolution::Resolution,
|
resolution::Resolution,
|
||||||
trader::PgTrader,
|
trader::PgTrader,
|
||||||
transaction::PgTransaction,
|
|
||||||
};
|
};
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use deadpool_postgres::{GenericClient, Pool};
|
use deadpool_postgres::{GenericClient, Pool};
|
||||||
|
@ -16,13 +15,13 @@ pub async fn fetch_earliest_fill(
|
||||||
let client = pool.get().await?;
|
let client = pool.get().await?;
|
||||||
|
|
||||||
let stmt = r#"SELECT
|
let stmt = r#"SELECT
|
||||||
time as "time!",
|
block_datetime as "time",
|
||||||
bid as "bid!",
|
market as "market_key",
|
||||||
maker as "maker!",
|
bid as "bid",
|
||||||
native_qty_paid as "native_qty_paid!",
|
maker as "maker",
|
||||||
native_qty_received as "native_qty_received!",
|
price as "price",
|
||||||
native_fee_or_rebate as "native_fee_or_rebate!"
|
size as "size"
|
||||||
from fills
|
from openbook.openbook_fill_events
|
||||||
where market = $1
|
where market = $1
|
||||||
and maker = true
|
and maker = true
|
||||||
ORDER BY time asc LIMIT 1"#;
|
ORDER BY time asc LIMIT 1"#;
|
||||||
|
@ -44,16 +43,16 @@ pub async fn fetch_fills_from(
|
||||||
let client = pool.get().await?;
|
let client = pool.get().await?;
|
||||||
|
|
||||||
let stmt = r#"SELECT
|
let stmt = r#"SELECT
|
||||||
time as "time!",
|
block_datetime as "time",
|
||||||
bid as "bid!",
|
market as "market_key",
|
||||||
maker as "maker!",
|
bid as "bid",
|
||||||
native_qty_paid as "native_qty_paid!",
|
maker as "maker",
|
||||||
native_qty_received as "native_qty_received!",
|
price as "price",
|
||||||
native_fee_or_rebate as "native_fee_or_rebate!"
|
size as "size"
|
||||||
from fills
|
from openbook.openbook_fill_events
|
||||||
where market = $1
|
where market = $1
|
||||||
and time >= $2::timestamptz
|
and block_datetime >= $2::timestamptz
|
||||||
and time < $3::timestamptz
|
and block_datetime < $3::timestamptz
|
||||||
and maker = true
|
and maker = true
|
||||||
ORDER BY time asc"#;
|
ORDER BY time asc"#;
|
||||||
|
|
||||||
|
@ -71,17 +70,17 @@ pub async fn fetch_latest_finished_candle(
|
||||||
let client = pool.get().await?;
|
let client = pool.get().await?;
|
||||||
|
|
||||||
let stmt = r#"SELECT
|
let stmt = r#"SELECT
|
||||||
market_name as "market_name!",
|
market_name as "market_name",
|
||||||
start_time as "start_time!",
|
start_time as "start_time",
|
||||||
end_time as "end_time!",
|
end_time as "end_time",
|
||||||
resolution as "resolution!",
|
resolution as "resolution",
|
||||||
open as "open!",
|
open as "open",
|
||||||
close as "close!",
|
close as "close",
|
||||||
high as "high!",
|
high as "high",
|
||||||
low as "low!",
|
low as "low",
|
||||||
volume as "volume!",
|
volume as "volume",
|
||||||
complete as "complete!"
|
complete as "complete"
|
||||||
from candles
|
from openbook.candles
|
||||||
where market_name = $1
|
where market_name = $1
|
||||||
and resolution = $2
|
and resolution = $2
|
||||||
and complete = true
|
and complete = true
|
||||||
|
@ -97,8 +96,8 @@ pub async fn fetch_latest_finished_candle(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetches all of the candles for the given market and resoultion, starting from the earliest.
|
/// Fetches all of the candles for the given market and resolution, starting from the earliest.
|
||||||
/// Note that this function will fetch ALL candles.
|
/// Note that this function will fetch at most 2000 candles.
|
||||||
pub async fn fetch_earliest_candles(
|
pub async fn fetch_earliest_candles(
|
||||||
pool: &Pool,
|
pool: &Pool,
|
||||||
market_name: &str,
|
market_name: &str,
|
||||||
|
@ -107,20 +106,21 @@ pub async fn fetch_earliest_candles(
|
||||||
let client = pool.get().await?;
|
let client = pool.get().await?;
|
||||||
|
|
||||||
let stmt = r#"SELECT
|
let stmt = r#"SELECT
|
||||||
market_name as "market_name!",
|
market_name as "market_name",
|
||||||
start_time as "start_time!",
|
start_time as "start_time",
|
||||||
end_time as "end_time!",
|
end_time as "end_time",
|
||||||
resolution as "resolution!",
|
resolution as "resolution!",
|
||||||
open as "open!",
|
open as "open",
|
||||||
close as "close!",
|
close as "close",
|
||||||
high as "high!",
|
high as "high",
|
||||||
low as "low!",
|
low as "low",
|
||||||
volume as "volume!",
|
volume as "volume",
|
||||||
complete as "complete!"
|
complete as "complete"
|
||||||
from candles
|
from openbook.candles
|
||||||
where market_name = $1
|
where market_name = $1
|
||||||
and resolution = $2
|
and resolution = $2
|
||||||
ORDER BY start_time asc"#;
|
ORDER BY start_time asc
|
||||||
|
LIMIT 2000"#;
|
||||||
|
|
||||||
let rows = client
|
let rows = client
|
||||||
.query(stmt, &[&market_name, &resolution.to_string()])
|
.query(stmt, &[&market_name, &resolution.to_string()])
|
||||||
|
@ -139,17 +139,17 @@ pub async fn fetch_candles_from(
|
||||||
let client = pool.get().await?;
|
let client = pool.get().await?;
|
||||||
|
|
||||||
let stmt = r#"SELECT
|
let stmt = r#"SELECT
|
||||||
market_name as "market_name!",
|
market_name as "market_name",
|
||||||
start_time as "start_time!",
|
start_time as "start_time",
|
||||||
end_time as "end_time!",
|
end_time as "end_time",
|
||||||
resolution as "resolution!",
|
resolution as "resolution",
|
||||||
open as "open!",
|
open as "open",
|
||||||
close as "close!",
|
close as "close",
|
||||||
high as "high!",
|
high as "high",
|
||||||
low as "low!",
|
low as "low",
|
||||||
volume as "volume!",
|
volume as "volume",
|
||||||
complete as "complete!"
|
complete as "complete"
|
||||||
from candles
|
from openbook.candles
|
||||||
where market_name = $1
|
where market_name = $1
|
||||||
and resolution = $2
|
and resolution = $2
|
||||||
and start_time >= $3
|
and start_time >= $3
|
||||||
|
@ -182,20 +182,20 @@ pub async fn fetch_top_traders_by_base_volume_from(
|
||||||
let stmt = r#"SELECT
|
let stmt = r#"SELECT
|
||||||
open_orders_owner,
|
open_orders_owner,
|
||||||
sum(
|
sum(
|
||||||
native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
|
native_quantity_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
|
||||||
) as "raw_ask_size!",
|
) as "raw_ask_size",
|
||||||
sum(
|
sum(
|
||||||
native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
|
native_quantity_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
|
||||||
) as "raw_bid_size!"
|
) as "raw_bid_size"
|
||||||
FROM fills
|
FROM openbook.openbook_fill_events
|
||||||
WHERE market = $1
|
WHERE market = $1
|
||||||
AND time >= $2
|
AND time >= $2
|
||||||
AND time < $3
|
AND time < $3
|
||||||
GROUP BY open_orders_owner
|
GROUP BY open_orders_owner
|
||||||
ORDER BY
|
ORDER BY
|
||||||
sum(native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
|
sum(native_quantity_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
|
||||||
+
|
+
|
||||||
sum(native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
|
sum(native_quantity_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
|
||||||
DESC
|
DESC
|
||||||
LIMIT 10000"#;
|
LIMIT 10000"#;
|
||||||
|
|
||||||
|
@ -217,20 +217,20 @@ pub async fn fetch_top_traders_by_quote_volume_from(
|
||||||
let stmt = r#"SELECT
|
let stmt = r#"SELECT
|
||||||
open_orders_owner,
|
open_orders_owner,
|
||||||
sum(
|
sum(
|
||||||
native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
|
native_quantity_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
|
||||||
) as "raw_ask_size!",
|
) as "raw_ask_size",
|
||||||
sum(
|
sum(
|
||||||
native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
|
native_quantity_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
|
||||||
) as "raw_bid_size!"
|
) as "raw_bid_size"
|
||||||
FROM fills
|
FROM openbook.openbook_fill_events
|
||||||
WHERE market = $1
|
WHERE market = $1
|
||||||
AND time >= $2
|
AND time >= $2
|
||||||
AND time < $3
|
AND time < $3
|
||||||
GROUP BY open_orders_owner
|
GROUP BY open_orders_owner
|
||||||
ORDER BY
|
ORDER BY
|
||||||
sum(native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
|
sum(native_quantity_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
|
||||||
+
|
+
|
||||||
sum(native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
|
sum(native_quantity_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
|
||||||
DESC
|
DESC
|
||||||
LIMIT 10000"#;
|
LIMIT 10000"#;
|
||||||
|
|
||||||
|
@ -249,24 +249,27 @@ pub async fn fetch_coingecko_24h_volume(
|
||||||
|
|
||||||
let stmt = r#"SELECT
|
let stmt = r#"SELECT
|
||||||
t1.market,
|
t1.market,
|
||||||
COALESCE(t2.native_qty_received, 0) as "raw_base_size!",
|
COALESCE(t2.base_size, 0) as "base_size",
|
||||||
COALESCE(t2.native_qty_paid, 0) as "raw_quote_size!"
|
COALESCE(t3.quote_size, 0) as "quote_size"
|
||||||
FROM (
|
FROM (
|
||||||
SELECT distinct on (market) *
|
SELECT unnest($1::text[]) as market
|
||||||
FROM fills f
|
|
||||||
where bid = true
|
|
||||||
and market = any($1)
|
|
||||||
order by market, "time" desc
|
|
||||||
) t1
|
) t1
|
||||||
LEFT JOIN (
|
LEFT JOIN (
|
||||||
select market,
|
select market,
|
||||||
sum(native_qty_received) as "native_qty_received",
|
sum("size") as "base_size"
|
||||||
sum(native_qty_paid) as "native_qty_paid"
|
from openbook.openbook_fill_events
|
||||||
from fills
|
where block_datetime >= current_timestamp - interval '1 day'
|
||||||
where "time" >= current_timestamp - interval '1 day'
|
|
||||||
and bid = true
|
and bid = true
|
||||||
group by market
|
group by market
|
||||||
) t2 ON t1.market = t2.market"#;
|
) t2 ON t1.market = t2.market
|
||||||
|
LEFT JOIN (
|
||||||
|
select market,
|
||||||
|
sum("size" * price) as "quote_size"
|
||||||
|
from openbook.openbook_fill_events
|
||||||
|
where block_datetime >= current_timestamp - interval '1 day'
|
||||||
|
and bid = true
|
||||||
|
group by market
|
||||||
|
) t3 ON t1.market = t3.market"#;
|
||||||
|
|
||||||
let rows = client.query(stmt, &[&market_address_strings]).await?;
|
let rows = client.query(stmt, &[&market_address_strings]).await?;
|
||||||
|
|
||||||
|
@ -291,10 +294,10 @@ pub async fn fetch_coingecko_24h_high_low(
|
||||||
(
|
(
|
||||||
SELECT *
|
SELECT *
|
||||||
from
|
from
|
||||||
candles
|
openbook.candles
|
||||||
where (market_name, start_time, resolution) in (
|
where (market_name, start_time, resolution) in (
|
||||||
select market_name, max(start_time), resolution
|
select market_name, max(start_time), resolution
|
||||||
from candles
|
from openbook.candles
|
||||||
where "resolution" = '1M'
|
where "resolution" = '1M'
|
||||||
and market_name = any($1)
|
and market_name = any($1)
|
||||||
group by market_name, resolution
|
group by market_name, resolution
|
||||||
|
@ -307,7 +310,7 @@ pub async fn fetch_coingecko_24h_high_low(
|
||||||
max(high) as "high",
|
max(high) as "high",
|
||||||
min(low) as "low"
|
min(low) as "low"
|
||||||
from
|
from
|
||||||
candles
|
openbook.candles
|
||||||
where
|
where
|
||||||
"resolution" = '1M'
|
"resolution" = '1M'
|
||||||
and "start_time" >= current_timestamp - interval '1 day'
|
and "start_time" >= current_timestamp - interval '1 day'
|
||||||
|
@ -321,23 +324,3 @@ pub async fn fetch_coingecko_24h_high_low(
|
||||||
.map(PgCoinGecko24HighLow::from_row)
|
.map(PgCoinGecko24HighLow::from_row)
|
||||||
.collect())
|
.collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetches unprocessed, non-error transactions for the specified worker partition.
|
|
||||||
/// Pulls at most 50 transactions at a time.
|
|
||||||
pub async fn fetch_worker_transactions(
|
|
||||||
worker_id: i32,
|
|
||||||
pool: &Pool,
|
|
||||||
) -> anyhow::Result<Vec<PgTransaction>> {
|
|
||||||
let client = pool.get().await?;
|
|
||||||
|
|
||||||
let stmt = r#"SELECT signature, program_pk, block_datetime, slot, err, "processed", worker_partition
|
|
||||||
FROM transactions
|
|
||||||
where worker_partition = $1
|
|
||||||
and err = false
|
|
||||||
and processed = false
|
|
||||||
LIMIT 50"#;
|
|
||||||
|
|
||||||
let rows = client.query(stmt, &[&worker_id]).await?;
|
|
||||||
|
|
||||||
Ok(rows.into_iter().map(PgTransaction::from_row).collect())
|
|
||||||
}
|
|
||||||
|
|
|
@ -21,8 +21,8 @@ pub async fn connect_to_database() -> anyhow::Result<Pool> {
|
||||||
|
|
||||||
// openssl pkcs12 -export -in client.cer -inkey client-key.cer -out client.pks
|
// openssl pkcs12 -export -in client.cer -inkey client-key.cer -out client.pks
|
||||||
// base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64
|
// base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64
|
||||||
// fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a mango-fills
|
// fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a APP-NAME
|
||||||
// fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills
|
// fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a APP-NAME
|
||||||
let tls = if pg_config.pg_use_ssl {
|
let tls = if pg_config.pg_use_ssl {
|
||||||
pg_config.pg.ssl_mode = Some(SslMode::Require);
|
pg_config.pg.ssl_mode = Some(SslMode::Require);
|
||||||
let ca_cert = fs::read(pg_config.pg_ca_cert_path.expect("reading ca cert from env"))
|
let ca_cert = fs::read(pg_config.pg_ca_cert_path.expect("reading ca cert from env"))
|
||||||
|
@ -66,11 +66,7 @@ pub async fn connect_to_database() -> anyhow::Result<Pool> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn setup_database(pool: &Pool) -> anyhow::Result<()> {
|
pub async fn setup_database(pool: &Pool) -> anyhow::Result<()> {
|
||||||
let candles_table_fut = create_candles_table(pool);
|
match create_candles_table(pool).await {
|
||||||
let transactions_table_fut = create_transactions_table(pool);
|
|
||||||
let fills_table_fut = create_fills_table(pool);
|
|
||||||
let result = tokio::try_join!(candles_table_fut, transactions_table_fut, fills_table_fut);
|
|
||||||
match result {
|
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
println!("Successfully configured database");
|
println!("Successfully configured database");
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -87,7 +83,7 @@ pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> {
|
||||||
|
|
||||||
client
|
client
|
||||||
.execute(
|
.execute(
|
||||||
"CREATE TABLE IF NOT EXISTS candles (
|
"CREATE TABLE IF NOT EXISTS openbook.candles (
|
||||||
id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
|
id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
|
||||||
market_name text,
|
market_name text,
|
||||||
start_time timestamptz,
|
start_time timestamptz,
|
||||||
|
@ -105,90 +101,9 @@ pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> {
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
client.execute(
|
client.execute(
|
||||||
"CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market_name, start_time, resolution)",
|
"CREATE UNIQUE INDEX IF NOT EXISTS idx_market_time_resolution ON openbook.candles USING btree (market_name, start_time, resolution);",
|
||||||
&[]
|
&[]
|
||||||
).await?;
|
).await?;
|
||||||
|
|
||||||
client.execute(
|
|
||||||
"DO $$
|
|
||||||
BEGIN
|
|
||||||
IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'unique_candles') THEN
|
|
||||||
ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market_name, start_time, resolution);
|
|
||||||
END IF;
|
|
||||||
END $$", &[]
|
|
||||||
).await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> {
|
|
||||||
let client = pool.get().await?;
|
|
||||||
|
|
||||||
client
|
|
||||||
.execute(
|
|
||||||
"CREATE TABLE IF NOT EXISTS fills (
|
|
||||||
signature text not null,
|
|
||||||
time timestamptz not null,
|
|
||||||
market text not null,
|
|
||||||
open_orders text not null,
|
|
||||||
open_orders_owner text not null,
|
|
||||||
bid bool not null,
|
|
||||||
maker bool not null,
|
|
||||||
native_qty_paid double precision not null,
|
|
||||||
native_qty_received double precision not null,
|
|
||||||
native_fee_or_rebate double precision not null,
|
|
||||||
fee_tier text not null,
|
|
||||||
order_id text not null,
|
|
||||||
log_index int4 not null,
|
|
||||||
CONSTRAINT fills_pk PRIMARY KEY (signature, log_index)
|
|
||||||
)",
|
|
||||||
&[],
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
client
|
|
||||||
.execute(
|
|
||||||
"CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)",
|
|
||||||
&[],
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn create_transactions_table(pool: &Pool) -> anyhow::Result<()> {
|
|
||||||
let client = pool.get().await?;
|
|
||||||
|
|
||||||
client
|
|
||||||
.execute(
|
|
||||||
"CREATE TABLE IF NOT EXISTS transactions (
|
|
||||||
signature text NOT NULL,
|
|
||||||
program_pk text NOT NULL,
|
|
||||||
block_datetime timestamptz NOT NULL,
|
|
||||||
slot bigint NOT NULL,
|
|
||||||
err bool NOT NULL,
|
|
||||||
processed bool NOT NULL,
|
|
||||||
worker_partition int4 NOT NULL,
|
|
||||||
CONSTRAINT transactions_pk PRIMARY KEY (signature, worker_partition)
|
|
||||||
) PARTITION BY LIST (worker_partition);",
|
|
||||||
&[],
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
client.batch_execute(
|
|
||||||
"CREATE INDEX IF NOT EXISTS transactions_processed_err_idx ON ONLY transactions (signature) WHERE processed IS NOT TRUE and err IS NOT TRUE;
|
|
||||||
CREATE INDEX IF NOT EXISTS transactions_program_pk_idx ON ONLY transactions USING btree (program_pk, slot DESC);
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS transactions_0 PARTITION OF transactions FOR VALUES IN (0);
|
|
||||||
CREATE TABLE IF NOT EXISTS transactions_1 PARTITION OF transactions FOR VALUES IN (1);
|
|
||||||
CREATE TABLE IF NOT EXISTS transactions_2 PARTITION OF transactions FOR VALUES IN (2);
|
|
||||||
CREATE TABLE IF NOT EXISTS transactions_3 PARTITION OF transactions FOR VALUES IN (3);
|
|
||||||
CREATE TABLE IF NOT EXISTS transactions_4 PARTITION OF transactions FOR VALUES IN (4);
|
|
||||||
CREATE TABLE IF NOT EXISTS transactions_5 PARTITION OF transactions FOR VALUES IN (5);
|
|
||||||
CREATE TABLE IF NOT EXISTS transactions_6 PARTITION OF transactions FOR VALUES IN (6);
|
|
||||||
CREATE TABLE IF NOT EXISTS transactions_7 PARTITION OF transactions FOR VALUES IN (7);
|
|
||||||
CREATE TABLE IF NOT EXISTS transactions_8 PARTITION OF transactions FOR VALUES IN (8);
|
|
||||||
CREATE TABLE IF NOT EXISTS transactions_9 PARTITION OF transactions FOR VALUES IN (9);"
|
|
||||||
).await?;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,79 +1,7 @@
|
||||||
use deadpool_postgres::Pool;
|
use crate::structs::candle::Candle;
|
||||||
|
|
||||||
use crate::{
|
|
||||||
structs::{candle::Candle, openbook::OpenBookFillEvent, transaction::PgTransaction},
|
|
||||||
utils::{to_timestampz, AnyhowWrap},
|
|
||||||
};
|
|
||||||
|
|
||||||
pub async fn insert_fills_atomically(
|
|
||||||
pool: &Pool,
|
|
||||||
worker_id: i32,
|
|
||||||
fills: Vec<OpenBookFillEvent>,
|
|
||||||
signatures: Vec<String>,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let mut client = pool.get().await?;
|
|
||||||
|
|
||||||
let db_txn = client.build_transaction().start().await?;
|
|
||||||
|
|
||||||
// 1. Insert fills
|
|
||||||
if !fills.is_empty() {
|
|
||||||
let fills_statement = build_fills_upsert_statement(fills);
|
|
||||||
db_txn
|
|
||||||
.execute(&fills_statement, &[])
|
|
||||||
.await
|
|
||||||
.map_err_anyhow()
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2. Update txns table as processed
|
|
||||||
let transactions_statement =
|
|
||||||
build_transactions_processed_update_statement(worker_id, signatures);
|
|
||||||
db_txn
|
|
||||||
.execute(&transactions_statement, &[])
|
|
||||||
.await
|
|
||||||
.map_err_anyhow()
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
db_txn.commit().await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn build_fills_upsert_statement(fills: Vec<OpenBookFillEvent>) -> String {
|
|
||||||
let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES");
|
|
||||||
for (idx, fill) in fills.iter().enumerate() {
|
|
||||||
let val_str = format!(
|
|
||||||
"(\'{}\', \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})",
|
|
||||||
fill.signature,
|
|
||||||
to_timestampz(fill.block_time as u64).to_rfc3339(),
|
|
||||||
fill.market,
|
|
||||||
fill.open_orders,
|
|
||||||
fill.open_orders_owner,
|
|
||||||
fill.bid,
|
|
||||||
fill.maker,
|
|
||||||
fill.native_qty_paid,
|
|
||||||
fill.native_qty_received,
|
|
||||||
fill.native_fee_or_rebate,
|
|
||||||
fill.fee_tier,
|
|
||||||
fill.order_id,
|
|
||||||
fill.log_index,
|
|
||||||
);
|
|
||||||
|
|
||||||
if idx == 0 {
|
|
||||||
stmt = format!("{} {}", &stmt, val_str);
|
|
||||||
} else {
|
|
||||||
stmt = format!("{}, {}", &stmt, val_str);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let handle_conflict = "ON CONFLICT DO NOTHING";
|
|
||||||
|
|
||||||
stmt = format!("{} {}", stmt, handle_conflict);
|
|
||||||
stmt
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn build_candles_upsert_statement(candles: &Vec<Candle>) -> String {
|
pub fn build_candles_upsert_statement(candles: &Vec<Candle>) -> String {
|
||||||
let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES");
|
let mut stmt = String::from("INSERT INTO openbook.candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES");
|
||||||
for (idx, candle) in candles.iter().enumerate() {
|
for (idx, candle) in candles.iter().enumerate() {
|
||||||
let val_str = format!(
|
let val_str = format!(
|
||||||
"(\'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {})",
|
"(\'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {})",
|
||||||
|
@ -109,54 +37,3 @@ pub fn build_candles_upsert_statement(candles: &Vec<Candle>) -> String {
|
||||||
stmt = format!("{} {}", stmt, handle_conflict);
|
stmt = format!("{} {}", stmt, handle_conflict);
|
||||||
stmt
|
stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn build_transactions_insert_statement(transactions: Vec<PgTransaction>) -> String {
|
|
||||||
let mut stmt = String::from("INSERT INTO transactions (signature, program_pk, block_datetime, slot, err, processed, worker_partition) VALUES");
|
|
||||||
for (idx, txn) in transactions.iter().enumerate() {
|
|
||||||
let val_str = format!(
|
|
||||||
"(\'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {})",
|
|
||||||
txn.signature,
|
|
||||||
txn.program_pk,
|
|
||||||
txn.block_datetime.to_rfc3339(),
|
|
||||||
txn.slot,
|
|
||||||
txn.err,
|
|
||||||
txn.processed,
|
|
||||||
txn.worker_partition,
|
|
||||||
);
|
|
||||||
|
|
||||||
if idx == 0 {
|
|
||||||
stmt = format!("{} {}", &stmt, val_str);
|
|
||||||
} else {
|
|
||||||
stmt = format!("{}, {}", &stmt, val_str);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let handle_conflict = "ON CONFLICT DO NOTHING";
|
|
||||||
|
|
||||||
stmt = format!("{} {}", stmt, handle_conflict);
|
|
||||||
stmt
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn build_transactions_processed_update_statement(
|
|
||||||
worker_id: i32,
|
|
||||||
processed_signatures: Vec<String>,
|
|
||||||
) -> String {
|
|
||||||
let mut stmt = String::from(
|
|
||||||
"UPDATE transactions
|
|
||||||
SET processed = true
|
|
||||||
WHERE transactions.signature IN (",
|
|
||||||
);
|
|
||||||
for (idx, sig) in processed_signatures.iter().enumerate() {
|
|
||||||
let val_str = if idx == processed_signatures.len() - 1 {
|
|
||||||
format!("\'{}\'", sig,)
|
|
||||||
} else {
|
|
||||||
format!("\'{}\',", sig,)
|
|
||||||
};
|
|
||||||
stmt = format!("{} {}", &stmt, val_str);
|
|
||||||
}
|
|
||||||
|
|
||||||
let worker_stmt = format!(") AND worker_partition = {} ", worker_id);
|
|
||||||
|
|
||||||
stmt = format!("{} {}", stmt, worker_stmt);
|
|
||||||
stmt
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
pub mod backfill;
|
||||||
pub mod fetch;
|
pub mod fetch;
|
||||||
pub mod initialize;
|
pub mod initialize;
|
||||||
pub mod insert;
|
pub mod insert;
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use tokio_postgres::Row;
|
use tokio_postgres::Row;
|
||||||
|
|
||||||
use super::{markets::MarketInfo, openbook::token_factor};
|
use super::markets::MarketInfo;
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
pub struct CoinGeckoOrderBook {
|
pub struct CoinGeckoOrderBook {
|
||||||
|
@ -35,26 +35,24 @@ pub struct CoinGeckoTicker {
|
||||||
|
|
||||||
pub struct PgCoinGecko24HourVolume {
|
pub struct PgCoinGecko24HourVolume {
|
||||||
pub address: String,
|
pub address: String,
|
||||||
pub raw_base_size: f64,
|
pub base_size: f64,
|
||||||
pub raw_quote_size: f64,
|
pub quote_size: f64,
|
||||||
}
|
}
|
||||||
impl PgCoinGecko24HourVolume {
|
impl PgCoinGecko24HourVolume {
|
||||||
pub fn convert_to_readable(&self, markets: &Vec<MarketInfo>) -> CoinGecko24HourVolume {
|
pub fn convert_to_readable(&self, markets: &Vec<MarketInfo>) -> CoinGecko24HourVolume {
|
||||||
let market = markets.iter().find(|m| m.address == self.address).unwrap();
|
let market = markets.iter().find(|m| m.address == self.address).unwrap();
|
||||||
let base_volume = self.raw_base_size / token_factor(market.base_decimals);
|
|
||||||
let target_volume = self.raw_quote_size / token_factor(market.quote_decimals);
|
|
||||||
CoinGecko24HourVolume {
|
CoinGecko24HourVolume {
|
||||||
market_name: market.name.clone(),
|
market_name: market.name.clone(),
|
||||||
base_volume,
|
base_volume: self.base_size,
|
||||||
target_volume,
|
target_volume: self.quote_size,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn from_row(row: Row) -> Self {
|
pub fn from_row(row: Row) -> Self {
|
||||||
PgCoinGecko24HourVolume {
|
PgCoinGecko24HourVolume {
|
||||||
address: row.get(0),
|
address: row.get(0),
|
||||||
raw_base_size: row.get(1),
|
base_size: row.get(1),
|
||||||
raw_quote_size: row.get(2),
|
quote_size: row.get(2),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,4 +6,3 @@ pub mod resolution;
|
||||||
pub mod slab;
|
pub mod slab;
|
||||||
pub mod trader;
|
pub mod trader;
|
||||||
pub mod tradingview;
|
pub mod tradingview;
|
||||||
pub mod transaction;
|
|
||||||
|
|
|
@ -1,93 +1,26 @@
|
||||||
use anchor_lang::{event, AnchorDeserialize, AnchorSerialize};
|
use anchor_lang::AnchorDeserialize;
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use num_traits::Pow;
|
use num_traits::Pow;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
|
||||||
use tokio_postgres::Row;
|
use tokio_postgres::Row;
|
||||||
|
|
||||||
#[event]
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
|
||||||
pub struct OpenBookFillEventRaw {
|
|
||||||
pub market: Pubkey,
|
|
||||||
pub open_orders: Pubkey,
|
|
||||||
pub open_orders_owner: Pubkey,
|
|
||||||
pub bid: bool,
|
|
||||||
pub maker: bool,
|
|
||||||
pub native_qty_paid: u64,
|
|
||||||
pub native_qty_received: u64,
|
|
||||||
pub native_fee_or_rebate: u64,
|
|
||||||
pub order_id: u128,
|
|
||||||
pub owner_slot: u8,
|
|
||||||
pub fee_tier: u8,
|
|
||||||
pub client_order_id: Option<u64>,
|
|
||||||
pub referrer_rebate: Option<u64>,
|
|
||||||
}
|
|
||||||
impl OpenBookFillEventRaw {
|
|
||||||
pub fn into_event(
|
|
||||||
self,
|
|
||||||
signature: String,
|
|
||||||
block_time: i64,
|
|
||||||
log_index: usize,
|
|
||||||
) -> OpenBookFillEvent {
|
|
||||||
OpenBookFillEvent {
|
|
||||||
signature,
|
|
||||||
market: self.market,
|
|
||||||
open_orders: self.open_orders,
|
|
||||||
open_orders_owner: self.open_orders_owner,
|
|
||||||
bid: self.bid,
|
|
||||||
maker: self.maker,
|
|
||||||
native_qty_paid: self.native_qty_paid,
|
|
||||||
native_qty_received: self.native_qty_received,
|
|
||||||
native_fee_or_rebate: self.native_fee_or_rebate,
|
|
||||||
order_id: self.order_id,
|
|
||||||
owner_slot: self.owner_slot,
|
|
||||||
fee_tier: self.fee_tier,
|
|
||||||
client_order_id: self.client_order_id,
|
|
||||||
referrer_rebate: self.referrer_rebate,
|
|
||||||
block_time,
|
|
||||||
log_index,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[event]
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
|
||||||
pub struct OpenBookFillEvent {
|
|
||||||
pub signature: String,
|
|
||||||
pub market: Pubkey,
|
|
||||||
pub open_orders: Pubkey,
|
|
||||||
pub open_orders_owner: Pubkey,
|
|
||||||
pub bid: bool,
|
|
||||||
pub maker: bool,
|
|
||||||
pub native_qty_paid: u64,
|
|
||||||
pub native_qty_received: u64,
|
|
||||||
pub native_fee_or_rebate: u64,
|
|
||||||
pub order_id: u128,
|
|
||||||
pub owner_slot: u8,
|
|
||||||
pub fee_tier: u8,
|
|
||||||
pub client_order_id: Option<u64>,
|
|
||||||
pub referrer_rebate: Option<u64>,
|
|
||||||
pub block_time: i64,
|
|
||||||
pub log_index: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, PartialEq)]
|
|
||||||
pub struct PgOpenBookFill {
|
pub struct PgOpenBookFill {
|
||||||
pub time: DateTime<Utc>,
|
pub time: DateTime<Utc>,
|
||||||
|
pub market_key: String,
|
||||||
pub bid: bool,
|
pub bid: bool,
|
||||||
pub maker: bool,
|
pub maker: bool,
|
||||||
pub native_qty_paid: f64,
|
pub price: f64,
|
||||||
pub native_qty_received: f64,
|
pub size: f64,
|
||||||
pub native_fee_or_rebate: f64,
|
|
||||||
}
|
}
|
||||||
impl PgOpenBookFill {
|
impl PgOpenBookFill {
|
||||||
pub fn from_row(row: Row) -> Self {
|
pub fn from_row(row: Row) -> Self {
|
||||||
PgOpenBookFill {
|
PgOpenBookFill {
|
||||||
time: row.get(0),
|
time: row.get(0),
|
||||||
bid: row.get(1),
|
market_key: row.get(1),
|
||||||
maker: row.get(2),
|
bid: row.get(2),
|
||||||
native_qty_paid: row.get(3),
|
maker: row.get(3),
|
||||||
native_qty_received: row.get(4),
|
price: row.get(4),
|
||||||
native_fee_or_rebate: row.get(5),
|
size: row.get(5),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -147,34 +80,6 @@ pub struct MarketState {
|
||||||
pub referrer_rebates_accrued: u64,
|
pub referrer_rebates_accrued: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn calculate_fill_price_and_size(
|
|
||||||
fill: PgOpenBookFill,
|
|
||||||
base_decimals: u8,
|
|
||||||
quote_decimals: u8,
|
|
||||||
) -> (f64, f64) {
|
|
||||||
if fill.bid {
|
|
||||||
let price_before_fees = if fill.maker {
|
|
||||||
fill.native_qty_paid + fill.native_fee_or_rebate
|
|
||||||
} else {
|
|
||||||
fill.native_qty_paid - fill.native_fee_or_rebate
|
|
||||||
};
|
|
||||||
let price = (price_before_fees * token_factor(base_decimals))
|
|
||||||
/ (token_factor(quote_decimals) * fill.native_qty_received);
|
|
||||||
let size = fill.native_qty_received / token_factor(base_decimals);
|
|
||||||
(price, size)
|
|
||||||
} else {
|
|
||||||
let price_before_fees = if fill.maker {
|
|
||||||
fill.native_qty_received - fill.native_fee_or_rebate
|
|
||||||
} else {
|
|
||||||
fill.native_qty_received + fill.native_fee_or_rebate
|
|
||||||
};
|
|
||||||
let price = (price_before_fees * token_factor(base_decimals))
|
|
||||||
/ (token_factor(quote_decimals) * fill.native_qty_paid);
|
|
||||||
let size = fill.native_qty_paid / token_factor(base_decimals);
|
|
||||||
(price, size)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn token_factor(decimals: u8) -> f64 {
|
pub fn token_factor(decimals: u8) -> f64 {
|
||||||
10f64.pow(decimals as f64)
|
10f64.pow(decimals as f64)
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,8 +9,8 @@ use super::openbook::token_factor;
|
||||||
#[derive(Clone, Debug, PartialEq)]
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
pub struct PgTrader {
|
pub struct PgTrader {
|
||||||
pub open_orders_owner: String,
|
pub open_orders_owner: String,
|
||||||
pub raw_ask_size: f64,
|
pub raw_ask_size: i64,
|
||||||
pub raw_bid_size: f64,
|
pub raw_bid_size: i64,
|
||||||
}
|
}
|
||||||
impl PgTrader {
|
impl PgTrader {
|
||||||
pub fn from_row(row: Row) -> Self {
|
pub fn from_row(row: Row) -> Self {
|
||||||
|
@ -52,8 +52,8 @@ pub struct TraderResponse {
|
||||||
|
|
||||||
// Note that the Postgres queries only return volumes in base or quote
|
// Note that the Postgres queries only return volumes in base or quote
|
||||||
pub fn calculate_trader_volume(trader: PgTrader, decimals: u8) -> Trader {
|
pub fn calculate_trader_volume(trader: PgTrader, decimals: u8) -> Trader {
|
||||||
let bid_size = trader.raw_bid_size / token_factor(decimals);
|
let bid_size = (trader.raw_bid_size as f64) / token_factor(decimals);
|
||||||
let ask_size = trader.raw_ask_size / token_factor(decimals);
|
let ask_size = (trader.raw_ask_size as f64) / token_factor(decimals);
|
||||||
|
|
||||||
Trader {
|
Trader {
|
||||||
pubkey: trader.open_orders_owner,
|
pubkey: trader.open_orders_owner,
|
||||||
|
|
|
@ -1,52 +0,0 @@
|
||||||
use chrono::{DateTime, Utc};
|
|
||||||
use solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature;
|
|
||||||
use tokio_postgres::Row;
|
|
||||||
|
|
||||||
use crate::utils::{to_timestampz, OPENBOOK_KEY};
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq)]
|
|
||||||
pub struct PgTransaction {
|
|
||||||
pub signature: String,
|
|
||||||
pub program_pk: String,
|
|
||||||
pub block_datetime: DateTime<Utc>,
|
|
||||||
pub slot: u64,
|
|
||||||
pub err: bool,
|
|
||||||
pub processed: bool,
|
|
||||||
pub worker_partition: i32,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub const NUM_TRANSACTION_PARTITIONS: u64 = 10;
|
|
||||||
|
|
||||||
impl PgTransaction {
|
|
||||||
pub fn from_rpc_confirmed_transaction(
|
|
||||||
rpc_confirmed_transaction: RpcConfirmedTransactionStatusWithSignature,
|
|
||||||
) -> Self {
|
|
||||||
PgTransaction {
|
|
||||||
signature: rpc_confirmed_transaction.signature,
|
|
||||||
program_pk: OPENBOOK_KEY.to_string(),
|
|
||||||
block_datetime: to_timestampz(rpc_confirmed_transaction.block_time.unwrap() as u64),
|
|
||||||
slot: rpc_confirmed_transaction.slot,
|
|
||||||
err: rpc_confirmed_transaction.err.is_some(),
|
|
||||||
processed: false,
|
|
||||||
worker_partition: (rpc_confirmed_transaction.slot % NUM_TRANSACTION_PARTITIONS) as i32,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn from_row(row: Row) -> Self {
|
|
||||||
let slot_raw = row.get::<usize, i64>(3);
|
|
||||||
PgTransaction {
|
|
||||||
signature: row.get(0),
|
|
||||||
program_pk: row.get(1),
|
|
||||||
block_datetime: row.get(2),
|
|
||||||
slot: slot_raw as u64,
|
|
||||||
err: row.get(4),
|
|
||||||
processed: row.get(5),
|
|
||||||
worker_partition: row.get(6),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub enum ProcessState {
|
|
||||||
Processed,
|
|
||||||
Unprocessed,
|
|
||||||
}
|
|
|
@ -1,15 +1,22 @@
|
||||||
use chrono::{DateTime, Duration, DurationRound, Utc};
|
use chrono::{DateTime, Duration, DurationRound, Utc};
|
||||||
use deadpool_postgres::Pool;
|
use deadpool_postgres::Pool;
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use std::cmp::max;
|
use std::cmp::{max, min};
|
||||||
|
use strum::IntoEnumIterator;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
database::fetch::{fetch_candles_from, fetch_earliest_candles, fetch_latest_finished_candle},
|
database::{
|
||||||
|
fetch::{
|
||||||
|
fetch_candles_from, fetch_earliest_candles,
|
||||||
|
fetch_latest_finished_candle,
|
||||||
|
},
|
||||||
|
insert::build_candles_upsert_statement,
|
||||||
|
},
|
||||||
structs::{
|
structs::{
|
||||||
candle::Candle,
|
candle::Candle,
|
||||||
resolution::{day, Resolution},
|
resolution::{day, Resolution},
|
||||||
},
|
},
|
||||||
utils::{f64_max, f64_min},
|
utils::{f64_max, f64_min, AnyhowWrap},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub async fn batch_higher_order_candles(
|
pub async fn batch_higher_order_candles(
|
||||||
|
@ -34,12 +41,8 @@ pub async fn batch_higher_order_candles(
|
||||||
if constituent_candles.is_empty() {
|
if constituent_candles.is_empty() {
|
||||||
return Ok(Vec::new());
|
return Ok(Vec::new());
|
||||||
}
|
}
|
||||||
let combined_candles = combine_into_higher_order_candles(
|
let combined_candles =
|
||||||
&mut constituent_candles,
|
combine_into_higher_order_candles(&mut constituent_candles, resolution, start_time);
|
||||||
resolution,
|
|
||||||
start_time,
|
|
||||||
candle,
|
|
||||||
);
|
|
||||||
Ok(combined_candles)
|
Ok(combined_candles)
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
|
@ -61,13 +64,8 @@ pub async fn batch_higher_order_candles(
|
||||||
return Ok(Vec::new());
|
return Ok(Vec::new());
|
||||||
}
|
}
|
||||||
|
|
||||||
let seed_candle = constituent_candles[0].clone();
|
let combined_candles =
|
||||||
let combined_candles = combine_into_higher_order_candles(
|
combine_into_higher_order_candles(&mut constituent_candles, resolution, start_time);
|
||||||
&mut constituent_candles,
|
|
||||||
resolution,
|
|
||||||
start_time,
|
|
||||||
seed_candle,
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(trim_candles(
|
Ok(trim_candles(
|
||||||
combined_candles,
|
combined_candles,
|
||||||
|
@ -78,10 +76,9 @@ pub async fn batch_higher_order_candles(
|
||||||
}
|
}
|
||||||
|
|
||||||
fn combine_into_higher_order_candles(
|
fn combine_into_higher_order_candles(
|
||||||
constituent_candles: &mut Vec<Candle>,
|
constituent_candles: &Vec<Candle>,
|
||||||
target_resolution: Resolution,
|
target_resolution: Resolution,
|
||||||
st: DateTime<Utc>,
|
st: DateTime<Utc>,
|
||||||
seed_candle: Candle,
|
|
||||||
) -> Vec<Candle> {
|
) -> Vec<Candle> {
|
||||||
debug!("combining for target_resolution: {}", target_resolution);
|
debug!("combining for target_resolution: {}", target_resolution);
|
||||||
|
|
||||||
|
@ -92,7 +89,7 @@ fn combine_into_higher_order_candles(
|
||||||
target_resolution,
|
target_resolution,
|
||||||
);
|
);
|
||||||
let now = Utc::now().duration_trunc(Duration::minutes(1)).unwrap();
|
let now = Utc::now().duration_trunc(Duration::minutes(1)).unwrap();
|
||||||
let candle_window = now - st;
|
let candle_window = min(now - st, day());
|
||||||
let num_candles = max(
|
let num_candles = max(
|
||||||
1,
|
1,
|
||||||
(candle_window.num_minutes() / duration.num_minutes()) as usize + 1,
|
(candle_window.num_minutes() / duration.num_minutes()) as usize + 1,
|
||||||
|
@ -100,17 +97,16 @@ fn combine_into_higher_order_candles(
|
||||||
|
|
||||||
let mut combined_candles = vec![empty_candle; num_candles];
|
let mut combined_candles = vec![empty_candle; num_candles];
|
||||||
|
|
||||||
let mut con_iter = constituent_candles.iter_mut().peekable();
|
let mut last_close = constituent_candles[0].close;
|
||||||
|
let mut con_iter = constituent_candles.iter().peekable();
|
||||||
let mut start_time = st;
|
let mut start_time = st;
|
||||||
let mut end_time = start_time + duration;
|
let mut end_time = start_time + duration;
|
||||||
|
|
||||||
let mut last_candle = seed_candle;
|
|
||||||
|
|
||||||
for i in 0..combined_candles.len() {
|
for i in 0..combined_candles.len() {
|
||||||
combined_candles[i].open = last_candle.close;
|
combined_candles[i].open = last_close;
|
||||||
combined_candles[i].low = last_candle.close;
|
combined_candles[i].low = last_close;
|
||||||
combined_candles[i].close = last_candle.close;
|
combined_candles[i].close = last_close;
|
||||||
combined_candles[i].high = last_candle.close;
|
combined_candles[i].high = last_close;
|
||||||
|
|
||||||
while matches!(con_iter.peek(), Some(c) if c.end_time <= end_time) {
|
while matches!(con_iter.peek(), Some(c) if c.end_time <= end_time) {
|
||||||
let unit_candle = con_iter.next().unwrap();
|
let unit_candle = con_iter.next().unwrap();
|
||||||
|
@ -128,7 +124,7 @@ fn combine_into_higher_order_candles(
|
||||||
start_time = end_time;
|
start_time = end_time;
|
||||||
end_time += duration;
|
end_time += duration;
|
||||||
|
|
||||||
last_candle = combined_candles[i].clone();
|
last_close = combined_candles[i].close;
|
||||||
}
|
}
|
||||||
|
|
||||||
combined_candles
|
combined_candles
|
||||||
|
@ -149,25 +145,38 @@ fn trim_candles(mut c: Vec<Candle>, start_time: DateTime<Utc>) -> Vec<Candle> {
|
||||||
pub async fn backfill_batch_higher_order_candles(
|
pub async fn backfill_batch_higher_order_candles(
|
||||||
pool: &Pool,
|
pool: &Pool,
|
||||||
market_name: &str,
|
market_name: &str,
|
||||||
resolution: Resolution,
|
) -> anyhow::Result<()> {
|
||||||
) -> anyhow::Result<Vec<Candle>> {
|
let earliest_candles = fetch_earliest_candles(pool, market_name, Resolution::R1m).await?;
|
||||||
let mut constituent_candles =
|
let mut start_time = earliest_candles[0].start_time.duration_trunc(day())?;
|
||||||
fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution()).await?;
|
while start_time < Utc::now() {
|
||||||
if constituent_candles.is_empty() {
|
let mut candles = vec![];
|
||||||
return Ok(vec![]);
|
let mut constituent_candles = fetch_candles_from(
|
||||||
}
|
pool,
|
||||||
let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
|
market_name,
|
||||||
|
Resolution::R1m,
|
||||||
let seed_candle = constituent_candles[0].clone();
|
|
||||||
let combined_candles = combine_into_higher_order_candles(
|
|
||||||
&mut constituent_candles,
|
|
||||||
resolution,
|
|
||||||
start_time,
|
start_time,
|
||||||
seed_candle,
|
start_time + day(),
|
||||||
);
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(trim_candles(
|
for resolution in Resolution::iter() {
|
||||||
combined_candles,
|
if resolution == Resolution::R1m {
|
||||||
constituent_candles[0].start_time,
|
continue;
|
||||||
))
|
}
|
||||||
|
let mut combined_candles =
|
||||||
|
combine_into_higher_order_candles(&mut constituent_candles, resolution, start_time);
|
||||||
|
candles.append(&mut combined_candles);
|
||||||
|
}
|
||||||
|
|
||||||
|
let upsert_statement = build_candles_upsert_statement(&candles);
|
||||||
|
let client = pool.get().await.unwrap();
|
||||||
|
client
|
||||||
|
.execute(&upsert_statement, &[])
|
||||||
|
.await
|
||||||
|
.map_err_anyhow()?;
|
||||||
|
// println!("{:?} {:?} done", market_name, start_time);
|
||||||
|
start_time += day();
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,18 +1,26 @@
|
||||||
use std::cmp::min;
|
use std::{cmp::min, collections::HashMap};
|
||||||
|
|
||||||
use chrono::{DateTime, Duration, DurationRound, Utc};
|
use chrono::{DateTime, Duration, DurationRound, Utc};
|
||||||
use deadpool_postgres::Pool;
|
use deadpool_postgres::Pool;
|
||||||
|
use itertools::Itertools;
|
||||||
use log::debug;
|
use log::debug;
|
||||||
|
|
||||||
|
use crate::database::backfill::{
|
||||||
|
fetch_earliest_fill_multiple_markets, fetch_fills_multiple_markets_from,
|
||||||
|
fetch_last_minute_candles,
|
||||||
|
};
|
||||||
use crate::{
|
use crate::{
|
||||||
database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
|
database::{
|
||||||
|
fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
|
||||||
|
insert::build_candles_upsert_statement,
|
||||||
|
},
|
||||||
structs::{
|
structs::{
|
||||||
candle::Candle,
|
candle::{Candle},
|
||||||
markets::MarketInfo,
|
markets::MarketInfo,
|
||||||
openbook::{calculate_fill_price_and_size, PgOpenBookFill},
|
openbook::PgOpenBookFill,
|
||||||
resolution::{day, Resolution},
|
resolution::{day, Resolution},
|
||||||
},
|
},
|
||||||
utils::{f64_max, f64_min},
|
utils::{f64_max, f64_min, AnyhowWrap},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Result<Vec<Candle>> {
|
pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Result<Vec<Candle>> {
|
||||||
|
@ -86,9 +94,7 @@ fn combine_fills_into_1m_candles(
|
||||||
Some(p) => p,
|
Some(p) => p,
|
||||||
None => {
|
None => {
|
||||||
let first = fills_iter.peek().unwrap();
|
let first = fills_iter.peek().unwrap();
|
||||||
let (price, _) =
|
first.price
|
||||||
calculate_fill_price_and_size(**first, market.base_decimals, market.quote_decimals);
|
|
||||||
price
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -100,15 +106,13 @@ fn combine_fills_into_1m_candles(
|
||||||
|
|
||||||
while matches!(fills_iter.peek(), Some(f) if f.time < end_time) {
|
while matches!(fills_iter.peek(), Some(f) if f.time < end_time) {
|
||||||
let fill = fills_iter.next().unwrap();
|
let fill = fills_iter.next().unwrap();
|
||||||
let (price, volume) =
|
|
||||||
calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals);
|
|
||||||
|
|
||||||
candles[i].close = price;
|
candles[i].close = fill.price;
|
||||||
candles[i].low = f64_min(price, candles[i].low);
|
candles[i].low = f64_min(fill.price, candles[i].low);
|
||||||
candles[i].high = f64_max(price, candles[i].high);
|
candles[i].high = f64_max(fill.price, candles[i].high);
|
||||||
candles[i].volume += volume;
|
candles[i].volume += fill.size;
|
||||||
|
|
||||||
last_price = price;
|
last_price = fill.price;
|
||||||
}
|
}
|
||||||
|
|
||||||
candles[i].start_time = start_time;
|
candles[i].start_time = start_time;
|
||||||
|
@ -125,17 +129,19 @@ fn combine_fills_into_1m_candles(
|
||||||
/// Goes from the earliest fill to the most recent. Will mark candles as complete if there are missing gaps of fills between the start and end.
|
/// Goes from the earliest fill to the most recent. Will mark candles as complete if there are missing gaps of fills between the start and end.
|
||||||
pub async fn backfill_batch_1m_candles(
|
pub async fn backfill_batch_1m_candles(
|
||||||
pool: &Pool,
|
pool: &Pool,
|
||||||
market: &MarketInfo,
|
markets: Vec<MarketInfo>,
|
||||||
) -> anyhow::Result<Vec<Candle>> {
|
) -> anyhow::Result<()> {
|
||||||
let market_name = &market.name;
|
let market_address_strings: Vec<String> = markets.iter().map(|m| m.address.clone()).collect();
|
||||||
let market_address = &market.address;
|
let mut candle_container = HashMap::new();
|
||||||
let mut candles = vec![];
|
let client = pool.get().await?;
|
||||||
|
|
||||||
let earliest_fill = fetch_earliest_fill(pool, &market.address).await?;
|
let earliest_fill =
|
||||||
|
fetch_earliest_fill_multiple_markets(&client, &market_address_strings).await?;
|
||||||
if earliest_fill.is_none() {
|
if earliest_fill.is_none() {
|
||||||
debug!("No fills found for: {:?}", &market_name);
|
println!("No fills found for backfill");
|
||||||
return Ok(candles);
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
println!("Found earliset fill for backfill");
|
||||||
|
|
||||||
let mut start_time = earliest_fill
|
let mut start_time = earliest_fill
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
@ -146,13 +152,78 @@ pub async fn backfill_batch_1m_candles(
|
||||||
start_time + day(),
|
start_time + day(),
|
||||||
Utc::now().duration_trunc(Duration::minutes(1))?,
|
Utc::now().duration_trunc(Duration::minutes(1))?,
|
||||||
);
|
);
|
||||||
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
|
let last_candles = fetch_last_minute_candles(&client).await?;
|
||||||
if !fills.is_empty() {
|
let all_fills = fetch_fills_multiple_markets_from(
|
||||||
let mut minute_candles =
|
&client,
|
||||||
|
&market_address_strings,
|
||||||
|
start_time,
|
||||||
|
end_time,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
// println!("{:?} {:?}", start_time, end_time);
|
||||||
|
// println!("all fills len : {:?}", all_fills.len());
|
||||||
|
println!("{:?}", all_fills[0]);
|
||||||
|
// println!("{:?}", all_fills[1]);
|
||||||
|
// println!("Fetched multiple fills for backfill");
|
||||||
|
let fills_groups = all_fills
|
||||||
|
.into_iter()
|
||||||
|
.sorted_by(|a, b| Ord::cmp(&a.market_key, &b.market_key))
|
||||||
|
.group_by(|f| f.market_key.clone());
|
||||||
|
|
||||||
|
let fills_by_market: Vec<(String, Vec<PgOpenBookFill>)> = fills_groups
|
||||||
|
.into_iter()
|
||||||
|
.map(|(m, group)| (m, group.collect()))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
println!("fbm len : {:?}", fills_by_market.len());
|
||||||
|
// sort fills by market, make candles
|
||||||
|
for (_, mut fills) in fills_by_market {
|
||||||
|
let market = markets
|
||||||
|
.iter()
|
||||||
|
.find(|m| m.address == fills[0].market_key)
|
||||||
|
.unwrap();
|
||||||
|
let minute_candles =
|
||||||
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
|
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
|
||||||
candles.append(&mut minute_candles);
|
candle_container.insert(&market.address, minute_candles);
|
||||||
}
|
}
|
||||||
start_time += day()
|
|
||||||
|
// where no candles, make empty ones
|
||||||
|
for (k, v) in candle_container.iter_mut() {
|
||||||
|
if v.is_empty() {
|
||||||
|
let market = markets.iter().find(|m| &m.address == *k).unwrap();
|
||||||
|
let last_candle = last_candles
|
||||||
|
.iter()
|
||||||
|
.find(|c| c.market_name == market.name)
|
||||||
|
.unwrap();
|
||||||
|
let empty_candles = combine_fills_into_1m_candles(
|
||||||
|
&mut vec![],
|
||||||
|
market,
|
||||||
|
start_time,
|
||||||
|
end_time,
|
||||||
|
Some(last_candle.close),
|
||||||
|
);
|
||||||
|
*v = empty_candles;
|
||||||
}
|
}
|
||||||
Ok(candles)
|
}
|
||||||
|
|
||||||
|
// insert candles in batches
|
||||||
|
for candles in candle_container.values() {
|
||||||
|
let candle_chunks: Vec<Vec<Candle>> =
|
||||||
|
candles.chunks(1500).map(|chunk| chunk.to_vec()).collect(); // 1440 minutes in a day
|
||||||
|
for c in candle_chunks {
|
||||||
|
let upsert_statement = build_candles_upsert_statement(&c);
|
||||||
|
client
|
||||||
|
.execute(&upsert_statement, &[])
|
||||||
|
.await
|
||||||
|
.map_err_anyhow()?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// reset entries but keep markets we've seen for blank candles
|
||||||
|
for (_, v) in candle_container.iter_mut() {
|
||||||
|
*v = vec![];
|
||||||
|
}
|
||||||
|
println!("day done");
|
||||||
|
start_time += day();
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,6 @@ use super::metrics::METRIC_CANDLES_TOTAL;
|
||||||
pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
|
pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
|
||||||
loop {
|
loop {
|
||||||
let market_clone = market.clone();
|
let market_clone = market.clone();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
sleep(Duration::milliseconds(5000).to_std()?).await;
|
sleep(Duration::milliseconds(5000).to_std()?).await;
|
||||||
match batch_inner(pool, &market_clone).await {
|
match batch_inner(pool, &market_clone).await {
|
||||||
|
@ -43,6 +42,9 @@ pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
|
||||||
async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
|
async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
|
||||||
let market_name = &market.name.clone();
|
let market_name = &market.name.clone();
|
||||||
let candles = batch_1m_candles(pool, market).await?;
|
let candles = batch_1m_candles(pool, market).await?;
|
||||||
|
if candles.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
METRIC_CANDLES_TOTAL
|
METRIC_CANDLES_TOTAL
|
||||||
.with_label_values(&[market.name.as_str()])
|
.with_label_values(&[market.name.as_str()])
|
||||||
.inc_by(candles.clone().len() as u64);
|
.inc_by(candles.clone().len() as u64);
|
||||||
|
|
|
@ -1,11 +1,9 @@
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
|
use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
|
||||||
use openbook_candles::structs::transaction::NUM_TRANSACTION_PARTITIONS;
|
|
||||||
use openbook_candles::utils::Config;
|
use openbook_candles::utils::Config;
|
||||||
use openbook_candles::worker::metrics::{
|
use openbook_candles::worker::metrics::{
|
||||||
serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE,
|
serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE,
|
||||||
};
|
};
|
||||||
use openbook_candles::worker::trade_fetching::scrape::{scrape_fills, scrape_signatures};
|
|
||||||
use openbook_candles::{
|
use openbook_candles::{
|
||||||
database::initialize::{connect_to_database, setup_database},
|
database::initialize::{connect_to_database, setup_database},
|
||||||
worker::candle_batching::batch_for_market,
|
worker::candle_batching::batch_for_market,
|
||||||
|
@ -40,25 +38,6 @@ async fn main() -> anyhow::Result<()> {
|
||||||
setup_database(&pool).await?;
|
setup_database(&pool).await?;
|
||||||
let mut handles = vec![];
|
let mut handles = vec![];
|
||||||
|
|
||||||
// signature scraping
|
|
||||||
let rpc_clone = rpc_url.clone();
|
|
||||||
let pool_clone = pool.clone();
|
|
||||||
handles.push(tokio::spawn(async move {
|
|
||||||
scrape_signatures(rpc_clone, &pool_clone).await.unwrap();
|
|
||||||
}));
|
|
||||||
|
|
||||||
// transaction/fill scraping
|
|
||||||
for id in 0..NUM_TRANSACTION_PARTITIONS {
|
|
||||||
let rpc_clone = rpc_url.clone();
|
|
||||||
let pool_clone = pool.clone();
|
|
||||||
let markets_clone = target_markets.clone();
|
|
||||||
handles.push(tokio::spawn(async move {
|
|
||||||
scrape_fills(id as i32, rpc_clone, &pool_clone, &markets_clone)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
// candle batching
|
// candle batching
|
||||||
for market in market_infos.into_iter() {
|
for market in market_infos.into_iter() {
|
||||||
let batch_pool = pool.clone();
|
let batch_pool = pool.clone();
|
||||||
|
|
|
@ -1,3 +1,2 @@
|
||||||
pub mod candle_batching;
|
pub mod candle_batching;
|
||||||
pub mod metrics;
|
pub mod metrics;
|
||||||
pub mod trade_fetching;
|
|
||||||
|
|
|
@ -1,2 +0,0 @@
|
||||||
pub mod parsing;
|
|
||||||
pub mod scrape;
|
|
|
@ -1,94 +0,0 @@
|
||||||
use log::warn;
|
|
||||||
use solana_client::client_error::Result as ClientResult;
|
|
||||||
use solana_sdk::pubkey::Pubkey;
|
|
||||||
use solana_transaction_status::{
|
|
||||||
option_serializer::OptionSerializer, EncodedConfirmedTransactionWithStatusMeta,
|
|
||||||
};
|
|
||||||
use std::{collections::HashMap, io::Error};
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw},
|
|
||||||
worker::metrics::METRIC_RPC_ERRORS_TOTAL,
|
|
||||||
};
|
|
||||||
|
|
||||||
const PROGRAM_DATA: &str = "Program data: ";
|
|
||||||
|
|
||||||
pub fn parse_trades_from_openbook_txns(
|
|
||||||
txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
|
|
||||||
mut sig_strings: Vec<String>,
|
|
||||||
target_markets: &HashMap<Pubkey, String>,
|
|
||||||
) -> (Vec<OpenBookFillEvent>, Vec<String>) {
|
|
||||||
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
|
|
||||||
let mut failed_sigs = vec![];
|
|
||||||
for (idx, txn) in txns.iter_mut().enumerate() {
|
|
||||||
match txn {
|
|
||||||
Ok(t) => {
|
|
||||||
if let Some(m) = &t.transaction.meta {
|
|
||||||
match &m.log_messages {
|
|
||||||
OptionSerializer::Some(logs) => {
|
|
||||||
match parse_openbook_fills_from_logs(
|
|
||||||
logs,
|
|
||||||
target_markets,
|
|
||||||
sig_strings[idx].clone(),
|
|
||||||
t.block_time.unwrap(),
|
|
||||||
) {
|
|
||||||
Some(mut events) => fills_vector.append(&mut events),
|
|
||||||
None => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
OptionSerializer::None => {}
|
|
||||||
OptionSerializer::Skip => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!("rpc error in get_transaction {}", e);
|
|
||||||
failed_sigs.push(sig_strings[idx].clone());
|
|
||||||
METRIC_RPC_ERRORS_TOTAL
|
|
||||||
.with_label_values(&["getTransaction"])
|
|
||||||
.inc();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sig_strings.retain(|s| !failed_sigs.contains(s));
|
|
||||||
(fills_vector, sig_strings)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn parse_openbook_fills_from_logs(
|
|
||||||
logs: &Vec<String>,
|
|
||||||
target_markets: &HashMap<Pubkey, String>,
|
|
||||||
signature: String,
|
|
||||||
block_time: i64,
|
|
||||||
) -> Option<Vec<OpenBookFillEvent>> {
|
|
||||||
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
|
|
||||||
for (idx, l) in logs.iter().enumerate() {
|
|
||||||
match l.strip_prefix(PROGRAM_DATA) {
|
|
||||||
Some(log) => {
|
|
||||||
let borsh_bytes = match anchor_lang::__private::base64::decode(log) {
|
|
||||||
Ok(borsh_bytes) => borsh_bytes,
|
|
||||||
_ => continue,
|
|
||||||
};
|
|
||||||
let mut slice: &[u8] = &borsh_bytes[8..];
|
|
||||||
let event: Result<OpenBookFillEventRaw, Error> =
|
|
||||||
anchor_lang::AnchorDeserialize::deserialize(&mut slice);
|
|
||||||
|
|
||||||
match event {
|
|
||||||
Ok(e) => {
|
|
||||||
let fill_event = e.into_event(signature.clone(), block_time, idx);
|
|
||||||
if target_markets.contains_key(&fill_event.market) {
|
|
||||||
fills_vector.push(fill_event);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => continue,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => (),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !fills_vector.is_empty() {
|
|
||||||
Some(fills_vector)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,119 +0,0 @@
|
||||||
use deadpool_postgres::Pool;
|
|
||||||
use futures::future::join_all;
|
|
||||||
use log::{debug, warn};
|
|
||||||
use solana_client::{
|
|
||||||
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
|
|
||||||
rpc_config::RpcTransactionConfig,
|
|
||||||
};
|
|
||||||
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature};
|
|
||||||
use solana_transaction_status::UiTransactionEncoding;
|
|
||||||
use std::{collections::HashMap, time::Duration as WaitDuration};
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
database::{
|
|
||||||
fetch::fetch_worker_transactions,
|
|
||||||
insert::{build_transactions_insert_statement, insert_fills_atomically},
|
|
||||||
},
|
|
||||||
structs::transaction::PgTransaction,
|
|
||||||
utils::{AnyhowWrap, OPENBOOK_KEY},
|
|
||||||
worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL, METRIC_TRANSACTIONS_TOTAL},
|
|
||||||
};
|
|
||||||
|
|
||||||
use super::parsing::parse_trades_from_openbook_txns;
|
|
||||||
|
|
||||||
pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<()> {
|
|
||||||
let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let rpc_config = GetConfirmedSignaturesForAddress2Config {
|
|
||||||
before: None,
|
|
||||||
until: None,
|
|
||||||
limit: None,
|
|
||||||
commitment: Some(CommitmentConfig::confirmed()),
|
|
||||||
};
|
|
||||||
|
|
||||||
let sigs = match rpc_client
|
|
||||||
.get_signatures_for_address_with_config(&OPENBOOK_KEY, rpc_config)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(sigs) => sigs,
|
|
||||||
Err(e) => {
|
|
||||||
warn!("rpc error in get_signatures_for_address_with_config: {}", e);
|
|
||||||
METRIC_RPC_ERRORS_TOTAL
|
|
||||||
.with_label_values(&["getSignaturesForAddress"])
|
|
||||||
.inc();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if sigs.is_empty() {
|
|
||||||
debug!("No signatures found, trying again");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let transactions: Vec<PgTransaction> = sigs
|
|
||||||
.into_iter()
|
|
||||||
.map(PgTransaction::from_rpc_confirmed_transaction)
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
debug!("Scraper writing: {:?} txns to DB\n", transactions.len());
|
|
||||||
let upsert_statement = build_transactions_insert_statement(transactions);
|
|
||||||
let client = pool.get().await?;
|
|
||||||
let num_txns = client
|
|
||||||
.execute(&upsert_statement, &[])
|
|
||||||
.await
|
|
||||||
.map_err_anyhow()?;
|
|
||||||
METRIC_TRANSACTIONS_TOTAL.inc_by(num_txns);
|
|
||||||
}
|
|
||||||
// TODO: graceful shutdown
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn scrape_fills(
|
|
||||||
worker_id: i32,
|
|
||||||
rpc_url: String,
|
|
||||||
pool: &Pool,
|
|
||||||
target_markets: &HashMap<Pubkey, String>,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
debug!("Worker {} started \n", worker_id);
|
|
||||||
let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed());
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let transactions = fetch_worker_transactions(worker_id, pool).await?;
|
|
||||||
if transactions.is_empty() {
|
|
||||||
debug!("No signatures found by worker {}", worker_id);
|
|
||||||
tokio::time::sleep(WaitDuration::from_secs(1)).await;
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
|
|
||||||
// for each signature, fetch the transaction
|
|
||||||
let txn_config = RpcTransactionConfig {
|
|
||||||
encoding: Some(UiTransactionEncoding::Json),
|
|
||||||
commitment: Some(CommitmentConfig::confirmed()),
|
|
||||||
max_supported_transaction_version: Some(0),
|
|
||||||
};
|
|
||||||
|
|
||||||
let sig_strings = transactions
|
|
||||||
.iter()
|
|
||||||
.map(|t| t.signature.clone())
|
|
||||||
.collect::<Vec<String>>();
|
|
||||||
|
|
||||||
let signatures: Vec<_> = transactions
|
|
||||||
.into_iter()
|
|
||||||
.map(|t| t.signature.parse::<Signature>().unwrap())
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let txn_futs: Vec<_> = signatures
|
|
||||||
.iter()
|
|
||||||
.map(|s| rpc_client.get_transaction_with_config(s, txn_config))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let mut txns = join_all(txn_futs).await;
|
|
||||||
|
|
||||||
let (fills, completed_sigs) =
|
|
||||||
parse_trades_from_openbook_txns(&mut txns, sig_strings, target_markets);
|
|
||||||
for fill in fills.iter() {
|
|
||||||
let market_name = target_markets.get(&fill.market).unwrap();
|
|
||||||
METRIC_FILLS_TOTAL.with_label_values(&[market_name]).inc();
|
|
||||||
}
|
|
||||||
// Write fills to the database, and update properly fetched transactions as processed
|
|
||||||
insert_fills_atomically(pool, worker_id, fills, completed_sigs).await?;
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue