Compare commits
15 Commits
83e63c0de2
...
9b69e380c3
Author | SHA1 | Date |
---|---|---|
riordanp | 9b69e380c3 | |
dboures | df1af2c10d | |
dboures | 34117ca95e | |
dboures | 6e16ef8e04 | |
dboures | 7ae8339ebb | |
dboures | 8bbf89677c | |
dboures | 0e821cc3c1 | |
Riordan Panayides | 872e6fcdb2 | |
Riordan Panayides | 5b61f3d949 | |
Riordan Panayides | 697a20d9ff | |
Riordan Panayides | cccc7accd5 | |
dboures | c20429bcc5 | |
dboures | a4bf9a35be | |
dboures | cead78381d | |
Riordan Panayides | 28dc3c5e59 |
14
.env-example
14
.env-example
|
@ -1,5 +1,11 @@
|
||||||
RPC_URL=http://solana-mainnet-api.rpc-node.com
|
RPC_URL=http://solana-mainnet-api.rpc-node.com
|
||||||
DATABASE_URL=
|
SERVER_BIND_ADDR="[::]:8080"
|
||||||
SQLX_OFFLINE=true
|
PG_HOST=127.0.0.1
|
||||||
MAX_PG_POOL_CONNS_WORKER=5
|
PG_PORT=5432
|
||||||
MAX_PG_POOL_CONNS_SERVER=15
|
PG_USER=postgres
|
||||||
|
PG_PASSWORD=
|
||||||
|
PG_DBNAME=postgres
|
||||||
|
PG_MAX_POOL_CONNECTIONS=10
|
||||||
|
PG_USE_SSL=false
|
||||||
|
PG_CA_CERT_PATH=
|
||||||
|
PG_CLIENT_KEY_PATH=
|
File diff suppressed because it is too large
Load Diff
17
Cargo.toml
17
Cargo.toml
|
@ -16,8 +16,12 @@ name = "server"
|
||||||
path = "src/server/main.rs"
|
path = "src/server/main.rs"
|
||||||
|
|
||||||
[[bin]]
|
[[bin]]
|
||||||
name = "backfill"
|
name = "backfill-trades"
|
||||||
path = "src/backfill/main.rs"
|
path = "src/backfill-trades/main.rs"
|
||||||
|
|
||||||
|
[[bin]]
|
||||||
|
name = "backfill-candles"
|
||||||
|
path = "src/backfill-candles/main.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
|
@ -26,7 +30,10 @@ futures = "0.3.27"
|
||||||
|
|
||||||
jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] }
|
jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] }
|
||||||
|
|
||||||
sqlx = { version = "0.6", features = [ "runtime-tokio-native-tls" , "postgres", "chrono", "decimal", "offline" ] }
|
deadpool-postgres = { version = "0.10.5", features = [ "rt_tokio_1", "serde" ] }
|
||||||
|
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
|
||||||
|
postgres-native-tls = "0.5.0"
|
||||||
|
native-tls = "0.2.11"
|
||||||
chrono = "0.4.23"
|
chrono = "0.4.23"
|
||||||
|
|
||||||
solana-client = "=1.14.13"
|
solana-client = "=1.14.13"
|
||||||
|
@ -58,4 +65,6 @@ actix-web = "4"
|
||||||
|
|
||||||
arrayref = "0.3.6"
|
arrayref = "0.3.6"
|
||||||
bytemuck = "1.12.3"
|
bytemuck = "1.12.3"
|
||||||
num_enum = "0.6.1"
|
num_enum = "0.6.1"
|
||||||
|
|
||||||
|
config = "0.13.1"
|
|
@ -19,4 +19,6 @@ RUN apt-get update && apt-get -y install ca-certificates libssl1.1
|
||||||
# We do not need the Rust toolchain to run the binary!
|
# We do not need the Rust toolchain to run the binary!
|
||||||
FROM base_image AS runtime
|
FROM base_image AS runtime
|
||||||
COPY --from=builder /target/release/server /usr/local/bin
|
COPY --from=builder /target/release/server /usr/local/bin
|
||||||
ENTRYPOINT ["/usr/local/bin/server"]
|
COPY --from=builder markets.json .
|
||||||
|
COPY --from=builder ca.cer .
|
||||||
|
COPY --from=builder client.pks .
|
|
@ -2,13 +2,13 @@ FROM lukemathwalker/cargo-chef:latest-rust-1.67.1-slim AS chef
|
||||||
|
|
||||||
FROM chef AS planner
|
FROM chef AS planner
|
||||||
COPY . .
|
COPY . .
|
||||||
RUN cargo chef prepare --recipe-path worker-recipe.json
|
RUN cargo chef prepare --recipe-path recipe.json
|
||||||
|
|
||||||
FROM chef AS builder
|
FROM chef AS builder
|
||||||
COPY --from=planner worker-recipe.json worker-recipe.json
|
COPY --from=planner recipe.json recipe.json
|
||||||
RUN apt-get update && apt-get install -y libudev-dev clang pkg-config libssl-dev build-essential cmake
|
RUN apt-get update && apt-get install -y libudev-dev clang pkg-config libssl-dev build-essential cmake
|
||||||
RUN rustup component add rustfmt && update-ca-certificates
|
RUN rustup component add rustfmt && update-ca-certificates
|
||||||
RUN cargo chef cook --release --recipe-path worker-recipe.json
|
RUN cargo chef cook --release --recipe-path recipe.json
|
||||||
# Build application
|
# Build application
|
||||||
COPY . .
|
COPY . .
|
||||||
RUN cargo build --release --bin worker
|
RUN cargo build --release --bin worker
|
||||||
|
@ -19,4 +19,6 @@ RUN apt-get update && apt-get -y install ca-certificates libssl1.1
|
||||||
# We do not need the Rust toolchain to run the binary!
|
# We do not need the Rust toolchain to run the binary!
|
||||||
FROM base_image AS runtime
|
FROM base_image AS runtime
|
||||||
COPY --from=builder /target/release/worker /usr/local/bin
|
COPY --from=builder /target/release/worker /usr/local/bin
|
||||||
ENTRYPOINT ["/usr/local/bin/worker"]
|
COPY --from=builder markets.json .
|
||||||
|
COPY --from=builder ca.cer .
|
||||||
|
COPY --from=builder client.pks .
|
|
@ -0,0 +1,19 @@
|
||||||
|
app = "openbook-candles-server"
|
||||||
|
kill_signal = "SIGTERM"
|
||||||
|
kill_timeout = 30
|
||||||
|
|
||||||
|
[build]
|
||||||
|
dockerfile = "../Dockerfile.server"
|
||||||
|
|
||||||
|
[experimental]
|
||||||
|
cmd = ["server", "markets.json"]
|
||||||
|
|
||||||
|
[[services]]
|
||||||
|
internal_port = 8080
|
||||||
|
processes = ["app"]
|
||||||
|
protocol = "tcp"
|
||||||
|
|
||||||
|
[services.concurrency]
|
||||||
|
hard_limit = 1024
|
||||||
|
soft_limit = 1024
|
||||||
|
type = "connections"
|
|
@ -0,0 +1,9 @@
|
||||||
|
app = "openbook-candles-worker"
|
||||||
|
kill_signal = "SIGTERM"
|
||||||
|
kill_timeout = 30
|
||||||
|
|
||||||
|
[build]
|
||||||
|
dockerfile = "../Dockerfile.worker"
|
||||||
|
|
||||||
|
[experimental]
|
||||||
|
cmd = ["worker", "markets.json"]
|
12
markets.json
12
markets.json
|
@ -23,10 +23,6 @@
|
||||||
"name": "mSOL/USDC",
|
"name": "mSOL/USDC",
|
||||||
"address": "9Lyhks5bQQxb9EyyX55NtgKQzpM4WK7JCmeaWuQ5MoXD"
|
"address": "9Lyhks5bQQxb9EyyX55NtgKQzpM4WK7JCmeaWuQ5MoXD"
|
||||||
},
|
},
|
||||||
{
|
|
||||||
"name": "stSOL/USDC",
|
|
||||||
"address": "JCKa72xFYGWBEVJZ7AKZ2ofugWPBfrrouQviaGaohi3R"
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
"name": "SOL/USDT",
|
"name": "SOL/USDT",
|
||||||
"address": "2AdaV97p6SfkuMQJdu8DHhBhmJe7oWdvbm52MJfYQmfA"
|
"address": "2AdaV97p6SfkuMQJdu8DHhBhmJe7oWdvbm52MJfYQmfA"
|
||||||
|
@ -35,10 +31,6 @@
|
||||||
"name": "USDT/USDC",
|
"name": "USDT/USDC",
|
||||||
"address": "B2na8Awyd7cpC59iEU43FagJAPLigr3AP3s38KM982bu"
|
"address": "B2na8Awyd7cpC59iEU43FagJAPLigr3AP3s38KM982bu"
|
||||||
},
|
},
|
||||||
{
|
|
||||||
"name": "USDT/USDC",
|
|
||||||
"address": "B2na8Awyd7cpC59iEU43FagJAPLigr3AP3s38KM982bu"
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
"name": "ETH/USDC",
|
"name": "ETH/USDC",
|
||||||
"address": "BbJgE7HZMaDp5NTYvRh5jZSkQPVDTU8ubPFtpogUkEj4"
|
"address": "BbJgE7HZMaDp5NTYvRh5jZSkQPVDTU8ubPFtpogUkEj4"
|
||||||
|
@ -50,9 +42,5 @@
|
||||||
{
|
{
|
||||||
"name": "RAY/USDT",
|
"name": "RAY/USDT",
|
||||||
"address": "GpHbiJJ9VHiuHVXeoet121Utrbm1CSNNzYrBKB8Xz2oz"
|
"address": "GpHbiJJ9VHiuHVXeoet121Utrbm1CSNNzYrBKB8Xz2oz"
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "SLND/USDC",
|
|
||||||
"address": "HTHMfoxePjcXFhrV74pfCUNoWGe374ecFwiDjPGTkzHr"
|
|
||||||
}
|
}
|
||||||
]
|
]
|
|
@ -0,0 +1,69 @@
|
||||||
|
|
||||||
|
|
||||||
|
use deadpool_postgres::Object;
|
||||||
|
|
||||||
|
use openbook_candles::{
|
||||||
|
database::{
|
||||||
|
initialize::connect_to_database,
|
||||||
|
insert::{build_candles_upsert_statement},
|
||||||
|
},
|
||||||
|
structs::{
|
||||||
|
candle::Candle,
|
||||||
|
markets::{fetch_market_infos, load_markets},
|
||||||
|
resolution::Resolution,
|
||||||
|
},
|
||||||
|
utils::{AnyhowWrap, Config},
|
||||||
|
worker::candle_batching::{
|
||||||
|
higher_order_candles::backfill_batch_higher_order_candles,
|
||||||
|
minute_candles::backfill_batch_1m_candles,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
use std::{env};
|
||||||
|
use strum::IntoEnumIterator;
|
||||||
|
|
||||||
|
|
||||||
|
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
dotenv::dotenv().ok();
|
||||||
|
let args: Vec<String> = env::args().collect();
|
||||||
|
assert!(args.len() == 2);
|
||||||
|
|
||||||
|
let path_to_markets_json = &args[1];
|
||||||
|
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
|
||||||
|
|
||||||
|
let config = Config {
|
||||||
|
rpc_url: rpc_url.clone(),
|
||||||
|
};
|
||||||
|
let markets = load_markets(path_to_markets_json);
|
||||||
|
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
|
||||||
|
println!("Backfilling candles for {:?}", markets);
|
||||||
|
|
||||||
|
let pool = connect_to_database().await?;
|
||||||
|
for market in market_infos.into_iter() {
|
||||||
|
let client = pool.get().await?;
|
||||||
|
let minute_candles = backfill_batch_1m_candles(&pool, &market).await?;
|
||||||
|
save_candles(minute_candles, client).await?;
|
||||||
|
|
||||||
|
for resolution in Resolution::iter() {
|
||||||
|
if resolution == Resolution::R1m {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let higher_order_candles =
|
||||||
|
backfill_batch_higher_order_candles(&pool, &market.name, resolution).await?;
|
||||||
|
let client = pool.get().await?;
|
||||||
|
save_candles(higher_order_candles, client).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn save_candles(candles: Vec<Candle>, client: Object) -> anyhow::Result<()> {
|
||||||
|
if !candles.is_empty() {
|
||||||
|
let upsert_statement = build_candles_upsert_statement(candles);
|
||||||
|
client
|
||||||
|
.execute(&upsert_statement, &[])
|
||||||
|
.await
|
||||||
|
.map_err_anyhow()?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -1,33 +1,37 @@
|
||||||
use std::{collections::HashMap, str::FromStr, env};
|
|
||||||
use anchor_lang::prelude::Pubkey;
|
use anchor_lang::prelude::Pubkey;
|
||||||
use chrono::{NaiveDateTime, DateTime, Utc, Duration};
|
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
|
||||||
use futures::future::join_all;
|
use futures::future::join_all;
|
||||||
use openbook_candles::{structs::{openbook::OpenBookFillEventLog, markets::{load_markets, fetch_market_infos}}, worker::trade_fetching::{scrape::scrape_transactions, parsing::parse_trades_from_openbook_txns}, database::{initialize::connect_to_database, insert::persist_fill_events}, utils::Config};
|
use openbook_candles::{
|
||||||
use solana_client::{rpc_config::RpcTransactionConfig, nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config, rpc_response::RpcConfirmedTransactionStatusWithSignature};
|
database::{initialize::connect_to_database, insert::persist_fill_events},
|
||||||
|
structs::{
|
||||||
|
markets::{fetch_market_infos, load_markets},
|
||||||
|
openbook::OpenBookFillEvent,
|
||||||
|
},
|
||||||
|
utils::Config,
|
||||||
|
worker::trade_fetching::parsing::parse_trades_from_openbook_txns,
|
||||||
|
};
|
||||||
|
use solana_client::{
|
||||||
|
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
|
||||||
|
rpc_config::RpcTransactionConfig, rpc_response::RpcConfirmedTransactionStatusWithSignature,
|
||||||
|
};
|
||||||
use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
|
use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
|
||||||
use solana_transaction_status::UiTransactionEncoding;
|
use solana_transaction_status::UiTransactionEncoding;
|
||||||
use tokio::sync::mpsc::{Sender, self};
|
use std::{collections::HashMap, env, str::FromStr};
|
||||||
|
use tokio::sync::mpsc::{self, Sender};
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
dotenv::dotenv().ok();
|
dotenv::dotenv().ok();
|
||||||
let args: Vec<String> = env::args().collect();
|
let args: Vec<String> = env::args().collect();
|
||||||
assert!(args.len() == 2);
|
assert!(args.len() == 2);
|
||||||
|
|
||||||
let path_to_markets_json = &args[1];
|
let path_to_markets_json = &args[1];
|
||||||
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
|
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
|
||||||
let database_url: String = dotenv::var("DATABASE_URL").unwrap();
|
|
||||||
let max_pg_pool_connections: u32 = dotenv::var("MAX_PG_POOL_CONNS_WORKER")
|
|
||||||
.unwrap()
|
|
||||||
.parse::<u32>()
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let config = Config {
|
let config = Config {
|
||||||
rpc_url: rpc_url.clone(),
|
rpc_url: rpc_url.clone(),
|
||||||
database_url,
|
|
||||||
max_pg_pool_connections,
|
|
||||||
};
|
};
|
||||||
let markets = load_markets(&path_to_markets_json);
|
let markets = load_markets(path_to_markets_json);
|
||||||
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
|
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
|
||||||
let mut target_markets = HashMap::new();
|
let mut target_markets = HashMap::new();
|
||||||
for m in market_infos.clone() {
|
for m in market_infos.clone() {
|
||||||
|
@ -35,10 +39,10 @@ async fn main() -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
println!("{:?}", target_markets);
|
println!("{:?}", target_markets);
|
||||||
|
|
||||||
let pool = connect_to_database(&config).await?;
|
let pool = connect_to_database().await?;
|
||||||
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEventLog>(1000);
|
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(1000);
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
persist_fill_events(&pool, &mut fill_receiver)
|
persist_fill_events(&pool, &mut fill_receiver)
|
||||||
.await
|
.await
|
||||||
|
@ -52,10 +56,9 @@ async fn main() -> anyhow::Result<()> {
|
||||||
|
|
||||||
pub async fn backfill(
|
pub async fn backfill(
|
||||||
rpc_url: String,
|
rpc_url: String,
|
||||||
fill_sender: &Sender<OpenBookFillEventLog>,
|
fill_sender: &Sender<OpenBookFillEvent>,
|
||||||
target_markets: &HashMap<Pubkey, u8>,
|
target_markets: &HashMap<Pubkey, u8>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
|
|
||||||
println!("backfill started");
|
println!("backfill started");
|
||||||
let mut before_sig: Option<Signature> = None;
|
let mut before_sig: Option<Signature> = None;
|
||||||
let mut now_time = Utc::now().timestamp();
|
let mut now_time = Utc::now().timestamp();
|
||||||
|
@ -64,14 +67,14 @@ pub async fn backfill(
|
||||||
let mut handles = vec![];
|
let mut handles = vec![];
|
||||||
|
|
||||||
while now_time > end_time {
|
while now_time > end_time {
|
||||||
let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
|
let rpc_client =
|
||||||
|
RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
|
||||||
let maybe_r = get_signatures(&rpc_client, before_sig).await;
|
let maybe_r = get_signatures(&rpc_client, before_sig).await;
|
||||||
|
|
||||||
match maybe_r {
|
match maybe_r {
|
||||||
Some((last, time, sigs)) => {
|
Some((last, time, sigs)) => {
|
||||||
now_time = time;
|
now_time = time;
|
||||||
before_sig = Some(last);
|
before_sig = Some(last);
|
||||||
|
|
||||||
let time_left = backfill_time_left(now_time, end_time);
|
let time_left = backfill_time_left(now_time, end_time);
|
||||||
println!(
|
println!(
|
||||||
"{} minutes ~ {} days remaining in the backfill\n",
|
"{} minutes ~ {} days remaining in the backfill\n",
|
||||||
|
@ -85,10 +88,10 @@ pub async fn backfill(
|
||||||
get_transactions(&rpc_client, sigs, &cloned_sender, &cloned_markets).await;
|
get_transactions(&rpc_client, sigs, &cloned_sender, &cloned_markets).await;
|
||||||
});
|
});
|
||||||
handles.push(handle);
|
handles.push(handle);
|
||||||
},
|
}
|
||||||
None => {},
|
None => {}
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
|
|
||||||
futures::future::join_all(handles).await;
|
futures::future::join_all(handles).await;
|
||||||
|
|
||||||
|
@ -96,44 +99,52 @@ pub async fn backfill(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn get_signatures(
|
||||||
|
rpc_client: &RpcClient,
|
||||||
|
before_sig: Option<Signature>,
|
||||||
|
) -> Option<(
|
||||||
|
Signature,
|
||||||
|
i64,
|
||||||
|
Vec<RpcConfirmedTransactionStatusWithSignature>,
|
||||||
|
)> {
|
||||||
|
let rpc_config = GetConfirmedSignaturesForAddress2Config {
|
||||||
|
before: before_sig,
|
||||||
|
until: None,
|
||||||
|
limit: None,
|
||||||
|
commitment: Some(CommitmentConfig::confirmed()),
|
||||||
|
};
|
||||||
|
|
||||||
pub async fn get_signatures(rpc_client: &RpcClient,
|
let sigs = match rpc_client
|
||||||
before_sig: Option<Signature>) -> Option<(Signature, i64, Vec<RpcConfirmedTransactionStatusWithSignature>)> {
|
.get_signatures_for_address_with_config(
|
||||||
let rpc_config = GetConfirmedSignaturesForAddress2Config {
|
&Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(),
|
||||||
before: before_sig,
|
rpc_config,
|
||||||
until: None,
|
)
|
||||||
limit: None,
|
.await
|
||||||
commitment: Some(CommitmentConfig::confirmed()),
|
{
|
||||||
};
|
Ok(s) => s,
|
||||||
|
Err(e) => {
|
||||||
let sigs = match rpc_client
|
println!("Error in get_signatures_for_address_with_config: {}", e);
|
||||||
.get_signatures_for_address_with_config(
|
|
||||||
&Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(),
|
|
||||||
rpc_config,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(s) => s,
|
|
||||||
Err(e) => {
|
|
||||||
println!("Error in get_signatures_for_address_with_config: {}", e);
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if sigs.len() == 0 {
|
|
||||||
println!("No signatures found");
|
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
let last = sigs.last().unwrap();
|
};
|
||||||
return Some((Signature::from_str(&last.signature).unwrap(), last.block_time.unwrap(), sigs));
|
|
||||||
|
if sigs.is_empty() {
|
||||||
|
println!("No signatures found");
|
||||||
|
return None;
|
||||||
}
|
}
|
||||||
|
let last = sigs.last().unwrap();
|
||||||
|
// println!("{:?}", last.block_time.unwrap());
|
||||||
|
Some((
|
||||||
|
Signature::from_str(&last.signature).unwrap(),
|
||||||
|
last.block_time.unwrap(),
|
||||||
|
sigs,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn get_transactions(
|
pub async fn get_transactions(
|
||||||
rpc_client: &RpcClient,
|
rpc_client: &RpcClient,
|
||||||
mut sigs: Vec<RpcConfirmedTransactionStatusWithSignature>,
|
mut sigs: Vec<RpcConfirmedTransactionStatusWithSignature>,
|
||||||
fill_sender: &Sender<OpenBookFillEventLog>,
|
fill_sender: &Sender<OpenBookFillEvent>,
|
||||||
target_markets: &HashMap<Pubkey, u8>,
|
target_markets: &HashMap<Pubkey, u8>,
|
||||||
) {
|
) {
|
||||||
sigs.retain(|sig| sig.err.is_none());
|
sigs.retain(|sig| sig.err.is_none());
|
||||||
|
@ -154,14 +165,15 @@ pub async fn get_transactions(
|
||||||
|
|
||||||
let txn_futs: Vec<_> = signatures
|
let txn_futs: Vec<_> = signatures
|
||||||
.iter()
|
.iter()
|
||||||
.map(|s| rpc_client.get_transaction_with_config(&s, txn_config))
|
.map(|s| rpc_client.get_transaction_with_config(s, txn_config))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let mut txns = join_all(txn_futs).await;
|
let mut txns = join_all(txn_futs).await;
|
||||||
|
|
||||||
let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
|
let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
|
||||||
if fills.len() > 0 {
|
if !fills.is_empty() {
|
||||||
for fill in fills.into_iter() {
|
for fill in fills.into_iter() {
|
||||||
|
// println!("Sending fill {:?}", fill);
|
||||||
if let Err(_) = fill_sender.send(fill).await {
|
if let Err(_) = fill_sender.send(fill).await {
|
||||||
panic!("receiver dropped");
|
panic!("receiver dropped");
|
||||||
}
|
}
|
||||||
|
@ -175,4 +187,4 @@ fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration {
|
||||||
let cur_date = DateTime::<Utc>::from_utc(naive_cur, Utc);
|
let cur_date = DateTime::<Utc>::from_utc(naive_cur, Utc);
|
||||||
let bf_date = DateTime::<Utc>::from_utc(naive_bf, Utc);
|
let bf_date = DateTime::<Utc>::from_utc(naive_bf, Utc);
|
||||||
cur_date - bf_date
|
cur_date - bf_date
|
||||||
}
|
}
|
|
@ -1,50 +1,54 @@
|
||||||
use chrono::{DateTime, Utc};
|
use crate::structs::{
|
||||||
use sqlx::{pool::PoolConnection, Postgres};
|
candle::Candle,
|
||||||
|
coingecko::{PgCoinGecko24HighLow, PgCoinGecko24HourVolume},
|
||||||
use crate::{
|
openbook::PgOpenBookFill,
|
||||||
structs::{
|
resolution::Resolution,
|
||||||
candle::Candle,
|
trader::PgTrader,
|
||||||
coingecko::{PgCoinGecko24HighLow, PgCoinGecko24HourVolume},
|
|
||||||
openbook::PgOpenBookFill,
|
|
||||||
resolution::Resolution,
|
|
||||||
trader::PgTrader,
|
|
||||||
},
|
|
||||||
utils::AnyhowWrap,
|
|
||||||
};
|
};
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use deadpool_postgres::{GenericClient, Pool};
|
||||||
|
|
||||||
pub async fn fetch_earliest_fill(
|
pub async fn fetch_earliest_fill(
|
||||||
conn: &mut PoolConnection<Postgres>,
|
pool: &Pool,
|
||||||
market_address_string: &str,
|
market_address_string: &str,
|
||||||
) -> anyhow::Result<Option<PgOpenBookFill>> {
|
) -> anyhow::Result<Option<PgOpenBookFill>> {
|
||||||
sqlx::query_as!(
|
let client = pool.get().await?;
|
||||||
PgOpenBookFill,
|
|
||||||
r#"SELECT
|
let stmt = client
|
||||||
time as "time!",
|
.prepare(
|
||||||
bid as "bid!",
|
r#"SELECT
|
||||||
maker as "maker!",
|
time as "time!",
|
||||||
native_qty_paid as "native_qty_paid!",
|
bid as "bid!",
|
||||||
native_qty_received as "native_qty_received!",
|
maker as "maker!",
|
||||||
native_fee_or_rebate as "native_fee_or_rebate!"
|
native_qty_paid as "native_qty_paid!",
|
||||||
from fills
|
native_qty_received as "native_qty_received!",
|
||||||
where market = $1
|
native_fee_or_rebate as "native_fee_or_rebate!"
|
||||||
and maker = true
|
from fills
|
||||||
ORDER BY time asc LIMIT 1"#,
|
where market = $1
|
||||||
market_address_string
|
and maker = true
|
||||||
)
|
ORDER BY time asc LIMIT 1"#,
|
||||||
.fetch_optional(conn)
|
)
|
||||||
.await
|
.await?;
|
||||||
.map_err_anyhow()
|
|
||||||
|
let row = client.query_opt(&stmt, &[&market_address_string]).await?;
|
||||||
|
|
||||||
|
match row {
|
||||||
|
Some(r) => Ok(Some(PgOpenBookFill::from_row(r))),
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn fetch_fills_from(
|
pub async fn fetch_fills_from(
|
||||||
conn: &mut PoolConnection<Postgres>,
|
pool: &Pool,
|
||||||
market_address_string: &str,
|
market_address_string: &str,
|
||||||
start_time: DateTime<Utc>,
|
start_time: DateTime<Utc>,
|
||||||
end_time: DateTime<Utc>,
|
end_time: DateTime<Utc>,
|
||||||
) -> anyhow::Result<Vec<PgOpenBookFill>> {
|
) -> anyhow::Result<Vec<PgOpenBookFill>> {
|
||||||
sqlx::query_as!(
|
let client = pool.get().await?;
|
||||||
PgOpenBookFill,
|
|
||||||
r#"SELECT
|
let stmt = client
|
||||||
|
.prepare(
|
||||||
|
r#"SELECT
|
||||||
time as "time!",
|
time as "time!",
|
||||||
bid as "bid!",
|
bid as "bid!",
|
||||||
maker as "maker!",
|
maker as "maker!",
|
||||||
|
@ -53,31 +57,36 @@ pub async fn fetch_fills_from(
|
||||||
native_fee_or_rebate as "native_fee_or_rebate!"
|
native_fee_or_rebate as "native_fee_or_rebate!"
|
||||||
from fills
|
from fills
|
||||||
where market = $1
|
where market = $1
|
||||||
and time >= $2
|
and time >= $2::timestamptz
|
||||||
and time < $3
|
and time < $3::timestamptz
|
||||||
and maker = true
|
and maker = true
|
||||||
ORDER BY time asc"#,
|
ORDER BY time asc"#,
|
||||||
market_address_string,
|
)
|
||||||
start_time,
|
.await?;
|
||||||
end_time
|
|
||||||
)
|
let rows = client
|
||||||
.fetch_all(conn)
|
.query(&stmt, &[&market_address_string, &start_time, &end_time])
|
||||||
.await
|
.await?;
|
||||||
.map_err_anyhow()
|
Ok(rows
|
||||||
|
.into_iter()
|
||||||
|
.map(PgOpenBookFill::from_row)
|
||||||
|
.collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn fetch_latest_finished_candle(
|
pub async fn fetch_latest_finished_candle(
|
||||||
conn: &mut PoolConnection<Postgres>,
|
pool: &Pool,
|
||||||
market_name: &str,
|
market_name: &str,
|
||||||
resolution: Resolution,
|
resolution: Resolution,
|
||||||
) -> anyhow::Result<Option<Candle>> {
|
) -> anyhow::Result<Option<Candle>> {
|
||||||
sqlx::query_as!(
|
let client = pool.get().await?;
|
||||||
Candle,
|
|
||||||
r#"SELECT
|
let stmt = client
|
||||||
|
.prepare(
|
||||||
|
r#"SELECT
|
||||||
|
market_name as "market_name!",
|
||||||
start_time as "start_time!",
|
start_time as "start_time!",
|
||||||
end_time as "end_time!",
|
end_time as "end_time!",
|
||||||
resolution as "resolution!",
|
resolution as "resolution!",
|
||||||
market_name as "market_name!",
|
|
||||||
open as "open!",
|
open as "open!",
|
||||||
close as "close!",
|
close as "close!",
|
||||||
high as "high!",
|
high as "high!",
|
||||||
|
@ -89,26 +98,35 @@ pub async fn fetch_latest_finished_candle(
|
||||||
and resolution = $2
|
and resolution = $2
|
||||||
and complete = true
|
and complete = true
|
||||||
ORDER BY start_time desc LIMIT 1"#,
|
ORDER BY start_time desc LIMIT 1"#,
|
||||||
market_name,
|
)
|
||||||
resolution.to_string()
|
.await?;
|
||||||
)
|
|
||||||
.fetch_optional(conn)
|
let row = client
|
||||||
.await
|
.query_opt(&stmt, &[&market_name, &resolution.to_string()])
|
||||||
.map_err_anyhow()
|
.await?;
|
||||||
|
|
||||||
|
match row {
|
||||||
|
Some(r) => Ok(Some(Candle::from_row(r))),
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Fetches all of the candles for the given market and resoultion, starting from the earliest.
|
||||||
|
/// Note that this function will fetch ALL candles.
|
||||||
pub async fn fetch_earliest_candles(
|
pub async fn fetch_earliest_candles(
|
||||||
conn: &mut PoolConnection<Postgres>,
|
pool: &Pool,
|
||||||
market_name: &str,
|
market_name: &str,
|
||||||
resolution: Resolution,
|
resolution: Resolution,
|
||||||
) -> anyhow::Result<Vec<Candle>> {
|
) -> anyhow::Result<Vec<Candle>> {
|
||||||
sqlx::query_as!(
|
let client = pool.get().await?;
|
||||||
Candle,
|
|
||||||
r#"SELECT
|
let stmt = client
|
||||||
|
.prepare(
|
||||||
|
r#"SELECT
|
||||||
|
market_name as "market_name!",
|
||||||
start_time as "start_time!",
|
start_time as "start_time!",
|
||||||
end_time as "end_time!",
|
end_time as "end_time!",
|
||||||
resolution as "resolution!",
|
resolution as "resolution!",
|
||||||
market_name as "market_name!",
|
|
||||||
open as "open!",
|
open as "open!",
|
||||||
close as "close!",
|
close as "close!",
|
||||||
high as "high!",
|
high as "high!",
|
||||||
|
@ -119,64 +137,32 @@ pub async fn fetch_earliest_candles(
|
||||||
where market_name = $1
|
where market_name = $1
|
||||||
and resolution = $2
|
and resolution = $2
|
||||||
ORDER BY start_time asc"#,
|
ORDER BY start_time asc"#,
|
||||||
market_name,
|
)
|
||||||
resolution.to_string()
|
.await?;
|
||||||
)
|
|
||||||
.fetch_all(conn)
|
let rows = client
|
||||||
.await
|
.query(&stmt, &[&market_name, &resolution.to_string()])
|
||||||
.map_err_anyhow()
|
.await?;
|
||||||
|
|
||||||
|
Ok(rows.into_iter().map(Candle::from_row).collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn fetch_candles_from(
|
pub async fn fetch_candles_from(
|
||||||
conn: &mut PoolConnection<Postgres>,
|
pool: &Pool,
|
||||||
market_name: &str,
|
market_name: &str,
|
||||||
resolution: Resolution,
|
resolution: Resolution,
|
||||||
start_time: DateTime<Utc>,
|
start_time: DateTime<Utc>,
|
||||||
end_time: DateTime<Utc>,
|
end_time: DateTime<Utc>,
|
||||||
) -> anyhow::Result<Vec<Candle>> {
|
) -> anyhow::Result<Vec<Candle>> {
|
||||||
sqlx::query_as!(
|
let client = pool.get().await?;
|
||||||
Candle,
|
|
||||||
r#"SELECT
|
|
||||||
start_time as "start_time!",
|
|
||||||
end_time as "end_time!",
|
|
||||||
resolution as "resolution!",
|
|
||||||
market_name as "market_name!",
|
|
||||||
open as "open!",
|
|
||||||
close as "close!",
|
|
||||||
high as "high!",
|
|
||||||
low as "low!",
|
|
||||||
volume as "volume!",
|
|
||||||
complete as "complete!"
|
|
||||||
from candles
|
|
||||||
where market_name = $1
|
|
||||||
and resolution = $2
|
|
||||||
and start_time >= $3
|
|
||||||
and end_time <= $4
|
|
||||||
ORDER BY start_time asc"#,
|
|
||||||
market_name,
|
|
||||||
resolution.to_string(),
|
|
||||||
start_time,
|
|
||||||
end_time
|
|
||||||
)
|
|
||||||
.fetch_all(conn)
|
|
||||||
.await
|
|
||||||
.map_err_anyhow()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn fetch_tradingview_candles(
|
let stmt = client
|
||||||
conn: &mut PoolConnection<Postgres>,
|
.prepare(
|
||||||
market_name: &str,
|
r#"SELECT
|
||||||
resolution: Resolution,
|
market_name as "market_name!",
|
||||||
start_time: DateTime<Utc>,
|
|
||||||
end_time: DateTime<Utc>,
|
|
||||||
) -> anyhow::Result<Vec<Candle>> {
|
|
||||||
sqlx::query_as!(
|
|
||||||
Candle,
|
|
||||||
r#"SELECT
|
|
||||||
start_time as "start_time!",
|
start_time as "start_time!",
|
||||||
end_time as "end_time!",
|
end_time as "end_time!",
|
||||||
resolution as "resolution!",
|
resolution as "resolution!",
|
||||||
market_name as "market_name!",
|
|
||||||
open as "open!",
|
open as "open!",
|
||||||
close as "close!",
|
close as "close!",
|
||||||
high as "high!",
|
high as "high!",
|
||||||
|
@ -189,112 +175,135 @@ pub async fn fetch_tradingview_candles(
|
||||||
and start_time >= $3
|
and start_time >= $3
|
||||||
and end_time <= $4
|
and end_time <= $4
|
||||||
ORDER BY start_time asc"#,
|
ORDER BY start_time asc"#,
|
||||||
market_name,
|
)
|
||||||
resolution.to_string(),
|
.await?;
|
||||||
start_time,
|
|
||||||
end_time
|
let rows = client
|
||||||
)
|
.query(
|
||||||
.fetch_all(conn)
|
&stmt,
|
||||||
.await
|
&[
|
||||||
.map_err_anyhow()
|
&market_name,
|
||||||
|
&resolution.to_string(),
|
||||||
|
&start_time,
|
||||||
|
&end_time,
|
||||||
|
],
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(rows.into_iter().map(Candle::from_row).collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn fetch_top_traders_by_base_volume_from(
|
pub async fn fetch_top_traders_by_base_volume_from(
|
||||||
conn: &mut PoolConnection<Postgres>,
|
pool: &Pool,
|
||||||
market_address_string: &str,
|
market_address_string: &str,
|
||||||
start_time: DateTime<Utc>,
|
start_time: DateTime<Utc>,
|
||||||
end_time: DateTime<Utc>,
|
end_time: DateTime<Utc>,
|
||||||
) -> anyhow::Result<Vec<PgTrader>> {
|
) -> anyhow::Result<Vec<PgTrader>> {
|
||||||
sqlx::query_as!(
|
let client = pool.get().await?;
|
||||||
PgTrader,
|
|
||||||
r#"SELECT
|
let stmt = client
|
||||||
open_orders_owner,
|
.prepare(
|
||||||
sum(
|
r#"SELECT
|
||||||
native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
|
open_orders_owner,
|
||||||
) as "raw_ask_size!",
|
sum(
|
||||||
sum(
|
native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
|
||||||
native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
|
) as "raw_ask_size!",
|
||||||
) as "raw_bid_size!"
|
sum(
|
||||||
FROM fills
|
native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
|
||||||
WHERE market = $1
|
) as "raw_bid_size!"
|
||||||
AND time >= $2
|
FROM fills
|
||||||
AND time < $3
|
WHERE market = $1
|
||||||
GROUP BY open_orders_owner
|
AND time >= $2
|
||||||
ORDER BY
|
AND time < $3
|
||||||
sum(native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
|
GROUP BY open_orders_owner
|
||||||
+
|
ORDER BY
|
||||||
sum(native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
|
sum(native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
|
||||||
DESC
|
+
|
||||||
LIMIT 10000"#,
|
sum(native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
|
||||||
market_address_string,
|
DESC
|
||||||
start_time,
|
LIMIT 10000"#,
|
||||||
end_time
|
)
|
||||||
)
|
.await?;
|
||||||
.fetch_all(conn)
|
|
||||||
.await
|
let rows = client
|
||||||
.map_err_anyhow()
|
.query(&stmt, &[&market_address_string, &start_time, &end_time])
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(rows.into_iter().map(PgTrader::from_row).collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn fetch_top_traders_by_quote_volume_from(
|
pub async fn fetch_top_traders_by_quote_volume_from(
|
||||||
conn: &mut PoolConnection<Postgres>,
|
pool: &Pool,
|
||||||
market_address_string: &str,
|
market_address_string: &str,
|
||||||
start_time: DateTime<Utc>,
|
start_time: DateTime<Utc>,
|
||||||
end_time: DateTime<Utc>,
|
end_time: DateTime<Utc>,
|
||||||
) -> anyhow::Result<Vec<PgTrader>> {
|
) -> anyhow::Result<Vec<PgTrader>> {
|
||||||
sqlx::query_as!(
|
let client = pool.get().await?;
|
||||||
PgTrader,
|
|
||||||
r#"SELECT
|
let stmt = client
|
||||||
open_orders_owner,
|
.prepare(
|
||||||
sum(
|
r#"SELECT
|
||||||
native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
|
open_orders_owner,
|
||||||
) as "raw_ask_size!",
|
sum(
|
||||||
sum(
|
native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
|
||||||
native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
|
) as "raw_ask_size!",
|
||||||
) as "raw_bid_size!"
|
sum(
|
||||||
FROM fills
|
native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
|
||||||
WHERE market = $1
|
) as "raw_bid_size!"
|
||||||
AND time >= $2
|
FROM fills
|
||||||
AND time < $3
|
WHERE market = $1
|
||||||
GROUP BY open_orders_owner
|
AND time >= $2
|
||||||
ORDER BY
|
AND time < $3
|
||||||
sum(native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
|
GROUP BY open_orders_owner
|
||||||
+
|
ORDER BY
|
||||||
sum(native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
|
sum(native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
|
||||||
DESC
|
+
|
||||||
LIMIT 10000"#,
|
sum(native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
|
||||||
market_address_string,
|
DESC
|
||||||
start_time,
|
LIMIT 10000"#,
|
||||||
end_time
|
)
|
||||||
)
|
.await?;
|
||||||
.fetch_all(conn)
|
|
||||||
.await
|
let rows = client
|
||||||
.map_err_anyhow()
|
.query(&stmt, &[&market_address_string, &start_time, &end_time])
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(rows.into_iter().map(PgTrader::from_row).collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn fetch_coingecko_24h_volume(
|
pub async fn fetch_coingecko_24h_volume(
|
||||||
conn: &mut PoolConnection<Postgres>,
|
pool: &Pool,
|
||||||
) -> anyhow::Result<Vec<PgCoinGecko24HourVolume>> {
|
) -> anyhow::Result<Vec<PgCoinGecko24HourVolume>> {
|
||||||
sqlx::query_as!(
|
let client = pool.get().await?;
|
||||||
PgCoinGecko24HourVolume,
|
|
||||||
r#"select market as "address!",
|
let stmt = client
|
||||||
sum(native_qty_paid) as "raw_quote_size!",
|
.prepare(
|
||||||
sum(native_qty_received) as "raw_base_size!"
|
r#"select market as "address!",
|
||||||
|
sum(native_qty_received) as "raw_base_size!",
|
||||||
|
sum(native_qty_paid) as "raw_quote_size!"
|
||||||
from fills
|
from fills
|
||||||
where "time" >= current_timestamp - interval '1 day'
|
where "time" >= current_timestamp - interval '1 day'
|
||||||
and bid = true
|
and bid = true
|
||||||
group by market"#
|
group by market"#,
|
||||||
)
|
)
|
||||||
.fetch_all(conn)
|
.await?;
|
||||||
.await
|
|
||||||
.map_err_anyhow()
|
let rows = client.query(&stmt, &[]).await?;
|
||||||
|
|
||||||
|
Ok(rows
|
||||||
|
.into_iter()
|
||||||
|
.map(PgCoinGecko24HourVolume::from_row)
|
||||||
|
.collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn fetch_coingecko_24h_high_low(
|
pub async fn fetch_coingecko_24h_high_low(
|
||||||
conn: &mut PoolConnection<Postgres>,
|
pool: &Pool,
|
||||||
) -> anyhow::Result<Vec<PgCoinGecko24HighLow>> {
|
) -> anyhow::Result<Vec<PgCoinGecko24HighLow>> {
|
||||||
sqlx::query_as!(
|
let client = pool.get().await?;
|
||||||
PgCoinGecko24HighLow,
|
|
||||||
r#"select
|
let stmt = client
|
||||||
|
.prepare(
|
||||||
|
r#"select
|
||||||
g.market_name as "market_name!",
|
g.market_name as "market_name!",
|
||||||
g.high as "high!",
|
g.high as "high!",
|
||||||
g.low as "low!",
|
g.low as "low!",
|
||||||
|
@ -317,9 +326,14 @@ pub async fn fetch_coingecko_24h_high_low(
|
||||||
join candles c on g.market_name = c.market_name
|
join candles c on g.market_name = c.market_name
|
||||||
and g.start_time = c.start_time
|
and g.start_time = c.start_time
|
||||||
where
|
where
|
||||||
c.resolution = '1M'"#
|
c.resolution = '1M'"#,
|
||||||
)
|
)
|
||||||
.fetch_all(conn)
|
.await?;
|
||||||
.await
|
|
||||||
.map_err_anyhow()
|
let rows = client.query(&stmt, &[]).await?;
|
||||||
|
|
||||||
|
Ok(rows
|
||||||
|
.into_iter()
|
||||||
|
.map(PgCoinGecko24HighLow::from_row)
|
||||||
|
.collect())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,24 +1,70 @@
|
||||||
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
|
use std::{fs, time::Duration};
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use crate::utils::{AnyhowWrap, Config};
|
use deadpool_postgres::{
|
||||||
|
ManagerConfig, Pool, PoolConfig, RecyclingMethod, Runtime, SslMode, Timeouts,
|
||||||
|
};
|
||||||
|
use native_tls::{Certificate, Identity, TlsConnector};
|
||||||
|
use postgres_native_tls::MakeTlsConnector;
|
||||||
|
|
||||||
pub async fn connect_to_database(config: &Config) -> anyhow::Result<Pool<Postgres>> {
|
use crate::utils::PgConfig;
|
||||||
loop {
|
|
||||||
let pool = PgPoolOptions::new()
|
pub async fn connect_to_database() -> anyhow::Result<Pool> {
|
||||||
.max_connections(config.max_pg_pool_connections)
|
let mut pg_config = PgConfig::from_env()?;
|
||||||
.connect(&config.database_url)
|
|
||||||
.await;
|
pg_config.pg.manager = Some(ManagerConfig {
|
||||||
if pool.is_ok() {
|
recycling_method: RecyclingMethod::Fast,
|
||||||
println!("Database connected");
|
});
|
||||||
return pool.map_err_anyhow();
|
pg_config.pg.pool = Some(PoolConfig {
|
||||||
|
max_size: pg_config.pg_max_pool_connections,
|
||||||
|
timeouts: Timeouts::default(),
|
||||||
|
});
|
||||||
|
|
||||||
|
// openssl pkcs12 -export -in client.cer -inkey client-key.cer -out client.pks
|
||||||
|
// base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64
|
||||||
|
// fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a mango-fills
|
||||||
|
// fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills
|
||||||
|
let tls = if pg_config.pg_use_ssl {
|
||||||
|
pg_config.pg.ssl_mode = Some(SslMode::Require);
|
||||||
|
let ca_cert = fs::read(pg_config.pg_ca_cert_path.expect("reading ca cert from env"))
|
||||||
|
.expect("reading ca cert from file");
|
||||||
|
let client_key = fs::read(
|
||||||
|
pg_config
|
||||||
|
.pg_client_key_path
|
||||||
|
.expect("reading client key from env"),
|
||||||
|
)
|
||||||
|
.expect("reading client key from file");
|
||||||
|
MakeTlsConnector::new(
|
||||||
|
TlsConnector::builder()
|
||||||
|
.add_root_certificate(Certificate::from_pem(&ca_cert)?)
|
||||||
|
.identity(Identity::from_pkcs12(&client_key, "pass")?)
|
||||||
|
.danger_accept_invalid_certs(false)
|
||||||
|
.build()?,
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
MakeTlsConnector::new(
|
||||||
|
TlsConnector::builder()
|
||||||
|
.danger_accept_invalid_certs(true)
|
||||||
|
.build()
|
||||||
|
.unwrap(),
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
let pool = pg_config
|
||||||
|
.pg
|
||||||
|
.create_pool(Some(Runtime::Tokio1), tls)
|
||||||
|
.unwrap();
|
||||||
|
match pool.get().await {
|
||||||
|
Ok(_) => println!("Database connected"),
|
||||||
|
Err(e) => {
|
||||||
|
println!("Failed to connect to database: {}, retrying", e.to_string());
|
||||||
|
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||||
}
|
}
|
||||||
println!("Failed to connect to database, retrying");
|
|
||||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(pool)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn setup_database(pool: &Pool<Postgres>) -> anyhow::Result<()> {
|
pub async fn setup_database(pool: &Pool) -> anyhow::Result<()> {
|
||||||
let candles_table_fut = create_candles_table(pool);
|
let candles_table_fut = create_candles_table(pool);
|
||||||
let fills_table_fut = create_fills_table(pool);
|
let fills_table_fut = create_fills_table(pool);
|
||||||
let result = tokio::try_join!(candles_table_fut, fills_table_fut);
|
let result = tokio::try_join!(candles_table_fut, fills_table_fut);
|
||||||
|
@ -34,50 +80,51 @@ pub async fn setup_database(pool: &Pool<Postgres>) -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn create_candles_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
|
pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> {
|
||||||
let mut tx = pool.begin().await.map_err_anyhow()?;
|
let client = pool.get().await?;
|
||||||
|
|
||||||
sqlx::query!(
|
client
|
||||||
"CREATE TABLE IF NOT EXISTS candles (
|
.execute(
|
||||||
|
"CREATE TABLE IF NOT EXISTS candles (
|
||||||
id serial,
|
id serial,
|
||||||
market_name text,
|
market_name text,
|
||||||
start_time timestamptz,
|
start_time timestamptz,
|
||||||
end_time timestamptz,
|
end_time timestamptz,
|
||||||
resolution text,
|
resolution text,
|
||||||
open numeric,
|
open double precision,
|
||||||
close numeric,
|
close double precision,
|
||||||
high numeric,
|
high double precision,
|
||||||
low numeric,
|
low double precision,
|
||||||
volume numeric,
|
volume double precision,
|
||||||
complete bool
|
complete bool
|
||||||
)",
|
)",
|
||||||
)
|
&[],
|
||||||
.execute(&mut tx)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
sqlx::query!(
|
client.execute(
|
||||||
"CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market_name, start_time, resolution)"
|
"CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market_name, start_time, resolution)",
|
||||||
).execute(&mut tx).await?;
|
&[]
|
||||||
|
).await?;
|
||||||
|
|
||||||
sqlx::query!(
|
client.execute(
|
||||||
"DO $$
|
"DO $$
|
||||||
BEGIN
|
BEGIN
|
||||||
IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'unique_candles') THEN
|
IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'unique_candles') THEN
|
||||||
ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market_name, start_time, resolution);
|
ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market_name, start_time, resolution);
|
||||||
END IF;
|
END IF;
|
||||||
END $$"
|
END $$", &[]
|
||||||
)
|
).await?;
|
||||||
.execute(&mut tx)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
tx.commit().await.map_err_anyhow()
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn create_fills_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
|
pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> {
|
||||||
let mut tx = pool.begin().await.map_err_anyhow()?;
|
let client = pool.get().await?;
|
||||||
|
|
||||||
sqlx::query!(
|
client
|
||||||
"CREATE TABLE IF NOT EXISTS fills (
|
.execute(
|
||||||
|
"CREATE TABLE IF NOT EXISTS fills (
|
||||||
id numeric PRIMARY KEY,
|
id numeric PRIMARY KEY,
|
||||||
time timestamptz not null,
|
time timestamptz not null,
|
||||||
market text not null,
|
market text not null,
|
||||||
|
@ -85,23 +132,28 @@ pub async fn create_fills_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
|
||||||
open_orders_owner text not null,
|
open_orders_owner text not null,
|
||||||
bid bool not null,
|
bid bool not null,
|
||||||
maker bool not null,
|
maker bool not null,
|
||||||
native_qty_paid numeric not null,
|
native_qty_paid double precision not null,
|
||||||
native_qty_received numeric not null,
|
native_qty_received double precision not null,
|
||||||
native_fee_or_rebate numeric not null,
|
native_fee_or_rebate double precision not null,
|
||||||
fee_tier text not null,
|
fee_tier text not null,
|
||||||
order_id text not null
|
order_id text not null
|
||||||
)",
|
)",
|
||||||
)
|
&[],
|
||||||
.execute(&mut tx)
|
)
|
||||||
.await?;
|
|
||||||
|
|
||||||
sqlx::query!("CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)")
|
|
||||||
.execute(&mut tx)
|
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
sqlx::query!("CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)")
|
client
|
||||||
.execute(&mut tx)
|
.execute(
|
||||||
|
"CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)",
|
||||||
|
&[],
|
||||||
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
tx.commit().await.map_err_anyhow()
|
client
|
||||||
|
.execute(
|
||||||
|
"CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)",
|
||||||
|
&[],
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
use chrono::Utc;
|
use deadpool_postgres::Pool;
|
||||||
use sqlx::{Connection, Pool, Postgres};
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::{hash_map::DefaultHasher, HashMap},
|
collections::{hash_map::DefaultHasher, HashMap},
|
||||||
hash::{Hash, Hasher},
|
hash::{Hash, Hasher},
|
||||||
|
@ -7,26 +6,24 @@ use std::{
|
||||||
use tokio::sync::mpsc::{error::TryRecvError, Receiver};
|
use tokio::sync::mpsc::{error::TryRecvError, Receiver};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
structs::{candle::Candle, openbook::OpenBookFillEventLog},
|
structs::{candle::Candle, openbook::OpenBookFillEvent},
|
||||||
utils::AnyhowWrap,
|
utils::{to_timestampz, AnyhowWrap},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub async fn persist_fill_events(
|
pub async fn persist_fill_events(
|
||||||
pool: &Pool<Postgres>,
|
pool: &Pool,
|
||||||
fill_receiver: &mut Receiver<OpenBookFillEventLog>,
|
fill_receiver: &mut Receiver<OpenBookFillEvent>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let mut conn = pool.acquire().await.unwrap();
|
let client = pool.get().await?;
|
||||||
loop {
|
loop {
|
||||||
let mut write_batch = HashMap::new();
|
let mut write_batch = HashMap::new();
|
||||||
while write_batch.len() < 10 {
|
while write_batch.len() < 10 {
|
||||||
match fill_receiver.try_recv() {
|
match fill_receiver.try_recv() {
|
||||||
Ok(event) => {
|
Ok(event) => {
|
||||||
if !write_batch.contains_key(&event) {
|
write_batch.entry(event).or_insert(0);
|
||||||
write_batch.insert(event, 0);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Err(TryRecvError::Empty) => {
|
Err(TryRecvError::Empty) => {
|
||||||
if write_batch.len() > 0 {
|
if !write_batch.is_empty() {
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
continue;
|
continue;
|
||||||
|
@ -38,65 +35,64 @@ pub async fn persist_fill_events(
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
if write_batch.len() > 0 {
|
if !write_batch.is_empty() {
|
||||||
// print!("writing: {:?} events to DB\n", write_batch.len());
|
// print!("writing: {:?} events to DB\n", write_batch.len());
|
||||||
|
|
||||||
match conn.ping().await {
|
// match conn.ping().await {
|
||||||
Ok(_) => {
|
// Ok(_) => {
|
||||||
let upsert_statement = build_fills_upsert_statement(write_batch);
|
let upsert_statement = build_fills_upsert_statement(write_batch);
|
||||||
sqlx::query(&upsert_statement)
|
client
|
||||||
.execute(&mut conn)
|
.execute(&upsert_statement, &[])
|
||||||
.await
|
.await
|
||||||
.map_err_anyhow()
|
.map_err_anyhow()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
// }
|
||||||
Err(_) => {
|
// Err(_) => {
|
||||||
println!("Fills ping failed");
|
// println!("Fills ping failed");
|
||||||
break;
|
// break;
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn persist_candles(
|
pub async fn persist_candles(
|
||||||
pool: Pool<Postgres>,
|
pool: Pool,
|
||||||
candles_receiver: &mut Receiver<Vec<Candle>>,
|
candles_receiver: &mut Receiver<Vec<Candle>>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let mut conn = pool.acquire().await.unwrap();
|
let client = pool.get().await.unwrap();
|
||||||
loop {
|
loop {
|
||||||
match conn.ping().await {
|
// match client.ping().await {
|
||||||
Ok(_) => {
|
// Ok(_) => {
|
||||||
match candles_receiver.try_recv() {
|
match candles_receiver.try_recv() {
|
||||||
Ok(candles) => {
|
Ok(candles) => {
|
||||||
if candles.len() == 0 {
|
if candles.is_empty() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// print!("writing: {:?} candles to DB\n", candles.len());
|
// print!("writing: {:?} candles to DB\n", candles.len());
|
||||||
let upsert_statement = build_candes_upsert_statement(candles);
|
let upsert_statement = build_candles_upsert_statement(candles);
|
||||||
sqlx::query(&upsert_statement)
|
client
|
||||||
.execute(&mut conn)
|
.execute(&upsert_statement, &[])
|
||||||
.await
|
.await
|
||||||
.map_err_anyhow()
|
.map_err_anyhow()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
|
||||||
Err(TryRecvError::Empty) => continue,
|
|
||||||
Err(TryRecvError::Disconnected) => {
|
|
||||||
panic!("Candles sender must stay alive")
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(TryRecvError::Empty) => continue,
|
||||||
println!("Candle ping failed");
|
Err(TryRecvError::Disconnected) => {
|
||||||
break;
|
panic!("Candles sender must stay alive")
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
// }
|
||||||
|
// Err(_) => {
|
||||||
|
// println!("Candle ping failed");
|
||||||
|
// break;
|
||||||
|
// }
|
||||||
|
// };
|
||||||
}
|
}
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn build_fills_upsert_statement(events: HashMap<OpenBookFillEventLog, u8>) -> String {
|
#[allow(deprecated)]
|
||||||
|
fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> String {
|
||||||
let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES");
|
let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES");
|
||||||
for (idx, event) in events.keys().enumerate() {
|
for (idx, event) in events.keys().enumerate() {
|
||||||
let mut hasher = DefaultHasher::new();
|
let mut hasher = DefaultHasher::new();
|
||||||
|
@ -104,7 +100,7 @@ fn build_fills_upsert_statement(events: HashMap<OpenBookFillEventLog, u8>) -> St
|
||||||
let val_str = format!(
|
let val_str = format!(
|
||||||
"({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {})",
|
"({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {})",
|
||||||
hasher.finish(),
|
hasher.finish(),
|
||||||
Utc::now().to_rfc3339(),
|
to_timestampz(event.block_time as u64).to_rfc3339(),
|
||||||
event.market,
|
event.market,
|
||||||
event.open_orders,
|
event.open_orders,
|
||||||
event.open_orders_owner,
|
event.open_orders_owner,
|
||||||
|
@ -130,7 +126,7 @@ fn build_fills_upsert_statement(events: HashMap<OpenBookFillEventLog, u8>) -> St
|
||||||
stmt
|
stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
fn build_candes_upsert_statement(candles: Vec<Candle>) -> String {
|
pub fn build_candles_upsert_statement(candles: Vec<Candle>) -> String {
|
||||||
let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES");
|
let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES");
|
||||||
for (idx, candle) in candles.iter().enumerate() {
|
for (idx, candle) in candles.iter().enumerate() {
|
||||||
let val_str = format!(
|
let val_str = format!(
|
||||||
|
@ -176,7 +172,7 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_event_hashing() {
|
fn test_event_hashing() {
|
||||||
let event_1 = OpenBookFillEventLog {
|
let event_1 = OpenBookFillEvent {
|
||||||
market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(),
|
market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(),
|
||||||
open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(),
|
open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(),
|
||||||
open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw")
|
open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw")
|
||||||
|
@ -191,9 +187,10 @@ mod tests {
|
||||||
fee_tier: 0,
|
fee_tier: 0,
|
||||||
client_order_id: None,
|
client_order_id: None,
|
||||||
referrer_rebate: Some(841),
|
referrer_rebate: Some(841),
|
||||||
|
block_time: 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
let event_2 = OpenBookFillEventLog {
|
let event_2 = OpenBookFillEvent {
|
||||||
market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(),
|
market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(),
|
||||||
open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(),
|
open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(),
|
||||||
open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw")
|
open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw")
|
||||||
|
@ -208,6 +205,7 @@ mod tests {
|
||||||
fee_tier: 0,
|
fee_tier: 0,
|
||||||
client_order_id: None,
|
client_order_id: None,
|
||||||
referrer_rebate: Some(841),
|
referrer_rebate: Some(841),
|
||||||
|
block_time: 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut h1 = DefaultHasher::new();
|
let mut h1 = DefaultHasher::new();
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use openbook_candles::{
|
use openbook_candles::{
|
||||||
database::fetch::fetch_tradingview_candles,
|
database::fetch::fetch_candles_from,
|
||||||
structs::{markets::valid_market, resolution::Resolution, tradingview::TvResponse},
|
structs::{markets::valid_market, resolution::Resolution, tradingview::TvResponse},
|
||||||
utils::{to_timestampz, WebContext},
|
utils::{to_timestampz, WebContext},
|
||||||
};
|
};
|
||||||
|
@ -34,9 +34,8 @@ pub async fn get_candles(
|
||||||
let from = to_timestampz(info.from);
|
let from = to_timestampz(info.from);
|
||||||
let to = to_timestampz(info.to);
|
let to = to_timestampz(info.to);
|
||||||
|
|
||||||
let mut conn = context.pool.acquire().await.unwrap();
|
|
||||||
let candles =
|
let candles =
|
||||||
match fetch_tradingview_candles(&mut conn, &info.market_name, resolution, from, to).await {
|
match fetch_candles_from(&context.pool, &info.market_name, resolution, from, to).await {
|
||||||
Ok(c) => c,
|
Ok(c) => c,
|
||||||
Err(_) => return Err(ServerError::DbQueryError),
|
Err(_) => return Err(ServerError::DbQueryError),
|
||||||
};
|
};
|
||||||
|
|
|
@ -10,7 +10,7 @@ use openbook_candles::{
|
||||||
CoinGecko24HourVolume, CoinGeckoOrderBook, CoinGeckoPair, CoinGeckoTicker,
|
CoinGecko24HourVolume, CoinGeckoOrderBook, CoinGeckoPair, CoinGeckoTicker,
|
||||||
PgCoinGecko24HighLow,
|
PgCoinGecko24HighLow,
|
||||||
},
|
},
|
||||||
slab::{get_best_bids_and_asks, get_orderbooks_with_depth},
|
slab::get_orderbooks_with_depth,
|
||||||
},
|
},
|
||||||
utils::WebContext,
|
utils::WebContext,
|
||||||
};
|
};
|
||||||
|
@ -49,17 +49,14 @@ pub async fn pairs(context: web::Data<WebContext>) -> Result<HttpResponse, Serve
|
||||||
|
|
||||||
#[get("/tickers")]
|
#[get("/tickers")]
|
||||||
pub async fn tickers(context: web::Data<WebContext>) -> Result<HttpResponse, ServerError> {
|
pub async fn tickers(context: web::Data<WebContext>) -> Result<HttpResponse, ServerError> {
|
||||||
let client = RpcClient::new(context.rpc_url.clone());
|
// let client = RpcClient::new(context.rpc_url.clone());
|
||||||
let markets = &context.markets;
|
let markets = &context.markets;
|
||||||
|
|
||||||
let mut c1 = context.pool.acquire().await.unwrap();
|
|
||||||
let mut c2 = context.pool.acquire().await.unwrap();
|
|
||||||
// let bba_fut = get_best_bids_and_asks(client, markets);
|
// let bba_fut = get_best_bids_and_asks(client, markets);
|
||||||
let volume_fut = fetch_coingecko_24h_volume(&mut c1);
|
let volume_fut = fetch_coingecko_24h_volume(&context.pool);
|
||||||
let high_low_fut = fetch_coingecko_24h_high_low(&mut c2);
|
let high_low_fut = fetch_coingecko_24h_high_low(&context.pool);
|
||||||
|
|
||||||
let (volume_query, high_low_quey) =
|
let (volume_query, high_low_quey) = join!(volume_fut, high_low_fut,);
|
||||||
join!(volume_fut, high_low_fut,);
|
|
||||||
|
|
||||||
let raw_volumes = match volume_query {
|
let raw_volumes = match volume_query {
|
||||||
Ok(c) => c,
|
Ok(c) => c,
|
||||||
|
@ -74,7 +71,7 @@ pub async fn tickers(context: web::Data<WebContext>) -> Result<HttpResponse, Ser
|
||||||
let default_volume = CoinGecko24HourVolume::default();
|
let default_volume = CoinGecko24HourVolume::default();
|
||||||
let volumes: Vec<CoinGecko24HourVolume> = raw_volumes
|
let volumes: Vec<CoinGecko24HourVolume> = raw_volumes
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|v| v.convert_to_readable(&markets))
|
.map(|v| v.convert_to_readable(markets))
|
||||||
.collect();
|
.collect();
|
||||||
let tickers = markets
|
let tickers = markets
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -105,7 +102,7 @@ pub async fn tickers(context: web::Data<WebContext>) -> Result<HttpResponse, Ser
|
||||||
Ok(HttpResponse::Ok().json(tickers))
|
Ok(HttpResponse::Ok().json(tickers))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[get("/orderbook")]
|
#[get("/orderbook")] // TODO: implement an optional geyser version
|
||||||
pub async fn orderbook(
|
pub async fn orderbook(
|
||||||
info: web::Query<OrderBookParams>,
|
info: web::Query<OrderBookParams>,
|
||||||
context: web::Data<WebContext>,
|
context: web::Data<WebContext>,
|
||||||
|
|
|
@ -4,7 +4,7 @@ use actix_web::{
|
||||||
App, HttpServer,
|
App, HttpServer,
|
||||||
};
|
};
|
||||||
use candles::get_candles;
|
use candles::get_candles;
|
||||||
use dotenv;
|
|
||||||
use markets::get_markets;
|
use markets::get_markets;
|
||||||
use openbook_candles::{
|
use openbook_candles::{
|
||||||
database::initialize::connect_to_database,
|
database::initialize::connect_to_database,
|
||||||
|
@ -29,21 +29,15 @@ async fn main() -> std::io::Result<()> {
|
||||||
assert!(args.len() == 2);
|
assert!(args.len() == 2);
|
||||||
let path_to_markets_json = &args[1];
|
let path_to_markets_json = &args[1];
|
||||||
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
|
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
|
||||||
let database_url: String = dotenv::var("DATABASE_URL").unwrap();
|
let bind_addr: String = dotenv::var("SERVER_BIND_ADDR").expect("reading bind addr from env");
|
||||||
let max_pg_pool_connections: u32 = dotenv::var("MAX_PG_POOL_CONNS_SERVER")
|
|
||||||
.unwrap()
|
|
||||||
.parse::<u32>()
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let config = Config {
|
let config = Config {
|
||||||
rpc_url: rpc_url.clone(),
|
rpc_url: rpc_url.clone(),
|
||||||
database_url: database_url.clone(),
|
|
||||||
max_pg_pool_connections,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let markets = load_markets(path_to_markets_json);
|
let markets = load_markets(path_to_markets_json);
|
||||||
let market_infos = fetch_market_infos(&config, markets).await.unwrap();
|
let market_infos = fetch_market_infos(&config, markets).await.unwrap();
|
||||||
let pool = connect_to_database(&config).await.unwrap();
|
let pool = connect_to_database().await.unwrap();
|
||||||
|
|
||||||
let context = Data::new(WebContext {
|
let context = Data::new(WebContext {
|
||||||
rpc_url,
|
rpc_url,
|
||||||
|
@ -65,7 +59,7 @@ async fn main() -> std::io::Result<()> {
|
||||||
.service(coingecko::service()),
|
.service(coingecko::service()),
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
.bind(("127.0.0.1", 8080))?
|
.bind(&bind_addr)?
|
||||||
.run()
|
.run()
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,14 +31,17 @@ pub async fn get_top_traders_by_base_volume(
|
||||||
let from = to_timestampz(info.from);
|
let from = to_timestampz(info.from);
|
||||||
let to = to_timestampz(info.to);
|
let to = to_timestampz(info.to);
|
||||||
|
|
||||||
let mut conn = context.pool.acquire().await.unwrap();
|
let raw_traders = match fetch_top_traders_by_base_volume_from(
|
||||||
let raw_traders =
|
&context.pool,
|
||||||
match fetch_top_traders_by_base_volume_from(&mut conn, &selected_market.address, from, to)
|
&selected_market.address,
|
||||||
.await
|
from,
|
||||||
{
|
to,
|
||||||
Ok(c) => c,
|
)
|
||||||
Err(_) => return Err(ServerError::DbQueryError),
|
.await
|
||||||
};
|
{
|
||||||
|
Ok(c) => c,
|
||||||
|
Err(_) => return Err(ServerError::DbQueryError),
|
||||||
|
};
|
||||||
|
|
||||||
let traders = raw_traders
|
let traders = raw_traders
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
@ -48,7 +51,7 @@ pub async fn get_top_traders_by_base_volume(
|
||||||
let response = TraderResponse {
|
let response = TraderResponse {
|
||||||
start_time: info.from,
|
start_time: info.from,
|
||||||
end_time: info.to,
|
end_time: info.to,
|
||||||
traders: traders,
|
traders,
|
||||||
volume_type: VolumeType::Base.to_string(),
|
volume_type: VolumeType::Base.to_string(),
|
||||||
};
|
};
|
||||||
Ok(HttpResponse::Ok().json(response))
|
Ok(HttpResponse::Ok().json(response))
|
||||||
|
@ -67,14 +70,17 @@ pub async fn get_top_traders_by_quote_volume(
|
||||||
let from = to_timestampz(info.from);
|
let from = to_timestampz(info.from);
|
||||||
let to = to_timestampz(info.to);
|
let to = to_timestampz(info.to);
|
||||||
|
|
||||||
let mut conn = context.pool.acquire().await.unwrap();
|
let raw_traders = match fetch_top_traders_by_quote_volume_from(
|
||||||
let raw_traders =
|
&context.pool,
|
||||||
match fetch_top_traders_by_quote_volume_from(&mut conn, &selected_market.address, from, to)
|
&selected_market.address,
|
||||||
.await
|
from,
|
||||||
{
|
to,
|
||||||
Ok(c) => c,
|
)
|
||||||
Err(_) => return Err(ServerError::DbQueryError),
|
.await
|
||||||
};
|
{
|
||||||
|
Ok(c) => c,
|
||||||
|
Err(_) => return Err(ServerError::DbQueryError),
|
||||||
|
};
|
||||||
|
|
||||||
let traders = raw_traders
|
let traders = raw_traders
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
@ -84,7 +90,7 @@ pub async fn get_top_traders_by_quote_volume(
|
||||||
let response = TraderResponse {
|
let response = TraderResponse {
|
||||||
start_time: info.from,
|
start_time: info.from,
|
||||||
end_time: info.to,
|
end_time: info.to,
|
||||||
traders: traders,
|
traders,
|
||||||
volume_type: VolumeType::Quote.to_string(),
|
volume_type: VolumeType::Quote.to_string(),
|
||||||
};
|
};
|
||||||
Ok(HttpResponse::Ok().json(response))
|
Ok(HttpResponse::Ok().json(response))
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
use chrono::{DateTime, NaiveDateTime, Utc};
|
use chrono::{DateTime, NaiveDateTime, Utc};
|
||||||
use num_traits::Zero;
|
use tokio_postgres::Row;
|
||||||
use sqlx::types::Decimal;
|
|
||||||
|
|
||||||
use super::resolution::Resolution;
|
use super::resolution::Resolution;
|
||||||
|
|
||||||
|
@ -10,11 +9,11 @@ pub struct Candle {
|
||||||
pub start_time: DateTime<Utc>,
|
pub start_time: DateTime<Utc>,
|
||||||
pub end_time: DateTime<Utc>,
|
pub end_time: DateTime<Utc>,
|
||||||
pub resolution: String,
|
pub resolution: String,
|
||||||
pub open: Decimal,
|
pub open: f64,
|
||||||
pub close: Decimal,
|
pub close: f64,
|
||||||
pub high: Decimal,
|
pub high: f64,
|
||||||
pub low: Decimal,
|
pub low: f64,
|
||||||
pub volume: Decimal,
|
pub volume: f64,
|
||||||
pub complete: bool,
|
pub complete: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -25,12 +24,27 @@ impl Candle {
|
||||||
start_time: DateTime::from_utc(NaiveDateTime::MIN, Utc),
|
start_time: DateTime::from_utc(NaiveDateTime::MIN, Utc),
|
||||||
end_time: DateTime::from_utc(NaiveDateTime::MIN, Utc),
|
end_time: DateTime::from_utc(NaiveDateTime::MIN, Utc),
|
||||||
resolution: resolution.to_string(),
|
resolution: resolution.to_string(),
|
||||||
open: Decimal::zero(),
|
open: 0.0,
|
||||||
close: Decimal::zero(),
|
close: 0.0,
|
||||||
high: Decimal::zero(),
|
high: 0.0,
|
||||||
low: Decimal::zero(),
|
low: 0.0,
|
||||||
volume: Decimal::zero(),
|
volume: 0.0,
|
||||||
complete: false,
|
complete: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn from_row(row: Row) -> Self {
|
||||||
|
Candle {
|
||||||
|
market_name: row.get(0),
|
||||||
|
start_time: row.get(1),
|
||||||
|
end_time: row.get(2),
|
||||||
|
resolution: row.get(3),
|
||||||
|
open: row.get(4),
|
||||||
|
close: row.get(5),
|
||||||
|
high: row.get(6),
|
||||||
|
low: row.get(7),
|
||||||
|
volume: row.get(8),
|
||||||
|
complete: row.get(9),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use sqlx::types::Decimal;
|
use tokio_postgres::Row;
|
||||||
|
|
||||||
use super::{markets::MarketInfo, openbook::token_factor};
|
use super::{markets::MarketInfo, openbook::token_factor};
|
||||||
|
|
||||||
|
@ -35,8 +35,8 @@ pub struct CoinGeckoTicker {
|
||||||
|
|
||||||
pub struct PgCoinGecko24HourVolume {
|
pub struct PgCoinGecko24HourVolume {
|
||||||
pub address: String,
|
pub address: String,
|
||||||
pub raw_base_size: Decimal,
|
pub raw_base_size: f64,
|
||||||
pub raw_quote_size: Decimal,
|
pub raw_quote_size: f64,
|
||||||
}
|
}
|
||||||
impl PgCoinGecko24HourVolume {
|
impl PgCoinGecko24HourVolume {
|
||||||
pub fn convert_to_readable(&self, markets: &Vec<MarketInfo>) -> CoinGecko24HourVolume {
|
pub fn convert_to_readable(&self, markets: &Vec<MarketInfo>) -> CoinGecko24HourVolume {
|
||||||
|
@ -49,19 +49,38 @@ impl PgCoinGecko24HourVolume {
|
||||||
target_volume,
|
target_volume,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn from_row(row: Row) -> Self {
|
||||||
|
PgCoinGecko24HourVolume {
|
||||||
|
address: row.get(0),
|
||||||
|
raw_base_size: row.get(1),
|
||||||
|
raw_quote_size: row.get(2),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
pub struct CoinGecko24HourVolume {
|
pub struct CoinGecko24HourVolume {
|
||||||
pub market_name: String,
|
pub market_name: String,
|
||||||
pub base_volume: Decimal,
|
pub base_volume: f64,
|
||||||
pub target_volume: Decimal,
|
pub target_volume: f64,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
pub struct PgCoinGecko24HighLow {
|
pub struct PgCoinGecko24HighLow {
|
||||||
pub market_name: String,
|
pub market_name: String,
|
||||||
pub high: Decimal,
|
pub high: f64,
|
||||||
pub low: Decimal,
|
pub low: f64,
|
||||||
pub close: Decimal,
|
pub close: f64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PgCoinGecko24HighLow {
|
||||||
|
pub fn from_row(row: Row) -> Self {
|
||||||
|
PgCoinGecko24HighLow {
|
||||||
|
market_name: row.get(0),
|
||||||
|
high: row.get(1),
|
||||||
|
low: row.get(2),
|
||||||
|
close: row.get(3),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -111,8 +111,8 @@ pub async fn fetch_market_infos(
|
||||||
.value;
|
.value;
|
||||||
for i in 0..mint_results.len() {
|
for i in 0..mint_results.len() {
|
||||||
let mut mint_account = mint_results[i].as_ref().unwrap().clone();
|
let mut mint_account = mint_results[i].as_ref().unwrap().clone();
|
||||||
let mut mint_bytes: &[u8] = &mut mint_account.data[..];
|
let mint_bytes: &[u8] = &mut mint_account.data[..];
|
||||||
let mint = Mint::unpack_from_slice(&mut mint_bytes).unwrap();
|
let mint = Mint::unpack_from_slice(mint_bytes).unwrap();
|
||||||
|
|
||||||
mint_key_map.insert(mint_keys[i], mint.decimals);
|
mint_key_map.insert(mint_keys[i], mint.decimals);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,12 +1,12 @@
|
||||||
use anchor_lang::{event, AnchorDeserialize, AnchorSerialize};
|
use anchor_lang::{event, AnchorDeserialize, AnchorSerialize};
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use num_traits::FromPrimitive;
|
use num_traits::Pow;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
use sqlx::types::Decimal;
|
use tokio_postgres::Row;
|
||||||
|
|
||||||
#[event]
|
#[event]
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||||
pub struct OpenBookFillEventLog {
|
pub struct OpenBookFillEventRaw {
|
||||||
pub market: Pubkey,
|
pub market: Pubkey,
|
||||||
pub open_orders: Pubkey,
|
pub open_orders: Pubkey,
|
||||||
pub open_orders_owner: Pubkey,
|
pub open_orders_owner: Pubkey,
|
||||||
|
@ -21,15 +21,66 @@ pub struct OpenBookFillEventLog {
|
||||||
pub client_order_id: Option<u64>,
|
pub client_order_id: Option<u64>,
|
||||||
pub referrer_rebate: Option<u64>,
|
pub referrer_rebate: Option<u64>,
|
||||||
}
|
}
|
||||||
|
impl OpenBookFillEventRaw {
|
||||||
|
pub fn with_time(self, block_time: i64) -> OpenBookFillEvent {
|
||||||
|
OpenBookFillEvent {
|
||||||
|
market: self.market,
|
||||||
|
open_orders: self.open_orders,
|
||||||
|
open_orders_owner: self.open_orders_owner,
|
||||||
|
bid: self.bid,
|
||||||
|
maker: self.maker,
|
||||||
|
native_qty_paid: self.native_qty_paid,
|
||||||
|
native_qty_received: self.native_qty_received,
|
||||||
|
native_fee_or_rebate: self.native_fee_or_rebate,
|
||||||
|
order_id: self.order_id,
|
||||||
|
owner_slot: self.owner_slot,
|
||||||
|
fee_tier: self.fee_tier,
|
||||||
|
client_order_id: self.client_order_id,
|
||||||
|
referrer_rebate: self.referrer_rebate,
|
||||||
|
block_time,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[event]
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||||
|
pub struct OpenBookFillEvent {
|
||||||
|
pub market: Pubkey,
|
||||||
|
pub open_orders: Pubkey,
|
||||||
|
pub open_orders_owner: Pubkey,
|
||||||
|
pub bid: bool,
|
||||||
|
pub maker: bool,
|
||||||
|
pub native_qty_paid: u64,
|
||||||
|
pub native_qty_received: u64,
|
||||||
|
pub native_fee_or_rebate: u64,
|
||||||
|
pub order_id: u128,
|
||||||
|
pub owner_slot: u8,
|
||||||
|
pub fee_tier: u8,
|
||||||
|
pub client_order_id: Option<u64>,
|
||||||
|
pub referrer_rebate: Option<u64>,
|
||||||
|
pub block_time: i64,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, PartialEq)]
|
#[derive(Copy, Clone, Debug, PartialEq)]
|
||||||
pub struct PgOpenBookFill {
|
pub struct PgOpenBookFill {
|
||||||
pub time: DateTime<Utc>,
|
pub time: DateTime<Utc>,
|
||||||
pub bid: bool,
|
pub bid: bool,
|
||||||
pub maker: bool,
|
pub maker: bool,
|
||||||
pub native_qty_paid: Decimal,
|
pub native_qty_paid: f64,
|
||||||
pub native_qty_received: Decimal,
|
pub native_qty_received: f64,
|
||||||
pub native_fee_or_rebate: Decimal,
|
pub native_fee_or_rebate: f64,
|
||||||
|
}
|
||||||
|
impl PgOpenBookFill {
|
||||||
|
pub fn from_row(row: Row) -> Self {
|
||||||
|
PgOpenBookFill {
|
||||||
|
time: row.get(0),
|
||||||
|
bid: row.get(1),
|
||||||
|
maker: row.get(2),
|
||||||
|
native_qty_paid: row.get(3),
|
||||||
|
native_qty_received: row.get(4),
|
||||||
|
native_fee_or_rebate: row.get(5),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, AnchorDeserialize)]
|
#[derive(Copy, Clone, AnchorDeserialize)]
|
||||||
|
@ -91,7 +142,7 @@ pub fn calculate_fill_price_and_size(
|
||||||
fill: PgOpenBookFill,
|
fill: PgOpenBookFill,
|
||||||
base_decimals: u8,
|
base_decimals: u8,
|
||||||
quote_decimals: u8,
|
quote_decimals: u8,
|
||||||
) -> (Decimal, Decimal) {
|
) -> (f64, f64) {
|
||||||
if fill.bid {
|
if fill.bid {
|
||||||
let price_before_fees = if fill.maker {
|
let price_before_fees = if fill.maker {
|
||||||
fill.native_qty_paid + fill.native_fee_or_rebate
|
fill.native_qty_paid + fill.native_fee_or_rebate
|
||||||
|
@ -115,6 +166,6 @@ pub fn calculate_fill_price_and_size(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn token_factor(decimals: u8) -> Decimal {
|
pub fn token_factor(decimals: u8) -> f64 {
|
||||||
Decimal::from_u64(10u64.pow(decimals as u32)).unwrap()
|
10f64.pow(decimals as f64)
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,6 @@ use futures::join;
|
||||||
use num_enum::{IntoPrimitive, TryFromPrimitive};
|
use num_enum::{IntoPrimitive, TryFromPrimitive};
|
||||||
use num_traits::ToPrimitive;
|
use num_traits::ToPrimitive;
|
||||||
use solana_client::nonblocking::rpc_client::RpcClient;
|
use solana_client::nonblocking::rpc_client::RpcClient;
|
||||||
use sqlx::types::Decimal;
|
|
||||||
use std::{
|
use std::{
|
||||||
convert::TryFrom,
|
convert::TryFrom,
|
||||||
mem::{align_of, size_of},
|
mem::{align_of, size_of},
|
||||||
|
@ -102,19 +101,19 @@ impl LeafNode {
|
||||||
NonZeroU64::new((self.key >> 64) as u64).unwrap()
|
NonZeroU64::new((self.key >> 64) as u64).unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn readable_price(&self, market: &MarketInfo) -> Decimal {
|
pub fn readable_price(&self, market: &MarketInfo) -> f64 {
|
||||||
let price_lots = Decimal::from((self.key >> 64) as u64);
|
let price_lots = (self.key >> 64) as f64;
|
||||||
let base_multiplier = token_factor(market.base_decimals);
|
let base_multiplier = token_factor(market.base_decimals);
|
||||||
let quote_multiplier = token_factor(market.quote_decimals);
|
let quote_multiplier = token_factor(market.quote_decimals);
|
||||||
let base_lot_size = Decimal::from(market.base_lot_size);
|
let base_lot_size = market.base_lot_size as f64;
|
||||||
let quote_lot_size = Decimal::from(market.quote_lot_size);
|
let quote_lot_size = market.quote_lot_size as f64;
|
||||||
(price_lots * quote_lot_size * base_multiplier) / (base_lot_size * quote_multiplier)
|
(price_lots * quote_lot_size * base_multiplier) / (base_lot_size * quote_multiplier)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn readable_quantity(&self, market: &MarketInfo) -> Decimal {
|
pub fn readable_quantity(&self, market: &MarketInfo) -> f64 {
|
||||||
let base_lot_size = Decimal::from(market.base_lot_size);
|
let base_lot_size = market.base_lot_size as f64;
|
||||||
let base_multiplier = token_factor(market.base_decimals);
|
let base_multiplier = token_factor(market.base_decimals);
|
||||||
Decimal::from(self.quantity) * base_lot_size / base_multiplier
|
self.quantity as f64 * base_lot_size / base_multiplier
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
|
@ -406,7 +405,7 @@ impl Slab {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_best(&self, market: &MarketInfo, bid: bool) -> Decimal {
|
pub fn get_best(&self, market: &MarketInfo, bid: bool) -> f64 {
|
||||||
let min = if bid {
|
let min = if bid {
|
||||||
self.find_max()
|
self.find_max()
|
||||||
} else {
|
} else {
|
||||||
|
@ -419,7 +418,7 @@ impl Slab {
|
||||||
pub async fn get_best_bids_and_asks(
|
pub async fn get_best_bids_and_asks(
|
||||||
client: RpcClient,
|
client: RpcClient,
|
||||||
markets: &Vec<MarketInfo>,
|
markets: &Vec<MarketInfo>,
|
||||||
) -> (Vec<Decimal>, Vec<Decimal>) {
|
) -> (Vec<f64>, Vec<f64>) {
|
||||||
let bid_keys = markets
|
let bid_keys = markets
|
||||||
.iter()
|
.iter()
|
||||||
.map(|m| Pubkey::from_str(&m.bids_key).unwrap())
|
.map(|m| Pubkey::from_str(&m.bids_key).unwrap())
|
||||||
|
|
|
@ -2,15 +2,24 @@ use std::fmt;
|
||||||
|
|
||||||
use num_traits::ToPrimitive;
|
use num_traits::ToPrimitive;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use sqlx::types::Decimal;
|
use tokio_postgres::Row;
|
||||||
|
|
||||||
use super::openbook::token_factor;
|
use super::openbook::token_factor;
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq)]
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
pub struct PgTrader {
|
pub struct PgTrader {
|
||||||
pub open_orders_owner: String,
|
pub open_orders_owner: String,
|
||||||
pub raw_ask_size: Decimal,
|
pub raw_ask_size: f64,
|
||||||
pub raw_bid_size: Decimal,
|
pub raw_bid_size: f64,
|
||||||
|
}
|
||||||
|
impl PgTrader {
|
||||||
|
pub fn from_row(row: Row) -> Self {
|
||||||
|
PgTrader {
|
||||||
|
open_orders_owner: row.get(0),
|
||||||
|
raw_ask_size: row.get(1),
|
||||||
|
raw_bid_size: row.get(2),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq, Serialize)]
|
#[derive(Clone, Debug, PartialEq, Serialize)]
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use chrono::{NaiveDateTime, Utc};
|
use chrono::{NaiveDateTime, Utc};
|
||||||
|
use deadpool_postgres::Pool;
|
||||||
use serde_derive::Deserialize;
|
use serde_derive::Deserialize;
|
||||||
use sqlx::{Pool, Postgres};
|
|
||||||
|
|
||||||
use crate::structs::markets::MarketInfo;
|
use crate::structs::markets::MarketInfo;
|
||||||
|
|
||||||
|
@ -16,20 +16,53 @@ impl<T, E: std::fmt::Debug> AnyhowWrap for Result<T, E> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, serde::Deserialize)]
|
||||||
|
pub struct PgConfig {
|
||||||
|
pub pg: deadpool_postgres::Config,
|
||||||
|
pub pg_max_pool_connections: usize,
|
||||||
|
pub pg_use_ssl: bool,
|
||||||
|
pub pg_ca_cert_path: Option<String>,
|
||||||
|
pub pg_client_key_path: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PgConfig {
|
||||||
|
pub fn from_env() -> Result<Self, config::ConfigError> {
|
||||||
|
config::Config::builder()
|
||||||
|
.add_source(config::Environment::default().separator("_"))
|
||||||
|
.add_source(config::Environment::default())
|
||||||
|
.build()?
|
||||||
|
.try_deserialize()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Deserialize)]
|
#[derive(Clone, Debug, Deserialize)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
pub rpc_url: String,
|
pub rpc_url: String,
|
||||||
pub database_url: String,
|
|
||||||
pub max_pg_pool_connections: u32,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct WebContext {
|
pub struct WebContext {
|
||||||
pub rpc_url: String,
|
pub rpc_url: String,
|
||||||
pub markets: Vec<MarketInfo>,
|
pub markets: Vec<MarketInfo>,
|
||||||
pub pool: Pool<Postgres>,
|
pub pool: Pool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(deprecated)]
|
#[allow(deprecated)]
|
||||||
pub fn to_timestampz(seconds: u64) -> chrono::DateTime<Utc> {
|
pub fn to_timestampz(seconds: u64) -> chrono::DateTime<Utc> {
|
||||||
chrono::DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(seconds as i64, 0), Utc)
|
chrono::DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(seconds as i64, 0), Utc)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn f64_max(a: f64, b: f64) -> f64 {
|
||||||
|
if a >= b {
|
||||||
|
a
|
||||||
|
} else {
|
||||||
|
b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn f64_min(a: f64, b: f64) -> f64 {
|
||||||
|
if a < b {
|
||||||
|
a
|
||||||
|
} else {
|
||||||
|
b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use chrono::{DateTime, Duration, DurationRound, Utc};
|
use chrono::{DateTime, Duration, DurationRound, Utc};
|
||||||
use sqlx::{pool::PoolConnection, Postgres};
|
use deadpool_postgres::Pool;
|
||||||
use std::cmp::{max, min};
|
use std::cmp::max;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
database::fetch::{fetch_candles_from, fetch_earliest_candles, fetch_latest_finished_candle},
|
database::fetch::{fetch_candles_from, fetch_earliest_candles, fetch_latest_finished_candle},
|
||||||
|
@ -8,28 +8,29 @@ use crate::{
|
||||||
candle::Candle,
|
candle::Candle,
|
||||||
resolution::{day, Resolution},
|
resolution::{day, Resolution},
|
||||||
},
|
},
|
||||||
|
utils::{f64_max, f64_min},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub async fn batch_higher_order_candles(
|
pub async fn batch_higher_order_candles(
|
||||||
conn: &mut PoolConnection<Postgres>,
|
pool: &Pool,
|
||||||
market_name: &str,
|
market_name: &str,
|
||||||
resolution: Resolution,
|
resolution: Resolution,
|
||||||
) -> anyhow::Result<Vec<Candle>> {
|
) -> anyhow::Result<Vec<Candle>> {
|
||||||
let latest_candle = fetch_latest_finished_candle(conn, market_name, resolution).await?;
|
let latest_candle = fetch_latest_finished_candle(pool, market_name, resolution).await?;
|
||||||
|
|
||||||
match latest_candle {
|
match latest_candle {
|
||||||
Some(candle) => {
|
Some(candle) => {
|
||||||
let start_time = candle.end_time;
|
let start_time = candle.end_time;
|
||||||
let end_time = start_time + day();
|
let end_time = start_time + day();
|
||||||
let mut constituent_candles = fetch_candles_from(
|
let mut constituent_candles = fetch_candles_from(
|
||||||
conn,
|
pool,
|
||||||
market_name,
|
market_name,
|
||||||
resolution.get_constituent_resolution(),
|
resolution.get_constituent_resolution(),
|
||||||
start_time,
|
start_time,
|
||||||
end_time,
|
end_time,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
if constituent_candles.len() == 0 {
|
if constituent_candles.is_empty() {
|
||||||
return Ok(Vec::new());
|
return Ok(Vec::new());
|
||||||
}
|
}
|
||||||
let combined_candles = combine_into_higher_order_candles(
|
let combined_candles = combine_into_higher_order_candles(
|
||||||
|
@ -42,9 +43,9 @@ pub async fn batch_higher_order_candles(
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
let mut constituent_candles =
|
let mut constituent_candles =
|
||||||
fetch_earliest_candles(conn, market_name, resolution.get_constituent_resolution())
|
fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution())
|
||||||
.await?;
|
.await?;
|
||||||
if constituent_candles.len() == 0 {
|
if constituent_candles.is_empty() {
|
||||||
// println!(
|
// println!(
|
||||||
// "Batching {}, but no candles found for: {:?}, {}",
|
// "Batching {}, but no candles found for: {:?}, {}",
|
||||||
// resolution,
|
// resolution,
|
||||||
|
@ -55,7 +56,7 @@ pub async fn batch_higher_order_candles(
|
||||||
}
|
}
|
||||||
let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
|
let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
|
||||||
|
|
||||||
if constituent_candles.len() == 0 {
|
if constituent_candles.is_empty() {
|
||||||
return Ok(Vec::new());
|
return Ok(Vec::new());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -99,7 +100,7 @@ fn combine_into_higher_order_candles(
|
||||||
let mut combined_candles = vec![empty_candle; num_candles];
|
let mut combined_candles = vec![empty_candle; num_candles];
|
||||||
|
|
||||||
let mut con_iter = constituent_candles.iter_mut().peekable();
|
let mut con_iter = constituent_candles.iter_mut().peekable();
|
||||||
let mut start_time = st.clone();
|
let mut start_time = st;
|
||||||
let mut end_time = start_time + duration;
|
let mut end_time = start_time + duration;
|
||||||
|
|
||||||
let mut last_candle = seed_candle;
|
let mut last_candle = seed_candle;
|
||||||
|
@ -112,8 +113,8 @@ fn combine_into_higher_order_candles(
|
||||||
|
|
||||||
while matches!(con_iter.peek(), Some(c) if c.end_time <= end_time) {
|
while matches!(con_iter.peek(), Some(c) if c.end_time <= end_time) {
|
||||||
let unit_candle = con_iter.next().unwrap();
|
let unit_candle = con_iter.next().unwrap();
|
||||||
combined_candles[i].high = max(combined_candles[i].high, unit_candle.high);
|
combined_candles[i].high = f64_max(combined_candles[i].high, unit_candle.high);
|
||||||
combined_candles[i].low = min(combined_candles[i].low, unit_candle.low);
|
combined_candles[i].low = f64_min(combined_candles[i].low, unit_candle.low);
|
||||||
combined_candles[i].close = unit_candle.close;
|
combined_candles[i].close = unit_candle.close;
|
||||||
combined_candles[i].volume += unit_candle.volume;
|
combined_candles[i].volume += unit_candle.volume;
|
||||||
combined_candles[i].complete = unit_candle.complete;
|
combined_candles[i].complete = unit_candle.complete;
|
||||||
|
@ -124,7 +125,7 @@ fn combine_into_higher_order_candles(
|
||||||
combined_candles[i].end_time = end_time;
|
combined_candles[i].end_time = end_time;
|
||||||
|
|
||||||
start_time = end_time;
|
start_time = end_time;
|
||||||
end_time = end_time + duration;
|
end_time += duration;
|
||||||
|
|
||||||
last_candle = combined_candles[i].clone();
|
last_candle = combined_candles[i].clone();
|
||||||
}
|
}
|
||||||
|
@ -143,3 +144,29 @@ fn trim_candles(mut c: Vec<Candle>, start_time: DateTime<Utc>) -> Vec<Candle> {
|
||||||
}
|
}
|
||||||
c
|
c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn backfill_batch_higher_order_candles(
|
||||||
|
pool: &Pool,
|
||||||
|
market_name: &str,
|
||||||
|
resolution: Resolution,
|
||||||
|
) -> anyhow::Result<Vec<Candle>> {
|
||||||
|
let mut constituent_candles =
|
||||||
|
fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution()).await?;
|
||||||
|
if constituent_candles.is_empty() {
|
||||||
|
return Ok(vec![]);
|
||||||
|
}
|
||||||
|
let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
|
||||||
|
|
||||||
|
let seed_candle = constituent_candles[0].clone();
|
||||||
|
let combined_candles = combine_into_higher_order_candles(
|
||||||
|
&mut constituent_candles,
|
||||||
|
resolution,
|
||||||
|
start_time,
|
||||||
|
seed_candle,
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(trim_candles(
|
||||||
|
combined_candles,
|
||||||
|
constituent_candles[0].start_time,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use std::cmp::{max, min};
|
use std::cmp::min;
|
||||||
|
|
||||||
use chrono::{DateTime, Duration, DurationRound, Utc};
|
use chrono::{DateTime, Duration, DurationRound, Utc};
|
||||||
use sqlx::{pool::PoolConnection, types::Decimal, Postgres};
|
use deadpool_postgres::Pool;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
|
database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
|
||||||
|
@ -11,15 +11,13 @@ use crate::{
|
||||||
openbook::{calculate_fill_price_and_size, PgOpenBookFill},
|
openbook::{calculate_fill_price_and_size, PgOpenBookFill},
|
||||||
resolution::{day, Resolution},
|
resolution::{day, Resolution},
|
||||||
},
|
},
|
||||||
|
utils::{f64_max, f64_min},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub async fn batch_1m_candles(
|
pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Result<Vec<Candle>> {
|
||||||
conn: &mut PoolConnection<Postgres>,
|
|
||||||
market: &MarketInfo,
|
|
||||||
) -> anyhow::Result<Vec<Candle>> {
|
|
||||||
let market_name = &market.name;
|
let market_name = &market.name;
|
||||||
let market_address = &market.address;
|
let market_address = &market.address;
|
||||||
let latest_candle = fetch_latest_finished_candle(conn, market_name, Resolution::R1m).await?;
|
let latest_candle = fetch_latest_finished_candle(pool, market_name, Resolution::R1m).await?;
|
||||||
|
|
||||||
match latest_candle {
|
match latest_candle {
|
||||||
Some(candle) => {
|
Some(candle) => {
|
||||||
|
@ -28,7 +26,7 @@ pub async fn batch_1m_candles(
|
||||||
start_time + day(),
|
start_time + day(),
|
||||||
Utc::now().duration_trunc(Duration::minutes(1))?,
|
Utc::now().duration_trunc(Duration::minutes(1))?,
|
||||||
);
|
);
|
||||||
let mut fills = fetch_fills_from(conn, market_address, start_time, end_time).await?;
|
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
|
||||||
let candles = combine_fills_into_1m_candles(
|
let candles = combine_fills_into_1m_candles(
|
||||||
&mut fills,
|
&mut fills,
|
||||||
market,
|
market,
|
||||||
|
@ -39,7 +37,7 @@ pub async fn batch_1m_candles(
|
||||||
Ok(candles)
|
Ok(candles)
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
let earliest_fill = fetch_earliest_fill(conn, market_address).await?;
|
let earliest_fill = fetch_earliest_fill(pool, market_address).await?;
|
||||||
|
|
||||||
if earliest_fill.is_none() {
|
if earliest_fill.is_none() {
|
||||||
println!("No fills found for: {:?}", market_name);
|
println!("No fills found for: {:?}", market_name);
|
||||||
|
@ -54,8 +52,8 @@ pub async fn batch_1m_candles(
|
||||||
start_time + day(),
|
start_time + day(),
|
||||||
Utc::now().duration_trunc(Duration::minutes(1))?,
|
Utc::now().duration_trunc(Duration::minutes(1))?,
|
||||||
);
|
);
|
||||||
let mut fills = fetch_fills_from(conn, market_address, start_time, end_time).await?;
|
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
|
||||||
if fills.len() > 0 {
|
if !fills.is_empty() {
|
||||||
let candles =
|
let candles =
|
||||||
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
|
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
|
||||||
Ok(candles)
|
Ok(candles)
|
||||||
|
@ -71,7 +69,7 @@ fn combine_fills_into_1m_candles(
|
||||||
market: &MarketInfo,
|
market: &MarketInfo,
|
||||||
st: DateTime<Utc>,
|
st: DateTime<Utc>,
|
||||||
et: DateTime<Utc>,
|
et: DateTime<Utc>,
|
||||||
maybe_last_price: Option<Decimal>,
|
maybe_last_price: Option<f64>,
|
||||||
) -> Vec<Candle> {
|
) -> Vec<Candle> {
|
||||||
let empty_candle = Candle::create_empty_candle(market.name.clone(), Resolution::R1m);
|
let empty_candle = Candle::create_empty_candle(market.name.clone(), Resolution::R1m);
|
||||||
|
|
||||||
|
@ -79,13 +77,13 @@ fn combine_fills_into_1m_candles(
|
||||||
let mut candles = vec![empty_candle; minutes as usize];
|
let mut candles = vec![empty_candle; minutes as usize];
|
||||||
|
|
||||||
let mut fills_iter = fills.iter_mut().peekable();
|
let mut fills_iter = fills.iter_mut().peekable();
|
||||||
let mut start_time = st.clone();
|
let mut start_time = st;
|
||||||
let mut end_time = start_time + Duration::minutes(1);
|
let mut end_time = start_time + Duration::minutes(1);
|
||||||
|
|
||||||
let mut last_price = match maybe_last_price {
|
let mut last_price = match maybe_last_price {
|
||||||
Some(p) => p,
|
Some(p) => p,
|
||||||
None => {
|
None => {
|
||||||
let first = fills_iter.peek().clone().unwrap();
|
let first = fills_iter.peek().unwrap();
|
||||||
let (price, _) =
|
let (price, _) =
|
||||||
calculate_fill_price_and_size(**first, market.base_decimals, market.quote_decimals);
|
calculate_fill_price_and_size(**first, market.base_decimals, market.quote_decimals);
|
||||||
price
|
price
|
||||||
|
@ -105,8 +103,8 @@ fn combine_fills_into_1m_candles(
|
||||||
calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals);
|
calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals);
|
||||||
|
|
||||||
candles[i].close = price;
|
candles[i].close = price;
|
||||||
candles[i].low = min(price, candles[i].low);
|
candles[i].low = f64_min(price, candles[i].low);
|
||||||
candles[i].high = max(price, candles[i].high);
|
candles[i].high = f64_max(price, candles[i].high);
|
||||||
candles[i].volume += volume;
|
candles[i].volume += volume;
|
||||||
|
|
||||||
last_price = price;
|
last_price = price;
|
||||||
|
@ -117,8 +115,43 @@ fn combine_fills_into_1m_candles(
|
||||||
candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time);
|
candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time);
|
||||||
|
|
||||||
start_time = end_time;
|
start_time = end_time;
|
||||||
end_time = end_time + Duration::minutes(1);
|
end_time += Duration::minutes(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
candles
|
candles
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Goes from the earliest fill to the most recent. Will mark candles as complete if there are missing gaps of fills between the start and end.
|
||||||
|
pub async fn backfill_batch_1m_candles(
|
||||||
|
pool: &Pool,
|
||||||
|
market: &MarketInfo,
|
||||||
|
) -> anyhow::Result<Vec<Candle>> {
|
||||||
|
let market_name = &market.name;
|
||||||
|
let market_address = &market.address;
|
||||||
|
let mut candles = vec![];
|
||||||
|
|
||||||
|
let earliest_fill = fetch_earliest_fill(pool, &market.address).await?;
|
||||||
|
if earliest_fill.is_none() {
|
||||||
|
println!("No fills found for: {:?}", &market_name);
|
||||||
|
return Ok(candles);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut start_time = earliest_fill
|
||||||
|
.unwrap()
|
||||||
|
.time
|
||||||
|
.duration_trunc(Duration::minutes(1))?;
|
||||||
|
while start_time < Utc::now() {
|
||||||
|
let end_time = min(
|
||||||
|
start_time + day(),
|
||||||
|
Utc::now().duration_trunc(Duration::minutes(1))?,
|
||||||
|
);
|
||||||
|
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
|
||||||
|
if !fills.is_empty() {
|
||||||
|
let mut minute_candles =
|
||||||
|
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
|
||||||
|
candles.append(&mut minute_candles);
|
||||||
|
}
|
||||||
|
start_time += day()
|
||||||
|
}
|
||||||
|
Ok(candles)
|
||||||
|
}
|
||||||
|
|
|
@ -2,7 +2,7 @@ pub mod higher_order_candles;
|
||||||
pub mod minute_candles;
|
pub mod minute_candles;
|
||||||
|
|
||||||
use chrono::Duration;
|
use chrono::Duration;
|
||||||
use sqlx::{pool::PoolConnection, Pool, Postgres};
|
use deadpool_postgres::Pool;
|
||||||
use strum::IntoEnumIterator;
|
use strum::IntoEnumIterator;
|
||||||
use tokio::{sync::mpsc::Sender, time::sleep};
|
use tokio::{sync::mpsc::Sender, time::sleep};
|
||||||
|
|
||||||
|
@ -14,17 +14,17 @@ use crate::{
|
||||||
use self::higher_order_candles::batch_higher_order_candles;
|
use self::higher_order_candles::batch_higher_order_candles;
|
||||||
|
|
||||||
pub async fn batch_for_market(
|
pub async fn batch_for_market(
|
||||||
pool: Pool<Postgres>,
|
pool: &Pool,
|
||||||
candles_sender: &Sender<Vec<Candle>>,
|
candles_sender: &Sender<Vec<Candle>>,
|
||||||
market: &MarketInfo,
|
market: &MarketInfo,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
loop {
|
loop {
|
||||||
let sender = candles_sender.clone();
|
let sender = candles_sender.clone();
|
||||||
let market_clone = market.clone();
|
let market_clone = market.clone();
|
||||||
let mut conn = pool.acquire().await?;
|
// let client = pool.get().await?;
|
||||||
loop {
|
loop {
|
||||||
sleep(Duration::milliseconds(2000).to_std()?).await;
|
sleep(Duration::milliseconds(2000).to_std()?).await;
|
||||||
match batch_inner(&mut conn, &sender, &market_clone).await {
|
match batch_inner(pool, &sender, &market_clone).await {
|
||||||
Ok(_) => {}
|
Ok(_) => {}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
println!(
|
println!(
|
||||||
|
@ -41,26 +41,26 @@ pub async fn batch_for_market(
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn batch_inner(
|
async fn batch_inner(
|
||||||
conn: &mut PoolConnection<Postgres>,
|
pool: &Pool,
|
||||||
candles_sender: &Sender<Vec<Candle>>,
|
candles_sender: &Sender<Vec<Candle>>,
|
||||||
market: &MarketInfo,
|
market: &MarketInfo,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let market_name = &market.name.clone();
|
let market_name = &market.name.clone();
|
||||||
let candles = batch_1m_candles(conn, market).await?;
|
let candles = batch_1m_candles(pool, market).await?;
|
||||||
send_candles(candles, candles_sender).await;
|
send_candles(candles, candles_sender).await;
|
||||||
|
|
||||||
for resolution in Resolution::iter() {
|
for resolution in Resolution::iter() {
|
||||||
if resolution == Resolution::R1m {
|
if resolution == Resolution::R1m {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let candles = batch_higher_order_candles(conn, market_name, resolution).await?;
|
let candles = batch_higher_order_candles(pool, market_name, resolution).await?;
|
||||||
send_candles(candles, candles_sender).await;
|
send_candles(candles, candles_sender).await;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_candles(candles: Vec<Candle>, candles_sender: &Sender<Vec<Candle>>) {
|
async fn send_candles(candles: Vec<Candle>, candles_sender: &Sender<Vec<Candle>>) {
|
||||||
if candles.len() > 0 {
|
if !candles.is_empty() {
|
||||||
if let Err(_) = candles_sender.send(candles).await {
|
if let Err(_) = candles_sender.send(candles).await {
|
||||||
panic!("candles receiver dropped");
|
panic!("candles receiver dropped");
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,20 +1,22 @@
|
||||||
use dotenv;
|
|
||||||
use openbook_candles::database::{
|
|
||||||
initialize::{connect_to_database, setup_database},
|
|
||||||
insert::{persist_candles, persist_fill_events},
|
|
||||||
};
|
|
||||||
use openbook_candles::structs::candle::Candle;
|
use openbook_candles::structs::candle::Candle;
|
||||||
use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
|
use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
|
||||||
use openbook_candles::structs::openbook::OpenBookFillEventLog;
|
use openbook_candles::structs::openbook::OpenBookFillEvent;
|
||||||
use openbook_candles::utils::Config;
|
use openbook_candles::utils::Config;
|
||||||
use openbook_candles::worker::candle_batching::batch_for_market;
|
|
||||||
use openbook_candles::worker::trade_fetching::scrape::scrape;
|
use openbook_candles::worker::trade_fetching::scrape::scrape;
|
||||||
|
use openbook_candles::{
|
||||||
|
database::{
|
||||||
|
initialize::{connect_to_database, setup_database},
|
||||||
|
insert::{persist_candles, persist_fill_events},
|
||||||
|
},
|
||||||
|
worker::candle_batching::batch_for_market,
|
||||||
|
};
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::{collections::HashMap, str::FromStr};
|
use std::{collections::HashMap, str::FromStr};
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
dotenv::dotenv().ok();
|
dotenv::dotenv().ok();
|
||||||
|
|
||||||
|
@ -22,19 +24,12 @@ async fn main() -> anyhow::Result<()> {
|
||||||
assert!(args.len() == 2);
|
assert!(args.len() == 2);
|
||||||
let path_to_markets_json = &args[1];
|
let path_to_markets_json = &args[1];
|
||||||
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
|
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
|
||||||
let database_url: String = dotenv::var("DATABASE_URL").unwrap();
|
|
||||||
let max_pg_pool_connections: u32 = dotenv::var("MAX_PG_POOL_CONNS_WORKER")
|
|
||||||
.unwrap()
|
|
||||||
.parse::<u32>()
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let config = Config {
|
let config = Config {
|
||||||
rpc_url: rpc_url.clone(),
|
rpc_url: rpc_url.clone(),
|
||||||
database_url,
|
|
||||||
max_pg_pool_connections,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let markets = load_markets(&path_to_markets_json);
|
let markets = load_markets(path_to_markets_json);
|
||||||
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
|
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
|
||||||
let mut target_markets = HashMap::new();
|
let mut target_markets = HashMap::new();
|
||||||
for m in market_infos.clone() {
|
for m in market_infos.clone() {
|
||||||
|
@ -42,14 +37,14 @@ async fn main() -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
println!("{:?}", target_markets);
|
println!("{:?}", target_markets);
|
||||||
|
|
||||||
let pool = connect_to_database(&config).await?;
|
let pool = connect_to_database().await?;
|
||||||
setup_database(&pool).await?;
|
setup_database(&pool).await?;
|
||||||
let mut handles = vec![];
|
let mut handles = vec![];
|
||||||
|
|
||||||
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEventLog>(1000);
|
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(1000);
|
||||||
|
|
||||||
handles.push(tokio::spawn(async move {
|
handles.push(tokio::spawn(async move {
|
||||||
scrape(&config, &fill_sender, &target_markets).await; //TODO: send the vec, it's okay
|
scrape(&config, &fill_sender, &target_markets).await;
|
||||||
}));
|
}));
|
||||||
|
|
||||||
let fills_pool = pool.clone();
|
let fills_pool = pool.clone();
|
||||||
|
@ -67,7 +62,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let sender = candle_sender.clone();
|
let sender = candle_sender.clone();
|
||||||
let batch_pool = pool.clone();
|
let batch_pool = pool.clone();
|
||||||
handles.push(tokio::spawn(async move {
|
handles.push(tokio::spawn(async move {
|
||||||
batch_for_market(batch_pool, &sender, &market)
|
batch_for_market(&batch_pool, &sender, &market)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
println!("SOMETHING WENT WRONG");
|
println!("SOMETHING WENT WRONG");
|
||||||
|
|
|
@ -5,22 +5,26 @@ use solana_transaction_status::{
|
||||||
};
|
};
|
||||||
use std::{collections::HashMap, io::Error};
|
use std::{collections::HashMap, io::Error};
|
||||||
|
|
||||||
use crate::structs::openbook::OpenBookFillEventLog;
|
use crate::structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw};
|
||||||
|
|
||||||
const PROGRAM_DATA: &str = "Program data: ";
|
const PROGRAM_DATA: &str = "Program data: ";
|
||||||
|
|
||||||
pub fn parse_trades_from_openbook_txns(
|
pub fn parse_trades_from_openbook_txns(
|
||||||
txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
|
txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
|
||||||
target_markets: &HashMap<Pubkey, u8>,
|
target_markets: &HashMap<Pubkey, u8>,
|
||||||
) -> Vec<OpenBookFillEventLog> {
|
) -> Vec<OpenBookFillEvent> {
|
||||||
let mut fills_vector = Vec::<OpenBookFillEventLog>::new();
|
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
|
||||||
for txn in txns.iter_mut() {
|
for txn in txns.iter_mut() {
|
||||||
match txn {
|
match txn {
|
||||||
Ok(t) => {
|
Ok(t) => {
|
||||||
if let Some(m) = &t.transaction.meta {
|
if let Some(m) = &t.transaction.meta {
|
||||||
match &m.log_messages {
|
match &m.log_messages {
|
||||||
OptionSerializer::Some(logs) => {
|
OptionSerializer::Some(logs) => {
|
||||||
match parse_openbook_fills_from_logs(logs, target_markets) {
|
match parse_openbook_fills_from_logs(
|
||||||
|
logs,
|
||||||
|
target_markets,
|
||||||
|
t.block_time.unwrap(),
|
||||||
|
) {
|
||||||
Some(mut events) => fills_vector.append(&mut events),
|
Some(mut events) => fills_vector.append(&mut events),
|
||||||
None => {}
|
None => {}
|
||||||
}
|
}
|
||||||
|
@ -39,8 +43,9 @@ pub fn parse_trades_from_openbook_txns(
|
||||||
fn parse_openbook_fills_from_logs(
|
fn parse_openbook_fills_from_logs(
|
||||||
logs: &Vec<String>,
|
logs: &Vec<String>,
|
||||||
target_markets: &HashMap<Pubkey, u8>,
|
target_markets: &HashMap<Pubkey, u8>,
|
||||||
) -> Option<Vec<OpenBookFillEventLog>> {
|
block_time: i64,
|
||||||
let mut fills_vector = Vec::<OpenBookFillEventLog>::new();
|
) -> Option<Vec<OpenBookFillEvent>> {
|
||||||
|
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
|
||||||
for l in logs {
|
for l in logs {
|
||||||
match l.strip_prefix(PROGRAM_DATA) {
|
match l.strip_prefix(PROGRAM_DATA) {
|
||||||
Some(log) => {
|
Some(log) => {
|
||||||
|
@ -49,13 +54,14 @@ fn parse_openbook_fills_from_logs(
|
||||||
_ => continue,
|
_ => continue,
|
||||||
};
|
};
|
||||||
let mut slice: &[u8] = &borsh_bytes[8..];
|
let mut slice: &[u8] = &borsh_bytes[8..];
|
||||||
let event: Result<OpenBookFillEventLog, Error> =
|
let event: Result<OpenBookFillEventRaw, Error> =
|
||||||
anchor_lang::AnchorDeserialize::deserialize(&mut slice);
|
anchor_lang::AnchorDeserialize::deserialize(&mut slice);
|
||||||
|
|
||||||
match event {
|
match event {
|
||||||
Ok(e) => {
|
Ok(e) => {
|
||||||
if target_markets.contains_key(&e.market) {
|
let fill_event = e.with_time(block_time);
|
||||||
fills_vector.push(e);
|
if target_markets.contains_key(&fill_event.market) {
|
||||||
|
fills_vector.push(fill_event);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ => continue,
|
_ => continue,
|
||||||
|
@ -65,9 +71,9 @@ fn parse_openbook_fills_from_logs(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if fills_vector.len() > 0 {
|
if !fills_vector.is_empty() {
|
||||||
return Some(fills_vector);
|
Some(fills_vector)
|
||||||
} else {
|
} else {
|
||||||
return None;
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,13 +8,13 @@ use solana_transaction_status::UiTransactionEncoding;
|
||||||
use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
|
use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
|
|
||||||
use crate::{structs::openbook::OpenBookFillEventLog, utils::Config};
|
use crate::{structs::openbook::OpenBookFillEvent, utils::Config};
|
||||||
|
|
||||||
use super::parsing::parse_trades_from_openbook_txns;
|
use super::parsing::parse_trades_from_openbook_txns;
|
||||||
|
|
||||||
pub async fn scrape(
|
pub async fn scrape(
|
||||||
config: &Config,
|
config: &Config,
|
||||||
fill_sender: &Sender<OpenBookFillEventLog>,
|
fill_sender: &Sender<OpenBookFillEvent>,
|
||||||
target_markets: &HashMap<Pubkey, u8>,
|
target_markets: &HashMap<Pubkey, u8>,
|
||||||
) {
|
) {
|
||||||
let rpc_client =
|
let rpc_client =
|
||||||
|
@ -38,7 +38,7 @@ pub async fn scrape_transactions(
|
||||||
rpc_client: &RpcClient,
|
rpc_client: &RpcClient,
|
||||||
before_sig: Option<Signature>,
|
before_sig: Option<Signature>,
|
||||||
limit: Option<usize>,
|
limit: Option<usize>,
|
||||||
fill_sender: &Sender<OpenBookFillEventLog>,
|
fill_sender: &Sender<OpenBookFillEvent>,
|
||||||
target_markets: &HashMap<Pubkey, u8>,
|
target_markets: &HashMap<Pubkey, u8>,
|
||||||
) -> Option<Signature> {
|
) -> Option<Signature> {
|
||||||
let rpc_config = GetConfirmedSignaturesForAddress2Config {
|
let rpc_config = GetConfirmedSignaturesForAddress2Config {
|
||||||
|
@ -62,12 +62,12 @@ pub async fn scrape_transactions(
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if sigs.len() == 0 {
|
if sigs.is_empty() {
|
||||||
println!("No signatures found");
|
println!("No signatures found");
|
||||||
return before_sig;
|
return before_sig;
|
||||||
}
|
}
|
||||||
|
|
||||||
let last = sigs.last().clone().unwrap();
|
let last = sigs.last().unwrap();
|
||||||
let request_last_sig = Signature::from_str(&last.signature).unwrap();
|
let request_last_sig = Signature::from_str(&last.signature).unwrap();
|
||||||
|
|
||||||
sigs.retain(|sig| sig.err.is_none());
|
sigs.retain(|sig| sig.err.is_none());
|
||||||
|
@ -88,13 +88,13 @@ pub async fn scrape_transactions(
|
||||||
|
|
||||||
let txn_futs: Vec<_> = signatures
|
let txn_futs: Vec<_> = signatures
|
||||||
.iter()
|
.iter()
|
||||||
.map(|s| rpc_client.get_transaction_with_config(&s, txn_config))
|
.map(|s| rpc_client.get_transaction_with_config(s, txn_config))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let mut txns = join_all(txn_futs).await;
|
let mut txns = join_all(txn_futs).await;
|
||||||
|
|
||||||
let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
|
let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
|
||||||
if fills.len() > 0 {
|
if !fills.is_empty() {
|
||||||
for fill in fills.into_iter() {
|
for fill in fills.into_iter() {
|
||||||
if let Err(_) = fill_sender.send(fill).await {
|
if let Err(_) = fill_sender.send(fill).await {
|
||||||
panic!("receiver dropped");
|
panic!("receiver dropped");
|
||||||
|
|
Loading…
Reference in New Issue