Compare commits
24 Commits
93d95005f5
...
01b22cec38
Author | SHA1 | Date |
---|---|---|
Riordan Panayides | 01b22cec38 | |
Riordan Panayides | 762cc4e770 | |
riordanp | 0d6185a58a | |
Riordan Panayides | a01fec5eff | |
Riordan Panayides | 3ad2335300 | |
dboures | 67b3583ba6 | |
dboures | dc1726af43 | |
dboures | fd4eebc034 | |
dboures | 83b2513164 | |
Riordan Panayides | a5570b816e | |
dboures | 493ced0b00 | |
dboures | e0d677c241 | |
dboures | bf653c9672 | |
Riordan Panayides | 89e2fa7178 | |
dboures | df461e8451 | |
Riordan Panayides | 4d81f6c7f6 | |
Riordan Panayides | 95f923f7c7 | |
Riordan Panayides | c055d8b992 | |
Riordan Panayides | 3a515d61a0 | |
Riordan Panayides | 35937c9572 | |
Riordan Panayides | 9489bd3e78 | |
Riordan Panayides | ac2e906688 | |
Riordan Panayides | 522e7a5865 | |
Riordan Panayides | 0591f5b3ab |
|
@ -191,6 +191,18 @@ dependencies = [
|
||||||
"syn 1.0.109",
|
"syn 1.0.109",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "actix-web-prom"
|
||||||
|
version = "0.6.0"
|
||||||
|
source = "git+https://github.com/riordanp/actix-web-prom.git?branch=exclude-paths#614434270cbcfdffa2b3a854aff4c1e49c4973fd"
|
||||||
|
dependencies = [
|
||||||
|
"actix-web",
|
||||||
|
"futures-core",
|
||||||
|
"pin-project-lite",
|
||||||
|
"prometheus",
|
||||||
|
"regex",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "adler"
|
name = "adler"
|
||||||
version = "1.0.2"
|
version = "1.0.2"
|
||||||
|
@ -3423,6 +3435,7 @@ name = "openbook-candles"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"actix-web",
|
"actix-web",
|
||||||
|
"actix-web-prom",
|
||||||
"anchor-client",
|
"anchor-client",
|
||||||
"anchor-lang",
|
"anchor-lang",
|
||||||
"anyhow",
|
"anyhow",
|
||||||
|
@ -3438,11 +3451,13 @@ dependencies = [
|
||||||
"env_logger 0.10.0",
|
"env_logger 0.10.0",
|
||||||
"futures 0.3.27",
|
"futures 0.3.27",
|
||||||
"jsonrpc-core-client",
|
"jsonrpc-core-client",
|
||||||
|
"lazy_static",
|
||||||
"log 0.4.17",
|
"log 0.4.17",
|
||||||
"native-tls",
|
"native-tls",
|
||||||
"num-traits",
|
"num-traits",
|
||||||
"num_enum 0.6.1",
|
"num_enum 0.6.1",
|
||||||
"postgres-native-tls",
|
"postgres-native-tls",
|
||||||
|
"prometheus",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_derive",
|
"serde_derive",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
@ -3966,6 +3981,21 @@ dependencies = [
|
||||||
"yansi",
|
"yansi",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "prometheus"
|
||||||
|
version = "0.13.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if 1.0.0",
|
||||||
|
"fnv",
|
||||||
|
"lazy_static",
|
||||||
|
"memchr",
|
||||||
|
"parking_lot 0.12.1",
|
||||||
|
"protobuf",
|
||||||
|
"thiserror",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "prost"
|
name = "prost"
|
||||||
version = "0.11.6"
|
version = "0.11.6"
|
||||||
|
@ -4021,6 +4051,12 @@ dependencies = [
|
||||||
"prost",
|
"prost",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "protobuf"
|
||||||
|
version = "2.28.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "protobuf-src"
|
name = "protobuf-src"
|
||||||
version = "1.1.0+21.5"
|
version = "1.1.0+21.5"
|
||||||
|
|
|
@ -62,9 +62,12 @@ serum_dex = { version = "0.5.10", git = "https://github.com/openbook-dex/program
|
||||||
anchor-lang = ">=0.25.0"
|
anchor-lang = ">=0.25.0"
|
||||||
|
|
||||||
actix-web = "4"
|
actix-web = "4"
|
||||||
|
actix-web-prom = { version = "0.6.0", git = "https://github.com/riordanp/actix-web-prom.git", branch = "exclude-paths" }
|
||||||
|
|
||||||
arrayref = "0.3.6"
|
arrayref = "0.3.6"
|
||||||
bytemuck = "1.12.3"
|
bytemuck = "1.12.3"
|
||||||
num_enum = "0.6.1"
|
num_enum = "0.6.1"
|
||||||
|
|
||||||
config = "0.13.1"
|
config = "0.13.1"
|
||||||
|
prometheus = "0.13.3"
|
||||||
|
lazy_static = "1.4.0"
|
||||||
|
|
|
@ -17,3 +17,7 @@ kill_timeout = 30
|
||||||
hard_limit = 1024
|
hard_limit = 1024
|
||||||
soft_limit = 1024
|
soft_limit = 1024
|
||||||
type = "connections"
|
type = "connections"
|
||||||
|
|
||||||
|
[metrics]
|
||||||
|
port = 9091
|
||||||
|
path = "/metrics"
|
||||||
|
|
|
@ -7,3 +7,7 @@ kill_timeout = 30
|
||||||
|
|
||||||
[experimental]
|
[experimental]
|
||||||
cmd = ["worker", "markets.json"]
|
cmd = ["worker", "markets.json"]
|
||||||
|
|
||||||
|
[metrics]
|
||||||
|
port = 9091
|
||||||
|
path = "/metrics"
|
34
markets.json
34
markets.json
|
@ -4,47 +4,35 @@
|
||||||
"address" : "8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6"
|
"address" : "8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"name" : "RLB/USDC",
|
"name" : "wBTCpo/USDC",
|
||||||
"address" : "72h8rWaWwfPUL36PAFqyQZU8RT1V3FKG7Nc45aK89xTs"
|
"address" : "3BAKsQd3RuhZKES2DGysMhjBdwjZYKYmxRqnSMtZ4KSN"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"name" : "MNGO/USDC",
|
"name" : "MNGO/USDC",
|
||||||
"address" : "3NnxQvDcZXputNMxaxsGvqiKpqgPfSYXpNigZNFcknmD"
|
"address" : "3NnxQvDcZXputNMxaxsGvqiKpqgPfSYXpNigZNFcknmD"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"name" : "BONK/SOL",
|
"name": "BONK/SOL",
|
||||||
"address" : "Hs97TCZeuYiJxooo3U73qEHXg3dKpRL4uYKYRryEK9CF"
|
"address": "Hs97TCZeuYiJxooo3U73qEHXg3dKpRL4uYKYRryEK9CF"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"name" : "BONK/USDC",
|
"name": "DUAL/USDC",
|
||||||
"address" : "8PhnCfgqpgFM7ZJvttGdBVMXHuU4Q23ACxCvWkbs1M71"
|
"address": "H6rrYK3SUHF2eguZCyJxnSBMJqjXhUtuaki6PHiutvum"
|
||||||
},
|
|
||||||
{
|
|
||||||
"name" : "WBTC/USDC",
|
|
||||||
"address" : "3BAKsQd3RuhZKES2DGysMhjBdwjZYKYmxRqnSMtZ4KSN"
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"name": "mSOL/USDC",
|
"name": "mSOL/USDC",
|
||||||
"address": "9Lyhks5bQQxb9EyyX55NtgKQzpM4WK7JCmeaWuQ5MoXD"
|
"address": "9Lyhks5bQQxb9EyyX55NtgKQzpM4WK7JCmeaWuQ5MoXD"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"name": "SOL/USDT",
|
"name": "ETHpo/USDC",
|
||||||
"address": "2AdaV97p6SfkuMQJdu8DHhBhmJe7oWdvbm52MJfYQmfA"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "USDT/USDC",
|
|
||||||
"address": "B2na8Awyd7cpC59iEU43FagJAPLigr3AP3s38KM982bu"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "ETH/USDC",
|
|
||||||
"address": "BbJgE7HZMaDp5NTYvRh5jZSkQPVDTU8ubPFtpogUkEj4"
|
"address": "BbJgE7HZMaDp5NTYvRh5jZSkQPVDTU8ubPFtpogUkEj4"
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"name": "BONK/USDC",
|
||||||
|
"address": "8PhnCfgqpgFM7ZJvttGdBVMXHuU4Q23ACxCvWkbs1M71"
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"name": "RAY/USDC",
|
"name": "RAY/USDC",
|
||||||
"address": "DZjbn4XC8qoHKikZqzmhemykVzmossoayV9ffbsUqxVj"
|
"address": "DZjbn4XC8qoHKikZqzmhemykVzmossoayV9ffbsUqxVj"
|
||||||
},
|
|
||||||
{
|
|
||||||
"name": "DUAL/USDC",
|
|
||||||
"address": "H6rrYK3SUHF2eguZCyJxnSBMJqjXhUtuaki6PHiutvum"
|
|
||||||
}
|
}
|
||||||
]
|
]
|
|
@ -1,12 +1,7 @@
|
||||||
|
|
||||||
|
|
||||||
use deadpool_postgres::Object;
|
use deadpool_postgres::Object;
|
||||||
|
|
||||||
use openbook_candles::{
|
use openbook_candles::{
|
||||||
database::{
|
database::{initialize::connect_to_database, insert::build_candles_upsert_statement},
|
||||||
initialize::connect_to_database,
|
|
||||||
insert::{build_candles_upsert_statement},
|
|
||||||
},
|
|
||||||
structs::{
|
structs::{
|
||||||
candle::Candle,
|
candle::Candle,
|
||||||
markets::{fetch_market_infos, load_markets},
|
markets::{fetch_market_infos, load_markets},
|
||||||
|
@ -18,10 +13,9 @@ use openbook_candles::{
|
||||||
minute_candles::backfill_batch_1m_candles,
|
minute_candles::backfill_batch_1m_candles,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
use std::{env};
|
use std::env;
|
||||||
use strum::IntoEnumIterator;
|
use strum::IntoEnumIterator;
|
||||||
|
|
||||||
|
|
||||||
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
dotenv::dotenv().ok();
|
dotenv::dotenv().ok();
|
||||||
|
@ -59,7 +53,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
|
|
||||||
async fn save_candles(candles: Vec<Candle>, client: Object) -> anyhow::Result<()> {
|
async fn save_candles(candles: Vec<Candle>, client: Object) -> anyhow::Result<()> {
|
||||||
if !candles.is_empty() {
|
if !candles.is_empty() {
|
||||||
let upsert_statement = build_candles_upsert_statement(candles);
|
let upsert_statement = build_candles_upsert_statement(&candles);
|
||||||
client
|
client
|
||||||
.execute(&upsert_statement, &[])
|
.execute(&upsert_statement, &[])
|
||||||
.await
|
.await
|
||||||
|
|
|
@ -1,23 +1,24 @@
|
||||||
use anchor_lang::prelude::Pubkey;
|
use anchor_lang::prelude::Pubkey;
|
||||||
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
|
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
|
||||||
use futures::future::join_all;
|
use deadpool_postgres::Pool;
|
||||||
|
use log::debug;
|
||||||
use openbook_candles::{
|
use openbook_candles::{
|
||||||
database::{initialize::connect_to_database, insert::persist_fill_events},
|
database::{
|
||||||
|
initialize::{connect_to_database, setup_database},
|
||||||
|
insert::build_transactions_insert_statement,
|
||||||
|
},
|
||||||
structs::{
|
structs::{
|
||||||
markets::{fetch_market_infos, load_markets},
|
markets::{fetch_market_infos, load_markets},
|
||||||
openbook::OpenBookFillEvent,
|
transaction::{PgTransaction, NUM_TRANSACTION_PARTITIONS},
|
||||||
},
|
},
|
||||||
utils::Config,
|
utils::{AnyhowWrap, Config, OPENBOOK_KEY},
|
||||||
worker::trade_fetching::parsing::parse_trades_from_openbook_txns,
|
worker::trade_fetching::scrape::scrape_fills,
|
||||||
};
|
};
|
||||||
use solana_client::{
|
use solana_client::{
|
||||||
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
|
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
|
||||||
rpc_config::RpcTransactionConfig, rpc_response::RpcConfirmedTransactionStatusWithSignature,
|
|
||||||
};
|
};
|
||||||
use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
|
use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
|
||||||
use solana_transaction_status::UiTransactionEncoding;
|
|
||||||
use std::{collections::HashMap, env, str::FromStr};
|
use std::{collections::HashMap, env, str::FromStr};
|
||||||
use tokio::sync::mpsc::{self, Sender};
|
|
||||||
|
|
||||||
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
@ -26,6 +27,8 @@ async fn main() -> anyhow::Result<()> {
|
||||||
assert!(args.len() == 2);
|
assert!(args.len() == 2);
|
||||||
|
|
||||||
let path_to_markets_json = &args[1];
|
let path_to_markets_json = &args[1];
|
||||||
|
// let num_days = args[2].parse::<i64>().unwrap(); // TODO: implement
|
||||||
|
let num_days = 1;
|
||||||
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
|
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
|
||||||
|
|
||||||
let config = Config {
|
let config = Config {
|
||||||
|
@ -35,150 +38,98 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
|
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
|
||||||
let mut target_markets = HashMap::new();
|
let mut target_markets = HashMap::new();
|
||||||
for m in market_infos.clone() {
|
for m in market_infos.clone() {
|
||||||
target_markets.insert(Pubkey::from_str(&m.address)?, 0);
|
target_markets.insert(Pubkey::from_str(&m.address)?, m.name);
|
||||||
}
|
}
|
||||||
println!("{:?}", target_markets);
|
println!("{:?}", target_markets);
|
||||||
|
|
||||||
let pool = connect_to_database().await?;
|
let pool = connect_to_database().await?;
|
||||||
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(1000);
|
setup_database(&pool).await?;
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
loop {
|
|
||||||
persist_fill_events(&pool, &mut fill_receiver)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
backfill(rpc_url, &fill_sender, &target_markets).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn backfill(
|
|
||||||
rpc_url: String,
|
|
||||||
fill_sender: &Sender<OpenBookFillEvent>,
|
|
||||||
target_markets: &HashMap<Pubkey, u8>,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
println!("backfill started");
|
|
||||||
let mut before_sig: Option<Signature> = None;
|
|
||||||
let mut now_time = Utc::now().timestamp();
|
|
||||||
let end_time = (Utc::now() - Duration::days(1)).timestamp();
|
|
||||||
|
|
||||||
let mut handles = vec![];
|
let mut handles = vec![];
|
||||||
|
|
||||||
while now_time > end_time {
|
let rpc_clone = rpc_url.clone();
|
||||||
let rpc_client =
|
let pool_clone = pool.clone();
|
||||||
RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
|
handles.push(tokio::spawn(async move {
|
||||||
let maybe_r = get_signatures(&rpc_client, before_sig).await;
|
fetch_signatures(rpc_clone, &pool_clone, num_days)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}));
|
||||||
|
|
||||||
match maybe_r {
|
// Low priority improvement: batch fills into 1000's per worker
|
||||||
Some((last, time, sigs)) => {
|
for id in 0..NUM_TRANSACTION_PARTITIONS {
|
||||||
now_time = time;
|
let rpc_clone = rpc_url.clone();
|
||||||
before_sig = Some(last);
|
let pool_clone = pool.clone();
|
||||||
let time_left = backfill_time_left(now_time, end_time);
|
let markets_clone = target_markets.clone();
|
||||||
println!(
|
handles.push(tokio::spawn(async move {
|
||||||
"{} minutes ~ {} days remaining in the backfill\n",
|
scrape_fills(id as i32, rpc_clone, &pool_clone, &markets_clone)
|
||||||
time_left.num_minutes(),
|
.await
|
||||||
time_left.num_days()
|
.unwrap();
|
||||||
);
|
}));
|
||||||
|
|
||||||
let cloned_markets = target_markets.clone();
|
|
||||||
let cloned_sender = fill_sender.clone();
|
|
||||||
let handle = tokio::spawn(async move {
|
|
||||||
get_transactions(&rpc_client, sigs, &cloned_sender, &cloned_markets).await;
|
|
||||||
});
|
|
||||||
handles.push(handle);
|
|
||||||
}
|
|
||||||
None => {}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
futures::future::join_all(handles).await;
|
// TODO: spawn status thread
|
||||||
|
|
||||||
println!("Backfill complete \n");
|
futures::future::join_all(handles).await;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_signatures(
|
pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> anyhow::Result<()> {
|
||||||
rpc_client: &RpcClient,
|
let mut before_sig: Option<Signature> = None;
|
||||||
before_sig: Option<Signature>,
|
let mut now_time = Utc::now().timestamp();
|
||||||
) -> Option<(
|
let end_time = (Utc::now() - Duration::days(num_days)).timestamp();
|
||||||
Signature,
|
let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
|
||||||
i64,
|
|
||||||
Vec<RpcConfirmedTransactionStatusWithSignature>,
|
|
||||||
)> {
|
|
||||||
let rpc_config = GetConfirmedSignaturesForAddress2Config {
|
|
||||||
before: before_sig,
|
|
||||||
until: None,
|
|
||||||
limit: None,
|
|
||||||
commitment: Some(CommitmentConfig::confirmed()),
|
|
||||||
};
|
|
||||||
|
|
||||||
let sigs = match rpc_client
|
while now_time > end_time {
|
||||||
.get_signatures_for_address_with_config(
|
let rpc_config = GetConfirmedSignaturesForAddress2Config {
|
||||||
&Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(),
|
before: before_sig,
|
||||||
rpc_config,
|
until: None,
|
||||||
)
|
limit: None,
|
||||||
.await
|
commitment: Some(CommitmentConfig::confirmed()),
|
||||||
{
|
};
|
||||||
Ok(s) => s,
|
|
||||||
Err(e) => {
|
|
||||||
println!("Error in get_signatures_for_address_with_config: {}", e);
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if sigs.is_empty() {
|
let sigs = match rpc_client
|
||||||
println!("No signatures found");
|
.get_signatures_for_address_with_config(&OPENBOOK_KEY, rpc_config)
|
||||||
return None;
|
.await
|
||||||
}
|
{
|
||||||
let last = sigs.last().unwrap();
|
Ok(sigs) => sigs,
|
||||||
// println!("{:?}", last.block_time.unwrap());
|
Err(e) => {
|
||||||
Some((
|
println!("Error fetching signatures: {}", e);
|
||||||
Signature::from_str(&last.signature).unwrap(),
|
continue;
|
||||||
last.block_time.unwrap(),
|
|
||||||
sigs,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn get_transactions(
|
|
||||||
rpc_client: &RpcClient,
|
|
||||||
mut sigs: Vec<RpcConfirmedTransactionStatusWithSignature>,
|
|
||||||
fill_sender: &Sender<OpenBookFillEvent>,
|
|
||||||
target_markets: &HashMap<Pubkey, u8>,
|
|
||||||
) {
|
|
||||||
sigs.retain(|sig| sig.err.is_none());
|
|
||||||
if sigs.last().is_none() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let txn_config = RpcTransactionConfig {
|
|
||||||
encoding: Some(UiTransactionEncoding::Json),
|
|
||||||
commitment: Some(CommitmentConfig::confirmed()),
|
|
||||||
max_supported_transaction_version: Some(0),
|
|
||||||
};
|
|
||||||
|
|
||||||
let signatures: Vec<_> = sigs
|
|
||||||
.into_iter()
|
|
||||||
.map(|sig| sig.signature.parse::<Signature>().unwrap())
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let txn_futs: Vec<_> = signatures
|
|
||||||
.iter()
|
|
||||||
.map(|s| rpc_client.get_transaction_with_config(s, txn_config))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let mut txns = join_all(txn_futs).await;
|
|
||||||
|
|
||||||
let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
|
|
||||||
if !fills.is_empty() {
|
|
||||||
for fill in fills.into_iter() {
|
|
||||||
// println!("Sending fill {:?}", fill);
|
|
||||||
if let Err(_) = fill_sender.send(fill).await {
|
|
||||||
panic!("receiver dropped");
|
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
if sigs.is_empty() {
|
||||||
|
println!("No signatures found, trying again");
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
let last = sigs.last().unwrap();
|
||||||
|
let last_time = last.block_time.unwrap().clone();
|
||||||
|
let last_signature = last.signature.clone();
|
||||||
|
let transactions = sigs
|
||||||
|
.into_iter()
|
||||||
|
.map(|s| PgTransaction::from_rpc_confirmed_transaction(s))
|
||||||
|
.collect::<Vec<PgTransaction>>();
|
||||||
|
|
||||||
|
if transactions.is_empty() {
|
||||||
|
println!("No transactions found, trying again");
|
||||||
|
}
|
||||||
|
debug!("writing: {:?} txns to DB\n", transactions.len());
|
||||||
|
let upsert_statement = build_transactions_insert_statement(transactions);
|
||||||
|
let client = pool.get().await?;
|
||||||
|
client
|
||||||
|
.execute(&upsert_statement, &[])
|
||||||
|
.await
|
||||||
|
.map_err_anyhow()?;
|
||||||
|
|
||||||
|
now_time = last_time;
|
||||||
|
before_sig = Some(Signature::from_str(&last_signature)?);
|
||||||
|
let time_left = backfill_time_left(now_time, end_time);
|
||||||
|
println!(
|
||||||
|
"{} minutes ~ {} days remaining in the backfill\n",
|
||||||
|
time_left.num_minutes(),
|
||||||
|
time_left.num_days()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration {
|
fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration {
|
||||||
|
|
|
@ -4,6 +4,7 @@ use crate::structs::{
|
||||||
openbook::PgOpenBookFill,
|
openbook::PgOpenBookFill,
|
||||||
resolution::Resolution,
|
resolution::Resolution,
|
||||||
trader::PgTrader,
|
trader::PgTrader,
|
||||||
|
transaction::PgTransaction,
|
||||||
};
|
};
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use deadpool_postgres::{GenericClient, Pool};
|
use deadpool_postgres::{GenericClient, Pool};
|
||||||
|
@ -59,10 +60,7 @@ pub async fn fetch_fills_from(
|
||||||
let rows = client
|
let rows = client
|
||||||
.query(stmt, &[&market_address_string, &start_time, &end_time])
|
.query(stmt, &[&market_address_string, &start_time, &end_time])
|
||||||
.await?;
|
.await?;
|
||||||
Ok(rows
|
Ok(rows.into_iter().map(PgOpenBookFill::from_row).collect())
|
||||||
.into_iter()
|
|
||||||
.map(PgOpenBookFill::from_row)
|
|
||||||
.collect())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn fetch_latest_finished_candle(
|
pub async fn fetch_latest_finished_candle(
|
||||||
|
@ -323,3 +321,23 @@ pub async fn fetch_coingecko_24h_high_low(
|
||||||
.map(PgCoinGecko24HighLow::from_row)
|
.map(PgCoinGecko24HighLow::from_row)
|
||||||
.collect())
|
.collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Fetches unprocessed, non-error transactions for the specified worker partition.
|
||||||
|
/// Pulls at most 50 transactions at a time.
|
||||||
|
pub async fn fetch_worker_transactions(
|
||||||
|
worker_id: i32,
|
||||||
|
pool: &Pool,
|
||||||
|
) -> anyhow::Result<Vec<PgTransaction>> {
|
||||||
|
let client = pool.get().await?;
|
||||||
|
|
||||||
|
let stmt = r#"SELECT signature, program_pk, block_datetime, slot, err, "processed", worker_partition
|
||||||
|
FROM transactions
|
||||||
|
where worker_partition = $1
|
||||||
|
and err = false
|
||||||
|
and processed = false
|
||||||
|
LIMIT 50"#;
|
||||||
|
|
||||||
|
let rows = client.query(stmt, &[&worker_id]).await?;
|
||||||
|
|
||||||
|
Ok(rows.into_iter().map(PgTransaction::from_row).collect())
|
||||||
|
}
|
||||||
|
|
|
@ -36,6 +36,7 @@ pub async fn connect_to_database() -> anyhow::Result<Pool> {
|
||||||
MakeTlsConnector::new(
|
MakeTlsConnector::new(
|
||||||
TlsConnector::builder()
|
TlsConnector::builder()
|
||||||
.add_root_certificate(Certificate::from_pem(&ca_cert)?)
|
.add_root_certificate(Certificate::from_pem(&ca_cert)?)
|
||||||
|
// TODO: make this configurable
|
||||||
.identity(Identity::from_pkcs12(&client_key, "pass")?)
|
.identity(Identity::from_pkcs12(&client_key, "pass")?)
|
||||||
.danger_accept_invalid_certs(false)
|
.danger_accept_invalid_certs(false)
|
||||||
.build()?,
|
.build()?,
|
||||||
|
@ -66,8 +67,9 @@ pub async fn connect_to_database() -> anyhow::Result<Pool> {
|
||||||
|
|
||||||
pub async fn setup_database(pool: &Pool) -> anyhow::Result<()> {
|
pub async fn setup_database(pool: &Pool) -> anyhow::Result<()> {
|
||||||
let candles_table_fut = create_candles_table(pool);
|
let candles_table_fut = create_candles_table(pool);
|
||||||
|
let transactions_table_fut = create_transactions_table(pool);
|
||||||
let fills_table_fut = create_fills_table(pool);
|
let fills_table_fut = create_fills_table(pool);
|
||||||
let result = tokio::try_join!(candles_table_fut, fills_table_fut);
|
let result = tokio::try_join!(candles_table_fut, transactions_table_fut, fills_table_fut);
|
||||||
match result {
|
match result {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
println!("Successfully configured database");
|
println!("Successfully configured database");
|
||||||
|
@ -86,7 +88,7 @@ pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> {
|
||||||
client
|
client
|
||||||
.execute(
|
.execute(
|
||||||
"CREATE TABLE IF NOT EXISTS candles (
|
"CREATE TABLE IF NOT EXISTS candles (
|
||||||
id serial,
|
id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
|
||||||
market_name text,
|
market_name text,
|
||||||
start_time timestamptz,
|
start_time timestamptz,
|
||||||
end_time timestamptz,
|
end_time timestamptz,
|
||||||
|
@ -125,7 +127,7 @@ pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> {
|
||||||
client
|
client
|
||||||
.execute(
|
.execute(
|
||||||
"CREATE TABLE IF NOT EXISTS fills (
|
"CREATE TABLE IF NOT EXISTS fills (
|
||||||
id numeric PRIMARY KEY,
|
signature text not null,
|
||||||
time timestamptz not null,
|
time timestamptz not null,
|
||||||
market text not null,
|
market text not null,
|
||||||
open_orders text not null,
|
open_orders text not null,
|
||||||
|
@ -136,19 +138,14 @@ pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> {
|
||||||
native_qty_received double precision not null,
|
native_qty_received double precision not null,
|
||||||
native_fee_or_rebate double precision not null,
|
native_fee_or_rebate double precision not null,
|
||||||
fee_tier text not null,
|
fee_tier text not null,
|
||||||
order_id text not null
|
order_id text not null,
|
||||||
|
log_index int4 not null,
|
||||||
|
CONSTRAINT fills_pk PRIMARY KEY (signature, log_index)
|
||||||
)",
|
)",
|
||||||
&[],
|
&[],
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
client
|
|
||||||
.execute(
|
|
||||||
"CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)",
|
|
||||||
&[],
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
client
|
client
|
||||||
.execute(
|
.execute(
|
||||||
"CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)",
|
"CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)",
|
||||||
|
@ -157,3 +154,41 @@ pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> {
|
||||||
.await?;
|
.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn create_transactions_table(pool: &Pool) -> anyhow::Result<()> {
|
||||||
|
let client = pool.get().await?;
|
||||||
|
|
||||||
|
client
|
||||||
|
.execute(
|
||||||
|
"CREATE TABLE IF NOT EXISTS transactions (
|
||||||
|
signature text NOT NULL,
|
||||||
|
program_pk text NOT NULL,
|
||||||
|
block_datetime timestamptz NOT NULL,
|
||||||
|
slot bigint NOT NULL,
|
||||||
|
err bool NOT NULL,
|
||||||
|
processed bool NOT NULL,
|
||||||
|
worker_partition int4 NOT NULL,
|
||||||
|
CONSTRAINT transactions_pk PRIMARY KEY (signature, worker_partition)
|
||||||
|
) PARTITION BY LIST (worker_partition);",
|
||||||
|
&[],
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
client.batch_execute(
|
||||||
|
"CREATE INDEX IF NOT EXISTS transactions_processed_err_idx ON ONLY transactions (signature) WHERE processed IS NOT TRUE and err IS NOT TRUE;
|
||||||
|
CREATE INDEX IF NOT EXISTS transactions_program_pk_idx ON ONLY transactions USING btree (program_pk, slot DESC);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS transactions_0 PARTITION OF transactions FOR VALUES IN (0);
|
||||||
|
CREATE TABLE IF NOT EXISTS transactions_1 PARTITION OF transactions FOR VALUES IN (1);
|
||||||
|
CREATE TABLE IF NOT EXISTS transactions_2 PARTITION OF transactions FOR VALUES IN (2);
|
||||||
|
CREATE TABLE IF NOT EXISTS transactions_3 PARTITION OF transactions FOR VALUES IN (3);
|
||||||
|
CREATE TABLE IF NOT EXISTS transactions_4 PARTITION OF transactions FOR VALUES IN (4);
|
||||||
|
CREATE TABLE IF NOT EXISTS transactions_5 PARTITION OF transactions FOR VALUES IN (5);
|
||||||
|
CREATE TABLE IF NOT EXISTS transactions_6 PARTITION OF transactions FOR VALUES IN (6);
|
||||||
|
CREATE TABLE IF NOT EXISTS transactions_7 PARTITION OF transactions FOR VALUES IN (7);
|
||||||
|
CREATE TABLE IF NOT EXISTS transactions_8 PARTITION OF transactions FOR VALUES IN (8);
|
||||||
|
CREATE TABLE IF NOT EXISTS transactions_9 PARTITION OF transactions FOR VALUES IN (9);"
|
||||||
|
).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
|
@ -1,20 +1,51 @@
|
||||||
use deadpool_postgres::Pool;
|
use deadpool_postgres::Pool;
|
||||||
use std::{
|
use log::debug;
|
||||||
collections::{hash_map::DefaultHasher, HashMap},
|
use std::collections::HashMap;
|
||||||
hash::{Hash, Hasher},
|
|
||||||
};
|
|
||||||
use tokio::sync::mpsc::{error::TryRecvError, Receiver};
|
use tokio::sync::mpsc::{error::TryRecvError, Receiver};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
structs::{candle::Candle, openbook::OpenBookFillEvent},
|
structs::{candle::Candle, openbook::OpenBookFillEvent, transaction::PgTransaction},
|
||||||
utils::{to_timestampz, AnyhowWrap},
|
utils::{to_timestampz, AnyhowWrap},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
pub async fn insert_fills_atomically(
|
||||||
|
pool: &Pool,
|
||||||
|
worker_id: i32,
|
||||||
|
fills: Vec<OpenBookFillEvent>,
|
||||||
|
signatures: Vec<String>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let mut client = pool.get().await?;
|
||||||
|
|
||||||
|
let db_txn = client.build_transaction().start().await?;
|
||||||
|
|
||||||
|
// 1. Insert fills
|
||||||
|
if fills.len() > 0 {
|
||||||
|
let fills_statement = build_fills_upsert_statement_not_crazy(fills);
|
||||||
|
db_txn
|
||||||
|
.execute(&fills_statement, &[])
|
||||||
|
.await
|
||||||
|
.map_err_anyhow()
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Update txns table as processed
|
||||||
|
let transactions_statement =
|
||||||
|
build_transactions_processed_update_statement(worker_id, signatures);
|
||||||
|
db_txn
|
||||||
|
.execute(&transactions_statement, &[])
|
||||||
|
.await
|
||||||
|
.map_err_anyhow()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
db_txn.commit().await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn persist_fill_events(
|
pub async fn persist_fill_events(
|
||||||
pool: &Pool,
|
pool: &Pool,
|
||||||
fill_receiver: &mut Receiver<OpenBookFillEvent>,
|
fill_receiver: &mut Receiver<OpenBookFillEvent>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let client = pool.get().await?;
|
|
||||||
loop {
|
loop {
|
||||||
let mut write_batch = HashMap::new();
|
let mut write_batch = HashMap::new();
|
||||||
while write_batch.len() < 10 {
|
while write_batch.len() < 10 {
|
||||||
|
@ -36,70 +67,25 @@ pub async fn persist_fill_events(
|
||||||
}
|
}
|
||||||
|
|
||||||
if !write_batch.is_empty() {
|
if !write_batch.is_empty() {
|
||||||
// print!("writing: {:?} events to DB\n", write_batch.len());
|
debug!("writing: {:?} events to DB\n", write_batch.len());
|
||||||
|
|
||||||
// match conn.ping().await {
|
|
||||||
// Ok(_) => {
|
|
||||||
let upsert_statement = build_fills_upsert_statement(write_batch);
|
let upsert_statement = build_fills_upsert_statement(write_batch);
|
||||||
|
let client = pool.get().await?;
|
||||||
client
|
client
|
||||||
.execute(&upsert_statement, &[])
|
.execute(&upsert_statement, &[])
|
||||||
.await
|
.await
|
||||||
.map_err_anyhow()
|
.map_err_anyhow()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
// }
|
|
||||||
// Err(_) => {
|
|
||||||
// println!("Fills ping failed");
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn persist_candles(
|
|
||||||
pool: Pool,
|
|
||||||
candles_receiver: &mut Receiver<Vec<Candle>>,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let client = pool.get().await.unwrap();
|
|
||||||
loop {
|
|
||||||
// match client.ping().await {
|
|
||||||
// Ok(_) => {
|
|
||||||
match candles_receiver.try_recv() {
|
|
||||||
Ok(candles) => {
|
|
||||||
if candles.is_empty() {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// print!("writing: {:?} candles to DB\n", candles.len());
|
|
||||||
let upsert_statement = build_candles_upsert_statement(candles);
|
|
||||||
client
|
|
||||||
.execute(&upsert_statement, &[])
|
|
||||||
.await
|
|
||||||
.map_err_anyhow()
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
Err(TryRecvError::Empty) => continue,
|
|
||||||
Err(TryRecvError::Disconnected) => {
|
|
||||||
panic!("Candles sender must stay alive")
|
|
||||||
}
|
|
||||||
};
|
|
||||||
// }
|
|
||||||
// Err(_) => {
|
|
||||||
// println!("Candle ping failed");
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// };
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[allow(deprecated)]
|
#[allow(deprecated)]
|
||||||
fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> String {
|
fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> String {
|
||||||
let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES");
|
let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES");
|
||||||
for (idx, event) in events.keys().enumerate() {
|
for (idx, event) in events.keys().enumerate() {
|
||||||
let mut hasher = DefaultHasher::new();
|
|
||||||
event.hash(&mut hasher);
|
|
||||||
let val_str = format!(
|
let val_str = format!(
|
||||||
"({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {})",
|
"({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})",
|
||||||
hasher.finish(),
|
event.signature,
|
||||||
to_timestampz(event.block_time as u64).to_rfc3339(),
|
to_timestampz(event.block_time as u64).to_rfc3339(),
|
||||||
event.market,
|
event.market,
|
||||||
event.open_orders,
|
event.open_orders,
|
||||||
|
@ -111,6 +97,7 @@ fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> Strin
|
||||||
event.native_fee_or_rebate,
|
event.native_fee_or_rebate,
|
||||||
event.fee_tier,
|
event.fee_tier,
|
||||||
event.order_id,
|
event.order_id,
|
||||||
|
event.log_index,
|
||||||
);
|
);
|
||||||
|
|
||||||
if idx == 0 {
|
if idx == 0 {
|
||||||
|
@ -120,13 +107,46 @@ fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> Strin
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let handle_conflict = "ON CONFLICT (id) DO UPDATE SET market=excluded.market";
|
let handle_conflict = "ON CONFLICT DO NOTHING";
|
||||||
|
|
||||||
stmt = format!("{} {}", stmt, handle_conflict);
|
stmt = format!("{} {}", stmt, handle_conflict);
|
||||||
stmt
|
stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn build_candles_upsert_statement(candles: Vec<Candle>) -> String {
|
fn build_fills_upsert_statement_not_crazy(fills: Vec<OpenBookFillEvent>) -> String {
|
||||||
|
let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES");
|
||||||
|
for (idx, fill) in fills.iter().enumerate() {
|
||||||
|
let val_str = format!(
|
||||||
|
"(\'{}\', \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})",
|
||||||
|
fill.signature,
|
||||||
|
to_timestampz(fill.block_time as u64).to_rfc3339(),
|
||||||
|
fill.market,
|
||||||
|
fill.open_orders,
|
||||||
|
fill.open_orders_owner,
|
||||||
|
fill.bid,
|
||||||
|
fill.maker,
|
||||||
|
fill.native_qty_paid,
|
||||||
|
fill.native_qty_received,
|
||||||
|
fill.native_fee_or_rebate,
|
||||||
|
fill.fee_tier,
|
||||||
|
fill.order_id,
|
||||||
|
fill.log_index,
|
||||||
|
);
|
||||||
|
|
||||||
|
if idx == 0 {
|
||||||
|
stmt = format!("{} {}", &stmt, val_str);
|
||||||
|
} else {
|
||||||
|
stmt = format!("{}, {}", &stmt, val_str);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let handle_conflict = "ON CONFLICT DO NOTHING";
|
||||||
|
|
||||||
|
stmt = format!("{} {}", stmt, handle_conflict);
|
||||||
|
stmt
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn build_candles_upsert_statement(candles: &Vec<Candle>) -> String {
|
||||||
let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES");
|
let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES");
|
||||||
for (idx, candle) in candles.iter().enumerate() {
|
for (idx, candle) in candles.iter().enumerate() {
|
||||||
let val_str = format!(
|
let val_str = format!(
|
||||||
|
@ -164,56 +184,53 @@ pub fn build_candles_upsert_statement(candles: Vec<Candle>) -> String {
|
||||||
stmt
|
stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
pub fn build_transactions_insert_statement(transactions: Vec<PgTransaction>) -> String {
|
||||||
mod tests {
|
let mut stmt = String::from("INSERT INTO transactions (signature, program_pk, block_datetime, slot, err, processed, worker_partition) VALUES");
|
||||||
use super::*;
|
for (idx, txn) in transactions.iter().enumerate() {
|
||||||
use solana_sdk::pubkey::Pubkey;
|
let val_str = format!(
|
||||||
use std::str::FromStr;
|
"(\'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {})",
|
||||||
|
txn.signature,
|
||||||
|
txn.program_pk,
|
||||||
|
txn.block_datetime.to_rfc3339(),
|
||||||
|
txn.slot,
|
||||||
|
txn.err,
|
||||||
|
txn.processed,
|
||||||
|
txn.worker_partition,
|
||||||
|
);
|
||||||
|
|
||||||
#[test]
|
if idx == 0 {
|
||||||
fn test_event_hashing() {
|
stmt = format!("{} {}", &stmt, val_str);
|
||||||
let event_1 = OpenBookFillEvent {
|
} else {
|
||||||
market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(),
|
stmt = format!("{}, {}", &stmt, val_str);
|
||||||
open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(),
|
}
|
||||||
open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw")
|
|
||||||
.unwrap(),
|
|
||||||
bid: false,
|
|
||||||
maker: false,
|
|
||||||
native_qty_paid: 200000000,
|
|
||||||
native_qty_received: 4204317,
|
|
||||||
native_fee_or_rebate: 1683,
|
|
||||||
order_id: 387898134381964481824213,
|
|
||||||
owner_slot: 0,
|
|
||||||
fee_tier: 0,
|
|
||||||
client_order_id: None,
|
|
||||||
referrer_rebate: Some(841),
|
|
||||||
block_time: 0,
|
|
||||||
};
|
|
||||||
|
|
||||||
let event_2 = OpenBookFillEvent {
|
|
||||||
market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(),
|
|
||||||
open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(),
|
|
||||||
open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw")
|
|
||||||
.unwrap(),
|
|
||||||
bid: false,
|
|
||||||
maker: false,
|
|
||||||
native_qty_paid: 200000001,
|
|
||||||
native_qty_received: 4204317,
|
|
||||||
native_fee_or_rebate: 1683,
|
|
||||||
order_id: 387898134381964481824213,
|
|
||||||
owner_slot: 0,
|
|
||||||
fee_tier: 0,
|
|
||||||
client_order_id: None,
|
|
||||||
referrer_rebate: Some(841),
|
|
||||||
block_time: 0,
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut h1 = DefaultHasher::new();
|
|
||||||
event_1.hash(&mut h1);
|
|
||||||
|
|
||||||
let mut h2 = DefaultHasher::new();
|
|
||||||
event_2.hash(&mut h2);
|
|
||||||
|
|
||||||
assert_ne!(h1.finish(), h2.finish());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let handle_conflict = "ON CONFLICT DO NOTHING";
|
||||||
|
|
||||||
|
stmt = format!("{} {}", stmt, handle_conflict);
|
||||||
|
stmt
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn build_transactions_processed_update_statement(
|
||||||
|
worker_id: i32,
|
||||||
|
processed_signatures: Vec<String>,
|
||||||
|
) -> String {
|
||||||
|
let mut stmt = String::from(
|
||||||
|
"UPDATE transactions
|
||||||
|
SET processed = true
|
||||||
|
WHERE transactions.signature IN (",
|
||||||
|
);
|
||||||
|
for (idx, sig) in processed_signatures.iter().enumerate() {
|
||||||
|
let val_str = if idx == processed_signatures.len() - 1 {
|
||||||
|
format!("\'{}\'", sig,)
|
||||||
|
} else {
|
||||||
|
format!("\'{}\',", sig,)
|
||||||
|
};
|
||||||
|
stmt = format!("{} {}", &stmt, val_str);
|
||||||
|
}
|
||||||
|
|
||||||
|
let worker_stmt = format!(") AND worker_partition = {} ", worker_id);
|
||||||
|
|
||||||
|
stmt = format!("{} {}", stmt, worker_stmt);
|
||||||
|
stmt
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,13 @@
|
||||||
use actix_web::{
|
use actix_web::{
|
||||||
|
http::StatusCode,
|
||||||
middleware::Logger,
|
middleware::Logger,
|
||||||
|
rt::System,
|
||||||
web::{self, Data},
|
web::{self, Data},
|
||||||
App, HttpServer,
|
App, HttpServer,
|
||||||
};
|
};
|
||||||
|
use actix_web_prom::PrometheusMetricsBuilder;
|
||||||
use candles::get_candles;
|
use candles::get_candles;
|
||||||
|
use prometheus::Registry;
|
||||||
|
|
||||||
use markets::get_markets;
|
use markets::get_markets;
|
||||||
use openbook_candles::{
|
use openbook_candles::{
|
||||||
|
@ -12,6 +16,7 @@ use openbook_candles::{
|
||||||
utils::{Config, WebContext},
|
utils::{Config, WebContext},
|
||||||
};
|
};
|
||||||
use std::env;
|
use std::env;
|
||||||
|
use std::thread;
|
||||||
use traders::{get_top_traders_by_base_volume, get_top_traders_by_quote_volume};
|
use traders::{get_top_traders_by_base_volume, get_top_traders_by_quote_volume};
|
||||||
|
|
||||||
mod candles;
|
mod candles;
|
||||||
|
@ -39,6 +44,22 @@ async fn main() -> std::io::Result<()> {
|
||||||
let market_infos = fetch_market_infos(&config, markets).await.unwrap();
|
let market_infos = fetch_market_infos(&config, markets).await.unwrap();
|
||||||
let pool = connect_to_database().await.unwrap();
|
let pool = connect_to_database().await.unwrap();
|
||||||
|
|
||||||
|
let registry = Registry::new();
|
||||||
|
// For serving metrics on a private port
|
||||||
|
let private_metrics = PrometheusMetricsBuilder::new("openbook_candles_server_private")
|
||||||
|
.registry(registry.clone())
|
||||||
|
.exclude("/metrics")
|
||||||
|
.exclude_status(StatusCode::NOT_FOUND)
|
||||||
|
.endpoint("/metrics")
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
// For collecting metrics on the public api, excluding 404s
|
||||||
|
let public_metrics = PrometheusMetricsBuilder::new("openbook_candles_server")
|
||||||
|
.registry(registry.clone())
|
||||||
|
.exclude_status(StatusCode::NOT_FOUND)
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
let context = Data::new(WebContext {
|
let context = Data::new(WebContext {
|
||||||
rpc_url,
|
rpc_url,
|
||||||
pool,
|
pool,
|
||||||
|
@ -46,20 +67,40 @@ async fn main() -> std::io::Result<()> {
|
||||||
});
|
});
|
||||||
|
|
||||||
println!("Starting server");
|
println!("Starting server");
|
||||||
HttpServer::new(move || {
|
// Thread to serve public API
|
||||||
App::new()
|
let public_server = thread::spawn(move || {
|
||||||
.wrap(Logger::default())
|
let sys = System::new();
|
||||||
.app_data(context.clone())
|
let srv = HttpServer::new(move || {
|
||||||
.service(
|
App::new()
|
||||||
web::scope("/api")
|
.wrap(Logger::default())
|
||||||
.service(get_candles)
|
.wrap(public_metrics.clone())
|
||||||
.service(get_top_traders_by_base_volume)
|
.app_data(context.clone())
|
||||||
.service(get_top_traders_by_quote_volume)
|
.service(
|
||||||
.service(get_markets)
|
web::scope("/api")
|
||||||
.service(coingecko::service()),
|
.service(get_candles)
|
||||||
)
|
.service(get_top_traders_by_base_volume)
|
||||||
})
|
.service(get_top_traders_by_quote_volume)
|
||||||
.bind(&bind_addr)?
|
.service(get_markets)
|
||||||
.run()
|
.service(coingecko::service()),
|
||||||
.await
|
)
|
||||||
|
})
|
||||||
|
.bind(&bind_addr)
|
||||||
|
.unwrap()
|
||||||
|
.run();
|
||||||
|
sys.block_on(srv).unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Thread to serve metrics endpoint privately
|
||||||
|
let private_server = thread::spawn(move || {
|
||||||
|
let sys = System::new();
|
||||||
|
let srv = HttpServer::new(move || App::new().wrap(private_metrics.clone()))
|
||||||
|
.bind("0.0.0.0:9091")
|
||||||
|
.unwrap()
|
||||||
|
.run();
|
||||||
|
sys.block_on(srv).unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
private_server.join().unwrap();
|
||||||
|
public_server.join().unwrap();
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,7 @@ use tokio_postgres::Row;
|
||||||
|
|
||||||
use super::resolution::Resolution;
|
use super::resolution::Resolution;
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
pub struct Candle {
|
pub struct Candle {
|
||||||
pub market_name: String,
|
pub market_name: String,
|
||||||
pub start_time: DateTime<Utc>,
|
pub start_time: DateTime<Utc>,
|
||||||
|
|
|
@ -6,3 +6,4 @@ pub mod resolution;
|
||||||
pub mod slab;
|
pub mod slab;
|
||||||
pub mod trader;
|
pub mod trader;
|
||||||
pub mod tradingview;
|
pub mod tradingview;
|
||||||
|
pub mod transaction;
|
||||||
|
|
|
@ -22,8 +22,14 @@ pub struct OpenBookFillEventRaw {
|
||||||
pub referrer_rebate: Option<u64>,
|
pub referrer_rebate: Option<u64>,
|
||||||
}
|
}
|
||||||
impl OpenBookFillEventRaw {
|
impl OpenBookFillEventRaw {
|
||||||
pub fn with_time(self, block_time: i64) -> OpenBookFillEvent {
|
pub fn into_event(
|
||||||
|
self,
|
||||||
|
signature: String,
|
||||||
|
block_time: i64,
|
||||||
|
log_index: usize,
|
||||||
|
) -> OpenBookFillEvent {
|
||||||
OpenBookFillEvent {
|
OpenBookFillEvent {
|
||||||
|
signature,
|
||||||
market: self.market,
|
market: self.market,
|
||||||
open_orders: self.open_orders,
|
open_orders: self.open_orders,
|
||||||
open_orders_owner: self.open_orders_owner,
|
open_orders_owner: self.open_orders_owner,
|
||||||
|
@ -38,6 +44,7 @@ impl OpenBookFillEventRaw {
|
||||||
client_order_id: self.client_order_id,
|
client_order_id: self.client_order_id,
|
||||||
referrer_rebate: self.referrer_rebate,
|
referrer_rebate: self.referrer_rebate,
|
||||||
block_time,
|
block_time,
|
||||||
|
log_index,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -45,6 +52,7 @@ impl OpenBookFillEventRaw {
|
||||||
#[event]
|
#[event]
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||||
pub struct OpenBookFillEvent {
|
pub struct OpenBookFillEvent {
|
||||||
|
pub signature: String,
|
||||||
pub market: Pubkey,
|
pub market: Pubkey,
|
||||||
pub open_orders: Pubkey,
|
pub open_orders: Pubkey,
|
||||||
pub open_orders_owner: Pubkey,
|
pub open_orders_owner: Pubkey,
|
||||||
|
@ -59,6 +67,7 @@ pub struct OpenBookFillEvent {
|
||||||
pub client_order_id: Option<u64>,
|
pub client_order_id: Option<u64>,
|
||||||
pub referrer_rebate: Option<u64>,
|
pub referrer_rebate: Option<u64>,
|
||||||
pub block_time: i64,
|
pub block_time: i64,
|
||||||
|
pub log_index: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, PartialEq)]
|
#[derive(Copy, Clone, Debug, PartialEq)]
|
||||||
|
|
|
@ -0,0 +1,52 @@
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature;
|
||||||
|
use tokio_postgres::Row;
|
||||||
|
|
||||||
|
use crate::utils::{to_timestampz, OPENBOOK_KEY};
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
|
pub struct PgTransaction {
|
||||||
|
pub signature: String,
|
||||||
|
pub program_pk: String,
|
||||||
|
pub block_datetime: DateTime<Utc>,
|
||||||
|
pub slot: u64,
|
||||||
|
pub err: bool,
|
||||||
|
pub processed: bool,
|
||||||
|
pub worker_partition: i32,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub const NUM_TRANSACTION_PARTITIONS: u64 = 10;
|
||||||
|
|
||||||
|
impl PgTransaction {
|
||||||
|
pub fn from_rpc_confirmed_transaction(
|
||||||
|
rpc_confirmed_transaction: RpcConfirmedTransactionStatusWithSignature,
|
||||||
|
) -> Self {
|
||||||
|
PgTransaction {
|
||||||
|
signature: rpc_confirmed_transaction.signature,
|
||||||
|
program_pk: OPENBOOK_KEY.to_string(),
|
||||||
|
block_datetime: to_timestampz(rpc_confirmed_transaction.block_time.unwrap() as u64),
|
||||||
|
slot: rpc_confirmed_transaction.slot,
|
||||||
|
err: rpc_confirmed_transaction.err.is_some(),
|
||||||
|
processed: false,
|
||||||
|
worker_partition: (rpc_confirmed_transaction.slot % NUM_TRANSACTION_PARTITIONS) as i32,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn from_row(row: Row) -> Self {
|
||||||
|
let slot_raw = row.get::<usize, i64>(3);
|
||||||
|
PgTransaction {
|
||||||
|
signature: row.get(0),
|
||||||
|
program_pk: row.get(1),
|
||||||
|
block_datetime: row.get(2),
|
||||||
|
slot: slot_raw as u64,
|
||||||
|
err: row.get(4),
|
||||||
|
processed: row.get(5),
|
||||||
|
worker_partition: row.get(6),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum ProcessState {
|
||||||
|
Processed,
|
||||||
|
Unprocessed,
|
||||||
|
}
|
|
@ -1,9 +1,13 @@
|
||||||
|
use anchor_lang::prelude::Pubkey;
|
||||||
use chrono::{NaiveDateTime, Utc};
|
use chrono::{NaiveDateTime, Utc};
|
||||||
use deadpool_postgres::Pool;
|
use deadpool_postgres::Pool;
|
||||||
use serde_derive::Deserialize;
|
use serde_derive::Deserialize;
|
||||||
|
use solana_sdk::pubkey;
|
||||||
|
|
||||||
use crate::structs::markets::MarketInfo;
|
use crate::structs::markets::MarketInfo;
|
||||||
|
|
||||||
|
pub const OPENBOOK_KEY: Pubkey = pubkey!("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX");
|
||||||
|
|
||||||
pub trait AnyhowWrap {
|
pub trait AnyhowWrap {
|
||||||
type Value;
|
type Value;
|
||||||
fn map_err_anyhow(self) -> anyhow::Result<Self::Value>;
|
fn map_err_anyhow(self) -> anyhow::Result<Self::Value>;
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
use chrono::{DateTime, Duration, DurationRound, Utc};
|
use chrono::{DateTime, Duration, DurationRound, Utc};
|
||||||
use deadpool_postgres::Pool;
|
use deadpool_postgres::Pool;
|
||||||
|
use log::debug;
|
||||||
use std::cmp::max;
|
use std::cmp::max;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
@ -46,12 +47,12 @@ pub async fn batch_higher_order_candles(
|
||||||
fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution())
|
fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution())
|
||||||
.await?;
|
.await?;
|
||||||
if constituent_candles.is_empty() {
|
if constituent_candles.is_empty() {
|
||||||
// println!(
|
debug!(
|
||||||
// "Batching {}, but no candles found for: {:?}, {}",
|
"Batching {}, but no candles found for: {:?}, {}",
|
||||||
// resolution,
|
resolution,
|
||||||
// market_name,
|
market_name,
|
||||||
// resolution.get_constituent_resolution()
|
resolution.get_constituent_resolution()
|
||||||
// );
|
);
|
||||||
return Ok(Vec::new());
|
return Ok(Vec::new());
|
||||||
}
|
}
|
||||||
let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
|
let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
|
||||||
|
@ -82,7 +83,7 @@ fn combine_into_higher_order_candles(
|
||||||
st: DateTime<Utc>,
|
st: DateTime<Utc>,
|
||||||
seed_candle: Candle,
|
seed_candle: Candle,
|
||||||
) -> Vec<Candle> {
|
) -> Vec<Candle> {
|
||||||
// println!("target_resolution: {}", target_resolution);
|
debug!("combining for target_resolution: {}", target_resolution);
|
||||||
|
|
||||||
let duration = target_resolution.get_duration();
|
let duration = target_resolution.get_duration();
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@ use std::cmp::min;
|
||||||
|
|
||||||
use chrono::{DateTime, Duration, DurationRound, Utc};
|
use chrono::{DateTime, Duration, DurationRound, Utc};
|
||||||
use deadpool_postgres::Pool;
|
use deadpool_postgres::Pool;
|
||||||
|
use log::debug;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
|
database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
|
||||||
|
@ -24,9 +25,10 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
|
||||||
let start_time = candle.end_time;
|
let start_time = candle.end_time;
|
||||||
let end_time = min(
|
let end_time = min(
|
||||||
start_time + day(),
|
start_time + day(),
|
||||||
Utc::now().duration_trunc(Duration::minutes(1))?,
|
(Utc::now() + Duration::minutes(1)).duration_trunc(Duration::minutes(1))?,
|
||||||
);
|
);
|
||||||
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
|
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
|
||||||
|
|
||||||
let candles = combine_fills_into_1m_candles(
|
let candles = combine_fills_into_1m_candles(
|
||||||
&mut fills,
|
&mut fills,
|
||||||
market,
|
market,
|
||||||
|
@ -40,7 +42,7 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
|
||||||
let earliest_fill = fetch_earliest_fill(pool, market_address).await?;
|
let earliest_fill = fetch_earliest_fill(pool, market_address).await?;
|
||||||
|
|
||||||
if earliest_fill.is_none() {
|
if earliest_fill.is_none() {
|
||||||
println!("No fills found for: {:?}", market_name);
|
debug!("No fills found for: {:?}", market_name);
|
||||||
return Ok(Vec::new());
|
return Ok(Vec::new());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,7 +100,6 @@ fn combine_fills_into_1m_candles(
|
||||||
|
|
||||||
while matches!(fills_iter.peek(), Some(f) if f.time < end_time) {
|
while matches!(fills_iter.peek(), Some(f) if f.time < end_time) {
|
||||||
let fill = fills_iter.next().unwrap();
|
let fill = fills_iter.next().unwrap();
|
||||||
|
|
||||||
let (price, volume) =
|
let (price, volume) =
|
||||||
calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals);
|
calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals);
|
||||||
|
|
||||||
|
@ -112,8 +113,8 @@ fn combine_fills_into_1m_candles(
|
||||||
|
|
||||||
candles[i].start_time = start_time;
|
candles[i].start_time = start_time;
|
||||||
candles[i].end_time = end_time;
|
candles[i].end_time = end_time;
|
||||||
candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time);
|
candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time)
|
||||||
|
|| end_time < Utc::now() - Duration::minutes(10);
|
||||||
start_time = end_time;
|
start_time = end_time;
|
||||||
end_time += Duration::minutes(1);
|
end_time += Duration::minutes(1);
|
||||||
}
|
}
|
||||||
|
@ -132,7 +133,7 @@ pub async fn backfill_batch_1m_candles(
|
||||||
|
|
||||||
let earliest_fill = fetch_earliest_fill(pool, &market.address).await?;
|
let earliest_fill = fetch_earliest_fill(pool, &market.address).await?;
|
||||||
if earliest_fill.is_none() {
|
if earliest_fill.is_none() {
|
||||||
println!("No fills found for: {:?}", &market_name);
|
debug!("No fills found for: {:?}", &market_name);
|
||||||
return Ok(candles);
|
return Ok(candles);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,31 +3,31 @@ pub mod minute_candles;
|
||||||
|
|
||||||
use chrono::Duration;
|
use chrono::Duration;
|
||||||
use deadpool_postgres::Pool;
|
use deadpool_postgres::Pool;
|
||||||
|
use log::{error, warn};
|
||||||
use strum::IntoEnumIterator;
|
use strum::IntoEnumIterator;
|
||||||
use tokio::{sync::mpsc::Sender, time::sleep};
|
use tokio::time::sleep;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
database::insert::build_candles_upsert_statement,
|
||||||
structs::{candle::Candle, markets::MarketInfo, resolution::Resolution},
|
structs::{candle::Candle, markets::MarketInfo, resolution::Resolution},
|
||||||
|
utils::AnyhowWrap,
|
||||||
worker::candle_batching::minute_candles::batch_1m_candles,
|
worker::candle_batching::minute_candles::batch_1m_candles,
|
||||||
};
|
};
|
||||||
|
|
||||||
use self::higher_order_candles::batch_higher_order_candles;
|
use self::higher_order_candles::batch_higher_order_candles;
|
||||||
|
|
||||||
pub async fn batch_for_market(
|
use super::metrics::METRIC_CANDLES_TOTAL;
|
||||||
pool: &Pool,
|
|
||||||
candles_sender: &Sender<Vec<Candle>>,
|
pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
|
||||||
market: &MarketInfo,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
loop {
|
loop {
|
||||||
let sender = candles_sender.clone();
|
|
||||||
let market_clone = market.clone();
|
let market_clone = market.clone();
|
||||||
// let client = pool.get().await?;
|
|
||||||
loop {
|
loop {
|
||||||
sleep(Duration::milliseconds(2000).to_std()?).await;
|
sleep(Duration::milliseconds(2000).to_std()?).await;
|
||||||
match batch_inner(pool, &sender, &market_clone).await {
|
match batch_inner(pool, &market_clone).await {
|
||||||
Ok(_) => {}
|
Ok(_) => {}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
println!(
|
error!(
|
||||||
"Batching thread failed for {:?} with error: {:?}",
|
"Batching thread failed for {:?} with error: {:?}",
|
||||||
market_clone.name.clone(),
|
market_clone.name.clone(),
|
||||||
e
|
e
|
||||||
|
@ -36,33 +36,39 @@ pub async fn batch_for_market(
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
println!("Restarting {:?} batching thread", market.name);
|
warn!("Restarting {:?} batching thread", market.name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn batch_inner(
|
async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
|
||||||
pool: &Pool,
|
|
||||||
candles_sender: &Sender<Vec<Candle>>,
|
|
||||||
market: &MarketInfo,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let market_name = &market.name.clone();
|
let market_name = &market.name.clone();
|
||||||
let candles = batch_1m_candles(pool, market).await?;
|
let candles = batch_1m_candles(pool, market).await?;
|
||||||
send_candles(candles, candles_sender).await;
|
METRIC_CANDLES_TOTAL
|
||||||
|
.with_label_values(&[market.name.as_str()])
|
||||||
|
.inc_by(candles.clone().len() as u64);
|
||||||
|
save_candles(pool, candles).await?;
|
||||||
for resolution in Resolution::iter() {
|
for resolution in Resolution::iter() {
|
||||||
if resolution == Resolution::R1m {
|
if resolution == Resolution::R1m {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let candles = batch_higher_order_candles(pool, market_name, resolution).await?;
|
let candles = batch_higher_order_candles(pool, market_name, resolution).await?;
|
||||||
send_candles(candles, candles_sender).await;
|
METRIC_CANDLES_TOTAL
|
||||||
}
|
.with_label_values(&[market.name.as_str()])
|
||||||
|
.inc_by(candles.clone().len() as u64);
|
||||||
|
save_candles(pool, candles).await?;
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_candles(candles: Vec<Candle>, candles_sender: &Sender<Vec<Candle>>) {
|
async fn save_candles(pool: &Pool, candles: Vec<Candle>) -> anyhow::Result<()> {
|
||||||
if !candles.is_empty() {
|
if candles.len() == 0 {
|
||||||
if let Err(_) = candles_sender.send(candles).await {
|
return Ok(());
|
||||||
panic!("candles receiver dropped");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
let upsert_statement = build_candles_upsert_statement(&candles);
|
||||||
|
let client = pool.get().await.unwrap();
|
||||||
|
client
|
||||||
|
.execute(&upsert_statement, &[])
|
||||||
|
.await
|
||||||
|
.map_err_anyhow()?;
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,23 +1,22 @@
|
||||||
|
use log::{error, info};
|
||||||
use openbook_candles::structs::candle::Candle;
|
|
||||||
use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
|
use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
|
||||||
use openbook_candles::structs::openbook::OpenBookFillEvent;
|
use openbook_candles::structs::transaction::NUM_TRANSACTION_PARTITIONS;
|
||||||
use openbook_candles::utils::Config;
|
use openbook_candles::utils::Config;
|
||||||
use openbook_candles::worker::trade_fetching::scrape::scrape;
|
use openbook_candles::worker::metrics::{
|
||||||
|
serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE,
|
||||||
|
};
|
||||||
|
use openbook_candles::worker::trade_fetching::scrape::{scrape_fills, scrape_signatures};
|
||||||
use openbook_candles::{
|
use openbook_candles::{
|
||||||
database::{
|
database::initialize::{connect_to_database, setup_database},
|
||||||
initialize::{connect_to_database, setup_database},
|
|
||||||
insert::{persist_candles, persist_fill_events},
|
|
||||||
},
|
|
||||||
worker::candle_batching::batch_for_market,
|
worker::candle_batching::batch_for_market,
|
||||||
};
|
};
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::{collections::HashMap, str::FromStr};
|
use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
|
||||||
use tokio::sync::mpsc;
|
|
||||||
|
|
||||||
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
env_logger::init();
|
||||||
dotenv::dotenv().ok();
|
dotenv::dotenv().ok();
|
||||||
|
|
||||||
let args: Vec<String> = env::args().collect();
|
let args: Vec<String> = env::args().collect();
|
||||||
|
@ -33,51 +32,59 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
|
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
|
||||||
let mut target_markets = HashMap::new();
|
let mut target_markets = HashMap::new();
|
||||||
for m in market_infos.clone() {
|
for m in market_infos.clone() {
|
||||||
target_markets.insert(Pubkey::from_str(&m.address)?, 0);
|
target_markets.insert(Pubkey::from_str(&m.address)?, m.name);
|
||||||
}
|
}
|
||||||
println!("{:?}", target_markets);
|
info!("{:?}", target_markets);
|
||||||
|
|
||||||
let pool = connect_to_database().await?;
|
let pool = connect_to_database().await?;
|
||||||
setup_database(&pool).await?;
|
setup_database(&pool).await?;
|
||||||
let mut handles = vec![];
|
let mut handles = vec![];
|
||||||
|
|
||||||
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(1000);
|
// signature scraping
|
||||||
|
let rpc_clone = rpc_url.clone();
|
||||||
|
let pool_clone = pool.clone();
|
||||||
handles.push(tokio::spawn(async move {
|
handles.push(tokio::spawn(async move {
|
||||||
scrape(&config, &fill_sender, &target_markets).await;
|
scrape_signatures(rpc_clone, &pool_clone).await.unwrap();
|
||||||
}));
|
}));
|
||||||
|
|
||||||
let fills_pool = pool.clone();
|
// transaction/fill scraping
|
||||||
handles.push(tokio::spawn(async move {
|
for id in 0..NUM_TRANSACTION_PARTITIONS {
|
||||||
loop {
|
let rpc_clone = rpc_url.clone();
|
||||||
persist_fill_events(&fills_pool, &mut fill_receiver)
|
let pool_clone = pool.clone();
|
||||||
.await
|
let markets_clone = target_markets.clone();
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
|
|
||||||
let (candle_sender, mut candle_receiver) = mpsc::channel::<Vec<Candle>>(1000);
|
|
||||||
|
|
||||||
for market in market_infos.into_iter() {
|
|
||||||
let sender = candle_sender.clone();
|
|
||||||
let batch_pool = pool.clone();
|
|
||||||
handles.push(tokio::spawn(async move {
|
handles.push(tokio::spawn(async move {
|
||||||
batch_for_market(&batch_pool, &sender, &market)
|
scrape_fills(id as i32, rpc_clone, &pool_clone, &markets_clone)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
println!("SOMETHING WENT WRONG");
|
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
let persist_pool = pool.clone();
|
// candle batching
|
||||||
|
for market in market_infos.into_iter() {
|
||||||
|
let batch_pool = pool.clone();
|
||||||
|
handles.push(tokio::spawn(async move {
|
||||||
|
batch_for_market(&batch_pool, &market).await.unwrap();
|
||||||
|
error!("batching halted for market {}", &market.name);
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
let monitor_pool = pool.clone();
|
||||||
handles.push(tokio::spawn(async move {
|
handles.push(tokio::spawn(async move {
|
||||||
|
// TODO: maybe break this out into a new function
|
||||||
loop {
|
loop {
|
||||||
persist_candles(persist_pool.clone(), &mut candle_receiver)
|
let pool_status = monitor_pool.status();
|
||||||
.await
|
METRIC_DB_POOL_AVAILABLE.set(pool_status.available as i64);
|
||||||
.unwrap();
|
METRIC_DB_POOL_SIZE.set(pool_status.size as i64);
|
||||||
|
|
||||||
|
tokio::time::sleep(WaitDuration::from_secs(10)).await;
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
handles.push(tokio::spawn(async move {
|
||||||
|
// TODO: this is ugly af
|
||||||
|
serve_metrics().await.unwrap().await.unwrap();
|
||||||
|
}));
|
||||||
|
|
||||||
futures::future::join_all(handles).await;
|
futures::future::join_all(handles).await;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
@ -0,0 +1,76 @@
|
||||||
|
use actix_web::{dev::Server, http::StatusCode, App, HttpServer};
|
||||||
|
use actix_web_prom::PrometheusMetricsBuilder;
|
||||||
|
use lazy_static::lazy_static;
|
||||||
|
use prometheus::{
|
||||||
|
register_int_counter_vec_with_registry, register_int_counter_with_registry,
|
||||||
|
register_int_gauge_with_registry, IntCounter, IntCounterVec, IntGauge, Registry,
|
||||||
|
};
|
||||||
|
|
||||||
|
lazy_static! {
|
||||||
|
static ref METRIC_REGISTRY: Registry =
|
||||||
|
Registry::new_custom(Some("openbook_candles_worker".to_string()), None).unwrap();
|
||||||
|
pub static ref METRIC_TXS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
|
||||||
|
"txs_total",
|
||||||
|
"Total number of transactions scraped",
|
||||||
|
&["market", "status"],
|
||||||
|
METRIC_REGISTRY
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
pub static ref METRIC_FILLS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
|
||||||
|
"fills_total",
|
||||||
|
"Total number of fills parsed",
|
||||||
|
&["market"],
|
||||||
|
METRIC_REGISTRY
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
pub static ref METRIC_CANDLES_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
|
||||||
|
"candles_total",
|
||||||
|
"Total number of candles generated",
|
||||||
|
&["market"],
|
||||||
|
METRIC_REGISTRY
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
pub static ref METRIC_TRANSACTIONS_TOTAL: IntCounter = register_int_counter_with_registry!(
|
||||||
|
"transactions_total",
|
||||||
|
"Total number of transaction signatures scraped",
|
||||||
|
METRIC_REGISTRY
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
pub static ref METRIC_RPC_ERRORS_TOTAL: IntCounterVec =
|
||||||
|
register_int_counter_vec_with_registry!(
|
||||||
|
"rpc_errors_total",
|
||||||
|
"RPC errors while scraping",
|
||||||
|
&["method"],
|
||||||
|
METRIC_REGISTRY
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
pub static ref METRIC_DB_POOL_SIZE: IntGauge = register_int_gauge_with_registry!(
|
||||||
|
"db_pool_size",
|
||||||
|
"Current size of the DB connection pool",
|
||||||
|
METRIC_REGISTRY
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
pub static ref METRIC_DB_POOL_AVAILABLE: IntGauge = register_int_gauge_with_registry!(
|
||||||
|
"db_pool_available",
|
||||||
|
"Available DB connections in the pool",
|
||||||
|
METRIC_REGISTRY
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn serve_metrics() -> anyhow::Result<Server> {
|
||||||
|
let metrics = PrometheusMetricsBuilder::new("openbook_candles_worker")
|
||||||
|
.registry(METRIC_REGISTRY.clone())
|
||||||
|
.exclude("/metrics")
|
||||||
|
.exclude_status(StatusCode::NOT_FOUND)
|
||||||
|
.endpoint("/metrics")
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
let server = HttpServer::new(move || App::new().wrap(metrics.clone()))
|
||||||
|
.bind("0.0.0.0:9091")
|
||||||
|
.unwrap()
|
||||||
|
.disable_signals()
|
||||||
|
.run();
|
||||||
|
|
||||||
|
Ok(server)
|
||||||
|
}
|
|
@ -1,2 +1,3 @@
|
||||||
pub mod candle_batching;
|
pub mod candle_batching;
|
||||||
|
pub mod metrics;
|
||||||
pub mod trade_fetching;
|
pub mod trade_fetching;
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use log::warn;
|
||||||
use solana_client::client_error::Result as ClientResult;
|
use solana_client::client_error::Result as ClientResult;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
use solana_transaction_status::{
|
use solana_transaction_status::{
|
||||||
|
@ -5,16 +6,20 @@ use solana_transaction_status::{
|
||||||
};
|
};
|
||||||
use std::{collections::HashMap, io::Error};
|
use std::{collections::HashMap, io::Error};
|
||||||
|
|
||||||
use crate::structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw};
|
use crate::{
|
||||||
|
structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw},
|
||||||
|
worker::metrics::METRIC_RPC_ERRORS_TOTAL,
|
||||||
|
};
|
||||||
|
|
||||||
const PROGRAM_DATA: &str = "Program data: ";
|
const PROGRAM_DATA: &str = "Program data: ";
|
||||||
|
|
||||||
pub fn parse_trades_from_openbook_txns(
|
pub fn parse_trades_from_openbook_txns(
|
||||||
txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
|
txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
|
||||||
target_markets: &HashMap<Pubkey, u8>,
|
sig_strings: &Vec<String>,
|
||||||
|
target_markets: &HashMap<Pubkey, String>,
|
||||||
) -> Vec<OpenBookFillEvent> {
|
) -> Vec<OpenBookFillEvent> {
|
||||||
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
|
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
|
||||||
for txn in txns.iter_mut() {
|
for (idx, txn) in txns.iter_mut().enumerate() {
|
||||||
match txn {
|
match txn {
|
||||||
Ok(t) => {
|
Ok(t) => {
|
||||||
if let Some(m) = &t.transaction.meta {
|
if let Some(m) = &t.transaction.meta {
|
||||||
|
@ -23,6 +28,7 @@ pub fn parse_trades_from_openbook_txns(
|
||||||
match parse_openbook_fills_from_logs(
|
match parse_openbook_fills_from_logs(
|
||||||
logs,
|
logs,
|
||||||
target_markets,
|
target_markets,
|
||||||
|
sig_strings[idx].clone(),
|
||||||
t.block_time.unwrap(),
|
t.block_time.unwrap(),
|
||||||
) {
|
) {
|
||||||
Some(mut events) => fills_vector.append(&mut events),
|
Some(mut events) => fills_vector.append(&mut events),
|
||||||
|
@ -34,7 +40,12 @@ pub fn parse_trades_from_openbook_txns(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(_) => {}
|
Err(e) => {
|
||||||
|
warn!("rpc error in get_transaction {}", e);
|
||||||
|
METRIC_RPC_ERRORS_TOTAL
|
||||||
|
.with_label_values(&["getTransaction"])
|
||||||
|
.inc();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fills_vector
|
fills_vector
|
||||||
|
@ -42,11 +53,12 @@ pub fn parse_trades_from_openbook_txns(
|
||||||
|
|
||||||
fn parse_openbook_fills_from_logs(
|
fn parse_openbook_fills_from_logs(
|
||||||
logs: &Vec<String>,
|
logs: &Vec<String>,
|
||||||
target_markets: &HashMap<Pubkey, u8>,
|
target_markets: &HashMap<Pubkey, String>,
|
||||||
|
signature: String,
|
||||||
block_time: i64,
|
block_time: i64,
|
||||||
) -> Option<Vec<OpenBookFillEvent>> {
|
) -> Option<Vec<OpenBookFillEvent>> {
|
||||||
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
|
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
|
||||||
for l in logs {
|
for (idx, l) in logs.iter().enumerate() {
|
||||||
match l.strip_prefix(PROGRAM_DATA) {
|
match l.strip_prefix(PROGRAM_DATA) {
|
||||||
Some(log) => {
|
Some(log) => {
|
||||||
let borsh_bytes = match anchor_lang::__private::base64::decode(log) {
|
let borsh_bytes = match anchor_lang::__private::base64::decode(log) {
|
||||||
|
@ -59,7 +71,7 @@ fn parse_openbook_fills_from_logs(
|
||||||
|
|
||||||
match event {
|
match event {
|
||||||
Ok(e) => {
|
Ok(e) => {
|
||||||
let fill_event = e.with_time(block_time);
|
let fill_event = e.into_event(signature.clone(), block_time, idx);
|
||||||
if target_markets.contains_key(&fill_event.market) {
|
if target_markets.contains_key(&fill_event.market) {
|
||||||
fills_vector.push(fill_event);
|
fills_vector.push(fill_event);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,106 +1,121 @@
|
||||||
|
use deadpool_postgres::Pool;
|
||||||
use futures::future::join_all;
|
use futures::future::join_all;
|
||||||
|
use log::{debug, info, warn};
|
||||||
use solana_client::{
|
use solana_client::{
|
||||||
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
|
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
|
||||||
rpc_config::RpcTransactionConfig,
|
rpc_config::RpcTransactionConfig,
|
||||||
};
|
};
|
||||||
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature};
|
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature};
|
||||||
use solana_transaction_status::UiTransactionEncoding;
|
use solana_transaction_status::UiTransactionEncoding;
|
||||||
use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
|
use std::{collections::HashMap, str::FromStr};
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
|
|
||||||
use crate::{structs::openbook::OpenBookFillEvent, utils::Config};
|
use crate::{
|
||||||
|
database::{
|
||||||
|
fetch::fetch_worker_transactions,
|
||||||
|
insert::{build_transactions_insert_statement, insert_fills_atomically},
|
||||||
|
},
|
||||||
|
structs::{openbook::OpenBookFillEvent, transaction::PgTransaction},
|
||||||
|
utils::{AnyhowWrap, Config, OPENBOOK_KEY},
|
||||||
|
worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL, METRIC_TRANSACTIONS_TOTAL},
|
||||||
|
};
|
||||||
|
|
||||||
use super::parsing::parse_trades_from_openbook_txns;
|
use super::parsing::parse_trades_from_openbook_txns;
|
||||||
|
|
||||||
pub async fn scrape(
|
|
||||||
config: &Config,
|
|
||||||
fill_sender: &Sender<OpenBookFillEvent>,
|
|
||||||
target_markets: &HashMap<Pubkey, u8>,
|
|
||||||
) {
|
|
||||||
let rpc_client =
|
|
||||||
RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::processed());
|
|
||||||
|
|
||||||
let before_slot = None;
|
pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<()> {
|
||||||
|
let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
scrape_transactions(
|
let rpc_config = GetConfirmedSignaturesForAddress2Config {
|
||||||
&rpc_client,
|
before: None,
|
||||||
before_slot,
|
until: None,
|
||||||
Some(150),
|
limit: None,
|
||||||
fill_sender,
|
commitment: Some(CommitmentConfig::confirmed()),
|
||||||
target_markets,
|
};
|
||||||
)
|
|
||||||
.await;
|
|
||||||
tokio::time::sleep(WaitDuration::from_millis(250)).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn scrape_transactions(
|
let sigs = match rpc_client
|
||||||
rpc_client: &RpcClient,
|
.get_signatures_for_address_with_config(&OPENBOOK_KEY, rpc_config)
|
||||||
before_sig: Option<Signature>,
|
.await
|
||||||
limit: Option<usize>,
|
{
|
||||||
fill_sender: &Sender<OpenBookFillEvent>,
|
Ok(sigs) => sigs,
|
||||||
target_markets: &HashMap<Pubkey, u8>,
|
Err(e) => {
|
||||||
) -> Option<Signature> {
|
warn!("rpc error in get_signatures_for_address_with_config: {}", e);
|
||||||
let rpc_config = GetConfirmedSignaturesForAddress2Config {
|
METRIC_RPC_ERRORS_TOTAL
|
||||||
before: before_sig,
|
.with_label_values(&["getSignaturesForAddress"])
|
||||||
until: None,
|
.inc();
|
||||||
limit,
|
continue;
|
||||||
commitment: Some(CommitmentConfig::confirmed()),
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut sigs = match rpc_client
|
|
||||||
.get_signatures_for_address_with_config(
|
|
||||||
&Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(),
|
|
||||||
rpc_config,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(s) => s,
|
|
||||||
Err(e) => {
|
|
||||||
println!("Error in get_signatures_for_address_with_config: {}", e);
|
|
||||||
return before_sig;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if sigs.is_empty() {
|
|
||||||
println!("No signatures found");
|
|
||||||
return before_sig;
|
|
||||||
}
|
|
||||||
|
|
||||||
let last = sigs.last().unwrap();
|
|
||||||
let request_last_sig = Signature::from_str(&last.signature).unwrap();
|
|
||||||
|
|
||||||
sigs.retain(|sig| sig.err.is_none());
|
|
||||||
if sigs.last().is_none() {
|
|
||||||
return Some(request_last_sig);
|
|
||||||
}
|
|
||||||
|
|
||||||
let txn_config = RpcTransactionConfig {
|
|
||||||
encoding: Some(UiTransactionEncoding::Json),
|
|
||||||
commitment: Some(CommitmentConfig::confirmed()),
|
|
||||||
max_supported_transaction_version: Some(0),
|
|
||||||
};
|
|
||||||
|
|
||||||
let signatures: Vec<_> = sigs
|
|
||||||
.into_iter()
|
|
||||||
.map(|sig| sig.signature.parse::<Signature>().unwrap())
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let txn_futs: Vec<_> = signatures
|
|
||||||
.iter()
|
|
||||||
.map(|s| rpc_client.get_transaction_with_config(s, txn_config))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let mut txns = join_all(txn_futs).await;
|
|
||||||
|
|
||||||
let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
|
|
||||||
if !fills.is_empty() {
|
|
||||||
for fill in fills.into_iter() {
|
|
||||||
if let Err(_) = fill_sender.send(fill).await {
|
|
||||||
panic!("receiver dropped");
|
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
if sigs.is_empty() {
|
||||||
|
debug!("No signatures found, trying again");
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
let transactions: Vec<PgTransaction> = sigs
|
||||||
|
.into_iter()
|
||||||
|
.map(|s| PgTransaction::from_rpc_confirmed_transaction(s))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
debug!("Scraper writing: {:?} txns to DB\n", transactions.len());
|
||||||
|
let upsert_statement = build_transactions_insert_statement(transactions);
|
||||||
|
let client = pool.get().await?;
|
||||||
|
let num_txns = client
|
||||||
|
.execute(&upsert_statement, &[])
|
||||||
|
.await
|
||||||
|
.map_err_anyhow()?;
|
||||||
|
METRIC_TRANSACTIONS_TOTAL.inc_by(num_txns);
|
||||||
|
}
|
||||||
|
// TODO: graceful shutdown
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn scrape_fills(
|
||||||
|
worker_id: i32,
|
||||||
|
rpc_url: String,
|
||||||
|
pool: &Pool,
|
||||||
|
target_markets: &HashMap<Pubkey, String>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
debug!("Worker {} started \n", worker_id);
|
||||||
|
let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed());
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let transactions = fetch_worker_transactions(worker_id, pool).await?;
|
||||||
|
if transactions.len() == 0 {
|
||||||
|
debug!("No signatures found by worker {}", worker_id);
|
||||||
|
tokio::time::sleep(WaitDuration::from_secs(1)).await;
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
// for each signature, fetch the transaction
|
||||||
|
let txn_config = RpcTransactionConfig {
|
||||||
|
encoding: Some(UiTransactionEncoding::Json),
|
||||||
|
commitment: Some(CommitmentConfig::confirmed()),
|
||||||
|
max_supported_transaction_version: Some(0),
|
||||||
|
};
|
||||||
|
|
||||||
|
let sig_strings = transactions
|
||||||
|
.iter()
|
||||||
|
.map(|t| t.signature.clone())
|
||||||
|
.collect::<Vec<String>>();
|
||||||
|
|
||||||
|
let signatures: Vec<_> = transactions
|
||||||
|
.into_iter()
|
||||||
|
.map(|t| t.signature.parse::<Signature>().unwrap())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let txn_futs: Vec<_> = signatures
|
||||||
|
.iter()
|
||||||
|
.map(|s| rpc_client.get_transaction_with_config(s, txn_config))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let mut txns = join_all(txn_futs).await;
|
||||||
|
|
||||||
|
// TODO: reenable total fills metric
|
||||||
|
let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets);
|
||||||
|
|
||||||
|
// Write any fills to the database, and update the transactions as processed
|
||||||
|
insert_fills_atomically(pool, worker_id, fills, sig_strings).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Some(request_last_sig)
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue