diff --git a/Cargo.lock b/Cargo.lock index e81682a..2062b51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5445,6 +5445,7 @@ dependencies = [ "solana-version", "thiserror", "tokio", + "toml", "tonic 0.6.2", "tonic-build 0.6.2", "warp", diff --git a/Cargo.toml b/Cargo.toml index f1e1d6b..e9aece3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ serde_derive = "1.0.103" serde_json = "1.0.79" serde_yaml = "0.8.23" -mango = { git = "https://github.com/blockworks-foundation/mango-v3.git", branch = "mango_bencher_compatible", default-features = false, features = ["no-entrypoint"] } +mango = { git = "https://github.com/blockworks-foundation/mango-v3.git", branch = "mango_bencher_compatible", default-features = false } mango-common = { git = "https://github.com/blockworks-foundation/mango-v3.git", branch = "mango_bencher_compatible" } solana-client = { git = "https://github.com/solana-labs/solana.git", branch="v1.15" } @@ -42,9 +42,12 @@ solana-transaction-status = { git = "https://github.com/solana-labs/solana.git", solana-quic-client = { git = "https://github.com/solana-labs/solana.git", branch="v1.15" } solana-account-decoder = { git = "https://github.com/solana-labs/solana.git", branch="v1.15" } +# pin program to mango-v3 version of solana sdk +# now we can use sdk for recent version and program for legacy +# we have a bunch of helpers to convert between the two explicitly +solana-program = "1.9.17" thiserror = "1.0" -solana-program = ">=1.9.0" csv = "1.0.0" tonic = { version = "0.6", features = ["tls", "compression"] } tokio = { version = "1", features = ["full"] } @@ -58,6 +61,8 @@ jsonrpc-core = "18.0.0" jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] } arrayref = "*" bytemuck = "1.7.2" +toml = "*" + [build-dependencies] tonic-build = { version = "0.6", features = ["compression"] } diff --git a/src/account_write_filter.rs b/src/account_write_filter.rs index 801d54e..d61ed04 100644 --- a/src/account_write_filter.rs +++ b/src/account_write_filter.rs @@ -1,134 +1,134 @@ use crate::{ - chain_data::{AccountData, AccountWrite, SlotUpdate, ChainData, SlotData}, - metrics::Metrics, + chain_data::{AccountData, AccountWrite, ChainData, SlotData, SlotUpdate}, + metrics::Metrics, }; use async_trait::async_trait; -use solana_sdk::{account::WritableAccount, stake_history::Epoch, pubkey::Pubkey}; +use solana_sdk::{account::WritableAccount, pubkey::Pubkey, stake_history::Epoch}; use std::{ - collections::{BTreeSet, HashMap}, - sync::Arc, - time::{Duration, Instant}, + collections::{BTreeSet, HashMap}, + sync::Arc, + time::{Duration, Instant}, }; #[async_trait] pub trait AccountWriteSink { - async fn process(&self, pubkey: &Pubkey, account: &AccountData) -> Result<(), String>; + async fn process(&self, pubkey: &Pubkey, account: &AccountData) -> Result<(), String>; } #[derive(Clone)] pub struct AccountWriteRoute { - pub matched_pubkeys: Vec, - pub sink: Arc, - pub timeout_interval: Duration, + pub matched_pubkeys: Vec, + pub sink: Arc, + pub timeout_interval: Duration, } #[derive(Clone, Debug)] struct AcountWriteRecord { - slot: u64, - write_version: u64, - timestamp: Instant, + slot: u64, + write_version: u64, + timestamp: Instant, } pub fn init( - routes: Vec, - metrics_sender: Metrics, + routes: Vec, + metrics_sender: Metrics, ) -> anyhow::Result<( - async_channel::Sender, - async_channel::Sender, + async_channel::Sender, + async_channel::Sender, )> { - // The actual message may want to also contain a retry count, if it self-reinserts on failure? - let (account_write_queue_sender, account_write_queue_receiver) = - async_channel::unbounded::(); + // The actual message may want to also contain a retry count, if it self-reinserts on failure? + let (account_write_queue_sender, account_write_queue_receiver) = + async_channel::unbounded::(); - // Slot updates flowing from the outside into the single processing thread. From - // there they'll flow into the postgres sending thread. - let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::(); + // Slot updates flowing from the outside into the single processing thread. From + // there they'll flow into the postgres sending thread. + let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::(); - let mut chain_data = ChainData::new(metrics_sender); - let mut last_updated = HashMap::::new(); + let mut chain_data = ChainData::new(metrics_sender); + let mut last_updated = HashMap::::new(); - let all_queue_pks: BTreeSet = routes - .iter() - .flat_map(|r| r.matched_pubkeys.iter()) - .map(|pk| pk.clone()) - .collect(); + let all_queue_pks: BTreeSet = routes + .iter() + .flat_map(|r| r.matched_pubkeys.iter()) + .map(|pk| pk.clone()) + .collect(); - // update handling thread, reads both sloths and account updates - tokio::spawn(async move { - loop { - tokio::select! { - Ok(account_write) = account_write_queue_receiver.recv() => { - if all_queue_pks.contains(&account_write.pubkey) { - continue; - } + // update handling thread, reads both sloths and account updates + tokio::spawn(async move { + loop { + tokio::select! { + Ok(account_write) = account_write_queue_receiver.recv() => { + if all_queue_pks.contains(&account_write.pubkey) { + continue; + } - chain_data.update_account( - account_write.pubkey, - AccountData { - slot: account_write.slot, - write_version: account_write.write_version, - account: WritableAccount::create( - account_write.lamports, - account_write.data.clone(), - account_write.owner, - account_write.executable, - account_write.rent_epoch as Epoch, - ), - }, - ); - } - Ok(slot_update) = slot_queue_receiver.recv() => { - chain_data.update_slot(SlotData { - slot: slot_update.slot, - parent: slot_update.parent, - status: slot_update.status, - chain: 0, - }); + chain_data.update_account( + account_write.pubkey, + AccountData { + slot: account_write.slot, + write_version: account_write.write_version, + account: WritableAccount::create( + account_write.lamports, + account_write.data.clone(), + account_write.owner, + account_write.executable, + account_write.rent_epoch as Epoch, + ), + }, + ); + } + Ok(slot_update) = slot_queue_receiver.recv() => { + chain_data.update_slot(SlotData { + slot: slot_update.slot, + parent: slot_update.parent, + status: slot_update.status, + chain: 0, + }); - } - } + } + } - for route in routes.iter() { - for pk in route.matched_pubkeys.iter() { - match chain_data.account(&pk) { - Ok(account_info) => { - let pk_b58 = pk.to_string(); - if let Some(record) = last_updated.get(&pk_b58) { - let is_unchanged = account_info.slot == record.slot - && account_info.write_version == record.write_version; - let is_throttled = - record.timestamp.elapsed() < route.timeout_interval; - if is_unchanged || is_throttled { - continue; - } - }; + for route in routes.iter() { + for pk in route.matched_pubkeys.iter() { + match chain_data.account(&pk) { + Ok(account_info) => { + let pk_b58 = pk.to_string(); + if let Some(record) = last_updated.get(&pk_b58) { + let is_unchanged = account_info.slot == record.slot + && account_info.write_version == record.write_version; + let is_throttled = + record.timestamp.elapsed() < route.timeout_interval; + if is_unchanged || is_throttled { + continue; + } + }; - match route.sink.process(pk, account_info).await { - Ok(()) => { - // todo: metrics - last_updated.insert( - pk_b58.clone(), - AcountWriteRecord { - slot: account_info.slot, - write_version: account_info.write_version, - timestamp: Instant::now(), - }, - ); - } - Err(_skip_reason) => { - // todo: metrics - } - } - } - Err(_) => { - // todo: metrics - } - } - } - } - } - }); + match route.sink.process(pk, account_info).await { + Ok(()) => { + // todo: metrics + last_updated.insert( + pk_b58.clone(), + AcountWriteRecord { + slot: account_info.slot, + write_version: account_info.write_version, + timestamp: Instant::now(), + }, + ); + } + Err(_skip_reason) => { + // todo: metrics + } + } + } + Err(_) => { + // todo: metrics + } + } + } + } + } + }); - Ok((account_write_queue_sender, slot_queue_sender)) + Ok((account_write_queue_sender, slot_queue_sender)) } diff --git a/src/chain_data.rs b/src/chain_data.rs index 25dd811..ebf1ece 100644 --- a/src/chain_data.rs +++ b/src/chain_data.rs @@ -1,8 +1,7 @@ use crate::metrics::{MetricType, MetricU64, Metrics}; use { - solana_sdk::account::{Account, AccountSharedData, ReadableAccount}, - solana_sdk::pubkey::Pubkey, + solana_sdk::{pubkey::Pubkey, account::{Account, AccountSharedData, ReadableAccount}}, std::collections::HashMap, }; @@ -28,7 +27,6 @@ pub struct AccountData { pub account: AccountSharedData, } - #[derive(Clone, PartialEq, Debug)] pub struct AccountWrite { pub pubkey: Pubkey, @@ -65,7 +63,6 @@ pub struct SlotUpdate { pub status: SlotStatus, } - /// Track slots and account writes /// /// - use account() to retrieve the current best data for an account. diff --git a/src/confirmation_strategies.rs b/src/confirmation_strategies.rs index 8aabbb8..5fc863a 100644 --- a/src/confirmation_strategies.rs +++ b/src/confirmation_strategies.rs @@ -14,9 +14,9 @@ use chrono::Utc; use crossbeam_channel::{Receiver, TryRecvError}; use log::{debug, error, info, trace}; use solana_client::{rpc_client::RpcClient, rpc_config::RpcBlockConfig}; -use solana_program::pubkey::Pubkey; use solana_sdk::{ commitment_config::{CommitmentConfig, CommitmentLevel}, + pubkey::Pubkey, signature::Signature, }; use solana_transaction_status::RewardType; diff --git a/src/crank.rs b/src/crank.rs index 57afe43..962db74 100644 --- a/src/crank.rs +++ b/src/crank.rs @@ -1,73 +1,110 @@ -use std::{thread::{JoinHandle, Builder}, sync::{Arc, RwLock}, str::FromStr, time::Duration}; +use std::{ + fs::File, + io::Read, + str::FromStr, + sync::{Arc, RwLock}, + thread::{Builder, JoinHandle}, + time::Duration, +}; // use solana_client::rpc_client::RpcClient; -use solana_sdk::{pubkey::Pubkey, signature::Keypair, instruction::Instruction}; +use crate::{ + account_write_filter::{self, AccountWriteRoute}, + grpc_plugin_source::{self, FilterConfig, SourceConfig}, + mango::GroupConfig, + mango_v3_perp_crank_sink::MangoV3PerpCrankSink, + metrics, blockhash_poller, transaction_sender, +}; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_sdk::{instruction::Instruction, pubkey::Pubkey, signature::Keypair}; -use crate::{mango::GroupConfig, account_write_filter::AccountWriteRoute, mango_v3_perp_crank_sink::MangoV3PerpCrankSink}; +fn start_crank_thread(identity: Keypair, group: GroupConfig) -> JoinHandle<()> { + let perp_queue_pks: Vec<_> = group + .perp_markets + .iter() + .map(|m| { + ( + Pubkey::from_str(&m.public_key).unwrap(), + Pubkey::from_str(&m.events_key).unwrap(), + ) + }) + .collect(); + let group_pk = Pubkey::from_str(&group.public_key).unwrap(); + let cache_pk = Pubkey::from_str(&group.cache_pk).unwrap(); + let mango_program_id = Pubkey::from_str(&group.mango_program_id).unwrap(); + let filter_config = FilterConfig { + program_ids: vec![], + account_ids: group.perp_markets.iter().map(|m| m.events_key.clone()).collect(), + }; + return Builder::new() + .name("crank".to_string()) + .spawn(move || { + let config: SourceConfig = { + let mut file = File::open("source.toml").expect("source.toml file in cwd"); + let mut contents = String::new(); + file.read_to_string(&mut contents) + .expect("source.toml to contain data"); + toml::from_str(&contents).unwrap() + }; + let metrics_tx = metrics::start( + metrics::MetricsConfig { + output_stdout: true, + output_http: false, + }, + "crank".into(), + ); + let rpc_client = Arc::new(RpcClient::new("".into())); -// pub async fn send_tx_loop( -// ixs_rx: async_channel::Receiver>, -// blockhash: Arc>, -// client: Arc, -// keypair: Keypair, -// ) { -// info!("signing with keypair pk={:?}", keypair.pubkey()); -// let cfg = RpcSendTransactionConfig { -// skip_preflight: true, -// ..RpcSendTransactionConfig::default() -// }; -// loop { -// if let Ok(ixs) = ixs_rx.recv().await { -// // TODO add priority fee -// let tx = Transaction::new_signed_with_payer( -// &ixs, -// Some(&keypair.pubkey()), -// &[&keypair], -// *blockhash.read().unwrap(), -// ); -// // TODO: collect metrics -// info!("send tx={:?} ok={:?}", tx.signatures[0], client.send_transaction_with_config(&tx, cfg).await); -// } -// } -// } + // TODO await future + let blockhash = blockhash_poller::init(rpc_client.clone()); + // Event queue updates can be consumed by client connections + let (instruction_sender, instruction_receiver) = + async_channel::unbounded::>(); -fn start_crank_thread( - identity: Keypair, - group: GroupConfig -) -> JoinHandle<()> { + transaction_sender::init( + instruction_receiver, + blockhash, + rpc_client, + identity + ); + let routes = vec![AccountWriteRoute { + matched_pubkeys: perp_queue_pks + .iter() + .map(|(_, evq_pk)| evq_pk.clone()) + .collect(), + sink: Arc::new(MangoV3PerpCrankSink::new( + perp_queue_pks, + group_pk, + cache_pk, + mango_program_id, + instruction_sender.clone(), + )), + timeout_interval: Duration::default(), + }]; - let perp_queue_pks: Vec<_> = group.perp_markets.iter().map(|m| (Pubkey::from_str(&m.public_key).unwrap(), Pubkey::from_str(&m.events_key).unwrap())).collect(); - let group_pk = Pubkey::from_str(&group.public_key).unwrap(); + let (account_write_queue_sender, slot_queue_sender) = + account_write_filter::init(routes, metrics_tx.clone()).expect("filter initializes"); - return Builder::new() - .name("crank".to_string()) - .spawn(move || { + // TODO figure out how to start tokio stuff here + grpc_plugin_source::process_events( + &config, + &filter_config, + account_write_queue_sender, + slot_queue_sender, + metrics_tx.clone(), + ); + // .await; + }) + .expect("launch crank thread"); - - - // Event queue updates can be consumed by client connections - let (instruction_sender, instruction_receiver) = async_channel::unbounded::>(); - - - - let routes = vec![ - AccountWriteRoute { - matched_pubkeys: perp_queue_pks - .iter() - .map(|(_, evq_pk)| evq_pk.clone()) - .collect(), - sink: Arc::new(MangoV3PerpCrankSink::new( - perp_queue_pks, - group_pk, - instruction_sender.clone(), - )), - timeout_interval: Duration::default(), - }]; - - }).expect("launch crank thread") - -} + // TODO also implement websocket handler + // websocket_source::process_events( + // &config.source, + // account_write_queue_sender, + // slot_queue_sender, + // ) +} diff --git a/src/lib.rs b/src/lib.rs index 8bbe3b2..97d122b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,7 @@ pub mod market_markers; pub mod metrics; pub mod rotating_queue; pub mod states; +pub mod transaction_sender; trait AnyhowWrap { type Value; diff --git a/src/mango.rs b/src/mango.rs index 167df81..ad2eb33 100644 --- a/src/mango.rs +++ b/src/mango.rs @@ -26,6 +26,7 @@ pub struct MangoConfig { pub struct GroupConfig { pub name: String, pub public_key: String, + pub cache_pk: String, pub mango_program_id: String, pub serum_program_id: String, pub oracles: Vec, diff --git a/src/mango_v3_perp_crank_sink.rs b/src/mango_v3_perp_crank_sink.rs index 4f6255b..83f3cdc 100644 --- a/src/mango_v3_perp_crank_sink.rs +++ b/src/mango_v3_perp_crank_sink.rs @@ -12,15 +12,14 @@ use mango::{ instruction::consume_events, queue::{AnyEvent, EventQueueHeader, EventType, FillEvent, OutEvent, Queue}, }; +use solana_sdk::{pubkey::Pubkey, instruction::Instruction}; use solana_sdk::{ account::ReadableAccount, - instruction::{Instruction}, - pubkey::Pubkey, }; use bytemuck::cast_ref; -use crate::{account_write_filter::AccountWriteSink, chain_data::AccountData}; +use crate::{account_write_filter::AccountWriteSink, chain_data::AccountData, helpers::{to_sdk_instruction, to_sp_pk}}; const MAX_BACKLOG: usize = 2; const MAX_EVENTS_PER_TX: usize = 10; @@ -108,16 +107,16 @@ impl AccountWriteSink for MangoV3PerpCrankSink { .get(pk) .expect(&format!("{pk:?} is a known public key")); - let ix = consume_events( - &self.mango_v3_program, - &self.group_pk, - &self.cache_pk, - mkt_pk, - pk, + let ix = to_sdk_instruction(consume_events( + &to_sp_pk(&self.mango_v3_program), + &to_sp_pk(&self.group_pk), + &to_sp_pk(&self.cache_pk), + &to_sp_pk(mkt_pk), + &to_sp_pk(pk), &mut mango_accounts, MAX_EVENTS_PER_TX, ) - .unwrap(); + .unwrap()); Ok(ix) }; diff --git a/src/transaction_sender.rs b/src/transaction_sender.rs new file mode 100644 index 0000000..bc84aa3 --- /dev/null +++ b/src/transaction_sender.rs @@ -0,0 +1,44 @@ +use log::*; +use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcSendTransactionConfig}; +use solana_sdk::{ + hash::Hash, instruction::Instruction, signature::Keypair, signature::Signer, + transaction::Transaction, +}; +use std::sync::{Arc, RwLock}; +use tokio::spawn; + +pub async fn send_loop( + ixs_rx: async_channel::Receiver>, + blockhash: Arc>, + client: Arc, + keypair: Keypair, +) { + info!("signing with keypair pk={:?}", keypair.pubkey()); + let cfg = RpcSendTransactionConfig { + skip_preflight: true, + ..RpcSendTransactionConfig::default() + }; + loop { + if let Ok(ixs) = ixs_rx.recv().await { + // TODO add priority fee + let tx = Transaction::new_signed_with_payer( + &ixs, + Some(&keypair.pubkey()), + &[&keypair], + *blockhash.read().unwrap(), + ); + // TODO: collect metrics + info!("send tx={:?} ok={:?}", tx.signatures[0], + client.send_transaction_with_config(&tx, cfg).await); + } + } +} + +pub fn init( + ixs_rx: async_channel::Receiver>, + blockhash: Arc>, + client: Arc, + keypair: Keypair, +) { + spawn(async move { send_loop(ixs_rx, blockhash, client, keypair).await }); +}