feat: can backfill fills (slowly) and compile the project offline

This commit is contained in:
dboures 2023-03-12 18:32:12 -05:00
parent 53f55aa669
commit fb05280689
No known key found for this signature in database
GPG Key ID: AB3790129D478852
16 changed files with 614 additions and 158 deletions

View File

@ -1570,6 +1570,9 @@ name = "either"
version = "1.8.0" version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797" checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "encode_unicode" name = "encode_unicode"
@ -5982,9 +5985,12 @@ dependencies = [
"dotenvy", "dotenvy",
"either", "either",
"heck 0.4.0", "heck 0.4.0",
"hex",
"once_cell", "once_cell",
"proc-macro2 1.0.50", "proc-macro2 1.0.50",
"quote 1.0.23", "quote 1.0.23",
"serde",
"serde_json",
"sha2 0.10.6", "sha2 0.10.6",
"sqlx-core", "sqlx-core",
"sqlx-rt", "sqlx-rt",

View File

@ -11,7 +11,7 @@ tokio-stream = "0.1"
jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] } jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] }
sqlx = { version = "0.6", features = [ "runtime-tokio-native-tls" , "postgres", "chrono", "decimal" ] } sqlx = { version = "0.6", features = [ "runtime-tokio-native-tls" , "postgres", "chrono", "decimal", "offline" ] }
chrono = "0.4.23" chrono = "0.4.23"
solana-client = "=1.14.13" solana-client = "=1.14.13"

View File

@ -0,0 +1,392 @@
{
"db": "PostgreSQL",
"101e71cebdce376dec54e861042d6ee0afd56ecf79738886d85fe289059c5902": {
"describe": {
"columns": [
{
"name": "time!",
"ordinal": 0,
"type_info": "Timestamptz"
},
{
"name": "bid!",
"ordinal": 1,
"type_info": "Bool"
},
{
"name": "maker!",
"ordinal": 2,
"type_info": "Bool"
},
{
"name": "native_qty_paid!",
"ordinal": 3,
"type_info": "Numeric"
},
{
"name": "native_qty_received!",
"ordinal": 4,
"type_info": "Numeric"
},
{
"name": "native_fee_or_rebate!",
"ordinal": 5,
"type_info": "Numeric"
}
],
"nullable": [
false,
false,
false,
false,
false,
false
],
"parameters": {
"Left": [
"Text",
"Timestamptz",
"Timestamptz"
]
}
},
"query": "SELECT \n time as \"time!\",\n bid as \"bid!\",\n maker as \"maker!\",\n native_qty_paid as \"native_qty_paid!\",\n native_qty_received as \"native_qty_received!\",\n native_fee_or_rebate as \"native_fee_or_rebate!\" \n from fills \n where market = $1\n and time >= $2\n and time < $3 \n ORDER BY time asc"
},
"35e8220c601aca620da1cfcb978c8b7a64dcbf15550521b418cf509015cd88d8": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "CREATE TABLE IF NOT EXISTS fills (\n id numeric PRIMARY KEY,\n time timestamptz not null,\n market text not null,\n open_orders text not null,\n open_orders_owner text not null,\n bid bool not null,\n maker bool not null,\n native_qty_paid numeric not null,\n native_qty_received numeric not null,\n native_fee_or_rebate numeric not null,\n fee_tier text not null,\n order_id text not null\n )"
},
"4bab7d4329b2969b2ba610546c660207740c9bafe644df55fa57101df30e4899": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)"
},
"61ce67d221cf35cea33940529f7d38af1514961245f4abc95b872e88cc0dc1e0": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "DO $$\n BEGIN\n IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'unique_candles') THEN\n ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market, start_time, resolution);\n END IF;\n END $$"
},
"817ee7903cb5095f85fb787beff04ace3a452cf8749205bb230e41d8c9e03c4a": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)"
},
"830392e8e03b8e34490df87905873ffea0749f6d321bd32a99b90766d3d7e167": {
"describe": {
"columns": [
{
"name": "start_time!",
"ordinal": 0,
"type_info": "Timestamptz"
},
{
"name": "end_time!",
"ordinal": 1,
"type_info": "Timestamptz"
},
{
"name": "resolution!",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "market!",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "open!",
"ordinal": 4,
"type_info": "Numeric"
},
{
"name": "close!",
"ordinal": 5,
"type_info": "Numeric"
},
{
"name": "high!",
"ordinal": 6,
"type_info": "Numeric"
},
{
"name": "low!",
"ordinal": 7,
"type_info": "Numeric"
},
{
"name": "volume!",
"ordinal": 8,
"type_info": "Numeric"
},
{
"name": "complete!",
"ordinal": 9,
"type_info": "Bool"
}
],
"nullable": [
true,
true,
true,
true,
true,
true,
true,
true,
true,
true
],
"parameters": {
"Left": [
"Text",
"Text"
]
}
},
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market as \"market!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market = $1\n and resolution = $2\n and complete = true\n ORDER BY start_time desc LIMIT 1"
},
"900d4bd7a81a308648cd47eecb3b86b5e7afbbdc34e93ef35393ceab00fb8552": {
"describe": {
"columns": [
{
"name": "start_time!",
"ordinal": 0,
"type_info": "Timestamptz"
},
{
"name": "end_time!",
"ordinal": 1,
"type_info": "Timestamptz"
},
{
"name": "resolution!",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "market!",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "open!",
"ordinal": 4,
"type_info": "Numeric"
},
{
"name": "close!",
"ordinal": 5,
"type_info": "Numeric"
},
{
"name": "high!",
"ordinal": 6,
"type_info": "Numeric"
},
{
"name": "low!",
"ordinal": 7,
"type_info": "Numeric"
},
{
"name": "volume!",
"ordinal": 8,
"type_info": "Numeric"
},
{
"name": "complete!",
"ordinal": 9,
"type_info": "Bool"
}
],
"nullable": [
true,
true,
true,
true,
true,
true,
true,
true,
true,
true
],
"parameters": {
"Left": [
"Text",
"Text"
]
}
},
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market as \"market!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market = $1\n and resolution = $2\n ORDER BY start_time asc LIMIT 1"
},
"aa9adbbc7f215e28cc07b615f30ad9d3f415f25260cc87a1556225f01e0ef3be": {
"describe": {
"columns": [
{
"name": "start_time!",
"ordinal": 0,
"type_info": "Timestamptz"
},
{
"name": "end_time!",
"ordinal": 1,
"type_info": "Timestamptz"
},
{
"name": "resolution!",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "market!",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "open!",
"ordinal": 4,
"type_info": "Numeric"
},
{
"name": "close!",
"ordinal": 5,
"type_info": "Numeric"
},
{
"name": "high!",
"ordinal": 6,
"type_info": "Numeric"
},
{
"name": "low!",
"ordinal": 7,
"type_info": "Numeric"
},
{
"name": "volume!",
"ordinal": 8,
"type_info": "Numeric"
},
{
"name": "complete!",
"ordinal": 9,
"type_info": "Bool"
}
],
"nullable": [
true,
true,
true,
true,
true,
true,
true,
true,
true,
true
],
"parameters": {
"Left": [
"Text",
"Text",
"Timestamptz",
"Timestamptz"
]
}
},
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market as \"market!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market = $1\n and resolution = $2\n and start_time >= $3\n and end_time <= $4\n and complete = true\n ORDER BY start_time asc"
},
"c1ed9567bf732245975182fbf216da5858f89df285f8a5f17fb58ac1f33cc0e9": {
"describe": {
"columns": [
{
"name": "time!",
"ordinal": 0,
"type_info": "Timestamptz"
},
{
"name": "bid!",
"ordinal": 1,
"type_info": "Bool"
},
{
"name": "maker!",
"ordinal": 2,
"type_info": "Bool"
},
{
"name": "native_qty_paid!",
"ordinal": 3,
"type_info": "Numeric"
},
{
"name": "native_qty_received!",
"ordinal": 4,
"type_info": "Numeric"
},
{
"name": "native_fee_or_rebate!",
"ordinal": 5,
"type_info": "Numeric"
}
],
"nullable": [
false,
false,
false,
false,
false,
false
],
"parameters": {
"Left": [
"Text"
]
}
},
"query": "SELECT \n time as \"time!\",\n bid as \"bid!\",\n maker as \"maker!\",\n native_qty_paid as \"native_qty_paid!\",\n native_qty_received as \"native_qty_received!\",\n native_fee_or_rebate as \"native_fee_or_rebate!\" \n from fills \n where market = $1 \n ORDER BY time asc LIMIT 1"
},
"ebf9f73491ea62c20a25245080abb0be928e22a0d622fafa48bb01db34e84b94": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market, start_time, resolution)"
},
"ef6422f34cc3e649a90fbbbc6ad668de6ee4b0994c52f72d0295985085d7047b": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "CREATE TABLE IF NOT EXISTS candles (\n id serial,\n market text,\n start_time timestamptz,\n end_time timestamptz,\n resolution text,\n open numeric,\n close numeric,\n high numeric,\n low numeric,\n volume numeric,\n complete bool\n )"
}
}

View File

@ -4,7 +4,7 @@ use sqlx::{types::Decimal, Pool, Postgres};
use std::cmp::{max, min}; use std::cmp::{max, min};
use crate::{ use crate::{
candle_batching::DAY, candle_batching::day,
database::{ database::{
fetch::{fetch_candles_from, fetch_earliest_candle, fetch_latest_finished_candle}, fetch::{fetch_candles_from, fetch_earliest_candle, fetch_latest_finished_candle},
Candle, Resolution, Candle, Resolution,
@ -22,7 +22,7 @@ pub async fn batch_higher_order_candles(
match latest_candle { match latest_candle {
Some(candle) => { Some(candle) => {
let start_time = candle.end_time; let start_time = candle.end_time;
let end_time = start_time + DAY(); let end_time = start_time + day();
// println!("candle.end_time: {:?}", candle.end_time); // println!("candle.end_time: {:?}", candle.end_time);
// println!("start_time: {:?}", start_time); // println!("start_time: {:?}", start_time);
let mut constituent_candles = fetch_candles_from( let mut constituent_candles = fetch_candles_from(
@ -63,8 +63,8 @@ pub async fn batch_higher_order_candles(
let start_time = constituent_candle let start_time = constituent_candle
.unwrap() .unwrap()
.start_time .start_time
.duration_trunc(DAY())?; .duration_trunc(day())?;
let end_time = start_time + DAY(); let end_time = start_time + day();
let mut constituent_candles = fetch_candles_from( let mut constituent_candles = fetch_candles_from(
pool, pool,
@ -106,7 +106,7 @@ fn combine_into_higher_order_candles(
let empty_candle = let empty_candle =
Candle::create_empty_candle(constituent_candles[0].market.clone(), target_resolution); Candle::create_empty_candle(constituent_candles[0].market.clone(), target_resolution);
let mut combined_candles = let mut combined_candles =
vec![empty_candle; (DAY().num_minutes() / duration.num_minutes()) as usize]; vec![empty_candle; (day().num_minutes() / duration.num_minutes()) as usize];
println!("candles_len: {}", candles_len); println!("candles_len: {}", candles_len);

View File

@ -9,7 +9,7 @@ use crate::database::{
Candle, MarketInfo, PgOpenBookFill, Resolution, Candle, MarketInfo, PgOpenBookFill, Resolution,
}; };
use super::DAY; use super::day;
pub async fn batch_1m_candles( pub async fn batch_1m_candles(
pool: &Pool<Postgres>, pool: &Pool<Postgres>,
@ -23,7 +23,7 @@ pub async fn batch_1m_candles(
Some(candle) => { Some(candle) => {
let start_time = candle.end_time; let start_time = candle.end_time;
let end_time = min( let end_time = min(
start_time + DAY(), start_time + day(),
Utc::now().duration_trunc(Duration::minutes(1))?, Utc::now().duration_trunc(Duration::minutes(1))?,
); );
let mut fills = let mut fills =
@ -50,7 +50,7 @@ pub async fn batch_1m_candles(
.time .time
.duration_trunc(Duration::minutes(1))?; .duration_trunc(Duration::minutes(1))?;
let end_time = min( let end_time = min(
start_time + DAY(), start_time + day(),
Utc::now().duration_trunc(Duration::minutes(1))?, Utc::now().duration_trunc(Duration::minutes(1))?,
); );
let mut fills = let mut fills =

View File

@ -1,27 +1,19 @@
pub mod higher_order_candles; pub mod higher_order_candles;
pub mod minute_candles; pub mod minute_candles;
use std::cmp::{max, min}; use chrono::Duration;
use sqlx::{Pool, Postgres};
use chrono::{DateTime, Duration, DurationRound, Utc};
use num_traits::{FromPrimitive, Zero};
use sqlx::{types::Decimal, Pool, Postgres};
use strum::IntoEnumIterator; use strum::IntoEnumIterator;
use tokio::{sync::mpsc::Sender, time::sleep}; use tokio::{sync::mpsc::Sender, time::sleep};
use crate::{ use crate::{
candle_batching::minute_candles::batch_1m_candles, candle_batching::minute_candles::batch_1m_candles,
database::{ database::{Candle, MarketInfo, Resolution},
fetch::{
fetch_candles_from, fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle,
},
Candle, MarketInfo, PgOpenBookFill, Resolution,
},
}; };
use self::higher_order_candles::batch_higher_order_candles; use self::higher_order_candles::batch_higher_order_candles;
pub fn DAY() -> Duration { pub fn day() -> Duration {
Duration::days(1) Duration::days(1)
} }
@ -30,7 +22,7 @@ pub async fn batch_candles(
candles_sender: &Sender<Vec<Candle>>, candles_sender: &Sender<Vec<Candle>>,
markets: Vec<MarketInfo>, markets: Vec<MarketInfo>,
) { ) {
// tokio spawn a taks for every market // TODO: tokio spawn a taks for every market
loop { loop {
let m = MarketInfo { let m = MarketInfo {

View File

@ -1,18 +1,7 @@
use chrono::Utc;
use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
use std::{ use std::time::Duration;
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
time::{Duration, Instant},
};
use tokio::sync::mpsc::{error::TryRecvError, Receiver};
use crate::{ use crate::utils::{AnyhowWrap, Config};
trade_fetching::parsing::OpenBookFillEventLog,
utils::{AnyhowWrap, Config},
};
use super::MarketInfo;
pub async fn connect_to_database(config: &Config) -> anyhow::Result<Pool<Postgres>> { pub async fn connect_to_database(config: &Config) -> anyhow::Result<Pool<Postgres>> {
loop { loop {
@ -29,7 +18,7 @@ pub async fn connect_to_database(config: &Config) -> anyhow::Result<Pool<Postgre
} }
} }
pub async fn setup_database(pool: &Pool<Postgres>, markets: Vec<MarketInfo>) -> anyhow::Result<()> { pub async fn setup_database(pool: &Pool<Postgres>) -> anyhow::Result<()> {
let candles_table_fut = create_candles_table(pool); let candles_table_fut = create_candles_table(pool);
let fills_table_fut = create_fills_table(pool); let fills_table_fut = create_fills_table(pool);
let result = tokio::try_join!(candles_table_fut, fills_table_fut); let result = tokio::try_join!(candles_table_fut, fills_table_fut);
@ -71,7 +60,12 @@ pub async fn create_candles_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
).execute(&mut tx).await?; ).execute(&mut tx).await?;
sqlx::query!( sqlx::query!(
"ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market, start_time, resolution)" "DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'unique_candles') THEN
ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market, start_time, resolution);
END IF;
END $$"
) )
.execute(&mut tx) .execute(&mut tx)
.await?; .await?;
@ -111,7 +105,3 @@ pub async fn create_fills_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
tx.commit().await.map_err_anyhow() tx.commit().await.map_err_anyhow()
} }
pub async fn save_candles() {
unimplemented!("TODO");
}

View File

@ -1,16 +1,13 @@
use chrono::Utc; use chrono::Utc;
use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; use sqlx::{Pool, Postgres};
use std::{ use std::{
collections::hash_map::DefaultHasher, collections::{hash_map::DefaultHasher, HashMap},
hash::{Hash, Hasher}, hash::{Hash, Hasher},
time::{Duration, Instant}, time::Instant,
}; };
use tokio::sync::mpsc::{error::TryRecvError, Receiver}; use tokio::sync::mpsc::{error::TryRecvError, Receiver};
use crate::{ use crate::{trade_fetching::parsing::OpenBookFillEventLog, utils::AnyhowWrap};
trade_fetching::parsing::OpenBookFillEventLog,
utils::{AnyhowWrap, Config},
};
use super::Candle; use super::Candle;
@ -20,13 +17,12 @@ pub async fn persist_fill_events(
) { ) {
loop { loop {
let start = Instant::now(); let start = Instant::now();
let mut write_batch = Vec::new(); let mut write_batch = HashMap::new();
while write_batch.len() < 10 || start.elapsed().as_secs() > 10 { while write_batch.len() < 10 || start.elapsed().as_secs() > 10 {
match fill_receiver.try_recv() { match fill_receiver.try_recv() {
Ok(event) => { Ok(event) => {
if !write_batch.contains(&event) { if !write_batch.contains_key(&event) {
// O(n) write_batch.insert(event, 0);
write_batch.push(event)
} }
} }
Err(TryRecvError::Empty) => break, Err(TryRecvError::Empty) => break,
@ -48,9 +44,9 @@ pub async fn persist_fill_events(
} }
} }
fn build_fills_upsert_statement(events: Vec<OpenBookFillEventLog>) -> String { fn build_fills_upsert_statement(events: HashMap<OpenBookFillEventLog, u8>) -> String {
let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES"); let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES");
for (idx, event) in events.iter().enumerate() { for (idx, event) in events.keys().enumerate() {
let mut hasher = DefaultHasher::new(); let mut hasher = DefaultHasher::new();
event.hash(&mut hasher); event.hash(&mut hasher);
let val_str = format!( let val_str = format!(
@ -79,7 +75,6 @@ fn build_fills_upsert_statement(events: Vec<OpenBookFillEventLog>) -> String {
let handle_conflict = "ON CONFLICT (id) DO UPDATE SET market=excluded.market"; let handle_conflict = "ON CONFLICT (id) DO UPDATE SET market=excluded.market";
stmt = format!("{} {}", stmt, handle_conflict); stmt = format!("{} {}", stmt, handle_conflict);
print!("{}", stmt);
stmt stmt
} }

View File

@ -5,7 +5,7 @@ use num_traits::Zero;
use sqlx::types::Decimal; use sqlx::types::Decimal;
use strum::EnumIter; use strum::EnumIter;
use crate::candle_batching::DAY; use crate::candle_batching::day;
pub mod fetch; pub mod fetch;
pub mod initialize; pub mod initialize;
@ -69,7 +69,7 @@ impl Resolution {
Resolution::R1h => Duration::hours(1), Resolution::R1h => Duration::hours(1),
Resolution::R2h => Duration::hours(2), Resolution::R2h => Duration::hours(2),
Resolution::R4h => Duration::hours(4), Resolution::R4h => Duration::hours(4),
Resolution::R1d => DAY(), Resolution::R1d => day(),
} }
} }
} }
@ -115,7 +115,7 @@ pub struct PgOpenBookFill {
pub native_fee_or_rebate: Decimal, pub native_fee_or_rebate: Decimal,
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct MarketInfo { pub struct MarketInfo {
pub name: String, pub name: String,
pub address: String, pub address: String,

View File

@ -1,13 +1,19 @@
use std::{collections::HashMap, str::FromStr};
use crate::{ use crate::{
candle_batching::batch_candles, candle_batching::batch_candles,
database::{fetch::fetch_latest_finished_candle, insert::persist_candles, Candle, Resolution}, database::{
trade_fetching::{parsing::OpenBookFillEventLog, scrape::fetch_market_infos}, insert::{persist_candles, persist_fill_events},
Candle,
},
trade_fetching::{
backfill::backfill,
parsing::OpenBookFillEventLog,
scrape::{fetch_market_infos, scrape},
},
utils::Config, utils::Config,
}; };
use database::{ use database::initialize::{connect_to_database, setup_database};
fetch::fetch_earliest_fill,
initialize::{connect_to_database, setup_database},
};
use dotenv; use dotenv;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -25,41 +31,52 @@ async fn main() -> anyhow::Result<()> {
let database_url: String = dotenv::var("DATABASE_URL").unwrap(); let database_url: String = dotenv::var("DATABASE_URL").unwrap();
let config = Config { let config = Config {
rpc_url, rpc_url: rpc_url.clone(),
database_url, database_url,
max_pg_pool_connections: 5, max_pg_pool_connections: 5,
}; };
let markets = utils::load_markets("/Users/dboures/dev/openbook-candles/markets.json"); let markets = utils::load_markets("/Users/dboures/dev/openbook-candles/markets.json");
let market_infos = fetch_market_infos(&config, markets).await?; let market_infos = fetch_market_infos(&config, markets).await?;
println!("{:?}", market_infos); let mut target_markets = HashMap::new();
for m in market_infos.clone() {
target_markets.insert(Pubkey::from_str(&m.address)?, 0);
}
println!("{:?}", target_markets);
let pool = connect_to_database(&config).await?; let pool = connect_to_database(&config).await?;
// setup_database(&pool, market_infos).await?; setup_database(&pool).await?;
// let (fill_sender, fill_receiver) = mpsc::channel::<OpenBookFillEventLog>(1000); let (fill_sender, fill_receiver) = mpsc::channel::<OpenBookFillEventLog>(1000);
// tokio::spawn(async move { let bf_sender = fill_sender.clone();
// trade_fetching::scrape::scrape(&config, fill_sender.clone()).await; TODO: send the vec, it's okay let targets = target_markets.clone();
// });
// database::database::handle_fill_events(&pool, fill_receiver).await;
// trade_fetching::websocket::listen_logs().await?;
let (candle_sender, candle_receiver) = mpsc::channel::<Vec<Candle>>(1000);
let batch_pool = pool.clone();
tokio::spawn(async move { tokio::spawn(async move {
batch_candles(batch_pool, &candle_sender, market_infos).await; backfill(&rpc_url.clone(), &bf_sender, &targets).await;
}); });
let persist_pool = pool.clone(); tokio::spawn(async move {
scrape(&config, &fill_sender, &target_markets).await; //TODO: send the vec, it's okay
});
let fills_pool = pool.clone();
tokio::spawn(async move {
persist_fill_events(&fills_pool, fill_receiver).await;
});
// let (candle_sender, candle_receiver) = mpsc::channel::<Vec<Candle>>(1000);
// let batch_pool = pool.clone();
// tokio::spawn(async move { // tokio::spawn(async move {
persist_candles(persist_pool, candle_receiver).await; // batch_candles(batch_pool, &candle_sender, market_infos).await;
// }); // });
loop {} // let persist_pool = pool.clone();
// // tokio::spawn(async move {
// persist_candles(persist_pool, candle_receiver).await;
// // });
loop {} // tokio drop if one thread drops or something
Ok(()) Ok(())
} }

View File

@ -0,0 +1,66 @@
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
use solana_client::{rpc_client::RpcClient, rpc_config::RpcTransactionConfig};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature};
use solana_transaction_status::UiTransactionEncoding;
use std::collections::HashMap;
use tokio::sync::mpsc::Sender;
use crate::trade_fetching::scrape::scrape_transactions;
use super::parsing::OpenBookFillEventLog;
pub async fn backfill(
rpc_url: &String,
fill_sender: &Sender<OpenBookFillEventLog>,
target_markets: &HashMap<Pubkey, u8>,
) {
let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::processed());
println!("backfill started");
let mut before_slot: Option<Signature> = None;
let end_time = (Utc::now() - Duration::days(1)).timestamp();
loop {
let last_sig_option =
scrape_transactions(&rpc_client, before_slot, None, fill_sender, target_markets).await;
if last_sig_option.is_none() {
println!("last sig is none");
continue;
}
let txn_config = RpcTransactionConfig {
encoding: Some(UiTransactionEncoding::Base64),
commitment: Some(CommitmentConfig::confirmed()),
max_supported_transaction_version: Some(0),
};
match rpc_client.get_transaction_with_config(&last_sig_option.unwrap(), txn_config) {
Ok(txn) => {
let unix_sig_time = rpc_client.get_block_time(txn.slot).unwrap();
if unix_sig_time < end_time {
break;
}
let time_left = backfill_time_left(unix_sig_time, end_time);
println!(
"{} minutes ~ {} days remaining in the backfill\n",
time_left.num_minutes(),
time_left.num_days()
);
}
Err(e) => {
println!("error: {:?}", e);
continue;
}
}
before_slot = last_sig_option;
}
print!("Backfill complete \n");
}
fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration {
let naive_cur = NaiveDateTime::from_timestamp_millis(current_time * 1000).unwrap();
let naive_bf = NaiveDateTime::from_timestamp_millis(backfill_end * 1000).unwrap();
let cur_date = DateTime::<Utc>::from_utc(naive_cur, Utc);
let bf_date = DateTime::<Utc>::from_utc(naive_bf, Utc);
cur_date - bf_date
}

View File

@ -1,3 +1,4 @@
pub mod backfill;
pub mod parsing; pub mod parsing;
pub mod scrape; pub mod scrape;
pub mod websocket; pub mod websocket;

View File

@ -2,7 +2,7 @@ use solana_client::client_error::Result as ClientResult;
use solana_transaction_status::{ use solana_transaction_status::{
option_serializer::OptionSerializer, EncodedConfirmedTransactionWithStatusMeta, option_serializer::OptionSerializer, EncodedConfirmedTransactionWithStatusMeta,
}; };
use std::io::Error; use std::{collections::HashMap, io::Error};
use anchor_lang::{event, AnchorDeserialize, AnchorSerialize}; use anchor_lang::{event, AnchorDeserialize, AnchorSerialize};
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
@ -10,7 +10,7 @@ use solana_sdk::pubkey::Pubkey;
const PROGRAM_DATA: &str = "Program data: "; const PROGRAM_DATA: &str = "Program data: ";
#[event] #[event]
#[derive(Debug, Clone, PartialEq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OpenBookFillEventLog { pub struct OpenBookFillEventLog {
pub market: Pubkey, pub market: Pubkey,
pub open_orders: Pubkey, pub open_orders: Pubkey,
@ -84,6 +84,7 @@ pub struct MarketState {
pub fn parse_trades_from_openbook_txns( pub fn parse_trades_from_openbook_txns(
txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>, txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
target_markets: &HashMap<Pubkey, u8>,
) -> Vec<OpenBookFillEventLog> { ) -> Vec<OpenBookFillEventLog> {
let mut fills_vector = Vec::<OpenBookFillEventLog>::new(); let mut fills_vector = Vec::<OpenBookFillEventLog>::new();
for txn in txns.iter_mut() { for txn in txns.iter_mut() {
@ -92,7 +93,7 @@ pub fn parse_trades_from_openbook_txns(
if let Some(m) = &t.transaction.meta { if let Some(m) = &t.transaction.meta {
match &m.log_messages { match &m.log_messages {
OptionSerializer::Some(logs) => { OptionSerializer::Some(logs) => {
match parse_openbook_fills_from_logs(logs) { match parse_openbook_fills_from_logs(logs, target_markets) {
Some(mut events) => fills_vector.append(&mut events), Some(mut events) => fills_vector.append(&mut events),
None => {} None => {}
} }
@ -108,7 +109,10 @@ pub fn parse_trades_from_openbook_txns(
fills_vector fills_vector
} }
fn parse_openbook_fills_from_logs(logs: &Vec<String>) -> Option<Vec<OpenBookFillEventLog>> { fn parse_openbook_fills_from_logs(
logs: &Vec<String>,
target_markets: &HashMap<Pubkey, u8>,
) -> Option<Vec<OpenBookFillEventLog>> {
let mut fills_vector = Vec::<OpenBookFillEventLog>::new(); let mut fills_vector = Vec::<OpenBookFillEventLog>::new();
for l in logs { for l in logs {
match l.strip_prefix(PROGRAM_DATA) { match l.strip_prefix(PROGRAM_DATA) {
@ -123,7 +127,9 @@ fn parse_openbook_fills_from_logs(logs: &Vec<String>) -> Option<Vec<OpenBookFill
match event { match event {
Ok(e) => { Ok(e) => {
fills_vector.push(e); if target_markets.contains_key(&e.market) {
fills_vector.push(e);
}
} }
_ => continue, _ => continue,
} }

View File

@ -10,7 +10,7 @@ use solana_sdk::{
}; };
use solana_transaction_status::{EncodedConfirmedTransactionWithStatusMeta, UiTransactionEncoding}; use solana_transaction_status::{EncodedConfirmedTransactionWithStatusMeta, UiTransactionEncoding};
use spl_token::state::Mint; use spl_token::state::Mint;
use std::{collections::HashMap, str::FromStr, time::Duration}; use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use crate::{ use crate::{
@ -21,58 +21,39 @@ use crate::{
use super::parsing::{parse_trades_from_openbook_txns, OpenBookFillEventLog}; use super::parsing::{parse_trades_from_openbook_txns, OpenBookFillEventLog};
pub async fn scrape(config: &Config, fill_sender: Sender<OpenBookFillEventLog>) { pub async fn scrape(
config: &Config,
fill_sender: &Sender<OpenBookFillEventLog>,
target_markets: &HashMap<Pubkey, u8>,
) {
let url = &config.rpc_url; let url = &config.rpc_url;
let rpc_client = RpcClient::new_with_commitment(url, CommitmentConfig::processed()); let rpc_client = RpcClient::new_with_commitment(url, CommitmentConfig::processed());
let before_slot = None; let before_slot = None;
loop { loop {
scrape_transactions(&rpc_client, before_slot, &fill_sender).await; scrape_transactions(
&rpc_client,
print!("Ding fires are done \n\n"); before_slot,
tokio::time::sleep(Duration::from_millis(500)).await; Some(150),
fill_sender,
target_markets,
)
.await;
tokio::time::sleep(WaitDuration::from_millis(250)).await;
} }
} }
// pub async fn backfill(config: &Config, fill_sender: Sender<OpenBookFillEventLog>) {
// let url = &config.rpc_url;
// let rpc_client = RpcClient::new_with_commitment(url, CommitmentConfig::processed());
// let mut before_slot: Option<Signature> = None;
// loop {
// let last_sig_option = scrape_transactions(&rpc_client, before_slot, &fill_sender).await;
// if last_sig_option.is_none() {
// continue;
// }
// match rpc_client.get_transaction(&last_sig_option.unwrap(), UiTransactionEncoding::Json) {
// Ok(txn) => {
// let unix_sig_time = rpc_client.get_block_time(txn.slot).unwrap();
// if unix_sig_time > 0 {
// // TODO: is 3 months in past
// break;
// }
// println!("backfill at {}", unix_sig_time);
// }
// Err(_) => continue,
// }
// before_slot = last_sig_option;
// }
// print!("Backfill complete \n");
// }
pub async fn scrape_transactions( pub async fn scrape_transactions(
rpc_client: &RpcClient, rpc_client: &RpcClient,
before_slot: Option<Signature>, before_sig: Option<Signature>,
limit: Option<usize>,
fill_sender: &Sender<OpenBookFillEventLog>, fill_sender: &Sender<OpenBookFillEventLog>,
target_markets: &HashMap<Pubkey, u8>,
) -> Option<Signature> { ) -> Option<Signature> {
let rpc_config = GetConfirmedSignaturesForAddress2Config { let rpc_config = GetConfirmedSignaturesForAddress2Config {
before: before_slot, before: before_sig,
until: None, until: None,
limit: Some(150), limit,
commitment: Some(CommitmentConfig::confirmed()), commitment: Some(CommitmentConfig::confirmed()),
}; };
@ -81,11 +62,24 @@ pub async fn scrape_transactions(
rpc_config, rpc_config,
) { ) {
Ok(s) => s, Ok(s) => s,
Err(_) => return before_slot, // TODO: add error log Err(e) => {
println!("Error in get_signatures_for_address_with_config: {}", e);
return before_sig;
}
}; };
if sigs.len() == 0 {
println!("No signatures found");
return before_sig;
}
let last = sigs.last().clone().unwrap();
let request_last_sig = Signature::from_str(&last.signature).unwrap();
sigs.retain(|sig| sig.err.is_none()); sigs.retain(|sig| sig.err.is_none());
let last_sig = sigs.last().unwrap().clone(); // Failed here if sigs.last().is_none() {
return Some(request_last_sig);
}
let txn_config = RpcTransactionConfig { let txn_config = RpcTransactionConfig {
encoding: Some(UiTransactionEncoding::Json), encoding: Some(UiTransactionEncoding::Json),
@ -103,7 +97,7 @@ pub async fn scrape_transactions(
}) })
.collect::<Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>>(); // TODO: am I actually getting all the txns? .collect::<Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>>(); // TODO: am I actually getting all the txns?
let fills = parse_trades_from_openbook_txns(&mut txns); let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
if fills.len() > 0 { if fills.len() > 0 {
for fill in fills.into_iter() { for fill in fills.into_iter() {
if let Err(_) = fill_sender.send(fill).await { if let Err(_) = fill_sender.send(fill).await {
@ -112,7 +106,7 @@ pub async fn scrape_transactions(
} }
} }
Some(Signature::from_str(&last_sig.signature).unwrap()) Some(request_last_sig)
} }
pub async fn fetch_market_infos( pub async fn fetch_market_infos(
@ -181,9 +175,6 @@ pub async fn fetch_market_infos(
.get_multiple_accounts_with_config(&mint_keys, rpc_config) .get_multiple_accounts_with_config(&mint_keys, rpc_config)
.unwrap() .unwrap()
.value; .value;
// println!("{:?}", mint_results);
// println!("{:?}", mint_keys);
// println!("{:?}", mint_results.len());
for i in 0..mint_results.len() { for i in 0..mint_results.len() {
let mut mint_account = mint_results[i].as_ref().unwrap().clone(); let mut mint_account = mint_results[i].as_ref().unwrap().clone();
let mut mint_bytes: &[u8] = &mut mint_account.data[..]; let mut mint_bytes: &[u8] = &mut mint_account.data[..];

View File

@ -1,28 +1,28 @@
use jsonrpc_core_client::transports::ws; // use jsonrpc_core_client::transports::ws;
use anchor_client::{ // use anchor_client::{
anchor_lang::{self, event, AnchorDeserialize, AnchorSerialize, Discriminator}, // anchor_lang::{self, event, AnchorDeserialize, AnchorSerialize, Discriminator},
ClientError as AnchorClientError, Cluster, // ClientError as AnchorClientError, Cluster,
}; // };
use log::*; // use log::*;
use solana_account_decoder::UiAccountEncoding; // use solana_account_decoder::UiAccountEncoding;
use solana_client::{ // use solana_client::{
pubsub_client::{PubsubClient, PubsubClientSubscription}, // pubsub_client::{PubsubClient, PubsubClientSubscription},
rpc_config::{ // rpc_config::{
RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcTransactionLogsConfig, // RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcTransactionLogsConfig,
RpcTransactionLogsFilter, // RpcTransactionLogsFilter,
}, // },
rpc_response::{Response, RpcKeyedAccount, RpcLogsResponse}, // rpc_response::{Response, RpcKeyedAccount, RpcLogsResponse},
}; // };
use solana_rpc::rpc_pubsub::RpcSolPubSubClient; // use solana_rpc::rpc_pubsub::RpcSolPubSubClient;
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Keypair}; // use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Keypair};
use std::{io::Error, rc::Rc, str::FromStr, time::Duration}; // use std::{io::Error, rc::Rc, str::FromStr, time::Duration};
use crate::utils::AnyhowWrap; // use crate::utils::AnyhowWrap;
use crate::{ // use crate::{
database::initialize::{connect_to_database, setup_database}, // database::initialize::{connect_to_database, setup_database},
utils::Config, // utils::Config,
}; // };
// use super::parsing::parse_and_save_logs; // use super::parsing::parse_and_save_logs;

View File

@ -1,5 +1,5 @@
use serde_derive::Deserialize; use serde_derive::Deserialize;
use std::{fs::File, io::Read}; use std::fs::File;
pub trait AnyhowWrap { pub trait AnyhowWrap {
type Value; type Value;