diff --git a/candle-creator/Cargo.lock b/candle-creator/Cargo.lock index 2a20551..c6306e2 100644 --- a/candle-creator/Cargo.lock +++ b/candle-creator/Cargo.lock @@ -1445,6 +1445,12 @@ dependencies = [ "syn 0.15.44", ] +[[package]] +name = "dotenv" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" + [[package]] name = "dotenvy" version = "0.15.6" @@ -3118,10 +3124,12 @@ dependencies = [ "async-trait", "borsh", "chrono", + "dotenv", "jsonrpc-core-client", "log 0.4.17", "serde", "serde_derive", + "serde_json", "serum_dex", "solana-account-decoder", "solana-client", @@ -3131,7 +3139,6 @@ dependencies = [ "sqlx", "tokio", "tokio-stream", - "toml", ] [[package]] diff --git a/candle-creator/Cargo.toml b/candle-creator/Cargo.toml index 311125a..9f00b79 100644 --- a/candle-creator/Cargo.toml +++ b/candle-creator/Cargo.toml @@ -26,9 +26,10 @@ async-trait = "0.1" anyhow = "1.0" log = "0.4" -toml = "0.5" -serde = "1.0.130" -serde_derive = "1.0.130" +dotenv = "0.15.0" +serde = "1.0" +serde_json = "1.0" +serde_derive = "1.0" serum_dex = { version = "0.5.10", git = "https://github.com/openbook-dex/program.git", default-features=false, features = ["no-entrypoint", "program"] } anchor-lang = ">=0.25.0" diff --git a/candle-creator/src/database/database.rs b/candle-creator/src/database/database.rs index 426bf18..9970b07 100644 --- a/candle-creator/src/database/database.rs +++ b/candle-creator/src/database/database.rs @@ -3,26 +3,23 @@ use sqlx::{ postgres::{PgPoolOptions, PgQueryResult}, Executor, Pool, Postgres, }; -use std::{time::{Duration, Instant}, collections::hash_map::DefaultHasher, hash::{Hash, Hasher}}; +use std::{ + collections::hash_map::DefaultHasher, + hash::{Hash, Hasher}, + time::{Duration, Instant}, +}; use tokio::sync::mpsc::{error::TryRecvError, Receiver}; use crate::{ - trade_fetching::parsing::FillEventLog, + trade_fetching::parsing::OpenBookFillEventLog, utils::{AnyhowWrap, Config}, }; pub async fn connect_to_database(config: &Config) -> anyhow::Result> { - // let conn_str = std::env::var("POSTGRES_CONN_STRING") - // .expect("POSTGRES_CONN_STRING environment variable must be set!"); - - // let config_str = - // format!("host=0.0.0.0 port=5432 password={password} user=postgres dbname=postgres"); - let db_config = &config.database_config; - loop { let pool = PgPoolOptions::new() - .max_connections(db_config.max_pg_pool_connections) - .connect(&db_config.connection_string) + .max_connections(config.max_pg_pool_connections) + .connect(&config.database_url) .await; if pool.is_ok() { println!("Database connected"); @@ -118,15 +115,16 @@ pub async fn save_candles() { pub async fn handle_fill_events( pool: &Pool, - mut fill_event_receiver: Receiver, + mut fill_receiver: Receiver, ) { loop { let start = Instant::now(); let mut write_batch = Vec::new(); while write_batch.len() < 10 || start.elapsed().as_secs() > 10 { - match fill_event_receiver.try_recv() { + match fill_receiver.try_recv() { Ok(event) => { if !write_batch.contains(&event) { + // O(n) write_batch.push(event) } } @@ -149,7 +147,7 @@ pub async fn handle_fill_events( } } -fn build_fills_upsert_statement(events: Vec) -> String { +fn build_fills_upsert_statement(events: Vec) -> String { let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, client_order_id, referrer_rebate) VALUES"); for (idx, event) in events.iter().enumerate() { let mut hasher = DefaultHasher::new(); @@ -190,13 +188,13 @@ fn build_fills_upsert_statement(events: Vec) -> String { #[cfg(test)] mod tests { - use std::str::FromStr; - use solana_sdk::pubkey::Pubkey; use super::*; + use solana_sdk::pubkey::Pubkey; + use std::str::FromStr; #[test] fn test_event_hashing() { - let event_1 = FillEventLog { + let event_1 = OpenBookFillEventLog { market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(), open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(), open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw") @@ -213,7 +211,7 @@ mod tests { referrer_rebate: Some(841), }; - let event_2 = FillEventLog { + let event_2 = OpenBookFillEventLog { market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(), open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(), open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw") diff --git a/candle-creator/src/database/mod.rs b/candle-creator/src/database/mod.rs index 3ad932e..12c5fba 100644 --- a/candle-creator/src/database/mod.rs +++ b/candle-creator/src/database/mod.rs @@ -1,5 +1,15 @@ +use chrono::{DateTime, Utc}; +use solana_sdk::pubkey::Pubkey; + pub mod database; pub struct Candle {} -pub struct ParsedFill {} +pub struct MarketInfo { + pub market_key: Pubkey, + pub market_name: String, + pub base_symbol: String, + pub quote_symbol: String, + pub base_decimals: u8, + pub quote_decimals: u8, +} diff --git a/candle-creator/src/database/sample.rs b/candle-creator/src/database/sample.rs deleted file mode 100644 index 3a202f0..0000000 --- a/candle-creator/src/database/sample.rs +++ /dev/null @@ -1,145 +0,0 @@ -use { - std::time::Duration, - sysinfo::SystemExt, - tokio_postgres::{tls::MakeTlsConnect, types::Type, NoTls, Socket, Statement}, -}; - -use chrono::{NaiveDateTime, Utc}; - -use crate::candles::Candle; - -pub struct Database { - client: tokio_postgres::Client, - insertion_statement: Statement, - pub refresh_period_ms: u64, -} - -impl Database { - pub const ENTRY_SIZE: u64 = 112; // Size in bytes of a single db entry - pub const RELATIVE_CHUNK_SIZE: f64 = 0.10; // Size of a timescaledb chunk - pub async fn new( - refresh_period_ms: u64, - number_of_markets: u64, - ) -> Result { - let (client, connection) = connect_to_database().await; - tokio::spawn(async move { - if let Err(e) = connection.await { - eprintln!("connection error: {}", e); - } - }); - initialize(&client, refresh_period_ms, number_of_markets).await?; - let insertion_statement = client - .prepare( - "INSERT INTO candles VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT (address, timestamp) DO UPDATE - SET close = EXCLUDED.close, - low = LEAST(EXCLUDED.low, candles.low), - high = GREATEST(EXCLUDED.high, candles.high);", - ) - .await - .unwrap(); - Ok(Self { - client, - insertion_statement, - refresh_period_ms, - }) - } - - pub async fn commit_candle( - &self, - candle: &Candle, - address: &String, - name: &String, - ) -> Result<(), tokio_postgres::Error> { - self.client - .execute( - &self.insertion_statement, - &[ - &chrono::DateTime::::from_utc( - NaiveDateTime::from_timestamp(candle.ts_start, 0), - Utc, - ), - address, - name, - &candle.open, - &candle.close, - &candle.high, - &candle.low, - ], - ) - .await?; - Ok(()) - } -} - -async fn connect_to_database() -> ( - tokio_postgres::Client, - tokio_postgres::Connection>::Stream>, -) { - // let password = std::env::var("POSTGRES_PASSWORD") - // .expect("POSTGRES_PASSWORD environment variable must be set!"); - let password = "postgres"; - let config_str = - format!("host=0.0.0.0 port=5432 password={password} user=postgres dbname=postgres"); - loop { - let res = tokio_postgres::connect(&config_str, NoTls).await; - if let Ok(r) = res { - return r; - } - println!("Failed to connect to database, retrying"); - tokio::time::sleep(Duration::from_millis(500)).await; - } -} - -async fn initialize( - client: &tokio_postgres::Client, - refresh_period_ms: u64, - mut number_of_markets: u64, -) -> Result<(), tokio_postgres::Error> { - number_of_markets = std::cmp::max(10, number_of_markets); - println!("=== Initializing database ==="); - client - .execute( - "CREATE TABLE IF NOT EXISTS candles ( - timestamp TIMESTAMP WITH TIME ZONE NOT NULL, - address VARCHAR(44), - name VARCHAR(20), - open DOUBLE PRECISION, - close DOUBLE PRECISION, - high DOUBLE PRECISION, - low DOUBLE PRECISION, - PRIMARY KEY (timestamp, address) - );", - &[], - ) - .await - .unwrap(); - // We convert the table to a hypertable - let o = client - .query( - "SELECT create_hypertable('candles', 'timestamp', if_not_exists => TRUE);", - &[], - ) - .await - .unwrap(); - println!("Output from create_hypertable"); - println!("{o:?}"); - - // Implements the best practice detailed here - // https://docs.timescale.com/timescaledb/latest/how-to-guides/hypertables/best-practices/#time-intervals - let system_memory_kb = sysinfo::System::new_all().total_memory(); - let chunk_size_ms = - refresh_period_ms * system_memory_kb * 1024 / Database::ENTRY_SIZE / number_of_markets; - let chunk_size_ms = (chunk_size_ms as f64) * Database::RELATIVE_CHUNK_SIZE; - let s = client - .prepare_typed( - "SELECT set_chunk_time_interval('candles', $1);", - &[Type::INT8], - ) - .await - .unwrap(); - let o = client.query(&s, &[&(chunk_size_ms as i64)]).await?; - println!("Output from set_chunk_time_interval"); - println!("{o:?}"); - Ok(()) -} \ No newline at end of file diff --git a/candle-creator/src/main.rs b/candle-creator/src/main.rs index d05460a..70d9e1b 100644 --- a/candle-creator/src/main.rs +++ b/candle-creator/src/main.rs @@ -1,6 +1,6 @@ -use crate::{trade_fetching::parsing::FillEventLog, utils::Config}; +use crate::{trade_fetching::parsing::OpenBookFillEventLog, utils::Config}; use database::database::{connect_to_database, setup_database}; -use std::{fs::File, io::Read}; +use dotenv; use tokio::sync::mpsc; mod database; @@ -9,11 +9,16 @@ mod utils; #[tokio::main] async fn main() -> anyhow::Result<()> { - let config: Config = { - let mut file = File::open("./example-config.toml")?; - let mut contents = String::new(); - file.read_to_string(&mut contents)?; - toml::from_str(&contents).unwrap() + dotenv::dotenv().ok(); + + let rpc_url: String = dotenv::var("RPC_URL").unwrap(); + let database_url: String = dotenv::var("DATABASE_URL").unwrap(); + + let config = Config { + rpc_url, + database_url, + max_pg_pool_connections: 5, + markets: utils::load_markets("/Users/dboures/dev/openbook-candles/markets.json"), }; println!("{:?}", config); @@ -21,16 +26,20 @@ async fn main() -> anyhow::Result<()> { let pool = connect_to_database(&config).await?; setup_database(&pool).await?; - let (fill_event_sender, mut fill_event_receiver) = mpsc::channel::(1000); + let (fill_sender, fill_receiver) = mpsc::channel::(1000); // spawn a thread for each market? // what are the memory implications? + // tokio::spawn(async move { + // trade_fetching::scrape::scrape(&config, fill_event_sender.clone()).await; + // }); + tokio::spawn(async move { - trade_fetching::scrape::scrape(&config, fill_event_sender).await; + trade_fetching::scrape::scrape(&config, fill_sender.clone()).await; }); - database::database::handle_fill_events(&pool, fill_event_receiver).await; + database::database::handle_fill_events(&pool, fill_receiver).await; // trade_fetching::websocket::listen_logs().await?; Ok(()) diff --git a/candle-creator/src/trade_fetching/parsing.rs b/candle-creator/src/trade_fetching/parsing.rs index c8a78b0..00d765e 100644 --- a/candle-creator/src/trade_fetching/parsing.rs +++ b/candle-creator/src/trade_fetching/parsing.rs @@ -11,7 +11,7 @@ const PROGRAM_DATA: &str = "Program data: "; #[event] #[derive(Debug, Clone, PartialEq, Hash)] -pub struct FillEventLog { +pub struct OpenBookFillEventLog { pub market: Pubkey, pub open_orders: Pubkey, pub open_orders_owner: Pubkey, @@ -27,37 +27,34 @@ pub struct FillEventLog { pub referrer_rebate: Option, } -pub fn parse_fill_events_from_txns( +pub fn parse_trades_from_openbook_txns( txns: &mut Vec>, -) -> Vec { - let mut fills_vector = Vec::::new(); +) -> Vec { + let mut fills_vector = Vec::::new(); for txn in txns.iter_mut() { - // println!("{:#?}\n", txn.as_ref()); - - // fugly match txn { Ok(t) => { if let Some(m) = &t.transaction.meta { - // println!("{:#?}\n", m.log_messages); - match &m.log_messages { - OptionSerializer::Some(logs) => match parse_fill_events_from_logs(logs) { - Some(mut events) => fills_vector.append(&mut events), - None => {} - }, + OptionSerializer::Some(logs) => { + match parse_openbook_fills_from_logs(logs) { + Some(mut events) => fills_vector.append(&mut events), + None => {} + } + } OptionSerializer::None => {} OptionSerializer::Skip => {} } } } - Err(_) => {} //println!("goo: {:?}", e), + Err(_) => {} } } - return fills_vector; + fills_vector } -fn parse_fill_events_from_logs(logs: &Vec) -> Option> { - let mut fills_vector = Vec::::new(); +fn parse_openbook_fills_from_logs(logs: &Vec) -> Option> { + let mut fills_vector = Vec::::new(); for l in logs { match l.strip_prefix(PROGRAM_DATA) { Some(log) => { @@ -66,7 +63,7 @@ fn parse_fill_events_from_logs(logs: &Vec) -> Option> _ => continue, }; let mut slice: &[u8] = &borsh_bytes[8..]; - let event: Result = + let event: Result = anchor_lang::AnchorDeserialize::deserialize(&mut slice); match event { diff --git a/candle-creator/src/trade_fetching/scrape.rs b/candle-creator/src/trade_fetching/scrape.rs index 5cb6416..06dc0b8 100644 --- a/candle-creator/src/trade_fetching/scrape.rs +++ b/candle-creator/src/trade_fetching/scrape.rs @@ -1,76 +1,119 @@ -use anyhow::Result; use solana_client::{ client_error::Result as ClientResult, rpc_client::{GetConfirmedSignaturesForAddress2Config, RpcClient}, rpc_config::RpcTransactionConfig, }; -use solana_sdk::{ - commitment_config::CommitmentConfig, - pubkey::Pubkey, - signature::{Keypair, Signature}, -}; -use solana_transaction_status::{ - option_serializer::OptionSerializer, EncodedConfirmedTransactionWithStatusMeta, - UiTransactionEncoding, -}; -use sqlx::{Pool, Postgres}; +use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature}; +use solana_transaction_status::{EncodedConfirmedTransactionWithStatusMeta, UiTransactionEncoding}; use std::{str::FromStr, time::Duration}; use tokio::sync::mpsc::Sender; use crate::utils::Config; -use super::parsing::{parse_fill_events_from_txns, FillEventLog}; +use super::parsing::{parse_trades_from_openbook_txns, OpenBookFillEventLog}; -pub async fn scrape(config: &Config, fill_event_sender: Sender) { - let url = &config.rpc_http_url; +// use serde::{Deserialize, Serialize}; + +pub async fn scrape(config: &Config, fill_sender: Sender) { + let url = &config.rpc_url; let rpc_client = RpcClient::new_with_commitment(url, CommitmentConfig::processed()); - let openbook_key = Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(); + fetch_market(&rpc_client).await; - // let start_slot = ; //set the start point at 3 months from now (config above) + let before_slot = None; loop { - let config = GetConfirmedSignaturesForAddress2Config { - before: None, - until: None, - limit: Some(150), // TODO: None - commitment: Some(CommitmentConfig::confirmed()), - }; - - let mut sigs = rpc_client - .get_signatures_for_address_with_config(&openbook_key, config) - .unwrap(); - - sigs.retain(|sig| sig.err.is_none()); - - let txn_config = RpcTransactionConfig { - encoding: Some(UiTransactionEncoding::Json), - commitment: Some(CommitmentConfig::confirmed()), - max_supported_transaction_version: Some(0), - }; - - let mut txns = sigs - .into_iter() - .map(|sig| { - rpc_client.get_transaction_with_config( - &sig.signature.parse::().unwrap(), - txn_config, - ) - }) - .collect::>>(); // TODO: am I actually getting all the txns? - - let fill_events = parse_fill_events_from_txns(&mut txns); - if fill_events.len() > 0 { - for event in fill_events.into_iter() { - if let Err(_) = fill_event_sender.send(event).await { - println!("receiver dropped"); - return; - } - } - } + scrape_transactions(&rpc_client, before_slot, &fill_sender).await; print!("Ding fires are done \n\n"); tokio::time::sleep(Duration::from_millis(500)).await; - - // increment slot somehow (or move forward in time or something) } } + +pub async fn backfill(config: &Config, fill_sender: Sender) { + let url = &config.rpc_url; + let rpc_client = RpcClient::new_with_commitment(url, CommitmentConfig::processed()); + + let mut before_slot: Option = None; + + loop { + let last_sig = scrape_transactions(&rpc_client, before_slot, &fill_sender).await; + + match rpc_client.get_transaction(&last_sig, UiTransactionEncoding::Json) { + Ok(txn) => { + let unix_sig_time = rpc_client.get_block_time(txn.slot).unwrap(); + if unix_sig_time > 0 { + // TODO: is 3 months in past + break; + } + println!("backfill at {}", unix_sig_time); + } + Err(_) => continue, + } + before_slot = Some(last_sig); + } + + print!("Backfill complete \n"); +} + +pub async fn scrape_transactions( + rpc_client: &RpcClient, + before_slot: Option, + fill_sender: &Sender, +) -> Signature { + let rpc_config = GetConfirmedSignaturesForAddress2Config { + before: before_slot, + until: None, + limit: Some(150), + commitment: Some(CommitmentConfig::confirmed()), + }; + + let mut sigs = match rpc_client.get_signatures_for_address_with_config( + &Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(), + rpc_config, + ) { + Ok(s) => s, + Err(_) => return before_slot.unwrap(), + }; + + sigs.retain(|sig| sig.err.is_none()); + let last_sig = sigs.last().unwrap().clone(); + + let txn_config = RpcTransactionConfig { + encoding: Some(UiTransactionEncoding::Json), + commitment: Some(CommitmentConfig::confirmed()), + max_supported_transaction_version: Some(0), + }; + + let mut txns = sigs + .into_iter() + .map(|sig| { + rpc_client.get_transaction_with_config( + &sig.signature.parse::().unwrap(), + txn_config, + ) + }) + .collect::>>(); // TODO: am I actually getting all the txns? + + let fills = parse_trades_from_openbook_txns(&mut txns); + if fills.len() > 0 { + for fill in fills.into_iter() { + if let Err(_) = fill_sender.send(fill).await { + panic!("receiver dropped"); + } + } + } + + Signature::from_str(&last_sig.signature).unwrap() +} + +async fn fetch_market(rpc_client: &RpcClient) { + let data = rpc_client + .get_account_data( + &Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(), + ) + .unwrap(); + + println!("{}", data.len()); + + // simply the market object in TS +} diff --git a/candle-creator/src/utils/mod.rs b/candle-creator/src/utils/mod.rs index a3db9ef..14490ed 100644 --- a/candle-creator/src/utils/mod.rs +++ b/candle-creator/src/utils/mod.rs @@ -1,4 +1,5 @@ use serde_derive::Deserialize; +use std::{fs::File, io::Read}; pub trait AnyhowWrap { type Value; @@ -14,16 +15,10 @@ impl AnyhowWrap for Result { #[derive(Clone, Debug, Deserialize)] pub struct Config { - pub rpc_ws_url: String, - pub rpc_http_url: String, - pub database_config: DatabaseConfig, - pub markets: Vec, -} - -#[derive(Clone, Debug, Deserialize)] -pub struct DatabaseConfig { - pub connection_string: String, + pub rpc_url: String, + pub database_url: String, pub max_pg_pool_connections: u32, + pub markets: Vec, } #[derive(Clone, Debug, Deserialize)] @@ -31,3 +26,8 @@ pub struct MarketConfig { pub name: String, pub market: String, } + +pub fn load_markets(path: &str) -> Vec { + let reader = File::open(path).unwrap(); + serde_json::from_reader(reader).unwrap() +} diff --git a/markets.json b/markets.json new file mode 100644 index 0000000..c0656f8 --- /dev/null +++ b/markets.json @@ -0,0 +1,6 @@ +[ + { + "name" : "SOL/USDC", + "market" : "8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6" + } +] \ No newline at end of file diff --git a/server/Cargo.lock b/server/Cargo.lock index 8fbc9fe..e84e90d 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -236,6 +236,12 @@ dependencies = [ "alloc-no-stdlib", ] +[[package]] +name = "anyhow" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "224afbd727c3d6e4b90103ece64b8d1b67fbb1973b1046c2281eed3f3803f800" + [[package]] name = "atoi" version = "1.0.0" @@ -480,6 +486,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "dotenv" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" + [[package]] name = "dotenvy" version = "0.15.6" @@ -501,6 +513,19 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "env_logger" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85cdab6a89accf66733ad5a1693a4dcced6aeff64602b634530dd73c1f3ee9f0" +dependencies = [ + "humantime", + "is-terminal", + "log", + "regex", + "termcolor", +] + [[package]] name = "errno" version = "0.2.8" @@ -705,6 +730,12 @@ dependencies = [ "libc", ] +[[package]] +name = "hermit-abi" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286" + [[package]] name = "hex" version = "0.4.3" @@ -752,6 +783,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "idna" version = "0.3.0" @@ -791,6 +828,18 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "is-terminal" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b6b32576413a8e69b90e952e4a026476040d81017b80445deda5f2d3921857" +dependencies = [ + "hermit-abi 0.3.1", + "io-lifetimes", + "rustix", + "windows-sys 0.45.0", +] + [[package]] name = "itertools" version = "0.10.5" @@ -976,7 +1025,7 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b" dependencies = [ - "hermit-abi", + "hermit-abi 0.2.6", "libc", ] @@ -991,6 +1040,9 @@ name = "openbook-candle-server" version = "0.1.0" dependencies = [ "actix-web", + "anyhow", + "dotenv", + "env_logger", "sqlx", ] @@ -1522,6 +1574,15 @@ dependencies = [ "windows-sys 0.42.0", ] +[[package]] +name = "termcolor" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6" +dependencies = [ + "winapi-util", +] + [[package]] name = "thiserror" version = "1.0.39" @@ -1817,6 +1878,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/server/Cargo.toml b/server/Cargo.toml index 38a8df9..5be1d8b 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -9,3 +9,5 @@ edition = "2021" anyhow = "1.0" actix-web = "4.3.1" sqlx = { version = "0.6", features = [ "runtime-tokio-native-tls" , "postgres" ] } +env_logger = "0.10.0" +dotenv = "0.15.0" diff --git a/server/src/main.rs b/server/src/main.rs index 73148f8..1686811 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,28 +1,63 @@ -use actix_web::{get, post, web, App, HttpResponse, HttpServer, Responder, middleware::Logger}; +use actix_web::{ + get, + middleware::Logger, + web::{self, Data}, + App, HttpResponse, HttpServer, Responder, +}; +use dotenv; +use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; #[get("/")] async fn hello() -> impl Responder { HttpResponse::Ok().body("Hello world!") } -#[post("/echo")] -async fn echo(req_body: String) -> impl Responder { - HttpResponse::Ok().body(req_body) +#[get("/trade-count")] +async fn get_total_trades(pool_data: web::Data>) -> impl Responder { + let pool = pool_data.get_ref(); + let total_query = sqlx::query!("Select COUNT(*) as total from fills") + .fetch_one(pool) + .await + .unwrap(); + let total_trades: i64 = total_query.total.unwrap_or_else(|| 0); + HttpResponse::Ok().json(total_trades) } +// #[get("/recent-trades")] +// async fn get_recent_trades(pool_data: web::Data>, market: String) -> impl Responder { +// let pool = pool_data.get_ref(); +// let trades_query= sqlx::query!("Select * as total from fills").fetch_one(pool).await.unwrap(); +// let total_trades: i64 = total_query.total.unwrap_or_else(|| 0); +// HttpResponse::Ok().json(total_trades) +// } + async fn manual_hello() -> impl Responder { HttpResponse::Ok().body("Hey there!") } #[actix_web::main] -async fn main() -> anyhow::Result<()> { +async fn main() -> anyhow::Result<(), std::io::Error> { + dotenv::dotenv().ok(); + env_logger::init(); + let database_url = dotenv::var("DATABASE_URL").unwrap(); - HttpServer::new(|| { + // let context = Data::new(Context { + // markets: utils::markets::load_markets(markets_path), + // pool, + // }); + + let pool = PgPoolOptions::new() + .max_connections(15) + .connect(&database_url) + .await + .unwrap(); + + HttpServer::new(move || { App::new() .wrap(Logger::default()) - .service(hello) - .service(echo) + .app_data(Data::new(pool.clone())) + .service(get_total_trades) .route("/hey", web::get().to(manual_hello)) }) .bind(("127.0.0.1", 8080))?