wip: add geyser crank
This commit is contained in:
parent
d3acc5ac4f
commit
02acb24ee5
File diff suppressed because it is too large
Load Diff
21
Cargo.toml
21
Cargo.toml
|
@ -40,10 +40,27 @@ solana-version = { git = "https://github.com/solana-labs/solana.git", branch="v1
|
||||||
solana-logger = { git = "https://github.com/solana-labs/solana.git", branch="v1.15" }
|
solana-logger = { git = "https://github.com/solana-labs/solana.git", branch="v1.15" }
|
||||||
solana-transaction-status = { git = "https://github.com/solana-labs/solana.git", branch="v1.15" }
|
solana-transaction-status = { git = "https://github.com/solana-labs/solana.git", branch="v1.15" }
|
||||||
solana-quic-client = { git = "https://github.com/solana-labs/solana.git", branch="v1.15" }
|
solana-quic-client = { git = "https://github.com/solana-labs/solana.git", branch="v1.15" }
|
||||||
|
solana-account-decoder = { git = "https://github.com/solana-labs/solana.git", branch="v1.15" }
|
||||||
|
|
||||||
|
|
||||||
thiserror = "1.0"
|
thiserror = "1.0"
|
||||||
solana-program = ">=1.9.0"
|
solana-program = ">=1.9.0"
|
||||||
csv = "1.0.0"
|
csv = "1.0.0"
|
||||||
|
tonic = { version = "0.6", features = ["tls", "compression"] }
|
||||||
|
tokio = { version = "1", features = ["full"] }
|
||||||
|
anyhow = "1.0"
|
||||||
|
async-channel = "1.6"
|
||||||
|
async-trait = "0.1"
|
||||||
|
prost = "0.9"
|
||||||
|
warp = "0.3"
|
||||||
|
futures = "0.3.17"
|
||||||
|
jsonrpc-core = "18.0.0"
|
||||||
|
jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] }
|
||||||
|
arrayref = "*"
|
||||||
|
bytemuck = "1.7.2"
|
||||||
|
|
||||||
|
[build-dependencies]
|
||||||
|
tonic-build = { version = "0.6", features = ["compression"] }
|
||||||
|
|
||||||
[package.metadata.docs.rs]
|
[package.metadata.docs.rs]
|
||||||
targets = ["x86_64-unknown-linux-gnu"]
|
targets = ["x86_64-unknown-linux-gnu"]
|
||||||
|
@ -52,3 +69,7 @@ exclude = [
|
||||||
"deps/solana",
|
"deps/solana",
|
||||||
"deps/mango-v3",
|
"deps/mango-v3",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[patch.crates-io]
|
||||||
|
# for gzip encoded responses
|
||||||
|
jsonrpc-core-client = { git = "https://github.com/ckamm/jsonrpc.git", branch = "ckamm/http-with-gzip" }
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
fn main() {
|
||||||
|
tonic_build::compile_protos("./proto/geyser.proto")
|
||||||
|
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
|
||||||
|
}
|
|
@ -0,0 +1,92 @@
|
||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
import public "solana-storage-v1.10.40.proto";
|
||||||
|
|
||||||
|
package geyser;
|
||||||
|
|
||||||
|
service Geyser {
|
||||||
|
rpc Subscribe(stream SubscribeRequest) returns (stream SubscribeUpdate) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
message SubscribeRequest {
|
||||||
|
map<string, SubscribeRequestFilterAccounts> accounts = 1;
|
||||||
|
map<string, SubscribeRequestFilterSlots> slots = 2;
|
||||||
|
map<string, SubscribeRequestFilterTransactions> transactions = 3;
|
||||||
|
map<string, SubscribeRequestFilterBlocks> blocks = 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
message SubscribeRequestFilterAccounts {
|
||||||
|
repeated string account = 2;
|
||||||
|
repeated string owner = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message SubscribeRequestFilterSlots {}
|
||||||
|
|
||||||
|
message SubscribeRequestFilterTransactions {
|
||||||
|
optional bool vote = 1;
|
||||||
|
optional bool failed = 2;
|
||||||
|
repeated string account_include = 3;
|
||||||
|
repeated string account_exclude = 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
message SubscribeRequestFilterBlocks {}
|
||||||
|
|
||||||
|
message SubscribeUpdate {
|
||||||
|
repeated string filters = 1;
|
||||||
|
oneof update_oneof {
|
||||||
|
SubscribeUpdateAccount account = 2;
|
||||||
|
SubscribeUpdateSlot slot = 3;
|
||||||
|
SubscribeUpdateTransaction transaction = 4;
|
||||||
|
SubscribeUpdateBlock block = 5;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
message SubscribeUpdateAccount {
|
||||||
|
SubscribeUpdateAccountInfo account = 1;
|
||||||
|
uint64 slot = 2;
|
||||||
|
bool is_startup = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message SubscribeUpdateAccountInfo {
|
||||||
|
bytes pubkey = 1;
|
||||||
|
uint64 lamports = 2;
|
||||||
|
bytes owner = 3;
|
||||||
|
bool executable = 4;
|
||||||
|
uint64 rent_epoch = 5;
|
||||||
|
bytes data = 6;
|
||||||
|
uint64 write_version = 7;
|
||||||
|
// bytes txn_signature = 8;
|
||||||
|
}
|
||||||
|
|
||||||
|
message SubscribeUpdateSlot {
|
||||||
|
uint64 slot = 1;
|
||||||
|
optional uint64 parent = 2;
|
||||||
|
SubscribeUpdateSlotStatus status = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
enum SubscribeUpdateSlotStatus {
|
||||||
|
PROCESSED = 0;
|
||||||
|
CONFIRMED = 1;
|
||||||
|
FINALIZED = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message SubscribeUpdateTransaction {
|
||||||
|
SubscribeUpdateTransactionInfo transaction = 1;
|
||||||
|
uint64 slot = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message SubscribeUpdateTransactionInfo {
|
||||||
|
bytes signature = 1;
|
||||||
|
bool is_vote = 2;
|
||||||
|
solana.storage.ConfirmedBlock.Transaction transaction = 3;
|
||||||
|
solana.storage.ConfirmedBlock.TransactionStatusMeta meta = 4;
|
||||||
|
// uint64 index = 5;
|
||||||
|
}
|
||||||
|
|
||||||
|
message SubscribeUpdateBlock {
|
||||||
|
uint64 slot = 1;
|
||||||
|
string blockhash = 2;
|
||||||
|
solana.storage.ConfirmedBlock.Rewards rewards = 3;
|
||||||
|
solana.storage.ConfirmedBlock.UnixTimestamp block_time = 4;
|
||||||
|
solana.storage.ConfirmedBlock.BlockHeight block_height = 5;
|
||||||
|
}
|
|
@ -0,0 +1,118 @@
|
||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
package solana.storage.ConfirmedBlock;
|
||||||
|
|
||||||
|
message ConfirmedBlock {
|
||||||
|
string previous_blockhash = 1;
|
||||||
|
string blockhash = 2;
|
||||||
|
uint64 parent_slot = 3;
|
||||||
|
repeated ConfirmedTransaction transactions = 4;
|
||||||
|
repeated Reward rewards = 5;
|
||||||
|
UnixTimestamp block_time = 6;
|
||||||
|
BlockHeight block_height = 7;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ConfirmedTransaction {
|
||||||
|
Transaction transaction = 1;
|
||||||
|
TransactionStatusMeta meta = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message Transaction {
|
||||||
|
repeated bytes signatures = 1;
|
||||||
|
Message message = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message Message {
|
||||||
|
MessageHeader header = 1;
|
||||||
|
repeated bytes account_keys = 2;
|
||||||
|
bytes recent_blockhash = 3;
|
||||||
|
repeated CompiledInstruction instructions = 4;
|
||||||
|
bool versioned = 5;
|
||||||
|
repeated MessageAddressTableLookup address_table_lookups = 6;
|
||||||
|
}
|
||||||
|
|
||||||
|
message MessageHeader {
|
||||||
|
uint32 num_required_signatures = 1;
|
||||||
|
uint32 num_readonly_signed_accounts = 2;
|
||||||
|
uint32 num_readonly_unsigned_accounts = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message MessageAddressTableLookup {
|
||||||
|
bytes account_key = 1;
|
||||||
|
bytes writable_indexes = 2;
|
||||||
|
bytes readonly_indexes = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message TransactionStatusMeta {
|
||||||
|
TransactionError err = 1;
|
||||||
|
uint64 fee = 2;
|
||||||
|
repeated uint64 pre_balances = 3;
|
||||||
|
repeated uint64 post_balances = 4;
|
||||||
|
repeated InnerInstructions inner_instructions = 5;
|
||||||
|
bool inner_instructions_none = 10;
|
||||||
|
repeated string log_messages = 6;
|
||||||
|
bool log_messages_none = 11;
|
||||||
|
repeated TokenBalance pre_token_balances = 7;
|
||||||
|
repeated TokenBalance post_token_balances = 8;
|
||||||
|
repeated Reward rewards = 9;
|
||||||
|
repeated bytes loaded_writable_addresses = 12;
|
||||||
|
repeated bytes loaded_readonly_addresses = 13;
|
||||||
|
}
|
||||||
|
|
||||||
|
message TransactionError {
|
||||||
|
bytes err = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message InnerInstructions {
|
||||||
|
uint32 index = 1;
|
||||||
|
repeated CompiledInstruction instructions = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message CompiledInstruction {
|
||||||
|
uint32 program_id_index = 1;
|
||||||
|
bytes accounts = 2;
|
||||||
|
bytes data = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message TokenBalance {
|
||||||
|
uint32 account_index = 1;
|
||||||
|
string mint = 2;
|
||||||
|
UiTokenAmount ui_token_amount = 3;
|
||||||
|
string owner = 4;
|
||||||
|
string program_id = 5;
|
||||||
|
}
|
||||||
|
|
||||||
|
message UiTokenAmount {
|
||||||
|
double ui_amount = 1;
|
||||||
|
uint32 decimals = 2;
|
||||||
|
string amount = 3;
|
||||||
|
string ui_amount_string = 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
enum RewardType {
|
||||||
|
Unspecified = 0;
|
||||||
|
Fee = 1;
|
||||||
|
Rent = 2;
|
||||||
|
Staking = 3;
|
||||||
|
Voting = 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
message Reward {
|
||||||
|
string pubkey = 1;
|
||||||
|
int64 lamports = 2;
|
||||||
|
uint64 post_balance = 3;
|
||||||
|
RewardType reward_type = 4;
|
||||||
|
string commission = 5;
|
||||||
|
}
|
||||||
|
|
||||||
|
message Rewards {
|
||||||
|
repeated Reward rewards = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message UnixTimestamp {
|
||||||
|
int64 timestamp = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message BlockHeight {
|
||||||
|
uint64 block_height = 1;
|
||||||
|
}
|
|
@ -0,0 +1,134 @@
|
||||||
|
use crate::{
|
||||||
|
chain_data::{AccountData, AccountWrite, SlotUpdate, ChainData, SlotData},
|
||||||
|
metrics::Metrics,
|
||||||
|
};
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use solana_sdk::{account::WritableAccount, stake_history::Epoch, pubkey::Pubkey};
|
||||||
|
use std::{
|
||||||
|
collections::{BTreeSet, HashMap},
|
||||||
|
sync::Arc,
|
||||||
|
time::{Duration, Instant},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
pub trait AccountWriteSink {
|
||||||
|
async fn process(&self, pubkey: &Pubkey, account: &AccountData) -> Result<(), String>;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct AccountWriteRoute {
|
||||||
|
pub matched_pubkeys: Vec<Pubkey>,
|
||||||
|
pub sink: Arc<dyn AccountWriteSink + Send + Sync>,
|
||||||
|
pub timeout_interval: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
struct AcountWriteRecord {
|
||||||
|
slot: u64,
|
||||||
|
write_version: u64,
|
||||||
|
timestamp: Instant,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn init(
|
||||||
|
routes: Vec<AccountWriteRoute>,
|
||||||
|
metrics_sender: Metrics,
|
||||||
|
) -> anyhow::Result<(
|
||||||
|
async_channel::Sender<AccountWrite>,
|
||||||
|
async_channel::Sender<SlotUpdate>,
|
||||||
|
)> {
|
||||||
|
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
|
||||||
|
let (account_write_queue_sender, account_write_queue_receiver) =
|
||||||
|
async_channel::unbounded::<AccountWrite>();
|
||||||
|
|
||||||
|
// Slot updates flowing from the outside into the single processing thread. From
|
||||||
|
// there they'll flow into the postgres sending thread.
|
||||||
|
let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
|
||||||
|
|
||||||
|
let mut chain_data = ChainData::new(metrics_sender);
|
||||||
|
let mut last_updated = HashMap::<String, AcountWriteRecord>::new();
|
||||||
|
|
||||||
|
let all_queue_pks: BTreeSet<Pubkey> = routes
|
||||||
|
.iter()
|
||||||
|
.flat_map(|r| r.matched_pubkeys.iter())
|
||||||
|
.map(|pk| pk.clone())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// update handling thread, reads both sloths and account updates
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
Ok(account_write) = account_write_queue_receiver.recv() => {
|
||||||
|
if all_queue_pks.contains(&account_write.pubkey) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
chain_data.update_account(
|
||||||
|
account_write.pubkey,
|
||||||
|
AccountData {
|
||||||
|
slot: account_write.slot,
|
||||||
|
write_version: account_write.write_version,
|
||||||
|
account: WritableAccount::create(
|
||||||
|
account_write.lamports,
|
||||||
|
account_write.data.clone(),
|
||||||
|
account_write.owner,
|
||||||
|
account_write.executable,
|
||||||
|
account_write.rent_epoch as Epoch,
|
||||||
|
),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Ok(slot_update) = slot_queue_receiver.recv() => {
|
||||||
|
chain_data.update_slot(SlotData {
|
||||||
|
slot: slot_update.slot,
|
||||||
|
parent: slot_update.parent,
|
||||||
|
status: slot_update.status,
|
||||||
|
chain: 0,
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for route in routes.iter() {
|
||||||
|
for pk in route.matched_pubkeys.iter() {
|
||||||
|
match chain_data.account(&pk) {
|
||||||
|
Ok(account_info) => {
|
||||||
|
let pk_b58 = pk.to_string();
|
||||||
|
if let Some(record) = last_updated.get(&pk_b58) {
|
||||||
|
let is_unchanged = account_info.slot == record.slot
|
||||||
|
&& account_info.write_version == record.write_version;
|
||||||
|
let is_throttled =
|
||||||
|
record.timestamp.elapsed() < route.timeout_interval;
|
||||||
|
if is_unchanged || is_throttled {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match route.sink.process(pk, account_info).await {
|
||||||
|
Ok(()) => {
|
||||||
|
// todo: metrics
|
||||||
|
last_updated.insert(
|
||||||
|
pk_b58.clone(),
|
||||||
|
AcountWriteRecord {
|
||||||
|
slot: account_info.slot,
|
||||||
|
write_version: account_info.write_version,
|
||||||
|
timestamp: Instant::now(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(_skip_reason) => {
|
||||||
|
// todo: metrics
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
// todo: metrics
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok((account_write_queue_sender, slot_queue_sender))
|
||||||
|
}
|
|
@ -0,0 +1,45 @@
|
||||||
|
use log::*;
|
||||||
|
use solana_client::nonblocking::rpc_client::RpcClient;
|
||||||
|
use solana_sdk::{clock::DEFAULT_MS_PER_SLOT, commitment_config::CommitmentConfig, hash::Hash};
|
||||||
|
use std::{
|
||||||
|
sync::{Arc, RwLock},
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
use tokio::{spawn, time::sleep};
|
||||||
|
|
||||||
|
const RETRY_INTERVAL: Duration = Duration::from_millis(5 * DEFAULT_MS_PER_SLOT);
|
||||||
|
|
||||||
|
pub async fn poll_loop(blockhash: Arc<RwLock<Hash>>, client: Arc<RpcClient>) {
|
||||||
|
let cfg = CommitmentConfig::processed();
|
||||||
|
loop {
|
||||||
|
let old_blockhash = *blockhash.read().unwrap();
|
||||||
|
if let Ok((new_blockhash, _)) = client.get_latest_blockhash_with_commitment(cfg).await {
|
||||||
|
if new_blockhash != old_blockhash {
|
||||||
|
debug!("new blockhash ({:?})", blockhash);
|
||||||
|
*blockhash.write().unwrap() = new_blockhash;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retry every few slots
|
||||||
|
sleep(RETRY_INTERVAL).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn init(client: Arc<RpcClient>) -> Arc<RwLock<Hash>> {
|
||||||
|
// get the first blockhash
|
||||||
|
let blockhash = Arc::new(RwLock::new(
|
||||||
|
client
|
||||||
|
.get_latest_blockhash()
|
||||||
|
.await
|
||||||
|
.expect("fetch initial blockhash"),
|
||||||
|
));
|
||||||
|
|
||||||
|
// launch task
|
||||||
|
let _join_hdl = {
|
||||||
|
// create a thread-local reference to blockhash
|
||||||
|
let blockhash_c = blockhash.clone();
|
||||||
|
spawn(async move { poll_loop(blockhash_c, client).await })
|
||||||
|
};
|
||||||
|
|
||||||
|
return blockhash;
|
||||||
|
}
|
|
@ -0,0 +1,268 @@
|
||||||
|
use crate::metrics::{MetricType, MetricU64, Metrics};
|
||||||
|
|
||||||
|
use {
|
||||||
|
solana_sdk::account::{Account, AccountSharedData, ReadableAccount},
|
||||||
|
solana_sdk::pubkey::Pubkey,
|
||||||
|
std::collections::HashMap,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, Debug, PartialEq)]
|
||||||
|
pub enum SlotStatus {
|
||||||
|
Rooted,
|
||||||
|
Confirmed,
|
||||||
|
Processed,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct SlotData {
|
||||||
|
pub slot: u64,
|
||||||
|
pub parent: Option<u64>,
|
||||||
|
pub status: SlotStatus,
|
||||||
|
pub chain: u64, // the top slot that this is in a chain with. uncles will have values < tip
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct AccountData {
|
||||||
|
pub slot: u64,
|
||||||
|
pub write_version: u64,
|
||||||
|
pub account: AccountSharedData,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Clone, PartialEq, Debug)]
|
||||||
|
pub struct AccountWrite {
|
||||||
|
pub pubkey: Pubkey,
|
||||||
|
pub slot: u64,
|
||||||
|
pub write_version: u64,
|
||||||
|
pub lamports: u64,
|
||||||
|
pub owner: Pubkey,
|
||||||
|
pub executable: bool,
|
||||||
|
pub rent_epoch: u64,
|
||||||
|
pub data: Vec<u8>,
|
||||||
|
pub is_selected: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AccountWrite {
|
||||||
|
pub fn from(pubkey: Pubkey, slot: u64, write_version: u64, account: Account) -> AccountWrite {
|
||||||
|
AccountWrite {
|
||||||
|
pubkey,
|
||||||
|
slot: slot,
|
||||||
|
write_version,
|
||||||
|
lamports: account.lamports,
|
||||||
|
owner: account.owner,
|
||||||
|
executable: account.executable,
|
||||||
|
rent_epoch: account.rent_epoch,
|
||||||
|
data: account.data,
|
||||||
|
is_selected: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct SlotUpdate {
|
||||||
|
pub slot: u64,
|
||||||
|
pub parent: Option<u64>,
|
||||||
|
pub status: SlotStatus,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Track slots and account writes
|
||||||
|
///
|
||||||
|
/// - use account() to retrieve the current best data for an account.
|
||||||
|
/// - update_from_snapshot() and update_from_websocket() update the state for new messages
|
||||||
|
pub struct ChainData {
|
||||||
|
/// only slots >= newest_rooted_slot are retained
|
||||||
|
slots: HashMap<u64, SlotData>,
|
||||||
|
/// writes to accounts, only the latest rooted write an newer are retained
|
||||||
|
accounts: HashMap<Pubkey, Vec<AccountData>>,
|
||||||
|
newest_rooted_slot: u64,
|
||||||
|
newest_processed_slot: u64,
|
||||||
|
account_versions_stored: usize,
|
||||||
|
account_bytes_stored: usize,
|
||||||
|
metric_accounts_stored: MetricU64,
|
||||||
|
metric_account_versions_stored: MetricU64,
|
||||||
|
metric_account_bytes_stored: MetricU64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ChainData {
|
||||||
|
pub fn new(metrics_sender: Metrics) -> Self {
|
||||||
|
Self {
|
||||||
|
slots: HashMap::new(),
|
||||||
|
accounts: HashMap::new(),
|
||||||
|
newest_rooted_slot: 0,
|
||||||
|
newest_processed_slot: 0,
|
||||||
|
account_versions_stored: 0,
|
||||||
|
account_bytes_stored: 0,
|
||||||
|
metric_accounts_stored: metrics_sender
|
||||||
|
.register_u64("chaindata_accounts_stored".into(), MetricType::Gauge),
|
||||||
|
metric_account_versions_stored: metrics_sender.register_u64(
|
||||||
|
"chaindata_account_versions_stored".into(),
|
||||||
|
MetricType::Gauge,
|
||||||
|
),
|
||||||
|
metric_account_bytes_stored: metrics_sender
|
||||||
|
.register_u64("chaindata_account_bytes_stored".into(), MetricType::Gauge),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update_slot(&mut self, new_slot: SlotData) {
|
||||||
|
let new_processed_head = new_slot.slot > self.newest_processed_slot;
|
||||||
|
if new_processed_head {
|
||||||
|
self.newest_processed_slot = new_slot.slot;
|
||||||
|
}
|
||||||
|
|
||||||
|
let new_rooted_head =
|
||||||
|
new_slot.slot > self.newest_rooted_slot && new_slot.status == SlotStatus::Rooted;
|
||||||
|
if new_rooted_head {
|
||||||
|
self.newest_rooted_slot = new_slot.slot;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut parent_update = false;
|
||||||
|
|
||||||
|
use std::collections::hash_map::Entry;
|
||||||
|
match self.slots.entry(new_slot.slot) {
|
||||||
|
Entry::Vacant(v) => {
|
||||||
|
v.insert(new_slot);
|
||||||
|
}
|
||||||
|
Entry::Occupied(o) => {
|
||||||
|
let v = o.into_mut();
|
||||||
|
parent_update = v.parent != new_slot.parent && new_slot.parent.is_some();
|
||||||
|
v.parent = v.parent.or(new_slot.parent);
|
||||||
|
v.status = new_slot.status;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if new_processed_head || parent_update {
|
||||||
|
// update the "chain" field down to the first rooted slot
|
||||||
|
let mut slot = self.newest_processed_slot;
|
||||||
|
loop {
|
||||||
|
if let Some(data) = self.slots.get_mut(&slot) {
|
||||||
|
data.chain = self.newest_processed_slot;
|
||||||
|
if data.status == SlotStatus::Rooted {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if let Some(parent) = data.parent {
|
||||||
|
slot = parent;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if new_rooted_head {
|
||||||
|
// for each account, preserve only writes > newest_rooted_slot, or the newest
|
||||||
|
// rooted write
|
||||||
|
self.account_versions_stored = 0;
|
||||||
|
self.account_bytes_stored = 0;
|
||||||
|
|
||||||
|
for (_, writes) in self.accounts.iter_mut() {
|
||||||
|
let newest_rooted_write = writes
|
||||||
|
.iter()
|
||||||
|
.rev()
|
||||||
|
.find(|w| {
|
||||||
|
w.slot <= self.newest_rooted_slot
|
||||||
|
&& self
|
||||||
|
.slots
|
||||||
|
.get(&w.slot)
|
||||||
|
.map(|s| {
|
||||||
|
// sometimes we seem not to get notifications about slots
|
||||||
|
// getting rooted, hence assume non-uncle slots < newest_rooted_slot
|
||||||
|
// are rooted too
|
||||||
|
s.status == SlotStatus::Rooted
|
||||||
|
|| s.chain == self.newest_processed_slot
|
||||||
|
})
|
||||||
|
// preserved account writes for deleted slots <= newest_rooted_slot
|
||||||
|
// are expected to be rooted
|
||||||
|
.unwrap_or(true)
|
||||||
|
})
|
||||||
|
.map(|w| w.slot)
|
||||||
|
// no rooted write found: produce no effect, since writes > newest_rooted_slot are retained anyway
|
||||||
|
.unwrap_or(self.newest_rooted_slot + 1);
|
||||||
|
writes
|
||||||
|
.retain(|w| w.slot == newest_rooted_write || w.slot > self.newest_rooted_slot);
|
||||||
|
self.account_versions_stored += writes.len();
|
||||||
|
self.account_bytes_stored += writes
|
||||||
|
.iter()
|
||||||
|
.map(|w| w.account.data().len())
|
||||||
|
.fold(0, |acc, l| acc + l)
|
||||||
|
}
|
||||||
|
|
||||||
|
// now it's fine to drop any slots before the new rooted head
|
||||||
|
// as account writes for non-rooted slots before it have been dropped
|
||||||
|
self.slots.retain(|s, _| *s >= self.newest_rooted_slot);
|
||||||
|
|
||||||
|
self.metric_accounts_stored.set(self.accounts.len() as u64);
|
||||||
|
self.metric_account_versions_stored
|
||||||
|
.set(self.account_versions_stored as u64);
|
||||||
|
self.metric_account_bytes_stored
|
||||||
|
.set(self.account_bytes_stored as u64);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update_account(&mut self, pubkey: Pubkey, account: AccountData) {
|
||||||
|
use std::collections::hash_map::Entry;
|
||||||
|
match self.accounts.entry(pubkey) {
|
||||||
|
Entry::Vacant(v) => {
|
||||||
|
self.account_versions_stored += 1;
|
||||||
|
self.account_bytes_stored += account.account.data().len();
|
||||||
|
v.insert(vec![account]);
|
||||||
|
}
|
||||||
|
Entry::Occupied(o) => {
|
||||||
|
let v = o.into_mut();
|
||||||
|
// v is ordered by slot ascending. find the right position
|
||||||
|
// overwrite if an entry for the slot already exists, otherwise insert
|
||||||
|
let rev_pos = v
|
||||||
|
.iter()
|
||||||
|
.rev()
|
||||||
|
.position(|d| d.slot <= account.slot)
|
||||||
|
.unwrap_or(v.len());
|
||||||
|
let pos = v.len() - rev_pos;
|
||||||
|
if pos < v.len() && v[pos].slot == account.slot {
|
||||||
|
if v[pos].write_version < account.write_version {
|
||||||
|
v[pos] = account;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
self.account_versions_stored += 1;
|
||||||
|
self.account_bytes_stored += account.account.data().len();
|
||||||
|
v.insert(pos, account);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_account_write_live(&self, write: &AccountData) -> bool {
|
||||||
|
self.slots
|
||||||
|
.get(&write.slot)
|
||||||
|
// either the slot is rooted or in the current chain
|
||||||
|
.map(|s| s.status == SlotStatus::Rooted || s.chain == self.newest_processed_slot)
|
||||||
|
// if the slot can't be found but preceeds newest rooted, use it too (old rooted slots are removed)
|
||||||
|
.unwrap_or(
|
||||||
|
write.slot <= self.newest_rooted_slot || write.slot > self.newest_processed_slot,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Cloned snapshot of all the most recent live writes per pubkey
|
||||||
|
pub fn accounts_snapshot(&self) -> HashMap<Pubkey, AccountData> {
|
||||||
|
self.accounts
|
||||||
|
.iter()
|
||||||
|
.filter_map(|(pubkey, writes)| {
|
||||||
|
let latest_good_write = writes
|
||||||
|
.iter()
|
||||||
|
.rev()
|
||||||
|
.find(|w| self.is_account_write_live(w))?;
|
||||||
|
Some((pubkey.clone(), latest_good_write.clone()))
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Ref to the most recent live write of the pubkey
|
||||||
|
pub fn account<'a>(&'a self, pubkey: &Pubkey) -> anyhow::Result<&'a AccountData> {
|
||||||
|
self.accounts
|
||||||
|
.get(pubkey)
|
||||||
|
.ok_or(anyhow::anyhow!("account {} not found", pubkey))?
|
||||||
|
.iter()
|
||||||
|
.rev()
|
||||||
|
.find(|w| self.is_account_write_live(w))
|
||||||
|
.ok_or(anyhow::anyhow!("account {} has no live data", pubkey))
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,73 @@
|
||||||
|
use std::{thread::{JoinHandle, Builder}, sync::{Arc, RwLock}, str::FromStr, time::Duration};
|
||||||
|
|
||||||
|
// use solana_client::rpc_client::RpcClient;
|
||||||
|
use solana_sdk::{pubkey::Pubkey, signature::Keypair, instruction::Instruction};
|
||||||
|
|
||||||
|
use crate::{mango::GroupConfig, account_write_filter::AccountWriteRoute, mango_v3_perp_crank_sink::MangoV3PerpCrankSink};
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// pub async fn send_tx_loop(
|
||||||
|
// ixs_rx: async_channel::Receiver<Vec<Instruction>>,
|
||||||
|
// blockhash: Arc<RwLock<Hash>>,
|
||||||
|
// client: Arc<RpcClient>,
|
||||||
|
// keypair: Keypair,
|
||||||
|
// ) {
|
||||||
|
// info!("signing with keypair pk={:?}", keypair.pubkey());
|
||||||
|
// let cfg = RpcSendTransactionConfig {
|
||||||
|
// skip_preflight: true,
|
||||||
|
// ..RpcSendTransactionConfig::default()
|
||||||
|
// };
|
||||||
|
// loop {
|
||||||
|
// if let Ok(ixs) = ixs_rx.recv().await {
|
||||||
|
// // TODO add priority fee
|
||||||
|
// let tx = Transaction::new_signed_with_payer(
|
||||||
|
// &ixs,
|
||||||
|
// Some(&keypair.pubkey()),
|
||||||
|
// &[&keypair],
|
||||||
|
// *blockhash.read().unwrap(),
|
||||||
|
// );
|
||||||
|
// // TODO: collect metrics
|
||||||
|
// info!("send tx={:?} ok={:?}", tx.signatures[0], client.send_transaction_with_config(&tx, cfg).await);
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
|
||||||
|
fn start_crank_thread(
|
||||||
|
identity: Keypair,
|
||||||
|
group: GroupConfig
|
||||||
|
) -> JoinHandle<()> {
|
||||||
|
|
||||||
|
let perp_queue_pks: Vec<_> = group.perp_markets.iter().map(|m| (Pubkey::from_str(&m.public_key).unwrap(), Pubkey::from_str(&m.events_key).unwrap())).collect();
|
||||||
|
let group_pk = Pubkey::from_str(&group.public_key).unwrap();
|
||||||
|
|
||||||
|
return Builder::new()
|
||||||
|
.name("crank".to_string())
|
||||||
|
.spawn(move || {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// Event queue updates can be consumed by client connections
|
||||||
|
let (instruction_sender, instruction_receiver) = async_channel::unbounded::<Vec<Instruction>>();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
let routes = vec![
|
||||||
|
AccountWriteRoute {
|
||||||
|
matched_pubkeys: perp_queue_pks
|
||||||
|
.iter()
|
||||||
|
.map(|(_, evq_pk)| evq_pk.clone())
|
||||||
|
.collect(),
|
||||||
|
sink: Arc::new(MangoV3PerpCrankSink::new(
|
||||||
|
perp_queue_pks,
|
||||||
|
group_pk,
|
||||||
|
instruction_sender.clone(),
|
||||||
|
)),
|
||||||
|
timeout_interval: Duration::default(),
|
||||||
|
}];
|
||||||
|
|
||||||
|
}).expect("launch crank thread")
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,609 @@
|
||||||
|
use futures::stream::once;
|
||||||
|
use geyser::geyser_client::GeyserClient;
|
||||||
|
use jsonrpc_core::futures::StreamExt;
|
||||||
|
use jsonrpc_core_client::transports::http;
|
||||||
|
|
||||||
|
use serde::Deserialize;
|
||||||
|
use solana_account_decoder::{UiAccount, UiAccountEncoding};
|
||||||
|
use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
|
||||||
|
use solana_client::rpc_response::{OptionalContext, RpcKeyedAccount};
|
||||||
|
use solana_rpc::rpc::rpc_accounts::AccountsDataClient;
|
||||||
|
use solana_rpc::rpc::rpc_accounts_scan::AccountsScanClient;
|
||||||
|
use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey};
|
||||||
|
|
||||||
|
use futures::{future, future::FutureExt};
|
||||||
|
use tonic::{
|
||||||
|
metadata::MetadataValue,
|
||||||
|
transport::{Certificate, Channel, ClientTlsConfig, Identity},
|
||||||
|
Request,
|
||||||
|
};
|
||||||
|
|
||||||
|
use log::*;
|
||||||
|
use std::{collections::HashMap, env, str::FromStr, time::Duration};
|
||||||
|
|
||||||
|
pub mod geyser {
|
||||||
|
tonic::include_proto!("geyser");
|
||||||
|
}
|
||||||
|
|
||||||
|
pub mod solana {
|
||||||
|
pub mod storage {
|
||||||
|
pub mod confirmed_block {
|
||||||
|
tonic::include_proto!("solana.storage.confirmed_block");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub use geyser::*;
|
||||||
|
pub use solana::storage::confirmed_block::*;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
chain_data::{AccountWrite, SlotStatus, SlotUpdate},
|
||||||
|
metrics::{MetricType, Metrics},
|
||||||
|
AnyhowWrap,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Deserialize)]
|
||||||
|
pub struct SourceConfig {
|
||||||
|
pub dedup_queue_size: usize,
|
||||||
|
pub grpc_sources: Vec<GrpcSourceConfig>,
|
||||||
|
pub snapshot: SnapshotSourceConfig,
|
||||||
|
pub rpc_ws_url: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Deserialize)]
|
||||||
|
pub struct GrpcSourceConfig {
|
||||||
|
pub name: String,
|
||||||
|
pub connection_string: String,
|
||||||
|
pub retry_connection_sleep_secs: u64,
|
||||||
|
pub tls: Option<TlsConfig>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Deserialize)]
|
||||||
|
pub struct TlsConfig {
|
||||||
|
pub ca_cert_path: String,
|
||||||
|
pub client_cert_path: String,
|
||||||
|
pub client_key_path: String,
|
||||||
|
pub domain_name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Deserialize)]
|
||||||
|
pub struct FilterConfig {
|
||||||
|
pub program_ids: Vec<String>,
|
||||||
|
pub account_ids: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Deserialize)]
|
||||||
|
pub struct SnapshotSourceConfig {
|
||||||
|
pub rpc_http_url: String,
|
||||||
|
pub program_id: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
//use solana_geyser_connector_plugin_grpc::compression::zstd_decompress;
|
||||||
|
|
||||||
|
struct SnapshotData {
|
||||||
|
slot: u64,
|
||||||
|
accounts: Vec<(String, Option<UiAccount>)>,
|
||||||
|
}
|
||||||
|
enum Message {
|
||||||
|
GrpcUpdate(geyser::SubscribeUpdate),
|
||||||
|
Snapshot(SnapshotData),
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_snapshot_gpa(
|
||||||
|
rpc_http_url: String,
|
||||||
|
program_id: String,
|
||||||
|
) -> anyhow::Result<OptionalContext<Vec<RpcKeyedAccount>>> {
|
||||||
|
let rpc_client = http::connect_with_options::<AccountsScanClient>(&rpc_http_url, true)
|
||||||
|
.await
|
||||||
|
.map_err_anyhow()?;
|
||||||
|
|
||||||
|
let account_info_config = RpcAccountInfoConfig {
|
||||||
|
encoding: Some(UiAccountEncoding::Base64),
|
||||||
|
commitment: Some(CommitmentConfig::finalized()),
|
||||||
|
data_slice: None,
|
||||||
|
min_context_slot: None,
|
||||||
|
};
|
||||||
|
let program_accounts_config = RpcProgramAccountsConfig {
|
||||||
|
filters: None,
|
||||||
|
with_context: Some(true),
|
||||||
|
account_config: account_info_config.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
info!("requesting snapshot {}", program_id);
|
||||||
|
let account_snapshot = rpc_client
|
||||||
|
.get_program_accounts(program_id.clone(), Some(program_accounts_config.clone()))
|
||||||
|
.await
|
||||||
|
.map_err_anyhow()?;
|
||||||
|
info!("snapshot received {}", program_id);
|
||||||
|
Ok(account_snapshot)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_snapshot_gma(
|
||||||
|
rpc_http_url: String,
|
||||||
|
ids: Vec<String>,
|
||||||
|
) -> anyhow::Result<solana_client::rpc_response::Response<Vec<Option<UiAccount>>>> {
|
||||||
|
let rpc_client = http::connect_with_options::<AccountsDataClient>(&rpc_http_url, true)
|
||||||
|
.await
|
||||||
|
.map_err_anyhow()?;
|
||||||
|
|
||||||
|
let account_info_config = RpcAccountInfoConfig {
|
||||||
|
encoding: Some(UiAccountEncoding::Base64),
|
||||||
|
commitment: Some(CommitmentConfig::finalized()),
|
||||||
|
data_slice: None,
|
||||||
|
min_context_slot: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
info!("requesting snapshot {:?}", ids);
|
||||||
|
let account_snapshot = rpc_client
|
||||||
|
.get_multiple_accounts(ids.clone(), Some(account_info_config))
|
||||||
|
.await
|
||||||
|
.map_err_anyhow()?;
|
||||||
|
info!("snapshot received {:?}", ids);
|
||||||
|
Ok(account_snapshot)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn feed_data_geyser(
|
||||||
|
grpc_config: &GrpcSourceConfig,
|
||||||
|
tls_config: Option<ClientTlsConfig>,
|
||||||
|
snapshot_config: &SnapshotSourceConfig,
|
||||||
|
filter_config: &FilterConfig,
|
||||||
|
sender: async_channel::Sender<Message>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let connection_string = match &grpc_config.connection_string.chars().next().unwrap() {
|
||||||
|
'$' => env::var(&grpc_config.connection_string[1..])
|
||||||
|
.expect("reading connection string from env"),
|
||||||
|
_ => grpc_config.connection_string.clone(),
|
||||||
|
};
|
||||||
|
let rpc_http_url = match &snapshot_config.rpc_http_url.chars().next().unwrap() {
|
||||||
|
'$' => env::var(&snapshot_config.rpc_http_url[1..])
|
||||||
|
.expect("reading connection string from env"),
|
||||||
|
_ => snapshot_config.rpc_http_url.clone(),
|
||||||
|
};
|
||||||
|
info!("connecting {}", connection_string);
|
||||||
|
let endpoint = Channel::from_shared(connection_string)?;
|
||||||
|
let channel = if let Some(tls) = tls_config {
|
||||||
|
endpoint.tls_config(tls)?
|
||||||
|
} else {
|
||||||
|
endpoint
|
||||||
|
}
|
||||||
|
.connect()
|
||||||
|
.await?;
|
||||||
|
let token: MetadataValue<_> = "eed31807f710e4bb098779fb9f67".parse()?;
|
||||||
|
let mut client = GeyserClient::with_interceptor(channel, move |mut req: Request<()>| {
|
||||||
|
req.metadata_mut().insert("x-token", token.clone());
|
||||||
|
Ok(req)
|
||||||
|
});
|
||||||
|
|
||||||
|
// If account_ids are provided, snapshot will be gMA. If only program_ids, then only the first id will be snapshot
|
||||||
|
// TODO: handle this better
|
||||||
|
if filter_config.program_ids.len() > 1 {
|
||||||
|
warn!("only one program id is supported for gPA snapshots")
|
||||||
|
}
|
||||||
|
let mut accounts = HashMap::new();
|
||||||
|
accounts.insert(
|
||||||
|
"client".to_owned(),
|
||||||
|
SubscribeRequestFilterAccounts {
|
||||||
|
account: filter_config.account_ids.clone(),
|
||||||
|
owner: filter_config.program_ids.clone(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
let mut slots = HashMap::new();
|
||||||
|
slots.insert("client".to_owned(), SubscribeRequestFilterSlots {});
|
||||||
|
let blocks = HashMap::new();
|
||||||
|
let transactions = HashMap::new();
|
||||||
|
|
||||||
|
let request = SubscribeRequest {
|
||||||
|
accounts,
|
||||||
|
blocks,
|
||||||
|
slots,
|
||||||
|
transactions,
|
||||||
|
};
|
||||||
|
info!("Going to send request: {:?}", request);
|
||||||
|
|
||||||
|
let response = client.subscribe(once(async move { request })).await?;
|
||||||
|
let mut update_stream = response.into_inner();
|
||||||
|
|
||||||
|
// We can't get a snapshot immediately since the finalized snapshot would be for a
|
||||||
|
// slot in the past and we'd be missing intermediate updates.
|
||||||
|
//
|
||||||
|
// Delay the request until the first slot we received all writes for becomes rooted
|
||||||
|
// to avoid that problem - partially. The rooted slot will still be larger than the
|
||||||
|
// finalized slot, so add a number of slots as a buffer.
|
||||||
|
//
|
||||||
|
// If that buffer isn't sufficient, there'll be a retry.
|
||||||
|
|
||||||
|
// The first slot that we will receive _all_ account writes for
|
||||||
|
let mut first_full_slot: u64 = u64::MAX;
|
||||||
|
|
||||||
|
// If a snapshot should be performed when ready.
|
||||||
|
let mut snapshot_needed = true;
|
||||||
|
|
||||||
|
// The highest "rooted" slot that has been seen.
|
||||||
|
let mut max_rooted_slot = 0;
|
||||||
|
|
||||||
|
// Data for slots will arrive out of order. This value defines how many
|
||||||
|
// slots after a slot was marked "rooted" we assume it'll not receive
|
||||||
|
// any more account write information.
|
||||||
|
//
|
||||||
|
// This is important for the write_version mapping (to know when slots can
|
||||||
|
// be dropped).
|
||||||
|
let max_out_of_order_slots = 40;
|
||||||
|
|
||||||
|
// Number of slots that we expect "finalized" commitment to lag
|
||||||
|
// behind "rooted". This matters for getProgramAccounts based snapshots,
|
||||||
|
// which will have "finalized" commitment.
|
||||||
|
let mut rooted_to_finalized_slots = 30;
|
||||||
|
|
||||||
|
let mut snapshot_gma = future::Fuse::terminated();
|
||||||
|
let mut snapshot_gpa = future::Fuse::terminated();
|
||||||
|
|
||||||
|
// The plugin sends a ping every 5s or so
|
||||||
|
let fatal_idle_timeout = Duration::from_secs(60);
|
||||||
|
|
||||||
|
// Highest slot that an account write came in for.
|
||||||
|
let mut newest_write_slot: u64 = 0;
|
||||||
|
|
||||||
|
struct WriteVersion {
|
||||||
|
// Write version seen on-chain
|
||||||
|
global: u64,
|
||||||
|
// The per-pubkey per-slot write version
|
||||||
|
slot: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
// map slot -> (pubkey -> WriteVersion)
|
||||||
|
//
|
||||||
|
// Since the write_version is a private indentifier per node it can't be used
|
||||||
|
// to deduplicate events from multiple nodes. Here we rewrite it such that each
|
||||||
|
// pubkey and each slot has a consecutive numbering of writes starting at 1.
|
||||||
|
//
|
||||||
|
// That number will be consistent for each node.
|
||||||
|
let mut slot_pubkey_writes = HashMap::<u64, HashMap<[u8; 32], WriteVersion>>::new();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
update = update_stream.next() => {
|
||||||
|
use geyser::{subscribe_update::UpdateOneof};
|
||||||
|
let mut update = update.ok_or(anyhow::anyhow!("geyser plugin has closed the stream"))??;
|
||||||
|
match update.update_oneof.as_mut().expect("invalid grpc") {
|
||||||
|
UpdateOneof::Slot(slot_update) => {
|
||||||
|
let status = slot_update.status;
|
||||||
|
if status == SubscribeUpdateSlotStatus::Finalized as i32 {
|
||||||
|
if first_full_slot == u64::MAX {
|
||||||
|
// TODO: is this equivalent to before? what was highesy_write_slot?
|
||||||
|
first_full_slot = slot_update.slot + 1;
|
||||||
|
}
|
||||||
|
if slot_update.slot > max_rooted_slot {
|
||||||
|
max_rooted_slot = slot_update.slot;
|
||||||
|
|
||||||
|
// drop data for slots that are well beyond rooted
|
||||||
|
slot_pubkey_writes.retain(|&k, _| k >= max_rooted_slot - max_out_of_order_slots);
|
||||||
|
}
|
||||||
|
|
||||||
|
if snapshot_needed && max_rooted_slot - rooted_to_finalized_slots > first_full_slot {
|
||||||
|
snapshot_needed = false;
|
||||||
|
if filter_config.account_ids.len() > 0 {
|
||||||
|
snapshot_gma = tokio::spawn(get_snapshot_gma(rpc_http_url.clone(), filter_config.account_ids.clone())).fuse();
|
||||||
|
} else if filter_config.program_ids.len() > 0 {
|
||||||
|
snapshot_gpa = tokio::spawn(get_snapshot_gpa(rpc_http_url.clone(), filter_config.program_ids[0].clone())).fuse();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
UpdateOneof::Account(info) => {
|
||||||
|
if info.slot < first_full_slot {
|
||||||
|
// Don't try to process data for slots where we may have missed writes:
|
||||||
|
// We could not map the write_version correctly for them.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if info.slot > newest_write_slot {
|
||||||
|
newest_write_slot = info.slot;
|
||||||
|
} else if max_rooted_slot > 0 && info.slot < max_rooted_slot - max_out_of_order_slots {
|
||||||
|
anyhow::bail!("received write {} slots back from max rooted slot {}", max_rooted_slot - info.slot, max_rooted_slot);
|
||||||
|
}
|
||||||
|
|
||||||
|
let pubkey_writes = slot_pubkey_writes.entry(info.slot).or_default();
|
||||||
|
let mut write = match info.account.clone() {
|
||||||
|
Some(x) => x,
|
||||||
|
None => {
|
||||||
|
// TODO: handle error
|
||||||
|
continue;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
let pubkey_bytes = Pubkey::new(&write.pubkey).to_bytes();
|
||||||
|
let write_version_mapping = pubkey_writes.entry(pubkey_bytes).or_insert(WriteVersion {
|
||||||
|
global: write.write_version,
|
||||||
|
slot: 1, // write version 0 is reserved for snapshots
|
||||||
|
});
|
||||||
|
|
||||||
|
// We assume we will receive write versions for each pubkey in sequence.
|
||||||
|
// If this is not the case, logic here does not work correctly because
|
||||||
|
// a later write could arrive first.
|
||||||
|
if write.write_version < write_version_mapping.global {
|
||||||
|
anyhow::bail!("unexpected write version: got {}, expected >= {}", write.write_version, write_version_mapping.global);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rewrite the update to use the local write version and bump it
|
||||||
|
write.write_version = write_version_mapping.slot as u64;
|
||||||
|
write_version_mapping.slot += 1;
|
||||||
|
},
|
||||||
|
UpdateOneof::Block(_) => {},
|
||||||
|
UpdateOneof::Transaction(_) => {},
|
||||||
|
}
|
||||||
|
sender.send(Message::GrpcUpdate(update)).await.expect("send success");
|
||||||
|
},
|
||||||
|
snapshot = &mut snapshot_gma => {
|
||||||
|
let snapshot = snapshot??;
|
||||||
|
info!("snapshot is for slot {}, first full slot was {}", snapshot.context.slot, first_full_slot);
|
||||||
|
if snapshot.context.slot >= first_full_slot {
|
||||||
|
let accounts: Vec<(String, Option<UiAccount>)> = filter_config.account_ids.iter().zip(snapshot.value).map(|x| (x.0.clone(), x.1)).collect();
|
||||||
|
sender
|
||||||
|
.send(Message::Snapshot(SnapshotData {
|
||||||
|
accounts,
|
||||||
|
slot: snapshot.context.slot,
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
.expect("send success");
|
||||||
|
} else {
|
||||||
|
info!(
|
||||||
|
"snapshot is too old: has slot {}, expected {} minimum",
|
||||||
|
snapshot.context.slot,
|
||||||
|
first_full_slot
|
||||||
|
);
|
||||||
|
// try again in another 10 slots
|
||||||
|
snapshot_needed = true;
|
||||||
|
rooted_to_finalized_slots += 10;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
snapshot = &mut snapshot_gpa => {
|
||||||
|
let snapshot = snapshot??;
|
||||||
|
if let OptionalContext::Context(snapshot_data) = snapshot {
|
||||||
|
info!("snapshot is for slot {}, first full slot was {}", snapshot_data.context.slot, first_full_slot);
|
||||||
|
if snapshot_data.context.slot >= first_full_slot {
|
||||||
|
let accounts: Vec<(String, Option<UiAccount>)> = snapshot_data.value.iter().map(|x| {
|
||||||
|
let deref = x.clone();
|
||||||
|
(deref.pubkey, Some(deref.account))
|
||||||
|
}).collect();
|
||||||
|
sender
|
||||||
|
.send(Message::Snapshot(SnapshotData {
|
||||||
|
accounts,
|
||||||
|
slot: snapshot_data.context.slot,
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
.expect("send success");
|
||||||
|
} else {
|
||||||
|
info!(
|
||||||
|
"snapshot is too old: has slot {}, expected {} minimum",
|
||||||
|
snapshot_data.context.slot,
|
||||||
|
first_full_slot
|
||||||
|
);
|
||||||
|
// try again in another 10 slots
|
||||||
|
snapshot_needed = true;
|
||||||
|
rooted_to_finalized_slots += 10;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
anyhow::bail!("bad snapshot format");
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ = tokio::time::sleep(fatal_idle_timeout) => {
|
||||||
|
anyhow::bail!("geyser plugin hasn't sent a message in too long");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn make_tls_config(config: &TlsConfig) -> ClientTlsConfig {
|
||||||
|
let server_root_ca_cert = match &config.ca_cert_path.chars().next().unwrap() {
|
||||||
|
'$' => env::var(&config.ca_cert_path[1..])
|
||||||
|
.expect("reading server root ca cert from env")
|
||||||
|
.into_bytes(),
|
||||||
|
_ => std::fs::read(&config.ca_cert_path).expect("reading server root ca cert from file"),
|
||||||
|
};
|
||||||
|
let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
|
||||||
|
let client_cert = match &config.client_cert_path.chars().next().unwrap() {
|
||||||
|
'$' => env::var(&config.client_cert_path[1..])
|
||||||
|
.expect("reading client cert from env")
|
||||||
|
.into_bytes(),
|
||||||
|
_ => std::fs::read(&config.client_cert_path).expect("reading client cert from file"),
|
||||||
|
};
|
||||||
|
let client_key = match &config.client_key_path.chars().next().unwrap() {
|
||||||
|
'$' => env::var(&config.client_key_path[1..])
|
||||||
|
.expect("reading client key from env")
|
||||||
|
.into_bytes(),
|
||||||
|
_ => std::fs::read(&config.client_key_path).expect("reading client key from file"),
|
||||||
|
};
|
||||||
|
let client_identity = Identity::from_pem(client_cert, client_key);
|
||||||
|
let domain_name = match &config.domain_name.chars().next().unwrap() {
|
||||||
|
'$' => env::var(&config.domain_name[1..]).expect("reading domain name from env"),
|
||||||
|
_ => config.domain_name.clone(),
|
||||||
|
};
|
||||||
|
ClientTlsConfig::new()
|
||||||
|
.ca_certificate(server_root_ca_cert)
|
||||||
|
.identity(client_identity)
|
||||||
|
.domain_name(domain_name)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn process_events(
|
||||||
|
config: &SourceConfig,
|
||||||
|
filter_config: &FilterConfig,
|
||||||
|
account_write_queue_sender: async_channel::Sender<AccountWrite>,
|
||||||
|
slot_queue_sender: async_channel::Sender<SlotUpdate>,
|
||||||
|
metrics_sender: Metrics,
|
||||||
|
) {
|
||||||
|
// Subscribe to geyser
|
||||||
|
let (msg_sender, msg_receiver) = async_channel::bounded::<Message>(config.dedup_queue_size);
|
||||||
|
for grpc_source in config.grpc_sources.clone() {
|
||||||
|
let msg_sender = msg_sender.clone();
|
||||||
|
let snapshot_source = config.snapshot.clone();
|
||||||
|
let metrics_sender = metrics_sender.clone();
|
||||||
|
let f = filter_config.clone();
|
||||||
|
|
||||||
|
// Make TLS config if configured
|
||||||
|
let tls_config = grpc_source.tls.as_ref().map(make_tls_config);
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut metric_retries = metrics_sender.register_u64(
|
||||||
|
format!("grpc_source_{}_connection_retries", grpc_source.name,),
|
||||||
|
MetricType::Counter,
|
||||||
|
);
|
||||||
|
let metric_connected =
|
||||||
|
metrics_sender.register_bool(format!("grpc_source_{}_status", grpc_source.name));
|
||||||
|
|
||||||
|
// Continuously reconnect on failure
|
||||||
|
loop {
|
||||||
|
metric_connected.set(true);
|
||||||
|
let out = feed_data_geyser(
|
||||||
|
&grpc_source,
|
||||||
|
tls_config.clone(),
|
||||||
|
&snapshot_source,
|
||||||
|
&f,
|
||||||
|
msg_sender.clone(),
|
||||||
|
);
|
||||||
|
let result = out.await;
|
||||||
|
assert!(result.is_err());
|
||||||
|
if let Err(err) = result {
|
||||||
|
warn!(
|
||||||
|
"error during communication with the geyser plugin. retrying. {:?}",
|
||||||
|
err
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
metric_connected.set(false);
|
||||||
|
metric_retries.increment();
|
||||||
|
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(
|
||||||
|
grpc_source.retry_connection_sleep_secs,
|
||||||
|
))
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// slot -> (pubkey -> write_version)
|
||||||
|
//
|
||||||
|
// To avoid unnecessarily sending requests to SQL, we track the latest write_version
|
||||||
|
// for each (slot, pubkey). If an already-seen write_version comes in, it can be safely
|
||||||
|
// discarded.
|
||||||
|
let mut latest_write = HashMap::<u64, HashMap<[u8; 32], u64>>::new();
|
||||||
|
|
||||||
|
// Number of slots to retain in latest_write
|
||||||
|
let latest_write_retention = 50;
|
||||||
|
|
||||||
|
let mut metric_account_writes =
|
||||||
|
metrics_sender.register_u64("grpc_account_writes".into(), MetricType::Counter);
|
||||||
|
let mut metric_account_queue =
|
||||||
|
metrics_sender.register_u64("grpc_account_write_queue".into(), MetricType::Gauge);
|
||||||
|
let mut metric_dedup_queue =
|
||||||
|
metrics_sender.register_u64("grpc_dedup_queue".into(), MetricType::Gauge);
|
||||||
|
let mut metric_slot_queue =
|
||||||
|
metrics_sender.register_u64("grpc_slot_update_queue".into(), MetricType::Gauge);
|
||||||
|
let mut metric_slot_updates =
|
||||||
|
metrics_sender.register_u64("grpc_slot_updates".into(), MetricType::Counter);
|
||||||
|
let mut metric_snapshots =
|
||||||
|
metrics_sender.register_u64("grpc_snapshots".into(), MetricType::Counter);
|
||||||
|
let mut metric_snapshot_account_writes =
|
||||||
|
metrics_sender.register_u64("grpc_snapshot_account_writes".into(), MetricType::Counter);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
metric_dedup_queue.set(msg_receiver.len() as u64);
|
||||||
|
let msg = msg_receiver.recv().await.expect("sender must not close");
|
||||||
|
use geyser::subscribe_update::UpdateOneof;
|
||||||
|
match msg {
|
||||||
|
Message::GrpcUpdate(update) => {
|
||||||
|
match update.update_oneof.expect("invalid grpc") {
|
||||||
|
UpdateOneof::Account(info) => {
|
||||||
|
let update = match info.account.clone() {
|
||||||
|
Some(x) => x,
|
||||||
|
None => {
|
||||||
|
// TODO: handle error
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
assert!(update.pubkey.len() == 32);
|
||||||
|
assert!(update.owner.len() == 32);
|
||||||
|
|
||||||
|
metric_account_writes.increment();
|
||||||
|
metric_account_queue.set(account_write_queue_sender.len() as u64);
|
||||||
|
|
||||||
|
// Skip writes that a different server has already sent
|
||||||
|
let pubkey_writes = latest_write.entry(info.slot).or_default();
|
||||||
|
let pubkey_bytes = Pubkey::new(&update.pubkey).to_bytes();
|
||||||
|
let writes = pubkey_writes.entry(pubkey_bytes).or_insert(0);
|
||||||
|
if update.write_version <= *writes {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
*writes = update.write_version;
|
||||||
|
latest_write.retain(|&k, _| k >= info.slot - latest_write_retention);
|
||||||
|
// let mut uncompressed: Vec<u8> = Vec::new();
|
||||||
|
// zstd_decompress(&update.data, &mut uncompressed).unwrap();
|
||||||
|
account_write_queue_sender
|
||||||
|
.send(AccountWrite {
|
||||||
|
pubkey: Pubkey::new(&update.pubkey),
|
||||||
|
slot: info.slot,
|
||||||
|
write_version: update.write_version,
|
||||||
|
lamports: update.lamports,
|
||||||
|
owner: Pubkey::new(&update.owner),
|
||||||
|
executable: update.executable,
|
||||||
|
rent_epoch: update.rent_epoch,
|
||||||
|
data: update.data,
|
||||||
|
// TODO: what should this be? related to account deletes?
|
||||||
|
is_selected: true,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.expect("send success");
|
||||||
|
}
|
||||||
|
UpdateOneof::Slot(update) => {
|
||||||
|
metric_slot_updates.increment();
|
||||||
|
metric_slot_queue.set(slot_queue_sender.len() as u64);
|
||||||
|
|
||||||
|
let status =
|
||||||
|
SubscribeUpdateSlotStatus::from_i32(update.status).map(|v| match v {
|
||||||
|
SubscribeUpdateSlotStatus::Processed => SlotStatus::Processed,
|
||||||
|
SubscribeUpdateSlotStatus::Confirmed => SlotStatus::Confirmed,
|
||||||
|
SubscribeUpdateSlotStatus::Finalized => SlotStatus::Rooted,
|
||||||
|
});
|
||||||
|
if status.is_none() {
|
||||||
|
error!("unexpected slot status: {}", update.status);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let slot_update = SlotUpdate {
|
||||||
|
slot: update.slot,
|
||||||
|
parent: update.parent,
|
||||||
|
status: status.expect("qed"),
|
||||||
|
};
|
||||||
|
|
||||||
|
slot_queue_sender
|
||||||
|
.send(slot_update)
|
||||||
|
.await
|
||||||
|
.expect("send success");
|
||||||
|
}
|
||||||
|
UpdateOneof::Block(_) => {}
|
||||||
|
UpdateOneof::Transaction(_) => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Message::Snapshot(update) => {
|
||||||
|
metric_snapshots.increment();
|
||||||
|
info!("processing snapshot...");
|
||||||
|
for account in update.accounts.iter() {
|
||||||
|
metric_snapshot_account_writes.increment();
|
||||||
|
metric_account_queue.set(account_write_queue_sender.len() as u64);
|
||||||
|
|
||||||
|
match account {
|
||||||
|
(key, Some(ui_account)) => {
|
||||||
|
// TODO: Resnapshot on invalid data?
|
||||||
|
let pubkey = Pubkey::from_str(key).unwrap();
|
||||||
|
let account: Account = ui_account.decode().unwrap();
|
||||||
|
account_write_queue_sender
|
||||||
|
.send(AccountWrite::from(pubkey, update.slot, 0, account))
|
||||||
|
.await
|
||||||
|
.expect("send success");
|
||||||
|
}
|
||||||
|
(key, None) => warn!("account not found {}", key),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
info!("processing snapshot done");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
19
src/lib.rs
19
src/lib.rs
|
@ -1,7 +1,26 @@
|
||||||
|
pub mod account_write_filter;
|
||||||
|
pub mod blockhash_poller;
|
||||||
pub mod cli;
|
pub mod cli;
|
||||||
|
pub mod chain_data;
|
||||||
pub mod confirmation_strategies;
|
pub mod confirmation_strategies;
|
||||||
|
pub mod crank;
|
||||||
|
pub mod grpc_plugin_source;
|
||||||
pub mod helpers;
|
pub mod helpers;
|
||||||
pub mod mango;
|
pub mod mango;
|
||||||
|
pub mod mango_v3_perp_crank_sink;
|
||||||
pub mod market_markers;
|
pub mod market_markers;
|
||||||
|
pub mod metrics;
|
||||||
pub mod rotating_queue;
|
pub mod rotating_queue;
|
||||||
pub mod states;
|
pub mod states;
|
||||||
|
|
||||||
|
trait AnyhowWrap {
|
||||||
|
type Value;
|
||||||
|
fn map_err_anyhow(self) -> anyhow::Result<Self::Value>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, E: std::fmt::Debug> AnyhowWrap for Result<T, E> {
|
||||||
|
type Value = T;
|
||||||
|
fn map_err_anyhow(self) -> anyhow::Result<Self::Value> {
|
||||||
|
self.map_err(|err| anyhow::anyhow!("{:?}", err))
|
||||||
|
}
|
||||||
|
}
|
|
@ -64,3 +64,4 @@ pub struct MarketConfig {
|
||||||
pub asks_key: String,
|
pub asks_key: String,
|
||||||
pub events_key: String,
|
pub events_key: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,136 @@
|
||||||
|
use std::{
|
||||||
|
cell::RefCell,
|
||||||
|
collections::{BTreeMap},
|
||||||
|
convert::TryFrom,
|
||||||
|
mem::size_of,
|
||||||
|
};
|
||||||
|
|
||||||
|
use arrayref::array_ref;
|
||||||
|
use async_channel::Sender;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use mango::{
|
||||||
|
instruction::consume_events,
|
||||||
|
queue::{AnyEvent, EventQueueHeader, EventType, FillEvent, OutEvent, Queue},
|
||||||
|
};
|
||||||
|
use solana_sdk::{
|
||||||
|
account::ReadableAccount,
|
||||||
|
instruction::{Instruction},
|
||||||
|
pubkey::Pubkey,
|
||||||
|
};
|
||||||
|
|
||||||
|
use bytemuck::cast_ref;
|
||||||
|
|
||||||
|
use crate::{account_write_filter::AccountWriteSink, chain_data::AccountData};
|
||||||
|
|
||||||
|
const MAX_BACKLOG: usize = 2;
|
||||||
|
const MAX_EVENTS_PER_TX: usize = 10;
|
||||||
|
|
||||||
|
pub struct MangoV3PerpCrankSink {
|
||||||
|
pks: BTreeMap<Pubkey, Pubkey>,
|
||||||
|
group_pk: Pubkey,
|
||||||
|
cache_pk: Pubkey,
|
||||||
|
mango_v3_program: Pubkey,
|
||||||
|
instruction_sender: Sender<Vec<Instruction>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MangoV3PerpCrankSink {
|
||||||
|
pub fn new(
|
||||||
|
pks: Vec<(Pubkey, Pubkey)>,
|
||||||
|
group_pk: Pubkey,
|
||||||
|
cache_pk: Pubkey,
|
||||||
|
mango_v3_program: Pubkey,
|
||||||
|
instruction_sender: Sender<Vec<Instruction>>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
pks: pks.iter().map(|e| e.clone()).collect(),
|
||||||
|
group_pk,
|
||||||
|
cache_pk,
|
||||||
|
mango_v3_program,
|
||||||
|
instruction_sender,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// couldn't compile the correct struct size / math on m1, fixed sizes resolve this issue
|
||||||
|
const EVENT_SIZE: usize = 200; //size_of::<AnyEvent>();
|
||||||
|
const QUEUE_LEN: usize = 256;
|
||||||
|
type EventQueueEvents = [AnyEvent; QUEUE_LEN];
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl AccountWriteSink for MangoV3PerpCrankSink {
|
||||||
|
async fn process(&self, pk: &Pubkey, account: &AccountData) -> Result<(), String> {
|
||||||
|
let account = &account.account;
|
||||||
|
|
||||||
|
let ix: Result<Instruction, String> = {
|
||||||
|
const HEADER_SIZE: usize = size_of::<EventQueueHeader>();
|
||||||
|
let header_data = array_ref![account.data(), 0, HEADER_SIZE];
|
||||||
|
let header = RefCell::<EventQueueHeader>::new(*bytemuck::from_bytes(header_data));
|
||||||
|
// trace!("evq {} seq_num {}", mkt.name, header.seq_num);
|
||||||
|
|
||||||
|
const QUEUE_SIZE: usize = EVENT_SIZE * QUEUE_LEN;
|
||||||
|
let events_data = array_ref![account.data(), HEADER_SIZE, QUEUE_SIZE];
|
||||||
|
let events = RefCell::<EventQueueEvents>::new(*bytemuck::from_bytes(events_data));
|
||||||
|
let event_queue = Queue {
|
||||||
|
header: header.borrow_mut(),
|
||||||
|
buf: events.borrow_mut(),
|
||||||
|
};
|
||||||
|
|
||||||
|
// only crank if at least 1 fill or a sufficient events of other categories are buffered
|
||||||
|
let contains_fill_events = event_queue
|
||||||
|
.iter()
|
||||||
|
.find(|e| e.event_type == EventType::Fill as u8)
|
||||||
|
.is_some();
|
||||||
|
let has_backlog = event_queue.iter().count() > MAX_BACKLOG;
|
||||||
|
if !contains_fill_events && !has_backlog {
|
||||||
|
return Err("throttled".into());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut mango_accounts: Vec<_> = event_queue
|
||||||
|
.iter()
|
||||||
|
.take(MAX_EVENTS_PER_TX)
|
||||||
|
.flat_map(
|
||||||
|
|e| match EventType::try_from(e.event_type).expect("mango v4 event") {
|
||||||
|
EventType::Fill => {
|
||||||
|
let fill: &FillEvent = cast_ref(e);
|
||||||
|
vec![fill.maker, fill.taker]
|
||||||
|
}
|
||||||
|
EventType::Out => {
|
||||||
|
let out: &OutEvent = cast_ref(e);
|
||||||
|
vec![out.owner]
|
||||||
|
}
|
||||||
|
EventType::Liquidate => vec![],
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let mkt_pk = self
|
||||||
|
.pks
|
||||||
|
.get(pk)
|
||||||
|
.expect(&format!("{pk:?} is a known public key"));
|
||||||
|
|
||||||
|
let ix = consume_events(
|
||||||
|
&self.mango_v3_program,
|
||||||
|
&self.group_pk,
|
||||||
|
&self.cache_pk,
|
||||||
|
mkt_pk,
|
||||||
|
pk,
|
||||||
|
&mut mango_accounts,
|
||||||
|
MAX_EVENTS_PER_TX,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
Ok(ix)
|
||||||
|
};
|
||||||
|
|
||||||
|
// info!(
|
||||||
|
// "evq={pk:?} count={} limit=10",
|
||||||
|
// event_queue.iter().count()
|
||||||
|
// );
|
||||||
|
|
||||||
|
if let Err(e) = self.instruction_sender.send(vec![ix?]).await {
|
||||||
|
return Err(e.to_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,338 @@
|
||||||
|
use serde::Deserialize;
|
||||||
|
|
||||||
|
use {
|
||||||
|
log::*,
|
||||||
|
std::collections::HashMap,
|
||||||
|
std::fmt,
|
||||||
|
std::sync::{atomic, Arc, Mutex, RwLock},
|
||||||
|
tokio::time,
|
||||||
|
warp::{Filter, Rejection, Reply},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum Value {
|
||||||
|
U64 {
|
||||||
|
value: Arc<atomic::AtomicU64>,
|
||||||
|
metric_type: MetricType,
|
||||||
|
},
|
||||||
|
I64 {
|
||||||
|
value: Arc<atomic::AtomicI64>,
|
||||||
|
metric_type: MetricType,
|
||||||
|
},
|
||||||
|
Bool {
|
||||||
|
value: Arc<Mutex<bool>>,
|
||||||
|
metric_type: MetricType,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum MetricType {
|
||||||
|
Counter,
|
||||||
|
Gauge,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for MetricType {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
MetricType::Counter => {
|
||||||
|
write!(f, "counter")
|
||||||
|
}
|
||||||
|
MetricType::Gauge => {
|
||||||
|
write!(f, "gauge")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum PrevValue {
|
||||||
|
U64(u64),
|
||||||
|
I64(i64),
|
||||||
|
Bool(bool),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct MetricU64 {
|
||||||
|
value: Arc<atomic::AtomicU64>,
|
||||||
|
}
|
||||||
|
impl MetricU64 {
|
||||||
|
pub fn value(&self) -> u64 {
|
||||||
|
self.value.load(atomic::Ordering::Acquire)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set(&mut self, value: u64) {
|
||||||
|
self.value.store(value, atomic::Ordering::Release);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_max(&mut self, value: u64) {
|
||||||
|
self.value.fetch_max(value, atomic::Ordering::AcqRel);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add(&mut self, value: u64) {
|
||||||
|
self.value.fetch_add(value, atomic::Ordering::AcqRel);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn increment(&mut self) {
|
||||||
|
self.value.fetch_add(1, atomic::Ordering::AcqRel);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn decrement(&mut self) {
|
||||||
|
self.value.fetch_sub(1, atomic::Ordering::AcqRel);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct MetricI64 {
|
||||||
|
value: Arc<atomic::AtomicI64>,
|
||||||
|
}
|
||||||
|
impl MetricI64 {
|
||||||
|
pub fn set(&mut self, value: i64) {
|
||||||
|
self.value.store(value, atomic::Ordering::Release);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn increment(&mut self) {
|
||||||
|
self.value.fetch_add(1, atomic::Ordering::AcqRel);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn decrement(&mut self) {
|
||||||
|
self.value.fetch_sub(1, atomic::Ordering::AcqRel);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct MetricBool {
|
||||||
|
value: Arc<Mutex<bool>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MetricBool {
|
||||||
|
pub fn set(&self, value: bool) {
|
||||||
|
*self.value.lock().unwrap() = value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Metrics {
|
||||||
|
registry: Arc<RwLock<HashMap<String, Value>>>,
|
||||||
|
labels: HashMap<String, String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Metrics {
|
||||||
|
pub fn register_u64(&self, name: String, metric_type: MetricType) -> MetricU64 {
|
||||||
|
let mut registry = self.registry.write().unwrap();
|
||||||
|
let value = registry.entry(name).or_insert(Value::U64 {
|
||||||
|
value: Arc::new(atomic::AtomicU64::new(0)),
|
||||||
|
metric_type: metric_type,
|
||||||
|
});
|
||||||
|
MetricU64 {
|
||||||
|
value: match value {
|
||||||
|
Value::U64 {
|
||||||
|
value: v,
|
||||||
|
metric_type: _,
|
||||||
|
} => v.clone(),
|
||||||
|
_ => panic!("bad metric type"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn register_i64(&self, name: String, metric_type: MetricType) -> MetricI64 {
|
||||||
|
let mut registry = self.registry.write().unwrap();
|
||||||
|
let value = registry.entry(name).or_insert(Value::I64 {
|
||||||
|
value: Arc::new(atomic::AtomicI64::new(0)),
|
||||||
|
metric_type: metric_type,
|
||||||
|
});
|
||||||
|
MetricI64 {
|
||||||
|
value: match value {
|
||||||
|
Value::I64 {
|
||||||
|
value: v,
|
||||||
|
metric_type: _,
|
||||||
|
} => v.clone(),
|
||||||
|
_ => panic!("bad metric type"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn register_bool(&self, name: String) -> MetricBool {
|
||||||
|
let mut registry = self.registry.write().unwrap();
|
||||||
|
let value = registry.entry(name).or_insert(Value::Bool {
|
||||||
|
value: Arc::new(Mutex::new(false)),
|
||||||
|
metric_type: MetricType::Gauge,
|
||||||
|
});
|
||||||
|
MetricBool {
|
||||||
|
value: match value {
|
||||||
|
Value::Bool {
|
||||||
|
value: v,
|
||||||
|
metric_type: _,
|
||||||
|
} => v.clone(),
|
||||||
|
_ => panic!("bad metric type"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_registry_vec(&self) -> Vec<(String, String, String)> {
|
||||||
|
let mut vec: Vec<(String, String, String)> = Vec::new();
|
||||||
|
let metrics = self.registry.read().unwrap();
|
||||||
|
for (name, value) in metrics.iter() {
|
||||||
|
let (value_str, type_str) = match value {
|
||||||
|
Value::U64 {
|
||||||
|
value: v,
|
||||||
|
metric_type: t,
|
||||||
|
} => (
|
||||||
|
format!("{}", v.load(atomic::Ordering::Acquire)),
|
||||||
|
t.to_string(),
|
||||||
|
),
|
||||||
|
Value::I64 {
|
||||||
|
value: v,
|
||||||
|
metric_type: t,
|
||||||
|
} => (
|
||||||
|
format!("{}", v.load(atomic::Ordering::Acquire)),
|
||||||
|
t.to_string(),
|
||||||
|
),
|
||||||
|
Value::Bool {
|
||||||
|
value: v,
|
||||||
|
metric_type: t,
|
||||||
|
} => {
|
||||||
|
let bool_to_int = if *v.lock().unwrap() { 1 } else { 0 };
|
||||||
|
(format!("{}", bool_to_int), t.to_string())
|
||||||
|
}
|
||||||
|
};
|
||||||
|
vec.push((name.clone(), value_str, type_str));
|
||||||
|
}
|
||||||
|
vec
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_prometheus_poll(metrics: Metrics) -> Result<impl Reply, Rejection> {
|
||||||
|
debug!("handle_prometheus_poll");
|
||||||
|
let label_strings_vec: Vec<String> = metrics
|
||||||
|
.labels
|
||||||
|
.iter()
|
||||||
|
.map(|(name, value)| format!("{}=\"{}\"", name, value))
|
||||||
|
.collect();
|
||||||
|
let lines: Vec<String> = metrics
|
||||||
|
.get_registry_vec()
|
||||||
|
.iter()
|
||||||
|
.map(|(name, value, type_name)| {
|
||||||
|
let sanitized_name = str::replace(name, "-", "_");
|
||||||
|
format!(
|
||||||
|
"# HELP {} \n# TYPE {} {}\n{}{{{}}} {}",
|
||||||
|
sanitized_name,
|
||||||
|
sanitized_name,
|
||||||
|
type_name,
|
||||||
|
sanitized_name,
|
||||||
|
label_strings_vec.join(","),
|
||||||
|
value
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
Ok(format!("{}\n", lines.join("\n")))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn with_metrics(
|
||||||
|
metrics: Metrics,
|
||||||
|
) -> impl Filter<Extract = (Metrics,), Error = std::convert::Infallible> + Clone {
|
||||||
|
warp::any().map(move || metrics.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Deserialize)]
|
||||||
|
pub struct MetricsConfig {
|
||||||
|
pub output_stdout: bool,
|
||||||
|
pub output_http: bool,
|
||||||
|
// TODO: add configurable port and endpoint url
|
||||||
|
// TODO: add configurable write interval
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub fn start(config: MetricsConfig, process_name: String) -> Metrics {
|
||||||
|
let mut write_interval = time::interval(time::Duration::from_secs(60));
|
||||||
|
|
||||||
|
let registry = Arc::new(RwLock::new(HashMap::<String, Value>::new()));
|
||||||
|
let registry_c = Arc::clone(®istry);
|
||||||
|
let labels = HashMap::from([(String::from("process"), process_name)]);
|
||||||
|
let metrics_tx = Metrics { registry, labels };
|
||||||
|
let metrics_route = warp::path!("metrics")
|
||||||
|
.and(with_metrics(metrics_tx.clone()))
|
||||||
|
.and_then(handle_prometheus_poll);
|
||||||
|
|
||||||
|
if config.output_http {
|
||||||
|
// serve prometheus metrics endpoint
|
||||||
|
tokio::spawn(async move {
|
||||||
|
warp::serve(metrics_route).run(([0, 0, 0, 0], 9091)).await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.output_stdout {
|
||||||
|
// periodically log to stdout
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut previous_values = HashMap::<String, PrevValue>::new();
|
||||||
|
loop {
|
||||||
|
write_interval.tick().await;
|
||||||
|
|
||||||
|
// Nested locking! Safe because the only other user locks registry for writing and doesn't
|
||||||
|
// acquire any interior locks.
|
||||||
|
let metrics = registry_c.read().unwrap();
|
||||||
|
for (name, value) in metrics.iter() {
|
||||||
|
let previous_value = previous_values.get_mut(name);
|
||||||
|
match value {
|
||||||
|
Value::U64 {
|
||||||
|
value: v,
|
||||||
|
metric_type: _,
|
||||||
|
} => {
|
||||||
|
let new_value = v.load(atomic::Ordering::Acquire);
|
||||||
|
let previous_value = if let Some(PrevValue::U64(v)) = previous_value {
|
||||||
|
let prev = *v;
|
||||||
|
*v = new_value;
|
||||||
|
prev
|
||||||
|
} else {
|
||||||
|
previous_values.insert(name.clone(), PrevValue::U64(new_value));
|
||||||
|
0
|
||||||
|
};
|
||||||
|
let diff = new_value.wrapping_sub(previous_value) as i64;
|
||||||
|
info!("metric: {}: {} ({:+})", name, new_value, diff);
|
||||||
|
}
|
||||||
|
Value::I64 {
|
||||||
|
value: v,
|
||||||
|
metric_type: _,
|
||||||
|
} => {
|
||||||
|
let new_value = v.load(atomic::Ordering::Acquire);
|
||||||
|
let previous_value = if let Some(PrevValue::I64(v)) = previous_value {
|
||||||
|
let prev = *v;
|
||||||
|
*v = new_value;
|
||||||
|
prev
|
||||||
|
} else {
|
||||||
|
previous_values.insert(name.clone(), PrevValue::I64(new_value));
|
||||||
|
0
|
||||||
|
};
|
||||||
|
let diff = new_value - previous_value;
|
||||||
|
info!("metric: {}: {} ({:+})", name, new_value, diff);
|
||||||
|
}
|
||||||
|
Value::Bool {
|
||||||
|
value: v,
|
||||||
|
metric_type: _,
|
||||||
|
} => {
|
||||||
|
let new_value = v.lock().unwrap();
|
||||||
|
let previous_value = if let Some(PrevValue::Bool(v)) = previous_value {
|
||||||
|
let mut prev = new_value.clone();
|
||||||
|
std::mem::swap(&mut prev, v);
|
||||||
|
prev
|
||||||
|
} else {
|
||||||
|
previous_values
|
||||||
|
.insert(name.clone(), PrevValue::Bool(new_value.clone()));
|
||||||
|
false
|
||||||
|
};
|
||||||
|
if *new_value == previous_value {
|
||||||
|
info!("metric: {}: {} (unchanged)", name, &*new_value);
|
||||||
|
} else {
|
||||||
|
info!(
|
||||||
|
"metric: {}: {} (before: {})",
|
||||||
|
name, &*new_value, previous_value
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics_tx
|
||||||
|
}
|
Loading…
Reference in New Issue