Compare commits

...

15 Commits

Author SHA1 Message Date
riordanp 9b69e380c3
Remove combined deploy config (#1) 2023-05-23 12:37:58 +01:00
dboures df1af2c10d
chore: remove illiquid markets 2023-05-20 14:18:45 -05:00
dboures 34117ca95e
fix: query error in /coingecko/tickers 2023-05-20 13:59:56 -05:00
dboures 6e16ef8e04
lint: cargo clippy 2023-05-19 19:16:37 -05:00
dboures 7ae8339ebb
feat: split backfills into trades and candles 2023-05-19 19:15:13 -05:00
dboures 8bbf89677c
docs: update .env-example 2023-05-19 14:51:20 -05:00
dboures 0e821cc3c1
feat: fill events include block time (fixes backfill issues) 2023-05-19 14:50:56 -05:00
Riordan Panayides 872e6fcdb2 Fix reading postgres config from env 2023-05-18 19:41:17 +01:00
Riordan Panayides 5b61f3d949 wip: many changes for deployment
- split out postgres config and read from env
- make ssl optional
- revert to separate dockerfiles
- give tokio enough workers
- allow custom server bind address
- fix warnings
- cargo fmt
2023-05-17 17:11:17 +01:00
Riordan Panayides 697a20d9ff Fix backfill 2023-05-16 15:59:33 +01:00
Riordan Panayides cccc7accd5 Merge remote-tracking branch 'upstream/tokio-postgres' into pan/fly-deploy 2023-05-15 13:05:06 +01:00
dboures c20429bcc5
wip: attempt to use SSL cert 2023-05-14 03:21:34 -05:00
dboures a4bf9a35be
refactor: server uses tokio-postgres instead of sqlx 2023-05-14 03:09:12 -05:00
dboures cead78381d
refactor: worker uses tokio-postgres instead of sqlx 2023-05-14 02:15:10 -05:00
Riordan Panayides 28dc3c5e59 Add combined dockerfile, fly config 2023-05-13 14:52:00 +01:00
30 changed files with 1227 additions and 991 deletions

.env-example

@@ -1,5 +1,11 @@
 RPC_URL=http://solana-mainnet-api.rpc-node.com
-DATABASE_URL=
-SQLX_OFFLINE=true
-MAX_PG_POOL_CONNS_WORKER=5
-MAX_PG_POOL_CONNS_SERVER=15
+SERVER_BIND_ADDR="[::]:8080"
+PG_HOST=127.0.0.1
+PG_PORT=5432
+PG_USER=postgres
+PG_PASSWORD=
+PG_DBNAME=postgres
+PG_MAX_POOL_CONNECTIONS=10
+PG_USE_SSL=false
+PG_CA_CERT_PATH=
+PG_CLIENT_KEY_PATH=
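
The PG_* variables replace the single DATABASE_URL. src/database/initialize.rs below consumes them through a PgConfig::from_env() helper in utils.rs, which this compare does not show. A rough sketch of what that struct must look like, inferred from its call sites; the hand-rolled env parsing here is illustrative, since the repo adds the config crate and probably deserializes instead:

use deadpool_postgres::Config as DeadpoolConfig;
use std::env;

// Hypothetical reconstruction of utils::PgConfig; field names are taken
// from their usage in connect_to_database(), everything else is assumed.
pub struct PgConfig {
    pub pg: DeadpoolConfig,
    pub pg_max_pool_connections: usize,
    pub pg_use_ssl: bool,
    pub pg_ca_cert_path: Option<String>,
    pub pg_client_key_path: Option<String>,
}

impl PgConfig {
    pub fn from_env() -> anyhow::Result<PgConfig> {
        let mut pg = DeadpoolConfig::new();
        pg.host = env::var("PG_HOST").ok();
        pg.port = env::var("PG_PORT").ok().and_then(|p| p.parse().ok());
        pg.user = env::var("PG_USER").ok();
        pg.password = env::var("PG_PASSWORD").ok();
        pg.dbname = env::var("PG_DBNAME").ok();
        Ok(PgConfig {
            pg,
            pg_max_pool_connections: env::var("PG_MAX_POOL_CONNECTIONS")?.parse()?,
            pg_use_ssl: env::var("PG_USE_SSL").map(|v| v == "true").unwrap_or(false),
            pg_ca_cert_path: env::var("PG_CA_CERT_PATH").ok(),
            pg_client_key_path: env::var("PG_CLIENT_KEY_PATH").ok(),
        })
    }
}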

Cargo.lock (generated, 752 changed lines)

File diff suppressed because it is too large

Cargo.toml

@@ -16,8 +16,12 @@ name = "server"
 path = "src/server/main.rs"

 [[bin]]
-name = "backfill"
-path = "src/backfill/main.rs"
+name = "backfill-trades"
+path = "src/backfill-trades/main.rs"
+
+[[bin]]
+name = "backfill-candles"
+path = "src/backfill-candles/main.rs"

 [dependencies]
 tokio = { version = "1", features = ["full"] }
@@ -26,7 +30,10 @@ futures = "0.3.27"
 jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] }
-sqlx = { version = "0.6", features = [ "runtime-tokio-native-tls" , "postgres", "chrono", "decimal", "offline" ] }
+deadpool-postgres = { version = "0.10.5", features = [ "rt_tokio_1", "serde" ] }
+tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
+postgres-native-tls = "0.5.0"
+native-tls = "0.2.11"
 chrono = "0.4.23"
 solana-client = "=1.14.13"
@@ -58,4 +65,6 @@ actix-web = "4"
 arrayref = "0.3.6"
 bytemuck = "1.12.3"
 num_enum = "0.6.1"
+config = "0.13.1"

Dockerfile.server

@@ -19,4 +19,6 @@ RUN apt-get update && apt-get -y install ca-certificates libssl1.1
 # We do not need the Rust toolchain to run the binary!
 FROM base_image AS runtime
 COPY --from=builder /target/release/server /usr/local/bin
-ENTRYPOINT ["/usr/local/bin/server"]
+COPY --from=builder markets.json .
+COPY --from=builder ca.cer .
+COPY --from=builder client.pks .

Dockerfile.worker

@@ -2,13 +2,13 @@ FROM lukemathwalker/cargo-chef:latest-rust-1.67.1-slim AS chef
 FROM chef AS planner
 COPY . .
-RUN cargo chef prepare --recipe-path worker-recipe.json
+RUN cargo chef prepare --recipe-path recipe.json

 FROM chef AS builder
-COPY --from=planner worker-recipe.json worker-recipe.json
+COPY --from=planner recipe.json recipe.json
 RUN apt-get update && apt-get install -y libudev-dev clang pkg-config libssl-dev build-essential cmake
 RUN rustup component add rustfmt && update-ca-certificates
-RUN cargo chef cook --release --recipe-path worker-recipe.json
+RUN cargo chef cook --release --recipe-path recipe.json

 # Build application
 COPY . .
 RUN cargo build --release --bin worker
@@ -19,4 +19,6 @@ RUN apt-get update && apt-get -y install ca-certificates libssl1.1
 # We do not need the Rust toolchain to run the binary!
 FROM base_image AS runtime
 COPY --from=builder /target/release/worker /usr/local/bin
-ENTRYPOINT ["/usr/local/bin/worker"]
+COPY --from=builder markets.json .
+COPY --from=builder ca.cer .
+COPY --from=builder client.pks .

cd/server.toml (new file, 19 lines)

@@ -0,0 +1,19 @@
app = "openbook-candles-server"
kill_signal = "SIGTERM"
kill_timeout = 30

[build]
dockerfile = "../Dockerfile.server"

[experimental]
cmd = ["server", "markets.json"]

[[services]]
internal_port = 8080
processes = ["app"]
protocol = "tcp"

[services.concurrency]
hard_limit = 1024
soft_limit = 1024
type = "connections"

cd/worker.toml (new file, 9 lines)

@@ -0,0 +1,9 @@
app = "openbook-candles-worker"
kill_signal = "SIGTERM"
kill_timeout = 30

[build]
dockerfile = "../Dockerfile.worker"

[experimental]
cmd = ["worker", "markets.json"]

markets.json

@@ -23,10 +23,6 @@
     "name": "mSOL/USDC",
     "address": "9Lyhks5bQQxb9EyyX55NtgKQzpM4WK7JCmeaWuQ5MoXD"
   },
-  {
-    "name": "stSOL/USDC",
-    "address": "JCKa72xFYGWBEVJZ7AKZ2ofugWPBfrrouQviaGaohi3R"
-  },
   {
     "name": "SOL/USDT",
     "address": "2AdaV97p6SfkuMQJdu8DHhBhmJe7oWdvbm52MJfYQmfA"
@@ -35,10 +31,6 @@
     "name": "USDT/USDC",
     "address": "B2na8Awyd7cpC59iEU43FagJAPLigr3AP3s38KM982bu"
   },
-  {
-    "name": "USDT/USDC",
-    "address": "B2na8Awyd7cpC59iEU43FagJAPLigr3AP3s38KM982bu"
-  },
   {
     "name": "ETH/USDC",
     "address": "BbJgE7HZMaDp5NTYvRh5jZSkQPVDTU8ubPFtpogUkEj4"
@@ -50,9 +42,5 @@
   {
     "name": "RAY/USDT",
     "address": "GpHbiJJ9VHiuHVXeoet121Utrbm1CSNNzYrBKB8Xz2oz"
-  },
-  {
-    "name": "SLND/USDC",
-    "address": "HTHMfoxePjcXFhrV74pfCUNoWGe374ecFwiDjPGTkzHr"
   }
 ]

src/backfill-candles/main.rs (new file)

@@ -0,0 +1,69 @@
use deadpool_postgres::Object;
use openbook_candles::{
    database::{
        initialize::connect_to_database,
        insert::{build_candles_upsert_statement},
    },
    structs::{
        candle::Candle,
        markets::{fetch_market_infos, load_markets},
        resolution::Resolution,
    },
    utils::{AnyhowWrap, Config},
    worker::candle_batching::{
        higher_order_candles::backfill_batch_higher_order_candles,
        minute_candles::backfill_batch_1m_candles,
    },
};
use std::{env};
use strum::IntoEnumIterator;

#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> anyhow::Result<()> {
    dotenv::dotenv().ok();
    let args: Vec<String> = env::args().collect();
    assert!(args.len() == 2);
    let path_to_markets_json = &args[1];
    let rpc_url: String = dotenv::var("RPC_URL").unwrap();
    let config = Config {
        rpc_url: rpc_url.clone(),
    };
    let markets = load_markets(path_to_markets_json);
    let market_infos = fetch_market_infos(&config, markets.clone()).await?;
    println!("Backfilling candles for {:?}", markets);

    let pool = connect_to_database().await?;
    for market in market_infos.into_iter() {
        let client = pool.get().await?;
        let minute_candles = backfill_batch_1m_candles(&pool, &market).await?;
        save_candles(minute_candles, client).await?;

        for resolution in Resolution::iter() {
            if resolution == Resolution::R1m {
                continue;
            }
            let higher_order_candles =
                backfill_batch_higher_order_candles(&pool, &market.name, resolution).await?;
            let client = pool.get().await?;
            save_candles(higher_order_candles, client).await?;
        }
    }
    Ok(())
}

async fn save_candles(candles: Vec<Candle>, client: Object) -> anyhow::Result<()> {
    if !candles.is_empty() {
        let upsert_statement = build_candles_upsert_statement(candles);
        client
            .execute(&upsert_statement, &[])
            .await
            .map_err_anyhow()?;
    }
    Ok(())
}
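
The Resolution::iter() loop relies on strum's EnumIter derive; resolution.rs is not part of this compare, so the following is only a sketch of the shape the loop assumes. Only R1m is confirmed by the code above; the other variants (and the omitted Display impl behind resolution.to_string()) are guesses:

use strum::EnumIter;

// Hypothetical sketch of structs::resolution::Resolution. The backfill
// skips R1m because backfill_batch_1m_candles already produced it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumIter)]
pub enum Resolution {
    R1m,
    R5m,
    R15m,
    R1h,
    R4h,
    R1d,
}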

src/backfill-trades/main.rs

@@ -1,33 +1,37 @@
-use std::{collections::HashMap, str::FromStr, env};
 use anchor_lang::prelude::Pubkey;
-use chrono::{NaiveDateTime, DateTime, Utc, Duration};
+use chrono::{DateTime, Duration, NaiveDateTime, Utc};
 use futures::future::join_all;
-use openbook_candles::{structs::{openbook::OpenBookFillEventLog, markets::{load_markets, fetch_market_infos}}, worker::trade_fetching::{scrape::scrape_transactions, parsing::parse_trades_from_openbook_txns}, database::{initialize::connect_to_database, insert::persist_fill_events}, utils::Config};
-use solana_client::{rpc_config::RpcTransactionConfig, nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config, rpc_response::RpcConfirmedTransactionStatusWithSignature};
+use openbook_candles::{
+    database::{initialize::connect_to_database, insert::persist_fill_events},
+    structs::{
+        markets::{fetch_market_infos, load_markets},
+        openbook::OpenBookFillEvent,
+    },
+    utils::Config,
+    worker::trade_fetching::parsing::parse_trades_from_openbook_txns,
+};
+use solana_client::{
+    nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
+    rpc_config::RpcTransactionConfig, rpc_response::RpcConfirmedTransactionStatusWithSignature,
+};
 use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
 use solana_transaction_status::UiTransactionEncoding;
-use tokio::sync::mpsc::{Sender, self};
+use std::{collections::HashMap, env, str::FromStr};
+use tokio::sync::mpsc::{self, Sender};

-#[tokio::main]
+#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
 async fn main() -> anyhow::Result<()> {
     dotenv::dotenv().ok();
     let args: Vec<String> = env::args().collect();
     assert!(args.len() == 2);
     let path_to_markets_json = &args[1];
     let rpc_url: String = dotenv::var("RPC_URL").unwrap();
-    let database_url: String = dotenv::var("DATABASE_URL").unwrap();
-    let max_pg_pool_connections: u32 = dotenv::var("MAX_PG_POOL_CONNS_WORKER")
-        .unwrap()
-        .parse::<u32>()
-        .unwrap();
     let config = Config {
         rpc_url: rpc_url.clone(),
-        database_url,
-        max_pg_pool_connections,
     };
-    let markets = load_markets(&path_to_markets_json);
+    let markets = load_markets(path_to_markets_json);
     let market_infos = fetch_market_infos(&config, markets.clone()).await?;
     let mut target_markets = HashMap::new();
     for m in market_infos.clone() {
@@ -35,10 +39,10 @@ async fn main() -> anyhow::Result<()> {
     }
     println!("{:?}", target_markets);

-    let pool = connect_to_database(&config).await?;
-    let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEventLog>(1000);
+    let pool = connect_to_database().await?;
+    let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(1000);

     tokio::spawn(async move {
         loop {
             persist_fill_events(&pool, &mut fill_receiver)
                 .await
@@ -52,10 +56,9 @@ async fn main() -> anyhow::Result<()> {
 pub async fn backfill(
     rpc_url: String,
-    fill_sender: &Sender<OpenBookFillEventLog>,
+    fill_sender: &Sender<OpenBookFillEvent>,
     target_markets: &HashMap<Pubkey, u8>,
 ) -> anyhow::Result<()> {
     println!("backfill started");
     let mut before_sig: Option<Signature> = None;
     let mut now_time = Utc::now().timestamp();
@@ -64,14 +67,14 @@ pub async fn backfill(
     let mut handles = vec![];

     while now_time > end_time {
-        let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
+        let rpc_client =
+            RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
         let maybe_r = get_signatures(&rpc_client, before_sig).await;

         match maybe_r {
             Some((last, time, sigs)) => {
                 now_time = time;
                 before_sig = Some(last);
                 let time_left = backfill_time_left(now_time, end_time);
                 println!(
                     "{} minutes ~ {} days remaining in the backfill\n",
@@ -85,10 +88,10 @@ pub async fn backfill(
                     get_transactions(&rpc_client, sigs, &cloned_sender, &cloned_markets).await;
                 });
                 handles.push(handle);
-            },
-            None => {},
+            }
+            None => {}
         }
-    };
+    }

     futures::future::join_all(handles).await;
@@ -96,44 +99,52 @@ pub async fn backfill(
     Ok(())
 }

-pub async fn get_signatures(rpc_client: &RpcClient,
-    before_sig: Option<Signature>) -> Option<(Signature, i64, Vec<RpcConfirmedTransactionStatusWithSignature>)> {
+pub async fn get_signatures(
+    rpc_client: &RpcClient,
+    before_sig: Option<Signature>,
+) -> Option<(
+    Signature,
+    i64,
+    Vec<RpcConfirmedTransactionStatusWithSignature>,
+)> {
     let rpc_config = GetConfirmedSignaturesForAddress2Config {
         before: before_sig,
         until: None,
         limit: None,
         commitment: Some(CommitmentConfig::confirmed()),
     };

     let sigs = match rpc_client
         .get_signatures_for_address_with_config(
             &Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(),
             rpc_config,
         )
         .await
     {
         Ok(s) => s,
         Err(e) => {
             println!("Error in get_signatures_for_address_with_config: {}", e);
             return None;
         }
     };

-    if sigs.len() == 0 {
+    if sigs.is_empty() {
         println!("No signatures found");
         return None;
     }
     let last = sigs.last().unwrap();
-    return Some((Signature::from_str(&last.signature).unwrap(), last.block_time.unwrap(), sigs));
+    // println!("{:?}", last.block_time.unwrap());
+    Some((
+        Signature::from_str(&last.signature).unwrap(),
+        last.block_time.unwrap(),
+        sigs,
+    ))
 }

 pub async fn get_transactions(
     rpc_client: &RpcClient,
     mut sigs: Vec<RpcConfirmedTransactionStatusWithSignature>,
-    fill_sender: &Sender<OpenBookFillEventLog>,
+    fill_sender: &Sender<OpenBookFillEvent>,
     target_markets: &HashMap<Pubkey, u8>,
 ) {
     sigs.retain(|sig| sig.err.is_none());
@@ -154,14 +165,15 @@ pub async fn get_transactions(
     let txn_futs: Vec<_> = signatures
         .iter()
-        .map(|s| rpc_client.get_transaction_with_config(&s, txn_config))
+        .map(|s| rpc_client.get_transaction_with_config(s, txn_config))
         .collect();

     let mut txns = join_all(txn_futs).await;

     let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
-    if fills.len() > 0 {
+    if !fills.is_empty() {
         for fill in fills.into_iter() {
+            // println!("Sending fill {:?}", fill);
             if let Err(_) = fill_sender.send(fill).await {
                 panic!("receiver dropped");
             }
@@ -175,4 +187,4 @@ fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration {
     let cur_date = DateTime::<Utc>::from_utc(naive_cur, Utc);
     let bf_date = DateTime::<Utc>::from_utc(naive_bf, Utc);
     cur_date - bf_date
 }
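
For reference, backfill_time_left just lifts two unix timestamps into chrono types and subtracts them. The hunk above only shows the last four lines of the function, so the NaiveDateTime construction here is reconstructed, not copied; a self-contained sketch with arbitrary values:

use chrono::{DateTime, Duration, NaiveDateTime, Utc};

fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration {
    // Reconstructed: the visible hunk starts at the DateTime conversions.
    let naive_cur = NaiveDateTime::from_timestamp_opt(current_time, 0).unwrap();
    let naive_bf = NaiveDateTime::from_timestamp_opt(backfill_end, 0).unwrap();
    let cur_date = DateTime::<Utc>::from_utc(naive_cur, Utc);
    let bf_date = DateTime::<Utc>::from_utc(naive_bf, Utc);
    cur_date - bf_date
}

fn main() {
    // Timestamps exactly one day apart: 1440 minutes, 1 day.
    let left = backfill_time_left(1_684_000_000, 1_683_913_600);
    println!(
        "{} minutes ~ {} days remaining in the backfill",
        left.num_minutes(),
        left.num_days()
    );
}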

src/database/fetch.rs

@@ -1,50 +1,54 @@
-use chrono::{DateTime, Utc};
-use sqlx::{pool::PoolConnection, Postgres};
-
-use crate::{
-    structs::{
-        candle::Candle,
-        coingecko::{PgCoinGecko24HighLow, PgCoinGecko24HourVolume},
-        openbook::PgOpenBookFill,
-        resolution::Resolution,
-        trader::PgTrader,
-    },
-    utils::AnyhowWrap,
+use crate::structs::{
+    candle::Candle,
+    coingecko::{PgCoinGecko24HighLow, PgCoinGecko24HourVolume},
+    openbook::PgOpenBookFill,
+    resolution::Resolution,
+    trader::PgTrader,
 };
+use chrono::{DateTime, Utc};
+use deadpool_postgres::{GenericClient, Pool};

 pub async fn fetch_earliest_fill(
-    conn: &mut PoolConnection<Postgres>,
+    pool: &Pool,
     market_address_string: &str,
 ) -> anyhow::Result<Option<PgOpenBookFill>> {
-    sqlx::query_as!(
-        PgOpenBookFill,
-        r#"SELECT
-         time as "time!",
-         bid as "bid!",
-         maker as "maker!",
-         native_qty_paid as "native_qty_paid!",
-         native_qty_received as "native_qty_received!",
-         native_fee_or_rebate as "native_fee_or_rebate!"
-         from fills
-         where market = $1
-         and maker = true
-         ORDER BY time asc LIMIT 1"#,
-        market_address_string
-    )
-    .fetch_optional(conn)
-    .await
-    .map_err_anyhow()
+    let client = pool.get().await?;
+
+    let stmt = client
+        .prepare(
+            r#"SELECT
+         time as "time!",
+         bid as "bid!",
+         maker as "maker!",
+         native_qty_paid as "native_qty_paid!",
+         native_qty_received as "native_qty_received!",
+         native_fee_or_rebate as "native_fee_or_rebate!"
+         from fills
+         where market = $1
+         and maker = true
+         ORDER BY time asc LIMIT 1"#,
+        )
+        .await?;
+
+    let row = client.query_opt(&stmt, &[&market_address_string]).await?;
+
+    match row {
+        Some(r) => Ok(Some(PgOpenBookFill::from_row(r))),
+        None => Ok(None),
+    }
 }

 pub async fn fetch_fills_from(
-    conn: &mut PoolConnection<Postgres>,
+    pool: &Pool,
     market_address_string: &str,
     start_time: DateTime<Utc>,
     end_time: DateTime<Utc>,
 ) -> anyhow::Result<Vec<PgOpenBookFill>> {
-    sqlx::query_as!(
-        PgOpenBookFill,
-        r#"SELECT
+    let client = pool.get().await?;
+
+    let stmt = client
+        .prepare(
+            r#"SELECT
@@ -53,31 +57,36 @@ pub async fn fetch_fills_from(
          native_fee_or_rebate as "native_fee_or_rebate!"
          from fills
          where market = $1
-         and time >= $2
-         and time < $3
+         and time >= $2::timestamptz
+         and time < $3::timestamptz
          and maker = true
          ORDER BY time asc"#,
-        market_address_string,
-        start_time,
-        end_time
-    )
-    .fetch_all(conn)
-    .await
-    .map_err_anyhow()
+        )
+        .await?;
+
+    let rows = client
+        .query(&stmt, &[&market_address_string, &start_time, &end_time])
+        .await?;
+
+    Ok(rows
+        .into_iter()
+        .map(PgOpenBookFill::from_row)
+        .collect())
 }

 pub async fn fetch_latest_finished_candle(
-    conn: &mut PoolConnection<Postgres>,
+    pool: &Pool,
     market_name: &str,
     resolution: Resolution,
 ) -> anyhow::Result<Option<Candle>> {
-    sqlx::query_as!(
-        Candle,
-        r#"SELECT
+    let client = pool.get().await?;
+
+    let stmt = client
+        .prepare(
+            r#"SELECT
+         market_name as "market_name!",
          start_time as "start_time!",
          end_time as "end_time!",
          resolution as "resolution!",
-         market_name as "market_name!",
          open as "open!",
          close as "close!",
          high as "high!",
@@ -89,26 +98,35 @@ pub async fn fetch_latest_finished_candle(
          and resolution = $2
          and complete = true
          ORDER BY start_time desc LIMIT 1"#,
-        market_name,
-        resolution.to_string()
-    )
-    .fetch_optional(conn)
-    .await
-    .map_err_anyhow()
+        )
+        .await?;
+
+    let row = client
+        .query_opt(&stmt, &[&market_name, &resolution.to_string()])
+        .await?;
+
+    match row {
+        Some(r) => Ok(Some(Candle::from_row(r))),
+        None => Ok(None),
+    }
 }

+/// Fetches all of the candles for the given market and resoultion, starting from the earliest.
+/// Note that this function will fetch ALL candles.
 pub async fn fetch_earliest_candles(
-    conn: &mut PoolConnection<Postgres>,
+    pool: &Pool,
     market_name: &str,
     resolution: Resolution,
 ) -> anyhow::Result<Vec<Candle>> {
-    sqlx::query_as!(
-        Candle,
-        r#"SELECT
+    let client = pool.get().await?;
+
+    let stmt = client
+        .prepare(
+            r#"SELECT
+         market_name as "market_name!",
          start_time as "start_time!",
          end_time as "end_time!",
          resolution as "resolution!",
-         market_name as "market_name!",
          open as "open!",
          close as "close!",
          high as "high!",
@@ -119,64 +137,32 @@ pub async fn fetch_earliest_candles(
          where market_name = $1
          and resolution = $2
          ORDER BY start_time asc"#,
-        market_name,
-        resolution.to_string()
-    )
-    .fetch_all(conn)
-    .await
-    .map_err_anyhow()
+        )
+        .await?;
+
+    let rows = client
+        .query(&stmt, &[&market_name, &resolution.to_string()])
+        .await?;
+
+    Ok(rows.into_iter().map(Candle::from_row).collect())
 }

 pub async fn fetch_candles_from(
-    conn: &mut PoolConnection<Postgres>,
+    pool: &Pool,
     market_name: &str,
     resolution: Resolution,
     start_time: DateTime<Utc>,
     end_time: DateTime<Utc>,
 ) -> anyhow::Result<Vec<Candle>> {
-    sqlx::query_as!(
-        Candle,
-        r#"SELECT
-         start_time as "start_time!",
-         end_time as "end_time!",
-         resolution as "resolution!",
-         market_name as "market_name!",
-         open as "open!",
-         close as "close!",
-         high as "high!",
-         low as "low!",
-         volume as "volume!",
-         complete as "complete!"
-         from candles
-         where market_name = $1
-         and resolution = $2
-         and start_time >= $3
-         and end_time <= $4
-         ORDER BY start_time asc"#,
-        market_name,
-        resolution.to_string(),
-        start_time,
-        end_time
-    )
-    .fetch_all(conn)
-    .await
-    .map_err_anyhow()
-}
-
-pub async fn fetch_tradingview_candles(
-    conn: &mut PoolConnection<Postgres>,
-    market_name: &str,
-    resolution: Resolution,
-    start_time: DateTime<Utc>,
-    end_time: DateTime<Utc>,
-) -> anyhow::Result<Vec<Candle>> {
-    sqlx::query_as!(
-        Candle,
-        r#"SELECT
+    let client = pool.get().await?;
+
+    let stmt = client
+        .prepare(
+            r#"SELECT
+         market_name as "market_name!",
          start_time as "start_time!",
          end_time as "end_time!",
          resolution as "resolution!",
-         market_name as "market_name!",
          open as "open!",
          close as "close!",
          high as "high!",
@@ -189,112 +175,135 @@ pub async fn fetch_tradingview_candles(
          and start_time >= $3
          and end_time <= $4
          ORDER BY start_time asc"#,
-        market_name,
-        resolution.to_string(),
-        start_time,
-        end_time
-    )
-    .fetch_all(conn)
-    .await
-    .map_err_anyhow()
+        )
+        .await?;
+
+    let rows = client
+        .query(
+            &stmt,
+            &[
+                &market_name,
+                &resolution.to_string(),
+                &start_time,
+                &end_time,
+            ],
+        )
+        .await?;
+
+    Ok(rows.into_iter().map(Candle::from_row).collect())
 }

 pub async fn fetch_top_traders_by_base_volume_from(
-    conn: &mut PoolConnection<Postgres>,
+    pool: &Pool,
     market_address_string: &str,
     start_time: DateTime<Utc>,
     end_time: DateTime<Utc>,
 ) -> anyhow::Result<Vec<PgTrader>> {
-    sqlx::query_as!(
-        PgTrader,
-        r#"SELECT
-        open_orders_owner,
-        sum(
-            native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
-        ) as "raw_ask_size!",
-        sum(
-            native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
-        ) as "raw_bid_size!"
-        FROM fills
-        WHERE market = $1
-        AND time >= $2
-        AND time < $3
-        GROUP BY open_orders_owner
-        ORDER BY
-            sum(native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
-            +
-            sum(native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
-        DESC
-        LIMIT 10000"#,
-        market_address_string,
-        start_time,
-        end_time
-    )
-    .fetch_all(conn)
-    .await
-    .map_err_anyhow()
+    let client = pool.get().await?;
+
+    let stmt = client
+        .prepare(
+            r#"SELECT
+        open_orders_owner,
+        sum(
+            native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
+        ) as "raw_ask_size!",
+        sum(
+            native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
+        ) as "raw_bid_size!"
+        FROM fills
+        WHERE market = $1
+        AND time >= $2
+        AND time < $3
+        GROUP BY open_orders_owner
+        ORDER BY
+            sum(native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
+            +
+            sum(native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
+        DESC
+        LIMIT 10000"#,
+        )
+        .await?;
+
+    let rows = client
+        .query(&stmt, &[&market_address_string, &start_time, &end_time])
+        .await?;
+
+    Ok(rows.into_iter().map(PgTrader::from_row).collect())
 }

 pub async fn fetch_top_traders_by_quote_volume_from(
-    conn: &mut PoolConnection<Postgres>,
+    pool: &Pool,
     market_address_string: &str,
     start_time: DateTime<Utc>,
     end_time: DateTime<Utc>,
 ) -> anyhow::Result<Vec<PgTrader>> {
-    sqlx::query_as!(
-        PgTrader,
-        r#"SELECT
-        open_orders_owner,
-        sum(
-            native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
-        ) as "raw_ask_size!",
-        sum(
-            native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
-        ) as "raw_bid_size!"
-        FROM fills
-        WHERE market = $1
-        AND time >= $2
-        AND time < $3
-        GROUP BY open_orders_owner
-        ORDER BY
-            sum(native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
-            +
-            sum(native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
-        DESC
-        LIMIT 10000"#,
-        market_address_string,
-        start_time,
-        end_time
-    )
-    .fetch_all(conn)
-    .await
-    .map_err_anyhow()
+    let client = pool.get().await?;
+
+    let stmt = client
+        .prepare(
+            r#"SELECT
+        open_orders_owner,
+        sum(
+            native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
+        ) as "raw_ask_size!",
+        sum(
+            native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
+        ) as "raw_bid_size!"
+        FROM fills
+        WHERE market = $1
+        AND time >= $2
+        AND time < $3
+        GROUP BY open_orders_owner
+        ORDER BY
+            sum(native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
+            +
+            sum(native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
+        DESC
+        LIMIT 10000"#,
+        )
+        .await?;
+
+    let rows = client
+        .query(&stmt, &[&market_address_string, &start_time, &end_time])
+        .await?;
+
+    Ok(rows.into_iter().map(PgTrader::from_row).collect())
 }

 pub async fn fetch_coingecko_24h_volume(
-    conn: &mut PoolConnection<Postgres>,
+    pool: &Pool,
 ) -> anyhow::Result<Vec<PgCoinGecko24HourVolume>> {
-    sqlx::query_as!(
-        PgCoinGecko24HourVolume,
-        r#"select market as "address!",
-        sum(native_qty_paid) as "raw_quote_size!",
-        sum(native_qty_received) as "raw_base_size!"
+    let client = pool.get().await?;
+
+    let stmt = client
+        .prepare(
+            r#"select market as "address!",
+        sum(native_qty_received) as "raw_base_size!",
+        sum(native_qty_paid) as "raw_quote_size!"
         from fills
         where "time" >= current_timestamp - interval '1 day'
         and bid = true
-        group by market"#
-    )
-    .fetch_all(conn)
-    .await
-    .map_err_anyhow()
+        group by market"#,
+        )
+        .await?;
+
+    let rows = client.query(&stmt, &[]).await?;
+
+    Ok(rows
+        .into_iter()
+        .map(PgCoinGecko24HourVolume::from_row)
+        .collect())
 }

 pub async fn fetch_coingecko_24h_high_low(
-    conn: &mut PoolConnection<Postgres>,
+    pool: &Pool,
 ) -> anyhow::Result<Vec<PgCoinGecko24HighLow>> {
-    sqlx::query_as!(
-        PgCoinGecko24HighLow,
-        r#"select
+    let client = pool.get().await?;
+
+    let stmt = client
+        .prepare(
+            r#"select
         g.market_name as "market_name!",
         g.high as "high!",
         g.low as "low!",
@@ -317,9 +326,14 @@ pub async fn fetch_coingecko_24h_high_low(
         join candles c on g.market_name = c.market_name
         and g.start_time = c.start_time
         where
-        c.resolution = '1M'"#
-    )
-    .fetch_all(conn)
-    .await
-    .map_err_anyhow()
+        c.resolution = '1M'"#,
+        )
+        .await?;
+
+    let rows = client.query(&stmt, &[]).await?;
+
+    Ok(rows
+        .into_iter()
+        .map(PgCoinGecko24HighLow::from_row)
+        .collect())
 }
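
Every helper in this file now follows the same four-step deadpool-postgres shape: check a client out of the pool, prepare the statement, run it, and map rows positionally through a from_row constructor. A condensed, hypothetical example of that pattern (the table columns exist in the new schema, but this struct and query are illustrative, not part of the repo):

use deadpool_postgres::Pool;
use tokio_postgres::Row;

// Hypothetical row type, standing in for Candle, PgTrader, and friends.
struct MarketVolume {
    market: String,
    volume: f64,
}

impl MarketVolume {
    // Positional row.get(i) must line up with the SELECT column order,
    // which is why the queries above alias every column explicitly.
    fn from_row(row: Row) -> Self {
        MarketVolume {
            market: row.get(0),
            volume: row.get(1),
        }
    }
}

// Illustrative only: the same checkout/prepare/query/map shape as the
// real fetch_* helpers in this file.
async fn fetch_market_volumes(pool: &Pool) -> anyhow::Result<Vec<MarketVolume>> {
    let client = pool.get().await?;
    let stmt = client
        .prepare("SELECT market, sum(native_qty_paid) from fills GROUP BY market")
        .await?;
    let rows = client.query(&stmt, &[]).await?;
    Ok(rows.into_iter().map(MarketVolume::from_row).collect())
}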

src/database/initialize.rs

@@ -1,24 +1,70 @@
-use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
-use std::time::Duration;
+use std::{fs, time::Duration};

-use crate::utils::{AnyhowWrap, Config};
+use deadpool_postgres::{
+    ManagerConfig, Pool, PoolConfig, RecyclingMethod, Runtime, SslMode, Timeouts,
+};
+use native_tls::{Certificate, Identity, TlsConnector};
+use postgres_native_tls::MakeTlsConnector;

-pub async fn connect_to_database(config: &Config) -> anyhow::Result<Pool<Postgres>> {
-    loop {
-        let pool = PgPoolOptions::new()
-            .max_connections(config.max_pg_pool_connections)
-            .connect(&config.database_url)
-            .await;
-        if pool.is_ok() {
-            println!("Database connected");
-            return pool.map_err_anyhow();
-        }
-        println!("Failed to connect to database, retrying");
-        tokio::time::sleep(Duration::from_millis(500)).await;
-    }
-}
+use crate::utils::PgConfig;
+
+pub async fn connect_to_database() -> anyhow::Result<Pool> {
+    let mut pg_config = PgConfig::from_env()?;
+
+    pg_config.pg.manager = Some(ManagerConfig {
+        recycling_method: RecyclingMethod::Fast,
+    });
+    pg_config.pg.pool = Some(PoolConfig {
+        max_size: pg_config.pg_max_pool_connections,
+        timeouts: Timeouts::default(),
+    });
+
+    // openssl pkcs12 -export -in client.cer -inkey client-key.cer -out client.pks
+    // base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64
+    // fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a mango-fills
+    // fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills
+    let tls = if pg_config.pg_use_ssl {
+        pg_config.pg.ssl_mode = Some(SslMode::Require);
+        let ca_cert = fs::read(pg_config.pg_ca_cert_path.expect("reading ca cert from env"))
+            .expect("reading ca cert from file");
+        let client_key = fs::read(
+            pg_config
+                .pg_client_key_path
+                .expect("reading client key from env"),
+        )
+        .expect("reading client key from file");
+        MakeTlsConnector::new(
+            TlsConnector::builder()
+                .add_root_certificate(Certificate::from_pem(&ca_cert)?)
+                .identity(Identity::from_pkcs12(&client_key, "pass")?)
+                .danger_accept_invalid_certs(false)
+                .build()?,
+        )
+    } else {
+        MakeTlsConnector::new(
+            TlsConnector::builder()
+                .danger_accept_invalid_certs(true)
+                .build()
+                .unwrap(),
+        )
+    };
+
+    let pool = pg_config
+        .pg
+        .create_pool(Some(Runtime::Tokio1), tls)
+        .unwrap();
+
+    match pool.get().await {
+        Ok(_) => println!("Database connected"),
+        Err(e) => {
+            println!("Failed to connect to database: {}, retrying", e.to_string());
+            tokio::time::sleep(Duration::from_millis(500)).await;
+        }
+    }
+    Ok(pool)
+}

-pub async fn setup_database(pool: &Pool<Postgres>) -> anyhow::Result<()> {
+pub async fn setup_database(pool: &Pool) -> anyhow::Result<()> {
     let candles_table_fut = create_candles_table(pool);
     let fills_table_fut = create_fills_table(pool);
     let result = tokio::try_join!(candles_table_fut, fills_table_fut);
@@ -34,50 +80,51 @@ pub async fn setup_database(pool: &Pool<Postgres>) -> anyhow::Result<()> {
     }
 }

-pub async fn create_candles_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
-    let mut tx = pool.begin().await.map_err_anyhow()?;
+pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> {
+    let client = pool.get().await?;

-    sqlx::query!(
-        "CREATE TABLE IF NOT EXISTS candles (
+    client
+        .execute(
+            "CREATE TABLE IF NOT EXISTS candles (
         id serial,
         market_name text,
         start_time timestamptz,
         end_time timestamptz,
         resolution text,
-        open numeric,
-        close numeric,
-        high numeric,
-        low numeric,
-        volume numeric,
+        open double precision,
+        close double precision,
+        high double precision,
+        low double precision,
+        volume double precision,
         complete bool
     )",
-    )
-    .execute(&mut tx)
-    .await?;
+            &[],
+        )
+        .await?;

-    sqlx::query!(
-        "CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market_name, start_time, resolution)"
-    ).execute(&mut tx).await?;
+    client.execute(
+        "CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market_name, start_time, resolution)",
+        &[]
+    ).await?;

-    sqlx::query!(
-        "DO $$
+    client.execute(
+        "DO $$
         BEGIN
             IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'unique_candles') THEN
                 ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market_name, start_time, resolution);
             END IF;
-        END $$"
-    )
-    .execute(&mut tx)
-    .await?;
+        END $$", &[]
+    ).await?;

-    tx.commit().await.map_err_anyhow()
+    Ok(())
 }

-pub async fn create_fills_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
-    let mut tx = pool.begin().await.map_err_anyhow()?;
+pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> {
+    let client = pool.get().await?;

-    sqlx::query!(
-        "CREATE TABLE IF NOT EXISTS fills (
+    client
+        .execute(
+            "CREATE TABLE IF NOT EXISTS fills (
         id numeric PRIMARY KEY,
         time timestamptz not null,
         market text not null,
@@ -85,23 +132,28 @@ pub async fn create_fills_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
         open_orders_owner text not null,
         bid bool not null,
         maker bool not null,
-        native_qty_paid numeric not null,
-        native_qty_received numeric not null,
-        native_fee_or_rebate numeric not null,
+        native_qty_paid double precision not null,
+        native_qty_received double precision not null,
+        native_fee_or_rebate double precision not null,
         fee_tier text not null,
         order_id text not null
     )",
-    )
-    .execute(&mut tx)
-    .await?;
+            &[],
+        )
+        .await?;

-    sqlx::query!("CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)")
-        .execute(&mut tx)
-        .await?;
+    client
+        .execute(
+            "CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)",
+            &[],
+        )
+        .await?;

-    sqlx::query!("CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)")
-        .execute(&mut tx)
-        .await?;
+    client
+        .execute(
+            "CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)",
+            &[],
+        )
+        .await?;

-    tx.commit().await.map_err_anyhow()
+    Ok(())
 }
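
Taken together, a binary now bootstraps its database access roughly like this. A minimal sketch, assuming the crate layout above; how each binary actually orders these calls is outside this diff:

use openbook_candles::database::initialize::{connect_to_database, setup_database};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    dotenv::dotenv().ok(); // PG_* settings come from the environment / .env
    let pool = connect_to_database().await?; // TLS or plaintext, per PG_USE_SSL
    setup_database(&pool).await?; // idempotent CREATE TABLE IF NOT EXISTS
    Ok(())
}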

src/database/insert.rs

@@ -1,5 +1,4 @@
-use chrono::Utc;
-use sqlx::{Connection, Pool, Postgres};
+use deadpool_postgres::Pool;
 use std::{
     collections::{hash_map::DefaultHasher, HashMap},
     hash::{Hash, Hasher},
@@ -7,26 +6,24 @@ use std::{
 use tokio::sync::mpsc::{error::TryRecvError, Receiver};

 use crate::{
-    structs::{candle::Candle, openbook::OpenBookFillEventLog},
-    utils::AnyhowWrap,
+    structs::{candle::Candle, openbook::OpenBookFillEvent},
+    utils::{to_timestampz, AnyhowWrap},
 };

 pub async fn persist_fill_events(
-    pool: &Pool<Postgres>,
-    fill_receiver: &mut Receiver<OpenBookFillEventLog>,
+    pool: &Pool,
+    fill_receiver: &mut Receiver<OpenBookFillEvent>,
 ) -> anyhow::Result<()> {
-    let mut conn = pool.acquire().await.unwrap();
+    let client = pool.get().await?;
     loop {
         let mut write_batch = HashMap::new();
         while write_batch.len() < 10 {
             match fill_receiver.try_recv() {
                 Ok(event) => {
-                    if !write_batch.contains_key(&event) {
-                        write_batch.insert(event, 0);
-                    }
+                    write_batch.entry(event).or_insert(0);
                 }
                 Err(TryRecvError::Empty) => {
-                    if write_batch.len() > 0 {
+                    if !write_batch.is_empty() {
                         break;
                     } else {
                         continue;
@@ -38,65 +35,64 @@ pub async fn persist_fill_events(
             };
         }

-        if write_batch.len() > 0 {
+        if !write_batch.is_empty() {
             // print!("writing: {:?} events to DB\n", write_batch.len());
-            match conn.ping().await {
-                Ok(_) => {
-                    let upsert_statement = build_fills_upsert_statement(write_batch);
-                    sqlx::query(&upsert_statement)
-                        .execute(&mut conn)
-                        .await
-                        .map_err_anyhow()
-                        .unwrap();
-                }
-                Err(_) => {
-                    println!("Fills ping failed");
-                    break;
-                }
-            }
+            // match conn.ping().await {
+            //     Ok(_) => {
+            let upsert_statement = build_fills_upsert_statement(write_batch);
+            client
+                .execute(&upsert_statement, &[])
+                .await
+                .map_err_anyhow()
+                .unwrap();
+            //     }
+            //     Err(_) => {
+            //         println!("Fills ping failed");
+            //         break;
+            //     }
+            // }
         }
     }
-    Ok(())
 }

 pub async fn persist_candles(
-    pool: Pool<Postgres>,
+    pool: Pool,
     candles_receiver: &mut Receiver<Vec<Candle>>,
 ) -> anyhow::Result<()> {
-    let mut conn = pool.acquire().await.unwrap();
+    let client = pool.get().await.unwrap();
     loop {
-        match conn.ping().await {
-            Ok(_) => {
-                match candles_receiver.try_recv() {
-                    Ok(candles) => {
-                        if candles.len() == 0 {
-                            continue;
-                        }
-                        // print!("writing: {:?} candles to DB\n", candles.len());
-                        let upsert_statement = build_candes_upsert_statement(candles);
-                        sqlx::query(&upsert_statement)
-                            .execute(&mut conn)
-                            .await
-                            .map_err_anyhow()
-                            .unwrap();
-                    }
-                    Err(TryRecvError::Empty) => continue,
-                    Err(TryRecvError::Disconnected) => {
-                        panic!("Candles sender must stay alive")
-                    }
-                };
-            }
-            Err(_) => {
-                println!("Candle ping failed");
-                break;
-            }
-        };
+        // match client.ping().await {
+        //     Ok(_) => {
+        match candles_receiver.try_recv() {
+            Ok(candles) => {
+                if candles.is_empty() {
+                    continue;
+                }
+                // print!("writing: {:?} candles to DB\n", candles.len());
+                let upsert_statement = build_candles_upsert_statement(candles);
+                client
+                    .execute(&upsert_statement, &[])
+                    .await
+                    .map_err_anyhow()
+                    .unwrap();
+            }
+            Err(TryRecvError::Empty) => continue,
+            Err(TryRecvError::Disconnected) => {
+                panic!("Candles sender must stay alive")
+            }
+        };
+        // }
+        // Err(_) => {
+        //     println!("Candle ping failed");
+        //     break;
+        // }
+        // };
     }
-    Ok(())
 }

-fn build_fills_upsert_statement(events: HashMap<OpenBookFillEventLog, u8>) -> String {
+#[allow(deprecated)]
+fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> String {
     let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES");
     for (idx, event) in events.keys().enumerate() {
         let mut hasher = DefaultHasher::new();
@@ -104,7 +100,7 @@ fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> String {
         let val_str = format!(
             "({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {})",
             hasher.finish(),
-            Utc::now().to_rfc3339(),
+            to_timestampz(event.block_time as u64).to_rfc3339(),
             event.market,
             event.open_orders,
             event.open_orders_owner,
@@ -130,7 +126,7 @@ fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> String {
     stmt
 }

-fn build_candes_upsert_statement(candles: Vec<Candle>) -> String {
+pub fn build_candles_upsert_statement(candles: Vec<Candle>) -> String {
     let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES");
     for (idx, candle) in candles.iter().enumerate() {
         let val_str = format!(
@@ -176,7 +172,7 @@ mod tests {

     #[test]
     fn test_event_hashing() {
-        let event_1 = OpenBookFillEventLog {
+        let event_1 = OpenBookFillEvent {
             market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(),
             open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(),
             open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw")
@@ -191,9 +187,10 @@ mod tests {
             fee_tier: 0,
             client_order_id: None,
             referrer_rebate: Some(841),
+            block_time: 0,
         };

-        let event_2 = OpenBookFillEventLog {
+        let event_2 = OpenBookFillEvent {
             market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(),
             open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(),
             open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw")
@@ -208,6 +205,7 @@ mod tests {
             fee_tier: 0,
             client_order_id: None,
             referrer_rebate: Some(841),
+            block_time: 0,
         };

        let mut h1 = DefaultHasher::new();
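
One detail worth extracting from build_fills_upsert_statement: the primary key written to fills.id is a DefaultHasher digest of the entire event, which is what the test_event_hashing test above verifies and what lets repeated backfills of the same fill collapse into one row. As a standalone sketch:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// fills.id is this digest of the whole event, now including block_time.
// Note DefaultHasher's output is not guaranteed stable across Rust
// releases, so ids are only reproducible within one toolchain.
fn fill_id<T: Hash>(event: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    event.hash(&mut hasher);
    hasher.finish()
}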

src/server/candles.rs

@@ -1,5 +1,5 @@
 use openbook_candles::{
-    database::fetch::fetch_tradingview_candles,
+    database::fetch::fetch_candles_from,
     structs::{markets::valid_market, resolution::Resolution, tradingview::TvResponse},
     utils::{to_timestampz, WebContext},
 };
@@ -34,9 +34,8 @@ pub async fn get_candles(
     let from = to_timestampz(info.from);
     let to = to_timestampz(info.to);

-    let mut conn = context.pool.acquire().await.unwrap();
     let candles =
-        match fetch_tradingview_candles(&mut conn, &info.market_name, resolution, from, to).await {
+        match fetch_candles_from(&context.pool, &info.market_name, resolution, from, to).await {
             Ok(c) => c,
             Err(_) => return Err(ServerError::DbQueryError),
         };

src/server/coingecko.rs

@@ -10,7 +10,7 @@ use openbook_candles::{
             CoinGecko24HourVolume, CoinGeckoOrderBook, CoinGeckoPair, CoinGeckoTicker,
             PgCoinGecko24HighLow,
         },
-        slab::{get_best_bids_and_asks, get_orderbooks_with_depth},
+        slab::get_orderbooks_with_depth,
     },
     utils::WebContext,
 };
@@ -49,17 +49,14 @@ pub async fn pairs(context: web::Data<WebContext>) -> Result<HttpResponse, ServerError> {

 #[get("/tickers")]
 pub async fn tickers(context: web::Data<WebContext>) -> Result<HttpResponse, ServerError> {
-    let client = RpcClient::new(context.rpc_url.clone());
+    // let client = RpcClient::new(context.rpc_url.clone());
     let markets = &context.markets;
-    let mut c1 = context.pool.acquire().await.unwrap();
-    let mut c2 = context.pool.acquire().await.unwrap();

     // let bba_fut = get_best_bids_and_asks(client, markets);
-    let volume_fut = fetch_coingecko_24h_volume(&mut c1);
-    let high_low_fut = fetch_coingecko_24h_high_low(&mut c2);
+    let volume_fut = fetch_coingecko_24h_volume(&context.pool);
+    let high_low_fut = fetch_coingecko_24h_high_low(&context.pool);

-    let (volume_query, high_low_quey) =
-        join!(volume_fut, high_low_fut,);
+    let (volume_query, high_low_quey) = join!(volume_fut, high_low_fut,);

     let raw_volumes = match volume_query {
         Ok(c) => c,
@@ -74,7 +71,7 @@ pub async fn tickers(context: web::Data<WebContext>) -> Result<HttpResponse, ServerError> {
     let default_volume = CoinGecko24HourVolume::default();
     let volumes: Vec<CoinGecko24HourVolume> = raw_volumes
         .into_iter()
-        .map(|v| v.convert_to_readable(&markets))
+        .map(|v| v.convert_to_readable(markets))
         .collect();

     let tickers = markets
         .iter()
@@ -105,7 +102,7 @@ pub async fn tickers(context: web::Data<WebContext>) -> Result<HttpResponse, ServerError> {
     Ok(HttpResponse::Ok().json(tickers))
 }

-#[get("/orderbook")]
+#[get("/orderbook")] // TODO: implement an optional geyser version
 pub async fn orderbook(
     info: web::Query<OrderBookParams>,
     context: web::Data<WebContext>,

src/server/main.rs

@@ -4,7 +4,6 @@ use actix_web::{
     App, HttpServer,
 };
 use candles::get_candles;
-use dotenv;
 use markets::get_markets;
 use openbook_candles::{
     database::initialize::connect_to_database,
@@ -29,21 +28,15 @@ async fn main() -> std::io::Result<()> {
     assert!(args.len() == 2);
     let path_to_markets_json = &args[1];
     let rpc_url: String = dotenv::var("RPC_URL").unwrap();
-    let database_url: String = dotenv::var("DATABASE_URL").unwrap();
-    let max_pg_pool_connections: u32 = dotenv::var("MAX_PG_POOL_CONNS_SERVER")
-        .unwrap()
-        .parse::<u32>()
-        .unwrap();
+    let bind_addr: String = dotenv::var("SERVER_BIND_ADDR").expect("reading bind addr from env");

     let config = Config {
         rpc_url: rpc_url.clone(),
-        database_url: database_url.clone(),
-        max_pg_pool_connections,
     };
     let markets = load_markets(path_to_markets_json);
     let market_infos = fetch_market_infos(&config, markets).await.unwrap();
-    let pool = connect_to_database(&config).await.unwrap();
+    let pool = connect_to_database().await.unwrap();

     let context = Data::new(WebContext {
         rpc_url,
@@ -65,7 +59,7 @@ async fn main() -> std::io::Result<()> {
             .service(coingecko::service()),
         )
     })
-    .bind(("127.0.0.1", 8080))?
+    .bind(&bind_addr)?
     .run()
     .await
 }

src/server/traders.rs

@@ -31,14 +31,17 @@ pub async fn get_top_traders_by_base_volume(
     let from = to_timestampz(info.from);
     let to = to_timestampz(info.to);

-    let mut conn = context.pool.acquire().await.unwrap();
-    let raw_traders =
-        match fetch_top_traders_by_base_volume_from(&mut conn, &selected_market.address, from, to)
-            .await
-        {
-            Ok(c) => c,
-            Err(_) => return Err(ServerError::DbQueryError),
-        };
+    let raw_traders = match fetch_top_traders_by_base_volume_from(
+        &context.pool,
+        &selected_market.address,
+        from,
+        to,
+    )
+    .await
+    {
+        Ok(c) => c,
+        Err(_) => return Err(ServerError::DbQueryError),
+    };

     let traders = raw_traders
         .into_iter()
@@ -48,7 +51,7 @@ pub async fn get_top_traders_by_base_volume(
     let response = TraderResponse {
         start_time: info.from,
         end_time: info.to,
-        traders: traders,
+        traders,
         volume_type: VolumeType::Base.to_string(),
     };
     Ok(HttpResponse::Ok().json(response))
@@ -67,14 +70,17 @@ pub async fn get_top_traders_by_quote_volume(
     let from = to_timestampz(info.from);
     let to = to_timestampz(info.to);

-    let mut conn = context.pool.acquire().await.unwrap();
-    let raw_traders =
-        match fetch_top_traders_by_quote_volume_from(&mut conn, &selected_market.address, from, to)
-            .await
-        {
-            Ok(c) => c,
-            Err(_) => return Err(ServerError::DbQueryError),
-        };
+    let raw_traders = match fetch_top_traders_by_quote_volume_from(
+        &context.pool,
+        &selected_market.address,
+        from,
+        to,
+    )
+    .await
+    {
+        Ok(c) => c,
+        Err(_) => return Err(ServerError::DbQueryError),
+    };

     let traders = raw_traders
         .into_iter()
@@ -84,7 +90,7 @@ pub async fn get_top_traders_by_quote_volume(
     let response = TraderResponse {
         start_time: info.from,
         end_time: info.to,
-        traders: traders,
+        traders,
         volume_type: VolumeType::Quote.to_string(),
     };
     Ok(HttpResponse::Ok().json(response))

src/structs/candle.rs

@@ -1,6 +1,5 @@
 use chrono::{DateTime, NaiveDateTime, Utc};
-use num_traits::Zero;
-use sqlx::types::Decimal;
+use tokio_postgres::Row;

 use super::resolution::Resolution;

@@ -10,11 +9,11 @@ pub struct Candle {
     pub start_time: DateTime<Utc>,
     pub end_time: DateTime<Utc>,
     pub resolution: String,
-    pub open: Decimal,
-    pub close: Decimal,
-    pub high: Decimal,
-    pub low: Decimal,
-    pub volume: Decimal,
+    pub open: f64,
+    pub close: f64,
+    pub high: f64,
+    pub low: f64,
+    pub volume: f64,
     pub complete: bool,
 }

@@ -25,12 +24,27 @@ impl Candle {
             start_time: DateTime::from_utc(NaiveDateTime::MIN, Utc),
             end_time: DateTime::from_utc(NaiveDateTime::MIN, Utc),
             resolution: resolution.to_string(),
-            open: Decimal::zero(),
-            close: Decimal::zero(),
-            high: Decimal::zero(),
-            low: Decimal::zero(),
-            volume: Decimal::zero(),
+            open: 0.0,
+            close: 0.0,
+            high: 0.0,
+            low: 0.0,
+            volume: 0.0,
             complete: false,
         }
     }
+
+    pub fn from_row(row: Row) -> Self {
+        Candle {
+            market_name: row.get(0),
+            start_time: row.get(1),
+            end_time: row.get(2),
+            resolution: row.get(3),
+            open: row.get(4),
+            close: row.get(5),
+            high: row.get(6),
+            low: row.get(7),
+            volume: row.get(8),
+            complete: row.get(9),
+        }
+    }
 }

src/structs/coingecko.rs

@@ -1,5 +1,5 @@
 use serde::Serialize;
-use sqlx::types::Decimal;
+use tokio_postgres::Row;

 use super::{markets::MarketInfo, openbook::token_factor};

@@ -35,8 +35,8 @@ pub struct CoinGeckoTicker {
 pub struct PgCoinGecko24HourVolume {
     pub address: String,
-    pub raw_base_size: Decimal,
-    pub raw_quote_size: Decimal,
+    pub raw_base_size: f64,
+    pub raw_quote_size: f64,
 }

 impl PgCoinGecko24HourVolume {
     pub fn convert_to_readable(&self, markets: &Vec<MarketInfo>) -> CoinGecko24HourVolume {
@@ -49,19 +49,38 @@ impl PgCoinGecko24HourVolume {
             target_volume,
         }
     }
+
+    pub fn from_row(row: Row) -> Self {
+        PgCoinGecko24HourVolume {
+            address: row.get(0),
+            raw_base_size: row.get(1),
+            raw_quote_size: row.get(2),
+        }
+    }
 }

 #[derive(Debug, Default)]
 pub struct CoinGecko24HourVolume {
     pub market_name: String,
-    pub base_volume: Decimal,
-    pub target_volume: Decimal,
+    pub base_volume: f64,
+    pub target_volume: f64,
 }

 #[derive(Debug, Default)]
 pub struct PgCoinGecko24HighLow {
     pub market_name: String,
-    pub high: Decimal,
-    pub low: Decimal,
-    pub close: Decimal,
+    pub high: f64,
+    pub low: f64,
+    pub close: f64,
+}
+
+impl PgCoinGecko24HighLow {
+    pub fn from_row(row: Row) -> Self {
+        PgCoinGecko24HighLow {
+            market_name: row.get(0),
+            high: row.get(1),
+            low: row.get(2),
+            close: row.get(3),
+        }
+    }
 }

src/structs/markets.rs

@@ -111,8 +111,8 @@ pub async fn fetch_market_infos(
     .value;
     for i in 0..mint_results.len() {
         let mut mint_account = mint_results[i].as_ref().unwrap().clone();
-        let mut mint_bytes: &[u8] = &mut mint_account.data[..];
-        let mint = Mint::unpack_from_slice(&mut mint_bytes).unwrap();
+        let mint_bytes: &[u8] = &mut mint_account.data[..];
+        let mint = Mint::unpack_from_slice(mint_bytes).unwrap();
         mint_key_map.insert(mint_keys[i], mint.decimals);
     }

src/structs/openbook.rs

@ -1,12 +1,12 @@
use anchor_lang::{event, AnchorDeserialize, AnchorSerialize}; use anchor_lang::{event, AnchorDeserialize, AnchorSerialize};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use num_traits::FromPrimitive; use num_traits::Pow;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use sqlx::types::Decimal; use tokio_postgres::Row;
#[event] #[event]
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OpenBookFillEventLog { pub struct OpenBookFillEventRaw {
pub market: Pubkey, pub market: Pubkey,
pub open_orders: Pubkey, pub open_orders: Pubkey,
pub open_orders_owner: Pubkey, pub open_orders_owner: Pubkey,
@ -21,15 +21,66 @@ pub struct OpenBookFillEventLog {
pub client_order_id: Option<u64>, pub client_order_id: Option<u64>,
pub referrer_rebate: Option<u64>, pub referrer_rebate: Option<u64>,
} }
impl OpenBookFillEventRaw {
pub fn with_time(self, block_time: i64) -> OpenBookFillEvent {
OpenBookFillEvent {
market: self.market,
open_orders: self.open_orders,
open_orders_owner: self.open_orders_owner,
bid: self.bid,
maker: self.maker,
native_qty_paid: self.native_qty_paid,
native_qty_received: self.native_qty_received,
native_fee_or_rebate: self.native_fee_or_rebate,
order_id: self.order_id,
owner_slot: self.owner_slot,
fee_tier: self.fee_tier,
client_order_id: self.client_order_id,
referrer_rebate: self.referrer_rebate,
block_time,
}
}
}
#[event]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OpenBookFillEvent {
pub market: Pubkey,
pub open_orders: Pubkey,
pub open_orders_owner: Pubkey,
pub bid: bool,
pub maker: bool,
pub native_qty_paid: u64,
pub native_qty_received: u64,
pub native_fee_or_rebate: u64,
pub order_id: u128,
pub owner_slot: u8,
pub fee_tier: u8,
pub client_order_id: Option<u64>,
pub referrer_rebate: Option<u64>,
pub block_time: i64,
}
#[derive(Copy, Clone, Debug, PartialEq)] #[derive(Copy, Clone, Debug, PartialEq)]
pub struct PgOpenBookFill { pub struct PgOpenBookFill {
pub time: DateTime<Utc>, pub time: DateTime<Utc>,
pub bid: bool, pub bid: bool,
pub maker: bool, pub maker: bool,
pub native_qty_paid: Decimal, pub native_qty_paid: f64,
pub native_qty_received: Decimal, pub native_qty_received: f64,
pub native_fee_or_rebate: Decimal, pub native_fee_or_rebate: f64,
}
impl PgOpenBookFill {
pub fn from_row(row: Row) -> Self {
PgOpenBookFill {
time: row.get(0),
bid: row.get(1),
maker: row.get(2),
native_qty_paid: row.get(3),
native_qty_received: row.get(4),
native_fee_or_rebate: row.get(5),
}
}
} }
#[derive(Copy, Clone, AnchorDeserialize)] #[derive(Copy, Clone, AnchorDeserialize)]
@ -91,7 +142,7 @@ pub fn calculate_fill_price_and_size(
fill: PgOpenBookFill, fill: PgOpenBookFill,
base_decimals: u8, base_decimals: u8,
quote_decimals: u8, quote_decimals: u8,
) -> (Decimal, Decimal) { ) -> (f64, f64) {
if fill.bid { if fill.bid {
let price_before_fees = if fill.maker { let price_before_fees = if fill.maker {
fill.native_qty_paid + fill.native_fee_or_rebate fill.native_qty_paid + fill.native_fee_or_rebate
@ -115,6 +166,6 @@ pub fn calculate_fill_price_and_size(
} }
} }
pub fn token_factor(decimals: u8) -> Decimal { pub fn token_factor(decimals: u8) -> f64 {
Decimal::from_u64(10u64.pow(decimals as u32)).unwrap() 10f64.pow(decimals as f64)
} }
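With Decimal gone, token_factor is a plain float power (num_traits::Pow resolves to f64::powf here). A quick standalone check:

use num_traits::Pow;

fn token_factor(decimals: u8) -> f64 {
    10f64.pow(decimals as f64)
}

fn main() {
    // 6 decimals (e.g. a USDC-like mint) scales native units by 10^6.
    assert!((token_factor(6) - 1_000_000.0).abs() < 1e-6);
    // 12_345_678 native quote units are 12.345678 in UI units.
    assert!((12_345_678f64 / token_factor(6) - 12.345678).abs() < 1e-9);
}

The trade-off is precision: unlike Decimal, f64 carries only about 15-16 significant digits, so native amounts above 2^53 round.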

View File

@ -5,7 +5,6 @@ use futures::join;
use num_enum::{IntoPrimitive, TryFromPrimitive}; use num_enum::{IntoPrimitive, TryFromPrimitive};
use num_traits::ToPrimitive; use num_traits::ToPrimitive;
use solana_client::nonblocking::rpc_client::RpcClient; use solana_client::nonblocking::rpc_client::RpcClient;
use sqlx::types::Decimal;
use std::{ use std::{
convert::TryFrom, convert::TryFrom,
mem::{align_of, size_of}, mem::{align_of, size_of},
@ -102,19 +101,19 @@ impl LeafNode {
NonZeroU64::new((self.key >> 64) as u64).unwrap() NonZeroU64::new((self.key >> 64) as u64).unwrap()
} }
pub fn readable_price(&self, market: &MarketInfo) -> Decimal { pub fn readable_price(&self, market: &MarketInfo) -> f64 {
let price_lots = Decimal::from((self.key >> 64) as u64); let price_lots = (self.key >> 64) as f64;
let base_multiplier = token_factor(market.base_decimals); let base_multiplier = token_factor(market.base_decimals);
let quote_multiplier = token_factor(market.quote_decimals); let quote_multiplier = token_factor(market.quote_decimals);
let base_lot_size = Decimal::from(market.base_lot_size); let base_lot_size = market.base_lot_size as f64;
let quote_lot_size = Decimal::from(market.quote_lot_size); let quote_lot_size = market.quote_lot_size as f64;
(price_lots * quote_lot_size * base_multiplier) / (base_lot_size * quote_multiplier) (price_lots * quote_lot_size * base_multiplier) / (base_lot_size * quote_multiplier)
} }
pub fn readable_quantity(&self, market: &MarketInfo) -> Decimal { pub fn readable_quantity(&self, market: &MarketInfo) -> f64 {
let base_lot_size = Decimal::from(market.base_lot_size); let base_lot_size = market.base_lot_size as f64;
let base_multiplier = token_factor(market.base_decimals); let base_multiplier = token_factor(market.base_decimals);
Decimal::from(self.quantity) * base_lot_size / base_multiplier self.quantity as f64 * base_lot_size / base_multiplier
} }
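For intuition, a worked example of the readable_price formula with assumed SOL/USDC-style parameters (base_decimals 9, quote_decimals 6, base_lot_size 100_000_000, quote_lot_size 100; all values hypothetical):

fn main() {
    let price_lots = 20_500f64; // (self.key >> 64) for some resting order
    let base_multiplier = 10f64.powi(9); // token_factor(base_decimals)
    let quote_multiplier = 10f64.powi(6); // token_factor(quote_decimals)
    let base_lot_size = 100_000_000f64;
    let quote_lot_size = 100f64;

    let price = (price_lots * quote_lot_size * base_multiplier)
        / (base_lot_size * quote_multiplier);
    // 20_500 * 100 * 1e9 / (1e8 * 1e6) = 20.5 quote units per base unit
    assert!((price - 20.5).abs() < 1e-9);
}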
#[inline] #[inline]
@ -406,7 +405,7 @@ impl Slab {
} }
} }
pub fn get_best(&self, market: &MarketInfo, bid: bool) -> Decimal { pub fn get_best(&self, market: &MarketInfo, bid: bool) -> f64 {
let min = if bid { let min = if bid {
self.find_max() self.find_max()
} else { } else {
@ -419,7 +418,7 @@ impl Slab {
pub async fn get_best_bids_and_asks( pub async fn get_best_bids_and_asks(
client: RpcClient, client: RpcClient,
markets: &Vec<MarketInfo>, markets: &Vec<MarketInfo>,
) -> (Vec<Decimal>, Vec<Decimal>) { ) -> (Vec<f64>, Vec<f64>) {
let bid_keys = markets let bid_keys = markets
.iter() .iter()
.map(|m| Pubkey::from_str(&m.bids_key).unwrap()) .map(|m| Pubkey::from_str(&m.bids_key).unwrap())

View File

@ -2,15 +2,24 @@ use std::fmt;
use num_traits::ToPrimitive; use num_traits::ToPrimitive;
use serde::Serialize; use serde::Serialize;
use sqlx::types::Decimal; use tokio_postgres::Row;
use super::openbook::token_factor; use super::openbook::token_factor;
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
pub struct PgTrader { pub struct PgTrader {
pub open_orders_owner: String, pub open_orders_owner: String,
pub raw_ask_size: Decimal, pub raw_ask_size: f64,
pub raw_bid_size: Decimal, pub raw_bid_size: f64,
}
impl PgTrader {
pub fn from_row(row: Row) -> Self {
PgTrader {
open_orders_owner: row.get(0),
raw_ask_size: row.get(1),
raw_bid_size: row.get(2),
}
}
} }
#[derive(Clone, Debug, PartialEq, Serialize)] #[derive(Clone, Debug, PartialEq, Serialize)]

View File

@ -1,6 +1,6 @@
use chrono::{NaiveDateTime, Utc}; use chrono::{NaiveDateTime, Utc};
use deadpool_postgres::Pool;
use serde_derive::Deserialize; use serde_derive::Deserialize;
use sqlx::{Pool, Postgres};
use crate::structs::markets::MarketInfo; use crate::structs::markets::MarketInfo;
@ -16,20 +16,53 @@ impl<T, E: std::fmt::Debug> AnyhowWrap for Result<T, E> {
} }
} }
#[derive(Debug, serde::Deserialize)]
pub struct PgConfig {
pub pg: deadpool_postgres::Config,
pub pg_max_pool_connections: usize,
pub pg_use_ssl: bool,
pub pg_ca_cert_path: Option<String>,
pub pg_client_key_path: Option<String>,
}
impl PgConfig {
pub fn from_env() -> Result<Self, config::ConfigError> {
config::Config::builder()
.add_source(config::Environment::default().separator("_"))
.add_source(config::Environment::default())
.build()?
.try_deserialize()
}
}
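The two add_source calls are deliberate: with separator "_", PG_HOST becomes the nested key pg.host and feeds the deadpool_postgres::Config, while the second, plain Environment source keeps flat names like PG_MAX_POOL_CONNECTIONS aligned with the top-level fields. A minimal sketch of loading it (variable values assumed; config-rs is assumed to coerce the string values into usize/bool during try_deserialize):

use openbook_candles::utils::PgConfig;

fn main() -> Result<(), config::ConfigError> {
    // Normally supplied via .env; set inline for a self-contained run.
    std::env::set_var("PG_HOST", "127.0.0.1");
    std::env::set_var("PG_PORT", "5432");
    std::env::set_var("PG_USER", "postgres");
    std::env::set_var("PG_DBNAME", "postgres");
    std::env::set_var("PG_MAX_POOL_CONNECTIONS", "10");
    std::env::set_var("PG_USE_SSL", "false");

    let pg_config = PgConfig::from_env()?;
    assert_eq!(pg_config.pg_max_pool_connections, 10);
    assert!(!pg_config.pg_use_ssl);
    Ok(())
}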
#[derive(Clone, Debug, Deserialize)] #[derive(Clone, Debug, Deserialize)]
pub struct Config { pub struct Config {
pub rpc_url: String, pub rpc_url: String,
pub database_url: String,
pub max_pg_pool_connections: u32,
} }
pub struct WebContext { pub struct WebContext {
pub rpc_url: String, pub rpc_url: String,
pub markets: Vec<MarketInfo>, pub markets: Vec<MarketInfo>,
pub pool: Pool<Postgres>, pub pool: Pool,
} }
#[allow(deprecated)] #[allow(deprecated)]
pub fn to_timestampz(seconds: u64) -> chrono::DateTime<Utc> { pub fn to_timestampz(seconds: u64) -> chrono::DateTime<Utc> {
chrono::DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(seconds as i64, 0), Utc) chrono::DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(seconds as i64, 0), Utc)
} }
pub(crate) fn f64_max(a: f64, b: f64) -> f64 {
if a >= b {
a
} else {
b
}
}
pub(crate) fn f64_min(a: f64, b: f64) -> f64 {
if a < b {
a
} else {
b
}
}
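These helpers exist because f64 is not Ord, so the std::cmp::{max, min} calls used by the candle batching (replaced below) stop compiling once prices and volumes become f64. On non-NaN inputs they agree with the inherent f64 methods; a standalone check with the helper inlined, since the real ones are pub(crate):

fn f64_max(a: f64, b: f64) -> f64 {
    if a >= b { a } else { b }
}

fn main() {
    assert_eq!(f64_max(1.5, 2.5), 2.5);
    assert_eq!(1.5f64.max(2.5), 2.5);
    // Behaviour diverges only on NaN: the inherent f64::max ignores a
    // NaN argument, while this helper returns b whenever the comparison
    // fails, so a NaN second argument propagates.
    assert!(f64_max(2.5, f64::NAN).is_nan());
    assert_eq!(2.5f64.max(f64::NAN), 2.5);
}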

View File

@ -1,6 +1,6 @@
use chrono::{DateTime, Duration, DurationRound, Utc}; use chrono::{DateTime, Duration, DurationRound, Utc};
use sqlx::{pool::PoolConnection, Postgres}; use deadpool_postgres::Pool;
use std::cmp::{max, min}; use std::cmp::max;
use crate::{ use crate::{
database::fetch::{fetch_candles_from, fetch_earliest_candles, fetch_latest_finished_candle}, database::fetch::{fetch_candles_from, fetch_earliest_candles, fetch_latest_finished_candle},
@ -8,28 +8,29 @@ use crate::{
candle::Candle, candle::Candle,
resolution::{day, Resolution}, resolution::{day, Resolution},
}, },
utils::{f64_max, f64_min},
}; };
pub async fn batch_higher_order_candles( pub async fn batch_higher_order_candles(
conn: &mut PoolConnection<Postgres>, pool: &Pool,
market_name: &str, market_name: &str,
resolution: Resolution, resolution: Resolution,
) -> anyhow::Result<Vec<Candle>> { ) -> anyhow::Result<Vec<Candle>> {
let latest_candle = fetch_latest_finished_candle(conn, market_name, resolution).await?; let latest_candle = fetch_latest_finished_candle(pool, market_name, resolution).await?;
match latest_candle { match latest_candle {
Some(candle) => { Some(candle) => {
let start_time = candle.end_time; let start_time = candle.end_time;
let end_time = start_time + day(); let end_time = start_time + day();
let mut constituent_candles = fetch_candles_from( let mut constituent_candles = fetch_candles_from(
conn, pool,
market_name, market_name,
resolution.get_constituent_resolution(), resolution.get_constituent_resolution(),
start_time, start_time,
end_time, end_time,
) )
.await?; .await?;
if constituent_candles.len() == 0 { if constituent_candles.is_empty() {
return Ok(Vec::new()); return Ok(Vec::new());
} }
let combined_candles = combine_into_higher_order_candles( let combined_candles = combine_into_higher_order_candles(
@ -42,9 +43,9 @@ pub async fn batch_higher_order_candles(
} }
None => { None => {
let mut constituent_candles = let mut constituent_candles =
fetch_earliest_candles(conn, market_name, resolution.get_constituent_resolution()) fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution())
.await?; .await?;
if constituent_candles.len() == 0 { if constituent_candles.is_empty() {
// println!( // println!(
// "Batching {}, but no candles found for: {:?}, {}", // "Batching {}, but no candles found for: {:?}, {}",
// resolution, // resolution,
@ -55,7 +56,7 @@ pub async fn batch_higher_order_candles(
} }
let start_time = constituent_candles[0].start_time.duration_trunc(day())?; let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
if constituent_candles.len() == 0 { if constituent_candles.is_empty() {
return Ok(Vec::new()); return Ok(Vec::new());
} }
@ -99,7 +100,7 @@ fn combine_into_higher_order_candles(
let mut combined_candles = vec![empty_candle; num_candles]; let mut combined_candles = vec![empty_candle; num_candles];
let mut con_iter = constituent_candles.iter_mut().peekable(); let mut con_iter = constituent_candles.iter_mut().peekable();
let mut start_time = st.clone(); let mut start_time = st;
let mut end_time = start_time + duration; let mut end_time = start_time + duration;
let mut last_candle = seed_candle; let mut last_candle = seed_candle;
@ -112,8 +113,8 @@ fn combine_into_higher_order_candles(
while matches!(con_iter.peek(), Some(c) if c.end_time <= end_time) { while matches!(con_iter.peek(), Some(c) if c.end_time <= end_time) {
let unit_candle = con_iter.next().unwrap(); let unit_candle = con_iter.next().unwrap();
combined_candles[i].high = max(combined_candles[i].high, unit_candle.high); combined_candles[i].high = f64_max(combined_candles[i].high, unit_candle.high);
combined_candles[i].low = min(combined_candles[i].low, unit_candle.low); combined_candles[i].low = f64_min(combined_candles[i].low, unit_candle.low);
combined_candles[i].close = unit_candle.close; combined_candles[i].close = unit_candle.close;
combined_candles[i].volume += unit_candle.volume; combined_candles[i].volume += unit_candle.volume;
combined_candles[i].complete = unit_candle.complete; combined_candles[i].complete = unit_candle.complete;
@ -124,7 +125,7 @@ fn combine_into_higher_order_candles(
combined_candles[i].end_time = end_time; combined_candles[i].end_time = end_time;
start_time = end_time; start_time = end_time;
end_time = end_time + duration; end_time += duration;
last_candle = combined_candles[i].clone(); last_candle = combined_candles[i].clone();
} }
@ -143,3 +144,29 @@ fn trim_candles(mut c: Vec<Candle>, start_time: DateTime<Utc>) -> Vec<Candle> {
} }
c c
} }
pub async fn backfill_batch_higher_order_candles(
pool: &Pool,
market_name: &str,
resolution: Resolution,
) -> anyhow::Result<Vec<Candle>> {
let mut constituent_candles =
fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution()).await?;
if constituent_candles.is_empty() {
return Ok(vec![]);
}
let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
let seed_candle = constituent_candles[0].clone();
let combined_candles = combine_into_higher_order_candles(
&mut constituent_candles,
resolution,
start_time,
seed_candle,
);
Ok(trim_candles(
combined_candles,
constituent_candles[0].start_time,
))
}
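Unlike batch_higher_order_candles, which resumes after the latest finished candle, this rebuilds a resolution from the earliest stored constituents. A hypothetical driver loop for a candle backfill binary (its entrypoint is not in this excerpt; module paths assumed from the mod declarations later in the diff):

use deadpool_postgres::Pool;
use openbook_candles::structs::{candle::Candle, resolution::Resolution};
use openbook_candles::worker::candle_batching::higher_order_candles::backfill_batch_higher_order_candles;
use strum::IntoEnumIterator;

// Rebuild every non-1m resolution for one market, earliest data first.
async fn backfill_all_resolutions(
    pool: &Pool,
    market_name: &str,
) -> anyhow::Result<Vec<Candle>> {
    let mut all = vec![];
    for resolution in Resolution::iter() {
        if resolution == Resolution::R1m {
            continue; // 1m candles are built from fills, not lower candles
        }
        all.append(
            &mut backfill_batch_higher_order_candles(pool, market_name, resolution).await?,
        );
    }
    Ok(all)
}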

View File

@ -1,7 +1,7 @@
use std::cmp::{max, min}; use std::cmp::min;
use chrono::{DateTime, Duration, DurationRound, Utc}; use chrono::{DateTime, Duration, DurationRound, Utc};
use sqlx::{pool::PoolConnection, types::Decimal, Postgres}; use deadpool_postgres::Pool;
use crate::{ use crate::{
database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle}, database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
@ -11,15 +11,13 @@ use crate::{
openbook::{calculate_fill_price_and_size, PgOpenBookFill}, openbook::{calculate_fill_price_and_size, PgOpenBookFill},
resolution::{day, Resolution}, resolution::{day, Resolution},
}, },
utils::{f64_max, f64_min},
}; };
pub async fn batch_1m_candles( pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Result<Vec<Candle>> {
conn: &mut PoolConnection<Postgres>,
market: &MarketInfo,
) -> anyhow::Result<Vec<Candle>> {
let market_name = &market.name; let market_name = &market.name;
let market_address = &market.address; let market_address = &market.address;
let latest_candle = fetch_latest_finished_candle(conn, market_name, Resolution::R1m).await?; let latest_candle = fetch_latest_finished_candle(pool, market_name, Resolution::R1m).await?;
match latest_candle { match latest_candle {
Some(candle) => { Some(candle) => {
@ -28,7 +26,7 @@ pub async fn batch_1m_candles(
start_time + day(), start_time + day(),
Utc::now().duration_trunc(Duration::minutes(1))?, Utc::now().duration_trunc(Duration::minutes(1))?,
); );
let mut fills = fetch_fills_from(conn, market_address, start_time, end_time).await?; let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
let candles = combine_fills_into_1m_candles( let candles = combine_fills_into_1m_candles(
&mut fills, &mut fills,
market, market,
@ -39,7 +37,7 @@ pub async fn batch_1m_candles(
Ok(candles) Ok(candles)
} }
None => { None => {
let earliest_fill = fetch_earliest_fill(conn, market_address).await?; let earliest_fill = fetch_earliest_fill(pool, market_address).await?;
if earliest_fill.is_none() { if earliest_fill.is_none() {
println!("No fills found for: {:?}", market_name); println!("No fills found for: {:?}", market_name);
@ -54,8 +52,8 @@ pub async fn batch_1m_candles(
start_time + day(), start_time + day(),
Utc::now().duration_trunc(Duration::minutes(1))?, Utc::now().duration_trunc(Duration::minutes(1))?,
); );
let mut fills = fetch_fills_from(conn, market_address, start_time, end_time).await?; let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
if fills.len() > 0 { if !fills.is_empty() {
let candles = let candles =
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None); combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
Ok(candles) Ok(candles)
@ -71,7 +69,7 @@ fn combine_fills_into_1m_candles(
market: &MarketInfo, market: &MarketInfo,
st: DateTime<Utc>, st: DateTime<Utc>,
et: DateTime<Utc>, et: DateTime<Utc>,
maybe_last_price: Option<Decimal>, maybe_last_price: Option<f64>,
) -> Vec<Candle> { ) -> Vec<Candle> {
let empty_candle = Candle::create_empty_candle(market.name.clone(), Resolution::R1m); let empty_candle = Candle::create_empty_candle(market.name.clone(), Resolution::R1m);
@ -79,13 +77,13 @@ fn combine_fills_into_1m_candles(
let mut candles = vec![empty_candle; minutes as usize]; let mut candles = vec![empty_candle; minutes as usize];
let mut fills_iter = fills.iter_mut().peekable(); let mut fills_iter = fills.iter_mut().peekable();
let mut start_time = st.clone(); let mut start_time = st;
let mut end_time = start_time + Duration::minutes(1); let mut end_time = start_time + Duration::minutes(1);
let mut last_price = match maybe_last_price { let mut last_price = match maybe_last_price {
Some(p) => p, Some(p) => p,
None => { None => {
let first = fills_iter.peek().clone().unwrap(); let first = fills_iter.peek().unwrap();
let (price, _) = let (price, _) =
calculate_fill_price_and_size(**first, market.base_decimals, market.quote_decimals); calculate_fill_price_and_size(**first, market.base_decimals, market.quote_decimals);
price price
@ -105,8 +103,8 @@ fn combine_fills_into_1m_candles(
calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals); calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals);
candles[i].close = price; candles[i].close = price;
candles[i].low = min(price, candles[i].low); candles[i].low = f64_min(price, candles[i].low);
candles[i].high = max(price, candles[i].high); candles[i].high = f64_max(price, candles[i].high);
candles[i].volume += volume; candles[i].volume += volume;
last_price = price; last_price = price;
@ -117,8 +115,43 @@ fn combine_fills_into_1m_candles(
candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time); candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time);
start_time = end_time; start_time = end_time;
end_time = end_time + Duration::minutes(1); end_time += Duration::minutes(1);
} }
candles candles
} }
/// Goes from the earliest fill to the most recent. Will mark candles as complete even if there are gaps in the fills between the start and end.
pub async fn backfill_batch_1m_candles(
pool: &Pool,
market: &MarketInfo,
) -> anyhow::Result<Vec<Candle>> {
let market_name = &market.name;
let market_address = &market.address;
let mut candles = vec![];
let earliest_fill = fetch_earliest_fill(pool, &market.address).await?;
if earliest_fill.is_none() {
println!("No fills found for: {:?}", &market_name);
return Ok(candles);
}
let mut start_time = earliest_fill
.unwrap()
.time
.duration_trunc(Duration::minutes(1))?;
while start_time < Utc::now() {
let end_time = min(
start_time + day(),
Utc::now().duration_trunc(Duration::minutes(1))?,
);
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
if !fills.is_empty() {
let mut minute_candles =
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
candles.append(&mut minute_candles);
}
start_time += day()
}
Ok(candles)
}
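A hypothetical end-to-end driver for a candle backfill binary built from the pieces above (the actual entrypoint is not shown in this excerpt; the markets.json path and the persistence step are assumptions):

use openbook_candles::database::initialize::connect_to_database;
use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
use openbook_candles::utils::Config;
use openbook_candles::worker::candle_batching::minute_candles::backfill_batch_1m_candles;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    dotenv::dotenv().ok();
    let config = Config {
        rpc_url: dotenv::var("RPC_URL")?,
    };
    let markets = load_markets("markets.json"); // path assumed
    let market_infos = fetch_market_infos(&config, markets).await?;

    let pool = connect_to_database().await?;
    for market in &market_infos {
        let candles = backfill_batch_1m_candles(&pool, market).await?;
        println!("{}: rebuilt {} 1m candles", market.name, candles.len());
        // Persisting via database::insert is elided here.
    }
    Ok(())
}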

View File

@ -2,7 +2,7 @@ pub mod higher_order_candles;
pub mod minute_candles; pub mod minute_candles;
use chrono::Duration; use chrono::Duration;
use sqlx::{pool::PoolConnection, Pool, Postgres}; use deadpool_postgres::Pool;
use strum::IntoEnumIterator; use strum::IntoEnumIterator;
use tokio::{sync::mpsc::Sender, time::sleep}; use tokio::{sync::mpsc::Sender, time::sleep};
@ -14,17 +14,17 @@ use crate::{
use self::higher_order_candles::batch_higher_order_candles; use self::higher_order_candles::batch_higher_order_candles;
pub async fn batch_for_market( pub async fn batch_for_market(
pool: Pool<Postgres>, pool: &Pool,
candles_sender: &Sender<Vec<Candle>>, candles_sender: &Sender<Vec<Candle>>,
market: &MarketInfo, market: &MarketInfo,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
loop { loop {
let sender = candles_sender.clone(); let sender = candles_sender.clone();
let market_clone = market.clone(); let market_clone = market.clone();
let mut conn = pool.acquire().await?; // let client = pool.get().await?;
loop { loop {
sleep(Duration::milliseconds(2000).to_std()?).await; sleep(Duration::milliseconds(2000).to_std()?).await;
match batch_inner(&mut conn, &sender, &market_clone).await { match batch_inner(pool, &sender, &market_clone).await {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
println!( println!(
@ -41,26 +41,26 @@ pub async fn batch_for_market(
} }
async fn batch_inner( async fn batch_inner(
conn: &mut PoolConnection<Postgres>, pool: &Pool,
candles_sender: &Sender<Vec<Candle>>, candles_sender: &Sender<Vec<Candle>>,
market: &MarketInfo, market: &MarketInfo,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let market_name = &market.name.clone(); let market_name = &market.name.clone();
let candles = batch_1m_candles(conn, market).await?; let candles = batch_1m_candles(pool, market).await?;
send_candles(candles, candles_sender).await; send_candles(candles, candles_sender).await;
for resolution in Resolution::iter() { for resolution in Resolution::iter() {
if resolution == Resolution::R1m { if resolution == Resolution::R1m {
continue; continue;
} }
let candles = batch_higher_order_candles(conn, market_name, resolution).await?; let candles = batch_higher_order_candles(pool, market_name, resolution).await?;
send_candles(candles, candles_sender).await; send_candles(candles, candles_sender).await;
} }
Ok(()) Ok(())
} }
async fn send_candles(candles: Vec<Candle>, candles_sender: &Sender<Vec<Candle>>) { async fn send_candles(candles: Vec<Candle>, candles_sender: &Sender<Vec<Candle>>) {
if candles.len() > 0 { if !candles.is_empty() {
if let Err(_) = candles_sender.send(candles).await { if let Err(_) = candles_sender.send(candles).await {
panic!("candles receiver dropped"); panic!("candles receiver dropped");
} }

View File

@ -1,20 +1,22 @@
use dotenv;
use openbook_candles::database::{
initialize::{connect_to_database, setup_database},
insert::{persist_candles, persist_fill_events},
};
use openbook_candles::structs::candle::Candle; use openbook_candles::structs::candle::Candle;
use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
use openbook_candles::structs::openbook::OpenBookFillEventLog; use openbook_candles::structs::openbook::OpenBookFillEvent;
use openbook_candles::utils::Config; use openbook_candles::utils::Config;
use openbook_candles::worker::candle_batching::batch_for_market;
use openbook_candles::worker::trade_fetching::scrape::scrape; use openbook_candles::worker::trade_fetching::scrape::scrape;
use openbook_candles::{
database::{
initialize::{connect_to_database, setup_database},
insert::{persist_candles, persist_fill_events},
},
worker::candle_batching::batch_for_market,
};
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use std::env; use std::env;
use std::{collections::HashMap, str::FromStr}; use std::{collections::HashMap, str::FromStr};
use tokio::sync::mpsc; use tokio::sync::mpsc;
#[tokio::main] #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok(); dotenv::dotenv().ok();
@ -22,19 +24,12 @@ async fn main() -> anyhow::Result<()> {
assert!(args.len() == 2); assert!(args.len() == 2);
let path_to_markets_json = &args[1]; let path_to_markets_json = &args[1];
let rpc_url: String = dotenv::var("RPC_URL").unwrap(); let rpc_url: String = dotenv::var("RPC_URL").unwrap();
let database_url: String = dotenv::var("DATABASE_URL").unwrap();
let max_pg_pool_connections: u32 = dotenv::var("MAX_PG_POOL_CONNS_WORKER")
.unwrap()
.parse::<u32>()
.unwrap();
let config = Config { let config = Config {
rpc_url: rpc_url.clone(), rpc_url: rpc_url.clone(),
database_url,
max_pg_pool_connections,
}; };
let markets = load_markets(&path_to_markets_json); let markets = load_markets(path_to_markets_json);
let market_infos = fetch_market_infos(&config, markets.clone()).await?; let market_infos = fetch_market_infos(&config, markets.clone()).await?;
let mut target_markets = HashMap::new(); let mut target_markets = HashMap::new();
for m in market_infos.clone() { for m in market_infos.clone() {
@ -42,14 +37,14 @@ async fn main() -> anyhow::Result<()> {
} }
println!("{:?}", target_markets); println!("{:?}", target_markets);
let pool = connect_to_database(&config).await?; let pool = connect_to_database().await?;
setup_database(&pool).await?; setup_database(&pool).await?;
let mut handles = vec![]; let mut handles = vec![];
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEventLog>(1000); let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(1000);
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
scrape(&config, &fill_sender, &target_markets).await; //TODO: send the vec, it's okay scrape(&config, &fill_sender, &target_markets).await;
})); }));
let fills_pool = pool.clone(); let fills_pool = pool.clone();
@ -67,7 +62,7 @@ async fn main() -> anyhow::Result<()> {
let sender = candle_sender.clone(); let sender = candle_sender.clone();
let batch_pool = pool.clone(); let batch_pool = pool.clone();
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
batch_for_market(batch_pool, &sender, &market) batch_for_market(&batch_pool, &sender, &market)
.await .await
.unwrap(); .unwrap();
println!("SOMETHING WENT WRONG"); println!("SOMETHING WENT WRONG");

View File

@ -5,22 +5,26 @@ use solana_transaction_status::{
}; };
use std::{collections::HashMap, io::Error}; use std::{collections::HashMap, io::Error};
use crate::structs::openbook::OpenBookFillEventLog; use crate::structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw};
const PROGRAM_DATA: &str = "Program data: "; const PROGRAM_DATA: &str = "Program data: ";
pub fn parse_trades_from_openbook_txns( pub fn parse_trades_from_openbook_txns(
txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>, txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
target_markets: &HashMap<Pubkey, u8>, target_markets: &HashMap<Pubkey, u8>,
) -> Vec<OpenBookFillEventLog> { ) -> Vec<OpenBookFillEvent> {
let mut fills_vector = Vec::<OpenBookFillEventLog>::new(); let mut fills_vector = Vec::<OpenBookFillEvent>::new();
for txn in txns.iter_mut() { for txn in txns.iter_mut() {
match txn { match txn {
Ok(t) => { Ok(t) => {
if let Some(m) = &t.transaction.meta { if let Some(m) = &t.transaction.meta {
match &m.log_messages { match &m.log_messages {
OptionSerializer::Some(logs) => { OptionSerializer::Some(logs) => {
match parse_openbook_fills_from_logs(logs, target_markets) { match parse_openbook_fills_from_logs(
logs,
target_markets,
t.block_time.unwrap(),
) {
Some(mut events) => fills_vector.append(&mut events), Some(mut events) => fills_vector.append(&mut events),
None => {} None => {}
} }
@ -39,8 +43,9 @@ pub fn parse_trades_from_openbook_txns(
fn parse_openbook_fills_from_logs( fn parse_openbook_fills_from_logs(
logs: &Vec<String>, logs: &Vec<String>,
target_markets: &HashMap<Pubkey, u8>, target_markets: &HashMap<Pubkey, u8>,
) -> Option<Vec<OpenBookFillEventLog>> { block_time: i64,
let mut fills_vector = Vec::<OpenBookFillEventLog>::new(); ) -> Option<Vec<OpenBookFillEvent>> {
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
for l in logs { for l in logs {
match l.strip_prefix(PROGRAM_DATA) { match l.strip_prefix(PROGRAM_DATA) {
Some(log) => { Some(log) => {
@ -49,13 +54,14 @@ fn parse_openbook_fills_from_logs(
_ => continue, _ => continue,
}; };
let mut slice: &[u8] = &borsh_bytes[8..]; let mut slice: &[u8] = &borsh_bytes[8..];
let event: Result<OpenBookFillEventLog, Error> = let event: Result<OpenBookFillEventRaw, Error> =
anchor_lang::AnchorDeserialize::deserialize(&mut slice); anchor_lang::AnchorDeserialize::deserialize(&mut slice);
match event { match event {
Ok(e) => { Ok(e) => {
if target_markets.contains_key(&e.market) { let fill_event = e.with_time(block_time);
fills_vector.push(e); if target_markets.contains_key(&fill_event.market) {
fills_vector.push(fill_event);
} }
} }
_ => continue, _ => continue,
@ -65,9 +71,9 @@ fn parse_openbook_fills_from_logs(
} }
} }
if fills_vector.len() > 0 { if !fills_vector.is_empty() {
return Some(fills_vector); Some(fills_vector)
} else { } else {
return None; None
} }
} }
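Putting the parser's steps together for a single log line: strip the "Program data: " prefix, base64-decode, skip the 8-byte Anchor event discriminator, borsh-deserialize the raw event, and stamp it with the transaction's block time. A condensed sketch (the base64 decode is elided in the hunk above and assumed here to use the base64 crate):

use anchor_lang::AnchorDeserialize;
use openbook_candles::structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw};

const PROGRAM_DATA: &str = "Program data: ";

fn parse_one(line: &str, block_time: i64) -> Option<OpenBookFillEvent> {
    let payload = line.strip_prefix(PROGRAM_DATA)?;
    let bytes = base64::decode(payload).ok()?; // base64 crate assumed
    if bytes.len() < 8 {
        return None; // too short to hold the discriminator
    }
    let mut slice: &[u8] = &bytes[8..];
    let raw = OpenBookFillEventRaw::deserialize(&mut slice).ok()?;
    Some(raw.with_time(block_time))
}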

View File

@ -8,13 +8,13 @@ use solana_transaction_status::UiTransactionEncoding;
use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration}; use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use crate::{structs::openbook::OpenBookFillEventLog, utils::Config}; use crate::{structs::openbook::OpenBookFillEvent, utils::Config};
use super::parsing::parse_trades_from_openbook_txns; use super::parsing::parse_trades_from_openbook_txns;
pub async fn scrape( pub async fn scrape(
config: &Config, config: &Config,
fill_sender: &Sender<OpenBookFillEventLog>, fill_sender: &Sender<OpenBookFillEvent>,
target_markets: &HashMap<Pubkey, u8>, target_markets: &HashMap<Pubkey, u8>,
) { ) {
let rpc_client = let rpc_client =
@ -38,7 +38,7 @@ pub async fn scrape_transactions(
rpc_client: &RpcClient, rpc_client: &RpcClient,
before_sig: Option<Signature>, before_sig: Option<Signature>,
limit: Option<usize>, limit: Option<usize>,
fill_sender: &Sender<OpenBookFillEventLog>, fill_sender: &Sender<OpenBookFillEvent>,
target_markets: &HashMap<Pubkey, u8>, target_markets: &HashMap<Pubkey, u8>,
) -> Option<Signature> { ) -> Option<Signature> {
let rpc_config = GetConfirmedSignaturesForAddress2Config { let rpc_config = GetConfirmedSignaturesForAddress2Config {
@ -62,12 +62,12 @@ pub async fn scrape_transactions(
} }
}; };
if sigs.len() == 0 { if sigs.is_empty() {
println!("No signatures found"); println!("No signatures found");
return before_sig; return before_sig;
} }
let last = sigs.last().clone().unwrap(); let last = sigs.last().unwrap();
let request_last_sig = Signature::from_str(&last.signature).unwrap(); let request_last_sig = Signature::from_str(&last.signature).unwrap();
sigs.retain(|sig| sig.err.is_none()); sigs.retain(|sig| sig.err.is_none());
@ -88,13 +88,13 @@ pub async fn scrape_transactions(
let txn_futs: Vec<_> = signatures let txn_futs: Vec<_> = signatures
.iter() .iter()
.map(|s| rpc_client.get_transaction_with_config(&s, txn_config)) .map(|s| rpc_client.get_transaction_with_config(s, txn_config))
.collect(); .collect();
let mut txns = join_all(txn_futs).await; let mut txns = join_all(txn_futs).await;
let fills = parse_trades_from_openbook_txns(&mut txns, target_markets); let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
if fills.len() > 0 { if !fills.is_empty() {
for fill in fills.into_iter() { for fill in fills.into_iter() {
if let Err(_) = fill_sender.send(fill).await { if let Err(_) = fill_sender.send(fill).await {
panic!("receiver dropped"); panic!("receiver dropped");