add blockhash poller & transaction sender

Maximilian Schneider 2023-02-03 19:45:22 +09:00
parent 31149f7e0d
commit e22d30b9fd
7 changed files with 282 additions and 203 deletions
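In outline, this commit adds a background blockhash poller and a transaction sender task, and rewires main.rs to feed them from the new transaction_builder. A condensed sketch of the wiring, assembled from the main.rs diff below (all names as they appear there):

    let rpc_client = Arc::new(RpcClient::new(config.rpc_http_url.clone()));
    let blockhash = blockhash_poller::init(rpc_client.clone()).await;
    let (account_write_queue_sender, slot_queue_sender, instruction_receiver) =
        transaction_builder::init(perp_queue_pks, serum_queue_pks, metrics_tx.clone())
            .expect("init transaction builder");
    transaction_sender::init(instruction_receiver, blockhash, rpc_client, Keypair::new());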

Cargo.lock

@@ -5203,6 +5203,7 @@ dependencies = [
"serde_derive",
"serde_json",
"serum_dex 0.5.10 (git+https://github.com/openbook-dex/program)",
"solana-client",
"solana-geyser-connector-lib",
"solana-logger",
"solana-sdk",


@@ -6,6 +6,7 @@ edition = "2018"
[dependencies]
solana-geyser-connector-lib = { path = "../lib" }
solana-client = "1"
solana-logger = "1"
solana-sdk = "1"
bs58 = "*"


@@ -0,0 +1,44 @@
use log::*;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{clock::DEFAULT_MS_PER_SLOT, hash::Hash};
use std::{
sync::{Arc, RwLock},
time::Duration,
};
use tokio::{spawn, time::sleep};
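// DEFAULT_MS_PER_SLOT is 400, so five slots works out to a ~2s poll interval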
const RETRY_INTERVAL: Duration = Duration::from_millis(5 * DEFAULT_MS_PER_SLOT);
pub async fn poll_loop(blockhash: Arc<RwLock<Hash>>, client: Arc<RpcClient>) {
loop {
let old_blockhash = *blockhash.read().unwrap();
if let Ok(new_blockhash) = client.get_latest_blockhash().await {
if new_blockhash != old_blockhash {
debug!("new blockhash ({:?})", new_blockhash);
*blockhash.write().unwrap() = new_blockhash;
}
}
// Retry every few slots
sleep(RETRY_INTERVAL).await;
}
}
pub async fn init(client: Arc<RpcClient>) -> Arc<RwLock<Hash>> {
// get the first blockhash
let blockhash = Arc::new(RwLock::new(
client
.get_latest_blockhash()
.await
.expect("fetch initial blockhash"),
));
// launch the poll task, giving it its own handle to the shared blockhash
let blockhash_c = blockhash.clone();
spawn(async move { poll_loop(blockhash_c, client).await });
blockhash
}
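Callers hold the returned Arc<RwLock<Hash>> and take a short read lock whenever they need a recent blockhash, as transaction_sender does below:

    let recent = *blockhash.read().unwrap(); // Hash is Copy, so the lock is released immediately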


@@ -1,166 +0,0 @@
use crate::Pubkey;
use solana_geyser_connector_lib::{AccountWrite, metrics::Metrics, SlotUpdate, chain_data::{ChainData, AccountData, SlotData}, serum::SerumEventQueueHeader};
use solana_sdk::{account::{WritableAccount, ReadableAccount}, stake_history::Epoch};
use std::{borrow::BorrowMut, collections::{HashMap, HashSet}};
use log::*;
use anchor_lang::AccountDeserialize;
pub enum EventQueueFilterMessage {}
pub async fn init(
perp_queue_pks: Vec<(Pubkey, Pubkey)>,
serum_queue_pks: Vec<(Pubkey, Pubkey)>,
metrics_sender: Metrics,
) -> anyhow::Result<(
async_channel::Sender<AccountWrite>,
async_channel::Sender<SlotUpdate>,
async_channel::Receiver<EventQueueFilterMessage>,
)> {
let metrics_sender = metrics_sender.clone();
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
let (account_write_queue_sender, account_write_queue_receiver) =
async_channel::unbounded::<AccountWrite>();
// Slot updates flowing from the outside into the single processing thread. From
// there they'll flow into the postgres sending thread.
let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
// Event queue updates can be consumed by client connections
let (filter_update_sender, filter_update_receiver) =
async_channel::unbounded::<EventQueueFilterMessage>();
let mut chain_cache = ChainData::new(metrics_sender);
let mut perp_events_cache = HashMap::<String, [mango_v4::state::AnyEvent; mango_v4::state::MAX_NUM_EVENTS as usize]>::new();
let mut serum_events_cache = HashMap::<String, Vec<serum_dex::state::Event>>::new();
let mut seq_num_cache = HashMap::<String, u64>::new();
let mut last_evq_versions = HashMap::<String, (u64, u64)>::new();
let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat();
let relevant_pubkeys = all_queue_pks
.iter()
.map(|m| m.1)
.collect::<HashSet<Pubkey>>();
// update handling thread, reads both slot and account updates
tokio::spawn(async move {
loop {
tokio::select! {
Ok(account_write) = account_write_queue_receiver.recv() => {
if !relevant_pubkeys.contains(&account_write.pubkey) {
continue;
}
chain_cache.update_account(
account_write.pubkey,
AccountData {
slot: account_write.slot,
write_version: account_write.write_version,
account: WritableAccount::create(
account_write.lamports,
account_write.data.clone(),
account_write.owner,
account_write.executable,
account_write.rent_epoch as Epoch,
),
},
);
}
Ok(slot_update) = slot_queue_receiver.recv() => {
chain_cache.update_slot(SlotData {
slot: slot_update.slot,
parent: slot_update.parent,
status: slot_update.status,
chain: 0,
});
}
}
for mkt in all_queue_pks.iter() {
let last_evq_version = last_evq_versions.get(&mkt.1.to_string()).unwrap_or(&(0, 0));
let mkt_pk = mkt.1;
match chain_cache.account(&mkt_pk) {
Ok(account_info) => {
// only process if the account state changed
let evq_version = (account_info.slot, account_info.write_version);
let evq_pk_string = mkt.1.to_string();
trace!("evq {} write_version {:?}", evq_pk_string, evq_version);
if evq_version == *last_evq_version {
continue;
}
last_evq_versions.insert(evq_pk_string.clone(), evq_version);
let account = &account_info.account;
let is_perp = mango_v4::check_id(account.owner());
if is_perp {
let event_queue =
mango_v4::state::EventQueue::try_deserialize(account.data().borrow_mut()).unwrap();
trace!(
"evq {} seq_num {}",
evq_pk_string,
event_queue.header.seq_num
);
match seq_num_cache.get(&evq_pk_string) {
Some(old_seq_num) => match perp_events_cache.get(&evq_pk_string) {
Some(old_events) => {
},
_ => {
info!("perp_events_cache could not find {}", evq_pk_string)
}
},
_ => info!("seq_num_cache could not find {}", evq_pk_string),
}
seq_num_cache
.insert(evq_pk_string.clone(), event_queue.header.seq_num.clone());
perp_events_cache
.insert(evq_pk_string.clone(), event_queue.buf.clone());
} else {
let inner_data = &account.data()[5..&account.data().len() - 7];
let header_span = std::mem::size_of::<SerumEventQueueHeader>();
let header: SerumEventQueueHeader =
*bytemuck::from_bytes(&inner_data[..header_span]);
let seq_num = header.seq_num;
let count = header.count;
let rest = &inner_data[header_span..];
let slop = rest.len() % std::mem::size_of::<serum_dex::state::Event>();
let new_len = rest.len() - slop;
let events = &rest[..new_len];
debug!("evq {} header_span {} header_seq_num {} header_count {} inner_len {} events_len {} sizeof Event {}", evq_pk_string, header_span, seq_num, count, inner_data.len(), events.len(), std::mem::size_of::<serum_dex::state::Event>());
let events: &[serum_dex::state::Event] = bytemuck::cast_slice(&events);
match seq_num_cache.get(&evq_pk_string) {
Some(old_seq_num) => match serum_events_cache.get(&evq_pk_string) {
Some(old_events) => {
},
_ => {
info!("serum_events_cache could not find {}", evq_pk_string)
}
},
_ => info!("seq_num_cache could not find {}", evq_pk_string),
}
seq_num_cache.insert(evq_pk_string.clone(), seq_num.clone());
serum_events_cache
.insert(evq_pk_string.clone(), events.clone().to_vec());
}
}
Err(_) => info!("chain_cache could not find {}", mkt.1),
}
}
}
});
Ok((
account_write_queue_sender,
slot_queue_sender,
filter_update_receiver,
))
}


@@ -1,4 +1,6 @@
mod event_queue_filter;
mod blockhash_poller;
mod transaction_builder;
mod transaction_sender;
use anchor_client::{
solana_sdk::{account::Account, commitment_config::CommitmentConfig, signature::Keypair},
@@ -13,6 +15,7 @@ use futures_util::{
pin_mut, SinkExt, StreamExt, TryStreamExt,
};
use log::*;
use solana_client::{nonblocking::blockhash_query, nonblocking::rpc_client::RpcClient};
use std::{
collections::{HashMap, HashSet},
convert::identity,
@@ -21,7 +24,7 @@ use std::{
net::SocketAddr,
str::FromStr,
sync::Arc,
sync::Mutex,
sync::{Mutex, atomic::AtomicBool},
time::Duration,
};
use tokio::{
@@ -74,10 +77,12 @@ pub struct Config {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
solana_logger::setup_with_default("info");
let args: Vec<String> = std::env::args().collect();
if args.len() < 2 {
eprintln!("Please enter a config file path argument.");
error!("Please enter a config file path argument.");
return Ok(());
}
@@ -88,15 +93,11 @@ async fn main() -> anyhow::Result<()> {
toml::from_str(&contents).unwrap()
};
solana_logger::setup_with_default("info");
let rpc_client = Arc::new(RpcClient::new(config.rpc_http_url.clone()));
let metrics_tx = metrics::start(config.metrics, "fills".into());
let blockhash = blockhash_poller::init(rpc_client.clone()).await;
let metrics_opened_connections =
metrics_tx.register_u64("fills_feed_opened_connections".into(), MetricType::Counter);
let metrics_closed_connections =
metrics_tx.register_u64("fills_feed_closed_connections".into(), MetricType::Counter);
let metrics_tx = metrics::start(config.metrics, "crank".into());
let rpc_url = config.rpc_http_url;
let ws_url = rpc_url.replace("https", "wss");
@@ -122,6 +123,7 @@
.map(|(_, context)| (context.address, context.market.event_queue))
.collect();
// fetch all serum/openbook markets to find their event queues
let serum_market_pks: Vec<Pubkey> = group_context
.serum3_markets
.iter()
@@ -132,6 +134,7 @@
.rpc_async()
.get_multiple_accounts(serum_market_pks.as_slice())
.await?;
let serum_market_ais: Vec<&Account> = serum_market_ais
.iter()
.filter_map(|maybe_ai| match maybe_ai {
@@ -154,37 +157,16 @@
})
.collect();
let a: Vec<(String, String)> = group_context
.serum3_markets
.iter()
.map(|(_, context)| {
(
context.market.serum_market_external.to_string(),
context.market.name().to_owned(),
)
})
.collect();
let b: Vec<(String, String)> = group_context
.perp_markets
.iter()
.map(|(_, context)| {
(
context.address.to_string(),
context.market.name().to_owned(),
)
})
.collect();
let market_pubkey_strings: HashMap<String, String> = [a, b].concat().into_iter().collect();
let (account_write_queue_sender, slot_queue_sender, fill_receiver) = fill_event_filter::init(
let (account_write_queue_sender, slot_queue_sender, instruction_receiver) = transaction_builder::init(
perp_queue_pks.clone(),
serum_queue_pks.clone(),
metrics_tx.clone(),
)
.await?;
metrics_tx.clone()
).expect("init transaction builder");
transaction_sender::init(instruction_receiver, blockhash, rpc_client, Keypair::new());
info!(
"rpc connect: {}",
"connect: {}",
config
.source
.grpc_sources


@@ -0,0 +1,176 @@
use crate::Pubkey;
use solana_geyser_connector_lib::{
chain_data::{AccountData, ChainData, SlotData},
metrics::Metrics,
serum::SerumEventQueueHeader,
AccountWrite, SlotUpdate,
};
use anchor_lang::AccountDeserialize;
use log::*;
use solana_sdk::{
account::{ReadableAccount, WritableAccount},
stake_history::Epoch,
instruction::Instruction,
};
use std::{
borrow::BorrowMut,
collections::{HashMap, HashSet},
};
pub enum EventQueueFilterMessage {}
pub fn init(
perp_queue_pks: Vec<(Pubkey, Pubkey)>,
serum_queue_pks: Vec<(Pubkey, Pubkey)>,
metrics_sender: Metrics,
) -> anyhow::Result<(
async_channel::Sender<AccountWrite>,
async_channel::Sender<SlotUpdate>,
async_channel::Receiver<Vec<Instruction>>,
)> {
let metrics_sender = metrics_sender.clone();
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
let (account_write_queue_sender, account_write_queue_receiver) =
async_channel::unbounded::<AccountWrite>();
// Slot updates flowing from the outside into the single processing thread,
// where they feed the chain data cache.
let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
// Built instructions are consumed by the transaction sender
let (instruction_sender, instruction_receiver) = async_channel::unbounded::<Vec<Instruction>>();
let mut chain_cache = ChainData::new(metrics_sender);
let mut perp_events_cache = HashMap::<
String,
[mango_v4::state::AnyEvent; mango_v4::state::MAX_NUM_EVENTS as usize],
>::new();
let mut serum_events_cache = HashMap::<String, Vec<serum_dex::state::Event>>::new();
let mut seq_num_cache = HashMap::<String, u64>::new();
let mut last_evq_versions = HashMap::<String, (u64, u64)>::new();
let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat();
let relevant_pubkeys = all_queue_pks
.iter()
.map(|m| m.1)
.collect::<HashSet<Pubkey>>();
// update handling thread, reads both slot and account updates
tokio::spawn(async move {
loop {
tokio::select! {
Ok(account_write) = account_write_queue_receiver.recv() => {
if !relevant_pubkeys.contains(&account_write.pubkey) {
continue;
}
chain_cache.update_account(
account_write.pubkey,
AccountData {
slot: account_write.slot,
write_version: account_write.write_version,
account: WritableAccount::create(
account_write.lamports,
account_write.data.clone(),
account_write.owner,
account_write.executable,
account_write.rent_epoch as Epoch,
),
},
);
}
Ok(slot_update) = slot_queue_receiver.recv() => {
chain_cache.update_slot(SlotData {
slot: slot_update.slot,
parent: slot_update.parent,
status: slot_update.status,
chain: 0,
});
}
}
for mkt in all_queue_pks.iter() {
let last_evq_version = last_evq_versions.get(&mkt.1.to_string()).unwrap_or(&(0, 0));
let mkt_pk = mkt.1;
match chain_cache.account(&mkt_pk) {
Ok(account_info) => {
// only process if the account state changed
let evq_version = (account_info.slot, account_info.write_version);
let evq_pk_string = mkt.1.to_string();
trace!("evq {} write_version {:?}", evq_pk_string, evq_version);
if evq_version == *last_evq_version {
continue;
}
last_evq_versions.insert(evq_pk_string.clone(), evq_version);
let account = &account_info.account;
let is_perp = mango_v4::check_id(account.owner());
if is_perp {
let event_queue = mango_v4::state::EventQueue::try_deserialize(
account.data().borrow_mut(),
)
.unwrap();
trace!(
"evq {} seq_num {}",
evq_pk_string,
event_queue.header.seq_num
);
match seq_num_cache.get(&evq_pk_string) {
Some(old_seq_num) => match perp_events_cache.get(&evq_pk_string) {
Some(old_events) => {}
_ => {
info!("perp_events_cache could not find {}", evq_pk_string)
}
},
_ => info!("seq_num_cache could not find {}", evq_pk_string),
}
seq_num_cache.insert(evq_pk_string.clone(), event_queue.header.seq_num);
perp_events_cache
.insert(evq_pk_string.clone(), event_queue.buf.clone());
} else {
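// serum wraps account data in a 5-byte "serum" head and a 7-byte "padding" tail; strip both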
let inner_data = &account.data()[5..account.data().len() - 7];
let header_span = std::mem::size_of::<SerumEventQueueHeader>();
let header: SerumEventQueueHeader =
*bytemuck::from_bytes(&inner_data[..header_span]);
let seq_num = header.seq_num;
let count = header.count;
let rest = &inner_data[header_span..];
let slop = rest.len() % std::mem::size_of::<serum_dex::state::Event>();
let new_len = rest.len() - slop;
let events = &rest[..new_len];
debug!("evq {} header_span {} header_seq_num {} header_count {} inner_len {} events_len {} sizeof Event {}", evq_pk_string, header_span, seq_num, count, inner_data.len(), events.len(), std::mem::size_of::<serum_dex::state::Event>());
let events: &[serum_dex::state::Event] = bytemuck::cast_slice(&events);
match seq_num_cache.get(&evq_pk_string) {
Some(old_seq_num) => match serum_events_cache.get(&evq_pk_string) {
Some(old_events) => {}
_ => {
info!("serum_events_cache could not find {}", evq_pk_string)
}
},
_ => info!("seq_num_cache could not find {}", evq_pk_string),
}
seq_num_cache.insert(evq_pk_string.clone(), seq_num);
serum_events_cache.insert(evq_pk_string.clone(), events.to_vec());
}
}
Err(_) => info!("chain_cache could not find {}", mkt.1),
}
}
}
});
Ok((
account_write_queue_sender,
slot_queue_sender,
instruction_receiver,
))
}


@@ -0,0 +1,41 @@
use log::*;
use std::sync::{Arc, RwLock};
use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcSendTransactionConfig};
use solana_sdk::{
hash::Hash, instruction::Instruction, signature::Keypair, signature::Signer,
transaction::Transaction,
};
use tokio::spawn;
pub async fn send_loop(
ixs_rx: async_channel::Receiver<Vec<Instruction>>,
blockhash: Arc<RwLock<Hash>>,
client: Arc<RpcClient>,
keypair: Keypair,
) {
let cfg = RpcSendTransactionConfig {
skip_preflight: true,
..RpcSendTransactionConfig::default()
};
loop {
if let Ok(ixs) = ixs_rx.recv().await {
let tx = Transaction::new_signed_with_payer(
&ixs,
Some(&keypair.pubkey()),
&[&keypair],
*blockhash.read().unwrap(),
);
// TODO: collect metrics
if let Err(err) = client.send_transaction_with_config(&tx, cfg).await {
warn!("failed to send transaction: {}", err);
}
}
}
}
pub fn init(
ixs_rx: async_channel::Receiver<Vec<Instruction>>,
blockhash: Arc<RwLock<Hash>>,
client: Arc<RpcClient>,
keypair: Keypair,
) {
// launch task
spawn(async move { send_loop(ixs_rx, blockhash, client, keypair).await });
}
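Note that main.rs wires this up with Keypair::new(), i.e. a freshly generated, unfunded fee payer. A real deployment would load a funded keypair instead; a minimal sketch, with an illustrative path:

    use solana_sdk::signature::read_keypair_file;
    let payer = read_keypair_file("/path/to/payer.json").expect("read keypair");
    transaction_sender::init(instruction_receiver, blockhash, rpc_client, payer);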