diff --git a/src/backfill-candles/main.rs b/src/backfill-candles/main.rs index 6251a9a..0321d21 100644 --- a/src/backfill-candles/main.rs +++ b/src/backfill-candles/main.rs @@ -1,16 +1,15 @@ -use anchor_lang::prelude::Pubkey; -use chrono::{DateTime, Duration, NaiveDateTime, Utc}; + + use deadpool_postgres::Object; -use futures::future::join_all; + use openbook_candles::{ database::{ initialize::connect_to_database, - insert::{build_candles_upsert_statement, persist_candles}, + insert::{build_candles_upsert_statement}, }, structs::{ candle::Candle, markets::{fetch_market_infos, load_markets}, - openbook::OpenBookFillEvent, resolution::Resolution, }, utils::{AnyhowWrap, Config}, @@ -19,9 +18,9 @@ use openbook_candles::{ minute_candles::backfill_batch_1m_candles, }, }; -use std::{collections::HashMap, env, str::FromStr}; +use std::{env}; use strum::IntoEnumIterator; -use tokio::sync::mpsc::{self, Sender}; + #[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() -> anyhow::Result<()> { @@ -35,7 +34,7 @@ async fn main() -> anyhow::Result<()> { let config = Config { rpc_url: rpc_url.clone(), }; - let markets = load_markets(&path_to_markets_json); + let markets = load_markets(path_to_markets_json); let market_infos = fetch_market_infos(&config, markets.clone()).await?; println!("Backfilling candles for {:?}", markets); @@ -59,7 +58,7 @@ async fn main() -> anyhow::Result<()> { } async fn save_candles(candles: Vec, client: Object) -> anyhow::Result<()> { - if candles.len() > 0 { + if !candles.is_empty() { let upsert_statement = build_candles_upsert_statement(candles); client .execute(&upsert_statement, &[]) diff --git a/src/backfill-trades/main.rs b/src/backfill-trades/main.rs index ec9534d..23a0fb4 100644 --- a/src/backfill-trades/main.rs +++ b/src/backfill-trades/main.rs @@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> { let config = Config { rpc_url: rpc_url.clone(), }; - let markets = load_markets(&path_to_markets_json); + let markets = load_markets(path_to_markets_json); let market_infos = fetch_market_infos(&config, markets.clone()).await?; let mut target_markets = HashMap::new(); for m in market_infos.clone() { @@ -128,17 +128,17 @@ pub async fn get_signatures( } }; - if sigs.len() == 0 { + if sigs.is_empty() { println!("No signatures found"); return None; } let last = sigs.last().unwrap(); // println!("{:?}", last.block_time.unwrap()); - return Some(( + Some(( Signature::from_str(&last.signature).unwrap(), last.block_time.unwrap(), sigs, - )); + )) } pub async fn get_transactions( @@ -165,13 +165,13 @@ pub async fn get_transactions( let txn_futs: Vec<_> = signatures .iter() - .map(|s| rpc_client.get_transaction_with_config(&s, txn_config)) + .map(|s| rpc_client.get_transaction_with_config(s, txn_config)) .collect(); let mut txns = join_all(txn_futs).await; let fills = parse_trades_from_openbook_txns(&mut txns, target_markets); - if fills.len() > 0 { + if !fills.is_empty() { for fill in fills.into_iter() { // println!("Sending fill {:?}", fill); if let Err(_) = fill_sender.send(fill).await { diff --git a/src/database/fetch.rs b/src/database/fetch.rs index 1fe3f6e..5ea50ef 100644 --- a/src/database/fetch.rs +++ b/src/database/fetch.rs @@ -69,7 +69,7 @@ pub async fn fetch_fills_from( .await?; Ok(rows .into_iter() - .map(|r| PgOpenBookFill::from_row(r)) + .map(PgOpenBookFill::from_row) .collect()) } @@ -144,7 +144,7 @@ pub async fn fetch_earliest_candles( .query(&stmt, &[&market_name, &resolution.to_string()]) .await?; - Ok(rows.into_iter().map(|r| Candle::from_row(r)).collect()) + Ok(rows.into_iter().map(Candle::from_row).collect()) } pub async fn fetch_candles_from( @@ -190,7 +190,7 @@ pub async fn fetch_candles_from( ) .await?; - Ok(rows.into_iter().map(|r| Candle::from_row(r)).collect()) + Ok(rows.into_iter().map(Candle::from_row).collect()) } pub async fn fetch_top_traders_by_base_volume_from( @@ -229,7 +229,7 @@ pub async fn fetch_top_traders_by_base_volume_from( .query(&stmt, &[&market_address_string, &start_time, &end_time]) .await?; - Ok(rows.into_iter().map(|r| PgTrader::from_row(r)).collect()) + Ok(rows.into_iter().map(PgTrader::from_row).collect()) } pub async fn fetch_top_traders_by_quote_volume_from( @@ -268,7 +268,7 @@ pub async fn fetch_top_traders_by_quote_volume_from( .query(&stmt, &[&market_address_string, &start_time, &end_time]) .await?; - Ok(rows.into_iter().map(|r| PgTrader::from_row(r)).collect()) + Ok(rows.into_iter().map(PgTrader::from_row).collect()) } pub async fn fetch_coingecko_24h_volume( @@ -292,7 +292,7 @@ pub async fn fetch_coingecko_24h_volume( Ok(rows .into_iter() - .map(|r| PgCoinGecko24HourVolume::from_row(r)) + .map(PgCoinGecko24HourVolume::from_row) .collect()) } @@ -334,6 +334,6 @@ pub async fn fetch_coingecko_24h_high_low( Ok(rows .into_iter() - .map(|r| PgCoinGecko24HighLow::from_row(r)) + .map(PgCoinGecko24HighLow::from_row) .collect()) } diff --git a/src/database/initialize.rs b/src/database/initialize.rs index 65ab604..d26bc42 100644 --- a/src/database/initialize.rs +++ b/src/database/initialize.rs @@ -25,10 +25,10 @@ pub async fn connect_to_database() -> anyhow::Result { // fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills let tls = if pg_config.pg_use_ssl { pg_config.pg.ssl_mode = Some(SslMode::Require); - let ca_cert = fs::read(&pg_config.pg_ca_cert_path.expect("reading ca cert from env")) + let ca_cert = fs::read(pg_config.pg_ca_cert_path.expect("reading ca cert from env")) .expect("reading ca cert from file"); let client_key = fs::read( - &pg_config + pg_config .pg_client_key_path .expect("reading client key from env"), ) diff --git a/src/database/insert.rs b/src/database/insert.rs index cab9a10..d777f96 100644 --- a/src/database/insert.rs +++ b/src/database/insert.rs @@ -20,12 +20,10 @@ pub async fn persist_fill_events( while write_batch.len() < 10 { match fill_receiver.try_recv() { Ok(event) => { - if !write_batch.contains_key(&event) { - write_batch.insert(event, 0); - } + write_batch.entry(event).or_insert(0); } Err(TryRecvError::Empty) => { - if write_batch.len() > 0 { + if !write_batch.is_empty() { break; } else { continue; @@ -37,7 +35,7 @@ pub async fn persist_fill_events( }; } - if write_batch.len() > 0 { + if !write_batch.is_empty() { // print!("writing: {:?} events to DB\n", write_batch.len()); // match conn.ping().await { @@ -68,7 +66,7 @@ pub async fn persist_candles( // Ok(_) => { match candles_receiver.try_recv() { Ok(candles) => { - if candles.len() == 0 { + if candles.is_empty() { continue; } // print!("writing: {:?} candles to DB\n", candles.len()); diff --git a/src/server/coingecko.rs b/src/server/coingecko.rs index 12149da..a507c7e 100644 --- a/src/server/coingecko.rs +++ b/src/server/coingecko.rs @@ -71,7 +71,7 @@ pub async fn tickers(context: web::Data) -> Result = raw_volumes .into_iter() - .map(|v| v.convert_to_readable(&markets)) + .map(|v| v.convert_to_readable(markets)) .collect(); let tickers = markets .iter() diff --git a/src/server/main.rs b/src/server/main.rs index 3e71738..d319727 100644 --- a/src/server/main.rs +++ b/src/server/main.rs @@ -4,7 +4,7 @@ use actix_web::{ App, HttpServer, }; use candles::get_candles; -use dotenv; + use markets::get_markets; use openbook_candles::{ database::initialize::connect_to_database, diff --git a/src/server/traders.rs b/src/server/traders.rs index ad0a269..787aced 100644 --- a/src/server/traders.rs +++ b/src/server/traders.rs @@ -51,7 +51,7 @@ pub async fn get_top_traders_by_base_volume( let response = TraderResponse { start_time: info.from, end_time: info.to, - traders: traders, + traders, volume_type: VolumeType::Base.to_string(), }; Ok(HttpResponse::Ok().json(response)) @@ -90,7 +90,7 @@ pub async fn get_top_traders_by_quote_volume( let response = TraderResponse { start_time: info.from, end_time: info.to, - traders: traders, + traders, volume_type: VolumeType::Quote.to_string(), }; Ok(HttpResponse::Ok().json(response)) diff --git a/src/structs/markets.rs b/src/structs/markets.rs index c898af3..da145cb 100644 --- a/src/structs/markets.rs +++ b/src/structs/markets.rs @@ -111,8 +111,8 @@ pub async fn fetch_market_infos( .value; for i in 0..mint_results.len() { let mut mint_account = mint_results[i].as_ref().unwrap().clone(); - let mut mint_bytes: &[u8] = &mut mint_account.data[..]; - let mint = Mint::unpack_from_slice(&mut mint_bytes).unwrap(); + let mint_bytes: &[u8] = &mut mint_account.data[..]; + let mint = Mint::unpack_from_slice(mint_bytes).unwrap(); mint_key_map.insert(mint_keys[i], mint.decimals); } diff --git a/src/worker/candle_batching/higher_order_candles.rs b/src/worker/candle_batching/higher_order_candles.rs index f1edbf1..f38518b 100644 --- a/src/worker/candle_batching/higher_order_candles.rs +++ b/src/worker/candle_batching/higher_order_candles.rs @@ -30,7 +30,7 @@ pub async fn batch_higher_order_candles( end_time, ) .await?; - if constituent_candles.len() == 0 { + if constituent_candles.is_empty() { return Ok(Vec::new()); } let combined_candles = combine_into_higher_order_candles( @@ -45,7 +45,7 @@ pub async fn batch_higher_order_candles( let mut constituent_candles = fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution()) .await?; - if constituent_candles.len() == 0 { + if constituent_candles.is_empty() { // println!( // "Batching {}, but no candles found for: {:?}, {}", // resolution, @@ -56,7 +56,7 @@ pub async fn batch_higher_order_candles( } let start_time = constituent_candles[0].start_time.duration_trunc(day())?; - if constituent_candles.len() == 0 { + if constituent_candles.is_empty() { return Ok(Vec::new()); } @@ -100,7 +100,7 @@ fn combine_into_higher_order_candles( let mut combined_candles = vec![empty_candle; num_candles]; let mut con_iter = constituent_candles.iter_mut().peekable(); - let mut start_time = st.clone(); + let mut start_time = st; let mut end_time = start_time + duration; let mut last_candle = seed_candle; @@ -125,7 +125,7 @@ fn combine_into_higher_order_candles( combined_candles[i].end_time = end_time; start_time = end_time; - end_time = end_time + duration; + end_time += duration; last_candle = combined_candles[i].clone(); } @@ -152,7 +152,7 @@ pub async fn backfill_batch_higher_order_candles( ) -> anyhow::Result> { let mut constituent_candles = fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution()).await?; - if constituent_candles.len() == 0 { + if constituent_candles.is_empty() { return Ok(vec![]); } let start_time = constituent_candles[0].start_time.duration_trunc(day())?; diff --git a/src/worker/candle_batching/minute_candles.rs b/src/worker/candle_batching/minute_candles.rs index 31eb78e..24e44f5 100644 --- a/src/worker/candle_batching/minute_candles.rs +++ b/src/worker/candle_batching/minute_candles.rs @@ -53,7 +53,7 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul Utc::now().duration_trunc(Duration::minutes(1))?, ); let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?; - if fills.len() > 0 { + if !fills.is_empty() { let candles = combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None); Ok(candles) @@ -77,13 +77,13 @@ fn combine_fills_into_1m_candles( let mut candles = vec![empty_candle; minutes as usize]; let mut fills_iter = fills.iter_mut().peekable(); - let mut start_time = st.clone(); + let mut start_time = st; let mut end_time = start_time + Duration::minutes(1); let mut last_price = match maybe_last_price { Some(p) => p, None => { - let first = fills_iter.peek().clone().unwrap(); + let first = fills_iter.peek().unwrap(); let (price, _) = calculate_fill_price_and_size(**first, market.base_decimals, market.quote_decimals); price @@ -115,7 +115,7 @@ fn combine_fills_into_1m_candles( candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time); start_time = end_time; - end_time = end_time + Duration::minutes(1); + end_time += Duration::minutes(1); } candles @@ -146,7 +146,7 @@ pub async fn backfill_batch_1m_candles( Utc::now().duration_trunc(Duration::minutes(1))?, ); let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?; - if fills.len() > 0 { + if !fills.is_empty() { let mut minute_candles = combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None); candles.append(&mut minute_candles); diff --git a/src/worker/candle_batching/mod.rs b/src/worker/candle_batching/mod.rs index 2932ece..56259b8 100644 --- a/src/worker/candle_batching/mod.rs +++ b/src/worker/candle_batching/mod.rs @@ -60,7 +60,7 @@ async fn batch_inner( } async fn send_candles(candles: Vec, candles_sender: &Sender>) { - if candles.len() > 0 { + if !candles.is_empty() { if let Err(_) = candles_sender.send(candles).await { panic!("candles receiver dropped"); } diff --git a/src/worker/main.rs b/src/worker/main.rs index 875708f..000f0e6 100644 --- a/src/worker/main.rs +++ b/src/worker/main.rs @@ -1,4 +1,4 @@ -use dotenv; + use openbook_candles::structs::candle::Candle; use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; use openbook_candles::structs::openbook::OpenBookFillEvent; @@ -29,7 +29,7 @@ async fn main() -> anyhow::Result<()> { rpc_url: rpc_url.clone(), }; - let markets = load_markets(&path_to_markets_json); + let markets = load_markets(path_to_markets_json); let market_infos = fetch_market_infos(&config, markets.clone()).await?; let mut target_markets = HashMap::new(); for m in market_infos.clone() { diff --git a/src/worker/trade_fetching/parsing.rs b/src/worker/trade_fetching/parsing.rs index e70560d..02c8d64 100644 --- a/src/worker/trade_fetching/parsing.rs +++ b/src/worker/trade_fetching/parsing.rs @@ -71,9 +71,9 @@ fn parse_openbook_fills_from_logs( } } - if fills_vector.len() > 0 { - return Some(fills_vector); + if !fills_vector.is_empty() { + Some(fills_vector) } else { - return None; + None } } diff --git a/src/worker/trade_fetching/scrape.rs b/src/worker/trade_fetching/scrape.rs index ebd3a02..6a5bf09 100644 --- a/src/worker/trade_fetching/scrape.rs +++ b/src/worker/trade_fetching/scrape.rs @@ -62,12 +62,12 @@ pub async fn scrape_transactions( } }; - if sigs.len() == 0 { + if sigs.is_empty() { println!("No signatures found"); return before_sig; } - let last = sigs.last().clone().unwrap(); + let last = sigs.last().unwrap(); let request_last_sig = Signature::from_str(&last.signature).unwrap(); sigs.retain(|sig| sig.err.is_none()); @@ -88,13 +88,13 @@ pub async fn scrape_transactions( let txn_futs: Vec<_> = signatures .iter() - .map(|s| rpc_client.get_transaction_with_config(&s, txn_config)) + .map(|s| rpc_client.get_transaction_with_config(s, txn_config)) .collect(); let mut txns = join_all(txn_futs).await; let fills = parse_trades_from_openbook_txns(&mut txns, target_markets); - if fills.len() > 0 { + if !fills.is_empty() { for fill in fills.into_iter() { if let Err(_) = fill_sender.send(fill).await { panic!("receiver dropped");