dboures 2023-03-06 00:52:42 -06:00
commit 3866211c85
No known key found for this signature in database
GPG Key ID: AB3790129D478852
14 changed files with 7893 additions and 0 deletions

1
candle-creator/.gitignore vendored Normal file

@@ -0,0 +1 @@
/target

7137
candle-creator/Cargo.lock generated Normal file

File diff suppressed because it is too large

34
candle-creator/Cargo.toml Normal file

@@ -0,0 +1,34 @@
[package]
name = "openbook-candles"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] }
sqlx = { version = "0.6", features = [ "runtime-tokio-native-tls" , "postgres" ] }
chrono = "0.4.23"
solana-client = "=1.14.13"
solana-account-decoder = "=1.14.13"
solana-transaction-status = "=1.14.13"
solana-sdk = "=1.14.13"
solana-rpc = "=1.14.13"
anchor-client = "=0.26.0"
borsh = "0.9"
async-trait = "0.1"
anyhow = "1.0"
log = "0.4"
toml = "0.5"
serde = "1.0.130"
serde_derive = "1.0.130"
serum_dex = { version = "0.5.10", git = "https://github.com/openbook-dex/program.git", default-features = false, features = ["no-entrypoint", "program"] }
anchor-lang = ">=0.25.0"

241
candle-creator/src/database/database.rs Normal file

@@ -0,0 +1,241 @@
use chrono::Utc;
use sqlx::{
postgres::{PgPoolOptions, PgQueryResult},
Executor, Pool, Postgres,
};
use std::{time::{Duration, Instant}, collections::hash_map::DefaultHasher, hash::{Hash, Hasher}};
use tokio::sync::mpsc::{error::TryRecvError, Receiver};
use crate::{
trade_fetching::parsing::FillEventLog,
utils::{AnyhowWrap, Config},
};
pub async fn connect_to_database(config: &Config) -> anyhow::Result<Pool<Postgres>> {
// let conn_str = std::env::var("POSTGRES_CONN_STRING")
// .expect("POSTGRES_CONN_STRING environment variable must be set!");
// let config_str =
// format!("host=0.0.0.0 port=5432 password={password} user=postgres dbname=postgres");
let db_config = &config.database_config;
loop {
let pool = PgPoolOptions::new()
.max_connections(db_config.max_pg_pool_connections)
.connect(&db_config.connection_string)
.await;
if pool.is_ok() {
println!("Database connected");
return pool.map_err_anyhow();
}
println!("Failed to connect to database, retrying");
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
pub async fn setup_database(pool: &Pool<Postgres>) -> anyhow::Result<()> {
let candles_table_fut = create_candles_table(pool);
let fills_table_fut = create_fills_table(pool);
let result = tokio::try_join!(candles_table_fut, fills_table_fut);
match result {
Ok(_) => {
println!("Successfully configured database");
Ok(())
}
Err(e) => {
println!("Failed to configure database: {e}");
Err(e)
}
}
}
pub async fn create_candles_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
let mut tx = pool.begin().await.map_err_anyhow()?;
sqlx::query(
"CREATE TABLE IF NOT EXISTS candles (
id serial,
market text,
start_time timestamptz,
end_time timestamptz,
resolution text,
open numeric,
close numeric,
high numeric,
low numeric,
volume numeric,
vwap numeric
)",
)
.execute(&mut tx)
.await?;
sqlx::query(
"CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market, start_time, resolution)"
).execute(&mut tx).await?;
tx.commit().await.map_err_anyhow()
}
pub async fn create_fills_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
let mut tx = pool.begin().await.map_err_anyhow()?;
sqlx::query(
"CREATE TABLE IF NOT EXISTS fills (
id numeric PRIMARY KEY,
time timestamptz,
market text,
open_orders text,
open_orders_owner text,
bid bool,
maker bool,
native_qty_paid numeric,
native_qty_received numeric,
native_fee_or_rebate numeric,
fee_tier text,
order_id text,
client_order_id numeric,
referrer_rebate numeric
)",
)
.execute(&mut tx)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)")
.execute(&mut tx)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)")
.execute(&mut tx)
.await?;
tx.commit().await.map_err_anyhow()
}
pub async fn save_candles() {
unimplemented!("TODO");
}
pub async fn handle_fill_events(
pool: &Pool<Postgres>,
mut fill_event_receiver: Receiver<FillEventLog>,
) {
loop {
let start = Instant::now();
let mut write_batch = Vec::new();
// Batch until ten events have accumulated or ten seconds have elapsed.
while write_batch.len() < 10 && start.elapsed().as_secs() < 10 {
match fill_event_receiver.try_recv() {
Ok(event) => {
if !write_batch.contains(&event) {
write_batch.push(event)
}
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
panic!("sender must stay alive")
}
};
}
if !write_batch.is_empty() {
println!("writing {} events to DB", write_batch.len());
let upsert_statement = build_fills_upsert_statement(write_batch);
sqlx::query(&upsert_statement)
.execute(pool)
.await
.map_err_anyhow()
.unwrap();
}
}
}
fn build_fills_upsert_statement(events: Vec<FillEventLog>) -> String {
let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, client_order_id, referrer_rebate) VALUES");
for (idx, event) in events.iter().enumerate() {
let mut hasher = DefaultHasher::new();
event.hash(&mut hasher);
let val_str = format!(
"({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {}, {})",
hasher.finish(),
Utc::now().to_rfc3339(),
event.market,
event.open_orders,
event.open_orders_owner,
event.bid,
event.maker,
event.native_qty_paid,
event.native_qty_received,
event.native_fee_or_rebate,
event.fee_tier,
event.order_id,
event.client_order_id.unwrap_or(0),
event.referrer_rebate.unwrap_or(0),
);
if idx == 0 {
stmt = format!("{} {}", &stmt, val_str);
} else {
stmt = format!("{}, {}", &stmt, val_str);
}
}
let handle_conflict = "ON CONFLICT (id) DO UPDATE SET market=excluded.market";
stmt = format!("{} {}", stmt, handle_conflict);
println!("{}", stmt);
stmt
}
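// Illustrative sketch, not part of this commit: the builder above interpolates
// values straight into the SQL string. sqlx 0.6's QueryBuilder can bind them
// as parameters instead, which avoids quoting and injection concerns. Only two
// of the fourteen columns are shown to keep the sketch short.
#[allow(dead_code)]
async fn upsert_fills_bound_sketch(
    pool: &Pool<Postgres>,
    rows: Vec<(i64, String)>, // (id, market)
) -> anyhow::Result<()> {
    let mut qb: sqlx::QueryBuilder<Postgres> =
        sqlx::QueryBuilder::new("INSERT INTO fills (id, market) ");
    qb.push_values(rows, |mut b, (id, market)| {
        b.push_bind(id).push_bind(market);
    });
    qb.push(" ON CONFLICT (id) DO UPDATE SET market = excluded.market");
    qb.build().execute(pool).await.map_err_anyhow()?;
    Ok(())
}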
// pub async fn create_markets_table() {}
#[cfg(test)]
mod tests {
use std::str::FromStr;
use solana_sdk::pubkey::Pubkey;
use super::*;
#[test]
fn test_event_hashing() {
let event_1 = FillEventLog {
market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(),
open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(),
open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw")
.unwrap(),
bid: false,
maker: false,
native_qty_paid: 200000000,
native_qty_received: 4204317,
native_fee_or_rebate: 1683,
order_id: 387898134381964481824213,
owner_slot: 0,
fee_tier: 0,
client_order_id: None,
referrer_rebate: Some(841),
};
let event_2 = FillEventLog {
market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(),
open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(),
open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw")
.unwrap(),
bid: false,
maker: false,
native_qty_paid: 200000001,
native_qty_received: 4204317,
native_fee_or_rebate: 1683,
order_id: 387898134381964481824213,
owner_slot: 0,
fee_tier: 0,
client_order_id: None,
referrer_rebate: Some(841),
};
let mut h1 = DefaultHasher::new();
event_1.hash(&mut h1);
let mut h2 = DefaultHasher::new();
event_2.hash(&mut h2);
assert_ne!(h1.finish(), h2.finish());
}
}

5
candle-creator/src/database/mod.rs Normal file

@@ -0,0 +1,5 @@
pub mod database;
pub struct Candle {}
pub struct ParsedFill {}

145

@@ -0,0 +1,145 @@
use {
std::time::Duration,
sysinfo::SystemExt,
tokio_postgres::{tls::MakeTlsConnect, types::Type, NoTls, Socket, Statement},
};
use chrono::{NaiveDateTime, Utc};
use crate::candles::Candle;
pub struct Database {
client: tokio_postgres::Client,
insertion_statement: Statement,
pub refresh_period_ms: u64,
}
impl Database {
pub const ENTRY_SIZE: u64 = 112; // Size in bytes of a single db entry
pub const RELATIVE_CHUNK_SIZE: f64 = 0.10; // Fraction of total memory a single timescaledb chunk should span
pub async fn new(
refresh_period_ms: u64,
number_of_markets: u64,
) -> Result<Self, tokio_postgres::Error> {
let (client, connection) = connect_to_database().await;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
initialize(&client, refresh_period_ms, number_of_markets).await?;
let insertion_statement = client
.prepare(
"INSERT INTO candles VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (address, timestamp) DO UPDATE
SET close = EXCLUDED.close,
low = LEAST(EXCLUDED.low, candles.low),
high = GREATEST(EXCLUDED.high, candles.high);",
)
.await
.unwrap();
Ok(Self {
client,
insertion_statement,
refresh_period_ms,
})
}
pub async fn commit_candle(
&self,
candle: &Candle,
address: &String,
name: &String,
) -> Result<(), tokio_postgres::Error> {
self.client
.execute(
&self.insertion_statement,
&[
&chrono::DateTime::<Utc>::from_utc(
NaiveDateTime::from_timestamp(candle.ts_start, 0),
Utc,
),
address,
name,
&candle.open,
&candle.close,
&candle.high,
&candle.low,
],
)
.await?;
Ok(())
}
}
async fn connect_to_database() -> (
tokio_postgres::Client,
tokio_postgres::Connection<Socket, <tokio_postgres::NoTls as MakeTlsConnect<Socket>>::Stream>,
) {
// let password = std::env::var("POSTGRES_PASSWORD")
// .expect("POSTGRES_PASSWORD environment variable must be set!");
let password = "postgres";
let config_str =
format!("host=0.0.0.0 port=5432 password={password} user=postgres dbname=postgres");
loop {
let res = tokio_postgres::connect(&config_str, NoTls).await;
if let Ok(r) = res {
return r;
}
println!("Failed to connect to database, retrying");
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
async fn initialize(
client: &tokio_postgres::Client,
refresh_period_ms: u64,
mut number_of_markets: u64,
) -> Result<(), tokio_postgres::Error> {
number_of_markets = std::cmp::max(10, number_of_markets);
println!("=== Initializing database ===");
client
.execute(
"CREATE TABLE IF NOT EXISTS candles (
timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
address VARCHAR(44),
name VARCHAR(20),
open DOUBLE PRECISION,
close DOUBLE PRECISION,
high DOUBLE PRECISION,
low DOUBLE PRECISION,
PRIMARY KEY (timestamp, address)
);",
&[],
)
.await
.unwrap();
// We convert the table to a hypertable
let o = client
.query(
"SELECT create_hypertable('candles', 'timestamp', if_not_exists => TRUE);",
&[],
)
.await
.unwrap();
println!("Output from create_hypertable");
println!("{o:?}");
// Implements the best practice detailed here
// https://docs.timescale.com/timescaledb/latest/how-to-guides/hypertables/best-practices/#time-intervals
let system_memory_kb = sysinfo::System::new_all().total_memory();
let chunk_size_ms =
refresh_period_ms * system_memory_kb * 1024 / Database::ENTRY_SIZE / number_of_markets;
let chunk_size_ms = (chunk_size_ms as f64) * Database::RELATIVE_CHUNK_SIZE;
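// Worked example with illustrative numbers: 16 GiB of memory, a 1000 ms
// refresh period, and 100 markets give roughly 17.2e9 / 112 = 1.5e8 entries
// that fit in memory, filled at 100 entries per 1000 ms, i.e. in ~1.5e9 ms
// (about 17 days); scaling by RELATIVE_CHUNK_SIZE yields ~1.7-day chunks.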
let s = client
.prepare_typed(
"SELECT set_chunk_time_interval('candles', $1);",
&[Type::INT8],
)
.await
.unwrap();
let o = client.query(&s, &[&(chunk_size_ms as i64)]).await?;
println!("Output from set_chunk_time_interval");
println!("{o:?}");
Ok(())
}

42
candle-creator/src/main.rs Normal file

@@ -0,0 +1,42 @@
use crate::{trade_fetching::parsing::FillEventLog, utils::Config};
use database::database::{connect_to_database, setup_database};
use std::{fs::File, io::Read};
use tokio::sync::mpsc;
mod database;
mod trade_fetching;
mod utils;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let config: Config = {
let mut file = File::open("./example-config.toml")?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
toml::from_str(&contents).unwrap()
};
println!("{:?}", config);
let pool = connect_to_database(&config).await?;
setup_database(&pool).await?;
let (fill_event_sender, fill_event_receiver) = mpsc::channel::<FillEventLog>(1000);
// spawn a thread for each market?
// what are the memory implications?
tokio::spawn(async move {
trade_fetching::scrape::scrape(&config, fill_event_sender).await;
});
database::database::handle_fill_events(&pool, fill_event_receiver).await;
// trade_fetching::websocket::listen_logs().await?;
Ok(())
}
// use getConfirmedSignaturesForAddress2 to scan txns
// find FillEventLog events
// parse trade data
// persist the last 3 months on different timescales

3
candle-creator/src/trade_fetching/mod.rs Normal file

@@ -0,0 +1,3 @@
pub mod parsing;
pub mod scrape;
pub mod websocket;

88
candle-creator/src/trade_fetching/parsing.rs Normal file

@@ -0,0 +1,88 @@
use solana_client::client_error::Result as ClientResult;
use solana_transaction_status::{
option_serializer::OptionSerializer, EncodedConfirmedTransactionWithStatusMeta,
};
use std::io::Error;
use anchor_lang::{event, AnchorDeserialize, AnchorSerialize};
use solana_sdk::pubkey::Pubkey;
const PROGRAM_DATA: &str = "Program data: ";
#[event]
#[derive(Debug, Clone, PartialEq, Hash)]
pub struct FillEventLog {
pub market: Pubkey,
pub open_orders: Pubkey,
pub open_orders_owner: Pubkey,
pub bid: bool,
pub maker: bool,
pub native_qty_paid: u64,
pub native_qty_received: u64,
pub native_fee_or_rebate: u64,
pub order_id: u128,
pub owner_slot: u8,
pub fee_tier: u8,
pub client_order_id: Option<u64>,
pub referrer_rebate: Option<u64>,
}
pub fn parse_fill_events_from_txns(
txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
) -> Vec<FillEventLog> {
let mut fills_vector = Vec::<FillEventLog>::new();
for txn in txns.iter_mut() {
// println!("{:#?}\n", txn.as_ref());
// fugly
match txn {
Ok(t) => {
if let Some(m) = &t.transaction.meta {
// println!("{:#?}\n", m.log_messages);
match &m.log_messages {
OptionSerializer::Some(logs) => match parse_fill_events_from_logs(logs) {
Some(mut events) => fills_vector.append(&mut events),
None => {}
},
OptionSerializer::None => {}
OptionSerializer::Skip => {}
}
}
}
Err(_) => {} //println!("goo: {:?}", e),
}
}
fills_vector
}
fn parse_fill_events_from_logs(logs: &[String]) -> Option<Vec<FillEventLog>> {
let mut fills_vector = Vec::<FillEventLog>::new();
for l in logs {
match l.strip_prefix(PROGRAM_DATA) {
Some(log) => {
let borsh_bytes = match anchor_lang::__private::base64::decode(log) {
Ok(borsh_bytes) => borsh_bytes,
_ => continue,
};
if borsh_bytes.len() < 8 {
continue; // too short to contain the 8-byte event discriminator
}
// Skip the 8-byte Anchor event discriminator before Borsh-decoding.
let mut slice: &[u8] = &borsh_bytes[8..];
let event: Result<FillEventLog, Error> =
anchor_lang::AnchorDeserialize::deserialize(&mut slice);
match event {
Ok(e) => {
fills_vector.push(e);
}
_ => continue,
}
}
_ => (),
}
}
if !fills_vector.is_empty() {
Some(fills_vector)
} else {
None
}
}
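// Illustrative sketch, not part of this commit: the parser above skips the
// first 8 bytes (the Anchor event discriminator) without checking them.
// Assuming the Discriminator impl that #[event] derives, a stricter variant
// would reject unrelated Anchor events explicitly:
#[allow(dead_code)]
fn parse_fill_event_checked(borsh_bytes: &[u8]) -> Option<FillEventLog> {
    if borsh_bytes.len() < 8
        || borsh_bytes[..8] != <FillEventLog as anchor_lang::Discriminator>::discriminator()
    {
        return None; // not a FillEventLog record
    }
    let mut slice = &borsh_bytes[8..];
    anchor_lang::AnchorDeserialize::deserialize(&mut slice).ok()
}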

76
candle-creator/src/trade_fetching/scrape.rs Normal file

@@ -0,0 +1,76 @@
use anyhow::Result;
use solana_client::{
client_error::Result as ClientResult,
rpc_client::{GetConfirmedSignaturesForAddress2Config, RpcClient},
rpc_config::RpcTransactionConfig,
};
use solana_sdk::{
commitment_config::CommitmentConfig,
pubkey::Pubkey,
signature::Signature,
};
use solana_transaction_status::{
EncodedConfirmedTransactionWithStatusMeta, UiTransactionEncoding,
};
use std::{str::FromStr, time::Duration};
use tokio::sync::mpsc::Sender;
use crate::utils::Config;
use super::parsing::{parse_fill_events_from_txns, FillEventLog};
pub async fn scrape(config: &Config, fill_event_sender: Sender<FillEventLog>) {
let url = &config.rpc_http_url;
let rpc_client = RpcClient::new_with_commitment(url, CommitmentConfig::processed());
let openbook_key = Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap();
// let start_slot = ; //set the start point at 3 months from now (config above)
loop {
let config = GetConfirmedSignaturesForAddress2Config {
before: None,
until: None,
limit: Some(150), // TODO: None
commitment: Some(CommitmentConfig::confirmed()),
};
let mut sigs = rpc_client
.get_signatures_for_address_with_config(&openbook_key, config)
.unwrap();
sigs.retain(|sig| sig.err.is_none());
let txn_config = RpcTransactionConfig {
encoding: Some(UiTransactionEncoding::Json),
commitment: Some(CommitmentConfig::confirmed()),
max_supported_transaction_version: Some(0),
};
let mut txns = sigs
.into_iter()
.map(|sig| {
rpc_client.get_transaction_with_config(
&sig.signature.parse::<Signature>().unwrap(),
txn_config,
)
})
.collect::<Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>>(); // TODO: am I actually getting all the txns?
let fill_events = parse_fill_events_from_txns(&mut txns);
if !fill_events.is_empty() {
for event in fill_events.into_iter() {
if fill_event_sender.send(event).await.is_err() {
println!("receiver dropped");
return;
}
}
}
print!("Ding fires are done \n\n");
tokio::time::sleep(Duration::from_millis(500)).await;
// increment slot somehow (or move forward in time or something)
}
}
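// Illustrative sketch, not part of this commit, for the TODO above: remember
// the newest signature seen and pass it as `until` on the next poll, so each
// iteration only fetches transactions that landed after it (the RPC returns
// signatures newest-first).
#[allow(dead_code)]
async fn scrape_with_signature_cursor(config: &Config) -> Result<()> {
    let rpc_client =
        RpcClient::new_with_commitment(&config.rpc_http_url, CommitmentConfig::processed());
    let openbook_key = Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap();
    let mut most_recent: Option<Signature> = None;
    loop {
        let sig_config = GetConfirmedSignaturesForAddress2Config {
            before: None,
            until: most_recent, // stop paging once we reach what we have seen
            limit: Some(150),
            commitment: Some(CommitmentConfig::confirmed()),
        };
        let sigs =
            rpc_client.get_signatures_for_address_with_config(&openbook_key, sig_config)?;
        if let Some(newest) = sigs.first() {
            most_recent = Some(newest.signature.parse::<Signature>()?);
        }
        // ...fetch, parse, and forward fill events exactly as in scrape()...
        tokio::time::sleep(Duration::from_millis(500)).await;
    }
}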

85
candle-creator/src/trade_fetching/websocket.rs Normal file

@@ -0,0 +1,85 @@
use jsonrpc_core_client::transports::ws;
use anchor_client::{
anchor_lang::{self, event, AnchorDeserialize, AnchorSerialize, Discriminator},
ClientError as AnchorClientError, Cluster,
};
use log::*;
use solana_account_decoder::UiAccountEncoding;
use solana_client::{
pubsub_client::{PubsubClient, PubsubClientSubscription},
rpc_config::{
RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcTransactionLogsConfig,
RpcTransactionLogsFilter,
},
rpc_response::{Response, RpcKeyedAccount, RpcLogsResponse},
};
use solana_rpc::rpc_pubsub::RpcSolPubSubClient;
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Keypair};
use std::{io::Error, rc::Rc, str::FromStr, time::Duration};
use crate::utils::AnyhowWrap;
use crate::{
database::database::{connect_to_database, setup_database},
utils::Config,
};
// use super::parsing::parse_and_save_logs;
// const PROGRAM_LOG: &str = "Program log: ";
// pub async fn listen_logs(config: Config) -> anyhow::Result<()> {
// let ws_url = config.rpc_ws_url;
// let transaction_logs_config = RpcTransactionLogsConfig {
// commitment: Some(CommitmentConfig::confirmed()),
// };
// let transaction_logs_filter = RpcTransactionLogsFilter::Mentions(vec![String::from(
// "srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX",
// )]);
// let (_log_sub, log_receiver) =
// PubsubClient::logs_subscribe(&ws_url, transaction_logs_filter, transaction_logs_config)?;
// loop {
// let response = log_receiver.recv().map_err_anyhow()?; // TODO: what to do if disconnects
// if response.value.err.is_none() {
// parse_and_save_logs(&response.value.logs);
// }
// }
// }
pub async fn listen_program_accounts() {
// let payer = Rc::new(Keypair::new());
// let connect = ws::try_connect::<RpcSolPubSubClient>(&ws_url).map_err_anyhow()?;
// let client = connect.await.map_err_anyhow()?;
// let openbook_key = Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap();
// let cluster = Cluster::Custom(rpc_url.to_string(), ws_url.to_string());
// let client = AnchorClient::new_with_options(cluster, payer, CommitmentConfig::confirmed());
// let dex_program = client.program(openbook_key);
// let account_info_config = RpcAccountInfoConfig {
// encoding: Some(UiAccountEncoding::Base64),
// data_slice: None,
// commitment: Some(CommitmentConfig::processed()),
// min_context_slot: None,
// };
// let program_accounts_config = RpcProgramAccountsConfig {
// filters: None, // TODO: add filters for markets we care about
// with_context: Some(true),
// account_config: account_info_config.clone(),
// };
// let (_program_sub, prog_receiver) = PubsubClient::program_subscribe(
// &ws_url,
// &openbook_key,
// Some(program_accounts_config)
// )?;
}

33
candle-creator/src/utils.rs Normal file

@@ -0,0 +1,33 @@
use serde_derive::Deserialize;
pub trait AnyhowWrap {
type Value;
fn map_err_anyhow(self) -> anyhow::Result<Self::Value>;
}
impl<T, E: std::fmt::Debug> AnyhowWrap for Result<T, E> {
type Value = T;
fn map_err_anyhow(self) -> anyhow::Result<Self::Value> {
self.map_err(|err| anyhow::anyhow!("{:?}", err))
}
}
#[derive(Clone, Debug, Deserialize)]
pub struct Config {
pub rpc_ws_url: String,
pub rpc_http_url: String,
pub database_config: DatabaseConfig,
pub markets: Vec<MarketConfig>,
}
#[derive(Clone, Debug, Deserialize)]
pub struct DatabaseConfig {
pub connection_string: String,
pub max_pg_pool_connections: u32,
}
#[derive(Clone, Debug, Deserialize)]
pub struct MarketConfig {
pub name: String,
pub market: String,
}
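
Since main.rs parses ./example-config.toml into this Config struct, a minimal
config consistent with these definitions might look like the sketch below. The
URLs and connection string are placeholders, and the market entry reuses the
address from the database tests with an assumed name:

rpc_ws_url = "wss://api.mainnet-beta.solana.com/"
rpc_http_url = "https://api.mainnet-beta.solana.com/"

[database_config]
connection_string = "postgres://postgres:postgres@0.0.0.0:5432/postgres"
max_pg_pool_connections = 5

[[markets]]
name = "SOL/USDC"
market = "8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6"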

3
server/src/main.rs Normal file

@@ -0,0 +1,3 @@
fn main() {
println!("Hello, world!");
}