style: clippy and dead code cleanup
This commit is contained in:
parent
4ba8fd5ec5
commit
c7ee74b872
|
@ -1,2 +1,4 @@
|
|||
/target
|
||||
.env
|
||||
*.cer*
|
||||
*.pks*
|
|
@ -102,11 +102,11 @@ pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> an
|
|||
continue;
|
||||
}
|
||||
let last = sigs.last().unwrap();
|
||||
let last_time = last.block_time.unwrap().clone();
|
||||
let last_time = last.block_time.unwrap();
|
||||
let last_signature = last.signature.clone();
|
||||
let transactions = sigs
|
||||
.into_iter()
|
||||
.map(|s| PgTransaction::from_rpc_confirmed_transaction(s))
|
||||
.map(PgTransaction::from_rpc_confirmed_transaction)
|
||||
.collect::<Vec<PgTransaction>>();
|
||||
|
||||
if transactions.is_empty() {
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use deadpool_postgres::Pool;
|
||||
use log::debug;
|
||||
use std::collections::HashMap;
|
||||
use tokio::sync::mpsc::{error::TryRecvError, Receiver};
|
||||
|
||||
|
||||
|
||||
|
||||
use crate::{
|
||||
structs::{candle::Candle, openbook::OpenBookFillEvent, transaction::PgTransaction},
|
||||
|
@ -19,8 +19,8 @@ pub async fn insert_fills_atomically(
|
|||
let db_txn = client.build_transaction().start().await?;
|
||||
|
||||
// 1. Insert fills
|
||||
if fills.len() > 0 {
|
||||
let fills_statement = build_fills_upsert_statement_not_crazy(fills);
|
||||
if !fills.is_empty() {
|
||||
let fills_statement = build_fills_upsert_statement(fills);
|
||||
db_txn
|
||||
.execute(&fills_statement, &[])
|
||||
.await
|
||||
|
@ -42,78 +42,7 @@ pub async fn insert_fills_atomically(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn persist_fill_events(
|
||||
pool: &Pool,
|
||||
fill_receiver: &mut Receiver<OpenBookFillEvent>,
|
||||
) -> anyhow::Result<()> {
|
||||
loop {
|
||||
let mut write_batch = HashMap::new();
|
||||
while write_batch.len() < 10 {
|
||||
match fill_receiver.try_recv() {
|
||||
Ok(event) => {
|
||||
write_batch.entry(event).or_insert(0);
|
||||
}
|
||||
Err(TryRecvError::Empty) => {
|
||||
if !write_batch.is_empty() {
|
||||
break;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Err(TryRecvError::Disconnected) => {
|
||||
panic!("Fills sender must stay alive")
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
if !write_batch.is_empty() {
|
||||
debug!("writing: {:?} events to DB\n", write_batch.len());
|
||||
let upsert_statement = build_fills_upsert_statement(write_batch);
|
||||
let client = pool.get().await?;
|
||||
client
|
||||
.execute(&upsert_statement, &[])
|
||||
.await
|
||||
.map_err_anyhow()
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> String {
|
||||
let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES");
|
||||
for (idx, event) in events.keys().enumerate() {
|
||||
let val_str = format!(
|
||||
"({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})",
|
||||
event.signature,
|
||||
to_timestampz(event.block_time as u64).to_rfc3339(),
|
||||
event.market,
|
||||
event.open_orders,
|
||||
event.open_orders_owner,
|
||||
event.bid,
|
||||
event.maker,
|
||||
event.native_qty_paid,
|
||||
event.native_qty_received,
|
||||
event.native_fee_or_rebate,
|
||||
event.fee_tier,
|
||||
event.order_id,
|
||||
event.log_index,
|
||||
);
|
||||
|
||||
if idx == 0 {
|
||||
stmt = format!("{} {}", &stmt, val_str);
|
||||
} else {
|
||||
stmt = format!("{}, {}", &stmt, val_str);
|
||||
}
|
||||
}
|
||||
|
||||
let handle_conflict = "ON CONFLICT DO NOTHING";
|
||||
|
||||
stmt = format!("{} {}", stmt, handle_conflict);
|
||||
stmt
|
||||
}
|
||||
|
||||
fn build_fills_upsert_statement_not_crazy(fills: Vec<OpenBookFillEvent>) -> String {
|
||||
fn build_fills_upsert_statement(fills: Vec<OpenBookFillEvent>) -> String {
|
||||
let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES");
|
||||
for (idx, fill) in fills.iter().enumerate() {
|
||||
let val_str = format!(
|
||||
|
|
|
@ -55,7 +55,7 @@ async fn main() -> std::io::Result<()> {
|
|||
.unwrap();
|
||||
// For collecting metrics on the public api, excluding 404s
|
||||
let public_metrics = PrometheusMetricsBuilder::new("openbook_candles_server")
|
||||
.registry(registry.clone())
|
||||
.registry(registry)
|
||||
.exclude_status(StatusCode::NOT_FOUND)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
|
|
@ -53,7 +53,7 @@ async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
|
|||
}
|
||||
|
||||
async fn save_candles(pool: &Pool, candles: Vec<Candle>) -> anyhow::Result<()> {
|
||||
if candles.len() == 0 {
|
||||
if candles.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let upsert_statement = build_candles_upsert_statement(&candles);
|
||||
|
|
|
@ -50,7 +50,7 @@ pub fn parse_trades_from_openbook_txns(
|
|||
}
|
||||
}
|
||||
}
|
||||
sig_strings.retain(|s| !failed_sigs.contains(&s));
|
||||
sig_strings.retain(|s| !failed_sigs.contains(s));
|
||||
(fills_vector, sig_strings)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,23 +1,23 @@
|
|||
use deadpool_postgres::Pool;
|
||||
use futures::future::join_all;
|
||||
use log::{debug, info, warn};
|
||||
use log::{debug, warn};
|
||||
use solana_client::{
|
||||
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
|
||||
rpc_config::RpcTransactionConfig,
|
||||
};
|
||||
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature};
|
||||
use solana_transaction_status::UiTransactionEncoding;
|
||||
use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use std::{collections::HashMap, time::Duration as WaitDuration};
|
||||
|
||||
|
||||
use crate::{
|
||||
database::{
|
||||
fetch::fetch_worker_transactions,
|
||||
insert::{build_transactions_insert_statement, insert_fills_atomically},
|
||||
},
|
||||
structs::{openbook::OpenBookFillEvent, transaction::PgTransaction},
|
||||
utils::{AnyhowWrap, Config, OPENBOOK_KEY},
|
||||
worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL, METRIC_TRANSACTIONS_TOTAL},
|
||||
structs::{transaction::PgTransaction},
|
||||
utils::{AnyhowWrap, OPENBOOK_KEY},
|
||||
worker::metrics::{METRIC_RPC_ERRORS_TOTAL, METRIC_TRANSACTIONS_TOTAL},
|
||||
};
|
||||
|
||||
use super::parsing::parse_trades_from_openbook_txns;
|
||||
|
@ -53,7 +53,7 @@ pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<(
|
|||
}
|
||||
let transactions: Vec<PgTransaction> = sigs
|
||||
.into_iter()
|
||||
.map(|s| PgTransaction::from_rpc_confirmed_transaction(s))
|
||||
.map(PgTransaction::from_rpc_confirmed_transaction)
|
||||
.collect();
|
||||
|
||||
debug!("Scraper writing: {:?} txns to DB\n", transactions.len());
|
||||
|
@ -66,7 +66,6 @@ pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<(
|
|||
METRIC_TRANSACTIONS_TOTAL.inc_by(num_txns);
|
||||
}
|
||||
// TODO: graceful shutdown
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn scrape_fills(
|
||||
|
@ -80,7 +79,7 @@ pub async fn scrape_fills(
|
|||
|
||||
loop {
|
||||
let transactions = fetch_worker_transactions(worker_id, pool).await?;
|
||||
if transactions.len() == 0 {
|
||||
if transactions.is_empty() {
|
||||
debug!("No signatures found by worker {}", worker_id);
|
||||
tokio::time::sleep(WaitDuration::from_secs(1)).await;
|
||||
continue;
|
||||
|
@ -116,6 +115,4 @@ pub async fn scrape_fills(
|
|||
// Write fills to the database, and update properly fetched transactions as processed
|
||||
insert_fills_atomically(pool, worker_id, fills, completed_sigs).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue