From e22d30b9fda515688ef2a1f0185e42dc35640075 Mon Sep 17 00:00:00 2001 From: Maximilian Schneider Date: Fri, 3 Feb 2023 19:45:22 +0900 Subject: [PATCH] add blockhash poller & transaction sender --- Cargo.lock | 1 + service-mango-crank/Cargo.toml | 1 + service-mango-crank/src/blockhash_poller.rs | 44 +++++ service-mango-crank/src/event_queue_filter.rs | 166 ----------------- service-mango-crank/src/main.rs | 56 ++---- .../src/transaction_builder.rs | 176 ++++++++++++++++++ service-mango-crank/src/transaction_sender.rs | 41 ++++ 7 files changed, 282 insertions(+), 203 deletions(-) create mode 100644 service-mango-crank/src/blockhash_poller.rs delete mode 100644 service-mango-crank/src/event_queue_filter.rs create mode 100644 service-mango-crank/src/transaction_builder.rs create mode 100644 service-mango-crank/src/transaction_sender.rs diff --git a/Cargo.lock b/Cargo.lock index 9dd084f..a96bdc1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5203,6 +5203,7 @@ dependencies = [ "serde_derive", "serde_json", "serum_dex 0.5.10 (git+https://github.com/openbook-dex/program)", + "solana-client", "solana-geyser-connector-lib", "solana-logger", "solana-sdk", diff --git a/service-mango-crank/Cargo.toml b/service-mango-crank/Cargo.toml index 2fc61be..948179f 100644 --- a/service-mango-crank/Cargo.toml +++ b/service-mango-crank/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" [dependencies] solana-geyser-connector-lib = { path = "../lib" } +solana-client = "1" solana-logger = "1" solana-sdk = "1" bs58 = "*" diff --git a/service-mango-crank/src/blockhash_poller.rs b/service-mango-crank/src/blockhash_poller.rs new file mode 100644 index 0000000..0a29d4d --- /dev/null +++ b/service-mango-crank/src/blockhash_poller.rs @@ -0,0 +1,44 @@ +use log::*; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_sdk::{clock::DEFAULT_MS_PER_SLOT, hash::Hash}; +use std::{ + sync::{Arc, RwLock}, + time::Duration, +}; +use tokio::{spawn, time::sleep}; + +const RETRY_INTERVAL: Duration = Duration::from_millis(5 * DEFAULT_MS_PER_SLOT); + +pub async fn poll_loop(blockhash: Arc>, client: Arc) { + loop { + let old_blockhash = *blockhash.read().unwrap(); + if let Ok(new_blockhash) = client.get_latest_blockhash().await { + if new_blockhash != old_blockhash { + debug!("new blockhash ({:?})", blockhash); + *blockhash.write().unwrap() = new_blockhash; + } + } + + // Retry every few slots + sleep(RETRY_INTERVAL).await; + } +} + +pub async fn init(client: Arc) -> Arc> { + // get the first blockhash + let blockhash = Arc::new(RwLock::new( + client + .get_latest_blockhash() + .await + .expect("fetch initial blockhash"), + )); + + // launch task + let join_hdl = { + // create a thread-local reference to blockhash + let blockhash_c = blockhash.clone(); + spawn(async move { poll_loop(blockhash_c, client) }) + }; + + return blockhash; +} diff --git a/service-mango-crank/src/event_queue_filter.rs b/service-mango-crank/src/event_queue_filter.rs deleted file mode 100644 index 9e4607e..0000000 --- a/service-mango-crank/src/event_queue_filter.rs +++ /dev/null @@ -1,166 +0,0 @@ - -use crate::Pubkey; -use solana_geyser_connector_lib::{AccountWrite, metrics::Metrics, SlotUpdate, chain_data::{ChainData, AccountData, SlotData}, serum::SerumEventQueueHeader}; - -use solana_sdk::{account::{WritableAccount, ReadableAccount}, stake_history::Epoch}; -use std::{borrow::BorrowMut, collections::{HashMap, HashSet}}; -use log::*; -use anchor_lang::AccountDeserialize; - -pub enum EventQueueFilterMessage {} - -pub async fn init( - perp_queue_pks: Vec<(Pubkey, Pubkey)>, - serum_queue_pks: Vec<(Pubkey, Pubkey)>, - metrics_sender: Metrics, -) -> anyhow::Result<( - async_channel::Sender, - async_channel::Sender, - async_channel::Receiver, -)> { - let metrics_sender = metrics_sender.clone(); - - // The actual message may want to also contain a retry count, if it self-reinserts on failure? - let (account_write_queue_sender, account_write_queue_receiver) = - async_channel::unbounded::(); - - // Slot updates flowing from the outside into the single processing thread. From - // there they'll flow into the postgres sending thread. - let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::(); - - // Event queue updates can be consumed by client connections - let (filter_update_sender, filter_update_receiver) = - async_channel::unbounded::(); - - - let mut chain_cache = ChainData::new(metrics_sender); - let mut perp_events_cache = HashMap::::new(); - let mut serum_events_cache = HashMap::>::new(); - let mut seq_num_cache = HashMap::::new(); - let mut last_evq_versions = HashMap::::new(); - - - let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat(); - let relevant_pubkeys = all_queue_pks - .iter() - .map(|m| m.1) - .collect::>(); - - // update handling thread, reads both sloths and account updates - tokio::spawn(async move { - loop { - tokio::select! { - Ok(account_write) = account_write_queue_receiver.recv() => { - if !relevant_pubkeys.contains(&account_write.pubkey) { - continue; - } - - chain_cache.update_account( - account_write.pubkey, - AccountData { - slot: account_write.slot, - write_version: account_write.write_version, - account: WritableAccount::create( - account_write.lamports, - account_write.data.clone(), - account_write.owner, - account_write.executable, - account_write.rent_epoch as Epoch, - ), - }, - ); - } - Ok(slot_update) = slot_queue_receiver.recv() => { - chain_cache.update_slot(SlotData { - slot: slot_update.slot, - parent: slot_update.parent, - status: slot_update.status, - chain: 0, - }); - - } - } - - for mkt in all_queue_pks.iter() { - let last_evq_version = last_evq_versions.get(&mkt.1.to_string()).unwrap_or(&(0, 0)); - let mkt_pk = mkt.1; - - match chain_cache.account(&mkt_pk) { - Ok(account_info) => { - // only process if the account state changed - let evq_version = (account_info.slot, account_info.write_version); - let evq_pk_string = mkt.1.to_string(); - trace!("evq {} write_version {:?}", evq_pk_string, evq_version); - if evq_version == *last_evq_version { - continue; - } - last_evq_versions.insert(evq_pk_string.clone(), evq_version); - - let account = &account_info.account; - let is_perp = mango_v4::check_id(account.owner()); - if is_perp { - let event_queue = - mango_v4::state::EventQueue::try_deserialize(account.data().borrow_mut()).unwrap(); - trace!( - "evq {} seq_num {}", - evq_pk_string, - event_queue.header.seq_num - ); - match seq_num_cache.get(&evq_pk_string) { - Some(old_seq_num) => match perp_events_cache.get(&evq_pk_string) { - Some(old_events) => { - }, - _ => { - info!("perp_events_cache could not find {}", evq_pk_string) - } - }, - _ => info!("seq_num_cache could not find {}", evq_pk_string), - } - - seq_num_cache - .insert(evq_pk_string.clone(), event_queue.header.seq_num.clone()); - perp_events_cache - .insert(evq_pk_string.clone(), event_queue.buf.clone()); - } else { - let inner_data = &account.data()[5..&account.data().len() - 7]; - let header_span = std::mem::size_of::(); - let header: SerumEventQueueHeader = - *bytemuck::from_bytes(&inner_data[..header_span]); - let seq_num = header.seq_num; - let count = header.count; - let rest = &inner_data[header_span..]; - let slop = rest.len() % std::mem::size_of::(); - let new_len = rest.len() - slop; - let events = &rest[..new_len]; - debug!("evq {} header_span {} header_seq_num {} header_count {} inner_len {} events_len {} sizeof Event {}", evq_pk_string, header_span, seq_num, count, inner_data.len(), events.len(), std::mem::size_of::()); - let events: &[serum_dex::state::Event] = bytemuck::cast_slice(&events); - - match seq_num_cache.get(&evq_pk_string) { - Some(old_seq_num) => match serum_events_cache.get(&evq_pk_string) { - Some(old_events) => { - - }, - _ => { - info!("serum_events_cache could not find {}", evq_pk_string) - } - }, - _ => info!("seq_num_cache could not find {}", evq_pk_string), - } - - seq_num_cache.insert(evq_pk_string.clone(), seq_num.clone()); - serum_events_cache - .insert(evq_pk_string.clone(), events.clone().to_vec()); - } - } - Err(_) => info!("chain_cache could not find {}", mkt.1), - } - } - } - }); - - Ok(( - account_write_queue_sender, - slot_queue_sender, - filter_update_receiver, - )) -} diff --git a/service-mango-crank/src/main.rs b/service-mango-crank/src/main.rs index 3bc4321..ffccb80 100644 --- a/service-mango-crank/src/main.rs +++ b/service-mango-crank/src/main.rs @@ -1,4 +1,6 @@ -mod event_queue_filter; +mod blockhash_poller; +mod transaction_builder; +mod transaction_sender; use anchor_client::{ solana_sdk::{account::Account, commitment_config::CommitmentConfig, signature::Keypair}, @@ -13,6 +15,7 @@ use futures_util::{ pin_mut, SinkExt, StreamExt, TryStreamExt, }; use log::*; +use solana_client::{nonblocking::blockhash_query, nonblocking::rpc_client::RpcClient}; use std::{ collections::{HashMap, HashSet}, convert::identity, @@ -21,7 +24,7 @@ use std::{ net::SocketAddr, str::FromStr, sync::Arc, - sync::Mutex, + sync::{Mutex, atomic::AtomicBool}, time::Duration, }; use tokio::{ @@ -74,10 +77,12 @@ pub struct Config { #[tokio::main] async fn main() -> anyhow::Result<()> { + solana_logger::setup_with_default("info"); + let args: Vec = std::env::args().collect(); if args.len() < 2 { - eprintln!("Please enter a config file path argument."); + error!("Please enter a config file path argument."); return Ok(()); } @@ -88,15 +93,11 @@ async fn main() -> anyhow::Result<()> { toml::from_str(&contents).unwrap() }; - solana_logger::setup_with_default("info"); + let rpc_client = Arc::new(RpcClient::new(config.rpc_http_url.clone())); - let metrics_tx = metrics::start(config.metrics, "fills".into()); + let blockhash = blockhash_poller::init(rpc_client.clone()).await; - let metrics_opened_connections = - metrics_tx.register_u64("fills_feed_opened_connections".into(), MetricType::Counter); - - let metrics_closed_connections = - metrics_tx.register_u64("fills_feed_closed_connections".into(), MetricType::Counter); + let metrics_tx = metrics::start(config.metrics, "crank".into()); let rpc_url = config.rpc_http_url; let ws_url = rpc_url.replace("https", "wss"); @@ -122,6 +123,7 @@ async fn main() -> anyhow::Result<()> { .map(|(_, context)| (context.address, context.market.event_queue)) .collect(); + // fetch all serum/openbook markets to find their event queues let serum_market_pks: Vec = group_context .serum3_markets .iter() @@ -132,6 +134,7 @@ async fn main() -> anyhow::Result<()> { .rpc_async() .get_multiple_accounts(serum_market_pks.as_slice()) .await?; + let serum_market_ais: Vec<&Account> = serum_market_ais .iter() .filter_map(|maybe_ai| match maybe_ai { @@ -154,37 +157,16 @@ async fn main() -> anyhow::Result<()> { }) .collect(); - let a: Vec<(String, String)> = group_context - .serum3_markets - .iter() - .map(|(_, context)| { - ( - context.market.serum_market_external.to_string(), - context.market.name().to_owned(), - ) - }) - .collect(); - let b: Vec<(String, String)> = group_context - .perp_markets - .iter() - .map(|(_, context)| { - ( - context.address.to_string(), - context.market.name().to_owned(), - ) - }) - .collect(); - let market_pubkey_strings: HashMap = [a, b].concat().into_iter().collect(); - - let (account_write_queue_sender, slot_queue_sender, fill_receiver) = fill_event_filter::init( + let (account_write_queue_sender, slot_queue_sender, instruction_receiver) = transaction_builder::init( perp_queue_pks.clone(), serum_queue_pks.clone(), - metrics_tx.clone(), - ) - .await?; + metrics_tx.clone() + ).expect("init transaction builder"); + + transaction_sender::init(instruction_receiver, blockhash, rpc_client, Keypair::new()); info!( - "rpc connect: {}", + "connect: {}", config .source .grpc_sources diff --git a/service-mango-crank/src/transaction_builder.rs b/service-mango-crank/src/transaction_builder.rs new file mode 100644 index 0000000..7cbe234 --- /dev/null +++ b/service-mango-crank/src/transaction_builder.rs @@ -0,0 +1,176 @@ +use crate::Pubkey; +use solana_geyser_connector_lib::{ + chain_data::{AccountData, ChainData, SlotData}, + metrics::Metrics, + serum::SerumEventQueueHeader, + AccountWrite, SlotUpdate, +}; + +use anchor_lang::AccountDeserialize; +use log::*; +use solana_sdk::{ + account::{ReadableAccount, WritableAccount}, + stake_history::Epoch, + instruction::Instruction, +}; +use std::{ + borrow::BorrowMut, + collections::{HashMap, HashSet}, +}; + +pub enum EventQueueFilterMessage {} + +pub fn init( + perp_queue_pks: Vec<(Pubkey, Pubkey)>, + serum_queue_pks: Vec<(Pubkey, Pubkey)>, + metrics_sender: Metrics, +) -> anyhow::Result<( + async_channel::Sender, + async_channel::Sender, + async_channel::Receiver>, +)> { + let metrics_sender = metrics_sender.clone(); + + // The actual message may want to also contain a retry count, if it self-reinserts on failure? + let (account_write_queue_sender, account_write_queue_receiver) = + async_channel::unbounded::(); + + // Slot updates flowing from the outside into the single processing thread. From + // there they'll flow into the postgres sending thread. + let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::(); + + // Event queue updates can be consumed by client connections + let (instruction_sender, instruction_receiver) = async_channel::unbounded::>(); + + let mut chain_cache = ChainData::new(metrics_sender); + let mut perp_events_cache = HashMap::< + String, + [mango_v4::state::AnyEvent; mango_v4::state::MAX_NUM_EVENTS as usize], + >::new(); + let mut serum_events_cache = HashMap::>::new(); + let mut seq_num_cache = HashMap::::new(); + let mut last_evq_versions = HashMap::::new(); + + let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat(); + let relevant_pubkeys = all_queue_pks + .iter() + .map(|m| m.1) + .collect::>(); + + // update handling thread, reads both sloths and account updates + tokio::spawn(async move { + loop { + tokio::select! { + Ok(account_write) = account_write_queue_receiver.recv() => { + if !relevant_pubkeys.contains(&account_write.pubkey) { + continue; + } + + chain_cache.update_account( + account_write.pubkey, + AccountData { + slot: account_write.slot, + write_version: account_write.write_version, + account: WritableAccount::create( + account_write.lamports, + account_write.data.clone(), + account_write.owner, + account_write.executable, + account_write.rent_epoch as Epoch, + ), + }, + ); + } + Ok(slot_update) = slot_queue_receiver.recv() => { + chain_cache.update_slot(SlotData { + slot: slot_update.slot, + parent: slot_update.parent, + status: slot_update.status, + chain: 0, + }); + + } + } + + for mkt in all_queue_pks.iter() { + let last_evq_version = last_evq_versions.get(&mkt.1.to_string()).unwrap_or(&(0, 0)); + let mkt_pk = mkt.1; + + match chain_cache.account(&mkt_pk) { + Ok(account_info) => { + // only process if the account state changed + let evq_version = (account_info.slot, account_info.write_version); + let evq_pk_string = mkt.1.to_string(); + trace!("evq {} write_version {:?}", evq_pk_string, evq_version); + if evq_version == *last_evq_version { + continue; + } + last_evq_versions.insert(evq_pk_string.clone(), evq_version); + + let account = &account_info.account; + let is_perp = mango_v4::check_id(account.owner()); + if is_perp { + let event_queue = mango_v4::state::EventQueue::try_deserialize( + account.data().borrow_mut(), + ) + .unwrap(); + trace!( + "evq {} seq_num {}", + evq_pk_string, + event_queue.header.seq_num + ); + match seq_num_cache.get(&evq_pk_string) { + Some(old_seq_num) => match perp_events_cache.get(&evq_pk_string) { + Some(old_events) => {} + _ => { + info!("perp_events_cache could not find {}", evq_pk_string) + } + }, + _ => info!("seq_num_cache could not find {}", evq_pk_string), + } + + seq_num_cache + .insert(evq_pk_string.clone(), event_queue.header.seq_num.clone()); + perp_events_cache + .insert(evq_pk_string.clone(), event_queue.buf.clone()); + } else { + let inner_data = &account.data()[5..&account.data().len() - 7]; + let header_span = std::mem::size_of::(); + let header: SerumEventQueueHeader = + *bytemuck::from_bytes(&inner_data[..header_span]); + let seq_num = header.seq_num; + let count = header.count; + let rest = &inner_data[header_span..]; + let slop = rest.len() % std::mem::size_of::(); + let new_len = rest.len() - slop; + let events = &rest[..new_len]; + debug!("evq {} header_span {} header_seq_num {} header_count {} inner_len {} events_len {} sizeof Event {}", evq_pk_string, header_span, seq_num, count, inner_data.len(), events.len(), std::mem::size_of::()); + let events: &[serum_dex::state::Event] = bytemuck::cast_slice(&events); + + match seq_num_cache.get(&evq_pk_string) { + Some(old_seq_num) => match serum_events_cache.get(&evq_pk_string) { + Some(old_events) => {} + _ => { + info!("serum_events_cache could not find {}", evq_pk_string) + } + }, + _ => info!("seq_num_cache could not find {}", evq_pk_string), + } + + seq_num_cache.insert(evq_pk_string.clone(), seq_num.clone()); + serum_events_cache + .insert(evq_pk_string.clone(), events.clone().to_vec()); + } + } + Err(_) => info!("chain_cache could not find {}", mkt.1), + } + } + } + }); + + Ok(( + account_write_queue_sender, + slot_queue_sender, + instruction_receiver, + )) +} diff --git a/service-mango-crank/src/transaction_sender.rs b/service-mango-crank/src/transaction_sender.rs new file mode 100644 index 0000000..d1b083d --- /dev/null +++ b/service-mango-crank/src/transaction_sender.rs @@ -0,0 +1,41 @@ +use std::sync::{Arc, RwLock}; +use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcSendTransactionConfig}; +use solana_sdk::{ + hash::Hash, instruction::Instruction, signature::Keypair, signature::Signer, + transaction::Transaction, +}; +use tokio::spawn; + +pub async fn send_loop( + ixs_rx: async_channel::Receiver>, + blockhash: Arc>, + client: Arc, + keypair: Keypair, +) { + let cfg = RpcSendTransactionConfig { + skip_preflight: true, + ..RpcSendTransactionConfig::default() + }; + loop { + if let Ok(ixs) = ixs_rx.recv().await { + let tx = Transaction::new_signed_with_payer( + &ixs, + Some(&keypair.pubkey()), + &[&keypair], + *blockhash.read().unwrap(), + ); + // TODO: collect metrics + client.send_transaction_with_config(&tx, cfg).await; + } + } +} + +pub fn init( + ixs_rx: async_channel::Receiver>, + blockhash: Arc>, + client: Arc, + keypair: Keypair, +) { + // launch task + spawn(async move { send_loop(ixs_rx, blockhash, client, keypair) }); +}