refactor: simplify writing fills to db

This commit is contained in:
dboures 2023-03-14 01:21:15 -05:00
parent 0057a2faa2
commit 8ccfcbde04
No known key found for this signature in database
GPG Key ID: AB3790129D478852
5 changed files with 121 additions and 119 deletions

View File

@ -1,12 +1,12 @@
use dotenv;
use openbook_candles::candle_creation::candle_batching::batch_candles;
use openbook_candles::candle_creation::trade_fetching::scrape::{fetch_market_infos, scrape};
use openbook_candles::candle_creation::trade_fetching::scrape::scrape;
use openbook_candles::database::{
initialize::{connect_to_database, setup_database},
insert::{persist_candles, persist_fill_events},
};
use openbook_candles::structs::candle::Candle;
use openbook_candles::structs::markets::load_markets;
use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
use openbook_candles::structs::openbook::OpenBookFillEventLog;
use openbook_candles::utils::Config;
use solana_sdk::pubkey::Pubkey;
@ -17,6 +17,7 @@ use tokio::sync::mpsc;
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();
let path_to_markets_json: String = dotenv::var("PATH_TO_MARKETS_JSON").unwrap();
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
let database_url: String = dotenv::var("DATABASE_URL").unwrap();
let max_pg_pool_connections: u32 = dotenv::var("MAX_PG_POOL_CONNS_WORKER")
@ -30,7 +31,7 @@ async fn main() -> anyhow::Result<()> {
max_pg_pool_connections,
};
let markets = load_markets("/Users/dboures/dev/openbook-candles/markets.json");
let markets = load_markets(&path_to_markets_json);
let market_infos = fetch_market_infos(&config, markets).await?;
let mut target_markets = HashMap::new();
for m in market_infos.clone() {

View File

@ -1,26 +1,14 @@
use anchor_lang::AnchorDeserialize;
use futures::future::join_all;
use solana_account_decoder::UiAccountEncoding;
use solana_client::{
nonblocking::rpc_client::RpcClient,
rpc_client::GetConfirmedSignaturesForAddress2Config,
rpc_config::{RpcAccountInfoConfig, RpcTransactionConfig},
};
use solana_sdk::{
commitment_config::CommitmentConfig, program_pack::Pack, pubkey::Pubkey, signature::Signature,
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
rpc_config::RpcTransactionConfig,
};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature};
use solana_transaction_status::UiTransactionEncoding;
use spl_token::state::Mint;
use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
use tokio::sync::mpsc::Sender;
use crate::{
structs::{
markets::{MarketConfig, MarketInfo},
openbook::{MarketState, OpenBookFillEventLog},
},
utils::Config,
};
use crate::{structs::openbook::OpenBookFillEventLog, utils::Config};
use super::parsing::parse_trades_from_openbook_txns;
@ -116,95 +104,3 @@ pub async fn scrape_transactions(
Some(request_last_sig)
}
pub async fn fetch_market_infos(
config: &Config,
markets: Vec<MarketConfig>,
) -> anyhow::Result<Vec<MarketInfo>> {
let rpc_client =
RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::processed());
let rpc_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
data_slice: None,
commitment: Some(CommitmentConfig::confirmed()),
min_context_slot: None,
};
let market_keys = markets
.iter()
.map(|x| Pubkey::from_str(&x.address).unwrap())
.collect::<Vec<Pubkey>>();
let mut market_results = rpc_client
.get_multiple_accounts_with_config(&market_keys, rpc_config.clone())
.await?
.value;
let mut mint_key_map = HashMap::new();
let mut market_infos = market_results
.iter_mut()
.map(|r| {
let get_account_result = r.as_mut().unwrap();
let mut market_bytes: &[u8] = &mut get_account_result.data[5..];
let raw_market: MarketState =
AnchorDeserialize::deserialize(&mut market_bytes).unwrap();
let market_address_string = serum_bytes_to_pubkey(raw_market.own_address).to_string();
let base_mint_key = serum_bytes_to_pubkey(raw_market.coin_mint);
let quote_mint_key = serum_bytes_to_pubkey(raw_market.pc_mint);
mint_key_map.insert(base_mint_key, 0);
mint_key_map.insert(quote_mint_key, 0);
let market_name = markets
.iter()
.find(|x| x.address == market_address_string)
.unwrap()
.name
.clone();
MarketInfo {
name: market_name,
address: market_address_string,
base_decimals: 0,
quote_decimals: 0,
base_mint_key: base_mint_key.to_string(),
quote_mint_key: quote_mint_key.to_string(),
base_lot_size: raw_market.coin_lot_size,
quote_lot_size: raw_market.pc_lot_size,
}
})
.collect::<Vec<MarketInfo>>();
let mint_keys = mint_key_map.keys().cloned().collect::<Vec<Pubkey>>();
let mint_results = rpc_client
.get_multiple_accounts_with_config(&mint_keys, rpc_config)
.await?
.value;
for i in 0..mint_results.len() {
let mut mint_account = mint_results[i].as_ref().unwrap().clone();
let mut mint_bytes: &[u8] = &mut mint_account.data[..];
let mint = Mint::unpack_from_slice(&mut mint_bytes).unwrap();
mint_key_map.insert(mint_keys[i], mint.decimals);
}
for i in 0..market_infos.len() {
let base_key = Pubkey::from_str(&market_infos[i].base_mint_key).unwrap();
let quote_key = Pubkey::from_str(&market_infos[i].quote_mint_key).unwrap();
market_infos[i].base_decimals = *mint_key_map.get(&base_key).unwrap();
market_infos[i].quote_decimals = *mint_key_map.get(&quote_key).unwrap();
}
Ok(market_infos)
}
fn serum_bytes_to_pubkey(data: [u64; 4]) -> Pubkey {
let mut res = [0; 32];
for i in 0..4 {
res[8 * i..][..8].copy_from_slice(&data[i].to_le_bytes());
}
Pubkey::new_from_array(res)
}

View File

@ -3,7 +3,6 @@ use sqlx::{Pool, Postgres};
use std::{
collections::{hash_map::DefaultHasher, HashMap},
hash::{Hash, Hasher},
time::Instant,
};
use tokio::sync::mpsc::{error::TryRecvError, Receiver};
@ -17,16 +16,21 @@ pub async fn persist_fill_events(
mut fill_receiver: Receiver<OpenBookFillEventLog>,
) {
loop {
let start = Instant::now();
let mut write_batch = HashMap::new();
while write_batch.len() < 10 || start.elapsed().as_secs() > 10 {
while write_batch.len() < 10 {
match fill_receiver.try_recv() {
Ok(event) => {
if !write_batch.contains_key(&event) {
write_batch.insert(event, 0);
}
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Empty) => {
if write_batch.len() > 0 {
break;
} else {
continue;
}
}
Err(TryRecvError::Disconnected) => {
panic!("Fills sender must stay alive")
}

View File

@ -7,9 +7,8 @@ use candles::get_candles;
use dotenv;
use markets::get_markets;
use openbook_candles::{
candle_creation::trade_fetching::scrape::fetch_market_infos,
database::initialize::connect_to_database,
structs::markets::load_markets,
structs::markets::{fetch_market_infos, load_markets},
utils::{Config, WebContext},
};
use traders::{get_top_traders_by_base_volume, get_top_traders_by_quote_volume};
@ -24,6 +23,7 @@ async fn main() -> std::io::Result<()> {
dotenv::dotenv().ok();
env_logger::init();
let path_to_markets_json: String = dotenv::var("PATH_TO_MARKETS_JSON").unwrap();
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
let database_url: String = dotenv::var("DATABASE_URL").unwrap();
let max_pg_pool_connections: u32 = dotenv::var("MAX_PG_POOL_CONNS_SERVER")
@ -37,7 +37,7 @@ async fn main() -> std::io::Result<()> {
max_pg_pool_connections,
};
let markets = load_markets("/Users/dboures/dev/openbook-candles/markets.json");
let markets = load_markets(&path_to_markets_json);
let market_infos = fetch_market_infos(&config, markets).await.unwrap();
let pool = connect_to_database(&config).await.unwrap();

View File

@ -1,5 +1,14 @@
use anchor_lang::AnchorDeserialize;
use serde::{Deserialize, Serialize};
use std::fs::File;
use solana_account_decoder::UiAccountEncoding;
use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcAccountInfoConfig};
use solana_sdk::{commitment_config::CommitmentConfig, program_pack::Pack, pubkey::Pubkey};
use spl_token::state::Mint;
use std::{collections::HashMap, fs::File, str::FromStr};
use crate::utils::Config;
use super::openbook::MarketState;
#[derive(Debug, Clone, Serialize)]
pub struct MarketInfo {
@ -27,3 +36,95 @@ pub fn load_markets(path: &str) -> Vec<MarketConfig> {
pub fn valid_market(market_name: &str, markets: &Vec<MarketInfo>) -> bool {
markets.iter().any(|x| x.name == market_name)
}
pub async fn fetch_market_infos(
config: &Config,
markets: Vec<MarketConfig>,
) -> anyhow::Result<Vec<MarketInfo>> {
let rpc_client =
RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::processed());
let rpc_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
data_slice: None,
commitment: Some(CommitmentConfig::confirmed()),
min_context_slot: None,
};
let market_keys = markets
.iter()
.map(|x| Pubkey::from_str(&x.address).unwrap())
.collect::<Vec<Pubkey>>();
let mut market_results = rpc_client
.get_multiple_accounts_with_config(&market_keys, rpc_config.clone())
.await?
.value;
let mut mint_key_map = HashMap::new();
let mut market_infos = market_results
.iter_mut()
.map(|r| {
let get_account_result = r.as_mut().unwrap();
let mut market_bytes: &[u8] = &mut get_account_result.data[5..];
let raw_market: MarketState =
AnchorDeserialize::deserialize(&mut market_bytes).unwrap();
let market_address_string = serum_bytes_to_pubkey(raw_market.own_address).to_string();
let base_mint_key = serum_bytes_to_pubkey(raw_market.coin_mint);
let quote_mint_key = serum_bytes_to_pubkey(raw_market.pc_mint);
mint_key_map.insert(base_mint_key, 0);
mint_key_map.insert(quote_mint_key, 0);
let market_name = markets
.iter()
.find(|x| x.address == market_address_string)
.unwrap()
.name
.clone();
MarketInfo {
name: market_name,
address: market_address_string,
base_decimals: 0,
quote_decimals: 0,
base_mint_key: base_mint_key.to_string(),
quote_mint_key: quote_mint_key.to_string(),
base_lot_size: raw_market.coin_lot_size,
quote_lot_size: raw_market.pc_lot_size,
}
})
.collect::<Vec<MarketInfo>>();
let mint_keys = mint_key_map.keys().cloned().collect::<Vec<Pubkey>>();
let mint_results = rpc_client
.get_multiple_accounts_with_config(&mint_keys, rpc_config)
.await?
.value;
for i in 0..mint_results.len() {
let mut mint_account = mint_results[i].as_ref().unwrap().clone();
let mut mint_bytes: &[u8] = &mut mint_account.data[..];
let mint = Mint::unpack_from_slice(&mut mint_bytes).unwrap();
mint_key_map.insert(mint_keys[i], mint.decimals);
}
for i in 0..market_infos.len() {
let base_key = Pubkey::from_str(&market_infos[i].base_mint_key).unwrap();
let quote_key = Pubkey::from_str(&market_infos[i].quote_mint_key).unwrap();
market_infos[i].base_decimals = *mint_key_map.get(&base_key).unwrap();
market_infos[i].quote_decimals = *mint_key_map.get(&quote_key).unwrap();
}
Ok(market_infos)
}
fn serum_bytes_to_pubkey(data: [u64; 4]) -> Pubkey {
let mut res = [0; 32];
for i in 0..4 {
res[8 * i..][..8].copy_from_slice(&data[i].to_le_bytes());
}
Pubkey::new_from_array(res)
}