Merge pull request #5 from dboures/remove-candles-queue
Remove candles queue + merge metrics
This commit is contained in:
commit
83b2513164
|
@ -191,6 +191,18 @@ dependencies = [
|
|||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "actix-web-prom"
|
||||
version = "0.6.0"
|
||||
source = "git+https://github.com/riordanp/actix-web-prom.git?branch=exclude-paths#614434270cbcfdffa2b3a854aff4c1e49c4973fd"
|
||||
dependencies = [
|
||||
"actix-web",
|
||||
"futures-core",
|
||||
"pin-project-lite",
|
||||
"prometheus",
|
||||
"regex",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "adler"
|
||||
version = "1.0.2"
|
||||
|
@ -3423,6 +3435,7 @@ name = "openbook-candles"
|
|||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"actix-web",
|
||||
"actix-web-prom",
|
||||
"anchor-client",
|
||||
"anchor-lang",
|
||||
"anyhow",
|
||||
|
@ -3438,11 +3451,13 @@ dependencies = [
|
|||
"env_logger 0.10.0",
|
||||
"futures 0.3.27",
|
||||
"jsonrpc-core-client",
|
||||
"lazy_static",
|
||||
"log 0.4.17",
|
||||
"native-tls",
|
||||
"num-traits",
|
||||
"num_enum 0.6.1",
|
||||
"postgres-native-tls",
|
||||
"prometheus",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"serde_json",
|
||||
|
@ -3966,6 +3981,21 @@ dependencies = [
|
|||
"yansi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prometheus"
|
||||
version = "0.13.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c"
|
||||
dependencies = [
|
||||
"cfg-if 1.0.0",
|
||||
"fnv",
|
||||
"lazy_static",
|
||||
"memchr",
|
||||
"parking_lot 0.12.1",
|
||||
"protobuf",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost"
|
||||
version = "0.11.6"
|
||||
|
@ -4021,6 +4051,12 @@ dependencies = [
|
|||
"prost",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "protobuf"
|
||||
version = "2.28.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
|
||||
|
||||
[[package]]
|
||||
name = "protobuf-src"
|
||||
version = "1.1.0+21.5"
|
||||
|
|
|
@ -62,9 +62,12 @@ serum_dex = { version = "0.5.10", git = "https://github.com/openbook-dex/program
|
|||
anchor-lang = ">=0.25.0"
|
||||
|
||||
actix-web = "4"
|
||||
actix-web-prom = { version = "0.6.0", git = "https://github.com/riordanp/actix-web-prom.git", branch = "exclude-paths" }
|
||||
|
||||
arrayref = "0.3.6"
|
||||
bytemuck = "1.12.3"
|
||||
num_enum = "0.6.1"
|
||||
|
||||
config = "0.13.1"
|
||||
config = "0.13.1"
|
||||
prometheus = "0.13.3"
|
||||
lazy_static = "1.4.0"
|
||||
|
|
|
@ -17,3 +17,7 @@ kill_timeout = 30
|
|||
hard_limit = 1024
|
||||
soft_limit = 1024
|
||||
type = "connections"
|
||||
|
||||
[metrics]
|
||||
port = 9091
|
||||
path = "/metrics"
|
||||
|
|
|
@ -7,3 +7,7 @@ kill_timeout = 30
|
|||
|
||||
[experimental]
|
||||
cmd = ["worker", "markets.json"]
|
||||
|
||||
[metrics]
|
||||
port = 9091
|
||||
path = "/metrics"
|
34
markets.json
34
markets.json
|
@ -4,47 +4,35 @@
|
|||
"address" : "8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6"
|
||||
},
|
||||
{
|
||||
"name" : "RLB/USDC",
|
||||
"address" : "72h8rWaWwfPUL36PAFqyQZU8RT1V3FKG7Nc45aK89xTs"
|
||||
"name" : "wBTCpo/USDC",
|
||||
"address" : "3BAKsQd3RuhZKES2DGysMhjBdwjZYKYmxRqnSMtZ4KSN"
|
||||
},
|
||||
{
|
||||
"name" : "MNGO/USDC",
|
||||
"address" : "3NnxQvDcZXputNMxaxsGvqiKpqgPfSYXpNigZNFcknmD"
|
||||
},
|
||||
{
|
||||
"name" : "BONK/SOL",
|
||||
"address" : "Hs97TCZeuYiJxooo3U73qEHXg3dKpRL4uYKYRryEK9CF"
|
||||
"name": "BONK/SOL",
|
||||
"address": "Hs97TCZeuYiJxooo3U73qEHXg3dKpRL4uYKYRryEK9CF"
|
||||
},
|
||||
{
|
||||
"name" : "BONK/USDC",
|
||||
"address" : "8PhnCfgqpgFM7ZJvttGdBVMXHuU4Q23ACxCvWkbs1M71"
|
||||
},
|
||||
{
|
||||
"name" : "WBTC/USDC",
|
||||
"address" : "3BAKsQd3RuhZKES2DGysMhjBdwjZYKYmxRqnSMtZ4KSN"
|
||||
"name": "DUAL/USDC",
|
||||
"address": "H6rrYK3SUHF2eguZCyJxnSBMJqjXhUtuaki6PHiutvum"
|
||||
},
|
||||
{
|
||||
"name": "mSOL/USDC",
|
||||
"address": "9Lyhks5bQQxb9EyyX55NtgKQzpM4WK7JCmeaWuQ5MoXD"
|
||||
},
|
||||
{
|
||||
"name": "SOL/USDT",
|
||||
"address": "2AdaV97p6SfkuMQJdu8DHhBhmJe7oWdvbm52MJfYQmfA"
|
||||
},
|
||||
{
|
||||
"name": "USDT/USDC",
|
||||
"address": "B2na8Awyd7cpC59iEU43FagJAPLigr3AP3s38KM982bu"
|
||||
},
|
||||
{
|
||||
"name": "ETH/USDC",
|
||||
"name": "ETHpo/USDC",
|
||||
"address": "BbJgE7HZMaDp5NTYvRh5jZSkQPVDTU8ubPFtpogUkEj4"
|
||||
},
|
||||
{
|
||||
"name": "BONK/USDC",
|
||||
"address": "8PhnCfgqpgFM7ZJvttGdBVMXHuU4Q23ACxCvWkbs1M71"
|
||||
},
|
||||
{
|
||||
"name": "RAY/USDC",
|
||||
"address": "DZjbn4XC8qoHKikZqzmhemykVzmossoayV9ffbsUqxVj"
|
||||
},
|
||||
{
|
||||
"name": "DUAL/USDC",
|
||||
"address": "H6rrYK3SUHF2eguZCyJxnSBMJqjXhUtuaki6PHiutvum"
|
||||
}
|
||||
]
|
|
@ -1,12 +1,7 @@
|
|||
|
||||
|
||||
use deadpool_postgres::Object;
|
||||
|
||||
use openbook_candles::{
|
||||
database::{
|
||||
initialize::connect_to_database,
|
||||
insert::{build_candles_upsert_statement},
|
||||
},
|
||||
database::{initialize::connect_to_database, insert::build_candles_upsert_statement},
|
||||
structs::{
|
||||
candle::Candle,
|
||||
markets::{fetch_market_infos, load_markets},
|
||||
|
@ -18,10 +13,9 @@ use openbook_candles::{
|
|||
minute_candles::backfill_batch_1m_candles,
|
||||
},
|
||||
};
|
||||
use std::{env};
|
||||
use std::env;
|
||||
use strum::IntoEnumIterator;
|
||||
|
||||
|
||||
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
dotenv::dotenv().ok();
|
||||
|
|
|
@ -35,7 +35,7 @@ async fn main() -> anyhow::Result<()> {
|
|||
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
|
||||
let mut target_markets = HashMap::new();
|
||||
for m in market_infos.clone() {
|
||||
target_markets.insert(Pubkey::from_str(&m.address)?, 0);
|
||||
target_markets.insert(Pubkey::from_str(&m.address)?, m.name);
|
||||
}
|
||||
println!("{:?}", target_markets);
|
||||
|
||||
|
@ -57,7 +57,7 @@ async fn main() -> anyhow::Result<()> {
|
|||
pub async fn backfill(
|
||||
rpc_url: String,
|
||||
fill_sender: &Sender<OpenBookFillEvent>,
|
||||
target_markets: &HashMap<Pubkey, u8>,
|
||||
target_markets: &HashMap<Pubkey, String>,
|
||||
) -> anyhow::Result<()> {
|
||||
println!("backfill started");
|
||||
let mut before_sig: Option<Signature> = None;
|
||||
|
@ -145,7 +145,7 @@ pub async fn get_transactions(
|
|||
rpc_client: &RpcClient,
|
||||
mut sigs: Vec<RpcConfirmedTransactionStatusWithSignature>,
|
||||
fill_sender: &Sender<OpenBookFillEvent>,
|
||||
target_markets: &HashMap<Pubkey, u8>,
|
||||
target_markets: &HashMap<Pubkey, String>,
|
||||
) {
|
||||
sigs.retain(|sig| sig.err.is_none());
|
||||
if sigs.last().is_none() {
|
||||
|
|
|
@ -59,10 +59,7 @@ pub async fn fetch_fills_from(
|
|||
let rows = client
|
||||
.query(stmt, &[&market_address_string, &start_time, &end_time])
|
||||
.await?;
|
||||
Ok(rows
|
||||
.into_iter()
|
||||
.map(PgOpenBookFill::from_row)
|
||||
.collect())
|
||||
Ok(rows.into_iter().map(PgOpenBookFill::from_row).collect())
|
||||
}
|
||||
|
||||
pub async fn fetch_latest_finished_candle(
|
||||
|
|
|
@ -36,6 +36,7 @@ pub async fn connect_to_database() -> anyhow::Result<Pool> {
|
|||
MakeTlsConnector::new(
|
||||
TlsConnector::builder()
|
||||
.add_root_certificate(Certificate::from_pem(&ca_cert)?)
|
||||
// TODO: make this configurable
|
||||
.identity(Identity::from_pkcs12(&client_key, "pass")?)
|
||||
.danger_accept_invalid_certs(false)
|
||||
.build()?,
|
||||
|
@ -86,7 +87,7 @@ pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> {
|
|||
client
|
||||
.execute(
|
||||
"CREATE TABLE IF NOT EXISTS candles (
|
||||
id serial,
|
||||
id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
|
||||
market_name text,
|
||||
start_time timestamptz,
|
||||
end_time timestamptz,
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
use deadpool_postgres::Pool;
|
||||
use log::debug;
|
||||
use std::{
|
||||
collections::{hash_map::DefaultHasher, HashMap},
|
||||
hash::{Hash, Hasher},
|
||||
|
@ -14,7 +15,6 @@ pub async fn persist_fill_events(
|
|||
pool: &Pool,
|
||||
fill_receiver: &mut Receiver<OpenBookFillEvent>,
|
||||
) -> anyhow::Result<()> {
|
||||
let client = pool.get().await?;
|
||||
loop {
|
||||
let mut write_batch = HashMap::new();
|
||||
while write_batch.len() < 10 {
|
||||
|
@ -36,61 +36,18 @@ pub async fn persist_fill_events(
|
|||
}
|
||||
|
||||
if !write_batch.is_empty() {
|
||||
// print!("writing: {:?} events to DB\n", write_batch.len());
|
||||
|
||||
// match conn.ping().await {
|
||||
// Ok(_) => {
|
||||
debug!("writing: {:?} events to DB\n", write_batch.len());
|
||||
let upsert_statement = build_fills_upsert_statement(write_batch);
|
||||
let client = pool.get().await?;
|
||||
client
|
||||
.execute(&upsert_statement, &[])
|
||||
.await
|
||||
.map_err_anyhow()
|
||||
.unwrap();
|
||||
// }
|
||||
// Err(_) => {
|
||||
// println!("Fills ping failed");
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn persist_candles(
|
||||
pool: Pool,
|
||||
candles_receiver: &mut Receiver<Vec<Candle>>,
|
||||
) -> anyhow::Result<()> {
|
||||
let client = pool.get().await.unwrap();
|
||||
loop {
|
||||
// match client.ping().await {
|
||||
// Ok(_) => {
|
||||
match candles_receiver.try_recv() {
|
||||
Ok(candles) => {
|
||||
if candles.is_empty() {
|
||||
continue;
|
||||
}
|
||||
// print!("writing: {:?} candles to DB\n", candles.len());
|
||||
let upsert_statement = build_candles_upsert_statement(candles);
|
||||
client
|
||||
.execute(&upsert_statement, &[])
|
||||
.await
|
||||
.map_err_anyhow()
|
||||
.unwrap();
|
||||
}
|
||||
Err(TryRecvError::Empty) => continue,
|
||||
Err(TryRecvError::Disconnected) => {
|
||||
panic!("Candles sender must stay alive")
|
||||
}
|
||||
};
|
||||
// }
|
||||
// Err(_) => {
|
||||
// println!("Candle ping failed");
|
||||
// break;
|
||||
// }
|
||||
// };
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> String {
|
||||
let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES");
|
||||
|
@ -126,7 +83,7 @@ fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> Strin
|
|||
stmt
|
||||
}
|
||||
|
||||
pub fn build_candles_upsert_statement(candles: Vec<Candle>) -> String {
|
||||
pub fn build_candles_upsert_statement(candles: &Vec<Candle>) -> String {
|
||||
let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES");
|
||||
for (idx, candle) in candles.iter().enumerate() {
|
||||
let val_str = format!(
|
||||
|
|
|
@ -1,9 +1,13 @@
|
|||
use actix_web::{
|
||||
http::StatusCode,
|
||||
middleware::Logger,
|
||||
rt::System,
|
||||
web::{self, Data},
|
||||
App, HttpServer,
|
||||
};
|
||||
use actix_web_prom::PrometheusMetricsBuilder;
|
||||
use candles::get_candles;
|
||||
use prometheus::Registry;
|
||||
|
||||
use markets::get_markets;
|
||||
use openbook_candles::{
|
||||
|
@ -12,6 +16,7 @@ use openbook_candles::{
|
|||
utils::{Config, WebContext},
|
||||
};
|
||||
use std::env;
|
||||
use std::thread;
|
||||
use traders::{get_top_traders_by_base_volume, get_top_traders_by_quote_volume};
|
||||
|
||||
mod candles;
|
||||
|
@ -39,6 +44,22 @@ async fn main() -> std::io::Result<()> {
|
|||
let market_infos = fetch_market_infos(&config, markets).await.unwrap();
|
||||
let pool = connect_to_database().await.unwrap();
|
||||
|
||||
let registry = Registry::new();
|
||||
// For serving metrics on a private port
|
||||
let private_metrics = PrometheusMetricsBuilder::new("openbook_candles_server_private")
|
||||
.registry(registry.clone())
|
||||
.exclude("/metrics")
|
||||
.exclude_status(StatusCode::NOT_FOUND)
|
||||
.endpoint("/metrics")
|
||||
.build()
|
||||
.unwrap();
|
||||
// For collecting metrics on the public api, excluding 404s
|
||||
let public_metrics = PrometheusMetricsBuilder::new("openbook_candles_server")
|
||||
.registry(registry.clone())
|
||||
.exclude_status(StatusCode::NOT_FOUND)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let context = Data::new(WebContext {
|
||||
rpc_url,
|
||||
pool,
|
||||
|
@ -46,20 +67,40 @@ async fn main() -> std::io::Result<()> {
|
|||
});
|
||||
|
||||
println!("Starting server");
|
||||
HttpServer::new(move || {
|
||||
App::new()
|
||||
.wrap(Logger::default())
|
||||
.app_data(context.clone())
|
||||
.service(
|
||||
web::scope("/api")
|
||||
.service(get_candles)
|
||||
.service(get_top_traders_by_base_volume)
|
||||
.service(get_top_traders_by_quote_volume)
|
||||
.service(get_markets)
|
||||
.service(coingecko::service()),
|
||||
)
|
||||
})
|
||||
.bind(&bind_addr)?
|
||||
.run()
|
||||
.await
|
||||
// Thread to serve public API
|
||||
let public_server = thread::spawn(move || {
|
||||
let sys = System::new();
|
||||
let srv = HttpServer::new(move || {
|
||||
App::new()
|
||||
.wrap(Logger::default())
|
||||
.wrap(public_metrics.clone())
|
||||
.app_data(context.clone())
|
||||
.service(
|
||||
web::scope("/api")
|
||||
.service(get_candles)
|
||||
.service(get_top_traders_by_base_volume)
|
||||
.service(get_top_traders_by_quote_volume)
|
||||
.service(get_markets)
|
||||
.service(coingecko::service()),
|
||||
)
|
||||
})
|
||||
.bind(&bind_addr)
|
||||
.unwrap()
|
||||
.run();
|
||||
sys.block_on(srv).unwrap();
|
||||
});
|
||||
|
||||
// Thread to serve metrics endpoint privately
|
||||
let private_server = thread::spawn(move || {
|
||||
let sys = System::new();
|
||||
let srv = HttpServer::new(move || App::new().wrap(private_metrics.clone()))
|
||||
.bind("0.0.0.0:9091")
|
||||
.unwrap()
|
||||
.run();
|
||||
sys.block_on(srv).unwrap();
|
||||
});
|
||||
|
||||
private_server.join().unwrap();
|
||||
public_server.join().unwrap();
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ use tokio_postgres::Row;
|
|||
|
||||
use super::resolution::Resolution;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct Candle {
|
||||
pub market_name: String,
|
||||
pub start_time: DateTime<Utc>,
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
use chrono::{DateTime, Duration, DurationRound, Utc};
|
||||
use deadpool_postgres::Pool;
|
||||
use log::debug;
|
||||
use std::cmp::max;
|
||||
|
||||
use crate::{
|
||||
|
@ -46,12 +47,12 @@ pub async fn batch_higher_order_candles(
|
|||
fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution())
|
||||
.await?;
|
||||
if constituent_candles.is_empty() {
|
||||
// println!(
|
||||
// "Batching {}, but no candles found for: {:?}, {}",
|
||||
// resolution,
|
||||
// market_name,
|
||||
// resolution.get_constituent_resolution()
|
||||
// );
|
||||
debug!(
|
||||
"Batching {}, but no candles found for: {:?}, {}",
|
||||
resolution,
|
||||
market_name,
|
||||
resolution.get_constituent_resolution()
|
||||
);
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
|
||||
|
@ -82,7 +83,7 @@ fn combine_into_higher_order_candles(
|
|||
st: DateTime<Utc>,
|
||||
seed_candle: Candle,
|
||||
) -> Vec<Candle> {
|
||||
// println!("target_resolution: {}", target_resolution);
|
||||
debug!("combining for target_resolution: {}", target_resolution);
|
||||
|
||||
let duration = target_resolution.get_duration();
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ use std::cmp::min;
|
|||
|
||||
use chrono::{DateTime, Duration, DurationRound, Utc};
|
||||
use deadpool_postgres::Pool;
|
||||
use log::debug;
|
||||
|
||||
use crate::{
|
||||
database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
|
||||
|
@ -24,9 +25,10 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
|
|||
let start_time = candle.end_time;
|
||||
let end_time = min(
|
||||
start_time + day(),
|
||||
Utc::now().duration_trunc(Duration::minutes(1))?,
|
||||
(Utc::now() + Duration::minutes(1)).duration_trunc(Duration::minutes(1))?,
|
||||
);
|
||||
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
|
||||
|
||||
let candles = combine_fills_into_1m_candles(
|
||||
&mut fills,
|
||||
market,
|
||||
|
@ -40,7 +42,7 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
|
|||
let earliest_fill = fetch_earliest_fill(pool, market_address).await?;
|
||||
|
||||
if earliest_fill.is_none() {
|
||||
println!("No fills found for: {:?}", market_name);
|
||||
debug!("No fills found for: {:?}", market_name);
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
|
@ -98,7 +100,6 @@ fn combine_fills_into_1m_candles(
|
|||
|
||||
while matches!(fills_iter.peek(), Some(f) if f.time < end_time) {
|
||||
let fill = fills_iter.next().unwrap();
|
||||
|
||||
let (price, volume) =
|
||||
calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals);
|
||||
|
||||
|
@ -112,8 +113,8 @@ fn combine_fills_into_1m_candles(
|
|||
|
||||
candles[i].start_time = start_time;
|
||||
candles[i].end_time = end_time;
|
||||
candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time);
|
||||
|
||||
candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time)
|
||||
|| end_time < Utc::now() - Duration::minutes(10);
|
||||
start_time = end_time;
|
||||
end_time += Duration::minutes(1);
|
||||
}
|
||||
|
@ -132,7 +133,7 @@ pub async fn backfill_batch_1m_candles(
|
|||
|
||||
let earliest_fill = fetch_earliest_fill(pool, &market.address).await?;
|
||||
if earliest_fill.is_none() {
|
||||
println!("No fills found for: {:?}", &market_name);
|
||||
debug!("No fills found for: {:?}", &market_name);
|
||||
return Ok(candles);
|
||||
}
|
||||
|
||||
|
|
|
@ -3,31 +3,29 @@ pub mod minute_candles;
|
|||
|
||||
use chrono::Duration;
|
||||
use deadpool_postgres::Pool;
|
||||
use log::{error, warn};
|
||||
use strum::IntoEnumIterator;
|
||||
use tokio::{sync::mpsc::Sender, time::sleep};
|
||||
use tokio::time::sleep;
|
||||
|
||||
use crate::{
|
||||
database::insert::build_candles_upsert_statement,
|
||||
structs::{candle::Candle, markets::MarketInfo, resolution::Resolution},
|
||||
utils::AnyhowWrap,
|
||||
worker::candle_batching::minute_candles::batch_1m_candles,
|
||||
};
|
||||
|
||||
use self::higher_order_candles::batch_higher_order_candles;
|
||||
|
||||
pub async fn batch_for_market(
|
||||
pool: &Pool,
|
||||
candles_sender: &Sender<Vec<Candle>>,
|
||||
market: &MarketInfo,
|
||||
) -> anyhow::Result<()> {
|
||||
pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
|
||||
loop {
|
||||
let sender = candles_sender.clone();
|
||||
let market_clone = market.clone();
|
||||
// let client = pool.get().await?;
|
||||
|
||||
loop {
|
||||
sleep(Duration::milliseconds(2000).to_std()?).await;
|
||||
match batch_inner(pool, &sender, &market_clone).await {
|
||||
match batch_inner(pool, &market_clone).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
println!(
|
||||
error!(
|
||||
"Batching thread failed for {:?} with error: {:?}",
|
||||
market_clone.name.clone(),
|
||||
e
|
||||
|
@ -36,33 +34,33 @@ pub async fn batch_for_market(
|
|||
}
|
||||
};
|
||||
}
|
||||
println!("Restarting {:?} batching thread", market.name);
|
||||
warn!("Restarting {:?} batching thread", market.name);
|
||||
}
|
||||
}
|
||||
|
||||
async fn batch_inner(
|
||||
pool: &Pool,
|
||||
candles_sender: &Sender<Vec<Candle>>,
|
||||
market: &MarketInfo,
|
||||
) -> anyhow::Result<()> {
|
||||
async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
|
||||
let market_name = &market.name.clone();
|
||||
let candles = batch_1m_candles(pool, market).await?;
|
||||
send_candles(candles, candles_sender).await;
|
||||
|
||||
save_candles(pool, candles).await?;
|
||||
for resolution in Resolution::iter() {
|
||||
if resolution == Resolution::R1m {
|
||||
continue;
|
||||
}
|
||||
let candles = batch_higher_order_candles(pool, market_name, resolution).await?;
|
||||
send_candles(candles, candles_sender).await;
|
||||
save_candles(pool, candles).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_candles(candles: Vec<Candle>, candles_sender: &Sender<Vec<Candle>>) {
|
||||
if !candles.is_empty() {
|
||||
if let Err(_) = candles_sender.send(candles).await {
|
||||
panic!("candles receiver dropped");
|
||||
}
|
||||
async fn save_candles(pool: &Pool, candles: Vec<Candle>) -> anyhow::Result<()> {
|
||||
if candles.len() == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
let upsert_statement = build_candles_upsert_statement(&candles);
|
||||
let client = pool.get().await.unwrap();
|
||||
client
|
||||
.execute(&upsert_statement, &[])
|
||||
.await
|
||||
.map_err_anyhow()?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -1,23 +1,26 @@
|
|||
|
||||
use openbook_candles::structs::candle::Candle;
|
||||
use log::{error, info};
|
||||
use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
|
||||
use openbook_candles::structs::openbook::OpenBookFillEvent;
|
||||
use openbook_candles::utils::Config;
|
||||
use openbook_candles::worker::metrics::{
|
||||
serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE, METRIC_FILLS_QUEUE_LENGTH,
|
||||
};
|
||||
use openbook_candles::worker::trade_fetching::scrape::scrape;
|
||||
use openbook_candles::{
|
||||
database::{
|
||||
initialize::{connect_to_database, setup_database},
|
||||
insert::{persist_candles, persist_fill_events},
|
||||
insert::{persist_fill_events},
|
||||
},
|
||||
worker::candle_batching::batch_for_market,
|
||||
};
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use std::env;
|
||||
use std::{collections::HashMap, str::FromStr};
|
||||
use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
env_logger::init();
|
||||
dotenv::dotenv().ok();
|
||||
|
||||
let args: Vec<String> = env::args().collect();
|
||||
|
@ -29,22 +32,24 @@ async fn main() -> anyhow::Result<()> {
|
|||
rpc_url: rpc_url.clone(),
|
||||
};
|
||||
|
||||
let fills_queue_max_size = 10000;
|
||||
|
||||
let markets = load_markets(path_to_markets_json);
|
||||
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
|
||||
let mut target_markets = HashMap::new();
|
||||
for m in market_infos.clone() {
|
||||
target_markets.insert(Pubkey::from_str(&m.address)?, 0);
|
||||
target_markets.insert(Pubkey::from_str(&m.address)?, m.name);
|
||||
}
|
||||
println!("{:?}", target_markets);
|
||||
info!("{:?}", target_markets);
|
||||
|
||||
let pool = connect_to_database().await?;
|
||||
setup_database(&pool).await?;
|
||||
let mut handles = vec![];
|
||||
|
||||
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(1000);
|
||||
|
||||
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(fills_queue_max_size);
|
||||
let scrape_fill_sender = fill_sender.clone();
|
||||
handles.push(tokio::spawn(async move {
|
||||
scrape(&config, &fill_sender, &target_markets).await;
|
||||
scrape(&config, &scrape_fill_sender, &target_markets).await;
|
||||
}));
|
||||
|
||||
let fills_pool = pool.clone();
|
||||
|
@ -56,28 +61,35 @@ async fn main() -> anyhow::Result<()> {
|
|||
}
|
||||
}));
|
||||
|
||||
let (candle_sender, mut candle_receiver) = mpsc::channel::<Vec<Candle>>(1000);
|
||||
|
||||
for market in market_infos.into_iter() {
|
||||
let sender = candle_sender.clone();
|
||||
let batch_pool = pool.clone();
|
||||
handles.push(tokio::spawn(async move {
|
||||
batch_for_market(&batch_pool, &sender, &market)
|
||||
.await
|
||||
.unwrap();
|
||||
println!("SOMETHING WENT WRONG");
|
||||
batch_for_market(&batch_pool, &market).await.unwrap();
|
||||
error!("batching halted for market {}", &market.name);
|
||||
}));
|
||||
}
|
||||
|
||||
let persist_pool = pool.clone();
|
||||
let monitor_pool = pool.clone();
|
||||
let monitor_fill_channel = fill_sender.clone();
|
||||
handles.push(tokio::spawn(async move {
|
||||
// TODO: maybe break this out into a new function
|
||||
loop {
|
||||
persist_candles(persist_pool.clone(), &mut candle_receiver)
|
||||
.await
|
||||
.unwrap();
|
||||
let pool_status = monitor_pool.status();
|
||||
METRIC_DB_POOL_AVAILABLE.set(pool_status.available as i64);
|
||||
METRIC_DB_POOL_SIZE.set(pool_status.size as i64);
|
||||
|
||||
METRIC_FILLS_QUEUE_LENGTH
|
||||
.set((fills_queue_max_size - monitor_fill_channel.capacity()) as i64);
|
||||
|
||||
tokio::time::sleep(WaitDuration::from_secs(10)).await;
|
||||
}
|
||||
}));
|
||||
|
||||
handles.push(tokio::spawn(async move {
|
||||
// TODO: this is ugly af
|
||||
serve_metrics().await.unwrap().await.unwrap();
|
||||
}));
|
||||
|
||||
futures::future::join_all(handles).await;
|
||||
|
||||
Ok(())
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
use actix_web::{dev::Server, http::StatusCode, App, HttpServer};
|
||||
use actix_web_prom::PrometheusMetricsBuilder;
|
||||
use lazy_static::lazy_static;
|
||||
use prometheus::{
|
||||
register_int_counter_vec_with_registry, register_int_gauge_with_registry, IntCounterVec,
|
||||
IntGauge, Registry,
|
||||
};
|
||||
|
||||
lazy_static! {
|
||||
static ref METRIC_REGISTRY: Registry =
|
||||
Registry::new_custom(Some("openbook_candles_worker".to_string()), None).unwrap();
|
||||
pub static ref METRIC_TXS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
|
||||
"txs_total",
|
||||
"Total number of transactions scraped",
|
||||
&["market", "status"],
|
||||
METRIC_REGISTRY
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FILLS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
|
||||
"fills_total",
|
||||
"Total number of fills parsed",
|
||||
&["market"],
|
||||
METRIC_REGISTRY
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_CANDLES_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
|
||||
"candles_total",
|
||||
"Total number of candles generated",
|
||||
&["market"],
|
||||
METRIC_REGISTRY
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_FILLS_QUEUE_LENGTH: IntGauge = register_int_gauge_with_registry!(
|
||||
"fills_queue_length",
|
||||
"Current length of the fills write queue",
|
||||
METRIC_REGISTRY
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_RPC_ERRORS_TOTAL: IntCounterVec =
|
||||
register_int_counter_vec_with_registry!(
|
||||
"rpc_errors_total",
|
||||
"RPC errors while scraping",
|
||||
&["method"],
|
||||
METRIC_REGISTRY
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_DB_POOL_SIZE: IntGauge = register_int_gauge_with_registry!(
|
||||
"db_pool_size",
|
||||
"Current size of the DB connection pool",
|
||||
METRIC_REGISTRY
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_DB_POOL_AVAILABLE: IntGauge = register_int_gauge_with_registry!(
|
||||
"db_pool_available",
|
||||
"Available DB connections in the pool",
|
||||
METRIC_REGISTRY
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
pub async fn serve_metrics() -> anyhow::Result<Server> {
|
||||
let metrics = PrometheusMetricsBuilder::new("openbook_candles_worker")
|
||||
.registry(METRIC_REGISTRY.clone())
|
||||
.exclude("/metrics")
|
||||
.exclude_status(StatusCode::NOT_FOUND)
|
||||
.endpoint("/metrics")
|
||||
.build()
|
||||
.unwrap();
|
||||
let server = HttpServer::new(move || App::new().wrap(metrics.clone()))
|
||||
.bind("0.0.0.0:9091")
|
||||
.unwrap()
|
||||
.disable_signals()
|
||||
.run();
|
||||
|
||||
Ok(server)
|
||||
}
|
|
@ -1,2 +1,3 @@
|
|||
pub mod candle_batching;
|
||||
pub mod metrics;
|
||||
pub mod trade_fetching;
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
use log::warn;
|
||||
use solana_client::client_error::Result as ClientResult;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_transaction_status::{
|
||||
|
@ -5,13 +6,16 @@ use solana_transaction_status::{
|
|||
};
|
||||
use std::{collections::HashMap, io::Error};
|
||||
|
||||
use crate::structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw};
|
||||
use crate::{
|
||||
structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw},
|
||||
worker::metrics::METRIC_RPC_ERRORS_TOTAL,
|
||||
};
|
||||
|
||||
const PROGRAM_DATA: &str = "Program data: ";
|
||||
|
||||
pub fn parse_trades_from_openbook_txns(
|
||||
txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
|
||||
target_markets: &HashMap<Pubkey, u8>,
|
||||
target_markets: &HashMap<Pubkey, String>,
|
||||
) -> Vec<OpenBookFillEvent> {
|
||||
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
|
||||
for txn in txns.iter_mut() {
|
||||
|
@ -34,7 +38,12 @@ pub fn parse_trades_from_openbook_txns(
|
|||
}
|
||||
}
|
||||
}
|
||||
Err(_) => {}
|
||||
Err(e) => {
|
||||
warn!("rpc error in get_transaction {}", e);
|
||||
METRIC_RPC_ERRORS_TOTAL
|
||||
.with_label_values(&["getTransaction"])
|
||||
.inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
fills_vector
|
||||
|
@ -42,7 +51,7 @@ pub fn parse_trades_from_openbook_txns(
|
|||
|
||||
fn parse_openbook_fills_from_logs(
|
||||
logs: &Vec<String>,
|
||||
target_markets: &HashMap<Pubkey, u8>,
|
||||
target_markets: &HashMap<Pubkey, String>,
|
||||
block_time: i64,
|
||||
) -> Option<Vec<OpenBookFillEvent>> {
|
||||
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
use futures::future::join_all;
|
||||
use log::{debug, warn};
|
||||
use solana_client::{
|
||||
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
|
||||
rpc_config::RpcTransactionConfig,
|
||||
|
@ -8,14 +9,18 @@ use solana_transaction_status::UiTransactionEncoding;
|
|||
use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
|
||||
use crate::{structs::openbook::OpenBookFillEvent, utils::Config};
|
||||
use crate::{
|
||||
structs::openbook::OpenBookFillEvent,
|
||||
utils::Config,
|
||||
worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL},
|
||||
};
|
||||
|
||||
use super::parsing::parse_trades_from_openbook_txns;
|
||||
|
||||
pub async fn scrape(
|
||||
config: &Config,
|
||||
fill_sender: &Sender<OpenBookFillEvent>,
|
||||
target_markets: &HashMap<Pubkey, u8>,
|
||||
target_markets: &HashMap<Pubkey, String>,
|
||||
) {
|
||||
let rpc_client =
|
||||
RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::processed());
|
||||
|
@ -39,7 +44,7 @@ pub async fn scrape_transactions(
|
|||
before_sig: Option<Signature>,
|
||||
limit: Option<usize>,
|
||||
fill_sender: &Sender<OpenBookFillEvent>,
|
||||
target_markets: &HashMap<Pubkey, u8>,
|
||||
target_markets: &HashMap<Pubkey, String>,
|
||||
) -> Option<Signature> {
|
||||
let rpc_config = GetConfirmedSignaturesForAddress2Config {
|
||||
before: before_sig,
|
||||
|
@ -57,13 +62,16 @@ pub async fn scrape_transactions(
|
|||
{
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
println!("Error in get_signatures_for_address_with_config: {}", e);
|
||||
warn!("rpc error in get_signatures_for_address_with_config: {}", e);
|
||||
METRIC_RPC_ERRORS_TOTAL
|
||||
.with_label_values(&["getSignaturesForAddress"])
|
||||
.inc();
|
||||
return before_sig;
|
||||
}
|
||||
};
|
||||
|
||||
if sigs.is_empty() {
|
||||
println!("No signatures found");
|
||||
debug!("No signatures found");
|
||||
return before_sig;
|
||||
}
|
||||
|
||||
|
@ -96,9 +104,11 @@ pub async fn scrape_transactions(
|
|||
let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
|
||||
if !fills.is_empty() {
|
||||
for fill in fills.into_iter() {
|
||||
let market_name = target_markets.get(&fill.market).unwrap();
|
||||
if let Err(_) = fill_sender.send(fill).await {
|
||||
panic!("receiver dropped");
|
||||
}
|
||||
METRIC_FILLS_TOTAL.with_label_values(&[market_name]).inc();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue