move to mango_feeds_connector

Maximilian Schneider 2023-04-05 19:06:58 +02:00
parent 1c0ba3c409
commit 241e6bfd21
11 changed files with 1301 additions and 1813 deletions

Cargo.lock (generated): 1463 lines changed; diff suppressed because it is too large.


@@ -29,6 +29,7 @@ dashmap = "5.4.0"
mango = { git = "https://github.com/blockworks-foundation/mango-v3.git", tag = "v3.6.0", default-features = false }
mango-common = { git = "https://github.com/blockworks-foundation/mango-v3.git", tag = "v3.6.0" }
mango-feeds-connector = { git = "https://github.com/blockworks-foundation/mango-feeds.git", branch = "ckamm/solana-versions2", default-features = false, features = ["solana-1-15"] }
yellowstone-grpc-proto = "1.0.1"
solana-client = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
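
The remaining hunks replace the repo-local modules with their equivalents from mango-feeds-connector. As a rough sketch of the use-site change (mirroring the crank.rs hunk later in this diff, so the re-export paths are taken from the diff itself rather than from the crate docs):

// before: everything was defined in this crate
use crate::{
    account_write_filter::{self, AccountWriteRoute},
    grpc_plugin_source::FilterConfig,
    metrics,
    websocket_source,
};

// after: the same items are imported from the connector crate
use mango_feeds_connector::{
    account_write_filter::{self, AccountWriteRoute},
    metrics, websocket_source,
    FilterConfig, MetricsConfig, SnapshotSourceConfig, SourceConfig,
};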


@@ -1,146 +0,0 @@
use crate::{
chain_data::{AccountData, AccountWrite, ChainData, SlotData, SlotUpdate},
metrics::Metrics,
};
use async_trait::async_trait;
use log::*;
use solana_sdk::{account::WritableAccount, pubkey::Pubkey, stake_history::Epoch};
use std::{
collections::{BTreeSet, HashMap},
sync::Arc,
time::{Duration, Instant},
};
#[async_trait]
pub trait AccountWriteSink {
async fn process(&self, pubkey: &Pubkey, account: &AccountData) -> Result<(), String>;
}
#[derive(Clone)]
pub struct AccountWriteRoute {
pub matched_pubkeys: Vec<Pubkey>,
pub sink: Arc<dyn AccountWriteSink + Send + Sync>,
pub timeout_interval: Duration,
}
#[derive(Clone, Debug)]
struct AcountWriteRecord {
slot: u64,
write_version: u64,
timestamp: Instant,
}
pub fn init(
routes: Vec<AccountWriteRoute>,
metrics_sender: Metrics,
) -> anyhow::Result<(
async_channel::Sender<AccountWrite>,
async_channel::Sender<SlotUpdate>,
)> {
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
let (account_write_queue_sender, account_write_queue_receiver) =
async_channel::unbounded::<AccountWrite>();
// Slot updates flowing from the outside into the single processing thread. From
// there they'll flow into the postgres sending thread.
let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
let mut chain_data = ChainData::new(metrics_sender);
let mut last_updated = HashMap::<String, AcountWriteRecord>::new();
let all_queue_pks: BTreeSet<Pubkey> = routes
.iter()
.flat_map(|r| r.matched_pubkeys.iter())
.map(|pk| pk.clone())
.collect();
// update handling thread, reads both slots and account updates
tokio::spawn(async move {
loop {
tokio::select! {
Ok(account_write) = account_write_queue_receiver.recv() => {
if !all_queue_pks.contains(&account_write.pubkey) {
trace!("account write skipped {:?}", account_write.pubkey);
continue;
}
trace!("account write processed {:?}", account_write.pubkey);
chain_data.update_account(
account_write.pubkey,
AccountData {
slot: account_write.slot,
write_version: account_write.write_version,
account: WritableAccount::create(
account_write.lamports,
account_write.data.clone(),
account_write.owner,
account_write.executable,
account_write.rent_epoch as Epoch,
),
},
);
}
Ok(slot_update) = slot_queue_receiver.recv() => {
trace!("slot {:?}", slot_update);
chain_data.update_slot(SlotData {
slot: slot_update.slot,
parent: slot_update.parent,
status: slot_update.status,
chain: 0,
});
}
}
trace!("propagate chain data downstream");
for route in routes.iter() {
for pk in route.matched_pubkeys.iter() {
match chain_data.account(&pk) {
Ok(account_info) => {
let pk_b58 = pk.to_string();
if let Some(record) = last_updated.get(&pk_b58) {
let is_unchanged = account_info.slot == record.slot
&& account_info.write_version == record.write_version;
let is_throttled =
record.timestamp.elapsed() < route.timeout_interval;
if is_unchanged || is_throttled {
trace!("skipped is_unchanged={is_unchanged} is_throttled={is_throttled} {pk_b58}");
continue;
}
};
trace!("process {pk_b58}");
match route.sink.process(pk, account_info).await {
Ok(()) => {
// todo: metrics
last_updated.insert(
pk_b58.clone(),
AcountWriteRecord {
slot: account_info.slot,
write_version: account_info.write_version,
timestamp: Instant::now(),
},
);
}
Err(_skip_reason) => {
// todo: metrics
}
}
}
Err(_) => {
// todo: metrics
}
}
}
}
}
});
Ok((account_write_queue_sender, slot_queue_sender))
}
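
For orientation, a minimal sketch of how a caller plugs into this module: implement AccountWriteSink, describe a route, and let init() spawn the processing task. LoggingSink and the watched pubkey are made up for illustration; metrics_tx is assumed to come from metrics::start.

use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use solana_sdk::pubkey::Pubkey;

use crate::{
    account_write_filter::{init, AccountWriteRoute, AccountWriteSink},
    chain_data::{AccountData, AccountWrite, SlotUpdate},
    metrics::Metrics,
};

struct LoggingSink;

#[async_trait]
impl AccountWriteSink for LoggingSink {
    async fn process(&self, pubkey: &Pubkey, account: &AccountData) -> Result<(), String> {
        // Invoked only when the account changed since the last successful call,
        // and at most once per timeout_interval.
        log::info!("{} updated at slot {}", pubkey, account.slot);
        Ok(())
    }
}

// Must run inside a tokio runtime, since init() spawns the routing task.
fn setup(metrics_tx: Metrics, watched: Pubkey) -> anyhow::Result<(
    async_channel::Sender<AccountWrite>,
    async_channel::Sender<SlotUpdate>,
)> {
    let routes = vec![AccountWriteRoute {
        matched_pubkeys: vec![watched],
        sink: Arc::new(LoggingSink),
        timeout_interval: Duration::from_millis(500),
    }];
    init(routes, metrics_tx)
}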


@@ -1,268 +0,0 @@
use crate::metrics::{MetricType, MetricU64, Metrics};
use {
solana_sdk::{
account::{Account, AccountSharedData, ReadableAccount},
pubkey::Pubkey,
},
std::collections::HashMap,
};
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SlotStatus {
Rooted,
Confirmed,
Processed,
}
#[derive(Clone, Debug)]
pub struct SlotData {
pub slot: u64,
pub parent: Option<u64>,
pub status: SlotStatus,
pub chain: u64, // the top slot that this is in a chain with. uncles will have values < tip
}
#[derive(Clone, Debug)]
pub struct AccountData {
pub slot: u64,
pub write_version: u64,
pub account: AccountSharedData,
}
#[derive(Clone, PartialEq, Debug)]
pub struct AccountWrite {
pub pubkey: Pubkey,
pub slot: u64,
pub write_version: u64,
pub lamports: u64,
pub owner: Pubkey,
pub executable: bool,
pub rent_epoch: u64,
pub data: Vec<u8>,
pub is_selected: bool,
}
impl AccountWrite {
pub fn from(pubkey: Pubkey, slot: u64, write_version: u64, account: Account) -> AccountWrite {
AccountWrite {
pubkey,
slot: slot,
write_version,
lamports: account.lamports,
owner: account.owner,
executable: account.executable,
rent_epoch: account.rent_epoch,
data: account.data,
is_selected: true,
}
}
}
#[derive(Clone, Debug)]
pub struct SlotUpdate {
pub slot: u64,
pub parent: Option<u64>,
pub status: SlotStatus,
}
/// Track slots and account writes
///
/// - use account() to retrieve the current best data for an account.
/// - update_from_snapshot() and update_from_websocket() update the state for new messages
pub struct ChainData {
/// only slots >= newest_rooted_slot are retained
slots: HashMap<u64, SlotData>,
/// writes to accounts, only the latest rooted write and newer are retained
accounts: HashMap<Pubkey, Vec<AccountData>>,
newest_rooted_slot: u64,
newest_processed_slot: u64,
account_versions_stored: usize,
account_bytes_stored: usize,
metric_accounts_stored: MetricU64,
metric_account_versions_stored: MetricU64,
metric_account_bytes_stored: MetricU64,
}
impl ChainData {
pub fn new(metrics_sender: Metrics) -> Self {
Self {
slots: HashMap::new(),
accounts: HashMap::new(),
newest_rooted_slot: 0,
newest_processed_slot: 0,
account_versions_stored: 0,
account_bytes_stored: 0,
metric_accounts_stored: metrics_sender
.register_u64("chaindata_accounts_stored".into(), MetricType::Gauge),
metric_account_versions_stored: metrics_sender.register_u64(
"chaindata_account_versions_stored".into(),
MetricType::Gauge,
),
metric_account_bytes_stored: metrics_sender
.register_u64("chaindata_account_bytes_stored".into(), MetricType::Gauge),
}
}
pub fn update_slot(&mut self, new_slot: SlotData) {
let new_processed_head = new_slot.slot > self.newest_processed_slot;
if new_processed_head {
self.newest_processed_slot = new_slot.slot;
}
let new_rooted_head =
new_slot.slot > self.newest_rooted_slot && new_slot.status == SlotStatus::Rooted;
if new_rooted_head {
self.newest_rooted_slot = new_slot.slot;
}
let mut parent_update = false;
use std::collections::hash_map::Entry;
match self.slots.entry(new_slot.slot) {
Entry::Vacant(v) => {
v.insert(new_slot);
}
Entry::Occupied(o) => {
let v = o.into_mut();
parent_update = v.parent != new_slot.parent && new_slot.parent.is_some();
v.parent = v.parent.or(new_slot.parent);
v.status = new_slot.status;
}
};
if new_processed_head || parent_update {
// update the "chain" field down to the first rooted slot
let mut slot = self.newest_processed_slot;
loop {
if let Some(data) = self.slots.get_mut(&slot) {
data.chain = self.newest_processed_slot;
if data.status == SlotStatus::Rooted {
break;
}
if let Some(parent) = data.parent {
slot = parent;
continue;
}
}
break;
}
}
if new_rooted_head {
// for each account, preserve only writes > newest_rooted_slot, or the newest
// rooted write
self.account_versions_stored = 0;
self.account_bytes_stored = 0;
for (_, writes) in self.accounts.iter_mut() {
let newest_rooted_write = writes
.iter()
.rev()
.find(|w| {
w.slot <= self.newest_rooted_slot
&& self
.slots
.get(&w.slot)
.map(|s| {
// sometimes we seem not to get notifications about slots
// getting rooted, hence assume non-uncle slots < newest_rooted_slot
// are rooted too
s.status == SlotStatus::Rooted
|| s.chain == self.newest_processed_slot
})
// preserved account writes for deleted slots <= newest_rooted_slot
// are expected to be rooted
.unwrap_or(true)
})
.map(|w| w.slot)
// no rooted write found: produce no effect, since writes > newest_rooted_slot are retained anyway
.unwrap_or(self.newest_rooted_slot + 1);
writes
.retain(|w| w.slot == newest_rooted_write || w.slot > self.newest_rooted_slot);
self.account_versions_stored += writes.len();
self.account_bytes_stored += writes
.iter()
.map(|w| w.account.data().len())
.fold(0, |acc, l| acc + l)
}
// now it's fine to drop any slots before the new rooted head
// as account writes for non-rooted slots before it have been dropped
self.slots.retain(|s, _| *s >= self.newest_rooted_slot);
self.metric_accounts_stored.set(self.accounts.len() as u64);
self.metric_account_versions_stored
.set(self.account_versions_stored as u64);
self.metric_account_bytes_stored
.set(self.account_bytes_stored as u64);
}
}
pub fn update_account(&mut self, pubkey: Pubkey, account: AccountData) {
use std::collections::hash_map::Entry;
match self.accounts.entry(pubkey) {
Entry::Vacant(v) => {
self.account_versions_stored += 1;
self.account_bytes_stored += account.account.data().len();
v.insert(vec![account]);
}
Entry::Occupied(o) => {
let v = o.into_mut();
// v is ordered by slot ascending. find the right position
// overwrite if an entry for the slot already exists, otherwise insert
let rev_pos = v
.iter()
.rev()
.position(|d| d.slot <= account.slot)
.unwrap_or(v.len());
let pos = v.len() - rev_pos;
if pos < v.len() && v[pos].slot == account.slot {
if v[pos].write_version < account.write_version {
v[pos] = account;
}
} else {
self.account_versions_stored += 1;
self.account_bytes_stored += account.account.data().len();
v.insert(pos, account);
}
}
};
}
fn is_account_write_live(&self, write: &AccountData) -> bool {
self.slots
.get(&write.slot)
// either the slot is rooted or in the current chain
.map(|s| s.status == SlotStatus::Rooted || s.chain == self.newest_processed_slot)
// if the slot can't be found but precedes newest rooted, use it too (old rooted slots are removed)
.unwrap_or(
write.slot <= self.newest_rooted_slot || write.slot > self.newest_processed_slot,
)
}
/// Cloned snapshot of all the most recent live writes per pubkey
pub fn accounts_snapshot(&self) -> HashMap<Pubkey, AccountData> {
self.accounts
.iter()
.filter_map(|(pubkey, writes)| {
let latest_good_write = writes
.iter()
.rev()
.find(|w| self.is_account_write_live(w))?;
Some((pubkey.clone(), latest_good_write.clone()))
})
.collect()
}
/// Ref to the most recent live write of the pubkey
pub fn account<'a>(&'a self, pubkey: &Pubkey) -> anyhow::Result<&'a AccountData> {
self.accounts
.get(pubkey)
.ok_or(anyhow::anyhow!("account {} not found", pubkey))?
.iter()
.rev()
.find(|w| self.is_account_write_live(w))
.ok_or(anyhow::anyhow!("account {} has no live data", pubkey))
}
}
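
A minimal sketch of the access pattern ChainData is built for; the slot numbers are arbitrary and metrics_sender is assumed to come from metrics::start.

use solana_sdk::{account::ReadableAccount, pubkey::Pubkey};

use crate::{
    chain_data::{AccountData, ChainData, SlotData, SlotStatus},
    metrics::Metrics,
};

fn example(metrics_sender: Metrics, pk: Pubkey, write: AccountData) -> anyhow::Result<()> {
    let mut chain = ChainData::new(metrics_sender);

    // Feed slot and account updates in as they arrive, in any order.
    chain.update_slot(SlotData {
        slot: 42,
        parent: Some(41),
        status: SlotStatus::Processed,
        chain: 0,
    });
    chain.update_account(pk, write);

    // Read back the newest live write for the account.
    let latest = chain.account(&pk)?;
    println!("slot={} bytes={}", latest.slot, latest.account.data().len());
    Ok(())
}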


@@ -1,14 +1,19 @@
use crate::{
account_write_filter::{self, AccountWriteRoute},
grpc_plugin_source::FilterConfig,
helpers::to_sp_pk,
mango::GroupConfig,
mango_v3_perp_crank_sink::MangoV3PerpCrankSink,
metrics,
states::{KeeperInstruction, TransactionSendRecord},
tpu_manager::TpuManager,
websocket_source::{self, KeeperConfig},
};
use mango_feeds_connector::{
account_write_filter::{self, AccountWriteRoute},
FilterConfig,
websocket_source,
metrics, SourceConfig,
SnapshotSourceConfig, MetricsConfig,
};
use async_channel::unbounded;
use chrono::Utc;
use log::*;
@@ -26,6 +31,16 @@ use std::{
};
use tokio::sync::RwLock;
#[derive(Debug, Clone)]
pub struct KeeperConfig {
pub program_id: Pubkey,
pub rpc_url: String,
pub websocket_url: String,
}
pub fn start(
config: KeeperConfig,
exit_signal: Arc<AtomicBool>,
@@ -49,7 +64,7 @@ pub fn start(
let group_pk = Pubkey::from_str(&group.public_key).unwrap();
let cache_pk = Pubkey::from_str(&group.cache_key).unwrap();
let mango_program_id = Pubkey::from_str(&group.mango_program_id).unwrap();
let filter_config = FilterConfig {
let _filter_config = FilterConfig {
program_ids: vec![group.mango_program_id.clone()],
account_ids: group
.perp_markets
@@ -98,7 +113,7 @@ pub fn start(
tokio::spawn(async move {
let metrics_tx = metrics::start(
metrics::MetricsConfig {
MetricsConfig {
output_stdout: true,
output_http: false,
},
@@ -108,7 +123,7 @@ pub fn start(
let routes = vec![AccountWriteRoute {
matched_pubkeys: perp_queue_pks
.iter()
.map(|(_, evq_pk)| evq_pk.clone())
.map(|(_, evq_pk)| mango_feeds_connector::solana_sdk::pubkey::Pubkey::new_from_array(evq_pk.to_bytes()))
.collect(),
sink: Arc::new(MangoV3PerpCrankSink::new(
perp_queue_pks,
@@ -134,8 +149,15 @@ pub fn start(
// ).await;
websocket_source::process_events(
config,
&filter_config,
&SourceConfig {
dedup_queue_size: 0,
grpc_sources: vec![],
snapshot: SnapshotSourceConfig {
rpc_http_url: config.rpc_url,
program_id: config.program_id.to_string(),
},
rpc_ws_url: config.websocket_url,
},
account_write_queue_sender,
slot_queue_sender,
)


@@ -1,617 +0,0 @@
use futures::stream::once;
use jsonrpc_core::futures::StreamExt;
use jsonrpc_core_client::transports::http;
use serde::Deserialize;
use solana_account_decoder::{UiAccount, UiAccountEncoding};
use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
use solana_client::rpc_response::{OptionalContext, RpcKeyedAccount};
use solana_rpc::rpc::rpc_accounts::AccountsDataClient;
use solana_rpc::rpc::rpc_accounts_scan::AccountsScanClient;
use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey};
use futures::{future, future::FutureExt};
use yellowstone_grpc_proto::tonic::{
metadata::MetadataValue,
transport::{Certificate, Channel, ClientTlsConfig, Identity},
Request,
};
use log::*;
use std::{collections::HashMap, env, str::FromStr, time::Duration};
use yellowstone_grpc_proto::prelude::{
geyser_client::GeyserClient, subscribe_update, SubscribeRequest,
SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots, SubscribeUpdate,
SubscribeUpdateSlotStatus,
};
use crate::websocket_source::KeeperConfig;
use crate::{
chain_data::{AccountWrite, SlotStatus, SlotUpdate},
metrics::{MetricType, Metrics},
AnyhowWrap,
};
#[derive(Clone, Debug, Deserialize)]
pub struct GrpcSourceConfig {
pub name: String,
pub connection_string: String,
pub retry_connection_sleep_secs: u64,
pub token: String,
}
#[derive(Clone, Debug, Deserialize)]
pub struct TlsConfig {
pub ca_cert_path: String,
pub client_cert_path: String,
pub client_key_path: String,
pub domain_name: String,
}
#[derive(Clone, Debug, Deserialize)]
pub struct FilterConfig {
pub program_ids: Vec<String>,
pub account_ids: Vec<String>,
}
#[derive(Clone, Debug, Deserialize)]
pub struct SnapshotSourceConfig {
pub rpc_http_url: String,
pub program_id: String,
}
#[derive(Clone, Debug)]
pub struct GrpcConfig {
pub dedup_queue_size: usize,
pub grpc_sources: Vec<GrpcSourceConfig>,
}
//use solana_geyser_connector_plugin_grpc::compression::zstd_decompress;
struct SnapshotData {
slot: u64,
accounts: Vec<(String, Option<UiAccount>)>,
}
enum Message {
GrpcUpdate(SubscribeUpdate),
Snapshot(SnapshotData),
}
async fn get_snapshot_gpa(
rpc_http_url: String,
program_id: String,
) -> anyhow::Result<OptionalContext<Vec<RpcKeyedAccount>>> {
let rpc_client = http::connect_with_options::<AccountsScanClient>(&rpc_http_url, true)
.await
.map_err_anyhow()?;
let account_info_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::finalized()),
data_slice: None,
min_context_slot: None,
};
let program_accounts_config = RpcProgramAccountsConfig {
filters: None,
with_context: Some(true),
account_config: account_info_config.clone(),
};
info!("requesting snapshot {}", program_id);
let account_snapshot = rpc_client
.get_program_accounts(program_id.clone(), Some(program_accounts_config.clone()))
.await
.map_err_anyhow()?;
info!("snapshot received {}", program_id);
Ok(account_snapshot)
}
async fn get_snapshot_gma(
rpc_http_url: String,
ids: Vec<String>,
) -> anyhow::Result<solana_client::rpc_response::Response<Vec<Option<UiAccount>>>> {
let rpc_client = http::connect_with_options::<AccountsDataClient>(&rpc_http_url, true)
.await
.map_err_anyhow()?;
let account_info_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::finalized()),
data_slice: None,
min_context_slot: None,
};
info!("requesting snapshot {:?}", ids);
let account_snapshot = rpc_client
.get_multiple_accounts(ids.clone(), Some(account_info_config))
.await
.map_err_anyhow()?;
info!("snapshot received {:?}", ids);
Ok(account_snapshot)
}
async fn feed_data_geyser(
grpc_config: &GrpcSourceConfig,
keeper_config: &KeeperConfig,
filter_config: &FilterConfig,
sender: async_channel::Sender<Message>,
) -> anyhow::Result<()> {
let connection_string = match &grpc_config.connection_string.chars().next().unwrap() {
'$' => env::var(&grpc_config.connection_string[1..])
.expect("reading connection string from env"),
_ => grpc_config.connection_string.clone(),
};
let rpc_http_url = match &keeper_config.rpc_url.chars().next().unwrap() {
'$' => env::var(&keeper_config.rpc_url[1..]).expect("reading connection string from env"),
_ => keeper_config.rpc_url.clone(),
};
info!("connecting to {}", connection_string);
let res = Channel::from_shared(connection_string.clone());
let endpoint = res.map(|e| {
if e.uri().scheme_str() == Some("https") {
info!("enable tls");
e.tls_config(ClientTlsConfig::new())
} else {
Ok(e)
}
})??;
let channel = endpoint.connect_lazy();
let token: MetadataValue<_> = grpc_config.token.parse()?;
let mut client = GeyserClient::with_interceptor(channel, move |mut req: Request<()>| {
req.metadata_mut().insert("x-token", token.clone());
Ok(req)
});
// If account_ids are provided, the snapshot will use gMA. If only program_ids are given, only the first id will be snapshotted via gPA
// TODO: handle this better
if filter_config.program_ids.len() > 1 {
warn!("only one program id is supported for gPA snapshots")
}
let mut accounts = HashMap::new();
accounts.insert(
"client".to_owned(),
SubscribeRequestFilterAccounts {
account: filter_config.account_ids.clone(),
owner: filter_config.program_ids.clone(),
filters: vec![],
},
);
let mut slots = HashMap::new();
slots.insert("client".to_owned(), SubscribeRequestFilterSlots {});
let blocks = HashMap::new();
let blocks_meta = HashMap::new();
let transactions = HashMap::new();
let request = SubscribeRequest {
accounts,
blocks,
blocks_meta,
slots,
transactions,
};
info!("Going to send request: {:?}", request);
let response = client.subscribe(once(async move { request })).await?;
let mut update_stream = response.into_inner();
// We can't get a snapshot immediately since the finalized snapshot would be for a
// slot in the past and we'd be missing intermediate updates.
//
// Delay the request until the first slot we received all writes for becomes rooted
// to avoid that problem - partially. The rooted slot will still be larger than the
// finalized slot, so add a number of slots as a buffer.
//
// If that buffer isn't sufficient, there'll be a retry.
// The first slot that we will receive _all_ account writes for
let mut first_full_slot: u64 = u64::MAX;
// If a snapshot should be performed when ready.
let mut snapshot_needed = true;
// The highest "rooted" slot that has been seen.
let mut max_rooted_slot = 0;
// Data for slots will arrive out of order. This value defines how many
// slots after a slot was marked "rooted" we assume it'll not receive
// any more account write information.
//
// This is important for the write_version mapping (to know when slots can
// be dropped).
let max_out_of_order_slots = 40;
// Number of slots that we expect "finalized" commitment to lag
// behind "rooted". This matters for getProgramAccounts based snapshots,
// which will have "finalized" commitment.
let mut rooted_to_finalized_slots = 30;
let mut snapshot_gma = future::Fuse::terminated();
let mut snapshot_gpa = future::Fuse::terminated();
// The plugin sends a ping every 10s
let fatal_idle_timeout = Duration::from_secs(60);
// Highest slot that an account write came in for.
let mut newest_write_slot: u64 = 0;
struct WriteVersion {
// Write version seen on-chain
global: u64,
// The per-pubkey per-slot write version
slot: u32,
}
// map slot -> (pubkey -> WriteVersion)
//
// Since the write_version is a private identifier per node it can't be used
// to deduplicate events from multiple nodes. Here we rewrite it such that each
// pubkey and each slot has a consecutive numbering of writes starting at 1.
//
// That number will be consistent for each node.
let mut slot_pubkey_writes = HashMap::<u64, HashMap<[u8; 32], WriteVersion>>::new();
loop {
tokio::select! {
update = update_stream.next() => {
use subscribe_update::UpdateOneof;
let mut update = update.ok_or(anyhow::anyhow!("geyser plugin has closed the stream"))??;
trace!("update={:?}", update);
match update.update_oneof.as_mut().expect("invalid grpc") {
UpdateOneof::Slot(slot_update) => {
let status = slot_update.status;
if status == SubscribeUpdateSlotStatus::Finalized as i32 {
if first_full_slot == u64::MAX {
// TODO: is this equivalent to before? what was highest_write_slot?
first_full_slot = slot_update.slot + 1;
}
if slot_update.slot > max_rooted_slot {
max_rooted_slot = slot_update.slot;
// drop data for slots that are well beyond rooted
slot_pubkey_writes.retain(|&k, _| k >= max_rooted_slot - max_out_of_order_slots);
}
if snapshot_needed && max_rooted_slot - rooted_to_finalized_slots > first_full_slot {
snapshot_needed = false;
if filter_config.account_ids.len() > 0 {
snapshot_gma = tokio::spawn(get_snapshot_gma(rpc_http_url.clone(), filter_config.account_ids.clone())).fuse();
} else if filter_config.program_ids.len() > 0 {
snapshot_gpa = tokio::spawn(get_snapshot_gpa(rpc_http_url.clone(), filter_config.program_ids[0].clone())).fuse();
}
}
}
},
UpdateOneof::Account(info) => {
if info.slot < first_full_slot {
// Don't try to process data for slots where we may have missed writes:
// We could not map the write_version correctly for them.
continue;
}
if info.slot > newest_write_slot {
newest_write_slot = info.slot;
} else if max_rooted_slot > 0 && info.slot < max_rooted_slot - max_out_of_order_slots {
anyhow::bail!("received write {} slots back from max rooted slot {}", max_rooted_slot - info.slot, max_rooted_slot);
}
let pubkey_writes = slot_pubkey_writes.entry(info.slot).or_default();
let mut write = match info.account.clone() {
Some(x) => x,
None => {
// TODO: handle error
continue;
},
};
let pubkey_bytes: [u8; 32] = write.pubkey.try_into().expect("Pubkey should be of size 32 bytes");
let write_version_mapping = pubkey_writes.entry(pubkey_bytes).or_insert(WriteVersion {
global: write.write_version,
slot: 1, // write version 0 is reserved for snapshots
});
// We assume we will receive write versions for each pubkey in sequence.
// If this is not the case, logic here does not work correctly because
// a later write could arrive first.
if write.write_version < write_version_mapping.global {
anyhow::bail!("unexpected write version: got {}, expected >= {}", write.write_version, write_version_mapping.global);
}
// Rewrite the update to use the local write version and bump it
write.write_version = write_version_mapping.slot as u64;
write_version_mapping.slot += 1;
},
UpdateOneof::Ping(_) => {},
UpdateOneof::Block(_) => {},
UpdateOneof::BlockMeta(_) => {},
UpdateOneof::Transaction(_) => {},
}
sender.send(Message::GrpcUpdate(update)).await.expect("send success");
},
snapshot = &mut snapshot_gma => {
let snapshot = snapshot??;
info!("snapshot is for slot {}, first full slot was {}", snapshot.context.slot, first_full_slot);
if snapshot.context.slot >= first_full_slot {
let accounts: Vec<(String, Option<UiAccount>)> = filter_config.account_ids.iter().zip(snapshot.value).map(|x| (x.0.clone(), x.1)).collect();
sender
.send(Message::Snapshot(SnapshotData {
accounts,
slot: snapshot.context.slot,
}))
.await
.expect("send success");
} else {
info!(
"snapshot is too old: has slot {}, expected {} minimum",
snapshot.context.slot,
first_full_slot
);
// try again in another 10 slots
snapshot_needed = true;
rooted_to_finalized_slots += 10;
}
},
snapshot = &mut snapshot_gpa => {
let snapshot = snapshot??;
if let OptionalContext::Context(snapshot_data) = snapshot {
info!("snapshot is for slot {}, first full slot was {}", snapshot_data.context.slot, first_full_slot);
if snapshot_data.context.slot >= first_full_slot {
let accounts: Vec<(String, Option<UiAccount>)> = snapshot_data.value.iter().map(|x| {
let deref = x.clone();
(deref.pubkey, Some(deref.account))
}).collect();
sender
.send(Message::Snapshot(SnapshotData {
accounts,
slot: snapshot_data.context.slot,
}))
.await
.expect("send success");
} else {
info!(
"snapshot is too old: has slot {}, expected {} minimum",
snapshot_data.context.slot,
first_full_slot
);
// try again in another 10 slots
snapshot_needed = true;
rooted_to_finalized_slots += 10;
}
} else {
anyhow::bail!("bad snapshot format");
}
},
_ = tokio::time::sleep(fatal_idle_timeout) => {
anyhow::bail!("geyser plugin hasn't sent a message in too long");
}
}
}
}
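
The per-slot write-version rewrite is the subtle part of feed_data_geyser; a stripped-down sketch of the same idea, detached from the gRPC plumbing (the names here are illustrative, not part of this module):

use std::collections::HashMap;

struct LocalWriteVersion {
    global: u64, // write_version as reported by the node (node-private)
    next: u64,   // per-pubkey, per-slot numbering, starting at 1
}

/// Rewrites a node-private write_version into a per-pubkey, per-slot counter
/// that comes out the same on every node, so updates from several geyser
/// sources can be deduplicated downstream.
fn remap_write_version(
    map: &mut HashMap<u64, HashMap<[u8; 32], LocalWriteVersion>>,
    slot: u64,
    pubkey: [u8; 32],
    global_write_version: u64,
) -> anyhow::Result<u64> {
    let mapping = map
        .entry(slot)
        .or_default()
        .entry(pubkey)
        .or_insert(LocalWriteVersion { global: global_write_version, next: 1 });
    // Writes for a pubkey are assumed to arrive in increasing global order.
    if global_write_version < mapping.global {
        anyhow::bail!("unexpected write version");
    }
    let local = mapping.next;
    mapping.next += 1;
    Ok(local)
}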
fn _make_tls_config(config: &TlsConfig) -> ClientTlsConfig {
let server_root_ca_cert = match &config.ca_cert_path.chars().next().unwrap() {
'$' => env::var(&config.ca_cert_path[1..])
.expect("reading server root ca cert from env")
.into_bytes(),
_ => std::fs::read(&config.ca_cert_path).expect("reading server root ca cert from file"),
};
let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
let client_cert = match &config.client_cert_path.chars().next().unwrap() {
'$' => env::var(&config.client_cert_path[1..])
.expect("reading client cert from env")
.into_bytes(),
_ => std::fs::read(&config.client_cert_path).expect("reading client cert from file"),
};
let client_key = match &config.client_key_path.chars().next().unwrap() {
'$' => env::var(&config.client_key_path[1..])
.expect("reading client key from env")
.into_bytes(),
_ => std::fs::read(&config.client_key_path).expect("reading client key from file"),
};
let client_identity = Identity::from_pem(client_cert, client_key);
let domain_name = match &config.domain_name.chars().next().unwrap() {
'$' => env::var(&config.domain_name[1..]).expect("reading domain name from env"),
_ => config.domain_name.clone(),
};
ClientTlsConfig::new()
.ca_certificate(server_root_ca_cert)
.identity(client_identity)
.domain_name(domain_name)
}
pub async fn process_events(
grpc_config: GrpcConfig,
keeper_config: KeeperConfig,
filter_config: &FilterConfig,
account_write_queue_sender: async_channel::Sender<AccountWrite>,
slot_queue_sender: async_channel::Sender<SlotUpdate>,
metrics_sender: Metrics,
) {
// Subscribe to geyser
let (msg_sender, msg_receiver) =
async_channel::bounded::<Message>(grpc_config.dedup_queue_size);
for grpc_source in grpc_config.grpc_sources.clone() {
let msg_sender = msg_sender.clone();
let snapshot_source = keeper_config.clone();
let metrics_sender = metrics_sender.clone();
let filter_config = filter_config.clone();
tokio::spawn(async move {
let mut metric_retries = metrics_sender.register_u64(
format!("grpc_source_{}_connection_retries", grpc_source.name,),
MetricType::Counter,
);
let metric_connected =
metrics_sender.register_bool(format!("grpc_source_{}_status", grpc_source.name));
// Continuously reconnect on failure
loop {
metric_connected.set(true);
let out = feed_data_geyser(
&grpc_source,
&snapshot_source,
&filter_config,
msg_sender.clone(),
);
let result = out.await;
// assert!(result.is_err());
if let Err(err) = result {
warn!(
"error during communication with the geyser plugin. retrying. {:?}",
err
);
}
metric_connected.set(false);
metric_retries.increment();
tokio::time::sleep(std::time::Duration::from_secs(
grpc_source.retry_connection_sleep_secs,
))
.await;
}
});
}
// slot -> (pubkey -> write_version)
//
// To avoid unnecessarily sending requests to SQL, we track the latest write_version
// for each (slot, pubkey). If an already-seen write_version comes in, it can be safely
// discarded.
let mut latest_write = HashMap::<u64, HashMap<[u8; 32], u64>>::new();
// Number of slots to retain in latest_write
let latest_write_retention = 50;
let mut metric_account_writes =
metrics_sender.register_u64("grpc_account_writes".into(), MetricType::Counter);
let mut metric_account_queue =
metrics_sender.register_u64("grpc_account_write_queue".into(), MetricType::Gauge);
let mut metric_dedup_queue =
metrics_sender.register_u64("grpc_dedup_queue".into(), MetricType::Gauge);
let mut metric_slot_queue =
metrics_sender.register_u64("grpc_slot_update_queue".into(), MetricType::Gauge);
let mut metric_slot_updates =
metrics_sender.register_u64("grpc_slot_updates".into(), MetricType::Counter);
let mut metric_snapshots =
metrics_sender.register_u64("grpc_snapshots".into(), MetricType::Counter);
let mut metric_snapshot_account_writes =
metrics_sender.register_u64("grpc_snapshot_account_writes".into(), MetricType::Counter);
loop {
metric_dedup_queue.set(msg_receiver.len() as u64);
let msg = msg_receiver.recv().await.expect("sender must not close");
use subscribe_update::UpdateOneof;
match msg {
Message::GrpcUpdate(update) => {
match update.update_oneof.expect("invalid grpc") {
UpdateOneof::Account(info) => {
let update = match info.account.clone() {
Some(x) => x,
None => {
// TODO: handle error
continue;
}
};
assert!(update.pubkey.len() == 32);
assert!(update.owner.len() == 32);
metric_account_writes.increment();
metric_account_queue.set(account_write_queue_sender.len() as u64);
// Skip writes that a different server has already sent
let pubkey_writes = latest_write.entry(info.slot).or_default();
let pubkey_bytes: [u8; 32] = update
.pubkey
.clone()
.try_into()
.expect("Pubkey should be of size 32 bytes");
let writes = pubkey_writes.entry(pubkey_bytes).or_insert(0);
if update.write_version <= *writes {
continue;
}
*writes = update.write_version;
latest_write.retain(|&k, _| k >= info.slot - latest_write_retention);
// let mut uncompressed: Vec<u8> = Vec::new();
// zstd_decompress(&update.data, &mut uncompressed).unwrap();
account_write_queue_sender
.send(AccountWrite {
pubkey: Pubkey::new_from_array(
update
.pubkey
.try_into()
.expect("Pubkey should be of size 32 bytes"),
),
slot: info.slot,
write_version: update.write_version,
lamports: update.lamports,
owner: Pubkey::new_from_array(
update.owner.try_into().expect("Pubkey should be of size 32 bytes"),
),
executable: update.executable,
rent_epoch: update.rent_epoch,
data: update.data,
// TODO: what should this be? related to account deletes?
is_selected: true,
})
.await
.expect("send success");
}
UpdateOneof::Slot(update) => {
metric_slot_updates.increment();
metric_slot_queue.set(slot_queue_sender.len() as u64);
let status =
SubscribeUpdateSlotStatus::from_i32(update.status).map(|v| match v {
SubscribeUpdateSlotStatus::Processed => SlotStatus::Processed,
SubscribeUpdateSlotStatus::Confirmed => SlotStatus::Confirmed,
SubscribeUpdateSlotStatus::Finalized => SlotStatus::Rooted,
});
if status.is_none() {
error!("unexpected slot status: {}", update.status);
continue;
}
let slot_update = SlotUpdate {
slot: update.slot,
parent: update.parent,
status: status.expect("qed"),
};
slot_queue_sender
.send(slot_update)
.await
.expect("send success");
}
UpdateOneof::Ping(_) => {}
UpdateOneof::Block(_) => {}
UpdateOneof::BlockMeta(_) => {}
UpdateOneof::Transaction(_) => {}
}
}
Message::Snapshot(update) => {
metric_snapshots.increment();
info!("processing snapshot...");
for account in update.accounts.iter() {
metric_snapshot_account_writes.increment();
metric_account_queue.set(account_write_queue_sender.len() as u64);
match account {
(key, Some(ui_account)) => {
// TODO: Resnapshot on invalid data?
let pubkey = Pubkey::from_str(key).unwrap();
let account: Account = ui_account.decode().unwrap();
account_write_queue_sender
.send(AccountWrite::from(pubkey, update.slot, 0, account))
.await
.expect("send success");
}
(key, None) => warn!("account not found {}", key),
}
}
info!("processing snapshot done");
}
}
}
}
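
For context, a sketch of how a caller would have wired this module up before the move; the queue size, source name, connection string, and token are placeholders, not values taken from the repo.

use crate::{
    chain_data::{AccountWrite, SlotUpdate},
    grpc_plugin_source::{self, FilterConfig, GrpcConfig, GrpcSourceConfig},
    metrics::Metrics,
    websocket_source::KeeperConfig,
};

async fn run_grpc_source(
    keeper_config: KeeperConfig,
    metrics_sender: Metrics,
    account_write_queue_sender: async_channel::Sender<AccountWrite>,
    slot_queue_sender: async_channel::Sender<SlotUpdate>,
) {
    let grpc_config = GrpcConfig {
        dedup_queue_size: 50_000, // placeholder
        grpc_sources: vec![GrpcSourceConfig {
            name: "local-geyser".into(),                        // placeholder
            connection_string: "http://127.0.0.1:10000".into(), // placeholder
            retry_connection_sleep_secs: 5,
            token: "".into(),
        }],
    };
    let filter_config = FilterConfig {
        program_ids: vec![keeper_config.program_id.to_string()],
        account_ids: vec![],
    };
    grpc_plugin_source::process_events(
        grpc_config,
        keeper_config,
        &filter_config,
        account_write_queue_sender,
        slot_queue_sender,
        metrics_sender,
    )
    .await;
}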


@@ -1,21 +1,17 @@
pub mod account_write_filter;
pub mod chain_data;
pub mod cli;
pub mod confirmation_strategies;
pub mod crank;
pub mod grpc_plugin_source;
pub mod helpers;
pub mod keeper;
pub mod mango;
pub mod mango_v3_perp_crank_sink;
pub mod market_markers;
pub mod metrics;
pub mod result_writer;
pub mod rotating_queue;
pub mod states;
pub mod stats;
pub mod tpu_manager;
pub mod websocket_source;
trait AnyhowWrap {
type Value;


@@ -4,7 +4,7 @@ use {
mango_simulation::{
cli,
confirmation_strategies::confirmations_by_blocks,
crank,
crank::{self, KeeperConfig},
helpers::{
get_latest_blockhash, get_mango_market_perps_cache, start_blockhash_polling_service,
to_sdk_pk,
@@ -16,7 +16,6 @@ use {
states::PerpMarketCache,
stats::MangoSimulationStats,
tpu_manager::TpuManager,
websocket_source::KeeperConfig,
},
solana_client::{nonblocking::rpc_client::RpcClient as NbRpcClient, rpc_client::RpcClient},
solana_program::pubkey::Pubkey,


@@ -8,14 +8,17 @@ use mango::{
instruction::consume_events,
queue::{AnyEvent, EventQueueHeader, EventType, FillEvent, OutEvent, Queue},
};
use solana_sdk::account::ReadableAccount;
use mango_feeds_connector::solana_sdk::account::ReadableAccount;
use solana_sdk::{instruction::Instruction, pubkey::Pubkey};
use bytemuck::cast_ref;
use crate::{
use mango_feeds_connector::{
account_write_filter::AccountWriteSink,
chain_data::AccountData,
};
use crate::{
helpers::{to_sdk_instruction, to_sp_pk},
};
@@ -58,7 +61,7 @@ type EventQueueEvents = [AnyEvent; QUEUE_LEN];
#[async_trait]
impl AccountWriteSink for MangoV3PerpCrankSink {
async fn process(&self, pk: &Pubkey, account: &AccountData) -> Result<(), String> {
async fn process(&self, pk: &mango_feeds_connector::solana_sdk::pubkey::Pubkey, account: &AccountData) -> Result<(), String> {
let account = &account.account;
let (ix, mkt_pk): (Result<Instruction, String>, Pubkey) = {
@@ -109,9 +112,11 @@ impl AccountWriteSink for MangoV3PerpCrankSink {
)
.collect();
let pk = solana_sdk::pubkey::Pubkey::new_from_array(pk.to_bytes());
let mkt_pk = self
.mkt_pks_by_evq_pks
.get(pk)
.get(&pk)
.expect(&format!("{pk:?} is a known public key"));
let ix = to_sdk_instruction(
@@ -120,7 +125,7 @@ impl AccountWriteSink for MangoV3PerpCrankSink {
&to_sp_pk(&self.group_pk),
&to_sp_pk(&self.cache_pk),
&to_sp_pk(mkt_pk),
&to_sp_pk(pk),
&to_sp_pk(&pk),
&mut mango_accounts,
MAX_EVENTS_PER_TX,
)


@@ -1,337 +0,0 @@
use serde::Deserialize;
use {
log::*,
std::collections::HashMap,
std::fmt,
std::sync::{atomic, Arc, Mutex, RwLock},
tokio::time,
warp::{Filter, Rejection, Reply},
};
#[derive(Debug)]
enum Value {
U64 {
value: Arc<atomic::AtomicU64>,
metric_type: MetricType,
},
I64 {
value: Arc<atomic::AtomicI64>,
metric_type: MetricType,
},
Bool {
value: Arc<Mutex<bool>>,
metric_type: MetricType,
},
}
#[derive(Debug, Clone)]
pub enum MetricType {
Counter,
Gauge,
}
impl fmt::Display for MetricType {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
MetricType::Counter => {
write!(f, "counter")
}
MetricType::Gauge => {
write!(f, "gauge")
}
}
}
}
#[derive(Debug)]
enum PrevValue {
U64(u64),
I64(i64),
Bool(bool),
}
#[derive(Clone)]
pub struct MetricU64 {
value: Arc<atomic::AtomicU64>,
}
impl MetricU64 {
pub fn value(&self) -> u64 {
self.value.load(atomic::Ordering::Acquire)
}
pub fn set(&mut self, value: u64) {
self.value.store(value, atomic::Ordering::Release);
}
pub fn set_max(&mut self, value: u64) {
self.value.fetch_max(value, atomic::Ordering::AcqRel);
}
pub fn add(&mut self, value: u64) {
self.value.fetch_add(value, atomic::Ordering::AcqRel);
}
pub fn increment(&mut self) {
self.value.fetch_add(1, atomic::Ordering::AcqRel);
}
pub fn decrement(&mut self) {
self.value.fetch_sub(1, atomic::Ordering::AcqRel);
}
}
#[derive(Clone)]
pub struct MetricI64 {
value: Arc<atomic::AtomicI64>,
}
impl MetricI64 {
pub fn set(&mut self, value: i64) {
self.value.store(value, atomic::Ordering::Release);
}
pub fn increment(&mut self) {
self.value.fetch_add(1, atomic::Ordering::AcqRel);
}
pub fn decrement(&mut self) {
self.value.fetch_sub(1, atomic::Ordering::AcqRel);
}
}
#[derive(Clone)]
pub struct MetricBool {
value: Arc<Mutex<bool>>,
}
impl MetricBool {
pub fn set(&self, value: bool) {
*self.value.lock().unwrap() = value;
}
}
#[derive(Clone)]
pub struct Metrics {
registry: Arc<RwLock<HashMap<String, Value>>>,
labels: HashMap<String, String>,
}
impl Metrics {
pub fn register_u64(&self, name: String, metric_type: MetricType) -> MetricU64 {
let mut registry = self.registry.write().unwrap();
let value = registry.entry(name).or_insert(Value::U64 {
value: Arc::new(atomic::AtomicU64::new(0)),
metric_type: metric_type,
});
MetricU64 {
value: match value {
Value::U64 {
value: v,
metric_type: _,
} => v.clone(),
_ => panic!("bad metric type"),
},
}
}
pub fn register_i64(&self, name: String, metric_type: MetricType) -> MetricI64 {
let mut registry = self.registry.write().unwrap();
let value = registry.entry(name).or_insert(Value::I64 {
value: Arc::new(atomic::AtomicI64::new(0)),
metric_type: metric_type,
});
MetricI64 {
value: match value {
Value::I64 {
value: v,
metric_type: _,
} => v.clone(),
_ => panic!("bad metric type"),
},
}
}
pub fn register_bool(&self, name: String) -> MetricBool {
let mut registry = self.registry.write().unwrap();
let value = registry.entry(name).or_insert(Value::Bool {
value: Arc::new(Mutex::new(false)),
metric_type: MetricType::Gauge,
});
MetricBool {
value: match value {
Value::Bool {
value: v,
metric_type: _,
} => v.clone(),
_ => panic!("bad metric type"),
},
}
}
pub fn get_registry_vec(&self) -> Vec<(String, String, String)> {
let mut vec: Vec<(String, String, String)> = Vec::new();
let metrics = self.registry.read().unwrap();
for (name, value) in metrics.iter() {
let (value_str, type_str) = match value {
Value::U64 {
value: v,
metric_type: t,
} => (
format!("{}", v.load(atomic::Ordering::Acquire)),
t.to_string(),
),
Value::I64 {
value: v,
metric_type: t,
} => (
format!("{}", v.load(atomic::Ordering::Acquire)),
t.to_string(),
),
Value::Bool {
value: v,
metric_type: t,
} => {
let bool_to_int = if *v.lock().unwrap() { 1 } else { 0 };
(format!("{}", bool_to_int), t.to_string())
}
};
vec.push((name.clone(), value_str, type_str));
}
vec
}
}
async fn handle_prometheus_poll(metrics: Metrics) -> Result<impl Reply, Rejection> {
debug!("handle_prometheus_poll");
let label_strings_vec: Vec<String> = metrics
.labels
.iter()
.map(|(name, value)| format!("{}=\"{}\"", name, value))
.collect();
let lines: Vec<String> = metrics
.get_registry_vec()
.iter()
.map(|(name, value, type_name)| {
let sanitized_name = str::replace(name, "-", "_");
format!(
"# HELP {} \n# TYPE {} {}\n{}{{{}}} {}",
sanitized_name,
sanitized_name,
type_name,
sanitized_name,
label_strings_vec.join(","),
value
)
})
.collect();
Ok(format!("{}\n", lines.join("\n")))
}
pub fn with_metrics(
metrics: Metrics,
) -> impl Filter<Extract = (Metrics,), Error = std::convert::Infallible> + Clone {
warp::any().map(move || metrics.clone())
}
#[derive(Clone, Debug, Deserialize)]
pub struct MetricsConfig {
pub output_stdout: bool,
pub output_http: bool,
// TODO: add configurable port and endpoint url
// TODO: add configurable write interval
}
pub fn start(config: MetricsConfig, process_name: String) -> Metrics {
let mut write_interval = time::interval(time::Duration::from_secs(60));
let registry = Arc::new(RwLock::new(HashMap::<String, Value>::new()));
let registry_c = Arc::clone(&registry);
let labels = HashMap::from([(String::from("process"), process_name)]);
let metrics_tx = Metrics { registry, labels };
let metrics_route = warp::path!("metrics")
.and(with_metrics(metrics_tx.clone()))
.and_then(handle_prometheus_poll);
if config.output_http {
// serve prometheus metrics endpoint
tokio::spawn(async move {
warp::serve(metrics_route).run(([0, 0, 0, 0], 9091)).await;
});
}
if config.output_stdout {
// periodically log to stdout
tokio::spawn(async move {
let mut previous_values = HashMap::<String, PrevValue>::new();
loop {
write_interval.tick().await;
// Nested locking! Safe because the only other user locks registry for writing and doesn't
// acquire any interior locks.
let metrics = registry_c.read().unwrap();
for (name, value) in metrics.iter() {
let previous_value = previous_values.get_mut(name);
match value {
Value::U64 {
value: v,
metric_type: _,
} => {
let new_value = v.load(atomic::Ordering::Acquire);
let previous_value = if let Some(PrevValue::U64(v)) = previous_value {
let prev = *v;
*v = new_value;
prev
} else {
previous_values.insert(name.clone(), PrevValue::U64(new_value));
0
};
let diff = new_value.wrapping_sub(previous_value) as i64;
info!("metric: {}: {} ({:+})", name, new_value, diff);
}
Value::I64 {
value: v,
metric_type: _,
} => {
let new_value = v.load(atomic::Ordering::Acquire);
let previous_value = if let Some(PrevValue::I64(v)) = previous_value {
let prev = *v;
*v = new_value;
prev
} else {
previous_values.insert(name.clone(), PrevValue::I64(new_value));
0
};
let diff = new_value - previous_value;
info!("metric: {}: {} ({:+})", name, new_value, diff);
}
Value::Bool {
value: v,
metric_type: _,
} => {
let new_value = v.lock().unwrap();
let previous_value = if let Some(PrevValue::Bool(v)) = previous_value {
let mut prev = new_value.clone();
std::mem::swap(&mut prev, v);
prev
} else {
previous_values
.insert(name.clone(), PrevValue::Bool(new_value.clone()));
false
};
if *new_value == previous_value {
info!("metric: {}: {} (unchanged)", name, &*new_value);
} else {
info!(
"metric: {}: {} (before: {})",
name, &*new_value, previous_value
);
}
}
}
}
}
});
}
metrics_tx
}
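
A minimal sketch of how the registry is used elsewhere in the codebase; the process and metric names are arbitrary.

use crate::metrics::{start, MetricType, MetricsConfig};

#[tokio::main]
async fn main() {
    // start() spawns the stdout/http reporters, so it needs a tokio runtime.
    let metrics_tx = start(
        MetricsConfig { output_stdout: true, output_http: false },
        "example-process".into(),
    );
    let mut requests = metrics_tx.register_u64("example_requests".into(), MetricType::Counter);
    requests.increment();
    let connected = metrics_tx.register_bool("example_connected".into());
    connected.set(true);
}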


@@ -1,218 +0,0 @@
use jsonrpc_core::futures::StreamExt;
use jsonrpc_core_client::transports::{http, ws};
use solana_account_decoder::UiAccountEncoding;
use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_response::{OptionalContext, Response, RpcKeyedAccount},
};
use solana_rpc::{rpc::rpc_accounts_scan::AccountsScanClient, rpc_pubsub::RpcSolPubSubClient};
use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey};
use log::*;
use std::{
collections::HashSet,
str::FromStr,
sync::Arc,
time::{Duration, Instant},
};
use crate::{
chain_data::{AccountWrite, SlotStatus, SlotUpdate},
grpc_plugin_source::FilterConfig,
AnyhowWrap,
};
enum WebsocketMessage {
SingleUpdate(Response<RpcKeyedAccount>),
SnapshotUpdate(Response<Vec<RpcKeyedAccount>>),
SlotUpdate(Arc<solana_client::rpc_response::SlotUpdate>),
}
#[derive(Debug, Clone)]
pub struct KeeperConfig {
pub program_id: Pubkey,
pub rpc_url: String,
pub websocket_url: String,
}
// TODO: the reconnecting should be part of this
async fn feed_data(
config: KeeperConfig,
_filter_config: &FilterConfig,
sender: async_channel::Sender<WebsocketMessage>,
) -> anyhow::Result<()> {
let program_id = config.program_id;
let snapshot_duration = Duration::from_secs(10);
info!("feed_data {config:?}");
let connect = ws::try_connect::<RpcSolPubSubClient>(&config.websocket_url).map_err_anyhow()?;
let client: RpcSolPubSubClient = connect.await.map_err_anyhow()?;
let rpc_client = http::connect_with_options::<AccountsScanClient>(&config.rpc_url, true)
.await
.map_err_anyhow()?;
let account_info_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::processed()),
data_slice: None,
min_context_slot: None,
};
let program_accounts_config = RpcProgramAccountsConfig {
filters: None,
with_context: Some(true),
account_config: account_info_config.clone(),
};
let mut update_sub = client
.program_subscribe(
program_id.to_string(),
Some(program_accounts_config.clone()),
)
.map_err_anyhow()?;
let mut slot_sub = client.slots_updates_subscribe().map_err_anyhow()?;
let mut last_snapshot = Instant::now() - snapshot_duration;
loop {
// occasionally cause a new snapshot to be produced
// including the first time
if last_snapshot + snapshot_duration <= Instant::now() {
let account_snapshot = rpc_client
.get_program_accounts(
program_id.to_string(),
Some(program_accounts_config.clone()),
)
.await
.map_err_anyhow()?;
if let OptionalContext::Context(account_snapshot_response) = account_snapshot {
info!("snapshot");
sender
.send(WebsocketMessage::SnapshotUpdate(account_snapshot_response))
.await
.expect("sending must succeed");
}
last_snapshot = Instant::now();
}
tokio::select! {
account = update_sub.next() => {
trace!("account {account:?}");
match account {
Some(account) => {
sender.send(WebsocketMessage::SingleUpdate(account.map_err_anyhow()?)).await.expect("sending must succeed");
},
None => {
warn!("account stream closed");
return Ok(());
},
}
},
slot_update = slot_sub.next() => {
trace!("slot {slot_update:?}");
match slot_update {
Some(slot_update) => {
sender.send(WebsocketMessage::SlotUpdate(slot_update.map_err_anyhow()?)).await.expect("sending must succeed");
},
None => {
warn!("slot update stream closed");
return Ok(());
},
}
},
_ = tokio::time::sleep(Duration::from_secs(60)) => {
warn!("websocket timeout");
return Ok(())
}
}
}
}
// TODO: rename / split / rework
pub async fn process_events(
config: KeeperConfig,
filter_config: &FilterConfig,
account_write_queue_sender: async_channel::Sender<AccountWrite>,
slot_queue_sender: async_channel::Sender<SlotUpdate>,
) {
let account_wl = HashSet::<String>::from_iter(filter_config.account_ids.iter().cloned());
// Subscribe to program account updates websocket
let (update_sender, update_receiver) = async_channel::unbounded::<WebsocketMessage>();
let filter_config = filter_config.clone();
tokio::spawn(async move {
// if the websocket disconnects, we get no data in a while etc, reconnect and try again
loop {
let out = feed_data(config.clone(), &filter_config, update_sender.clone());
let res = out.await;
info!("loop {res:?}");
}
});
// The thread that pulls updates and forwards them to the account write queue
loop {
let update = update_receiver.recv().await.unwrap();
match update {
WebsocketMessage::SingleUpdate(update) => {
if !account_wl.is_empty() && !account_wl.contains(&update.value.pubkey) {
continue;
}
trace!("single update");
let account: Account = update.value.account.decode().unwrap();
let pubkey = Pubkey::from_str(&update.value.pubkey).unwrap();
account_write_queue_sender
.send(AccountWrite::from(pubkey, update.context.slot, 0, account))
.await
.expect("send success");
}
WebsocketMessage::SnapshotUpdate(update) => {
trace!("snapshot update");
for keyed_account in update.value {
if !account_wl.is_empty() && !account_wl.contains(&keyed_account.pubkey) {
continue;
}
let account: Account = keyed_account.account.decode().unwrap();
let pubkey = Pubkey::from_str(&keyed_account.pubkey).unwrap();
account_write_queue_sender
.send(AccountWrite::from(pubkey, update.context.slot, 0, account))
.await
.expect("send success");
}
}
WebsocketMessage::SlotUpdate(update) => {
trace!("slot update");
let message = match *update {
solana_client::rpc_response::SlotUpdate::CreatedBank {
slot, parent, ..
} => Some(SlotUpdate {
slot,
parent: Some(parent),
status: SlotStatus::Processed,
}),
solana_client::rpc_response::SlotUpdate::OptimisticConfirmation {
slot,
..
} => Some(SlotUpdate {
slot,
parent: None,
status: SlotStatus::Confirmed,
}),
solana_client::rpc_response::SlotUpdate::Root { slot, .. } => {
Some(SlotUpdate {
slot,
parent: None,
status: SlotStatus::Rooted,
})
}
_ => None,
};
if let Some(message) = message {
slot_queue_sender.send(message).await.expect("send success");
}
}
}
}
}
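
Putting the pieces together, the keeper-side wiring with these pre-move modules looked roughly like the following, mirroring the crank.rs hunk earlier in this diff; the process name is a placeholder.

use crate::{
    account_write_filter::{self, AccountWriteRoute},
    grpc_plugin_source::FilterConfig,
    metrics,
    websocket_source::{self, KeeperConfig},
};

async fn run_keeper(
    config: KeeperConfig,
    filter_config: FilterConfig,
    routes: Vec<AccountWriteRoute>,
) -> anyhow::Result<()> {
    let metrics_tx = metrics::start(
        metrics::MetricsConfig { output_stdout: true, output_http: false },
        "keeper".into(), // placeholder process name
    );
    // init() spawns the routing task and hands back the queues that
    // process_events() feeds with account writes and slot updates.
    let (account_write_queue_sender, slot_queue_sender) =
        account_write_filter::init(routes, metrics_tx)?;
    websocket_source::process_events(
        config,
        &filter_config,
        account_write_queue_sender,
        slot_queue_sender,
    )
    .await;
    Ok(())
}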