Compare commits

...

15 Commits

Author SHA1 Message Date
riordanp 9b69e380c3
Remove combined deploy config (#1) 2023-05-23 12:37:58 +01:00
dboures df1af2c10d
chore: remove illiquid markets 2023-05-20 14:18:45 -05:00
dboures 34117ca95e
fix: query error in /coingecko/tickers 2023-05-20 13:59:56 -05:00
dboures 6e16ef8e04
lint: cargo clippy 2023-05-19 19:16:37 -05:00
dboures 7ae8339ebb
feat: split backfills into trades and candles 2023-05-19 19:15:13 -05:00
dboures 8bbf89677c
docs: update .env-example 2023-05-19 14:51:20 -05:00
dboures 0e821cc3c1
feat: fill events include block time (fixes backfill issues) 2023-05-19 14:50:56 -05:00
Riordan Panayides 872e6fcdb2 Fix reading postgres config from env 2023-05-18 19:41:17 +01:00
Riordan Panayides 5b61f3d949 wip: many changes for deployment
- split out postgres config and read from env
- make ssl optional
- revert to separate dockerfiles
- give tokio enough workers
- allow custom server bind address
- fix warnings
- cargo fmt
2023-05-17 17:11:17 +01:00
Riordan Panayides 697a20d9ff Fix backfill 2023-05-16 15:59:33 +01:00
Riordan Panayides cccc7accd5 Merge remote-tracking branch 'upstream/tokio-postgres' into pan/fly-deploy 2023-05-15 13:05:06 +01:00
dboures c20429bcc5
wip: attempt to use SSL cert 2023-05-14 03:21:34 -05:00
dboures a4bf9a35be
refactor: server uses tokio-postgres instead of sqlx 2023-05-14 03:09:12 -05:00
dboures cead78381d
refactor: worker uses tokio-postgres instead of sqlx 2023-05-14 02:15:10 -05:00
Riordan Panayides 28dc3c5e59 Add combined dockerfile, fly config 2023-05-13 14:52:00 +01:00
30 changed files with 1227 additions and 991 deletions

View File

@ -1,5 +1,11 @@
RPC_URL=http://solana-mainnet-api.rpc-node.com
DATABASE_URL=
SQLX_OFFLINE=true
MAX_PG_POOL_CONNS_WORKER=5
MAX_PG_POOL_CONNS_SERVER=15
SERVER_BIND_ADDR="[::]:8080"
PG_HOST=127.0.0.1
PG_PORT=5432
PG_USER=postgres
PG_PASSWORD=
PG_DBNAME=postgres
PG_MAX_POOL_CONNECTIONS=10
PG_USE_SSL=false
PG_CA_CERT_PATH=
PG_CLIENT_KEY_PATH=

752
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -16,8 +16,12 @@ name = "server"
path = "src/server/main.rs"
[[bin]]
name = "backfill"
path = "src/backfill/main.rs"
name = "backfill-trades"
path = "src/backfill-trades/main.rs"
[[bin]]
name = "backfill-candles"
path = "src/backfill-candles/main.rs"
[dependencies]
tokio = { version = "1", features = ["full"] }
@ -26,7 +30,10 @@ futures = "0.3.27"
jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] }
sqlx = { version = "0.6", features = [ "runtime-tokio-native-tls" , "postgres", "chrono", "decimal", "offline" ] }
deadpool-postgres = { version = "0.10.5", features = [ "rt_tokio_1", "serde" ] }
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
postgres-native-tls = "0.5.0"
native-tls = "0.2.11"
chrono = "0.4.23"
solana-client = "=1.14.13"
@ -58,4 +65,6 @@ actix-web = "4"
arrayref = "0.3.6"
bytemuck = "1.12.3"
num_enum = "0.6.1"
num_enum = "0.6.1"
config = "0.13.1"

View File

@ -19,4 +19,6 @@ RUN apt-get update && apt-get -y install ca-certificates libssl1.1
# We do not need the Rust toolchain to run the binary!
FROM base_image AS runtime
COPY --from=builder /target/release/server /usr/local/bin
ENTRYPOINT ["/usr/local/bin/server"]
COPY --from=builder markets.json .
COPY --from=builder ca.cer .
COPY --from=builder client.pks .

View File

@ -2,13 +2,13 @@ FROM lukemathwalker/cargo-chef:latest-rust-1.67.1-slim AS chef
FROM chef AS planner
COPY . .
RUN cargo chef prepare --recipe-path worker-recipe.json
RUN cargo chef prepare --recipe-path recipe.json
FROM chef AS builder
COPY --from=planner worker-recipe.json worker-recipe.json
COPY --from=planner recipe.json recipe.json
RUN apt-get update && apt-get install -y libudev-dev clang pkg-config libssl-dev build-essential cmake
RUN rustup component add rustfmt && update-ca-certificates
RUN cargo chef cook --release --recipe-path worker-recipe.json
RUN cargo chef cook --release --recipe-path recipe.json
# Build application
COPY . .
RUN cargo build --release --bin worker
@ -19,4 +19,6 @@ RUN apt-get update && apt-get -y install ca-certificates libssl1.1
# We do not need the Rust toolchain to run the binary!
FROM base_image AS runtime
COPY --from=builder /target/release/worker /usr/local/bin
ENTRYPOINT ["/usr/local/bin/worker"]
COPY --from=builder markets.json .
COPY --from=builder ca.cer .
COPY --from=builder client.pks .

19
cd/server.toml Normal file
View File

@ -0,0 +1,19 @@
app = "openbook-candles-server"
kill_signal = "SIGTERM"
kill_timeout = 30
[build]
dockerfile = "../Dockerfile.server"
[experimental]
cmd = ["server", "markets.json"]
[[services]]
internal_port = 8080
processes = ["app"]
protocol = "tcp"
[services.concurrency]
hard_limit = 1024
soft_limit = 1024
type = "connections"

9
cd/worker.toml Normal file
View File

@ -0,0 +1,9 @@
app = "openbook-candles-worker"
kill_signal = "SIGTERM"
kill_timeout = 30
[build]
dockerfile = "../Dockerfile.worker"
[experimental]
cmd = ["worker", "markets.json"]

View File

@ -23,10 +23,6 @@
"name": "mSOL/USDC",
"address": "9Lyhks5bQQxb9EyyX55NtgKQzpM4WK7JCmeaWuQ5MoXD"
},
{
"name": "stSOL/USDC",
"address": "JCKa72xFYGWBEVJZ7AKZ2ofugWPBfrrouQviaGaohi3R"
},
{
"name": "SOL/USDT",
"address": "2AdaV97p6SfkuMQJdu8DHhBhmJe7oWdvbm52MJfYQmfA"
@ -35,10 +31,6 @@
"name": "USDT/USDC",
"address": "B2na8Awyd7cpC59iEU43FagJAPLigr3AP3s38KM982bu"
},
{
"name": "USDT/USDC",
"address": "B2na8Awyd7cpC59iEU43FagJAPLigr3AP3s38KM982bu"
},
{
"name": "ETH/USDC",
"address": "BbJgE7HZMaDp5NTYvRh5jZSkQPVDTU8ubPFtpogUkEj4"
@ -50,9 +42,5 @@
{
"name": "RAY/USDT",
"address": "GpHbiJJ9VHiuHVXeoet121Utrbm1CSNNzYrBKB8Xz2oz"
},
{
"name": "SLND/USDC",
"address": "HTHMfoxePjcXFhrV74pfCUNoWGe374ecFwiDjPGTkzHr"
}
]

View File

@ -0,0 +1,69 @@
use deadpool_postgres::Object;
use openbook_candles::{
database::{
initialize::connect_to_database,
insert::{build_candles_upsert_statement},
},
structs::{
candle::Candle,
markets::{fetch_market_infos, load_markets},
resolution::Resolution,
},
utils::{AnyhowWrap, Config},
worker::candle_batching::{
higher_order_candles::backfill_batch_higher_order_candles,
minute_candles::backfill_batch_1m_candles,
},
};
use std::{env};
use strum::IntoEnumIterator;
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();
let args: Vec<String> = env::args().collect();
assert!(args.len() == 2);
let path_to_markets_json = &args[1];
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
let config = Config {
rpc_url: rpc_url.clone(),
};
let markets = load_markets(path_to_markets_json);
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
println!("Backfilling candles for {:?}", markets);
let pool = connect_to_database().await?;
for market in market_infos.into_iter() {
let client = pool.get().await?;
let minute_candles = backfill_batch_1m_candles(&pool, &market).await?;
save_candles(minute_candles, client).await?;
for resolution in Resolution::iter() {
if resolution == Resolution::R1m {
continue;
}
let higher_order_candles =
backfill_batch_higher_order_candles(&pool, &market.name, resolution).await?;
let client = pool.get().await?;
save_candles(higher_order_candles, client).await?;
}
}
Ok(())
}
async fn save_candles(candles: Vec<Candle>, client: Object) -> anyhow::Result<()> {
if !candles.is_empty() {
let upsert_statement = build_candles_upsert_statement(candles);
client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()?;
}
Ok(())
}

View File

@ -1,33 +1,37 @@
use std::{collections::HashMap, str::FromStr, env};
use anchor_lang::prelude::Pubkey;
use chrono::{NaiveDateTime, DateTime, Utc, Duration};
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
use futures::future::join_all;
use openbook_candles::{structs::{openbook::OpenBookFillEventLog, markets::{load_markets, fetch_market_infos}}, worker::trade_fetching::{scrape::scrape_transactions, parsing::parse_trades_from_openbook_txns}, database::{initialize::connect_to_database, insert::persist_fill_events}, utils::Config};
use solana_client::{rpc_config::RpcTransactionConfig, nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config, rpc_response::RpcConfirmedTransactionStatusWithSignature};
use openbook_candles::{
database::{initialize::connect_to_database, insert::persist_fill_events},
structs::{
markets::{fetch_market_infos, load_markets},
openbook::OpenBookFillEvent,
},
utils::Config,
worker::trade_fetching::parsing::parse_trades_from_openbook_txns,
};
use solana_client::{
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
rpc_config::RpcTransactionConfig, rpc_response::RpcConfirmedTransactionStatusWithSignature,
};
use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
use solana_transaction_status::UiTransactionEncoding;
use tokio::sync::mpsc::{Sender, self};
use std::{collections::HashMap, env, str::FromStr};
use tokio::sync::mpsc::{self, Sender};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();
let args: Vec<String> = env::args().collect();
assert!(args.len() == 2);
let path_to_markets_json = &args[1];
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
let database_url: String = dotenv::var("DATABASE_URL").unwrap();
let max_pg_pool_connections: u32 = dotenv::var("MAX_PG_POOL_CONNS_WORKER")
.unwrap()
.parse::<u32>()
.unwrap();
let config = Config {
rpc_url: rpc_url.clone(),
database_url,
max_pg_pool_connections,
};
let markets = load_markets(&path_to_markets_json);
let markets = load_markets(path_to_markets_json);
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
let mut target_markets = HashMap::new();
for m in market_infos.clone() {
@ -35,10 +39,10 @@ async fn main() -> anyhow::Result<()> {
}
println!("{:?}", target_markets);
let pool = connect_to_database(&config).await?;
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEventLog>(1000);
let pool = connect_to_database().await?;
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(1000);
tokio::spawn(async move {
tokio::spawn(async move {
loop {
persist_fill_events(&pool, &mut fill_receiver)
.await
@ -52,10 +56,9 @@ async fn main() -> anyhow::Result<()> {
pub async fn backfill(
rpc_url: String,
fill_sender: &Sender<OpenBookFillEventLog>,
fill_sender: &Sender<OpenBookFillEvent>,
target_markets: &HashMap<Pubkey, u8>,
) -> anyhow::Result<()> {
println!("backfill started");
let mut before_sig: Option<Signature> = None;
let mut now_time = Utc::now().timestamp();
@ -64,14 +67,14 @@ pub async fn backfill(
let mut handles = vec![];
while now_time > end_time {
let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
let rpc_client =
RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
let maybe_r = get_signatures(&rpc_client, before_sig).await;
match maybe_r {
Some((last, time, sigs)) => {
now_time = time;
before_sig = Some(last);
let time_left = backfill_time_left(now_time, end_time);
println!(
"{} minutes ~ {} days remaining in the backfill\n",
@ -85,10 +88,10 @@ pub async fn backfill(
get_transactions(&rpc_client, sigs, &cloned_sender, &cloned_markets).await;
});
handles.push(handle);
},
None => {},
}
None => {}
}
};
}
futures::future::join_all(handles).await;
@ -96,44 +99,52 @@ pub async fn backfill(
Ok(())
}
pub async fn get_signatures(
rpc_client: &RpcClient,
before_sig: Option<Signature>,
) -> Option<(
Signature,
i64,
Vec<RpcConfirmedTransactionStatusWithSignature>,
)> {
let rpc_config = GetConfirmedSignaturesForAddress2Config {
before: before_sig,
until: None,
limit: None,
commitment: Some(CommitmentConfig::confirmed()),
};
pub async fn get_signatures(rpc_client: &RpcClient,
before_sig: Option<Signature>) -> Option<(Signature, i64, Vec<RpcConfirmedTransactionStatusWithSignature>)> {
let rpc_config = GetConfirmedSignaturesForAddress2Config {
before: before_sig,
until: None,
limit: None,
commitment: Some(CommitmentConfig::confirmed()),
};
let sigs = match rpc_client
.get_signatures_for_address_with_config(
&Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(),
rpc_config,
)
.await
{
Ok(s) => s,
Err(e) => {
println!("Error in get_signatures_for_address_with_config: {}", e);
return None;
}
};
if sigs.len() == 0 {
println!("No signatures found");
let sigs = match rpc_client
.get_signatures_for_address_with_config(
&Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(),
rpc_config,
)
.await
{
Ok(s) => s,
Err(e) => {
println!("Error in get_signatures_for_address_with_config: {}", e);
return None;
}
let last = sigs.last().unwrap();
return Some((Signature::from_str(&last.signature).unwrap(), last.block_time.unwrap(), sigs));
};
if sigs.is_empty() {
println!("No signatures found");
return None;
}
let last = sigs.last().unwrap();
// println!("{:?}", last.block_time.unwrap());
Some((
Signature::from_str(&last.signature).unwrap(),
last.block_time.unwrap(),
sigs,
))
}
pub async fn get_transactions(
rpc_client: &RpcClient,
mut sigs: Vec<RpcConfirmedTransactionStatusWithSignature>,
fill_sender: &Sender<OpenBookFillEventLog>,
fill_sender: &Sender<OpenBookFillEvent>,
target_markets: &HashMap<Pubkey, u8>,
) {
sigs.retain(|sig| sig.err.is_none());
@ -154,14 +165,15 @@ pub async fn get_transactions(
let txn_futs: Vec<_> = signatures
.iter()
.map(|s| rpc_client.get_transaction_with_config(&s, txn_config))
.map(|s| rpc_client.get_transaction_with_config(s, txn_config))
.collect();
let mut txns = join_all(txn_futs).await;
let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
if fills.len() > 0 {
if !fills.is_empty() {
for fill in fills.into_iter() {
// println!("Sending fill {:?}", fill);
if let Err(_) = fill_sender.send(fill).await {
panic!("receiver dropped");
}
@ -175,4 +187,4 @@ fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration {
let cur_date = DateTime::<Utc>::from_utc(naive_cur, Utc);
let bf_date = DateTime::<Utc>::from_utc(naive_bf, Utc);
cur_date - bf_date
}
}

View File

@ -1,50 +1,54 @@
use chrono::{DateTime, Utc};
use sqlx::{pool::PoolConnection, Postgres};
use crate::{
structs::{
candle::Candle,
coingecko::{PgCoinGecko24HighLow, PgCoinGecko24HourVolume},
openbook::PgOpenBookFill,
resolution::Resolution,
trader::PgTrader,
},
utils::AnyhowWrap,
use crate::structs::{
candle::Candle,
coingecko::{PgCoinGecko24HighLow, PgCoinGecko24HourVolume},
openbook::PgOpenBookFill,
resolution::Resolution,
trader::PgTrader,
};
use chrono::{DateTime, Utc};
use deadpool_postgres::{GenericClient, Pool};
pub async fn fetch_earliest_fill(
conn: &mut PoolConnection<Postgres>,
pool: &Pool,
market_address_string: &str,
) -> anyhow::Result<Option<PgOpenBookFill>> {
sqlx::query_as!(
PgOpenBookFill,
r#"SELECT
time as "time!",
bid as "bid!",
maker as "maker!",
native_qty_paid as "native_qty_paid!",
native_qty_received as "native_qty_received!",
native_fee_or_rebate as "native_fee_or_rebate!"
from fills
where market = $1
and maker = true
ORDER BY time asc LIMIT 1"#,
market_address_string
)
.fetch_optional(conn)
.await
.map_err_anyhow()
let client = pool.get().await?;
let stmt = client
.prepare(
r#"SELECT
time as "time!",
bid as "bid!",
maker as "maker!",
native_qty_paid as "native_qty_paid!",
native_qty_received as "native_qty_received!",
native_fee_or_rebate as "native_fee_or_rebate!"
from fills
where market = $1
and maker = true
ORDER BY time asc LIMIT 1"#,
)
.await?;
let row = client.query_opt(&stmt, &[&market_address_string]).await?;
match row {
Some(r) => Ok(Some(PgOpenBookFill::from_row(r))),
None => Ok(None),
}
}
pub async fn fetch_fills_from(
conn: &mut PoolConnection<Postgres>,
pool: &Pool,
market_address_string: &str,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
) -> anyhow::Result<Vec<PgOpenBookFill>> {
sqlx::query_as!(
PgOpenBookFill,
r#"SELECT
let client = pool.get().await?;
let stmt = client
.prepare(
r#"SELECT
time as "time!",
bid as "bid!",
maker as "maker!",
@ -53,31 +57,36 @@ pub async fn fetch_fills_from(
native_fee_or_rebate as "native_fee_or_rebate!"
from fills
where market = $1
and time >= $2
and time < $3
and time >= $2::timestamptz
and time < $3::timestamptz
and maker = true
ORDER BY time asc"#,
market_address_string,
start_time,
end_time
)
.fetch_all(conn)
.await
.map_err_anyhow()
)
.await?;
let rows = client
.query(&stmt, &[&market_address_string, &start_time, &end_time])
.await?;
Ok(rows
.into_iter()
.map(PgOpenBookFill::from_row)
.collect())
}
pub async fn fetch_latest_finished_candle(
conn: &mut PoolConnection<Postgres>,
pool: &Pool,
market_name: &str,
resolution: Resolution,
) -> anyhow::Result<Option<Candle>> {
sqlx::query_as!(
Candle,
r#"SELECT
let client = pool.get().await?;
let stmt = client
.prepare(
r#"SELECT
market_name as "market_name!",
start_time as "start_time!",
end_time as "end_time!",
resolution as "resolution!",
market_name as "market_name!",
open as "open!",
close as "close!",
high as "high!",
@ -89,26 +98,35 @@ pub async fn fetch_latest_finished_candle(
and resolution = $2
and complete = true
ORDER BY start_time desc LIMIT 1"#,
market_name,
resolution.to_string()
)
.fetch_optional(conn)
.await
.map_err_anyhow()
)
.await?;
let row = client
.query_opt(&stmt, &[&market_name, &resolution.to_string()])
.await?;
match row {
Some(r) => Ok(Some(Candle::from_row(r))),
None => Ok(None),
}
}
/// Fetches all of the candles for the given market and resoultion, starting from the earliest.
/// Note that this function will fetch ALL candles.
pub async fn fetch_earliest_candles(
conn: &mut PoolConnection<Postgres>,
pool: &Pool,
market_name: &str,
resolution: Resolution,
) -> anyhow::Result<Vec<Candle>> {
sqlx::query_as!(
Candle,
r#"SELECT
let client = pool.get().await?;
let stmt = client
.prepare(
r#"SELECT
market_name as "market_name!",
start_time as "start_time!",
end_time as "end_time!",
resolution as "resolution!",
market_name as "market_name!",
open as "open!",
close as "close!",
high as "high!",
@ -119,64 +137,32 @@ pub async fn fetch_earliest_candles(
where market_name = $1
and resolution = $2
ORDER BY start_time asc"#,
market_name,
resolution.to_string()
)
.fetch_all(conn)
.await
.map_err_anyhow()
)
.await?;
let rows = client
.query(&stmt, &[&market_name, &resolution.to_string()])
.await?;
Ok(rows.into_iter().map(Candle::from_row).collect())
}
pub async fn fetch_candles_from(
conn: &mut PoolConnection<Postgres>,
pool: &Pool,
market_name: &str,
resolution: Resolution,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
) -> anyhow::Result<Vec<Candle>> {
sqlx::query_as!(
Candle,
r#"SELECT
start_time as "start_time!",
end_time as "end_time!",
resolution as "resolution!",
market_name as "market_name!",
open as "open!",
close as "close!",
high as "high!",
low as "low!",
volume as "volume!",
complete as "complete!"
from candles
where market_name = $1
and resolution = $2
and start_time >= $3
and end_time <= $4
ORDER BY start_time asc"#,
market_name,
resolution.to_string(),
start_time,
end_time
)
.fetch_all(conn)
.await
.map_err_anyhow()
}
let client = pool.get().await?;
pub async fn fetch_tradingview_candles(
conn: &mut PoolConnection<Postgres>,
market_name: &str,
resolution: Resolution,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
) -> anyhow::Result<Vec<Candle>> {
sqlx::query_as!(
Candle,
r#"SELECT
let stmt = client
.prepare(
r#"SELECT
market_name as "market_name!",
start_time as "start_time!",
end_time as "end_time!",
resolution as "resolution!",
market_name as "market_name!",
open as "open!",
close as "close!",
high as "high!",
@ -189,112 +175,135 @@ pub async fn fetch_tradingview_candles(
and start_time >= $3
and end_time <= $4
ORDER BY start_time asc"#,
market_name,
resolution.to_string(),
start_time,
end_time
)
.fetch_all(conn)
.await
.map_err_anyhow()
)
.await?;
let rows = client
.query(
&stmt,
&[
&market_name,
&resolution.to_string(),
&start_time,
&end_time,
],
)
.await?;
Ok(rows.into_iter().map(Candle::from_row).collect())
}
pub async fn fetch_top_traders_by_base_volume_from(
conn: &mut PoolConnection<Postgres>,
pool: &Pool,
market_address_string: &str,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
) -> anyhow::Result<Vec<PgTrader>> {
sqlx::query_as!(
PgTrader,
r#"SELECT
open_orders_owner,
sum(
native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
) as "raw_ask_size!",
sum(
native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
) as "raw_bid_size!"
FROM fills
WHERE market = $1
AND time >= $2
AND time < $3
GROUP BY open_orders_owner
ORDER BY
sum(native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
+
sum(native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
DESC
LIMIT 10000"#,
market_address_string,
start_time,
end_time
)
.fetch_all(conn)
.await
.map_err_anyhow()
let client = pool.get().await?;
let stmt = client
.prepare(
r#"SELECT
open_orders_owner,
sum(
native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
) as "raw_ask_size!",
sum(
native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
) as "raw_bid_size!"
FROM fills
WHERE market = $1
AND time >= $2
AND time < $3
GROUP BY open_orders_owner
ORDER BY
sum(native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
+
sum(native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
DESC
LIMIT 10000"#,
)
.await?;
let rows = client
.query(&stmt, &[&market_address_string, &start_time, &end_time])
.await?;
Ok(rows.into_iter().map(PgTrader::from_row).collect())
}
pub async fn fetch_top_traders_by_quote_volume_from(
conn: &mut PoolConnection<Postgres>,
pool: &Pool,
market_address_string: &str,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
) -> anyhow::Result<Vec<PgTrader>> {
sqlx::query_as!(
PgTrader,
r#"SELECT
open_orders_owner,
sum(
native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
) as "raw_ask_size!",
sum(
native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
) as "raw_bid_size!"
FROM fills
WHERE market = $1
AND time >= $2
AND time < $3
GROUP BY open_orders_owner
ORDER BY
sum(native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
+
sum(native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
DESC
LIMIT 10000"#,
market_address_string,
start_time,
end_time
)
.fetch_all(conn)
.await
.map_err_anyhow()
let client = pool.get().await?;
let stmt = client
.prepare(
r#"SELECT
open_orders_owner,
sum(
native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
) as "raw_ask_size!",
sum(
native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
) as "raw_bid_size!"
FROM fills
WHERE market = $1
AND time >= $2
AND time < $3
GROUP BY open_orders_owner
ORDER BY
sum(native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
+
sum(native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
DESC
LIMIT 10000"#,
)
.await?;
let rows = client
.query(&stmt, &[&market_address_string, &start_time, &end_time])
.await?;
Ok(rows.into_iter().map(PgTrader::from_row).collect())
}
pub async fn fetch_coingecko_24h_volume(
conn: &mut PoolConnection<Postgres>,
pool: &Pool,
) -> anyhow::Result<Vec<PgCoinGecko24HourVolume>> {
sqlx::query_as!(
PgCoinGecko24HourVolume,
r#"select market as "address!",
sum(native_qty_paid) as "raw_quote_size!",
sum(native_qty_received) as "raw_base_size!"
let client = pool.get().await?;
let stmt = client
.prepare(
r#"select market as "address!",
sum(native_qty_received) as "raw_base_size!",
sum(native_qty_paid) as "raw_quote_size!"
from fills
where "time" >= current_timestamp - interval '1 day'
and bid = true
group by market"#
)
.fetch_all(conn)
.await
.map_err_anyhow()
group by market"#,
)
.await?;
let rows = client.query(&stmt, &[]).await?;
Ok(rows
.into_iter()
.map(PgCoinGecko24HourVolume::from_row)
.collect())
}
pub async fn fetch_coingecko_24h_high_low(
conn: &mut PoolConnection<Postgres>,
pool: &Pool,
) -> anyhow::Result<Vec<PgCoinGecko24HighLow>> {
sqlx::query_as!(
PgCoinGecko24HighLow,
r#"select
let client = pool.get().await?;
let stmt = client
.prepare(
r#"select
g.market_name as "market_name!",
g.high as "high!",
g.low as "low!",
@ -317,9 +326,14 @@ pub async fn fetch_coingecko_24h_high_low(
join candles c on g.market_name = c.market_name
and g.start_time = c.start_time
where
c.resolution = '1M'"#
)
.fetch_all(conn)
.await
.map_err_anyhow()
c.resolution = '1M'"#,
)
.await?;
let rows = client.query(&stmt, &[]).await?;
Ok(rows
.into_iter()
.map(PgCoinGecko24HighLow::from_row)
.collect())
}

View File

@ -1,24 +1,70 @@
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
use std::time::Duration;
use std::{fs, time::Duration};
use crate::utils::{AnyhowWrap, Config};
use deadpool_postgres::{
ManagerConfig, Pool, PoolConfig, RecyclingMethod, Runtime, SslMode, Timeouts,
};
use native_tls::{Certificate, Identity, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
pub async fn connect_to_database(config: &Config) -> anyhow::Result<Pool<Postgres>> {
loop {
let pool = PgPoolOptions::new()
.max_connections(config.max_pg_pool_connections)
.connect(&config.database_url)
.await;
if pool.is_ok() {
println!("Database connected");
return pool.map_err_anyhow();
use crate::utils::PgConfig;
pub async fn connect_to_database() -> anyhow::Result<Pool> {
let mut pg_config = PgConfig::from_env()?;
pg_config.pg.manager = Some(ManagerConfig {
recycling_method: RecyclingMethod::Fast,
});
pg_config.pg.pool = Some(PoolConfig {
max_size: pg_config.pg_max_pool_connections,
timeouts: Timeouts::default(),
});
// openssl pkcs12 -export -in client.cer -inkey client-key.cer -out client.pks
// base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64
// fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a mango-fills
// fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills
let tls = if pg_config.pg_use_ssl {
pg_config.pg.ssl_mode = Some(SslMode::Require);
let ca_cert = fs::read(pg_config.pg_ca_cert_path.expect("reading ca cert from env"))
.expect("reading ca cert from file");
let client_key = fs::read(
pg_config
.pg_client_key_path
.expect("reading client key from env"),
)
.expect("reading client key from file");
MakeTlsConnector::new(
TlsConnector::builder()
.add_root_certificate(Certificate::from_pem(&ca_cert)?)
.identity(Identity::from_pkcs12(&client_key, "pass")?)
.danger_accept_invalid_certs(false)
.build()?,
)
} else {
MakeTlsConnector::new(
TlsConnector::builder()
.danger_accept_invalid_certs(true)
.build()
.unwrap(),
)
};
let pool = pg_config
.pg
.create_pool(Some(Runtime::Tokio1), tls)
.unwrap();
match pool.get().await {
Ok(_) => println!("Database connected"),
Err(e) => {
println!("Failed to connect to database: {}, retrying", e.to_string());
tokio::time::sleep(Duration::from_millis(500)).await;
}
println!("Failed to connect to database, retrying");
tokio::time::sleep(Duration::from_millis(500)).await;
}
Ok(pool)
}
pub async fn setup_database(pool: &Pool<Postgres>) -> anyhow::Result<()> {
pub async fn setup_database(pool: &Pool) -> anyhow::Result<()> {
let candles_table_fut = create_candles_table(pool);
let fills_table_fut = create_fills_table(pool);
let result = tokio::try_join!(candles_table_fut, fills_table_fut);
@ -34,50 +80,51 @@ pub async fn setup_database(pool: &Pool<Postgres>) -> anyhow::Result<()> {
}
}
pub async fn create_candles_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
let mut tx = pool.begin().await.map_err_anyhow()?;
pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> {
let client = pool.get().await?;
sqlx::query!(
"CREATE TABLE IF NOT EXISTS candles (
client
.execute(
"CREATE TABLE IF NOT EXISTS candles (
id serial,
market_name text,
start_time timestamptz,
end_time timestamptz,
resolution text,
open numeric,
close numeric,
high numeric,
low numeric,
volume numeric,
open double precision,
close double precision,
high double precision,
low double precision,
volume double precision,
complete bool
)",
)
.execute(&mut tx)
.await?;
&[],
)
.await?;
sqlx::query!(
"CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market_name, start_time, resolution)"
).execute(&mut tx).await?;
client.execute(
"CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market_name, start_time, resolution)",
&[]
).await?;
sqlx::query!(
client.execute(
"DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'unique_candles') THEN
ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market_name, start_time, resolution);
END IF;
END $$"
)
.execute(&mut tx)
.await?;
END $$", &[]
).await?;
tx.commit().await.map_err_anyhow()
Ok(())
}
pub async fn create_fills_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
let mut tx = pool.begin().await.map_err_anyhow()?;
pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> {
let client = pool.get().await?;
sqlx::query!(
"CREATE TABLE IF NOT EXISTS fills (
client
.execute(
"CREATE TABLE IF NOT EXISTS fills (
id numeric PRIMARY KEY,
time timestamptz not null,
market text not null,
@ -85,23 +132,28 @@ pub async fn create_fills_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
open_orders_owner text not null,
bid bool not null,
maker bool not null,
native_qty_paid numeric not null,
native_qty_received numeric not null,
native_fee_or_rebate numeric not null,
native_qty_paid double precision not null,
native_qty_received double precision not null,
native_fee_or_rebate double precision not null,
fee_tier text not null,
order_id text not null
)",
)
.execute(&mut tx)
.await?;
sqlx::query!("CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)")
.execute(&mut tx)
&[],
)
.await?;
sqlx::query!("CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)")
.execute(&mut tx)
client
.execute(
"CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)",
&[],
)
.await?;
tx.commit().await.map_err_anyhow()
client
.execute(
"CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)",
&[],
)
.await?;
Ok(())
}

View File

@ -1,5 +1,4 @@
use chrono::Utc;
use sqlx::{Connection, Pool, Postgres};
use deadpool_postgres::Pool;
use std::{
collections::{hash_map::DefaultHasher, HashMap},
hash::{Hash, Hasher},
@ -7,26 +6,24 @@ use std::{
use tokio::sync::mpsc::{error::TryRecvError, Receiver};
use crate::{
structs::{candle::Candle, openbook::OpenBookFillEventLog},
utils::AnyhowWrap,
structs::{candle::Candle, openbook::OpenBookFillEvent},
utils::{to_timestampz, AnyhowWrap},
};
pub async fn persist_fill_events(
pool: &Pool<Postgres>,
fill_receiver: &mut Receiver<OpenBookFillEventLog>,
pool: &Pool,
fill_receiver: &mut Receiver<OpenBookFillEvent>,
) -> anyhow::Result<()> {
let mut conn = pool.acquire().await.unwrap();
let client = pool.get().await?;
loop {
let mut write_batch = HashMap::new();
while write_batch.len() < 10 {
match fill_receiver.try_recv() {
Ok(event) => {
if !write_batch.contains_key(&event) {
write_batch.insert(event, 0);
}
write_batch.entry(event).or_insert(0);
}
Err(TryRecvError::Empty) => {
if write_batch.len() > 0 {
if !write_batch.is_empty() {
break;
} else {
continue;
@ -38,65 +35,64 @@ pub async fn persist_fill_events(
};
}
if write_batch.len() > 0 {
if !write_batch.is_empty() {
// print!("writing: {:?} events to DB\n", write_batch.len());
match conn.ping().await {
Ok(_) => {
let upsert_statement = build_fills_upsert_statement(write_batch);
sqlx::query(&upsert_statement)
.execute(&mut conn)
.await
.map_err_anyhow()
.unwrap();
}
Err(_) => {
println!("Fills ping failed");
break;
}
}
// match conn.ping().await {
// Ok(_) => {
let upsert_statement = build_fills_upsert_statement(write_batch);
client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()
.unwrap();
// }
// Err(_) => {
// println!("Fills ping failed");
// break;
// }
// }
}
}
Ok(())
}
pub async fn persist_candles(
pool: Pool<Postgres>,
pool: Pool,
candles_receiver: &mut Receiver<Vec<Candle>>,
) -> anyhow::Result<()> {
let mut conn = pool.acquire().await.unwrap();
let client = pool.get().await.unwrap();
loop {
match conn.ping().await {
Ok(_) => {
match candles_receiver.try_recv() {
Ok(candles) => {
if candles.len() == 0 {
continue;
}
// print!("writing: {:?} candles to DB\n", candles.len());
let upsert_statement = build_candes_upsert_statement(candles);
sqlx::query(&upsert_statement)
.execute(&mut conn)
.await
.map_err_anyhow()
.unwrap();
}
Err(TryRecvError::Empty) => continue,
Err(TryRecvError::Disconnected) => {
panic!("Candles sender must stay alive")
}
};
// match client.ping().await {
// Ok(_) => {
match candles_receiver.try_recv() {
Ok(candles) => {
if candles.is_empty() {
continue;
}
// print!("writing: {:?} candles to DB\n", candles.len());
let upsert_statement = build_candles_upsert_statement(candles);
client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()
.unwrap();
}
Err(_) => {
println!("Candle ping failed");
break;
Err(TryRecvError::Empty) => continue,
Err(TryRecvError::Disconnected) => {
panic!("Candles sender must stay alive")
}
};
// }
// Err(_) => {
// println!("Candle ping failed");
// break;
// }
// };
}
Ok(())
}
fn build_fills_upsert_statement(events: HashMap<OpenBookFillEventLog, u8>) -> String {
#[allow(deprecated)]
fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> String {
let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES");
for (idx, event) in events.keys().enumerate() {
let mut hasher = DefaultHasher::new();
@ -104,7 +100,7 @@ fn build_fills_upsert_statement(events: HashMap<OpenBookFillEventLog, u8>) -> St
let val_str = format!(
"({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {})",
hasher.finish(),
Utc::now().to_rfc3339(),
to_timestampz(event.block_time as u64).to_rfc3339(),
event.market,
event.open_orders,
event.open_orders_owner,
@ -130,7 +126,7 @@ fn build_fills_upsert_statement(events: HashMap<OpenBookFillEventLog, u8>) -> St
stmt
}
fn build_candes_upsert_statement(candles: Vec<Candle>) -> String {
pub fn build_candles_upsert_statement(candles: Vec<Candle>) -> String {
let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES");
for (idx, candle) in candles.iter().enumerate() {
let val_str = format!(
@ -176,7 +172,7 @@ mod tests {
#[test]
fn test_event_hashing() {
let event_1 = OpenBookFillEventLog {
let event_1 = OpenBookFillEvent {
market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(),
open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(),
open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw")
@ -191,9 +187,10 @@ mod tests {
fee_tier: 0,
client_order_id: None,
referrer_rebate: Some(841),
block_time: 0,
};
let event_2 = OpenBookFillEventLog {
let event_2 = OpenBookFillEvent {
market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(),
open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(),
open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw")
@ -208,6 +205,7 @@ mod tests {
fee_tier: 0,
client_order_id: None,
referrer_rebate: Some(841),
block_time: 0,
};
let mut h1 = DefaultHasher::new();

View File

@ -1,5 +1,5 @@
use openbook_candles::{
database::fetch::fetch_tradingview_candles,
database::fetch::fetch_candles_from,
structs::{markets::valid_market, resolution::Resolution, tradingview::TvResponse},
utils::{to_timestampz, WebContext},
};
@ -34,9 +34,8 @@ pub async fn get_candles(
let from = to_timestampz(info.from);
let to = to_timestampz(info.to);
let mut conn = context.pool.acquire().await.unwrap();
let candles =
match fetch_tradingview_candles(&mut conn, &info.market_name, resolution, from, to).await {
match fetch_candles_from(&context.pool, &info.market_name, resolution, from, to).await {
Ok(c) => c,
Err(_) => return Err(ServerError::DbQueryError),
};

View File

@ -10,7 +10,7 @@ use openbook_candles::{
CoinGecko24HourVolume, CoinGeckoOrderBook, CoinGeckoPair, CoinGeckoTicker,
PgCoinGecko24HighLow,
},
slab::{get_best_bids_and_asks, get_orderbooks_with_depth},
slab::get_orderbooks_with_depth,
},
utils::WebContext,
};
@ -49,17 +49,14 @@ pub async fn pairs(context: web::Data<WebContext>) -> Result<HttpResponse, Serve
#[get("/tickers")]
pub async fn tickers(context: web::Data<WebContext>) -> Result<HttpResponse, ServerError> {
let client = RpcClient::new(context.rpc_url.clone());
// let client = RpcClient::new(context.rpc_url.clone());
let markets = &context.markets;
let mut c1 = context.pool.acquire().await.unwrap();
let mut c2 = context.pool.acquire().await.unwrap();
// let bba_fut = get_best_bids_and_asks(client, markets);
let volume_fut = fetch_coingecko_24h_volume(&mut c1);
let high_low_fut = fetch_coingecko_24h_high_low(&mut c2);
let volume_fut = fetch_coingecko_24h_volume(&context.pool);
let high_low_fut = fetch_coingecko_24h_high_low(&context.pool);
let (volume_query, high_low_quey) =
join!(volume_fut, high_low_fut,);
let (volume_query, high_low_quey) = join!(volume_fut, high_low_fut,);
let raw_volumes = match volume_query {
Ok(c) => c,
@ -74,7 +71,7 @@ pub async fn tickers(context: web::Data<WebContext>) -> Result<HttpResponse, Ser
let default_volume = CoinGecko24HourVolume::default();
let volumes: Vec<CoinGecko24HourVolume> = raw_volumes
.into_iter()
.map(|v| v.convert_to_readable(&markets))
.map(|v| v.convert_to_readable(markets))
.collect();
let tickers = markets
.iter()
@ -105,7 +102,7 @@ pub async fn tickers(context: web::Data<WebContext>) -> Result<HttpResponse, Ser
Ok(HttpResponse::Ok().json(tickers))
}
#[get("/orderbook")]
#[get("/orderbook")] // TODO: implement an optional geyser version
pub async fn orderbook(
info: web::Query<OrderBookParams>,
context: web::Data<WebContext>,

View File

@ -4,7 +4,7 @@ use actix_web::{
App, HttpServer,
};
use candles::get_candles;
use dotenv;
use markets::get_markets;
use openbook_candles::{
database::initialize::connect_to_database,
@ -29,21 +29,15 @@ async fn main() -> std::io::Result<()> {
assert!(args.len() == 2);
let path_to_markets_json = &args[1];
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
let database_url: String = dotenv::var("DATABASE_URL").unwrap();
let max_pg_pool_connections: u32 = dotenv::var("MAX_PG_POOL_CONNS_SERVER")
.unwrap()
.parse::<u32>()
.unwrap();
let bind_addr: String = dotenv::var("SERVER_BIND_ADDR").expect("reading bind addr from env");
let config = Config {
rpc_url: rpc_url.clone(),
database_url: database_url.clone(),
max_pg_pool_connections,
};
let markets = load_markets(path_to_markets_json);
let market_infos = fetch_market_infos(&config, markets).await.unwrap();
let pool = connect_to_database(&config).await.unwrap();
let pool = connect_to_database().await.unwrap();
let context = Data::new(WebContext {
rpc_url,
@ -65,7 +59,7 @@ async fn main() -> std::io::Result<()> {
.service(coingecko::service()),
)
})
.bind(("127.0.0.1", 8080))?
.bind(&bind_addr)?
.run()
.await
}

View File

@ -31,14 +31,17 @@ pub async fn get_top_traders_by_base_volume(
let from = to_timestampz(info.from);
let to = to_timestampz(info.to);
let mut conn = context.pool.acquire().await.unwrap();
let raw_traders =
match fetch_top_traders_by_base_volume_from(&mut conn, &selected_market.address, from, to)
.await
{
Ok(c) => c,
Err(_) => return Err(ServerError::DbQueryError),
};
let raw_traders = match fetch_top_traders_by_base_volume_from(
&context.pool,
&selected_market.address,
from,
to,
)
.await
{
Ok(c) => c,
Err(_) => return Err(ServerError::DbQueryError),
};
let traders = raw_traders
.into_iter()
@ -48,7 +51,7 @@ pub async fn get_top_traders_by_base_volume(
let response = TraderResponse {
start_time: info.from,
end_time: info.to,
traders: traders,
traders,
volume_type: VolumeType::Base.to_string(),
};
Ok(HttpResponse::Ok().json(response))
@ -67,14 +70,17 @@ pub async fn get_top_traders_by_quote_volume(
let from = to_timestampz(info.from);
let to = to_timestampz(info.to);
let mut conn = context.pool.acquire().await.unwrap();
let raw_traders =
match fetch_top_traders_by_quote_volume_from(&mut conn, &selected_market.address, from, to)
.await
{
Ok(c) => c,
Err(_) => return Err(ServerError::DbQueryError),
};
let raw_traders = match fetch_top_traders_by_quote_volume_from(
&context.pool,
&selected_market.address,
from,
to,
)
.await
{
Ok(c) => c,
Err(_) => return Err(ServerError::DbQueryError),
};
let traders = raw_traders
.into_iter()
@ -84,7 +90,7 @@ pub async fn get_top_traders_by_quote_volume(
let response = TraderResponse {
start_time: info.from,
end_time: info.to,
traders: traders,
traders,
volume_type: VolumeType::Quote.to_string(),
};
Ok(HttpResponse::Ok().json(response))

View File

@ -1,6 +1,5 @@
use chrono::{DateTime, NaiveDateTime, Utc};
use num_traits::Zero;
use sqlx::types::Decimal;
use tokio_postgres::Row;
use super::resolution::Resolution;
@ -10,11 +9,11 @@ pub struct Candle {
pub start_time: DateTime<Utc>,
pub end_time: DateTime<Utc>,
pub resolution: String,
pub open: Decimal,
pub close: Decimal,
pub high: Decimal,
pub low: Decimal,
pub volume: Decimal,
pub open: f64,
pub close: f64,
pub high: f64,
pub low: f64,
pub volume: f64,
pub complete: bool,
}
@ -25,12 +24,27 @@ impl Candle {
start_time: DateTime::from_utc(NaiveDateTime::MIN, Utc),
end_time: DateTime::from_utc(NaiveDateTime::MIN, Utc),
resolution: resolution.to_string(),
open: Decimal::zero(),
close: Decimal::zero(),
high: Decimal::zero(),
low: Decimal::zero(),
volume: Decimal::zero(),
open: 0.0,
close: 0.0,
high: 0.0,
low: 0.0,
volume: 0.0,
complete: false,
}
}
pub fn from_row(row: Row) -> Self {
Candle {
market_name: row.get(0),
start_time: row.get(1),
end_time: row.get(2),
resolution: row.get(3),
open: row.get(4),
close: row.get(5),
high: row.get(6),
low: row.get(7),
volume: row.get(8),
complete: row.get(9),
}
}
}

View File

@ -1,5 +1,5 @@
use serde::Serialize;
use sqlx::types::Decimal;
use tokio_postgres::Row;
use super::{markets::MarketInfo, openbook::token_factor};
@ -35,8 +35,8 @@ pub struct CoinGeckoTicker {
pub struct PgCoinGecko24HourVolume {
pub address: String,
pub raw_base_size: Decimal,
pub raw_quote_size: Decimal,
pub raw_base_size: f64,
pub raw_quote_size: f64,
}
impl PgCoinGecko24HourVolume {
pub fn convert_to_readable(&self, markets: &Vec<MarketInfo>) -> CoinGecko24HourVolume {
@ -49,19 +49,38 @@ impl PgCoinGecko24HourVolume {
target_volume,
}
}
pub fn from_row(row: Row) -> Self {
PgCoinGecko24HourVolume {
address: row.get(0),
raw_base_size: row.get(1),
raw_quote_size: row.get(2),
}
}
}
#[derive(Debug, Default)]
pub struct CoinGecko24HourVolume {
pub market_name: String,
pub base_volume: Decimal,
pub target_volume: Decimal,
pub base_volume: f64,
pub target_volume: f64,
}
#[derive(Debug, Default)]
pub struct PgCoinGecko24HighLow {
pub market_name: String,
pub high: Decimal,
pub low: Decimal,
pub close: Decimal,
pub high: f64,
pub low: f64,
pub close: f64,
}
impl PgCoinGecko24HighLow {
pub fn from_row(row: Row) -> Self {
PgCoinGecko24HighLow {
market_name: row.get(0),
high: row.get(1),
low: row.get(2),
close: row.get(3),
}
}
}

View File

@ -111,8 +111,8 @@ pub async fn fetch_market_infos(
.value;
for i in 0..mint_results.len() {
let mut mint_account = mint_results[i].as_ref().unwrap().clone();
let mut mint_bytes: &[u8] = &mut mint_account.data[..];
let mint = Mint::unpack_from_slice(&mut mint_bytes).unwrap();
let mint_bytes: &[u8] = &mut mint_account.data[..];
let mint = Mint::unpack_from_slice(mint_bytes).unwrap();
mint_key_map.insert(mint_keys[i], mint.decimals);
}

View File

@ -1,12 +1,12 @@
use anchor_lang::{event, AnchorDeserialize, AnchorSerialize};
use chrono::{DateTime, Utc};
use num_traits::FromPrimitive;
use num_traits::Pow;
use solana_sdk::pubkey::Pubkey;
use sqlx::types::Decimal;
use tokio_postgres::Row;
#[event]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OpenBookFillEventLog {
pub struct OpenBookFillEventRaw {
pub market: Pubkey,
pub open_orders: Pubkey,
pub open_orders_owner: Pubkey,
@ -21,15 +21,66 @@ pub struct OpenBookFillEventLog {
pub client_order_id: Option<u64>,
pub referrer_rebate: Option<u64>,
}
impl OpenBookFillEventRaw {
pub fn with_time(self, block_time: i64) -> OpenBookFillEvent {
OpenBookFillEvent {
market: self.market,
open_orders: self.open_orders,
open_orders_owner: self.open_orders_owner,
bid: self.bid,
maker: self.maker,
native_qty_paid: self.native_qty_paid,
native_qty_received: self.native_qty_received,
native_fee_or_rebate: self.native_fee_or_rebate,
order_id: self.order_id,
owner_slot: self.owner_slot,
fee_tier: self.fee_tier,
client_order_id: self.client_order_id,
referrer_rebate: self.referrer_rebate,
block_time,
}
}
}
#[event]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OpenBookFillEvent {
pub market: Pubkey,
pub open_orders: Pubkey,
pub open_orders_owner: Pubkey,
pub bid: bool,
pub maker: bool,
pub native_qty_paid: u64,
pub native_qty_received: u64,
pub native_fee_or_rebate: u64,
pub order_id: u128,
pub owner_slot: u8,
pub fee_tier: u8,
pub client_order_id: Option<u64>,
pub referrer_rebate: Option<u64>,
pub block_time: i64,
}
#[derive(Copy, Clone, Debug, PartialEq)]
pub struct PgOpenBookFill {
pub time: DateTime<Utc>,
pub bid: bool,
pub maker: bool,
pub native_qty_paid: Decimal,
pub native_qty_received: Decimal,
pub native_fee_or_rebate: Decimal,
pub native_qty_paid: f64,
pub native_qty_received: f64,
pub native_fee_or_rebate: f64,
}
impl PgOpenBookFill {
pub fn from_row(row: Row) -> Self {
PgOpenBookFill {
time: row.get(0),
bid: row.get(1),
maker: row.get(2),
native_qty_paid: row.get(3),
native_qty_received: row.get(4),
native_fee_or_rebate: row.get(5),
}
}
}
#[derive(Copy, Clone, AnchorDeserialize)]
@ -91,7 +142,7 @@ pub fn calculate_fill_price_and_size(
fill: PgOpenBookFill,
base_decimals: u8,
quote_decimals: u8,
) -> (Decimal, Decimal) {
) -> (f64, f64) {
if fill.bid {
let price_before_fees = if fill.maker {
fill.native_qty_paid + fill.native_fee_or_rebate
@ -115,6 +166,6 @@ pub fn calculate_fill_price_and_size(
}
}
pub fn token_factor(decimals: u8) -> Decimal {
Decimal::from_u64(10u64.pow(decimals as u32)).unwrap()
pub fn token_factor(decimals: u8) -> f64 {
10f64.pow(decimals as f64)
}

View File

@ -5,7 +5,6 @@ use futures::join;
use num_enum::{IntoPrimitive, TryFromPrimitive};
use num_traits::ToPrimitive;
use solana_client::nonblocking::rpc_client::RpcClient;
use sqlx::types::Decimal;
use std::{
convert::TryFrom,
mem::{align_of, size_of},
@ -102,19 +101,19 @@ impl LeafNode {
NonZeroU64::new((self.key >> 64) as u64).unwrap()
}
pub fn readable_price(&self, market: &MarketInfo) -> Decimal {
let price_lots = Decimal::from((self.key >> 64) as u64);
pub fn readable_price(&self, market: &MarketInfo) -> f64 {
let price_lots = (self.key >> 64) as f64;
let base_multiplier = token_factor(market.base_decimals);
let quote_multiplier = token_factor(market.quote_decimals);
let base_lot_size = Decimal::from(market.base_lot_size);
let quote_lot_size = Decimal::from(market.quote_lot_size);
let base_lot_size = market.base_lot_size as f64;
let quote_lot_size = market.quote_lot_size as f64;
(price_lots * quote_lot_size * base_multiplier) / (base_lot_size * quote_multiplier)
}
pub fn readable_quantity(&self, market: &MarketInfo) -> Decimal {
let base_lot_size = Decimal::from(market.base_lot_size);
pub fn readable_quantity(&self, market: &MarketInfo) -> f64 {
let base_lot_size = market.base_lot_size as f64;
let base_multiplier = token_factor(market.base_decimals);
Decimal::from(self.quantity) * base_lot_size / base_multiplier
self.quantity as f64 * base_lot_size / base_multiplier
}
#[inline]
@ -406,7 +405,7 @@ impl Slab {
}
}
pub fn get_best(&self, market: &MarketInfo, bid: bool) -> Decimal {
pub fn get_best(&self, market: &MarketInfo, bid: bool) -> f64 {
let min = if bid {
self.find_max()
} else {
@ -419,7 +418,7 @@ impl Slab {
pub async fn get_best_bids_and_asks(
client: RpcClient,
markets: &Vec<MarketInfo>,
) -> (Vec<Decimal>, Vec<Decimal>) {
) -> (Vec<f64>, Vec<f64>) {
let bid_keys = markets
.iter()
.map(|m| Pubkey::from_str(&m.bids_key).unwrap())

View File

@ -2,15 +2,24 @@ use std::fmt;
use num_traits::ToPrimitive;
use serde::Serialize;
use sqlx::types::Decimal;
use tokio_postgres::Row;
use super::openbook::token_factor;
#[derive(Clone, Debug, PartialEq)]
pub struct PgTrader {
pub open_orders_owner: String,
pub raw_ask_size: Decimal,
pub raw_bid_size: Decimal,
pub raw_ask_size: f64,
pub raw_bid_size: f64,
}
impl PgTrader {
pub fn from_row(row: Row) -> Self {
PgTrader {
open_orders_owner: row.get(0),
raw_ask_size: row.get(1),
raw_bid_size: row.get(2),
}
}
}
#[derive(Clone, Debug, PartialEq, Serialize)]

View File

@ -1,6 +1,6 @@
use chrono::{NaiveDateTime, Utc};
use deadpool_postgres::Pool;
use serde_derive::Deserialize;
use sqlx::{Pool, Postgres};
use crate::structs::markets::MarketInfo;
@ -16,20 +16,53 @@ impl<T, E: std::fmt::Debug> AnyhowWrap for Result<T, E> {
}
}
#[derive(Debug, serde::Deserialize)]
pub struct PgConfig {
pub pg: deadpool_postgres::Config,
pub pg_max_pool_connections: usize,
pub pg_use_ssl: bool,
pub pg_ca_cert_path: Option<String>,
pub pg_client_key_path: Option<String>,
}
impl PgConfig {
pub fn from_env() -> Result<Self, config::ConfigError> {
config::Config::builder()
.add_source(config::Environment::default().separator("_"))
.add_source(config::Environment::default())
.build()?
.try_deserialize()
}
}
#[derive(Clone, Debug, Deserialize)]
pub struct Config {
pub rpc_url: String,
pub database_url: String,
pub max_pg_pool_connections: u32,
}
pub struct WebContext {
pub rpc_url: String,
pub markets: Vec<MarketInfo>,
pub pool: Pool<Postgres>,
pub pool: Pool,
}
#[allow(deprecated)]
pub fn to_timestampz(seconds: u64) -> chrono::DateTime<Utc> {
chrono::DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(seconds as i64, 0), Utc)
}
pub(crate) fn f64_max(a: f64, b: f64) -> f64 {
if a >= b {
a
} else {
b
}
}
pub(crate) fn f64_min(a: f64, b: f64) -> f64 {
if a < b {
a
} else {
b
}
}

View File

@ -1,6 +1,6 @@
use chrono::{DateTime, Duration, DurationRound, Utc};
use sqlx::{pool::PoolConnection, Postgres};
use std::cmp::{max, min};
use deadpool_postgres::Pool;
use std::cmp::max;
use crate::{
database::fetch::{fetch_candles_from, fetch_earliest_candles, fetch_latest_finished_candle},
@ -8,28 +8,29 @@ use crate::{
candle::Candle,
resolution::{day, Resolution},
},
utils::{f64_max, f64_min},
};
pub async fn batch_higher_order_candles(
conn: &mut PoolConnection<Postgres>,
pool: &Pool,
market_name: &str,
resolution: Resolution,
) -> anyhow::Result<Vec<Candle>> {
let latest_candle = fetch_latest_finished_candle(conn, market_name, resolution).await?;
let latest_candle = fetch_latest_finished_candle(pool, market_name, resolution).await?;
match latest_candle {
Some(candle) => {
let start_time = candle.end_time;
let end_time = start_time + day();
let mut constituent_candles = fetch_candles_from(
conn,
pool,
market_name,
resolution.get_constituent_resolution(),
start_time,
end_time,
)
.await?;
if constituent_candles.len() == 0 {
if constituent_candles.is_empty() {
return Ok(Vec::new());
}
let combined_candles = combine_into_higher_order_candles(
@ -42,9 +43,9 @@ pub async fn batch_higher_order_candles(
}
None => {
let mut constituent_candles =
fetch_earliest_candles(conn, market_name, resolution.get_constituent_resolution())
fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution())
.await?;
if constituent_candles.len() == 0 {
if constituent_candles.is_empty() {
// println!(
// "Batching {}, but no candles found for: {:?}, {}",
// resolution,
@ -55,7 +56,7 @@ pub async fn batch_higher_order_candles(
}
let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
if constituent_candles.len() == 0 {
if constituent_candles.is_empty() {
return Ok(Vec::new());
}
@ -99,7 +100,7 @@ fn combine_into_higher_order_candles(
let mut combined_candles = vec![empty_candle; num_candles];
let mut con_iter = constituent_candles.iter_mut().peekable();
let mut start_time = st.clone();
let mut start_time = st;
let mut end_time = start_time + duration;
let mut last_candle = seed_candle;
@ -112,8 +113,8 @@ fn combine_into_higher_order_candles(
while matches!(con_iter.peek(), Some(c) if c.end_time <= end_time) {
let unit_candle = con_iter.next().unwrap();
combined_candles[i].high = max(combined_candles[i].high, unit_candle.high);
combined_candles[i].low = min(combined_candles[i].low, unit_candle.low);
combined_candles[i].high = f64_max(combined_candles[i].high, unit_candle.high);
combined_candles[i].low = f64_min(combined_candles[i].low, unit_candle.low);
combined_candles[i].close = unit_candle.close;
combined_candles[i].volume += unit_candle.volume;
combined_candles[i].complete = unit_candle.complete;
@ -124,7 +125,7 @@ fn combine_into_higher_order_candles(
combined_candles[i].end_time = end_time;
start_time = end_time;
end_time = end_time + duration;
end_time += duration;
last_candle = combined_candles[i].clone();
}
@ -143,3 +144,29 @@ fn trim_candles(mut c: Vec<Candle>, start_time: DateTime<Utc>) -> Vec<Candle> {
}
c
}
pub async fn backfill_batch_higher_order_candles(
pool: &Pool,
market_name: &str,
resolution: Resolution,
) -> anyhow::Result<Vec<Candle>> {
let mut constituent_candles =
fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution()).await?;
if constituent_candles.is_empty() {
return Ok(vec![]);
}
let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
let seed_candle = constituent_candles[0].clone();
let combined_candles = combine_into_higher_order_candles(
&mut constituent_candles,
resolution,
start_time,
seed_candle,
);
Ok(trim_candles(
combined_candles,
constituent_candles[0].start_time,
))
}

View File

@ -1,7 +1,7 @@
use std::cmp::{max, min};
use std::cmp::min;
use chrono::{DateTime, Duration, DurationRound, Utc};
use sqlx::{pool::PoolConnection, types::Decimal, Postgres};
use deadpool_postgres::Pool;
use crate::{
database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
@ -11,15 +11,13 @@ use crate::{
openbook::{calculate_fill_price_and_size, PgOpenBookFill},
resolution::{day, Resolution},
},
utils::{f64_max, f64_min},
};
pub async fn batch_1m_candles(
conn: &mut PoolConnection<Postgres>,
market: &MarketInfo,
) -> anyhow::Result<Vec<Candle>> {
pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Result<Vec<Candle>> {
let market_name = &market.name;
let market_address = &market.address;
let latest_candle = fetch_latest_finished_candle(conn, market_name, Resolution::R1m).await?;
let latest_candle = fetch_latest_finished_candle(pool, market_name, Resolution::R1m).await?;
match latest_candle {
Some(candle) => {
@ -28,7 +26,7 @@ pub async fn batch_1m_candles(
start_time + day(),
Utc::now().duration_trunc(Duration::minutes(1))?,
);
let mut fills = fetch_fills_from(conn, market_address, start_time, end_time).await?;
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
let candles = combine_fills_into_1m_candles(
&mut fills,
market,
@ -39,7 +37,7 @@ pub async fn batch_1m_candles(
Ok(candles)
}
None => {
let earliest_fill = fetch_earliest_fill(conn, market_address).await?;
let earliest_fill = fetch_earliest_fill(pool, market_address).await?;
if earliest_fill.is_none() {
println!("No fills found for: {:?}", market_name);
@ -54,8 +52,8 @@ pub async fn batch_1m_candles(
start_time + day(),
Utc::now().duration_trunc(Duration::minutes(1))?,
);
let mut fills = fetch_fills_from(conn, market_address, start_time, end_time).await?;
if fills.len() > 0 {
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
if !fills.is_empty() {
let candles =
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
Ok(candles)
@ -71,7 +69,7 @@ fn combine_fills_into_1m_candles(
market: &MarketInfo,
st: DateTime<Utc>,
et: DateTime<Utc>,
maybe_last_price: Option<Decimal>,
maybe_last_price: Option<f64>,
) -> Vec<Candle> {
let empty_candle = Candle::create_empty_candle(market.name.clone(), Resolution::R1m);
@ -79,13 +77,13 @@ fn combine_fills_into_1m_candles(
let mut candles = vec![empty_candle; minutes as usize];
let mut fills_iter = fills.iter_mut().peekable();
let mut start_time = st.clone();
let mut start_time = st;
let mut end_time = start_time + Duration::minutes(1);
let mut last_price = match maybe_last_price {
Some(p) => p,
None => {
let first = fills_iter.peek().clone().unwrap();
let first = fills_iter.peek().unwrap();
let (price, _) =
calculate_fill_price_and_size(**first, market.base_decimals, market.quote_decimals);
price
@ -105,8 +103,8 @@ fn combine_fills_into_1m_candles(
calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals);
candles[i].close = price;
candles[i].low = min(price, candles[i].low);
candles[i].high = max(price, candles[i].high);
candles[i].low = f64_min(price, candles[i].low);
candles[i].high = f64_max(price, candles[i].high);
candles[i].volume += volume;
last_price = price;
@ -117,8 +115,43 @@ fn combine_fills_into_1m_candles(
candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time);
start_time = end_time;
end_time = end_time + Duration::minutes(1);
end_time += Duration::minutes(1);
}
candles
}
/// Goes from the earliest fill to the most recent. Will mark candles as complete if there are missing gaps of fills between the start and end.
pub async fn backfill_batch_1m_candles(
pool: &Pool,
market: &MarketInfo,
) -> anyhow::Result<Vec<Candle>> {
let market_name = &market.name;
let market_address = &market.address;
let mut candles = vec![];
let earliest_fill = fetch_earliest_fill(pool, &market.address).await?;
if earliest_fill.is_none() {
println!("No fills found for: {:?}", &market_name);
return Ok(candles);
}
let mut start_time = earliest_fill
.unwrap()
.time
.duration_trunc(Duration::minutes(1))?;
while start_time < Utc::now() {
let end_time = min(
start_time + day(),
Utc::now().duration_trunc(Duration::minutes(1))?,
);
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
if !fills.is_empty() {
let mut minute_candles =
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
candles.append(&mut minute_candles);
}
start_time += day()
}
Ok(candles)
}

View File

@ -2,7 +2,7 @@ pub mod higher_order_candles;
pub mod minute_candles;
use chrono::Duration;
use sqlx::{pool::PoolConnection, Pool, Postgres};
use deadpool_postgres::Pool;
use strum::IntoEnumIterator;
use tokio::{sync::mpsc::Sender, time::sleep};
@ -14,17 +14,17 @@ use crate::{
use self::higher_order_candles::batch_higher_order_candles;
pub async fn batch_for_market(
pool: Pool<Postgres>,
pool: &Pool,
candles_sender: &Sender<Vec<Candle>>,
market: &MarketInfo,
) -> anyhow::Result<()> {
loop {
let sender = candles_sender.clone();
let market_clone = market.clone();
let mut conn = pool.acquire().await?;
// let client = pool.get().await?;
loop {
sleep(Duration::milliseconds(2000).to_std()?).await;
match batch_inner(&mut conn, &sender, &market_clone).await {
match batch_inner(pool, &sender, &market_clone).await {
Ok(_) => {}
Err(e) => {
println!(
@ -41,26 +41,26 @@ pub async fn batch_for_market(
}
async fn batch_inner(
conn: &mut PoolConnection<Postgres>,
pool: &Pool,
candles_sender: &Sender<Vec<Candle>>,
market: &MarketInfo,
) -> anyhow::Result<()> {
let market_name = &market.name.clone();
let candles = batch_1m_candles(conn, market).await?;
let candles = batch_1m_candles(pool, market).await?;
send_candles(candles, candles_sender).await;
for resolution in Resolution::iter() {
if resolution == Resolution::R1m {
continue;
}
let candles = batch_higher_order_candles(conn, market_name, resolution).await?;
let candles = batch_higher_order_candles(pool, market_name, resolution).await?;
send_candles(candles, candles_sender).await;
}
Ok(())
}
async fn send_candles(candles: Vec<Candle>, candles_sender: &Sender<Vec<Candle>>) {
if candles.len() > 0 {
if !candles.is_empty() {
if let Err(_) = candles_sender.send(candles).await {
panic!("candles receiver dropped");
}

View File

@ -1,20 +1,22 @@
use dotenv;
use openbook_candles::database::{
initialize::{connect_to_database, setup_database},
insert::{persist_candles, persist_fill_events},
};
use openbook_candles::structs::candle::Candle;
use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
use openbook_candles::structs::openbook::OpenBookFillEventLog;
use openbook_candles::structs::openbook::OpenBookFillEvent;
use openbook_candles::utils::Config;
use openbook_candles::worker::candle_batching::batch_for_market;
use openbook_candles::worker::trade_fetching::scrape::scrape;
use openbook_candles::{
database::{
initialize::{connect_to_database, setup_database},
insert::{persist_candles, persist_fill_events},
},
worker::candle_batching::batch_for_market,
};
use solana_sdk::pubkey::Pubkey;
use std::env;
use std::{collections::HashMap, str::FromStr};
use tokio::sync::mpsc;
#[tokio::main]
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();
@ -22,19 +24,12 @@ async fn main() -> anyhow::Result<()> {
assert!(args.len() == 2);
let path_to_markets_json = &args[1];
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
let database_url: String = dotenv::var("DATABASE_URL").unwrap();
let max_pg_pool_connections: u32 = dotenv::var("MAX_PG_POOL_CONNS_WORKER")
.unwrap()
.parse::<u32>()
.unwrap();
let config = Config {
rpc_url: rpc_url.clone(),
database_url,
max_pg_pool_connections,
};
let markets = load_markets(&path_to_markets_json);
let markets = load_markets(path_to_markets_json);
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
let mut target_markets = HashMap::new();
for m in market_infos.clone() {
@ -42,14 +37,14 @@ async fn main() -> anyhow::Result<()> {
}
println!("{:?}", target_markets);
let pool = connect_to_database(&config).await?;
let pool = connect_to_database().await?;
setup_database(&pool).await?;
let mut handles = vec![];
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEventLog>(1000);
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(1000);
handles.push(tokio::spawn(async move {
scrape(&config, &fill_sender, &target_markets).await; //TODO: send the vec, it's okay
scrape(&config, &fill_sender, &target_markets).await;
}));
let fills_pool = pool.clone();
@ -67,7 +62,7 @@ async fn main() -> anyhow::Result<()> {
let sender = candle_sender.clone();
let batch_pool = pool.clone();
handles.push(tokio::spawn(async move {
batch_for_market(batch_pool, &sender, &market)
batch_for_market(&batch_pool, &sender, &market)
.await
.unwrap();
println!("SOMETHING WENT WRONG");

View File

@ -5,22 +5,26 @@ use solana_transaction_status::{
};
use std::{collections::HashMap, io::Error};
use crate::structs::openbook::OpenBookFillEventLog;
use crate::structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw};
const PROGRAM_DATA: &str = "Program data: ";
pub fn parse_trades_from_openbook_txns(
txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
target_markets: &HashMap<Pubkey, u8>,
) -> Vec<OpenBookFillEventLog> {
let mut fills_vector = Vec::<OpenBookFillEventLog>::new();
) -> Vec<OpenBookFillEvent> {
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
for txn in txns.iter_mut() {
match txn {
Ok(t) => {
if let Some(m) = &t.transaction.meta {
match &m.log_messages {
OptionSerializer::Some(logs) => {
match parse_openbook_fills_from_logs(logs, target_markets) {
match parse_openbook_fills_from_logs(
logs,
target_markets,
t.block_time.unwrap(),
) {
Some(mut events) => fills_vector.append(&mut events),
None => {}
}
@ -39,8 +43,9 @@ pub fn parse_trades_from_openbook_txns(
fn parse_openbook_fills_from_logs(
logs: &Vec<String>,
target_markets: &HashMap<Pubkey, u8>,
) -> Option<Vec<OpenBookFillEventLog>> {
let mut fills_vector = Vec::<OpenBookFillEventLog>::new();
block_time: i64,
) -> Option<Vec<OpenBookFillEvent>> {
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
for l in logs {
match l.strip_prefix(PROGRAM_DATA) {
Some(log) => {
@ -49,13 +54,14 @@ fn parse_openbook_fills_from_logs(
_ => continue,
};
let mut slice: &[u8] = &borsh_bytes[8..];
let event: Result<OpenBookFillEventLog, Error> =
let event: Result<OpenBookFillEventRaw, Error> =
anchor_lang::AnchorDeserialize::deserialize(&mut slice);
match event {
Ok(e) => {
if target_markets.contains_key(&e.market) {
fills_vector.push(e);
let fill_event = e.with_time(block_time);
if target_markets.contains_key(&fill_event.market) {
fills_vector.push(fill_event);
}
}
_ => continue,
@ -65,9 +71,9 @@ fn parse_openbook_fills_from_logs(
}
}
if fills_vector.len() > 0 {
return Some(fills_vector);
if !fills_vector.is_empty() {
Some(fills_vector)
} else {
return None;
None
}
}

View File

@ -8,13 +8,13 @@ use solana_transaction_status::UiTransactionEncoding;
use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
use tokio::sync::mpsc::Sender;
use crate::{structs::openbook::OpenBookFillEventLog, utils::Config};
use crate::{structs::openbook::OpenBookFillEvent, utils::Config};
use super::parsing::parse_trades_from_openbook_txns;
pub async fn scrape(
config: &Config,
fill_sender: &Sender<OpenBookFillEventLog>,
fill_sender: &Sender<OpenBookFillEvent>,
target_markets: &HashMap<Pubkey, u8>,
) {
let rpc_client =
@ -38,7 +38,7 @@ pub async fn scrape_transactions(
rpc_client: &RpcClient,
before_sig: Option<Signature>,
limit: Option<usize>,
fill_sender: &Sender<OpenBookFillEventLog>,
fill_sender: &Sender<OpenBookFillEvent>,
target_markets: &HashMap<Pubkey, u8>,
) -> Option<Signature> {
let rpc_config = GetConfirmedSignaturesForAddress2Config {
@ -62,12 +62,12 @@ pub async fn scrape_transactions(
}
};
if sigs.len() == 0 {
if sigs.is_empty() {
println!("No signatures found");
return before_sig;
}
let last = sigs.last().clone().unwrap();
let last = sigs.last().unwrap();
let request_last_sig = Signature::from_str(&last.signature).unwrap();
sigs.retain(|sig| sig.err.is_none());
@ -88,13 +88,13 @@ pub async fn scrape_transactions(
let txn_futs: Vec<_> = signatures
.iter()
.map(|s| rpc_client.get_transaction_with_config(&s, txn_config))
.map(|s| rpc_client.get_transaction_with_config(s, txn_config))
.collect();
let mut txns = join_all(txn_futs).await;
let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
if fills.len() > 0 {
if !fills.is_empty() {
for fill in fills.into_iter() {
if let Err(_) = fill_sender.send(fill).await {
panic!("receiver dropped");