Merge pull request #24 from mschneider/max/crank

integrate crank
galactus 2023-03-11 09:19:28 +01:00 committed by GitHub
commit 6cc3444d97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 3358 additions and 215 deletions

1258
Cargo.lock generated

File diff suppressed because it is too large.

@ -27,7 +27,7 @@ serde_json = "1.0.79"
serde_yaml = "0.8.23"
iter_tools = "0.1.4"
mango = { git = "https://github.com/blockworks-foundation/mango-v3.git", branch = "mango_bencher_compatible", default-features = false, features = ["no-entrypoint"] }
mango = { git = "https://github.com/blockworks-foundation/mango-v3.git", branch = "mango_bencher_compatible", default-features = false }
mango-common = { git = "https://github.com/blockworks-foundation/mango-v3.git", branch = "mango_bencher_compatible" }
solana-client = { git = "https://github.com/solana-labs/solana.git", branch="v1.15" }
@ -43,10 +43,37 @@ solana-version = { git = "https://github.com/solana-labs/solana.git", branch="v1
solana-logger = { git = "https://github.com/solana-labs/solana.git", branch="v1.15" }
solana-transaction-status = { git = "https://github.com/solana-labs/solana.git", branch="v1.15" }
solana-quic-client = { git = "https://github.com/solana-labs/solana.git", branch="v1.15" }
solana-account-decoder = { git = "https://github.com/solana-labs/solana.git", branch="v1.15" }
# pin solana-program to the mango-v3 version of the solana sdk
# now we can use a recent sdk version while the program stays on the legacy one
# we have a bunch of helpers to convert between the two explicitly
solana-program = "1.9.17"
thiserror = "1.0"
solana-program = ">=1.9.0"
csv = "1.0.0"
tonic = { version = "0.8.2", features = ["gzip", "tls", "tls-roots"] }
tokio = { version = "1", features = ["full"] }
anyhow = "1.0"
async-channel = "1.6"
async-trait = "0.1"
prost = "0.11"
warp = "0.3"
futures = "0.3.17"
jsonrpc-core = "18.0.0"
jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] }
arrayref = "*"
bytemuck = "1.7.2"
toml = "*"
[build-dependencies]
anyhow = "1.0.62"
cargo-lock = "8.0.2"
git-version = "0.3.5"
protobuf-src = "1.1.0"
tonic-build = "0.8.2"
vergen = "=7.2.1"
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
@ -54,4 +81,8 @@ targets = ["x86_64-unknown-linux-gnu"]
exclude = [
"deps/solana",
"deps/mango-v3",
]
]
[patch.crates-io]
# for gzip encoded responses
jsonrpc-core-client = { git = "https://github.com/ckamm/jsonrpc.git", branch = "ckamm/http-with-gzip" }


@ -4,6 +4,8 @@ This project is used to stress a solana cluster like devnet, testnet or a local solana cluster.
The code will then create (q) transaction requests per second for (n) seconds, per perp market and per user. Each transaction request will contain the following instructions: CancelAllPerpOrders and two PlacePerpOrder (one for bid and another for ask).
For best results, and to avoid QUIC rate limits, it is better to fill the "identity" argument with a valid staked validator identity for the cluster you are testing against.
## Build
Clone repo
@ -59,4 +61,4 @@ OPTIONS:
-t, --transaction_save_file <FILENAME> To save details of all transactions during a run
--ws <URL> WebSocket URL for the solana cluster
```
```

44
build.rs Normal file

@ -0,0 +1,44 @@
use {
cargo_lock::Lockfile,
std::collections::HashSet,
vergen::{vergen, Config},
};
fn main() -> anyhow::Result<()> {
compile_protos()?;
generate_env()?;
Ok(())
}
fn compile_protos() -> anyhow::Result<()> {
std::env::set_var("PROTOC", protobuf_src::protoc());
tonic_build::compile_protos("./proto/geyser.proto")?;
Ok(())
}
fn generate_env() -> anyhow::Result<()> {
vergen(Config::default())?;
// vergen's git version output does not look good, use the git_version! macro instead
println!(
"cargo:rustc-env=GIT_VERSION={}",
git_version::git_version!()
);
// Extract Solana version
let lockfile = Lockfile::load("./Cargo.lock")?;
println!(
"cargo:rustc-env=SOLANA_SDK_VERSION={}",
lockfile
.packages
.iter()
.filter(|pkg| pkg.name.as_str() == "solana-sdk")
.map(|pkg| pkg.version.to_string())
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>()
.join(",")
);
Ok(())
}

116
proto/geyser.proto Normal file

@ -0,0 +1,116 @@
syntax = "proto3";
import public "solana-storage-v1.15.2.proto";
option go_package = "github.com/rpcpool/solana-geyser-grpc/golang/proto";
package geyser;
service Geyser {
rpc Subscribe(stream SubscribeRequest) returns (stream SubscribeUpdate) {}
}
message SubscribeRequest {
map<string, SubscribeRequestFilterAccounts> accounts = 1;
map<string, SubscribeRequestFilterSlots> slots = 2;
map<string, SubscribeRequestFilterTransactions> transactions = 3;
map<string, SubscribeRequestFilterBlocks> blocks = 4;
map<string, SubscribeRequestFilterBlocksMeta> blocks_meta = 5;
}
message SubscribeRequestFilterAccounts {
repeated string account = 2;
repeated string owner = 3;
}
message SubscribeRequestFilterSlots {}
message SubscribeRequestFilterTransactions {
optional bool vote = 1;
optional bool failed = 2;
optional string signature = 5;
repeated string account_include = 3;
repeated string account_exclude = 4;
}
message SubscribeRequestFilterBlocks {}
message SubscribeRequestFilterBlocksMeta {}
message SubscribeUpdate {
repeated string filters = 1;
oneof update_oneof {
SubscribeUpdateAccount account = 2;
SubscribeUpdateSlot slot = 3;
SubscribeUpdateTransaction transaction = 4;
SubscribeUpdateBlock block = 5;
SubscribeUpdatePing ping = 6;
SubscribeUpdateBlockMeta block_meta = 7;
}
}
message SubscribeUpdateAccount {
SubscribeUpdateAccountInfo account = 1;
uint64 slot = 2;
bool is_startup = 3;
}
message SubscribeUpdateAccountInfo {
bytes pubkey = 1;
uint64 lamports = 2;
bytes owner = 3;
bool executable = 4;
uint64 rent_epoch = 5;
bytes data = 6;
uint64 write_version = 7;
optional bytes txn_signature = 8;
}
message SubscribeUpdateSlot {
uint64 slot = 1;
optional uint64 parent = 2;
SubscribeUpdateSlotStatus status = 3;
}
enum SubscribeUpdateSlotStatus {
PROCESSED = 0;
CONFIRMED = 1;
FINALIZED = 2;
}
message SubscribeUpdateTransaction {
SubscribeUpdateTransactionInfo transaction = 1;
uint64 slot = 2;
}
message SubscribeUpdateTransactionInfo {
bytes signature = 1;
bool is_vote = 2;
solana.storage.ConfirmedBlock.Transaction transaction = 3;
solana.storage.ConfirmedBlock.TransactionStatusMeta meta = 4;
uint64 index = 5;
}
message SubscribeUpdateBlock {
uint64 slot = 1;
string blockhash = 2;
solana.storage.ConfirmedBlock.Rewards rewards = 3;
solana.storage.ConfirmedBlock.UnixTimestamp block_time = 4;
solana.storage.ConfirmedBlock.BlockHeight block_height = 5;
repeated SubscribeUpdateTransactionInfo transactions = 6;
uint64 parent_slot = 7;
string parent_blockhash = 8;
}
message SubscribeUpdateBlockMeta {
uint64 slot = 1;
string blockhash = 2;
solana.storage.ConfirmedBlock.Rewards rewards = 3;
solana.storage.ConfirmedBlock.UnixTimestamp block_time = 4;
solana.storage.ConfirmedBlock.BlockHeight block_height = 5;
uint64 parent_slot = 6;
string parent_blockhash = 7;
uint64 executed_transaction_count = 8;
}
message SubscribeUpdatePing {}
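
As an illustration (not part of the diff): the filter maps above translate directly into the tonic-generated Rust types used by `src/grpc_plugin_source.rs`. A minimal sketch of building a `SubscribeRequest` for a set of watched accounts, assuming the generated types are re-exported from `grpc_plugin_source` as in this PR:

```rust
// Sketch only: construct a SubscribeRequest that watches specific accounts and
// all slot updates, mirroring the "client" filter used in feed_data_geyser().
use std::collections::HashMap;

use crate::grpc_plugin_source::{
    SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots,
};

fn subscribe_request(account_ids: Vec<String>, program_ids: Vec<String>) -> SubscribeRequest {
    let mut accounts = HashMap::new();
    accounts.insert(
        "client".to_owned(),
        SubscribeRequestFilterAccounts {
            account: account_ids, // base58 account keys to watch
            owner: program_ids,   // or watch by owner (program id)
        },
    );
    let mut slots = HashMap::new();
    slots.insert("client".to_owned(), SubscribeRequestFilterSlots {});

    SubscribeRequest {
        accounts,
        slots,
        transactions: HashMap::new(),
        blocks: HashMap::new(),
        blocks_meta: HashMap::new(),
    }
}
```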


@ -0,0 +1,143 @@
syntax = "proto3";
package solana.storage.ConfirmedBlock;
option go_package = "github.com/rpcpool/solana-geyser-grpc/golang/proto";
message ConfirmedBlock {
string previous_blockhash = 1;
string blockhash = 2;
uint64 parent_slot = 3;
repeated ConfirmedTransaction transactions = 4;
repeated Reward rewards = 5;
UnixTimestamp block_time = 6;
BlockHeight block_height = 7;
}
message ConfirmedTransaction {
Transaction transaction = 1;
TransactionStatusMeta meta = 2;
}
message Transaction {
repeated bytes signatures = 1;
Message message = 2;
}
message Message {
MessageHeader header = 1;
repeated bytes account_keys = 2;
bytes recent_blockhash = 3;
repeated CompiledInstruction instructions = 4;
bool versioned = 5;
repeated MessageAddressTableLookup address_table_lookups = 6;
}
message MessageHeader {
uint32 num_required_signatures = 1;
uint32 num_readonly_signed_accounts = 2;
uint32 num_readonly_unsigned_accounts = 3;
}
message MessageAddressTableLookup {
bytes account_key = 1;
bytes writable_indexes = 2;
bytes readonly_indexes = 3;
}
message TransactionStatusMeta {
TransactionError err = 1;
uint64 fee = 2;
repeated uint64 pre_balances = 3;
repeated uint64 post_balances = 4;
repeated InnerInstructions inner_instructions = 5;
bool inner_instructions_none = 10;
repeated string log_messages = 6;
bool log_messages_none = 11;
repeated TokenBalance pre_token_balances = 7;
repeated TokenBalance post_token_balances = 8;
repeated Reward rewards = 9;
repeated bytes loaded_writable_addresses = 12;
repeated bytes loaded_readonly_addresses = 13;
ReturnData return_data = 14;
bool return_data_none = 15;
// Sum of compute units consumed by all instructions.
// Available since Solana v1.10.35 / v1.11.6.
// Set to `None` for txs executed on earlier versions.
optional uint64 compute_units_consumed = 16;
}
message TransactionError {
bytes err = 1;
}
message InnerInstructions {
uint32 index = 1;
repeated InnerInstruction instructions = 2;
}
message InnerInstruction {
uint32 program_id_index = 1;
bytes accounts = 2;
bytes data = 3;
// Invocation stack height of an inner instruction.
// Available since Solana v1.14.6
// Set to `None` for txs executed on earlier versions.
optional uint32 stack_height = 4;
}
message CompiledInstruction {
uint32 program_id_index = 1;
bytes accounts = 2;
bytes data = 3;
}
message TokenBalance {
uint32 account_index = 1;
string mint = 2;
UiTokenAmount ui_token_amount = 3;
string owner = 4;
string program_id = 5;
}
message UiTokenAmount {
double ui_amount = 1;
uint32 decimals = 2;
string amount = 3;
string ui_amount_string = 4;
}
message ReturnData {
bytes program_id = 1;
bytes data = 2;
}
enum RewardType {
Unspecified = 0;
Fee = 1;
Rent = 2;
Staking = 3;
Voting = 4;
}
message Reward {
string pubkey = 1;
int64 lamports = 2;
uint64 post_balance = 3;
RewardType reward_type = 4;
string commission = 5;
}
message Rewards {
repeated Reward rewards = 1;
}
message UnixTimestamp {
int64 timestamp = 1;
}
message BlockHeight {
uint64 block_height = 1;
}

146
src/account_write_filter.rs Normal file

@ -0,0 +1,146 @@
use crate::{
chain_data::{AccountData, AccountWrite, ChainData, SlotData, SlotUpdate},
metrics::Metrics,
};
use async_trait::async_trait;
use log::*;
use solana_sdk::{account::WritableAccount, pubkey::Pubkey, stake_history::Epoch};
use std::{
collections::{BTreeSet, HashMap},
sync::Arc,
time::{Duration, Instant},
};
#[async_trait]
pub trait AccountWriteSink {
async fn process(&self, pubkey: &Pubkey, account: &AccountData) -> Result<(), String>;
}
#[derive(Clone)]
pub struct AccountWriteRoute {
pub matched_pubkeys: Vec<Pubkey>,
pub sink: Arc<dyn AccountWriteSink + Send + Sync>,
pub timeout_interval: Duration,
}
#[derive(Clone, Debug)]
struct AcountWriteRecord {
slot: u64,
write_version: u64,
timestamp: Instant,
}
pub fn init(
routes: Vec<AccountWriteRoute>,
metrics_sender: Metrics,
) -> anyhow::Result<(
async_channel::Sender<AccountWrite>,
async_channel::Sender<SlotUpdate>,
)> {
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
let (account_write_queue_sender, account_write_queue_receiver) =
async_channel::unbounded::<AccountWrite>();
// Slot updates flowing from the outside into the single processing thread. From
// there they'll flow into the registered account write sinks.
let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
let mut chain_data = ChainData::new(metrics_sender);
let mut last_updated = HashMap::<String, AcountWriteRecord>::new();
let all_queue_pks: BTreeSet<Pubkey> = routes
.iter()
.flat_map(|r| r.matched_pubkeys.iter())
.map(|pk| pk.clone())
.collect();
// update handling thread, reads both slot and account updates
tokio::spawn(async move {
loop {
tokio::select! {
Ok(account_write) = account_write_queue_receiver.recv() => {
if !all_queue_pks.contains(&account_write.pubkey) {
trace!("account write skipped {:?}", account_write.pubkey);
continue;
}
trace!("account write processed {:?}", account_write.pubkey);
chain_data.update_account(
account_write.pubkey,
AccountData {
slot: account_write.slot,
write_version: account_write.write_version,
account: WritableAccount::create(
account_write.lamports,
account_write.data.clone(),
account_write.owner,
account_write.executable,
account_write.rent_epoch as Epoch,
),
},
);
}
Ok(slot_update) = slot_queue_receiver.recv() => {
trace!("slot {:?}", slot_update);
chain_data.update_slot(SlotData {
slot: slot_update.slot,
parent: slot_update.parent,
status: slot_update.status,
chain: 0,
});
}
}
trace!("propagate chain data downstream");
for route in routes.iter() {
for pk in route.matched_pubkeys.iter() {
match chain_data.account(&pk) {
Ok(account_info) => {
let pk_b58 = pk.to_string();
if let Some(record) = last_updated.get(&pk_b58) {
let is_unchanged = account_info.slot == record.slot
&& account_info.write_version == record.write_version;
let is_throttled =
record.timestamp.elapsed() < route.timeout_interval;
if is_unchanged || is_throttled {
trace!("skipped is_unchanged={is_unchanged} is_throttled={is_throttled} {pk_b58}");
continue;
}
};
trace!("process {pk_b58}");
match route.sink.process(pk, account_info).await {
Ok(()) => {
// todo: metrics
last_updated.insert(
pk_b58.clone(),
AcountWriteRecord {
slot: account_info.slot,
write_version: account_info.write_version,
timestamp: Instant::now(),
},
);
}
Err(_skip_reason) => {
// todo: metrics
}
}
}
Err(_) => {
// todo: metrics
}
}
}
}
}
});
Ok((account_write_queue_sender, slot_queue_sender))
}
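
As an illustration (not part of the diff), a minimal sketch of plugging a custom sink into this filter. `LoggingSink` and `wire_up` are hypothetical names; everything else comes from the types above and from `src/metrics.rs`. Note that `init()` spawns a tokio task, so this must run inside a tokio runtime.

```rust
// Sketch only: a sink that logs every routed account update.
use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use solana_sdk::pubkey::Pubkey;

use crate::{
    account_write_filter::{self, AccountWriteRoute, AccountWriteSink},
    chain_data::AccountData,
    metrics,
};

struct LoggingSink;

#[async_trait]
impl AccountWriteSink for LoggingSink {
    async fn process(&self, pubkey: &Pubkey, account: &AccountData) -> Result<(), String> {
        // Ok(()) marks the write as processed (and throttles further updates for
        // timeout_interval); Err(..) skips it.
        log::info!("{} updated at slot {}", pubkey, account.slot);
        Ok(())
    }
}

fn wire_up(watched: Vec<Pubkey>) -> anyhow::Result<()> {
    let metrics_tx = metrics::start(
        metrics::MetricsConfig { output_stdout: true, output_http: false },
        "example".into(),
    );
    let routes = vec![AccountWriteRoute {
        matched_pubkeys: watched,
        sink: Arc::new(LoggingSink),
        timeout_interval: Duration::from_secs(1),
    }];
    // The returned senders are fed by websocket_source or grpc_plugin_source.
    let (_account_write_sender, _slot_sender) = account_write_filter::init(routes, metrics_tx)?;
    Ok(())
}
```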

268
src/chain_data.rs Normal file

@ -0,0 +1,268 @@
use crate::metrics::{MetricType, MetricU64, Metrics};
use {
solana_sdk::{
account::{Account, AccountSharedData, ReadableAccount},
pubkey::Pubkey,
},
std::collections::HashMap,
};
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SlotStatus {
Rooted,
Confirmed,
Processed,
}
#[derive(Clone, Debug)]
pub struct SlotData {
pub slot: u64,
pub parent: Option<u64>,
pub status: SlotStatus,
pub chain: u64, // the top slot that this is in a chain with. uncles will have values < tip
}
#[derive(Clone, Debug)]
pub struct AccountData {
pub slot: u64,
pub write_version: u64,
pub account: AccountSharedData,
}
#[derive(Clone, PartialEq, Debug)]
pub struct AccountWrite {
pub pubkey: Pubkey,
pub slot: u64,
pub write_version: u64,
pub lamports: u64,
pub owner: Pubkey,
pub executable: bool,
pub rent_epoch: u64,
pub data: Vec<u8>,
pub is_selected: bool,
}
impl AccountWrite {
pub fn from(pubkey: Pubkey, slot: u64, write_version: u64, account: Account) -> AccountWrite {
AccountWrite {
pubkey,
slot: slot,
write_version,
lamports: account.lamports,
owner: account.owner,
executable: account.executable,
rent_epoch: account.rent_epoch,
data: account.data,
is_selected: true,
}
}
}
#[derive(Clone, Debug)]
pub struct SlotUpdate {
pub slot: u64,
pub parent: Option<u64>,
pub status: SlotStatus,
}
/// Track slots and account writes
///
/// - use account() to retrieve the current best data for an account.
/// - update_slot() and update_account() update the state for new messages
pub struct ChainData {
/// only slots >= newest_rooted_slot are retained
slots: HashMap<u64, SlotData>,
/// writes to accounts; only the latest rooted write and newer are retained
accounts: HashMap<Pubkey, Vec<AccountData>>,
newest_rooted_slot: u64,
newest_processed_slot: u64,
account_versions_stored: usize,
account_bytes_stored: usize,
metric_accounts_stored: MetricU64,
metric_account_versions_stored: MetricU64,
metric_account_bytes_stored: MetricU64,
}
impl ChainData {
pub fn new(metrics_sender: Metrics) -> Self {
Self {
slots: HashMap::new(),
accounts: HashMap::new(),
newest_rooted_slot: 0,
newest_processed_slot: 0,
account_versions_stored: 0,
account_bytes_stored: 0,
metric_accounts_stored: metrics_sender
.register_u64("chaindata_accounts_stored".into(), MetricType::Gauge),
metric_account_versions_stored: metrics_sender.register_u64(
"chaindata_account_versions_stored".into(),
MetricType::Gauge,
),
metric_account_bytes_stored: metrics_sender
.register_u64("chaindata_account_bytes_stored".into(), MetricType::Gauge),
}
}
pub fn update_slot(&mut self, new_slot: SlotData) {
let new_processed_head = new_slot.slot > self.newest_processed_slot;
if new_processed_head {
self.newest_processed_slot = new_slot.slot;
}
let new_rooted_head =
new_slot.slot > self.newest_rooted_slot && new_slot.status == SlotStatus::Rooted;
if new_rooted_head {
self.newest_rooted_slot = new_slot.slot;
}
let mut parent_update = false;
use std::collections::hash_map::Entry;
match self.slots.entry(new_slot.slot) {
Entry::Vacant(v) => {
v.insert(new_slot);
}
Entry::Occupied(o) => {
let v = o.into_mut();
parent_update = v.parent != new_slot.parent && new_slot.parent.is_some();
v.parent = v.parent.or(new_slot.parent);
v.status = new_slot.status;
}
};
if new_processed_head || parent_update {
// update the "chain" field down to the first rooted slot
let mut slot = self.newest_processed_slot;
loop {
if let Some(data) = self.slots.get_mut(&slot) {
data.chain = self.newest_processed_slot;
if data.status == SlotStatus::Rooted {
break;
}
if let Some(parent) = data.parent {
slot = parent;
continue;
}
}
break;
}
}
if new_rooted_head {
// for each account, preserve only writes > newest_rooted_slot, or the newest
// rooted write
self.account_versions_stored = 0;
self.account_bytes_stored = 0;
for (_, writes) in self.accounts.iter_mut() {
let newest_rooted_write = writes
.iter()
.rev()
.find(|w| {
w.slot <= self.newest_rooted_slot
&& self
.slots
.get(&w.slot)
.map(|s| {
// sometimes we seem not to get notifications about slots
// getting rooted, hence assume non-uncle slots < newest_rooted_slot
// are rooted too
s.status == SlotStatus::Rooted
|| s.chain == self.newest_processed_slot
})
// preserved account writes for deleted slots <= newest_rooted_slot
// are expected to be rooted
.unwrap_or(true)
})
.map(|w| w.slot)
// no rooted write found: produce no effect, since writes > newest_rooted_slot are retained anyway
.unwrap_or(self.newest_rooted_slot + 1);
writes
.retain(|w| w.slot == newest_rooted_write || w.slot > self.newest_rooted_slot);
self.account_versions_stored += writes.len();
self.account_bytes_stored += writes
.iter()
.map(|w| w.account.data().len())
.fold(0, |acc, l| acc + l)
}
// now it's fine to drop any slots before the new rooted head
// as account writes for non-rooted slots before it have been dropped
self.slots.retain(|s, _| *s >= self.newest_rooted_slot);
self.metric_accounts_stored.set(self.accounts.len() as u64);
self.metric_account_versions_stored
.set(self.account_versions_stored as u64);
self.metric_account_bytes_stored
.set(self.account_bytes_stored as u64);
}
}
pub fn update_account(&mut self, pubkey: Pubkey, account: AccountData) {
use std::collections::hash_map::Entry;
match self.accounts.entry(pubkey) {
Entry::Vacant(v) => {
self.account_versions_stored += 1;
self.account_bytes_stored += account.account.data().len();
v.insert(vec![account]);
}
Entry::Occupied(o) => {
let v = o.into_mut();
// v is ordered by slot ascending. find the right position
// overwrite if an entry for the slot already exists, otherwise insert
let rev_pos = v
.iter()
.rev()
.position(|d| d.slot <= account.slot)
.unwrap_or(v.len());
let pos = v.len() - rev_pos;
if pos < v.len() && v[pos].slot == account.slot {
if v[pos].write_version < account.write_version {
v[pos] = account;
}
} else {
self.account_versions_stored += 1;
self.account_bytes_stored += account.account.data().len();
v.insert(pos, account);
}
}
};
}
fn is_account_write_live(&self, write: &AccountData) -> bool {
self.slots
.get(&write.slot)
// either the slot is rooted or in the current chain
.map(|s| s.status == SlotStatus::Rooted || s.chain == self.newest_processed_slot)
// if the slot can't be found but preceeds newest rooted, use it too (old rooted slots are removed)
.unwrap_or(
write.slot <= self.newest_rooted_slot || write.slot > self.newest_processed_slot,
)
}
/// Cloned snapshot of all the most recent live writes per pubkey
pub fn accounts_snapshot(&self) -> HashMap<Pubkey, AccountData> {
self.accounts
.iter()
.filter_map(|(pubkey, writes)| {
let latest_good_write = writes
.iter()
.rev()
.find(|w| self.is_account_write_live(w))?;
Some((pubkey.clone(), latest_good_write.clone()))
})
.collect()
}
/// Ref to the most recent live write of the pubkey
pub fn account<'a>(&'a self, pubkey: &Pubkey) -> anyhow::Result<&'a AccountData> {
self.accounts
.get(pubkey)
.ok_or(anyhow::anyhow!("account {} not found", pubkey))?
.iter()
.rev()
.find(|w| self.is_account_write_live(w))
.ok_or(anyhow::anyhow!("account {} has no live data", pubkey))
}
}
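
As an illustration (not part of the diff), a minimal sketch of driving `ChainData` by hand: feed it a slot and an account write, root the slot, then read back the live write. `track_one_account` is a hypothetical helper; the update sequence mirrors what `account_write_filter.rs` does with incoming messages.

```rust
// Sketch only: exercise ChainData's update/query API directly.
use solana_sdk::{account::WritableAccount, pubkey::Pubkey, stake_history::Epoch};

use crate::{
    chain_data::{AccountData, ChainData, SlotData, SlotStatus},
    metrics::Metrics,
};

fn track_one_account(metrics: Metrics, pubkey: Pubkey) -> anyhow::Result<()> {
    let mut chain = ChainData::new(metrics);

    // A processed slot arrives first ...
    chain.update_slot(SlotData { slot: 100, parent: Some(99), status: SlotStatus::Processed, chain: 0 });

    // ... followed by an account write in that slot.
    chain.update_account(
        pubkey,
        AccountData {
            slot: 100,
            write_version: 1,
            account: WritableAccount::create(1_000, vec![], Pubkey::default(), false, 0 as Epoch),
        },
    );

    // Rooting the slot prunes older versions and keeps this write as the rooted one.
    chain.update_slot(SlotData { slot: 100, parent: Some(99), status: SlotStatus::Rooted, chain: 0 });

    // account() returns the most recent live write for the pubkey.
    let latest = chain.account(&pubkey)?;
    assert_eq!(latest.slot, 100);
    Ok(())
}
```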


@ -11,9 +11,9 @@ use chrono::Utc;
use crossbeam_channel::{Receiver, TryRecvError};
use log::{debug, error, info, trace};
use solana_client::{rpc_client::RpcClient, rpc_config::RpcBlockConfig};
use solana_program::pubkey::Pubkey;
use solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel},
pubkey::Pubkey,
signature::Signature,
};
use solana_transaction_status::RewardType;

145
src/crank.rs Normal file

@ -0,0 +1,145 @@
use std::{
str::FromStr,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, RwLock,
},
thread::Builder,
time::Duration,
};
// use solana_client::rpc_client::RpcClient;
use crate::{
account_write_filter::{self, AccountWriteRoute},
grpc_plugin_source::FilterConfig,
mango::GroupConfig,
mango_v3_perp_crank_sink::MangoV3PerpCrankSink,
metrics,
states::TransactionSendRecord,
websocket_source::{self, KeeperConfig},
};
use crossbeam_channel::{unbounded, Sender};
use log::*;
use solana_client::tpu_client::TpuClient;
use solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool};
use solana_sdk::{
hash::Hash, instruction::Instruction, pubkey::Pubkey, signature::Keypair, signer::Signer,
transaction::Transaction,
};
pub fn start(
config: KeeperConfig,
_tx_record_sx: Sender<TransactionSendRecord>,
exit_signal: Arc<AtomicBool>,
blockhash: Arc<RwLock<Hash>>,
_current_slot: Arc<AtomicU64>,
tpu_client: Arc<TpuClient<QuicPool, QuicConnectionManager, QuicConfig>>,
group: &GroupConfig,
identity: &Keypair,
) {
let perp_queue_pks: Vec<_> = group
.perp_markets
.iter()
.map(|m| {
(
Pubkey::from_str(&m.public_key).unwrap(),
Pubkey::from_str(&m.events_key).unwrap(),
)
})
.collect();
let group_pk = Pubkey::from_str(&group.public_key).unwrap();
let cache_pk = Pubkey::from_str(&group.cache_key).unwrap();
let mango_program_id = Pubkey::from_str(&group.mango_program_id).unwrap();
let filter_config = FilterConfig {
program_ids: vec![group.mango_program_id.clone()],
account_ids: group
.perp_markets
.iter()
.map(|m| m.events_key.clone())
.collect(),
};
let (instruction_sender, instruction_receiver) = unbounded::<Vec<Instruction>>();
let identity = Keypair::from_bytes(identity.to_bytes().as_slice()).unwrap();
Builder::new()
.name("crank-tx-sender".into())
.spawn(move || {
info!(
"crank-tx-sender signing with keypair pk={:?}",
identity.pubkey()
);
loop {
if exit_signal.load(Ordering::Acquire) {
break;
}
if let Ok(ixs) = instruction_receiver.recv() {
// TODO add priority fee
let tx = Transaction::new_signed_with_payer(
&ixs,
Some(&identity.pubkey()),
&[&identity],
*blockhash.read().unwrap(),
);
// TODO: find perp market pk and resolve import issue between solana program versions
// tx_record_sx.send(TransactionSendRecord {
// signature: tx.signatures[0],
// sent_at: Utc::now(),
// sent_slot: current_slot.load(Ordering::Acquire),
// market_maker: identity.pubkey(),
// market: c.perp_market_pk,
// });
let ok = tpu_client.send_transaction(&tx);
trace!("send tx={:?} ok={ok}", tx.signatures[0]);
}
}
})
.unwrap();
tokio::spawn(async move {
let metrics_tx = metrics::start(
metrics::MetricsConfig {
output_stdout: true,
output_http: false,
},
"crank".into(),
);
let routes = vec![AccountWriteRoute {
matched_pubkeys: perp_queue_pks
.iter()
.map(|(_, evq_pk)| evq_pk.clone())
.collect(),
sink: Arc::new(MangoV3PerpCrankSink::new(
perp_queue_pks,
group_pk,
cache_pk,
mango_program_id,
instruction_sender,
)),
timeout_interval: Duration::default(),
}];
let (account_write_queue_sender, slot_queue_sender) =
account_write_filter::init(routes, metrics_tx.clone()).expect("filter initializes");
info!("start processing grpc events");
// grpc_plugin_source::process_events(
// &config,
// &filter_config,
// account_write_queue_sender,
// slot_queue_sender,
// metrics_tx.clone(),
// ).await;
websocket_source::process_events(
config,
&filter_config,
account_write_queue_sender,
slot_queue_sender,
)
.await;
});
}

626
src/grpc_plugin_source.rs Normal file

@ -0,0 +1,626 @@
use futures::stream::once;
use geyser::geyser_client::GeyserClient;
use jsonrpc_core::futures::StreamExt;
use jsonrpc_core_client::transports::http;
use serde::Deserialize;
use solana_account_decoder::{UiAccount, UiAccountEncoding};
use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
use solana_client::rpc_response::{OptionalContext, RpcKeyedAccount};
use solana_rpc::rpc::rpc_accounts::AccountsDataClient;
use solana_rpc::rpc::rpc_accounts_scan::AccountsScanClient;
use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey};
use futures::{future, future::FutureExt};
use tonic::{
metadata::MetadataValue,
transport::{Certificate, Channel, ClientTlsConfig, Identity},
Request,
};
use log::*;
use std::{collections::HashMap, env, str::FromStr, time::Duration};
pub mod geyser {
tonic::include_proto!("geyser");
}
pub mod solana {
pub mod storage {
pub mod confirmed_block {
tonic::include_proto!("solana.storage.confirmed_block");
}
}
}
pub use geyser::*;
pub use solana::storage::confirmed_block::*;
use crate::websocket_source::KeeperConfig;
use crate::{
chain_data::{AccountWrite, SlotStatus, SlotUpdate},
metrics::{MetricType, Metrics},
AnyhowWrap,
};
#[derive(Clone, Debug, Deserialize)]
pub struct GrpcSourceConfig {
pub name: String,
pub connection_string: String,
pub retry_connection_sleep_secs: u64,
pub token: String,
}
#[derive(Clone, Debug, Deserialize)]
pub struct TlsConfig {
pub ca_cert_path: String,
pub client_cert_path: String,
pub client_key_path: String,
pub domain_name: String,
}
#[derive(Clone, Debug, Deserialize)]
pub struct FilterConfig {
pub program_ids: Vec<String>,
pub account_ids: Vec<String>,
}
#[derive(Clone, Debug, Deserialize)]
pub struct SnapshotSourceConfig {
pub rpc_http_url: String,
pub program_id: String,
}
#[derive(Clone, Debug)]
pub struct GrpcConfig {
pub dedup_queue_size: usize,
pub grpc_sources: Vec<GrpcSourceConfig>,
}
//use solana_geyser_connector_plugin_grpc::compression::zstd_decompress;
struct SnapshotData {
slot: u64,
accounts: Vec<(String, Option<UiAccount>)>,
}
enum Message {
GrpcUpdate(geyser::SubscribeUpdate),
Snapshot(SnapshotData),
}
async fn get_snapshot_gpa(
rpc_http_url: String,
program_id: String,
) -> anyhow::Result<OptionalContext<Vec<RpcKeyedAccount>>> {
let rpc_client = http::connect_with_options::<AccountsScanClient>(&rpc_http_url, true)
.await
.map_err_anyhow()?;
let account_info_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::finalized()),
data_slice: None,
min_context_slot: None,
};
let program_accounts_config = RpcProgramAccountsConfig {
filters: None,
with_context: Some(true),
account_config: account_info_config.clone(),
};
info!("requesting snapshot {}", program_id);
let account_snapshot = rpc_client
.get_program_accounts(program_id.clone(), Some(program_accounts_config.clone()))
.await
.map_err_anyhow()?;
info!("snapshot received {}", program_id);
Ok(account_snapshot)
}
async fn get_snapshot_gma(
rpc_http_url: String,
ids: Vec<String>,
) -> anyhow::Result<solana_client::rpc_response::Response<Vec<Option<UiAccount>>>> {
let rpc_client = http::connect_with_options::<AccountsDataClient>(&rpc_http_url, true)
.await
.map_err_anyhow()?;
let account_info_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::finalized()),
data_slice: None,
min_context_slot: None,
};
info!("requesting snapshot {:?}", ids);
let account_snapshot = rpc_client
.get_multiple_accounts(ids.clone(), Some(account_info_config))
.await
.map_err_anyhow()?;
info!("snapshot received {:?}", ids);
Ok(account_snapshot)
}
async fn feed_data_geyser(
grpc_config: &GrpcSourceConfig,
keeper_config: &KeeperConfig,
filter_config: &FilterConfig,
sender: async_channel::Sender<Message>,
) -> anyhow::Result<()> {
let connection_string = match &grpc_config.connection_string.chars().next().unwrap() {
'$' => env::var(&grpc_config.connection_string[1..])
.expect("reading connection string from env"),
_ => grpc_config.connection_string.clone(),
};
let rpc_http_url = match &keeper_config.rpc_url.chars().next().unwrap() {
'$' => env::var(&keeper_config.rpc_url[1..]).expect("reading connection string from env"),
_ => keeper_config.rpc_url.clone(),
};
info!("connecting to {}", connection_string);
let res = Channel::from_shared(connection_string.clone());
let endpoint = res.map(|e| {
if e.uri().scheme_str() == Some("https") {
info!("enable tls");
e.tls_config(ClientTlsConfig::new())
} else {
Ok(e)
}
})??;
let channel = endpoint.connect_lazy();
let token: MetadataValue<_> = grpc_config.token.parse()?;
let mut client = GeyserClient::with_interceptor(channel, move |mut req: Request<()>| {
req.metadata_mut().insert("x-token", token.clone());
Ok(req)
});
// If account_ids are provided, the snapshot uses getMultipleAccounts (gMA). If only program_ids are given, only the first id is snapshotted via getProgramAccounts (gPA)
// TODO: handle this better
if filter_config.program_ids.len() > 1 {
warn!("only one program id is supported for gPA snapshots")
}
let mut accounts = HashMap::new();
accounts.insert(
"client".to_owned(),
SubscribeRequestFilterAccounts {
account: filter_config.account_ids.clone(),
owner: filter_config.program_ids.clone(),
},
);
let mut slots = HashMap::new();
slots.insert("client".to_owned(), SubscribeRequestFilterSlots {});
let blocks = HashMap::new();
let blocks_meta = HashMap::new();
let transactions = HashMap::new();
let request = SubscribeRequest {
accounts,
blocks,
blocks_meta,
slots,
transactions,
};
info!("Going to send request: {:?}", request);
let response = client.subscribe(once(async move { request })).await?;
let mut update_stream = response.into_inner();
// We can't get a snapshot immediately since the finalized snapshot would be for a
// slot in the past and we'd be missing intermediate updates.
//
// Delay the request until the first slot we received all writes for becomes rooted
// to avoid that problem - partially. The rooted slot will still be larger than the
// finalized slot, so add a number of slots as a buffer.
//
// If that buffer isn't sufficient, there'll be a retry.
// The first slot that we will receive _all_ account writes for
let mut first_full_slot: u64 = u64::MAX;
// If a snapshot should be performed when ready.
let mut snapshot_needed = true;
// The highest "rooted" slot that has been seen.
let mut max_rooted_slot = 0;
// Data for slots will arrive out of order. This value defines how many
// slots after a slot was marked "rooted" we assume it'll not receive
// any more account write information.
//
// This is important for the write_version mapping (to know when slots can
// be dropped).
let max_out_of_order_slots = 40;
// Number of slots that we expect "finalized" commitment to lag
// behind "rooted". This matters for getProgramAccounts based snapshots,
// which will have "finalized" commitment.
let mut rooted_to_finalized_slots = 30;
let mut snapshot_gma = future::Fuse::terminated();
let mut snapshot_gpa = future::Fuse::terminated();
// The plugin sends a ping every 10s
let fatal_idle_timeout = Duration::from_secs(60);
// Highest slot that an account write came in for.
let mut newest_write_slot: u64 = 0;
struct WriteVersion {
// Write version seen on-chain
global: u64,
// The per-pubkey per-slot write version
slot: u32,
}
// map slot -> (pubkey -> WriteVersion)
//
// Since the write_version is a private identifier per node it can't be used
// to deduplicate events from multiple nodes. Here we rewrite it such that each
// pubkey and each slot has a consecutive numbering of writes starting at 1.
//
// That number will be consistent for each node.
let mut slot_pubkey_writes = HashMap::<u64, HashMap<[u8; 32], WriteVersion>>::new();
loop {
tokio::select! {
update = update_stream.next() => {
use geyser::{subscribe_update::UpdateOneof};
let mut update = update.ok_or(anyhow::anyhow!("geyser plugin has closed the stream"))??;
trace!("update={:?}", update);
match update.update_oneof.as_mut().expect("invalid grpc") {
UpdateOneof::Slot(slot_update) => {
let status = slot_update.status;
if status == SubscribeUpdateSlotStatus::Finalized as i32 {
if first_full_slot == u64::MAX {
// TODO: is this equivalent to before? what was highest_write_slot?
first_full_slot = slot_update.slot + 1;
}
if slot_update.slot > max_rooted_slot {
max_rooted_slot = slot_update.slot;
// drop data for slots that are well beyond rooted
slot_pubkey_writes.retain(|&k, _| k >= max_rooted_slot - max_out_of_order_slots);
}
if snapshot_needed && max_rooted_slot - rooted_to_finalized_slots > first_full_slot {
snapshot_needed = false;
if filter_config.account_ids.len() > 0 {
snapshot_gma = tokio::spawn(get_snapshot_gma(rpc_http_url.clone(), filter_config.account_ids.clone())).fuse();
} else if filter_config.program_ids.len() > 0 {
snapshot_gpa = tokio::spawn(get_snapshot_gpa(rpc_http_url.clone(), filter_config.program_ids[0].clone())).fuse();
}
}
}
},
UpdateOneof::Account(info) => {
if info.slot < first_full_slot {
// Don't try to process data for slots where we may have missed writes:
// We could not map the write_version correctly for them.
continue;
}
if info.slot > newest_write_slot {
newest_write_slot = info.slot;
} else if max_rooted_slot > 0 && info.slot < max_rooted_slot - max_out_of_order_slots {
anyhow::bail!("received write {} slots back from max rooted slot {}", max_rooted_slot - info.slot, max_rooted_slot);
}
let pubkey_writes = slot_pubkey_writes.entry(info.slot).or_default();
let mut write = match info.account.clone() {
Some(x) => x,
None => {
// TODO: handle error
continue;
},
};
let pubkey_bytes: [u8;32] = write.pubkey.try_into().expect("Pubkey be of size 32 bytes");
let write_version_mapping = pubkey_writes.entry(pubkey_bytes).or_insert(WriteVersion {
global: write.write_version,
slot: 1, // write version 0 is reserved for snapshots
});
// We assume we will receive write versions for each pubkey in sequence.
// If this is not the case, logic here does not work correctly because
// a later write could arrive first.
if write.write_version < write_version_mapping.global {
anyhow::bail!("unexpected write version: got {}, expected >= {}", write.write_version, write_version_mapping.global);
}
// Rewrite the update to use the local write version and bump it
write.write_version = write_version_mapping.slot as u64;
write_version_mapping.slot += 1;
},
UpdateOneof::Ping(_) => {},
UpdateOneof::Block(_) => {},
UpdateOneof::BlockMeta(_) => {},
UpdateOneof::Transaction(_) => {},
}
sender.send(Message::GrpcUpdate(update)).await.expect("send success");
},
snapshot = &mut snapshot_gma => {
let snapshot = snapshot??;
info!("snapshot is for slot {}, first full slot was {}", snapshot.context.slot, first_full_slot);
if snapshot.context.slot >= first_full_slot {
let accounts: Vec<(String, Option<UiAccount>)> = filter_config.account_ids.iter().zip(snapshot.value).map(|x| (x.0.clone(), x.1)).collect();
sender
.send(Message::Snapshot(SnapshotData {
accounts,
slot: snapshot.context.slot,
}))
.await
.expect("send success");
} else {
info!(
"snapshot is too old: has slot {}, expected {} minimum",
snapshot.context.slot,
first_full_slot
);
// try again in another 10 slots
snapshot_needed = true;
rooted_to_finalized_slots += 10;
}
},
snapshot = &mut snapshot_gpa => {
let snapshot = snapshot??;
if let OptionalContext::Context(snapshot_data) = snapshot {
info!("snapshot is for slot {}, first full slot was {}", snapshot_data.context.slot, first_full_slot);
if snapshot_data.context.slot >= first_full_slot {
let accounts: Vec<(String, Option<UiAccount>)> = snapshot_data.value.iter().map(|x| {
let deref = x.clone();
(deref.pubkey, Some(deref.account))
}).collect();
sender
.send(Message::Snapshot(SnapshotData {
accounts,
slot: snapshot_data.context.slot,
}))
.await
.expect("send success");
} else {
info!(
"snapshot is too old: has slot {}, expected {} minimum",
snapshot_data.context.slot,
first_full_slot
);
// try again in another 10 slots
snapshot_needed = true;
rooted_to_finalized_slots += 10;
}
} else {
anyhow::bail!("bad snapshot format");
}
},
_ = tokio::time::sleep(fatal_idle_timeout) => {
anyhow::bail!("geyser plugin hasn't sent a message in too long");
}
}
}
}
fn _make_tls_config(config: &TlsConfig) -> ClientTlsConfig {
let server_root_ca_cert = match &config.ca_cert_path.chars().next().unwrap() {
'$' => env::var(&config.ca_cert_path[1..])
.expect("reading server root ca cert from env")
.into_bytes(),
_ => std::fs::read(&config.ca_cert_path).expect("reading server root ca cert from file"),
};
let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
let client_cert = match &config.client_cert_path.chars().next().unwrap() {
'$' => env::var(&config.client_cert_path[1..])
.expect("reading client cert from env")
.into_bytes(),
_ => std::fs::read(&config.client_cert_path).expect("reading client cert from file"),
};
let client_key = match &config.client_key_path.chars().next().unwrap() {
'$' => env::var(&config.client_key_path[1..])
.expect("reading client key from env")
.into_bytes(),
_ => std::fs::read(&config.client_key_path).expect("reading client key from file"),
};
let client_identity = Identity::from_pem(client_cert, client_key);
let domain_name = match &config.domain_name.chars().next().unwrap() {
'$' => env::var(&config.domain_name[1..]).expect("reading domain name from env"),
_ => config.domain_name.clone(),
};
ClientTlsConfig::new()
.ca_certificate(server_root_ca_cert)
.identity(client_identity)
.domain_name(domain_name)
}
pub async fn process_events(
grpc_config: GrpcConfig,
keeper_config: KeeperConfig,
filter_config: &FilterConfig,
account_write_queue_sender: async_channel::Sender<AccountWrite>,
slot_queue_sender: async_channel::Sender<SlotUpdate>,
metrics_sender: Metrics,
) {
// Subscribe to geyser
let (msg_sender, msg_receiver) =
async_channel::bounded::<Message>(grpc_config.dedup_queue_size);
for grpc_source in grpc_config.grpc_sources.clone() {
let msg_sender = msg_sender.clone();
let snapshot_source = keeper_config.clone();
let metrics_sender = metrics_sender.clone();
let filter_config = filter_config.clone();
tokio::spawn(async move {
let mut metric_retries = metrics_sender.register_u64(
format!("grpc_source_{}_connection_retries", grpc_source.name,),
MetricType::Counter,
);
let metric_connected =
metrics_sender.register_bool(format!("grpc_source_{}_status", grpc_source.name));
// Continuously reconnect on failure
loop {
metric_connected.set(true);
let out = feed_data_geyser(
&grpc_source,
&snapshot_source,
&filter_config,
msg_sender.clone(),
);
let result = out.await;
// assert!(result.is_err());
if let Err(err) = result {
warn!(
"error during communication with the geyser plugin. retrying. {:?}",
err
);
}
metric_connected.set(false);
metric_retries.increment();
tokio::time::sleep(std::time::Duration::from_secs(
grpc_source.retry_connection_sleep_secs,
))
.await;
}
});
}
// slot -> (pubkey -> write_version)
//
// To avoid unnecessarily forwarding duplicate updates downstream, we track the latest
// write_version for each (slot, pubkey). If an already-seen write_version comes in, it
// can be safely discarded.
let mut latest_write = HashMap::<u64, HashMap<[u8; 32], u64>>::new();
// Number of slots to retain in latest_write
let latest_write_retention = 50;
let mut metric_account_writes =
metrics_sender.register_u64("grpc_account_writes".into(), MetricType::Counter);
let mut metric_account_queue =
metrics_sender.register_u64("grpc_account_write_queue".into(), MetricType::Gauge);
let mut metric_dedup_queue =
metrics_sender.register_u64("grpc_dedup_queue".into(), MetricType::Gauge);
let mut metric_slot_queue =
metrics_sender.register_u64("grpc_slot_update_queue".into(), MetricType::Gauge);
let mut metric_slot_updates =
metrics_sender.register_u64("grpc_slot_updates".into(), MetricType::Counter);
let mut metric_snapshots =
metrics_sender.register_u64("grpc_snapshots".into(), MetricType::Counter);
let mut metric_snapshot_account_writes =
metrics_sender.register_u64("grpc_snapshot_account_writes".into(), MetricType::Counter);
loop {
metric_dedup_queue.set(msg_receiver.len() as u64);
let msg = msg_receiver.recv().await.expect("sender must not close");
use geyser::subscribe_update::UpdateOneof;
match msg {
Message::GrpcUpdate(update) => {
match update.update_oneof.expect("invalid grpc") {
UpdateOneof::Account(info) => {
let update = match info.account.clone() {
Some(x) => x,
None => {
// TODO: handle error
continue;
}
};
assert!(update.pubkey.len() == 32);
assert!(update.owner.len() == 32);
metric_account_writes.increment();
metric_account_queue.set(account_write_queue_sender.len() as u64);
// Skip writes that a different server has already sent
let pubkey_writes = latest_write.entry(info.slot).or_default();
let pubkey_bytes: [u8; 32] = update
.pubkey
.clone()
.try_into()
.expect("Pubkey should be of size 32 bytes");
let writes = pubkey_writes.entry(pubkey_bytes).or_insert(0);
if update.write_version <= *writes {
continue;
}
*writes = update.write_version;
latest_write.retain(|&k, _| k >= info.slot - latest_write_retention);
// let mut uncompressed: Vec<u8> = Vec::new();
// zstd_decompress(&update.data, &mut uncompressed).unwrap();
account_write_queue_sender
.send(AccountWrite {
pubkey: Pubkey::new_from_array(
update
.pubkey
.try_into()
.expect("Pubkey be of size 32 bytes"),
),
slot: info.slot,
write_version: update.write_version,
lamports: update.lamports,
owner: Pubkey::new_from_array(
update.owner.try_into().expect("Pubkey be of size 32 bytes"),
),
executable: update.executable,
rent_epoch: update.rent_epoch,
data: update.data,
// TODO: what should this be? related to account deletes?
is_selected: true,
})
.await
.expect("send success");
}
UpdateOneof::Slot(update) => {
metric_slot_updates.increment();
metric_slot_queue.set(slot_queue_sender.len() as u64);
let status =
SubscribeUpdateSlotStatus::from_i32(update.status).map(|v| match v {
SubscribeUpdateSlotStatus::Processed => SlotStatus::Processed,
SubscribeUpdateSlotStatus::Confirmed => SlotStatus::Confirmed,
SubscribeUpdateSlotStatus::Finalized => SlotStatus::Rooted,
});
if status.is_none() {
error!("unexpected slot status: {}", update.status);
continue;
}
let slot_update = SlotUpdate {
slot: update.slot,
parent: update.parent,
status: status.expect("qed"),
};
slot_queue_sender
.send(slot_update)
.await
.expect("send success");
}
UpdateOneof::Ping(_) => {}
UpdateOneof::Block(_) => {}
UpdateOneof::BlockMeta(_) => {}
UpdateOneof::Transaction(_) => {}
}
}
Message::Snapshot(update) => {
metric_snapshots.increment();
info!("processing snapshot...");
for account in update.accounts.iter() {
metric_snapshot_account_writes.increment();
metric_account_queue.set(account_write_queue_sender.len() as u64);
match account {
(key, Some(ui_account)) => {
// TODO: Resnapshot on invalid data?
let pubkey = Pubkey::from_str(key).unwrap();
let account: Account = ui_account.decode().unwrap();
account_write_queue_sender
.send(AccountWrite::from(pubkey, update.slot, 0, account))
.await
.expect("send success");
}
(key, None) => warn!("account not found {}", key),
}
}
info!("processing snapshot done");
}
}
}
}
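
As an illustration (not part of the diff), a sketch of how this gRPC path could be wired up; `crank.rs` keeps the `grpc_plugin_source::process_events` call commented out and uses `websocket_source` instead. `run_grpc_source`, the config values, and the `$GEYSER_GRPC_URL` environment variable are placeholders.

```rust
// Sketch only: feed the account-write filter from the geyser gRPC source.
use crate::{
    chain_data::{AccountWrite, SlotUpdate},
    grpc_plugin_source::{self, FilterConfig, GrpcConfig, GrpcSourceConfig},
    metrics::Metrics,
    websocket_source::KeeperConfig,
};

async fn run_grpc_source(
    keeper_config: KeeperConfig,
    filter_config: FilterConfig,
    account_write_sender: async_channel::Sender<AccountWrite>,
    slot_sender: async_channel::Sender<SlotUpdate>,
    metrics_tx: Metrics,
) {
    let grpc_config = GrpcConfig {
        dedup_queue_size: 50_000,
        grpc_sources: vec![GrpcSourceConfig {
            name: "example".into(),
            // A leading '$' makes feed_data_geyser read the value from the environment.
            connection_string: "$GEYSER_GRPC_URL".into(),
            retry_connection_sleep_secs: 10,
            token: "".into(),
        }],
    };
    // Runs forever, reconnecting on failure and forwarding deduplicated writes.
    grpc_plugin_source::process_events(
        grpc_config,
        keeper_config,
        &filter_config,
        account_write_sender,
        slot_sender,
        metrics_tx,
    )
    .await;
}
```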


@ -207,11 +207,11 @@ pub fn start_blockhash_polling_service(
pub fn get_mango_market_perps_cache(
rpc_client: Arc<RpcClient>,
mango_group_config: &GroupConfig,
mango_program_pk: &Pubkey,
) -> Vec<PerpMarketCache> {
// fetch group
let mango_group_pk = Pubkey::from_str(mango_group_config.public_key.as_str()).unwrap();
let mango_group = load_from_rpc::<MangoGroup>(&rpc_client, &mango_group_pk);
let mango_program_pk = Pubkey::from_str(mango_group_config.mango_program_id.as_str()).unwrap();
let mango_cache_pk = Pubkey::from_str(mango_group.mango_cache.to_string().as_str()).unwrap();
let mango_cache = load_from_rpc::<MangoCache>(&rpc_client, &mango_cache_pk);
@ -259,7 +259,7 @@ pub fn get_mango_market_perps_cache(
order_base_lots,
price,
price_quote_lots,
mango_program_pk,
mango_program_pk: mango_program_pk.clone(),
mango_group_pk,
mango_cache_pk,
perp_market_pk,


@ -1,8 +1,27 @@
pub mod account_write_filter;
pub mod chain_data;
pub mod cli;
pub mod confirmation_strategies;
pub mod crank;
pub mod grpc_plugin_source;
pub mod helpers;
pub mod keeper;
pub mod mango;
pub mod mango_v3_perp_crank_sink;
pub mod market_markers;
pub mod metrics;
pub mod rotating_queue;
pub mod states;
pub mod websocket_source;
trait AnyhowWrap {
type Value;
fn map_err_anyhow(self) -> anyhow::Result<Self::Value>;
}
impl<T, E: std::fmt::Debug> AnyhowWrap for Result<T, E> {
type Value = T;
fn map_err_anyhow(self) -> anyhow::Result<Self::Value> {
self.map_err(|err| anyhow::anyhow!("{:?}", err))
}
}
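
A small illustration (not part of the diff) of the `AnyhowWrap` helper introduced here: it converts any error that only implements `Debug` into an `anyhow::Error`, which is how the jsonrpc client errors are handled in `grpc_plugin_source.rs`. `parse_port` is a hypothetical function.

```rust
// Sketch only: bubble up an error via its Debug representation with map_err_anyhow().
use crate::AnyhowWrap;

fn parse_port(raw: &str) -> anyhow::Result<u16> {
    raw.parse::<u16>().map_err_anyhow()
}
```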


@ -4,24 +4,24 @@ use {
solana_bench_mango::{
cli,
confirmation_strategies::confirmations_by_blocks,
crank,
helpers::{
get_latest_blockhash, get_mango_market_perps_cache, start_blockhash_polling_service,
write_block_data_into_csv, write_transaction_data_into_csv,
to_sdk_pk, write_block_data_into_csv, write_transaction_data_into_csv,
},
keeper::start_keepers,
mango::{AccountKeys, MangoConfig},
market_markers::start_market_making_threads,
states::{BlockData, PerpMarketCache, TransactionConfirmRecord, TransactionSendRecord},
websocket_source::KeeperConfig,
},
solana_metrics::{datapoint_info},
solana_client::{
connection_cache::ConnectionCache, rpc_client::RpcClient, tpu_client::TpuClient,
},
solana_metrics::datapoint_info,
solana_program::pubkey::Pubkey,
solana_sdk::commitment_config::CommitmentConfig,
std::{
thread::sleep,
time::Duration,
fs,
net::{IpAddr, Ipv4Addr},
str::FromStr,
@ -29,7 +29,9 @@ use {
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, RwLock,
},
thread::sleep,
thread::{Builder, JoinHandle},
time::Duration,
},
};
@ -49,14 +51,27 @@ impl MangoBencherStats {
("num_confirmed_txs", self.num_confirmed_txs, i64),
("num_error_txs", self.num_error_txs, i64),
("num_timeout_txs", self.num_timeout_txs, i64),
("percent_confirmed_txs", (self.num_confirmed_txs * 100)/self.recv_limit, i64),
("percent_error_txs", (self.num_error_txs * 100)/self.recv_limit, i64),
("percent_timeout_txs", (self.num_timeout_txs * 100)/self.recv_limit, i64),
(
"percent_confirmed_txs",
(self.num_confirmed_txs * 100) / self.recv_limit,
i64
),
(
"percent_error_txs",
(self.num_error_txs * 100) / self.recv_limit,
i64
),
(
"percent_timeout_txs",
(self.num_timeout_txs * 100) / self.recv_limit,
i64
),
);
}
}
fn main() {
#[tokio::main]
async fn main() {
solana_logger::setup_with_default("solana=info");
solana_metrics::set_panic_hook("bench-mango", /*version:*/ None);
@ -157,9 +172,9 @@ fn main() {
current_slot.clone(),
rpc_client.clone(),
);
let mango_program_pk = Pubkey::from_str(mango_group_config.mango_program_id.as_str()).unwrap();
let perp_market_caches: Vec<PerpMarketCache> =
get_mango_market_perps_cache(rpc_client.clone(), &mango_group_config);
get_mango_market_perps_cache(rpc_client.clone(), mango_group_config, &mango_program_pk);
let quote_root_bank =
Pubkey::from_str(mango_group_config.tokens.last().unwrap().root_key.as_str()).unwrap();
@ -189,11 +204,27 @@ fn main() {
let (tx_record_sx, tx_record_rx) = crossbeam_channel::unbounded();
let from_slot = current_slot.load(Ordering::Relaxed);
let keeper_config = KeeperConfig {
program_id: to_sdk_pk(&mango_program_pk),
rpc_url: json_rpc_url.clone(),
websocket_url: websocket_url.clone(),
};
crank::start(
keeper_config,
tx_record_sx.clone(),
exit_signal.clone(),
blockhash.clone(),
current_slot.clone(),
tpu_client.clone(),
mango_group_config,
id,
);
let mm_threads: Vec<JoinHandle<()>> = start_market_making_threads(
account_keys_parsed.clone(),
perp_market_caches.clone(),
tx_record_sx,
tx_record_sx.clone(),
exit_signal.clone(),
blockhash.clone(),
current_slot.clone(),
@ -204,6 +235,7 @@ fn main() {
*priority_fees_proba,
number_of_markers_per_mm,
);
let duration = duration.clone();
let quotes_per_second = quotes_per_second.clone();
let account_keys_parsed = account_keys_parsed.clone();


@ -26,6 +26,7 @@ pub struct MangoConfig {
pub struct GroupConfig {
pub name: String,
pub public_key: String,
pub cache_key: String,
pub mango_program_id: String,
pub serum_program_id: String,
pub oracles: Vec<OracleConfig>,


@ -0,0 +1,144 @@
use std::{cell::RefCell, collections::BTreeMap, convert::TryFrom, mem::size_of};
use arrayref::array_ref;
use async_trait::async_trait;
use crossbeam_channel::Sender;
use log::*;
use mango::{
instruction::consume_events,
queue::{AnyEvent, EventQueueHeader, EventType, FillEvent, OutEvent, Queue},
};
use solana_sdk::account::ReadableAccount;
use solana_sdk::{instruction::Instruction, pubkey::Pubkey};
use bytemuck::cast_ref;
use crate::{
account_write_filter::AccountWriteSink,
chain_data::AccountData,
helpers::{to_sdk_instruction, to_sp_pk},
};
const MAX_BACKLOG: usize = 2;
const MAX_EVENTS_PER_TX: usize = 10;
pub struct MangoV3PerpCrankSink {
mkt_pks_by_evq_pks: BTreeMap<Pubkey, Pubkey>,
group_pk: Pubkey,
cache_pk: Pubkey,
mango_v3_program: Pubkey,
instruction_sender: Sender<Vec<Instruction>>,
}
impl MangoV3PerpCrankSink {
pub fn new(
pks: Vec<(Pubkey, Pubkey)>,
group_pk: Pubkey,
cache_pk: Pubkey,
mango_v3_program: Pubkey,
instruction_sender: Sender<Vec<Instruction>>,
) -> Self {
Self {
mkt_pks_by_evq_pks: pks
.iter()
.map(|(mkt_pk, evq_pk)| (evq_pk.clone(), mkt_pk.clone()))
.collect(),
group_pk,
cache_pk,
mango_v3_program,
instruction_sender,
}
}
}
// couldn't compile the correct struct size / math on m1, fixed sizes resolve this issue
const EVENT_SIZE: usize = 200; //size_of::<AnyEvent>();
const QUEUE_LEN: usize = 256;
type EventQueueEvents = [AnyEvent; QUEUE_LEN];
#[async_trait]
impl AccountWriteSink for MangoV3PerpCrankSink {
async fn process(&self, pk: &Pubkey, account: &AccountData) -> Result<(), String> {
let account = &account.account;
let ix: Result<Instruction, String> = {
const HEADER_SIZE: usize = size_of::<EventQueueHeader>();
let header_data = array_ref![account.data(), 0, HEADER_SIZE];
let header = RefCell::<EventQueueHeader>::new(*bytemuck::from_bytes(header_data));
let seq_num = header.clone().into_inner().seq_num.clone();
// trace!("evq {} seq_num {}", mkt.name, header.seq_num);
const QUEUE_SIZE: usize = EVENT_SIZE * QUEUE_LEN;
let events_data = array_ref![account.data(), HEADER_SIZE, QUEUE_SIZE];
let events = RefCell::<EventQueueEvents>::new(*bytemuck::from_bytes(events_data));
let event_queue = Queue {
header: header.borrow_mut(),
buf: events.borrow_mut(),
};
// only crank if at least 1 fill or a sufficient number of events of other categories are buffered
let contains_fill_events = event_queue
.iter()
.find(|e| e.event_type == EventType::Fill as u8)
.is_some();
let len = event_queue.iter().count();
let has_backlog = len > MAX_BACKLOG;
debug!("evq {pk:?} seq_num={seq_num} len={len} contains_fill_events={contains_fill_events} has_backlog={has_backlog}");
if !contains_fill_events && !has_backlog {
return Err("throttled".into());
}
trace!("evq {pk:?} seq_num={seq_num} len={len} contains_fill_events={contains_fill_events} has_backlog={has_backlog}");
let mut mango_accounts: Vec<_> = event_queue
.iter()
.take(MAX_EVENTS_PER_TX)
.flat_map(
|e| match EventType::try_from(e.event_type).expect("mango v4 event") {
EventType::Fill => {
let fill: &FillEvent = cast_ref(e);
vec![fill.maker, fill.taker]
}
EventType::Out => {
let out: &OutEvent = cast_ref(e);
vec![out.owner]
}
EventType::Liquidate => vec![],
},
)
.collect();
let mkt_pk = self
.mkt_pks_by_evq_pks
.get(pk)
.expect(&format!("{pk:?} is a known public key"));
let ix = to_sdk_instruction(
consume_events(
&to_sp_pk(&self.mango_v3_program),
&to_sp_pk(&self.group_pk),
&to_sp_pk(&self.cache_pk),
&to_sp_pk(mkt_pk),
&to_sp_pk(pk),
&mut mango_accounts,
MAX_EVENTS_PER_TX,
)
.unwrap(),
);
Ok(ix)
};
// info!(
// "evq={pk:?} count={} limit=10",
// event_queue.iter().count()
// );
if let Err(e) = self.instruction_sender.send(vec![ix?]) {
return Err(e.to_string());
}
Ok(())
}
}


@ -292,6 +292,10 @@ pub fn start_market_making_threads(
prioritization_fee_proba: u8,
number_of_markers_per_mm: u8,
) -> Vec<JoinHandle<()>> {
let warmup_duration = Duration::from_secs(10);
info!("waiting for keepers to warmup for {warmup_duration:?}");
sleep(warmup_duration);
let mut rng = rand::thread_rng();
account_keys_parsed
.iter()
@ -308,11 +312,10 @@ pub fn start_market_making_threads(
let tpu_client = tpu_client.clone();
info!(
"wallet:{:?} https://testnet.mango.markets/account?pubkey={:?}",
"wallet: {:?} mango account: {:?}",
mango_account_signer.pubkey(),
mango_account_pk
);
//sleep(Duration::from_secs(10));
let tx_record_sx = tx_record_sx.clone();
let perp_market_caches = perp_market_caches
.choose_multiple(&mut rng, number_of_markers_per_mm as usize)

337
src/metrics.rs Normal file

@ -0,0 +1,337 @@
use serde::Deserialize;
use {
log::*,
std::collections::HashMap,
std::fmt,
std::sync::{atomic, Arc, Mutex, RwLock},
tokio::time,
warp::{Filter, Rejection, Reply},
};
#[derive(Debug)]
enum Value {
U64 {
value: Arc<atomic::AtomicU64>,
metric_type: MetricType,
},
I64 {
value: Arc<atomic::AtomicI64>,
metric_type: MetricType,
},
Bool {
value: Arc<Mutex<bool>>,
metric_type: MetricType,
},
}
#[derive(Debug, Clone)]
pub enum MetricType {
Counter,
Gauge,
}
impl fmt::Display for MetricType {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
MetricType::Counter => {
write!(f, "counter")
}
MetricType::Gauge => {
write!(f, "gauge")
}
}
}
}
#[derive(Debug)]
enum PrevValue {
U64(u64),
I64(i64),
Bool(bool),
}
#[derive(Clone)]
pub struct MetricU64 {
value: Arc<atomic::AtomicU64>,
}
impl MetricU64 {
pub fn value(&self) -> u64 {
self.value.load(atomic::Ordering::Acquire)
}
pub fn set(&mut self, value: u64) {
self.value.store(value, atomic::Ordering::Release);
}
pub fn set_max(&mut self, value: u64) {
self.value.fetch_max(value, atomic::Ordering::AcqRel);
}
pub fn add(&mut self, value: u64) {
self.value.fetch_add(value, atomic::Ordering::AcqRel);
}
pub fn increment(&mut self) {
self.value.fetch_add(1, atomic::Ordering::AcqRel);
}
pub fn decrement(&mut self) {
self.value.fetch_sub(1, atomic::Ordering::AcqRel);
}
}
#[derive(Clone)]
pub struct MetricI64 {
value: Arc<atomic::AtomicI64>,
}
impl MetricI64 {
pub fn set(&mut self, value: i64) {
self.value.store(value, atomic::Ordering::Release);
}
pub fn increment(&mut self) {
self.value.fetch_add(1, atomic::Ordering::AcqRel);
}
pub fn decrement(&mut self) {
self.value.fetch_sub(1, atomic::Ordering::AcqRel);
}
}
#[derive(Clone)]
pub struct MetricBool {
value: Arc<Mutex<bool>>,
}
impl MetricBool {
pub fn set(&self, value: bool) {
*self.value.lock().unwrap() = value;
}
}
#[derive(Clone)]
pub struct Metrics {
registry: Arc<RwLock<HashMap<String, Value>>>,
labels: HashMap<String, String>,
}
impl Metrics {
pub fn register_u64(&self, name: String, metric_type: MetricType) -> MetricU64 {
let mut registry = self.registry.write().unwrap();
let value = registry.entry(name).or_insert(Value::U64 {
value: Arc::new(atomic::AtomicU64::new(0)),
metric_type: metric_type,
});
MetricU64 {
value: match value {
Value::U64 {
value: v,
metric_type: _,
} => v.clone(),
_ => panic!("bad metric type"),
},
}
}
pub fn register_i64(&self, name: String, metric_type: MetricType) -> MetricI64 {
let mut registry = self.registry.write().unwrap();
let value = registry.entry(name).or_insert(Value::I64 {
value: Arc::new(atomic::AtomicI64::new(0)),
metric_type: metric_type,
});
MetricI64 {
value: match value {
Value::I64 {
value: v,
metric_type: _,
} => v.clone(),
_ => panic!("bad metric type"),
},
}
}
pub fn register_bool(&self, name: String) -> MetricBool {
let mut registry = self.registry.write().unwrap();
let value = registry.entry(name).or_insert(Value::Bool {
value: Arc::new(Mutex::new(false)),
metric_type: MetricType::Gauge,
});
MetricBool {
value: match value {
Value::Bool {
value: v,
metric_type: _,
} => v.clone(),
_ => panic!("bad metric type"),
},
}
}
pub fn get_registry_vec(&self) -> Vec<(String, String, String)> {
let mut vec: Vec<(String, String, String)> = Vec::new();
let metrics = self.registry.read().unwrap();
for (name, value) in metrics.iter() {
let (value_str, type_str) = match value {
Value::U64 {
value: v,
metric_type: t,
} => (
format!("{}", v.load(atomic::Ordering::Acquire)),
t.to_string(),
),
Value::I64 {
value: v,
metric_type: t,
} => (
format!("{}", v.load(atomic::Ordering::Acquire)),
t.to_string(),
),
Value::Bool {
value: v,
metric_type: t,
} => {
let bool_to_int = if *v.lock().unwrap() { 1 } else { 0 };
(format!("{}", bool_to_int), t.to_string())
}
};
vec.push((name.clone(), value_str, type_str));
}
vec
}
}
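// Handles returned by register_* share the registry's Arc'd atomics (or mutex), so
// they are cheap to clone and can be moved into other tasks while the exporter keeps
// reading the same value. A minimal usage sketch (metric names are made up here):
//
//     let mut tx_counter = metrics.register_u64("tx_sent".into(), MetricType::Counter);
//     let connected = metrics.register_bool("ws_connected".into());
//     tokio::spawn(async move {
//         connected.set(true);
//         tx_counter.increment();
//     });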
async fn handle_prometheus_poll(metrics: Metrics) -> Result<impl Reply, Rejection> {
debug!("handle_prometheus_poll");
let label_strings_vec: Vec<String> = metrics
.labels
.iter()
.map(|(name, value)| format!("{}=\"{}\"", name, value))
.collect();
let lines: Vec<String> = metrics
.get_registry_vec()
.iter()
.map(|(name, value, type_name)| {
let sanitized_name = str::replace(name, "-", "_");
format!(
"# HELP {} \n# TYPE {} {}\n{}{{{}}} {}",
sanitized_name,
sanitized_name,
type_name,
sanitized_name,
label_strings_vec.join(","),
value
)
})
.collect();
Ok(format!("{}\n", lines.join("\n")))
}
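// For a gauge registered as "open_orders" in a process labeled "keeper", the handler
// above renders the Prometheus text exposition format roughly as (values illustrative):
//
//     # HELP open_orders
//     # TYPE open_orders gauge
//     open_orders{process="keeper"} 12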
pub fn with_metrics(
metrics: Metrics,
) -> impl Filter<Extract = (Metrics,), Error = std::convert::Infallible> + Clone {
warp::any().map(move || metrics.clone())
}
#[derive(Clone, Debug, Deserialize)]
pub struct MetricsConfig {
pub output_stdout: bool,
pub output_http: bool,
// TODO: add configurable port and endpoint url
// TODO: add configurable write interval
}
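// Assuming the caller loads its configuration from TOML and nests this struct under a
// `[metrics]` table (both assumptions), the matching config section would look like:
//
//     [metrics]
//     output_stdout = true
//     output_http = true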
pub fn start(config: MetricsConfig, process_name: String) -> Metrics {
let mut write_interval = time::interval(time::Duration::from_secs(60));
let registry = Arc::new(RwLock::new(HashMap::<String, Value>::new()));
let registry_c = Arc::clone(&registry);
let labels = HashMap::from([(String::from("process"), process_name)]);
let metrics_tx = Metrics { registry, labels };
let metrics_route = warp::path!("metrics")
.and(with_metrics(metrics_tx.clone()))
.and_then(handle_prometheus_poll);
if config.output_http {
// serve prometheus metrics endpoint
tokio::spawn(async move {
warp::serve(metrics_route).run(([0, 0, 0, 0], 9091)).await;
});
}
if config.output_stdout {
// periodically log to stdout
tokio::spawn(async move {
let mut previous_values = HashMap::<String, PrevValue>::new();
loop {
write_interval.tick().await;
// Nested locking! Safe because the only other user locks registry for writing and doesn't
// acquire any interior locks.
let metrics = registry_c.read().unwrap();
for (name, value) in metrics.iter() {
let previous_value = previous_values.get_mut(name);
match value {
Value::U64 {
value: v,
metric_type: _,
} => {
let new_value = v.load(atomic::Ordering::Acquire);
let previous_value = if let Some(PrevValue::U64(v)) = previous_value {
let prev = *v;
*v = new_value;
prev
} else {
previous_values.insert(name.clone(), PrevValue::U64(new_value));
0
};
let diff = new_value.wrapping_sub(previous_value) as i64;
info!("metric: {}: {} ({:+})", name, new_value, diff);
}
Value::I64 {
value: v,
metric_type: _,
} => {
let new_value = v.load(atomic::Ordering::Acquire);
let previous_value = if let Some(PrevValue::I64(v)) = previous_value {
let prev = *v;
*v = new_value;
prev
} else {
previous_values.insert(name.clone(), PrevValue::I64(new_value));
0
};
let diff = new_value - previous_value;
info!("metric: {}: {} ({:+})", name, new_value, diff);
}
Value::Bool {
value: v,
metric_type: _,
} => {
let new_value = v.lock().unwrap();
let previous_value = if let Some(PrevValue::Bool(v)) = previous_value {
let mut prev = new_value.clone();
std::mem::swap(&mut prev, v);
prev
} else {
previous_values
.insert(name.clone(), PrevValue::Bool(new_value.clone()));
false
};
if *new_value == previous_value {
info!("metric: {}: {} (unchanged)", name, &*new_value);
} else {
info!(
"metric: {}: {} (before: {})",
name, &*new_value, previous_value
);
}
}
}
}
}
});
}
metrics_tx
}
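// Bootstrap sketch, assuming this is called from within an existing tokio runtime
// (start() spawns its background tasks with tokio::spawn and, with output_http set,
// serves the registry at http://0.0.0.0:9091/metrics):
//
//     let metrics = start(
//         MetricsConfig { output_stdout: true, output_http: true },
//         "keeper".to_string(),
//     );
//     // `metrics` can now be cloned into the crank/keeper components;
//     // `curl http://localhost:9091/metrics` returns the current values.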

218
src/websocket_source.rs Normal file
View File

@ -0,0 +1,218 @@
use jsonrpc_core::futures::StreamExt;
use jsonrpc_core_client::transports::{http, ws};
use solana_account_decoder::UiAccountEncoding;
use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_response::{OptionalContext, Response, RpcKeyedAccount},
};
use solana_rpc::{rpc::rpc_accounts_scan::AccountsScanClient, rpc_pubsub::RpcSolPubSubClient};
use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey};
use log::*;
use std::{
collections::HashSet,
str::FromStr,
sync::Arc,
time::{Duration, Instant},
};
use crate::{
chain_data::{AccountWrite, SlotStatus, SlotUpdate},
grpc_plugin_source::FilterConfig,
AnyhowWrap,
};
enum WebsocketMessage {
SingleUpdate(Response<RpcKeyedAccount>),
SnapshotUpdate(Response<Vec<RpcKeyedAccount>>),
SlotUpdate(Arc<solana_client::rpc_response::SlotUpdate>),
}
#[derive(Debug, Clone)]
pub struct KeeperConfig {
pub program_id: Pubkey,
pub rpc_url: String,
pub websocket_url: String,
}
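// Illustrative construction (endpoints are placeholders and `mango_program_id` stands
// in for the pubkey of the deployed program the keeper should watch):
//
//     let config = KeeperConfig {
//         program_id: Pubkey::from_str(&mango_program_id).unwrap(),
//         rpc_url: "https://api.devnet.solana.com".to_string(),
//         websocket_url: "wss://api.devnet.solana.com/".to_string(),
//     };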
// TODO: the reconnecting should be part of this
async fn feed_data(
config: KeeperConfig,
_filter_config: &FilterConfig,
sender: async_channel::Sender<WebsocketMessage>,
) -> anyhow::Result<()> {
let program_id = config.program_id;
let snapshot_duration = Duration::from_secs(10);
info!("feed_data {config:?}");
let connect = ws::try_connect::<RpcSolPubSubClient>(&config.websocket_url).map_err_anyhow()?;
let client: RpcSolPubSubClient = connect.await.map_err_anyhow()?;
let rpc_client = http::connect_with_options::<AccountsScanClient>(&config.rpc_url, true)
.await
.map_err_anyhow()?;
let account_info_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::processed()),
data_slice: None,
min_context_slot: None,
};
let program_accounts_config = RpcProgramAccountsConfig {
filters: None,
with_context: Some(true),
account_config: account_info_config.clone(),
};
let mut update_sub = client
.program_subscribe(
program_id.to_string(),
Some(program_accounts_config.clone()),
)
.map_err_anyhow()?;
let mut slot_sub = client.slots_updates_subscribe().map_err_anyhow()?;
let mut last_snapshot = Instant::now() - snapshot_duration;
loop {
// periodically request a full program-accounts snapshot, including on the first
// iteration (last_snapshot starts one snapshot_duration in the past)
if last_snapshot + snapshot_duration <= Instant::now() {
let account_snapshot = rpc_client
.get_program_accounts(
program_id.to_string(),
Some(program_accounts_config.clone()),
)
.await
.map_err_anyhow()?;
if let OptionalContext::Context(account_snapshot_response) = account_snapshot {
info!("snapshot");
sender
.send(WebsocketMessage::SnapshotUpdate(account_snapshot_response))
.await
.expect("sending must succeed");
}
last_snapshot = Instant::now();
}
tokio::select! {
account = update_sub.next() => {
trace!("account {account:?}");
match account {
Some(account) => {
sender.send(WebsocketMessage::SingleUpdate(account.map_err_anyhow()?)).await.expect("sending must succeed");
},
None => {
warn!("account stream closed");
return Ok(());
},
}
},
slot_update = slot_sub.next() => {
trace!("slot {slot_update:?}");
match slot_update {
Some(slot_update) => {
sender.send(WebsocketMessage::SlotUpdate(slot_update.map_err_anyhow()?)).await.expect("sending must succeed");
},
None => {
warn!("slot update stream closed");
return Ok(());
},
}
},
_ = tokio::time::sleep(Duration::from_secs(60)) => {
warn!("websocket timeout");
return Ok(())
}
}
}
}
// TODO: rename / split / rework
pub async fn process_events(
config: KeeperConfig,
filter_config: &FilterConfig,
account_write_queue_sender: async_channel::Sender<AccountWrite>,
slot_queue_sender: async_channel::Sender<SlotUpdate>,
) {
let account_wl = HashSet::<String>::from_iter(filter_config.account_ids.iter().cloned());
// Subscribe to program account and slot updates over the websocket connection
let (update_sender, update_receiver) = async_channel::unbounded::<WebsocketMessage>();
let filter_config = filter_config.clone();
tokio::spawn(async move {
// if the websocket disconnects or stops delivering data for a while, feed_data
// returns and we reconnect and try again
loop {
let out = feed_data(config.clone(), &filter_config, update_sender.clone());
let res = out.await;
info!("loop {res:?}");
}
});
// Pull updates off the channel and forward them to the account write and slot queues
loop {
let update = update_receiver.recv().await.unwrap();
match update {
WebsocketMessage::SingleUpdate(update) => {
if !account_wl.is_empty() && !account_wl.contains(&update.value.pubkey) {
continue;
}
trace!("single update");
let account: Account = update.value.account.decode().unwrap();
let pubkey = Pubkey::from_str(&update.value.pubkey).unwrap();
account_write_queue_sender
.send(AccountWrite::from(pubkey, update.context.slot, 0, account))
.await
.expect("send success");
}
WebsocketMessage::SnapshotUpdate(update) => {
trace!("snapshot update");
for keyed_account in update.value {
if !account_wl.is_empty() && !account_wl.contains(&keyed_account.pubkey) {
continue;
}
let account: Account = keyed_account.account.decode().unwrap();
let pubkey = Pubkey::from_str(&keyed_account.pubkey).unwrap();
account_write_queue_sender
.send(AccountWrite::from(pubkey, update.context.slot, 0, account))
.await
.expect("send success");
}
}
WebsocketMessage::SlotUpdate(update) => {
trace!("slot update");
let message = match *update {
solana_client::rpc_response::SlotUpdate::CreatedBank {
slot, parent, ..
} => Some(SlotUpdate {
slot,
parent: Some(parent),
status: SlotStatus::Processed,
}),
solana_client::rpc_response::SlotUpdate::OptimisticConfirmation {
slot,
..
} => Some(SlotUpdate {
slot,
parent: None,
status: SlotStatus::Confirmed,
}),
solana_client::rpc_response::SlotUpdate::Root { slot, .. } => {
Some(SlotUpdate {
slot,
parent: None,
status: SlotStatus::Rooted,
})
}
_ => None,
};
if let Some(message) = message {
slot_queue_sender.send(message).await.expect("send success");
}
}
}
}
}
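// Wiring sketch for a caller (channel names are made up; FilterConfig comes from
// grpc_plugin_source and is only used here for its account id whitelist):
//
//     let (account_write_tx, account_write_rx) = async_channel::unbounded::<AccountWrite>();
//     let (slot_tx, slot_rx) = async_channel::unbounded::<SlotUpdate>();
//     tokio::spawn(async move {
//         process_events(keeper_config, &filter_config, account_write_tx, slot_tx).await;
//     });
//     // account_write_rx / slot_rx are presumably drained by the chain_data consumer.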