feat: add traders base volume endpoint

This commit is contained in:
dboures 2023-03-13 11:51:30 -05:00
parent 6e8ddd1ef5
commit c01e53b41c
No known key found for this signature in database
GPG Key ID: AB3790129D478852
25 changed files with 399 additions and 246 deletions

61
Cargo.lock generated
View File

@ -2021,9 +2021,9 @@ checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678"
[[package]]
name = "futures"
version = "0.3.25"
version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0"
checksum = "531ac96c6ff5fd7c62263c5e3c67a603af4fcaee2e1a0ae5565ba3a11e69e549"
dependencies = [
"futures-channel",
"futures-core",
@ -2036,9 +2036,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.25"
version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed"
checksum = "164713a5a0dcc3e7b4b1ed7d3b433cabc18025386f9339346e8daf15963cf7ac"
dependencies = [
"futures-core",
"futures-sink",
@ -2046,15 +2046,15 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.25"
version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac"
checksum = "86d7a0c1aa76363dac491de0ee99faf6941128376f1cf96f07db7603b7de69dd"
[[package]]
name = "futures-executor"
version = "0.3.25"
version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7acc85df6714c176ab5edf386123fafe217be88c0840ec11f199441134a074e2"
checksum = "1997dd9df74cdac935c76252744c1ed5794fac083242ea4fe77ef3ed60ba0f83"
dependencies = [
"futures-core",
"futures-task",
@ -2075,15 +2075,15 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.25"
version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb"
checksum = "89d422fa3cbe3b40dca574ab087abb5bc98258ea57eea3fd6f1fa7162c778b91"
[[package]]
name = "futures-macro"
version = "0.3.25"
version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d"
checksum = "3eb14ed937631bd8b8b8977f2c198443447a8355b6e3ca599f38c975e5a963b6"
dependencies = [
"proc-macro2 1.0.50",
"quote 1.0.23",
@ -2092,21 +2092,21 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.25"
version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9"
checksum = "ec93083a4aecafb2a80a885c9de1f0ccae9dbd32c2bb54b0c3a65690e0b8d2f2"
[[package]]
name = "futures-task"
version = "0.3.25"
version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea"
checksum = "fd65540d33b37b16542a0438c12e6aeead10d4ac5d05bd3f805b8f35ab592879"
[[package]]
name = "futures-util"
version = "0.3.25"
version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6"
checksum = "3ef6b17e481503ec85211fed8f39d1970f128935ca1f814cd32ac4a6842e84ab"
dependencies = [
"futures 0.1.31",
"futures-channel",
@ -2203,7 +2203,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8af59a261bcf42f45d1b261232847b9b850ba0a1419d6100698246fb66e9240"
dependencies = [
"arc-swap",
"futures 0.3.25",
"futures 0.3.27",
"log 0.4.17",
"reqwest",
"serde",
@ -2495,7 +2495,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca815a891b24fdfb243fa3239c86154392b0953ee584aa1a2a1f66d20cbe75cc"
dependencies = [
"bytes 1.3.0",
"futures 0.3.25",
"futures 0.3.27",
"headers",
"http",
"hyper 0.14.23",
@ -2735,7 +2735,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2b99d4207e2a04fb4581746903c2bb7eb376f88de9c699d0f3e10feeac0cd3a"
dependencies = [
"derive_more",
"futures 0.3.25",
"futures 0.3.27",
"hyper 0.14.23",
"jsonrpc-core",
"jsonrpc-pubsub",
@ -2753,7 +2753,7 @@ version = "18.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14f7f76aef2d054868398427f6c54943cf3d1caa9a7ec7d0c38d69df97a965eb"
dependencies = [
"futures 0.3.25",
"futures 0.3.27",
"futures-executor",
"futures-util",
"log 0.4.17",
@ -2768,7 +2768,7 @@ version = "18.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b51da17abecbdab3e3d4f26b01c5ec075e88d3abe3ab3b05dc9aa69392764ec0"
dependencies = [
"futures 0.3.25",
"futures 0.3.27",
"jsonrpc-client-transports",
]
@ -2790,7 +2790,7 @@ version = "18.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1dea6e07251d9ce6a552abfb5d7ad6bc290a4596c8dcc3d795fae2bbdc1f3ff"
dependencies = [
"futures 0.3.25",
"futures 0.3.27",
"hyper 0.14.23",
"jsonrpc-core",
"jsonrpc-server-utils",
@ -2806,7 +2806,7 @@ version = "18.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "240f87695e6c6f62fb37f05c02c04953cf68d6408b8c1c89de85c7a0125b1011"
dependencies = [
"futures 0.3.25",
"futures 0.3.27",
"jsonrpc-core",
"lazy_static",
"log 0.4.17",
@ -2822,7 +2822,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa4fdea130485b572c39a460d50888beb00afb3e35de23ccd7fad8ff19f0e0d4"
dependencies = [
"bytes 1.3.0",
"futures 0.3.25",
"futures 0.3.27",
"globset",
"jsonrpc-core",
"lazy_static",
@ -3501,6 +3501,7 @@ dependencies = [
"derive_more",
"dotenv",
"env_logger 0.10.0",
"futures 0.3.27",
"jsonrpc-core-client",
"log 0.4.17",
"num-traits",
@ -5058,7 +5059,7 @@ checksum = "41d1c5305e39e09653383c2c7244f2f78b3bcae37cf50c64cb4789c9f5096ec2"
dependencies = [
"base64 0.13.1",
"bytes 1.3.0",
"futures 0.3.25",
"futures 0.3.27",
"httparse",
"log 0.4.17",
"rand 0.8.5",
@ -5213,7 +5214,7 @@ dependencies = [
"clap 2.34.0",
"crossbeam-channel",
"enum_dispatch",
"futures 0.3.25",
"futures 0.3.27",
"futures-util",
"indexmap",
"indicatif",
@ -5429,7 +5430,7 @@ dependencies = [
"crossbeam-channel",
"dashmap",
"fs_extra",
"futures 0.3.25",
"futures 0.3.27",
"itertools",
"lazy_static",
"libc",
@ -5927,7 +5928,7 @@ dependencies = [
"bzip2",
"enum-iterator",
"flate2",
"futures 0.3.25",
"futures 0.3.27",
"goauth",
"http",
"hyper 0.14.23",

View File

@ -18,6 +18,7 @@ path = "src/server/main.rs"
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
futures = "0.3.27"
jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] }

View File

@ -20,24 +20,6 @@
},
"query": "CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)"
},
"63fd83accf07c969461c66ca26a28f506dbbc2e8c0b7d74d9a06ccf52aa1b8b6": {
"describe": {
"columns": [
{
"name": "total",
"ordinal": 0,
"type_info": "Int8"
}
],
"nullable": [
null
],
"parameters": {
"Left": []
}
},
"query": "Select COUNT(*) as total from fills"
},
"6658c0121e5a7defbd1fe7c549ca0a957b188b9eb1837573a05d0e6476ef945a": {
"describe": {
"columns": [],
@ -349,6 +331,40 @@
},
"query": "SELECT \n time as \"time!\",\n bid as \"bid!\",\n maker as \"maker!\",\n native_qty_paid as \"native_qty_paid!\",\n native_qty_received as \"native_qty_received!\",\n native_fee_or_rebate as \"native_fee_or_rebate!\" \n from fills \n where market = $1\n and time >= $2\n and time < $3 \n and maker = true\n ORDER BY time asc"
},
"dc367af7bb8da9d57033833da06567f3f3cfdcff72e5b140ccd3355f91b4c5a7": {
"describe": {
"columns": [
{
"name": "open_orders_owner",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "raw_ask_size!",
"ordinal": 1,
"type_info": "Numeric"
},
{
"name": "raw_bid_size!",
"ordinal": 2,
"type_info": "Numeric"
}
],
"nullable": [
false,
null,
null
],
"parameters": {
"Left": [
"Text",
"Timestamptz",
"Timestamptz"
]
}
},
"query": "SELECT \n open_orders_owner, \n sum(\n native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END\n ) as \"raw_ask_size!\",\n sum(\n native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END\n ) as \"raw_bid_size!\"\n FROM public.\"fills\"\n WHERE market = $1\n AND time >= $2\n AND time < $3\n GROUP BY open_orders_owner\n ORDER BY \n sum(native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END) \n + \n sum(native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END) \nDESC "
},
"dc7c7c04b6870b9617e1e869aa4b7027baddaeeb22f2792f2e9c40f643f863c7": {
"describe": {
"columns": [

View File

@ -4,10 +4,11 @@ use sqlx::{types::Decimal, Pool, Postgres};
use std::cmp::{max, min};
use crate::{
database::{
fetch::{fetch_candles_from, fetch_earliest_candle, fetch_latest_finished_candle},
Candle,
}, structs::resolution::{day, Resolution},
database::fetch::{fetch_candles_from, fetch_earliest_candle, fetch_latest_finished_candle},
structs::{
candle::Candle,
resolution::{day, Resolution},
},
};
pub async fn batch_higher_order_candles(

View File

@ -4,10 +4,15 @@ use chrono::{DateTime, Duration, DurationRound, Utc};
use num_traits::{FromPrimitive, Zero};
use sqlx::{types::Decimal, Pool, Postgres};
use crate::{database::{
fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
Candle, PgOpenBookFill,
}, structs::{markets::MarketInfo, resolution::{Resolution, day}}};
use crate::{
database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
structs::{
candle::Candle,
markets::MarketInfo,
openbook::{calculate_fill_price_and_size, PgOpenBookFill},
resolution::{day, Resolution},
},
};
pub async fn batch_1m_candles(
pool: &Pool<Postgres>,
@ -106,35 +111,3 @@ fn combine_fills_into_1m_candles(
candles
}
fn calculate_fill_price_and_size(
fill: PgOpenBookFill,
base_decimals: u8,
quote_decimals: u8,
) -> (Decimal, Decimal) {
if fill.bid {
let price_before_fees = if fill.maker {
fill.native_qty_paid + fill.native_fee_or_rebate
} else {
fill.native_qty_paid - fill.native_fee_or_rebate
};
let price = (price_before_fees * token_factor(base_decimals))
/ (token_factor(quote_decimals) * fill.native_qty_received);
let size = fill.native_qty_received / token_factor(base_decimals);
(price, size)
} else {
let price_before_fees = if fill.maker {
fill.native_qty_received - fill.native_fee_or_rebate
} else {
fill.native_qty_received + fill.native_fee_or_rebate
};
let price = (price_before_fees * token_factor(base_decimals))
/ (token_factor(quote_decimals) * fill.native_qty_paid);
let size = fill.native_qty_paid / token_factor(base_decimals);
(price, size)
}
}
fn token_factor(decimals: u8) -> Decimal {
Decimal::from_u64(10u64.pow(decimals as u32)).unwrap()
}

View File

@ -8,13 +8,11 @@ use tokio::{sync::mpsc::Sender, time::sleep};
use crate::{
candle_creation::candle_batching::minute_candles::batch_1m_candles,
database::{Candle}, structs::{markets::MarketInfo, resolution::Resolution},
structs::{candle::Candle, markets::MarketInfo, resolution::Resolution},
};
use self::higher_order_candles::batch_higher_order_candles;
pub async fn batch_candles(
pool: Pool<Postgres>,
candles_sender: &Sender<Vec<Candle>>,

View File

@ -7,11 +7,11 @@ use openbook_candles::candle_creation::trade_fetching::{
use openbook_candles::database::{
initialize::{connect_to_database, setup_database},
insert::{persist_candles, persist_fill_events},
Candle,
};
use openbook_candles::structs::candle::Candle;
use openbook_candles::structs::markets::load_markets;
use openbook_candles::structs::openbook::OpenBookFillEventLog;
use openbook_candles::utils::{Config};
use openbook_candles::utils::Config;
use solana_sdk::pubkey::Pubkey;
use std::{collections::HashMap, str::FromStr};
use tokio::sync::mpsc;
@ -22,7 +22,10 @@ async fn main() -> anyhow::Result<()> {
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
let database_url: String = dotenv::var("DATABASE_URL").unwrap();
let max_pg_pool_connections: u32 = dotenv::var("MAX_PG_POOL_CONNS_WORKER").unwrap().parse::<u32>().unwrap();
let max_pg_pool_connections: u32 = dotenv::var("MAX_PG_POOL_CONNS_WORKER")
.unwrap()
.parse::<u32>()
.unwrap();
let config = Config {
rpc_url: rpc_url.clone(),
@ -43,11 +46,11 @@ async fn main() -> anyhow::Result<()> {
let (fill_sender, fill_receiver) = mpsc::channel::<OpenBookFillEventLog>(1000);
let bf_sender = fill_sender.clone();
let targets = target_markets.clone();
tokio::spawn(async move {
backfill(&rpc_url.clone(), &bf_sender, &targets).await;
});
// let bf_sender = fill_sender.clone();
// let targets = target_markets.clone();
// tokio::spawn(async move {
// backfill(&rpc_url.clone(), &bf_sender, &targets).await;
// });
tokio::spawn(async move {
scrape(&config, &fill_sender, &target_markets).await; //TODO: send the vec, it's okay

View File

@ -1,14 +1,17 @@
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
use solana_client::{rpc_client::RpcClient, rpc_config::RpcTransactionConfig};
use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcTransactionConfig};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature};
use solana_transaction_status::UiTransactionEncoding;
use std::collections::HashMap;
use tokio::sync::mpsc::Sender;
use crate::{candle_creation::trade_fetching::scrape::scrape_transactions, structs::openbook::OpenBookFillEventLog};
use crate::{
candle_creation::trade_fetching::scrape::scrape_transactions,
structs::openbook::OpenBookFillEventLog,
};
pub async fn backfill(
rpc_url: &String,
rpc_url: String,
fill_sender: &Sender<OpenBookFillEventLog>,
target_markets: &HashMap<Pubkey, u8>,
) {
@ -31,9 +34,12 @@ pub async fn backfill(
commitment: Some(CommitmentConfig::confirmed()),
max_supported_transaction_version: Some(0),
};
match rpc_client.get_transaction_with_config(&last_sig_option.unwrap(), txn_config) {
match rpc_client
.get_transaction_with_config(&last_sig_option.unwrap(), txn_config)
.await
{
Ok(txn) => {
let unix_sig_time = rpc_client.get_block_time(txn.slot).unwrap();
let unix_sig_time = rpc_client.get_block_time(txn.slot).await.unwrap();
if unix_sig_time < end_time {
break;
}

View File

@ -1,8 +1,10 @@
use anchor_lang::AnchorDeserialize;
use futures::future::join_all;
use solana_account_decoder::UiAccountEncoding;
use solana_client::{
client_error::Result as ClientResult,
rpc_client::{GetConfirmedSignaturesForAddress2Config, RpcClient},
nonblocking::rpc_client::RpcClient,
rpc_client::GetConfirmedSignaturesForAddress2Config,
rpc_config::{RpcAccountInfoConfig, RpcTransactionConfig},
};
use solana_sdk::{
@ -14,18 +16,22 @@ use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
use tokio::sync::mpsc::Sender;
use crate::{
utils::{Config}, structs::{openbook::{OpenBookFillEventLog, MarketState}, markets::{MarketInfo, MarketConfig}},
structs::{
markets::{MarketConfig, MarketInfo},
openbook::{MarketState, OpenBookFillEventLog},
},
utils::Config,
};
use super::parsing::{parse_trades_from_openbook_txns};
use super::parsing::parse_trades_from_openbook_txns;
pub async fn scrape(
config: &Config,
fill_sender: &Sender<OpenBookFillEventLog>,
target_markets: &HashMap<Pubkey, u8>,
) {
let url = &config.rpc_url;
let rpc_client = RpcClient::new_with_commitment(url, CommitmentConfig::processed());
let rpc_client =
RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::processed());
let before_slot = None;
loop {
@ -55,10 +61,13 @@ pub async fn scrape_transactions(
commitment: Some(CommitmentConfig::confirmed()),
};
let mut sigs = match rpc_client.get_signatures_for_address_with_config(
&Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(),
rpc_config,
) {
let mut sigs = match rpc_client
.get_signatures_for_address_with_config(
&Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(),
rpc_config,
)
.await
{
Ok(s) => s,
Err(e) => {
println!("Error in get_signatures_for_address_with_config: {}", e);
@ -85,15 +94,17 @@ pub async fn scrape_transactions(
max_supported_transaction_version: Some(0),
};
let mut txns = sigs
let signatures: Vec<_> = sigs
.into_iter()
.map(|sig| {
rpc_client.get_transaction_with_config(
&sig.signature.parse::<Signature>().unwrap(),
txn_config,
)
})
.collect::<Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>>(); // TODO: am I actually getting all the txns?
.map(|sig| sig.signature.parse::<Signature>().unwrap())
.collect();
let txn_futs: Vec<_> = signatures
.iter()
.map(|s| rpc_client.get_transaction_with_config(&s, txn_config))
.collect();
let mut txns = join_all(txn_futs).await;
let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
if fills.len() > 0 {
@ -111,8 +122,8 @@ pub async fn fetch_market_infos(
config: &Config,
markets: Vec<MarketConfig>,
) -> anyhow::Result<Vec<MarketInfo>> {
let url = &config.rpc_url;
let rpc_client = RpcClient::new_with_commitment(url, CommitmentConfig::processed());
let rpc_client =
RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::processed());
let rpc_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
@ -127,7 +138,7 @@ pub async fn fetch_market_infos(
.collect::<Vec<Pubkey>>();
let mut market_results = rpc_client
.get_multiple_accounts_with_config(&market_keys, rpc_config.clone())
.unwrap()
.await?
.value;
let mut mint_key_map = HashMap::new();
@ -171,7 +182,7 @@ pub async fn fetch_market_infos(
let mint_results = rpc_client
.get_multiple_accounts_with_config(&mint_keys, rpc_config)
.unwrap()
.await?
.value;
for i in 0..mint_results.len() {
let mut mint_account = mint_results[i].as_ref().unwrap().clone();

View File

@ -1,9 +1,14 @@
use chrono::{DateTime, Utc};
use sqlx::{Pool, Postgres};
use crate::{database::PgOpenBookFill, utils::AnyhowWrap};
use super::{Candle, Resolution};
use crate::{
structs::{
candle::Candle,
openbook::{PgOpenBookFill, PgTrader},
resolution::Resolution,
},
utils::AnyhowWrap,
};
pub async fn fetch_earliest_fill(
pool: &Pool<Postgres>,
@ -192,3 +197,40 @@ pub async fn fetch_tradingview_candles(
.await
.map_err_anyhow()
}
pub async fn fetch_top_traders_by_volume_from(
pool: &Pool<Postgres>,
market_address_string: &str,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
) -> anyhow::Result<Vec<PgTrader>> {
sqlx::query_as!(
PgTrader,
r#"SELECT
open_orders_owner,
sum(
native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
) as "raw_ask_size!",
sum(
native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
) as "raw_bid_size!"
FROM public."fills"
WHERE market = $1
AND time >= $2
AND time < $3
GROUP BY open_orders_owner
ORDER BY
sum(native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
+
sum(native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
DESC "#,
market_address_string,
start_time,
end_time
)
.fetch_all(pool)
.await
.map_err_anyhow()
}
// pub async fn fetch_traders_above_x_dollars

View File

@ -7,9 +7,10 @@ use std::{
};
use tokio::sync::mpsc::{error::TryRecvError, Receiver};
use crate::{utils::AnyhowWrap, structs::openbook::OpenBookFillEventLog};
use super::Candle;
use crate::{
structs::{candle::Candle, openbook::OpenBookFillEventLog},
utils::AnyhowWrap,
};
pub async fn persist_fill_events(
pool: &Pool<Postgres>,

View File

@ -1,56 +1,3 @@
use chrono::{DateTime, NaiveDateTime, Utc};
use num_traits::Zero;
use sqlx::types::Decimal;
use crate::structs::resolution::Resolution;
pub mod fetch;
pub mod initialize;
pub mod insert;
pub trait Summary {
fn summarize(&self) -> String;
}
#[derive(Clone, Debug)]
pub struct Candle {
pub market_name: String,
pub start_time: DateTime<Utc>,
pub end_time: DateTime<Utc>,
pub resolution: String,
pub open: Decimal,
pub close: Decimal,
pub high: Decimal,
pub low: Decimal,
pub volume: Decimal,
pub complete: bool,
}
impl Candle {
pub fn create_empty_candle(market_name: String, resolution: Resolution) -> Candle {
Candle {
market_name,
start_time: DateTime::from_utc(NaiveDateTime::MIN, Utc),
end_time: DateTime::from_utc(NaiveDateTime::MIN, Utc),
resolution: resolution.to_string(),
open: Decimal::zero(),
close: Decimal::zero(),
high: Decimal::zero(),
low: Decimal::zero(),
volume: Decimal::zero(),
complete: false,
}
}
}
#[derive(Copy, Clone, Debug, PartialEq)]
pub struct PgOpenBookFill {
pub time: DateTime<Utc>,
pub bid: bool,
pub maker: bool,
pub native_qty_paid: Decimal,
pub native_qty_received: Decimal,
pub native_fee_or_rebate: Decimal,
}

View File

@ -1,4 +1,4 @@
pub mod candle_creation;
pub mod database;
pub mod utils;
pub mod structs;
pub mod utils;

View File

@ -1,10 +1,17 @@
use chrono::{Utc, NaiveDateTime};
use openbook_candles::{utils::WebContext, database::{fetch::fetch_tradingview_candles}, structs::{resolution::Resolution, markets::MarketInfo, tradingview::TvResponse}};
use openbook_candles::{
database::fetch::fetch_tradingview_candles,
structs::{
markets::{valid_market, MarketInfo},
resolution::Resolution,
tradingview::TvResponse,
},
utils::{to_timestampz, WebContext},
};
use crate::server_error::ServerError;
use {
actix_web::{get, web, HttpResponse, Scope},
actix_web::{get, web, HttpResponse},
serde::Deserialize,
};
@ -16,11 +23,6 @@ pub struct Params {
pub resolution: String,
}
pub fn service() -> Scope {
web::scope("/tradingview")
.service(get_candles)
}
#[get("/candles")]
pub async fn get_candles(
info: web::Query<Params>,
@ -36,18 +38,13 @@ pub async fn get_candles(
let from = to_timestampz(info.from);
let to = to_timestampz(info.to);
let candles = match fetch_tradingview_candles(&context.pool, &info.market_name, resolution, from, to).await {
Ok(c) => c,
Err(_) => return Err(ServerError::DbQueryError)
};
let candles =
match fetch_tradingview_candles(&context.pool, &info.market_name, resolution, from, to)
.await
{
Ok(c) => c,
Err(_) => return Err(ServerError::DbQueryError),
};
Ok(HttpResponse::Ok().json(TvResponse::candles_to_tv(candles)))
}
fn valid_market(market_name: &str, markets: &Vec<MarketInfo>) -> bool {
markets.iter().any(|x| x.name == market_name)
}
fn to_timestampz(seconds: u64) -> chrono::DateTime<Utc> {
chrono::DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(seconds as i64, 0), Utc)
}

View File

@ -4,32 +4,21 @@ use actix_web::{
web::{self, Data},
App, HttpResponse, HttpServer, Responder,
};
use candles::get_candles;
use dotenv;
use openbook_candles::{utils::{Config, WebContext}, candle_creation::trade_fetching::scrape::fetch_market_infos, database::initialize::connect_to_database, structs::markets::load_markets};
use openbook_candles::{
candle_creation::trade_fetching::scrape::fetch_market_infos,
database::initialize::connect_to_database,
structs::markets::load_markets,
utils::{Config, WebContext},
};
use sqlx::{Pool, Postgres};
use traders::get_top_traders_by_base_volume;
mod candles;
mod server_error;
mod traders;
#[get("/")]
async fn hello() -> impl Responder {
HttpResponse::Ok().body("Hello world!")
}
#[get("/trade-count")]
async fn get_total_trades(pool_data: web::Data<Pool<Postgres>>) -> impl Responder {
let pool = pool_data.get_ref();
let total_query = sqlx::query!("Select COUNT(*) as total from fills")
.fetch_one(pool)
.await
.unwrap();
let total_trades: i64 = total_query.total.unwrap_or_else(|| 0);
HttpResponse::Ok().json(total_trades)
}
async fn manual_hello() -> impl Responder {
HttpResponse::Ok().body("Hey there!")
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
@ -38,7 +27,10 @@ async fn main() -> std::io::Result<()> {
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
let database_url: String = dotenv::var("DATABASE_URL").unwrap();
let max_pg_pool_connections: u32 = dotenv::var("MAX_PG_POOL_CONNS_SERVER").unwrap().parse::<u32>().unwrap();
let max_pg_pool_connections: u32 = dotenv::var("MAX_PG_POOL_CONNS_SERVER")
.unwrap()
.parse::<u32>()
.unwrap();
let config = Config {
rpc_url: rpc_url.clone(),
@ -55,12 +47,17 @@ async fn main() -> std::io::Result<()> {
markets: market_infos,
});
println!("Starting server");
HttpServer::new(move || {
App::new()
.wrap(Logger::default())
.app_data(Data::new(context.clone()))
.service(candles::service())
.route("/hey", web::get().to(manual_hello))
.app_data(context.clone())
.service(
web::scope("/api")
.service(get_candles)
.service(get_top_traders_by_base_volume)
// .service(get_top_traders_by_quote_volume)
)
})
.bind(("127.0.0.1", 8080))?
.run()

View File

@ -1 +1,2 @@
pub mod candles;
pub mod candles;
pub mod traders;

View File

@ -17,8 +17,8 @@ pub enum ServerError {
DbQueryError,
#[display(fmt = "Error getting connection")]
DbPoolError,
#[display(fmt = "Raw market not found")]
RawMarketNotFound,
#[display(fmt = "Market not found")]
MarketNotFound,
#[display(fmt = "Request symbol not found")]
SymbolNotFound,
}
@ -37,7 +37,7 @@ impl error::ResponseError for ServerError {
ServerError::WrongResolution => StatusCode::BAD_REQUEST,
ServerError::DbQueryError => StatusCode::INTERNAL_SERVER_ERROR,
ServerError::DbPoolError => StatusCode::INTERNAL_SERVER_ERROR,
ServerError::RawMarketNotFound => StatusCode::BAD_REQUEST,
ServerError::MarketNotFound => StatusCode::BAD_REQUEST,
ServerError::SymbolNotFound => StatusCode::BAD_REQUEST,
}
}
@ -47,4 +47,4 @@ impl From<ServerError> for std::io::Error {
fn from(e: ServerError) -> Self {
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
}
}
}

47
src/server/traders.rs Normal file
View File

@ -0,0 +1,47 @@
use crate::server_error::ServerError;
use openbook_candles::{
database::fetch::fetch_top_traders_by_volume_from,
structs::openbook::{calculate_trader_volume, Trader},
utils::{to_timestampz, WebContext},
};
use {
actix_web::{get, web, HttpResponse},
serde::Deserialize,
};
#[derive(Debug, Deserialize)]
pub struct Params {
pub market_name: String,
pub from: u64,
pub to: u64,
}
#[get("/traders/base-volume")]
pub async fn get_top_traders_by_base_volume(
info: web::Query<Params>,
context: web::Data<WebContext>,
) -> Result<HttpResponse, ServerError> {
let selected_market = context.markets.iter().find(|x| x.name == info.market_name);
if selected_market.is_none() {
return Err(ServerError::MarketNotFound);
}
let selected_market = selected_market.unwrap();
let from = to_timestampz(info.from);
let to = to_timestampz(info.to);
let raw_traders =
match fetch_top_traders_by_volume_from(&context.pool, &selected_market.address, from, to)
.await
{
Ok(c) => c,
Err(_) => return Err(ServerError::DbQueryError),
};
let traders = raw_traders
.into_iter()
.map(|t| calculate_trader_volume(t, selected_market.base_decimals))
.collect::<Vec<Trader>>();
// TODO: add start and end in response?
Ok(HttpResponse::Ok().json(traders))
}

36
src/structs/candle.rs Normal file
View File

@ -0,0 +1,36 @@
use chrono::{DateTime, NaiveDateTime, Utc};
use num_traits::Zero;
use sqlx::types::Decimal;
use super::resolution::Resolution;
#[derive(Clone, Debug)]
pub struct Candle {
pub market_name: String,
pub start_time: DateTime<Utc>,
pub end_time: DateTime<Utc>,
pub resolution: String,
pub open: Decimal,
pub close: Decimal,
pub high: Decimal,
pub low: Decimal,
pub volume: Decimal,
pub complete: bool,
}
impl Candle {
pub fn create_empty_candle(market_name: String, resolution: Resolution) -> Candle {
Candle {
market_name,
start_time: DateTime::from_utc(NaiveDateTime::MIN, Utc),
end_time: DateTime::from_utc(NaiveDateTime::MIN, Utc),
resolution: resolution.to_string(),
open: Decimal::zero(),
close: Decimal::zero(),
high: Decimal::zero(),
low: Decimal::zero(),
volume: Decimal::zero(),
complete: false,
}
}
}

View File

@ -1,5 +1,5 @@
use std::fs::File;
use serde::Deserialize;
use std::fs::File;
#[derive(Debug, Clone)]
pub struct MarketInfo {
@ -22,4 +22,8 @@ pub struct MarketConfig {
pub fn load_markets(path: &str) -> Vec<MarketConfig> {
let reader = File::open(path).unwrap();
serde_json::from_reader(reader).unwrap()
}
}
pub fn valid_market(market_name: &str, markets: &Vec<MarketInfo>) -> bool {
markets.iter().any(|x| x.name == market_name)
}

View File

@ -1,4 +1,5 @@
pub mod candle;
pub mod markets;
pub mod openbook;
pub mod resolution;
pub mod tradingview;
pub mod openbook;

View File

@ -1,5 +1,9 @@
use anchor_lang::{event, AnchorDeserialize, AnchorSerialize};
use chrono::{DateTime, Utc};
use num_traits::{FromPrimitive, ToPrimitive};
use serde::Serialize;
use solana_sdk::pubkey::Pubkey;
use sqlx::types::Decimal;
#[event]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@ -19,6 +23,29 @@ pub struct OpenBookFillEventLog {
pub referrer_rebate: Option<u64>,
}
#[derive(Copy, Clone, Debug, PartialEq)]
pub struct PgOpenBookFill {
pub time: DateTime<Utc>,
pub bid: bool,
pub maker: bool,
pub native_qty_paid: Decimal,
pub native_qty_received: Decimal,
pub native_fee_or_rebate: Decimal,
}
#[derive(Clone, Debug, PartialEq)]
pub struct PgTrader {
pub open_orders_owner: String,
pub raw_ask_size: Decimal,
pub raw_bid_size: Decimal,
}
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct Trader {
pub pubkey: String,
pub volume_base_units: f64,
}
#[derive(Copy, Clone, AnchorDeserialize)]
#[cfg_attr(target_endian = "little", derive(Debug))]
#[repr(packed)]
@ -72,4 +99,47 @@ pub struct MarketState {
pub fee_rate_bps: u64,
// 46
pub referrer_rebates_accrued: u64,
}
}
pub fn calculate_fill_price_and_size(
fill: PgOpenBookFill,
base_decimals: u8,
quote_decimals: u8,
) -> (Decimal, Decimal) {
if fill.bid {
let price_before_fees = if fill.maker {
fill.native_qty_paid + fill.native_fee_or_rebate
} else {
fill.native_qty_paid - fill.native_fee_or_rebate
};
let price = (price_before_fees * token_factor(base_decimals))
/ (token_factor(quote_decimals) * fill.native_qty_received);
let size = fill.native_qty_received / token_factor(base_decimals);
(price, size)
} else {
let price_before_fees = if fill.maker {
fill.native_qty_received - fill.native_fee_or_rebate
} else {
fill.native_qty_received + fill.native_fee_or_rebate
};
let price = (price_before_fees * token_factor(base_decimals))
/ (token_factor(quote_decimals) * fill.native_qty_paid);
let size = fill.native_qty_paid / token_factor(base_decimals);
(price, size)
}
}
pub fn calculate_trader_volume(trader: PgTrader, base_decimals: u8) -> Trader {
let bid_size = trader.raw_bid_size / token_factor(base_decimals);
let ask_size = trader.raw_ask_size / token_factor(base_decimals);
Trader {
pubkey: trader.open_orders_owner,
volume_base_units: (bid_size + ask_size).to_f64().unwrap(),
// TODO: quote volume
}
}
fn token_factor(decimals: u8) -> Decimal {
Decimal::from_u64(10u64.pow(decimals as u32)).unwrap()
}

View File

@ -1,5 +1,5 @@
use std::fmt;
use chrono::Duration;
use std::fmt;
use strum::EnumIter;
#[derive(EnumIter, Copy, Clone, Eq, PartialEq)]
@ -78,4 +78,4 @@ impl Resolution {
_ => Err(()),
}
}
}
}

View File

@ -2,7 +2,7 @@ use chrono::Utc;
use num_traits::ToPrimitive;
use serde::Serialize;
use crate::database::Candle;
use super::candle::Candle;
#[derive(Serialize)]
pub struct TvResponse {
@ -50,8 +50,6 @@ impl TvResponse {
assert_eq!(low.len(), high.len());
assert_eq!(volume.len(), time.len());
let len = time.len();
TvResponse {
status: "ok".to_owned(),
error_message: None,

View File

@ -1,6 +1,6 @@
use chrono::{NaiveDateTime, Utc};
use serde_derive::Deserialize;
use sqlx::{Pool, Postgres};
use std::fs::File;
use crate::structs::markets::MarketInfo;
@ -25,7 +25,9 @@ pub struct Config {
pub struct WebContext {
pub markets: Vec<MarketInfo>,
pub pool: Pool<Postgres>
pub pool: Pool<Postgres>,
}
pub fn to_timestampz(seconds: u64) -> chrono::DateTime<Utc> {
chrono::DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(seconds as i64, 0), Utc)
}