lint: cargo clippy
This commit is contained in:
parent
7ae8339ebb
commit
6e16ef8e04
|
@ -1,16 +1,15 @@
|
||||||
use anchor_lang::prelude::Pubkey;
|
|
||||||
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
|
|
||||||
use deadpool_postgres::Object;
|
use deadpool_postgres::Object;
|
||||||
use futures::future::join_all;
|
|
||||||
use openbook_candles::{
|
use openbook_candles::{
|
||||||
database::{
|
database::{
|
||||||
initialize::connect_to_database,
|
initialize::connect_to_database,
|
||||||
insert::{build_candles_upsert_statement, persist_candles},
|
insert::{build_candles_upsert_statement},
|
||||||
},
|
},
|
||||||
structs::{
|
structs::{
|
||||||
candle::Candle,
|
candle::Candle,
|
||||||
markets::{fetch_market_infos, load_markets},
|
markets::{fetch_market_infos, load_markets},
|
||||||
openbook::OpenBookFillEvent,
|
|
||||||
resolution::Resolution,
|
resolution::Resolution,
|
||||||
},
|
},
|
||||||
utils::{AnyhowWrap, Config},
|
utils::{AnyhowWrap, Config},
|
||||||
|
@ -19,9 +18,9 @@ use openbook_candles::{
|
||||||
minute_candles::backfill_batch_1m_candles,
|
minute_candles::backfill_batch_1m_candles,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
use std::{collections::HashMap, env, str::FromStr};
|
use std::{env};
|
||||||
use strum::IntoEnumIterator;
|
use strum::IntoEnumIterator;
|
||||||
use tokio::sync::mpsc::{self, Sender};
|
|
||||||
|
|
||||||
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
@ -35,7 +34,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let config = Config {
|
let config = Config {
|
||||||
rpc_url: rpc_url.clone(),
|
rpc_url: rpc_url.clone(),
|
||||||
};
|
};
|
||||||
let markets = load_markets(&path_to_markets_json);
|
let markets = load_markets(path_to_markets_json);
|
||||||
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
|
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
|
||||||
println!("Backfilling candles for {:?}", markets);
|
println!("Backfilling candles for {:?}", markets);
|
||||||
|
|
||||||
|
@ -59,7 +58,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn save_candles(candles: Vec<Candle>, client: Object) -> anyhow::Result<()> {
|
async fn save_candles(candles: Vec<Candle>, client: Object) -> anyhow::Result<()> {
|
||||||
if candles.len() > 0 {
|
if !candles.is_empty() {
|
||||||
let upsert_statement = build_candles_upsert_statement(candles);
|
let upsert_statement = build_candles_upsert_statement(candles);
|
||||||
client
|
client
|
||||||
.execute(&upsert_statement, &[])
|
.execute(&upsert_statement, &[])
|
||||||
|
|
|
@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let config = Config {
|
let config = Config {
|
||||||
rpc_url: rpc_url.clone(),
|
rpc_url: rpc_url.clone(),
|
||||||
};
|
};
|
||||||
let markets = load_markets(&path_to_markets_json);
|
let markets = load_markets(path_to_markets_json);
|
||||||
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
|
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
|
||||||
let mut target_markets = HashMap::new();
|
let mut target_markets = HashMap::new();
|
||||||
for m in market_infos.clone() {
|
for m in market_infos.clone() {
|
||||||
|
@ -128,17 +128,17 @@ pub async fn get_signatures(
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if sigs.len() == 0 {
|
if sigs.is_empty() {
|
||||||
println!("No signatures found");
|
println!("No signatures found");
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
let last = sigs.last().unwrap();
|
let last = sigs.last().unwrap();
|
||||||
// println!("{:?}", last.block_time.unwrap());
|
// println!("{:?}", last.block_time.unwrap());
|
||||||
return Some((
|
Some((
|
||||||
Signature::from_str(&last.signature).unwrap(),
|
Signature::from_str(&last.signature).unwrap(),
|
||||||
last.block_time.unwrap(),
|
last.block_time.unwrap(),
|
||||||
sigs,
|
sigs,
|
||||||
));
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_transactions(
|
pub async fn get_transactions(
|
||||||
|
@ -165,13 +165,13 @@ pub async fn get_transactions(
|
||||||
|
|
||||||
let txn_futs: Vec<_> = signatures
|
let txn_futs: Vec<_> = signatures
|
||||||
.iter()
|
.iter()
|
||||||
.map(|s| rpc_client.get_transaction_with_config(&s, txn_config))
|
.map(|s| rpc_client.get_transaction_with_config(s, txn_config))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let mut txns = join_all(txn_futs).await;
|
let mut txns = join_all(txn_futs).await;
|
||||||
|
|
||||||
let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
|
let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
|
||||||
if fills.len() > 0 {
|
if !fills.is_empty() {
|
||||||
for fill in fills.into_iter() {
|
for fill in fills.into_iter() {
|
||||||
// println!("Sending fill {:?}", fill);
|
// println!("Sending fill {:?}", fill);
|
||||||
if let Err(_) = fill_sender.send(fill).await {
|
if let Err(_) = fill_sender.send(fill).await {
|
||||||
|
|
|
@ -69,7 +69,7 @@ pub async fn fetch_fills_from(
|
||||||
.await?;
|
.await?;
|
||||||
Ok(rows
|
Ok(rows
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|r| PgOpenBookFill::from_row(r))
|
.map(PgOpenBookFill::from_row)
|
||||||
.collect())
|
.collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -144,7 +144,7 @@ pub async fn fetch_earliest_candles(
|
||||||
.query(&stmt, &[&market_name, &resolution.to_string()])
|
.query(&stmt, &[&market_name, &resolution.to_string()])
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(rows.into_iter().map(|r| Candle::from_row(r)).collect())
|
Ok(rows.into_iter().map(Candle::from_row).collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn fetch_candles_from(
|
pub async fn fetch_candles_from(
|
||||||
|
@ -190,7 +190,7 @@ pub async fn fetch_candles_from(
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(rows.into_iter().map(|r| Candle::from_row(r)).collect())
|
Ok(rows.into_iter().map(Candle::from_row).collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn fetch_top_traders_by_base_volume_from(
|
pub async fn fetch_top_traders_by_base_volume_from(
|
||||||
|
@ -229,7 +229,7 @@ pub async fn fetch_top_traders_by_base_volume_from(
|
||||||
.query(&stmt, &[&market_address_string, &start_time, &end_time])
|
.query(&stmt, &[&market_address_string, &start_time, &end_time])
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(rows.into_iter().map(|r| PgTrader::from_row(r)).collect())
|
Ok(rows.into_iter().map(PgTrader::from_row).collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn fetch_top_traders_by_quote_volume_from(
|
pub async fn fetch_top_traders_by_quote_volume_from(
|
||||||
|
@ -268,7 +268,7 @@ pub async fn fetch_top_traders_by_quote_volume_from(
|
||||||
.query(&stmt, &[&market_address_string, &start_time, &end_time])
|
.query(&stmt, &[&market_address_string, &start_time, &end_time])
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(rows.into_iter().map(|r| PgTrader::from_row(r)).collect())
|
Ok(rows.into_iter().map(PgTrader::from_row).collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn fetch_coingecko_24h_volume(
|
pub async fn fetch_coingecko_24h_volume(
|
||||||
|
@ -292,7 +292,7 @@ pub async fn fetch_coingecko_24h_volume(
|
||||||
|
|
||||||
Ok(rows
|
Ok(rows
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|r| PgCoinGecko24HourVolume::from_row(r))
|
.map(PgCoinGecko24HourVolume::from_row)
|
||||||
.collect())
|
.collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -334,6 +334,6 @@ pub async fn fetch_coingecko_24h_high_low(
|
||||||
|
|
||||||
Ok(rows
|
Ok(rows
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|r| PgCoinGecko24HighLow::from_row(r))
|
.map(PgCoinGecko24HighLow::from_row)
|
||||||
.collect())
|
.collect())
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,10 +25,10 @@ pub async fn connect_to_database() -> anyhow::Result<Pool> {
|
||||||
// fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills
|
// fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills
|
||||||
let tls = if pg_config.pg_use_ssl {
|
let tls = if pg_config.pg_use_ssl {
|
||||||
pg_config.pg.ssl_mode = Some(SslMode::Require);
|
pg_config.pg.ssl_mode = Some(SslMode::Require);
|
||||||
let ca_cert = fs::read(&pg_config.pg_ca_cert_path.expect("reading ca cert from env"))
|
let ca_cert = fs::read(pg_config.pg_ca_cert_path.expect("reading ca cert from env"))
|
||||||
.expect("reading ca cert from file");
|
.expect("reading ca cert from file");
|
||||||
let client_key = fs::read(
|
let client_key = fs::read(
|
||||||
&pg_config
|
pg_config
|
||||||
.pg_client_key_path
|
.pg_client_key_path
|
||||||
.expect("reading client key from env"),
|
.expect("reading client key from env"),
|
||||||
)
|
)
|
||||||
|
|
|
@ -20,12 +20,10 @@ pub async fn persist_fill_events(
|
||||||
while write_batch.len() < 10 {
|
while write_batch.len() < 10 {
|
||||||
match fill_receiver.try_recv() {
|
match fill_receiver.try_recv() {
|
||||||
Ok(event) => {
|
Ok(event) => {
|
||||||
if !write_batch.contains_key(&event) {
|
write_batch.entry(event).or_insert(0);
|
||||||
write_batch.insert(event, 0);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Err(TryRecvError::Empty) => {
|
Err(TryRecvError::Empty) => {
|
||||||
if write_batch.len() > 0 {
|
if !write_batch.is_empty() {
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
continue;
|
continue;
|
||||||
|
@ -37,7 +35,7 @@ pub async fn persist_fill_events(
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
if write_batch.len() > 0 {
|
if !write_batch.is_empty() {
|
||||||
// print!("writing: {:?} events to DB\n", write_batch.len());
|
// print!("writing: {:?} events to DB\n", write_batch.len());
|
||||||
|
|
||||||
// match conn.ping().await {
|
// match conn.ping().await {
|
||||||
|
@ -68,7 +66,7 @@ pub async fn persist_candles(
|
||||||
// Ok(_) => {
|
// Ok(_) => {
|
||||||
match candles_receiver.try_recv() {
|
match candles_receiver.try_recv() {
|
||||||
Ok(candles) => {
|
Ok(candles) => {
|
||||||
if candles.len() == 0 {
|
if candles.is_empty() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// print!("writing: {:?} candles to DB\n", candles.len());
|
// print!("writing: {:?} candles to DB\n", candles.len());
|
||||||
|
|
|
@ -71,7 +71,7 @@ pub async fn tickers(context: web::Data<WebContext>) -> Result<HttpResponse, Ser
|
||||||
let default_volume = CoinGecko24HourVolume::default();
|
let default_volume = CoinGecko24HourVolume::default();
|
||||||
let volumes: Vec<CoinGecko24HourVolume> = raw_volumes
|
let volumes: Vec<CoinGecko24HourVolume> = raw_volumes
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|v| v.convert_to_readable(&markets))
|
.map(|v| v.convert_to_readable(markets))
|
||||||
.collect();
|
.collect();
|
||||||
let tickers = markets
|
let tickers = markets
|
||||||
.iter()
|
.iter()
|
||||||
|
|
|
@ -4,7 +4,7 @@ use actix_web::{
|
||||||
App, HttpServer,
|
App, HttpServer,
|
||||||
};
|
};
|
||||||
use candles::get_candles;
|
use candles::get_candles;
|
||||||
use dotenv;
|
|
||||||
use markets::get_markets;
|
use markets::get_markets;
|
||||||
use openbook_candles::{
|
use openbook_candles::{
|
||||||
database::initialize::connect_to_database,
|
database::initialize::connect_to_database,
|
||||||
|
|
|
@ -51,7 +51,7 @@ pub async fn get_top_traders_by_base_volume(
|
||||||
let response = TraderResponse {
|
let response = TraderResponse {
|
||||||
start_time: info.from,
|
start_time: info.from,
|
||||||
end_time: info.to,
|
end_time: info.to,
|
||||||
traders: traders,
|
traders,
|
||||||
volume_type: VolumeType::Base.to_string(),
|
volume_type: VolumeType::Base.to_string(),
|
||||||
};
|
};
|
||||||
Ok(HttpResponse::Ok().json(response))
|
Ok(HttpResponse::Ok().json(response))
|
||||||
|
@ -90,7 +90,7 @@ pub async fn get_top_traders_by_quote_volume(
|
||||||
let response = TraderResponse {
|
let response = TraderResponse {
|
||||||
start_time: info.from,
|
start_time: info.from,
|
||||||
end_time: info.to,
|
end_time: info.to,
|
||||||
traders: traders,
|
traders,
|
||||||
volume_type: VolumeType::Quote.to_string(),
|
volume_type: VolumeType::Quote.to_string(),
|
||||||
};
|
};
|
||||||
Ok(HttpResponse::Ok().json(response))
|
Ok(HttpResponse::Ok().json(response))
|
||||||
|
|
|
@ -111,8 +111,8 @@ pub async fn fetch_market_infos(
|
||||||
.value;
|
.value;
|
||||||
for i in 0..mint_results.len() {
|
for i in 0..mint_results.len() {
|
||||||
let mut mint_account = mint_results[i].as_ref().unwrap().clone();
|
let mut mint_account = mint_results[i].as_ref().unwrap().clone();
|
||||||
let mut mint_bytes: &[u8] = &mut mint_account.data[..];
|
let mint_bytes: &[u8] = &mut mint_account.data[..];
|
||||||
let mint = Mint::unpack_from_slice(&mut mint_bytes).unwrap();
|
let mint = Mint::unpack_from_slice(mint_bytes).unwrap();
|
||||||
|
|
||||||
mint_key_map.insert(mint_keys[i], mint.decimals);
|
mint_key_map.insert(mint_keys[i], mint.decimals);
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,7 +30,7 @@ pub async fn batch_higher_order_candles(
|
||||||
end_time,
|
end_time,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
if constituent_candles.len() == 0 {
|
if constituent_candles.is_empty() {
|
||||||
return Ok(Vec::new());
|
return Ok(Vec::new());
|
||||||
}
|
}
|
||||||
let combined_candles = combine_into_higher_order_candles(
|
let combined_candles = combine_into_higher_order_candles(
|
||||||
|
@ -45,7 +45,7 @@ pub async fn batch_higher_order_candles(
|
||||||
let mut constituent_candles =
|
let mut constituent_candles =
|
||||||
fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution())
|
fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution())
|
||||||
.await?;
|
.await?;
|
||||||
if constituent_candles.len() == 0 {
|
if constituent_candles.is_empty() {
|
||||||
// println!(
|
// println!(
|
||||||
// "Batching {}, but no candles found for: {:?}, {}",
|
// "Batching {}, but no candles found for: {:?}, {}",
|
||||||
// resolution,
|
// resolution,
|
||||||
|
@ -56,7 +56,7 @@ pub async fn batch_higher_order_candles(
|
||||||
}
|
}
|
||||||
let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
|
let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
|
||||||
|
|
||||||
if constituent_candles.len() == 0 {
|
if constituent_candles.is_empty() {
|
||||||
return Ok(Vec::new());
|
return Ok(Vec::new());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -100,7 +100,7 @@ fn combine_into_higher_order_candles(
|
||||||
let mut combined_candles = vec![empty_candle; num_candles];
|
let mut combined_candles = vec![empty_candle; num_candles];
|
||||||
|
|
||||||
let mut con_iter = constituent_candles.iter_mut().peekable();
|
let mut con_iter = constituent_candles.iter_mut().peekable();
|
||||||
let mut start_time = st.clone();
|
let mut start_time = st;
|
||||||
let mut end_time = start_time + duration;
|
let mut end_time = start_time + duration;
|
||||||
|
|
||||||
let mut last_candle = seed_candle;
|
let mut last_candle = seed_candle;
|
||||||
|
@ -125,7 +125,7 @@ fn combine_into_higher_order_candles(
|
||||||
combined_candles[i].end_time = end_time;
|
combined_candles[i].end_time = end_time;
|
||||||
|
|
||||||
start_time = end_time;
|
start_time = end_time;
|
||||||
end_time = end_time + duration;
|
end_time += duration;
|
||||||
|
|
||||||
last_candle = combined_candles[i].clone();
|
last_candle = combined_candles[i].clone();
|
||||||
}
|
}
|
||||||
|
@ -152,7 +152,7 @@ pub async fn backfill_batch_higher_order_candles(
|
||||||
) -> anyhow::Result<Vec<Candle>> {
|
) -> anyhow::Result<Vec<Candle>> {
|
||||||
let mut constituent_candles =
|
let mut constituent_candles =
|
||||||
fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution()).await?;
|
fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution()).await?;
|
||||||
if constituent_candles.len() == 0 {
|
if constituent_candles.is_empty() {
|
||||||
return Ok(vec![]);
|
return Ok(vec![]);
|
||||||
}
|
}
|
||||||
let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
|
let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
|
||||||
|
|
|
@ -53,7 +53,7 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
|
||||||
Utc::now().duration_trunc(Duration::minutes(1))?,
|
Utc::now().duration_trunc(Duration::minutes(1))?,
|
||||||
);
|
);
|
||||||
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
|
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
|
||||||
if fills.len() > 0 {
|
if !fills.is_empty() {
|
||||||
let candles =
|
let candles =
|
||||||
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
|
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
|
||||||
Ok(candles)
|
Ok(candles)
|
||||||
|
@ -77,13 +77,13 @@ fn combine_fills_into_1m_candles(
|
||||||
let mut candles = vec![empty_candle; minutes as usize];
|
let mut candles = vec![empty_candle; minutes as usize];
|
||||||
|
|
||||||
let mut fills_iter = fills.iter_mut().peekable();
|
let mut fills_iter = fills.iter_mut().peekable();
|
||||||
let mut start_time = st.clone();
|
let mut start_time = st;
|
||||||
let mut end_time = start_time + Duration::minutes(1);
|
let mut end_time = start_time + Duration::minutes(1);
|
||||||
|
|
||||||
let mut last_price = match maybe_last_price {
|
let mut last_price = match maybe_last_price {
|
||||||
Some(p) => p,
|
Some(p) => p,
|
||||||
None => {
|
None => {
|
||||||
let first = fills_iter.peek().clone().unwrap();
|
let first = fills_iter.peek().unwrap();
|
||||||
let (price, _) =
|
let (price, _) =
|
||||||
calculate_fill_price_and_size(**first, market.base_decimals, market.quote_decimals);
|
calculate_fill_price_and_size(**first, market.base_decimals, market.quote_decimals);
|
||||||
price
|
price
|
||||||
|
@ -115,7 +115,7 @@ fn combine_fills_into_1m_candles(
|
||||||
candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time);
|
candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time);
|
||||||
|
|
||||||
start_time = end_time;
|
start_time = end_time;
|
||||||
end_time = end_time + Duration::minutes(1);
|
end_time += Duration::minutes(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
candles
|
candles
|
||||||
|
@ -146,7 +146,7 @@ pub async fn backfill_batch_1m_candles(
|
||||||
Utc::now().duration_trunc(Duration::minutes(1))?,
|
Utc::now().duration_trunc(Duration::minutes(1))?,
|
||||||
);
|
);
|
||||||
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
|
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
|
||||||
if fills.len() > 0 {
|
if !fills.is_empty() {
|
||||||
let mut minute_candles =
|
let mut minute_candles =
|
||||||
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
|
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
|
||||||
candles.append(&mut minute_candles);
|
candles.append(&mut minute_candles);
|
||||||
|
|
|
@ -60,7 +60,7 @@ async fn batch_inner(
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_candles(candles: Vec<Candle>, candles_sender: &Sender<Vec<Candle>>) {
|
async fn send_candles(candles: Vec<Candle>, candles_sender: &Sender<Vec<Candle>>) {
|
||||||
if candles.len() > 0 {
|
if !candles.is_empty() {
|
||||||
if let Err(_) = candles_sender.send(candles).await {
|
if let Err(_) = candles_sender.send(candles).await {
|
||||||
panic!("candles receiver dropped");
|
panic!("candles receiver dropped");
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use dotenv;
|
|
||||||
use openbook_candles::structs::candle::Candle;
|
use openbook_candles::structs::candle::Candle;
|
||||||
use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
|
use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
|
||||||
use openbook_candles::structs::openbook::OpenBookFillEvent;
|
use openbook_candles::structs::openbook::OpenBookFillEvent;
|
||||||
|
@ -29,7 +29,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
rpc_url: rpc_url.clone(),
|
rpc_url: rpc_url.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let markets = load_markets(&path_to_markets_json);
|
let markets = load_markets(path_to_markets_json);
|
||||||
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
|
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
|
||||||
let mut target_markets = HashMap::new();
|
let mut target_markets = HashMap::new();
|
||||||
for m in market_infos.clone() {
|
for m in market_infos.clone() {
|
||||||
|
|
|
@ -71,9 +71,9 @@ fn parse_openbook_fills_from_logs(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if fills_vector.len() > 0 {
|
if !fills_vector.is_empty() {
|
||||||
return Some(fills_vector);
|
Some(fills_vector)
|
||||||
} else {
|
} else {
|
||||||
return None;
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,12 +62,12 @@ pub async fn scrape_transactions(
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if sigs.len() == 0 {
|
if sigs.is_empty() {
|
||||||
println!("No signatures found");
|
println!("No signatures found");
|
||||||
return before_sig;
|
return before_sig;
|
||||||
}
|
}
|
||||||
|
|
||||||
let last = sigs.last().clone().unwrap();
|
let last = sigs.last().unwrap();
|
||||||
let request_last_sig = Signature::from_str(&last.signature).unwrap();
|
let request_last_sig = Signature::from_str(&last.signature).unwrap();
|
||||||
|
|
||||||
sigs.retain(|sig| sig.err.is_none());
|
sigs.retain(|sig| sig.err.is_none());
|
||||||
|
@ -88,13 +88,13 @@ pub async fn scrape_transactions(
|
||||||
|
|
||||||
let txn_futs: Vec<_> = signatures
|
let txn_futs: Vec<_> = signatures
|
||||||
.iter()
|
.iter()
|
||||||
.map(|s| rpc_client.get_transaction_with_config(&s, txn_config))
|
.map(|s| rpc_client.get_transaction_with_config(s, txn_config))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let mut txns = join_all(txn_futs).await;
|
let mut txns = join_all(txn_futs).await;
|
||||||
|
|
||||||
let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
|
let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
|
||||||
if fills.len() > 0 {
|
if !fills.is_empty() {
|
||||||
for fill in fills.into_iter() {
|
for fill in fills.into_iter() {
|
||||||
if let Err(_) = fill_sender.send(fill).await {
|
if let Err(_) = fill_sender.send(fill).await {
|
||||||
panic!("receiver dropped");
|
panic!("receiver dropped");
|
||||||
|
|
Loading…
Reference in New Issue