remove unused backfill and websocket files

This commit is contained in:
dboures 2023-03-14 00:34:23 -05:00
parent c5d3a1f4ab
commit 6daf18bbe6
No known key found for this signature in database
GPG Key ID: AB3790129D478852
8 changed files with 21 additions and 198 deletions

View File

@ -80,12 +80,12 @@ fn combine_fills_into_1m_candles(
let mut last_price = match maybe_last_price {
Some(p) => p,
None => {
None => {
let first = fills_iter.peek().clone().unwrap();
let (price, _) =
calculate_fill_price_and_size(**first, market.base_decimals, market.quote_decimals);
price
}
}
};
for i in 0..candles.len() {

View File

@ -1,9 +1,6 @@
use dotenv;
use openbook_candles::candle_creation::candle_batching::batch_candles;
use openbook_candles::candle_creation::trade_fetching::{
backfill::backfill,
scrape::{fetch_market_infos, scrape},
};
use openbook_candles::candle_creation::trade_fetching::scrape::{fetch_market_infos, scrape};
use openbook_candles::database::{
initialize::{connect_to_database, setup_database},
insert::{persist_candles, persist_fill_events},
@ -43,42 +40,32 @@ async fn main() -> anyhow::Result<()> {
let pool = connect_to_database(&config).await?;
setup_database(&pool).await?;
let mut handles = vec![];
let (fill_sender, fill_receiver) = mpsc::channel::<OpenBookFillEventLog>(1000);
// let bf_sender = fill_sender.clone();
// let targets = target_markets.clone();
// tokio::spawn(async move {
// backfill(&rpc_url.clone(), &bf_sender, &targets).await;
// });
tokio::spawn(async move {
handles.push(tokio::spawn(async move {
scrape(&config, &fill_sender, &target_markets).await; //TODO: send the vec, it's okay
});
}));
let fills_pool = pool.clone();
tokio::spawn(async move {
handles.push(tokio::spawn(async move {
persist_fill_events(&fills_pool, fill_receiver).await;
});
}));
// let (candle_sender, candle_receiver) = mpsc::channel::<Vec<Candle>>(1000);
let (candle_sender, candle_receiver) = mpsc::channel::<Vec<Candle>>(1000);
// let batch_pool = pool.clone();
// tokio::spawn(async move {
// batch_candles(batch_pool, &candle_sender, market_infos).await;
// });
let batch_pool = pool.clone();
handles.push(tokio::spawn(async move {
batch_candles(batch_pool, &candle_sender, market_infos).await;
}));
// let persist_pool = pool.clone();
// // tokio::spawn(async move {
// persist_candles(persist_pool, candle_receiver).await;
// // });
let persist_pool = pool.clone();
handles.push(tokio::spawn(async move {
persist_candles(persist_pool, candle_receiver).await;
}));
loop {} // tokio drop if one thread drops or something
futures::future::join_all(handles).await;
Ok(())
}
// use getconfirmedsignaturesforaddres2 to scan txns
// find filleventlog events
// parse trade data
// persist the last 3 months on differnet timescales

View File

@ -1,70 +0,0 @@
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcTransactionConfig};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature};
use solana_transaction_status::UiTransactionEncoding;
use std::collections::HashMap;
use tokio::sync::mpsc::Sender;
use crate::{
candle_creation::trade_fetching::scrape::scrape_transactions,
structs::openbook::OpenBookFillEventLog,
};
pub async fn backfill(
rpc_url: String,
fill_sender: &Sender<OpenBookFillEventLog>,
target_markets: &HashMap<Pubkey, u8>,
) {
let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::processed());
println!("backfill started");
let mut before_slot: Option<Signature> = None;
let end_time = (Utc::now() - Duration::days(1)).timestamp();
loop {
let last_sig_option =
scrape_transactions(&rpc_client, before_slot, None, fill_sender, target_markets).await;
if last_sig_option.is_none() {
println!("last sig is none");
continue;
}
let txn_config = RpcTransactionConfig {
encoding: Some(UiTransactionEncoding::Base64),
commitment: Some(CommitmentConfig::confirmed()),
max_supported_transaction_version: Some(0),
};
match rpc_client
.get_transaction_with_config(&last_sig_option.unwrap(), txn_config)
.await
{
Ok(txn) => {
let unix_sig_time = rpc_client.get_block_time(txn.slot).await.unwrap();
if unix_sig_time < end_time {
break;
}
let time_left = backfill_time_left(unix_sig_time, end_time);
println!(
"{} minutes ~ {} days remaining in the backfill\n",
time_left.num_minutes(),
time_left.num_days()
);
}
Err(e) => {
println!("error: {:?}", e);
continue;
}
}
before_slot = last_sig_option;
}
print!("Backfill complete \n");
}
fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration {
let naive_cur = NaiveDateTime::from_timestamp_millis(current_time * 1000).unwrap();
let naive_bf = NaiveDateTime::from_timestamp_millis(backfill_end * 1000).unwrap();
let cur_date = DateTime::<Utc>::from_utc(naive_cur, Utc);
let bf_date = DateTime::<Utc>::from_utc(naive_bf, Utc);
cur_date - bf_date
}

View File

@ -1,4 +1,2 @@
pub mod backfill;
pub mod parsing;
pub mod scrape;
pub mod websocket;

View File

@ -1,12 +1,10 @@
use solana_client::client_error::Result as ClientResult;
use solana_sdk::pubkey::Pubkey;
use solana_transaction_status::{
option_serializer::OptionSerializer, EncodedConfirmedTransactionWithStatusMeta,
};
use std::{collections::HashMap, io::Error};
use anchor_lang::{event, AnchorDeserialize, AnchorSerialize};
use solana_sdk::pubkey::Pubkey;
use crate::structs::openbook::OpenBookFillEventLog;
const PROGRAM_DATA: &str = "Program data: ";

View File

@ -2,7 +2,6 @@ use anchor_lang::AnchorDeserialize;
use futures::future::join_all;
use solana_account_decoder::UiAccountEncoding;
use solana_client::{
client_error::Result as ClientResult,
nonblocking::rpc_client::RpcClient,
rpc_client::GetConfirmedSignaturesForAddress2Config,
rpc_config::{RpcAccountInfoConfig, RpcTransactionConfig},
@ -10,7 +9,7 @@ use solana_client::{
use solana_sdk::{
commitment_config::CommitmentConfig, program_pack::Pack, pubkey::Pubkey, signature::Signature,
};
use solana_transaction_status::{EncodedConfirmedTransactionWithStatusMeta, UiTransactionEncoding};
use solana_transaction_status::UiTransactionEncoding;
use spl_token::state::Mint;
use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
use tokio::sync::mpsc::Sender;

View File

@ -1,85 +0,0 @@
// use jsonrpc_core_client::transports::ws;
// use anchor_client::{
// anchor_lang::{self, event, AnchorDeserialize, AnchorSerialize, Discriminator},
// ClientError as AnchorClientError, Cluster,
// };
// use log::*;
// use solana_account_decoder::UiAccountEncoding;
// use solana_client::{
// pubsub_client::{PubsubClient, PubsubClientSubscription},
// rpc_config::{
// RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcTransactionLogsConfig,
// RpcTransactionLogsFilter,
// },
// rpc_response::{Response, RpcKeyedAccount, RpcLogsResponse},
// };
// use solana_rpc::rpc_pubsub::RpcSolPubSubClient;
// use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Keypair};
// use std::{io::Error, rc::Rc, str::FromStr, time::Duration};
// use crate::utils::AnyhowWrap;
// use crate::{
// database::initialize::{connect_to_database, setup_database},
// utils::Config,
// };
// use super::parsing::parse_and_save_logs;
// const PROGRAM_LOG: &str = "Program log: ";
// pub async fn listen_logs(config: Config) -> anyhow::Result<()> {
// let ws_url = config.rpc_ws_url;
// let transaction_logs_config = RpcTransactionLogsConfig {
// commitment: Some(CommitmentConfig::confirmed()),
// };
// let transaction_logs_filter = RpcTransactionLogsFilter::Mentions(vec![String::from(
// "srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX",
// )]);
// let (_log_sub, log_receiver) =
// PubsubClient::logs_subscribe(&ws_url, transaction_logs_filter, transaction_logs_config)?;
// loop {
// let response = log_receiver.recv().map_err_anyhow()?; // TODO: what to do if disconnects
// if response.value.err.is_none() {
// parse_and_save_logs(&response.value.logs);
// }
// }
// }
pub async fn listen_program_accounts() {
// let payer = Rc::new(Keypair::new());
// let connect = ws::try_connect::<RpcSolPubSubClient>(&ws_url).map_err_anyhow()?;
// let client = connect.await.map_err_anyhow()?;
// let openbook_key = Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap();
// let cluster = Cluster::Custom(rpc_url.to_string(), ws_url.to_string());
// let client = AnchorClient::new_with_options(cluster, payer, CommitmentConfig::confirmed());
// let dex_program = client.program(openbook_key);
// let account_info_config = RpcAccountInfoConfig {
// encoding: Some(UiAccountEncoding::Base64),
// data_slice: None,
// commitment: Some(CommitmentConfig::processed()),
// min_context_slot: None,
// };
// let program_accounts_config = RpcProgramAccountsConfig {
// filters: None, // TODO: add filters for markets we care about
// with_context: Some(true),
// account_config: account_info_config.clone(),
// };
// let (_program_sub, prog_receiver) = PubsubClient::program_subscribe(
// &ws_url,
// &openbook_key,
// Some(program_accounts_config)
// )?;
}

View File

@ -1,10 +1,6 @@
use openbook_candles::{
database::fetch::fetch_tradingview_candles,
structs::{
markets::{valid_market, MarketInfo},
resolution::Resolution,
tradingview::TvResponse,
},
structs::{markets::valid_market, resolution::Resolution, tradingview::TvResponse},
utils::{to_timestampz, WebContext},
};