diff --git a/Cargo.lock b/Cargo.lock index 7829e5a..976f058 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2021,9 +2021,9 @@ checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" [[package]] name = "futures" -version = "0.3.25" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0" +checksum = "531ac96c6ff5fd7c62263c5e3c67a603af4fcaee2e1a0ae5565ba3a11e69e549" dependencies = [ "futures-channel", "futures-core", @@ -2036,9 +2036,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.25" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed" +checksum = "164713a5a0dcc3e7b4b1ed7d3b433cabc18025386f9339346e8daf15963cf7ac" dependencies = [ "futures-core", "futures-sink", @@ -2046,15 +2046,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.25" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac" +checksum = "86d7a0c1aa76363dac491de0ee99faf6941128376f1cf96f07db7603b7de69dd" [[package]] name = "futures-executor" -version = "0.3.25" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7acc85df6714c176ab5edf386123fafe217be88c0840ec11f199441134a074e2" +checksum = "1997dd9df74cdac935c76252744c1ed5794fac083242ea4fe77ef3ed60ba0f83" dependencies = [ "futures-core", "futures-task", @@ -2075,15 +2075,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.25" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" +checksum = "89d422fa3cbe3b40dca574ab087abb5bc98258ea57eea3fd6f1fa7162c778b91" [[package]] name = "futures-macro" -version = "0.3.25" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d" +checksum = "3eb14ed937631bd8b8b8977f2c198443447a8355b6e3ca599f38c975e5a963b6" dependencies = [ "proc-macro2 1.0.50", "quote 1.0.23", @@ -2092,21 +2092,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.25" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9" +checksum = "ec93083a4aecafb2a80a885c9de1f0ccae9dbd32c2bb54b0c3a65690e0b8d2f2" [[package]] name = "futures-task" -version = "0.3.25" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea" +checksum = "fd65540d33b37b16542a0438c12e6aeead10d4ac5d05bd3f805b8f35ab592879" [[package]] name = "futures-util" -version = "0.3.25" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6" +checksum = "3ef6b17e481503ec85211fed8f39d1970f128935ca1f814cd32ac4a6842e84ab" dependencies = [ "futures 0.1.31", "futures-channel", @@ -2203,7 +2203,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8af59a261bcf42f45d1b261232847b9b850ba0a1419d6100698246fb66e9240" dependencies = [ "arc-swap", - "futures 0.3.25", + "futures 0.3.27", "log 0.4.17", "reqwest", "serde", @@ -2495,7 +2495,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca815a891b24fdfb243fa3239c86154392b0953ee584aa1a2a1f66d20cbe75cc" dependencies = [ "bytes 1.3.0", - "futures 0.3.25", + "futures 0.3.27", "headers", "http", "hyper 0.14.23", @@ -2735,7 +2735,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2b99d4207e2a04fb4581746903c2bb7eb376f88de9c699d0f3e10feeac0cd3a" dependencies = [ "derive_more", - "futures 0.3.25", + "futures 0.3.27", "hyper 0.14.23", "jsonrpc-core", "jsonrpc-pubsub", @@ -2753,7 +2753,7 @@ version = "18.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14f7f76aef2d054868398427f6c54943cf3d1caa9a7ec7d0c38d69df97a965eb" dependencies = [ - "futures 0.3.25", + "futures 0.3.27", "futures-executor", "futures-util", "log 0.4.17", @@ -2768,7 +2768,7 @@ version = "18.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b51da17abecbdab3e3d4f26b01c5ec075e88d3abe3ab3b05dc9aa69392764ec0" dependencies = [ - "futures 0.3.25", + "futures 0.3.27", "jsonrpc-client-transports", ] @@ -2790,7 +2790,7 @@ version = "18.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1dea6e07251d9ce6a552abfb5d7ad6bc290a4596c8dcc3d795fae2bbdc1f3ff" dependencies = [ - "futures 0.3.25", + "futures 0.3.27", "hyper 0.14.23", "jsonrpc-core", "jsonrpc-server-utils", @@ -2806,7 +2806,7 @@ version = "18.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240f87695e6c6f62fb37f05c02c04953cf68d6408b8c1c89de85c7a0125b1011" dependencies = [ - "futures 0.3.25", + "futures 0.3.27", "jsonrpc-core", "lazy_static", "log 0.4.17", @@ -2822,7 +2822,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa4fdea130485b572c39a460d50888beb00afb3e35de23ccd7fad8ff19f0e0d4" dependencies = [ "bytes 1.3.0", - "futures 0.3.25", + "futures 0.3.27", "globset", "jsonrpc-core", "lazy_static", @@ -3501,6 +3501,7 @@ dependencies = [ "derive_more", "dotenv", "env_logger 0.10.0", + "futures 0.3.27", "jsonrpc-core-client", "log 0.4.17", "num-traits", @@ -5058,7 +5059,7 @@ checksum = "41d1c5305e39e09653383c2c7244f2f78b3bcae37cf50c64cb4789c9f5096ec2" dependencies = [ "base64 0.13.1", "bytes 1.3.0", - "futures 0.3.25", + "futures 0.3.27", "httparse", "log 0.4.17", "rand 0.8.5", @@ -5213,7 +5214,7 @@ dependencies = [ "clap 2.34.0", "crossbeam-channel", "enum_dispatch", - "futures 0.3.25", + "futures 0.3.27", "futures-util", "indexmap", "indicatif", @@ -5429,7 +5430,7 @@ dependencies = [ "crossbeam-channel", "dashmap", "fs_extra", - "futures 0.3.25", + "futures 0.3.27", "itertools", "lazy_static", "libc", @@ -5927,7 +5928,7 @@ dependencies = [ "bzip2", "enum-iterator", "flate2", - "futures 0.3.25", + "futures 0.3.27", "goauth", "http", "hyper 0.14.23", diff --git a/Cargo.toml b/Cargo.toml index 8b64add..a06502a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ path = "src/server/main.rs" [dependencies] tokio = { version = "1", features = ["full"] } tokio-stream = "0.1" +futures = "0.3.27" jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] } diff --git a/sqlx-data.json b/sqlx-data.json index c38c424..d14a6f9 100644 --- a/sqlx-data.json +++ b/sqlx-data.json @@ -20,24 +20,6 @@ }, "query": "CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)" }, - "63fd83accf07c969461c66ca26a28f506dbbc2e8c0b7d74d9a06ccf52aa1b8b6": { - "describe": { - "columns": [ - { - "name": "total", - "ordinal": 0, - "type_info": "Int8" - } - ], - "nullable": [ - null - ], - "parameters": { - "Left": [] - } - }, - "query": "Select COUNT(*) as total from fills" - }, "6658c0121e5a7defbd1fe7c549ca0a957b188b9eb1837573a05d0e6476ef945a": { "describe": { "columns": [], @@ -349,6 +331,40 @@ }, "query": "SELECT \n time as \"time!\",\n bid as \"bid!\",\n maker as \"maker!\",\n native_qty_paid as \"native_qty_paid!\",\n native_qty_received as \"native_qty_received!\",\n native_fee_or_rebate as \"native_fee_or_rebate!\" \n from fills \n where market = $1\n and time >= $2\n and time < $3 \n and maker = true\n ORDER BY time asc" }, + "dc367af7bb8da9d57033833da06567f3f3cfdcff72e5b140ccd3355f91b4c5a7": { + "describe": { + "columns": [ + { + "name": "open_orders_owner", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "raw_ask_size!", + "ordinal": 1, + "type_info": "Numeric" + }, + { + "name": "raw_bid_size!", + "ordinal": 2, + "type_info": "Numeric" + } + ], + "nullable": [ + false, + null, + null + ], + "parameters": { + "Left": [ + "Text", + "Timestamptz", + "Timestamptz" + ] + } + }, + "query": "SELECT \n open_orders_owner, \n sum(\n native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END\n ) as \"raw_ask_size!\",\n sum(\n native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END\n ) as \"raw_bid_size!\"\n FROM public.\"fills\"\n WHERE market = $1\n AND time >= $2\n AND time < $3\n GROUP BY open_orders_owner\n ORDER BY \n sum(native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END) \n + \n sum(native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END) \nDESC " + }, "dc7c7c04b6870b9617e1e869aa4b7027baddaeeb22f2792f2e9c40f643f863c7": { "describe": { "columns": [ diff --git a/src/candle_creation/candle_batching/higher_order_candles.rs b/src/candle_creation/candle_batching/higher_order_candles.rs index de79133..effe564 100644 --- a/src/candle_creation/candle_batching/higher_order_candles.rs +++ b/src/candle_creation/candle_batching/higher_order_candles.rs @@ -4,10 +4,11 @@ use sqlx::{types::Decimal, Pool, Postgres}; use std::cmp::{max, min}; use crate::{ - database::{ - fetch::{fetch_candles_from, fetch_earliest_candle, fetch_latest_finished_candle}, - Candle, - }, structs::resolution::{day, Resolution}, + database::fetch::{fetch_candles_from, fetch_earliest_candle, fetch_latest_finished_candle}, + structs::{ + candle::Candle, + resolution::{day, Resolution}, + }, }; pub async fn batch_higher_order_candles( diff --git a/src/candle_creation/candle_batching/minute_candles.rs b/src/candle_creation/candle_batching/minute_candles.rs index 8736720..80be14f 100644 --- a/src/candle_creation/candle_batching/minute_candles.rs +++ b/src/candle_creation/candle_batching/minute_candles.rs @@ -4,10 +4,15 @@ use chrono::{DateTime, Duration, DurationRound, Utc}; use num_traits::{FromPrimitive, Zero}; use sqlx::{types::Decimal, Pool, Postgres}; -use crate::{database::{ - fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle}, - Candle, PgOpenBookFill, -}, structs::{markets::MarketInfo, resolution::{Resolution, day}}}; +use crate::{ + database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle}, + structs::{ + candle::Candle, + markets::MarketInfo, + openbook::{calculate_fill_price_and_size, PgOpenBookFill}, + resolution::{day, Resolution}, + }, +}; pub async fn batch_1m_candles( pool: &Pool, @@ -106,35 +111,3 @@ fn combine_fills_into_1m_candles( candles } - -fn calculate_fill_price_and_size( - fill: PgOpenBookFill, - base_decimals: u8, - quote_decimals: u8, -) -> (Decimal, Decimal) { - if fill.bid { - let price_before_fees = if fill.maker { - fill.native_qty_paid + fill.native_fee_or_rebate - } else { - fill.native_qty_paid - fill.native_fee_or_rebate - }; - let price = (price_before_fees * token_factor(base_decimals)) - / (token_factor(quote_decimals) * fill.native_qty_received); - let size = fill.native_qty_received / token_factor(base_decimals); - (price, size) - } else { - let price_before_fees = if fill.maker { - fill.native_qty_received - fill.native_fee_or_rebate - } else { - fill.native_qty_received + fill.native_fee_or_rebate - }; - let price = (price_before_fees * token_factor(base_decimals)) - / (token_factor(quote_decimals) * fill.native_qty_paid); - let size = fill.native_qty_paid / token_factor(base_decimals); - (price, size) - } -} - -fn token_factor(decimals: u8) -> Decimal { - Decimal::from_u64(10u64.pow(decimals as u32)).unwrap() -} diff --git a/src/candle_creation/candle_batching/mod.rs b/src/candle_creation/candle_batching/mod.rs index 4f0f81d..231fb8d 100644 --- a/src/candle_creation/candle_batching/mod.rs +++ b/src/candle_creation/candle_batching/mod.rs @@ -8,13 +8,11 @@ use tokio::{sync::mpsc::Sender, time::sleep}; use crate::{ candle_creation::candle_batching::minute_candles::batch_1m_candles, - database::{Candle}, structs::{markets::MarketInfo, resolution::Resolution}, + structs::{candle::Candle, markets::MarketInfo, resolution::Resolution}, }; use self::higher_order_candles::batch_higher_order_candles; - - pub async fn batch_candles( pool: Pool, candles_sender: &Sender>, diff --git a/src/candle_creation/main.rs b/src/candle_creation/main.rs index 94791f4..a41b362 100644 --- a/src/candle_creation/main.rs +++ b/src/candle_creation/main.rs @@ -7,11 +7,11 @@ use openbook_candles::candle_creation::trade_fetching::{ use openbook_candles::database::{ initialize::{connect_to_database, setup_database}, insert::{persist_candles, persist_fill_events}, - Candle, }; +use openbook_candles::structs::candle::Candle; use openbook_candles::structs::markets::load_markets; use openbook_candles::structs::openbook::OpenBookFillEventLog; -use openbook_candles::utils::{Config}; +use openbook_candles::utils::Config; use solana_sdk::pubkey::Pubkey; use std::{collections::HashMap, str::FromStr}; use tokio::sync::mpsc; @@ -22,7 +22,10 @@ async fn main() -> anyhow::Result<()> { let rpc_url: String = dotenv::var("RPC_URL").unwrap(); let database_url: String = dotenv::var("DATABASE_URL").unwrap(); - let max_pg_pool_connections: u32 = dotenv::var("MAX_PG_POOL_CONNS_WORKER").unwrap().parse::().unwrap(); + let max_pg_pool_connections: u32 = dotenv::var("MAX_PG_POOL_CONNS_WORKER") + .unwrap() + .parse::() + .unwrap(); let config = Config { rpc_url: rpc_url.clone(), @@ -43,11 +46,11 @@ async fn main() -> anyhow::Result<()> { let (fill_sender, fill_receiver) = mpsc::channel::(1000); - let bf_sender = fill_sender.clone(); - let targets = target_markets.clone(); - tokio::spawn(async move { - backfill(&rpc_url.clone(), &bf_sender, &targets).await; - }); + // let bf_sender = fill_sender.clone(); + // let targets = target_markets.clone(); + // tokio::spawn(async move { + // backfill(&rpc_url.clone(), &bf_sender, &targets).await; + // }); tokio::spawn(async move { scrape(&config, &fill_sender, &target_markets).await; //TODO: send the vec, it's okay diff --git a/src/candle_creation/trade_fetching/backfill.rs b/src/candle_creation/trade_fetching/backfill.rs index c027103..634ced8 100644 --- a/src/candle_creation/trade_fetching/backfill.rs +++ b/src/candle_creation/trade_fetching/backfill.rs @@ -1,14 +1,17 @@ use chrono::{DateTime, Duration, NaiveDateTime, Utc}; -use solana_client::{rpc_client::RpcClient, rpc_config::RpcTransactionConfig}; +use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcTransactionConfig}; use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature}; use solana_transaction_status::UiTransactionEncoding; use std::collections::HashMap; use tokio::sync::mpsc::Sender; -use crate::{candle_creation::trade_fetching::scrape::scrape_transactions, structs::openbook::OpenBookFillEventLog}; +use crate::{ + candle_creation::trade_fetching::scrape::scrape_transactions, + structs::openbook::OpenBookFillEventLog, +}; pub async fn backfill( - rpc_url: &String, + rpc_url: String, fill_sender: &Sender, target_markets: &HashMap, ) { @@ -31,9 +34,12 @@ pub async fn backfill( commitment: Some(CommitmentConfig::confirmed()), max_supported_transaction_version: Some(0), }; - match rpc_client.get_transaction_with_config(&last_sig_option.unwrap(), txn_config) { + match rpc_client + .get_transaction_with_config(&last_sig_option.unwrap(), txn_config) + .await + { Ok(txn) => { - let unix_sig_time = rpc_client.get_block_time(txn.slot).unwrap(); + let unix_sig_time = rpc_client.get_block_time(txn.slot).await.unwrap(); if unix_sig_time < end_time { break; } diff --git a/src/candle_creation/trade_fetching/scrape.rs b/src/candle_creation/trade_fetching/scrape.rs index cabd706..fc174f1 100644 --- a/src/candle_creation/trade_fetching/scrape.rs +++ b/src/candle_creation/trade_fetching/scrape.rs @@ -1,8 +1,10 @@ use anchor_lang::AnchorDeserialize; +use futures::future::join_all; use solana_account_decoder::UiAccountEncoding; use solana_client::{ client_error::Result as ClientResult, - rpc_client::{GetConfirmedSignaturesForAddress2Config, RpcClient}, + nonblocking::rpc_client::RpcClient, + rpc_client::GetConfirmedSignaturesForAddress2Config, rpc_config::{RpcAccountInfoConfig, RpcTransactionConfig}, }; use solana_sdk::{ @@ -14,18 +16,22 @@ use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration}; use tokio::sync::mpsc::Sender; use crate::{ - utils::{Config}, structs::{openbook::{OpenBookFillEventLog, MarketState}, markets::{MarketInfo, MarketConfig}}, + structs::{ + markets::{MarketConfig, MarketInfo}, + openbook::{MarketState, OpenBookFillEventLog}, + }, + utils::Config, }; -use super::parsing::{parse_trades_from_openbook_txns}; +use super::parsing::parse_trades_from_openbook_txns; pub async fn scrape( config: &Config, fill_sender: &Sender, target_markets: &HashMap, ) { - let url = &config.rpc_url; - let rpc_client = RpcClient::new_with_commitment(url, CommitmentConfig::processed()); + let rpc_client = + RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::processed()); let before_slot = None; loop { @@ -55,10 +61,13 @@ pub async fn scrape_transactions( commitment: Some(CommitmentConfig::confirmed()), }; - let mut sigs = match rpc_client.get_signatures_for_address_with_config( - &Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(), - rpc_config, - ) { + let mut sigs = match rpc_client + .get_signatures_for_address_with_config( + &Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(), + rpc_config, + ) + .await + { Ok(s) => s, Err(e) => { println!("Error in get_signatures_for_address_with_config: {}", e); @@ -85,15 +94,17 @@ pub async fn scrape_transactions( max_supported_transaction_version: Some(0), }; - let mut txns = sigs + let signatures: Vec<_> = sigs .into_iter() - .map(|sig| { - rpc_client.get_transaction_with_config( - &sig.signature.parse::().unwrap(), - txn_config, - ) - }) - .collect::>>(); // TODO: am I actually getting all the txns? + .map(|sig| sig.signature.parse::().unwrap()) + .collect(); + + let txn_futs: Vec<_> = signatures + .iter() + .map(|s| rpc_client.get_transaction_with_config(&s, txn_config)) + .collect(); + + let mut txns = join_all(txn_futs).await; let fills = parse_trades_from_openbook_txns(&mut txns, target_markets); if fills.len() > 0 { @@ -111,8 +122,8 @@ pub async fn fetch_market_infos( config: &Config, markets: Vec, ) -> anyhow::Result> { - let url = &config.rpc_url; - let rpc_client = RpcClient::new_with_commitment(url, CommitmentConfig::processed()); + let rpc_client = + RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::processed()); let rpc_config = RpcAccountInfoConfig { encoding: Some(UiAccountEncoding::Base64), @@ -127,7 +138,7 @@ pub async fn fetch_market_infos( .collect::>(); let mut market_results = rpc_client .get_multiple_accounts_with_config(&market_keys, rpc_config.clone()) - .unwrap() + .await? .value; let mut mint_key_map = HashMap::new(); @@ -171,7 +182,7 @@ pub async fn fetch_market_infos( let mint_results = rpc_client .get_multiple_accounts_with_config(&mint_keys, rpc_config) - .unwrap() + .await? .value; for i in 0..mint_results.len() { let mut mint_account = mint_results[i].as_ref().unwrap().clone(); diff --git a/src/database/fetch.rs b/src/database/fetch.rs index 92c3a87..9d41282 100644 --- a/src/database/fetch.rs +++ b/src/database/fetch.rs @@ -1,9 +1,14 @@ use chrono::{DateTime, Utc}; use sqlx::{Pool, Postgres}; -use crate::{database::PgOpenBookFill, utils::AnyhowWrap}; - -use super::{Candle, Resolution}; +use crate::{ + structs::{ + candle::Candle, + openbook::{PgOpenBookFill, PgTrader}, + resolution::Resolution, + }, + utils::AnyhowWrap, +}; pub async fn fetch_earliest_fill( pool: &Pool, @@ -192,3 +197,40 @@ pub async fn fetch_tradingview_candles( .await .map_err_anyhow() } + +pub async fn fetch_top_traders_by_volume_from( + pool: &Pool, + market_address_string: &str, + start_time: DateTime, + end_time: DateTime, +) -> anyhow::Result> { + sqlx::query_as!( + PgTrader, + r#"SELECT + open_orders_owner, + sum( + native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END + ) as "raw_ask_size!", + sum( + native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END + ) as "raw_bid_size!" + FROM public."fills" + WHERE market = $1 + AND time >= $2 + AND time < $3 + GROUP BY open_orders_owner + ORDER BY + sum(native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END) + + + sum(native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END) +DESC "#, + market_address_string, + start_time, + end_time + ) + .fetch_all(pool) + .await + .map_err_anyhow() +} + +// pub async fn fetch_traders_above_x_dollars diff --git a/src/database/insert.rs b/src/database/insert.rs index a903790..b98dc2d 100644 --- a/src/database/insert.rs +++ b/src/database/insert.rs @@ -7,9 +7,10 @@ use std::{ }; use tokio::sync::mpsc::{error::TryRecvError, Receiver}; -use crate::{utils::AnyhowWrap, structs::openbook::OpenBookFillEventLog}; - -use super::Candle; +use crate::{ + structs::{candle::Candle, openbook::OpenBookFillEventLog}, + utils::AnyhowWrap, +}; pub async fn persist_fill_events( pool: &Pool, diff --git a/src/database/mod.rs b/src/database/mod.rs index c21842a..1d33d3a 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1,56 +1,3 @@ -use chrono::{DateTime, NaiveDateTime, Utc}; -use num_traits::Zero; -use sqlx::types::Decimal; - -use crate::structs::resolution::Resolution; - pub mod fetch; pub mod initialize; pub mod insert; - -pub trait Summary { - fn summarize(&self) -> String; -} - -#[derive(Clone, Debug)] -pub struct Candle { - pub market_name: String, - pub start_time: DateTime, - pub end_time: DateTime, - pub resolution: String, - pub open: Decimal, - pub close: Decimal, - pub high: Decimal, - pub low: Decimal, - pub volume: Decimal, - pub complete: bool, -} - -impl Candle { - pub fn create_empty_candle(market_name: String, resolution: Resolution) -> Candle { - Candle { - market_name, - start_time: DateTime::from_utc(NaiveDateTime::MIN, Utc), - end_time: DateTime::from_utc(NaiveDateTime::MIN, Utc), - resolution: resolution.to_string(), - open: Decimal::zero(), - close: Decimal::zero(), - high: Decimal::zero(), - low: Decimal::zero(), - volume: Decimal::zero(), - complete: false, - } - } -} - -#[derive(Copy, Clone, Debug, PartialEq)] -pub struct PgOpenBookFill { - pub time: DateTime, - pub bid: bool, - pub maker: bool, - pub native_qty_paid: Decimal, - pub native_qty_received: Decimal, - pub native_fee_or_rebate: Decimal, -} - - diff --git a/src/lib.rs b/src/lib.rs index 7a8bb72..04fe6a5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ pub mod candle_creation; pub mod database; -pub mod utils; pub mod structs; +pub mod utils; diff --git a/src/server/candles.rs b/src/server/candles.rs index 75ff5f9..1773c2e 100644 --- a/src/server/candles.rs +++ b/src/server/candles.rs @@ -1,10 +1,17 @@ -use chrono::{Utc, NaiveDateTime}; -use openbook_candles::{utils::WebContext, database::{fetch::fetch_tradingview_candles}, structs::{resolution::Resolution, markets::MarketInfo, tradingview::TvResponse}}; +use openbook_candles::{ + database::fetch::fetch_tradingview_candles, + structs::{ + markets::{valid_market, MarketInfo}, + resolution::Resolution, + tradingview::TvResponse, + }, + utils::{to_timestampz, WebContext}, +}; use crate::server_error::ServerError; use { - actix_web::{get, web, HttpResponse, Scope}, + actix_web::{get, web, HttpResponse}, serde::Deserialize, }; @@ -16,11 +23,6 @@ pub struct Params { pub resolution: String, } -pub fn service() -> Scope { - web::scope("/tradingview") - .service(get_candles) -} - #[get("/candles")] pub async fn get_candles( info: web::Query, @@ -36,18 +38,13 @@ pub async fn get_candles( let from = to_timestampz(info.from); let to = to_timestampz(info.to); - let candles = match fetch_tradingview_candles(&context.pool, &info.market_name, resolution, from, to).await { - Ok(c) => c, - Err(_) => return Err(ServerError::DbQueryError) - }; + let candles = + match fetch_tradingview_candles(&context.pool, &info.market_name, resolution, from, to) + .await + { + Ok(c) => c, + Err(_) => return Err(ServerError::DbQueryError), + }; Ok(HttpResponse::Ok().json(TvResponse::candles_to_tv(candles))) } - -fn valid_market(market_name: &str, markets: &Vec) -> bool { - markets.iter().any(|x| x.name == market_name) -} - -fn to_timestampz(seconds: u64) -> chrono::DateTime { - chrono::DateTime::::from_utc(NaiveDateTime::from_timestamp(seconds as i64, 0), Utc) -} \ No newline at end of file diff --git a/src/server/main.rs b/src/server/main.rs index d1ea3af..367522f 100644 --- a/src/server/main.rs +++ b/src/server/main.rs @@ -4,32 +4,21 @@ use actix_web::{ web::{self, Data}, App, HttpResponse, HttpServer, Responder, }; +use candles::get_candles; use dotenv; -use openbook_candles::{utils::{Config, WebContext}, candle_creation::trade_fetching::scrape::fetch_market_infos, database::initialize::connect_to_database, structs::markets::load_markets}; +use openbook_candles::{ + candle_creation::trade_fetching::scrape::fetch_market_infos, + database::initialize::connect_to_database, + structs::markets::load_markets, + utils::{Config, WebContext}, +}; use sqlx::{Pool, Postgres}; +use traders::get_top_traders_by_base_volume; mod candles; mod server_error; +mod traders; -#[get("/")] -async fn hello() -> impl Responder { - HttpResponse::Ok().body("Hello world!") -} - -#[get("/trade-count")] -async fn get_total_trades(pool_data: web::Data>) -> impl Responder { - let pool = pool_data.get_ref(); - let total_query = sqlx::query!("Select COUNT(*) as total from fills") - .fetch_one(pool) - .await - .unwrap(); - let total_trades: i64 = total_query.total.unwrap_or_else(|| 0); - HttpResponse::Ok().json(total_trades) -} - -async fn manual_hello() -> impl Responder { - HttpResponse::Ok().body("Hey there!") -} #[actix_web::main] async fn main() -> std::io::Result<()> { @@ -38,7 +27,10 @@ async fn main() -> std::io::Result<()> { let rpc_url: String = dotenv::var("RPC_URL").unwrap(); let database_url: String = dotenv::var("DATABASE_URL").unwrap(); - let max_pg_pool_connections: u32 = dotenv::var("MAX_PG_POOL_CONNS_SERVER").unwrap().parse::().unwrap(); + let max_pg_pool_connections: u32 = dotenv::var("MAX_PG_POOL_CONNS_SERVER") + .unwrap() + .parse::() + .unwrap(); let config = Config { rpc_url: rpc_url.clone(), @@ -55,12 +47,17 @@ async fn main() -> std::io::Result<()> { markets: market_infos, }); + println!("Starting server"); HttpServer::new(move || { App::new() .wrap(Logger::default()) - .app_data(Data::new(context.clone())) - .service(candles::service()) - .route("/hey", web::get().to(manual_hello)) + .app_data(context.clone()) + .service( + web::scope("/api") + .service(get_candles) + .service(get_top_traders_by_base_volume) + // .service(get_top_traders_by_quote_volume) + ) }) .bind(("127.0.0.1", 8080))? .run() diff --git a/src/server/mod.rs b/src/server/mod.rs index 7a0707b..c49c73f 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1 +1,2 @@ -pub mod candles; \ No newline at end of file +pub mod candles; +pub mod traders; \ No newline at end of file diff --git a/src/server/server_error.rs b/src/server/server_error.rs index 010d9bd..145bbff 100644 --- a/src/server/server_error.rs +++ b/src/server/server_error.rs @@ -17,8 +17,8 @@ pub enum ServerError { DbQueryError, #[display(fmt = "Error getting connection")] DbPoolError, - #[display(fmt = "Raw market not found")] - RawMarketNotFound, + #[display(fmt = "Market not found")] + MarketNotFound, #[display(fmt = "Request symbol not found")] SymbolNotFound, } @@ -37,7 +37,7 @@ impl error::ResponseError for ServerError { ServerError::WrongResolution => StatusCode::BAD_REQUEST, ServerError::DbQueryError => StatusCode::INTERNAL_SERVER_ERROR, ServerError::DbPoolError => StatusCode::INTERNAL_SERVER_ERROR, - ServerError::RawMarketNotFound => StatusCode::BAD_REQUEST, + ServerError::MarketNotFound => StatusCode::BAD_REQUEST, ServerError::SymbolNotFound => StatusCode::BAD_REQUEST, } } @@ -47,4 +47,4 @@ impl From for std::io::Error { fn from(e: ServerError) -> Self { std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) } -} \ No newline at end of file +} diff --git a/src/server/traders.rs b/src/server/traders.rs new file mode 100644 index 0000000..512a674 --- /dev/null +++ b/src/server/traders.rs @@ -0,0 +1,47 @@ +use crate::server_error::ServerError; +use openbook_candles::{ + database::fetch::fetch_top_traders_by_volume_from, + structs::openbook::{calculate_trader_volume, Trader}, + utils::{to_timestampz, WebContext}, +}; +use { + actix_web::{get, web, HttpResponse}, + serde::Deserialize, +}; + +#[derive(Debug, Deserialize)] +pub struct Params { + pub market_name: String, + pub from: u64, + pub to: u64, +} + +#[get("/traders/base-volume")] +pub async fn get_top_traders_by_base_volume( + info: web::Query, + context: web::Data, +) -> Result { + let selected_market = context.markets.iter().find(|x| x.name == info.market_name); + if selected_market.is_none() { + return Err(ServerError::MarketNotFound); + } + let selected_market = selected_market.unwrap(); + let from = to_timestampz(info.from); + let to = to_timestampz(info.to); + + let raw_traders = + match fetch_top_traders_by_volume_from(&context.pool, &selected_market.address, from, to) + .await + { + Ok(c) => c, + Err(_) => return Err(ServerError::DbQueryError), + }; + + let traders = raw_traders + .into_iter() + .map(|t| calculate_trader_volume(t, selected_market.base_decimals)) + .collect::>(); + + // TODO: add start and end in response? + Ok(HttpResponse::Ok().json(traders)) +} diff --git a/src/structs/candle.rs b/src/structs/candle.rs new file mode 100644 index 0000000..be6ffb7 --- /dev/null +++ b/src/structs/candle.rs @@ -0,0 +1,36 @@ +use chrono::{DateTime, NaiveDateTime, Utc}; +use num_traits::Zero; +use sqlx::types::Decimal; + +use super::resolution::Resolution; + +#[derive(Clone, Debug)] +pub struct Candle { + pub market_name: String, + pub start_time: DateTime, + pub end_time: DateTime, + pub resolution: String, + pub open: Decimal, + pub close: Decimal, + pub high: Decimal, + pub low: Decimal, + pub volume: Decimal, + pub complete: bool, +} + +impl Candle { + pub fn create_empty_candle(market_name: String, resolution: Resolution) -> Candle { + Candle { + market_name, + start_time: DateTime::from_utc(NaiveDateTime::MIN, Utc), + end_time: DateTime::from_utc(NaiveDateTime::MIN, Utc), + resolution: resolution.to_string(), + open: Decimal::zero(), + close: Decimal::zero(), + high: Decimal::zero(), + low: Decimal::zero(), + volume: Decimal::zero(), + complete: false, + } + } +} diff --git a/src/structs/markets.rs b/src/structs/markets.rs index 9c5f650..892fd9b 100644 --- a/src/structs/markets.rs +++ b/src/structs/markets.rs @@ -1,5 +1,5 @@ -use std::fs::File; use serde::Deserialize; +use std::fs::File; #[derive(Debug, Clone)] pub struct MarketInfo { @@ -22,4 +22,8 @@ pub struct MarketConfig { pub fn load_markets(path: &str) -> Vec { let reader = File::open(path).unwrap(); serde_json::from_reader(reader).unwrap() -} \ No newline at end of file +} + +pub fn valid_market(market_name: &str, markets: &Vec) -> bool { + markets.iter().any(|x| x.name == market_name) +} diff --git a/src/structs/mod.rs b/src/structs/mod.rs index 88dca2b..a1fd60e 100644 --- a/src/structs/mod.rs +++ b/src/structs/mod.rs @@ -1,4 +1,5 @@ +pub mod candle; pub mod markets; +pub mod openbook; pub mod resolution; pub mod tradingview; -pub mod openbook; \ No newline at end of file diff --git a/src/structs/openbook.rs b/src/structs/openbook.rs index c77c69a..b3a7abf 100644 --- a/src/structs/openbook.rs +++ b/src/structs/openbook.rs @@ -1,5 +1,9 @@ use anchor_lang::{event, AnchorDeserialize, AnchorSerialize}; +use chrono::{DateTime, Utc}; +use num_traits::{FromPrimitive, ToPrimitive}; +use serde::Serialize; use solana_sdk::pubkey::Pubkey; +use sqlx::types::Decimal; #[event] #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -19,6 +23,29 @@ pub struct OpenBookFillEventLog { pub referrer_rebate: Option, } +#[derive(Copy, Clone, Debug, PartialEq)] +pub struct PgOpenBookFill { + pub time: DateTime, + pub bid: bool, + pub maker: bool, + pub native_qty_paid: Decimal, + pub native_qty_received: Decimal, + pub native_fee_or_rebate: Decimal, +} + +#[derive(Clone, Debug, PartialEq)] +pub struct PgTrader { + pub open_orders_owner: String, + pub raw_ask_size: Decimal, + pub raw_bid_size: Decimal, +} + +#[derive(Clone, Debug, PartialEq, Serialize)] +pub struct Trader { + pub pubkey: String, + pub volume_base_units: f64, +} + #[derive(Copy, Clone, AnchorDeserialize)] #[cfg_attr(target_endian = "little", derive(Debug))] #[repr(packed)] @@ -72,4 +99,47 @@ pub struct MarketState { pub fee_rate_bps: u64, // 46 pub referrer_rebates_accrued: u64, -} \ No newline at end of file +} + +pub fn calculate_fill_price_and_size( + fill: PgOpenBookFill, + base_decimals: u8, + quote_decimals: u8, +) -> (Decimal, Decimal) { + if fill.bid { + let price_before_fees = if fill.maker { + fill.native_qty_paid + fill.native_fee_or_rebate + } else { + fill.native_qty_paid - fill.native_fee_or_rebate + }; + let price = (price_before_fees * token_factor(base_decimals)) + / (token_factor(quote_decimals) * fill.native_qty_received); + let size = fill.native_qty_received / token_factor(base_decimals); + (price, size) + } else { + let price_before_fees = if fill.maker { + fill.native_qty_received - fill.native_fee_or_rebate + } else { + fill.native_qty_received + fill.native_fee_or_rebate + }; + let price = (price_before_fees * token_factor(base_decimals)) + / (token_factor(quote_decimals) * fill.native_qty_paid); + let size = fill.native_qty_paid / token_factor(base_decimals); + (price, size) + } +} + +pub fn calculate_trader_volume(trader: PgTrader, base_decimals: u8) -> Trader { + let bid_size = trader.raw_bid_size / token_factor(base_decimals); + let ask_size = trader.raw_ask_size / token_factor(base_decimals); + + Trader { + pubkey: trader.open_orders_owner, + volume_base_units: (bid_size + ask_size).to_f64().unwrap(), + // TODO: quote volume + } +} + +fn token_factor(decimals: u8) -> Decimal { + Decimal::from_u64(10u64.pow(decimals as u32)).unwrap() +} diff --git a/src/structs/resolution.rs b/src/structs/resolution.rs index 66c9435..8ffa9dc 100644 --- a/src/structs/resolution.rs +++ b/src/structs/resolution.rs @@ -1,5 +1,5 @@ -use std::fmt; use chrono::Duration; +use std::fmt; use strum::EnumIter; #[derive(EnumIter, Copy, Clone, Eq, PartialEq)] @@ -78,4 +78,4 @@ impl Resolution { _ => Err(()), } } -} \ No newline at end of file +} diff --git a/src/structs/tradingview.rs b/src/structs/tradingview.rs index 6010d35..af68874 100644 --- a/src/structs/tradingview.rs +++ b/src/structs/tradingview.rs @@ -2,7 +2,7 @@ use chrono::Utc; use num_traits::ToPrimitive; use serde::Serialize; -use crate::database::Candle; +use super::candle::Candle; #[derive(Serialize)] pub struct TvResponse { @@ -50,8 +50,6 @@ impl TvResponse { assert_eq!(low.len(), high.len()); assert_eq!(volume.len(), time.len()); - let len = time.len(); - TvResponse { status: "ok".to_owned(), error_message: None, diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 058b113..3bf823a 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,6 +1,6 @@ +use chrono::{NaiveDateTime, Utc}; use serde_derive::Deserialize; use sqlx::{Pool, Postgres}; -use std::fs::File; use crate::structs::markets::MarketInfo; @@ -25,7 +25,9 @@ pub struct Config { pub struct WebContext { pub markets: Vec, - pub pool: Pool + pub pool: Pool, } - +pub fn to_timestampz(seconds: u64) -> chrono::DateTime { + chrono::DateTime::::from_utc(NaiveDateTime::from_timestamp(seconds as i64, 0), Utc) +}