diff --git a/src/candle_creation/candle_batching/minute_candles.rs b/src/candle_creation/candle_batching/minute_candles.rs index c45de28..00263e8 100644 --- a/src/candle_creation/candle_batching/minute_candles.rs +++ b/src/candle_creation/candle_batching/minute_candles.rs @@ -80,12 +80,12 @@ fn combine_fills_into_1m_candles( let mut last_price = match maybe_last_price { Some(p) => p, - None => { + None => { let first = fills_iter.peek().clone().unwrap(); let (price, _) = calculate_fill_price_and_size(**first, market.base_decimals, market.quote_decimals); price - } + } }; for i in 0..candles.len() { diff --git a/src/candle_creation/main.rs b/src/candle_creation/main.rs index a41b362..984c45a 100644 --- a/src/candle_creation/main.rs +++ b/src/candle_creation/main.rs @@ -1,9 +1,6 @@ use dotenv; use openbook_candles::candle_creation::candle_batching::batch_candles; -use openbook_candles::candle_creation::trade_fetching::{ - backfill::backfill, - scrape::{fetch_market_infos, scrape}, -}; +use openbook_candles::candle_creation::trade_fetching::scrape::{fetch_market_infos, scrape}; use openbook_candles::database::{ initialize::{connect_to_database, setup_database}, insert::{persist_candles, persist_fill_events}, @@ -43,42 +40,32 @@ async fn main() -> anyhow::Result<()> { let pool = connect_to_database(&config).await?; setup_database(&pool).await?; + let mut handles = vec![]; let (fill_sender, fill_receiver) = mpsc::channel::(1000); - // let bf_sender = fill_sender.clone(); - // let targets = target_markets.clone(); - // tokio::spawn(async move { - // backfill(&rpc_url.clone(), &bf_sender, &targets).await; - // }); - - tokio::spawn(async move { + handles.push(tokio::spawn(async move { scrape(&config, &fill_sender, &target_markets).await; //TODO: send the vec, it's okay - }); + })); let fills_pool = pool.clone(); - tokio::spawn(async move { + handles.push(tokio::spawn(async move { persist_fill_events(&fills_pool, fill_receiver).await; - }); + })); - // let (candle_sender, candle_receiver) = mpsc::channel::>(1000); + let (candle_sender, candle_receiver) = mpsc::channel::>(1000); - // let batch_pool = pool.clone(); - // tokio::spawn(async move { - // batch_candles(batch_pool, &candle_sender, market_infos).await; - // }); + let batch_pool = pool.clone(); + handles.push(tokio::spawn(async move { + batch_candles(batch_pool, &candle_sender, market_infos).await; + })); - // let persist_pool = pool.clone(); - // // tokio::spawn(async move { - // persist_candles(persist_pool, candle_receiver).await; - // // }); + let persist_pool = pool.clone(); + handles.push(tokio::spawn(async move { + persist_candles(persist_pool, candle_receiver).await; + })); - loop {} // tokio drop if one thread drops or something + futures::future::join_all(handles).await; Ok(()) } - -// use getconfirmedsignaturesforaddres2 to scan txns -// find filleventlog events -// parse trade data -// persist the last 3 months on differnet timescales diff --git a/src/candle_creation/trade_fetching/backfill.rs b/src/candle_creation/trade_fetching/backfill.rs deleted file mode 100644 index 634ced8..0000000 --- a/src/candle_creation/trade_fetching/backfill.rs +++ /dev/null @@ -1,70 +0,0 @@ -use chrono::{DateTime, Duration, NaiveDateTime, Utc}; -use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcTransactionConfig}; -use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature}; -use solana_transaction_status::UiTransactionEncoding; -use std::collections::HashMap; -use tokio::sync::mpsc::Sender; - -use crate::{ - candle_creation::trade_fetching::scrape::scrape_transactions, - structs::openbook::OpenBookFillEventLog, -}; - -pub async fn backfill( - rpc_url: String, - fill_sender: &Sender, - target_markets: &HashMap, -) { - let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::processed()); - - println!("backfill started"); - let mut before_slot: Option = None; - let end_time = (Utc::now() - Duration::days(1)).timestamp(); - loop { - let last_sig_option = - scrape_transactions(&rpc_client, before_slot, None, fill_sender, target_markets).await; - - if last_sig_option.is_none() { - println!("last sig is none"); - continue; - } - - let txn_config = RpcTransactionConfig { - encoding: Some(UiTransactionEncoding::Base64), - commitment: Some(CommitmentConfig::confirmed()), - max_supported_transaction_version: Some(0), - }; - match rpc_client - .get_transaction_with_config(&last_sig_option.unwrap(), txn_config) - .await - { - Ok(txn) => { - let unix_sig_time = rpc_client.get_block_time(txn.slot).await.unwrap(); - if unix_sig_time < end_time { - break; - } - let time_left = backfill_time_left(unix_sig_time, end_time); - println!( - "{} minutes ~ {} days remaining in the backfill\n", - time_left.num_minutes(), - time_left.num_days() - ); - } - Err(e) => { - println!("error: {:?}", e); - continue; - } - } - before_slot = last_sig_option; - } - - print!("Backfill complete \n"); -} - -fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration { - let naive_cur = NaiveDateTime::from_timestamp_millis(current_time * 1000).unwrap(); - let naive_bf = NaiveDateTime::from_timestamp_millis(backfill_end * 1000).unwrap(); - let cur_date = DateTime::::from_utc(naive_cur, Utc); - let bf_date = DateTime::::from_utc(naive_bf, Utc); - cur_date - bf_date -} diff --git a/src/candle_creation/trade_fetching/mod.rs b/src/candle_creation/trade_fetching/mod.rs index fc1f1bc..4e93e83 100644 --- a/src/candle_creation/trade_fetching/mod.rs +++ b/src/candle_creation/trade_fetching/mod.rs @@ -1,4 +1,2 @@ -pub mod backfill; pub mod parsing; pub mod scrape; -pub mod websocket; diff --git a/src/candle_creation/trade_fetching/parsing.rs b/src/candle_creation/trade_fetching/parsing.rs index 8043179..4ac699b 100644 --- a/src/candle_creation/trade_fetching/parsing.rs +++ b/src/candle_creation/trade_fetching/parsing.rs @@ -1,12 +1,10 @@ use solana_client::client_error::Result as ClientResult; +use solana_sdk::pubkey::Pubkey; use solana_transaction_status::{ option_serializer::OptionSerializer, EncodedConfirmedTransactionWithStatusMeta, }; use std::{collections::HashMap, io::Error}; -use anchor_lang::{event, AnchorDeserialize, AnchorSerialize}; -use solana_sdk::pubkey::Pubkey; - use crate::structs::openbook::OpenBookFillEventLog; const PROGRAM_DATA: &str = "Program data: "; diff --git a/src/candle_creation/trade_fetching/scrape.rs b/src/candle_creation/trade_fetching/scrape.rs index fc174f1..78cb49b 100644 --- a/src/candle_creation/trade_fetching/scrape.rs +++ b/src/candle_creation/trade_fetching/scrape.rs @@ -2,7 +2,6 @@ use anchor_lang::AnchorDeserialize; use futures::future::join_all; use solana_account_decoder::UiAccountEncoding; use solana_client::{ - client_error::Result as ClientResult, nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config, rpc_config::{RpcAccountInfoConfig, RpcTransactionConfig}, @@ -10,7 +9,7 @@ use solana_client::{ use solana_sdk::{ commitment_config::CommitmentConfig, program_pack::Pack, pubkey::Pubkey, signature::Signature, }; -use solana_transaction_status::{EncodedConfirmedTransactionWithStatusMeta, UiTransactionEncoding}; +use solana_transaction_status::UiTransactionEncoding; use spl_token::state::Mint; use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration}; use tokio::sync::mpsc::Sender; diff --git a/src/candle_creation/trade_fetching/websocket.rs b/src/candle_creation/trade_fetching/websocket.rs deleted file mode 100644 index 2030c35..0000000 --- a/src/candle_creation/trade_fetching/websocket.rs +++ /dev/null @@ -1,85 +0,0 @@ -// use jsonrpc_core_client::transports::ws; - -// use anchor_client::{ -// anchor_lang::{self, event, AnchorDeserialize, AnchorSerialize, Discriminator}, -// ClientError as AnchorClientError, Cluster, -// }; -// use log::*; -// use solana_account_decoder::UiAccountEncoding; -// use solana_client::{ -// pubsub_client::{PubsubClient, PubsubClientSubscription}, -// rpc_config::{ -// RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcTransactionLogsConfig, -// RpcTransactionLogsFilter, -// }, -// rpc_response::{Response, RpcKeyedAccount, RpcLogsResponse}, -// }; -// use solana_rpc::rpc_pubsub::RpcSolPubSubClient; -// use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Keypair}; -// use std::{io::Error, rc::Rc, str::FromStr, time::Duration}; - -// use crate::utils::AnyhowWrap; -// use crate::{ -// database::initialize::{connect_to_database, setup_database}, -// utils::Config, -// }; - -// use super::parsing::parse_and_save_logs; - -// const PROGRAM_LOG: &str = "Program log: "; - -// pub async fn listen_logs(config: Config) -> anyhow::Result<()> { -// let ws_url = config.rpc_ws_url; - -// let transaction_logs_config = RpcTransactionLogsConfig { -// commitment: Some(CommitmentConfig::confirmed()), -// }; - -// let transaction_logs_filter = RpcTransactionLogsFilter::Mentions(vec![String::from( -// "srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX", -// )]); - -// let (_log_sub, log_receiver) = -// PubsubClient::logs_subscribe(&ws_url, transaction_logs_filter, transaction_logs_config)?; - -// loop { -// let response = log_receiver.recv().map_err_anyhow()?; // TODO: what to do if disconnects -// if response.value.err.is_none() { -// parse_and_save_logs(&response.value.logs); -// } -// } -// } - -pub async fn listen_program_accounts() { - // let payer = Rc::new(Keypair::new()); - - // let connect = ws::try_connect::(&ws_url).map_err_anyhow()?; - - // let client = connect.await.map_err_anyhow()?; - // let openbook_key = Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(); - - // let cluster = Cluster::Custom(rpc_url.to_string(), ws_url.to_string()); - - // let client = AnchorClient::new_with_options(cluster, payer, CommitmentConfig::confirmed()); - - // let dex_program = client.program(openbook_key); - - // let account_info_config = RpcAccountInfoConfig { - // encoding: Some(UiAccountEncoding::Base64), - // data_slice: None, - // commitment: Some(CommitmentConfig::processed()), - // min_context_slot: None, - // }; - - // let program_accounts_config = RpcProgramAccountsConfig { - // filters: None, // TODO: add filters for markets we care about - // with_context: Some(true), - // account_config: account_info_config.clone(), - // }; - - // let (_program_sub, prog_receiver) = PubsubClient::program_subscribe( - // &ws_url, - // &openbook_key, - // Some(program_accounts_config) - // )?; -} diff --git a/src/server/candles.rs b/src/server/candles.rs index 1773c2e..cb24d2f 100644 --- a/src/server/candles.rs +++ b/src/server/candles.rs @@ -1,10 +1,6 @@ use openbook_candles::{ database::fetch::fetch_tradingview_candles, - structs::{ - markets::{valid_market, MarketInfo}, - resolution::Resolution, - tradingview::TvResponse, - }, + structs::{markets::valid_market, resolution::Resolution, tradingview::TvResponse}, utils::{to_timestampz, WebContext}, };