Initial commit

commit 943f5f29eb
Christian Kamm, 2022-01-07 12:34:42 +01:00
7 changed files with 6537 additions and 0 deletions

.gitignore (vendored, 1 line added)

@@ -0,0 +1 @@
target

Cargo.lock (generated, 5825 lines added)

File diff suppressed because it is too large.

Cargo.toml (41 lines added)

@@ -0,0 +1,41 @@
[package]
name = "liquidatable-accounts-feed"
version = "0.1.0"
authors = ["Christian Kamm <mail@ckamm.de>"]
edition = "2021"
[dependencies]
jsonrpc-core = "18.0.0"
jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] }
solana-rpc = "=1.8.11"
solana-client = "=1.8.11"
solana-account-decoder = "=1.8.11"
solana-sdk = "=1.8.11"
solana-logger = "=1.8.11"
tokio = { version = "1", features = ["full"] }
serde = "1.0.130"
serde_derive = "1.0.130"
serde_json = "1.0.68"
bs58 = "0.3.1"
log = "0.4"
rand = "0.7"
anyhow = "1.0"
fixed = { version = "=1.9.0", features = ["serde"] }
bytes = "1.0"
toml = "0.5"
futures = "0.3.17"
futures-core = "0.3"
futures-util = "0.3"
async-stream = "0.2"
async-channel = "1.6"
async-trait = "0.1"
[patch.crates-io]
# for gzip encoded responses
jsonrpc-core-client = { git = "https://github.com/ckamm/jsonrpc.git", branch = "ckamm/http-with-gzip" }

example-config.toml (7 lines added)

@@ -0,0 +1,7 @@
rpc_ws_url = "ws://api.mainnet-beta.solana.com"
rpc_http_url = "https://api.mainnet-beta.solana.com"
mango_program_id = "mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68"
mango_group_id = "98pjRuQjK3qA6gXts96PqZT4Ze5QmnCmt3QYjhbUSPue"
mango_signer_id = "9BVcYqEQxyccuwznvxXqDkSJFavvTyheiTYk231T1A8S"
serum_program_id = "9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin"

src/main.rs (292 lines added)

@@ -0,0 +1,292 @@
pub mod metrics;
pub mod websocket_source;
use {
async_trait::async_trait,
log::*,
serde_derive::Deserialize,
solana_sdk::{account::Account, pubkey::Pubkey},
std::collections::BTreeMap,
std::collections::HashMap,
std::fs::File,
std::io::Read,
std::sync::Arc,
std::str::FromStr,
};
trait AnyhowWrap {
type Value;
fn map_err_anyhow(self) -> anyhow::Result<Self::Value>;
}
impl<T, E: std::fmt::Debug> AnyhowWrap for Result<T, E> {
type Value = T;
fn map_err_anyhow(self) -> anyhow::Result<Self::Value> {
self.map_err(|err| anyhow::anyhow!("{:?}", err))
}
}
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SlotStatus {
Rooted,
Confirmed,
Processed,
}
#[derive(Clone, Debug)]
pub struct SlotData {
pub slot: u64,
pub parent: Option<u64>,
pub status: SlotStatus,
pub chain: u64, // the top slot that this is in a chain with. uncles will have values < tip
}
#[derive(Clone, Debug)]
pub struct AccountData {
pub slot: u64,
pub account: Account,
}
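// Deserialized from the TOML config file given on the command line; keys mirror example-config.toml.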
#[derive(Clone, Debug, Deserialize)]
pub struct Config {
pub rpc_ws_url: String,
pub rpc_http_url: String,
pub mango_program_id: String,
pub mango_group_id: String,
pub serum_program_id: String,
pub mango_signer_id: String,
}
pub fn encode_address(addr: &Pubkey) -> String {
bs58::encode(&addr.to_bytes()).into_string()
}
#[derive(Default)]
struct ChainData {
slots: HashMap<u64, SlotData>,
accounts: HashMap<Pubkey, Vec<AccountData>>,
newest_rooted_slot: u64,
newest_processed_slot: u64,
}
impl ChainData {
fn update_slot(&mut self, new_slot: SlotData) {
let new_processed_head = new_slot.slot > self.newest_processed_slot;
if new_processed_head {
self.newest_processed_slot = new_slot.slot;
}
let new_rooted_head =
new_slot.slot > self.newest_rooted_slot && new_slot.status == SlotStatus::Rooted;
if new_rooted_head {
self.newest_rooted_slot = new_slot.slot;
}
let mut parent_update = false;
use std::collections::hash_map::Entry;
match self.slots.entry(new_slot.slot) {
Entry::Vacant(v) => {
v.insert(new_slot);
}
Entry::Occupied(o) => {
let v = o.into_mut();
parent_update = v.parent != new_slot.parent && new_slot.parent.is_some();
v.parent = v.parent.or(new_slot.parent);
v.status = new_slot.status;
}
};
if new_processed_head || parent_update {
// update the "chain" field down to the first rooted slot
let mut slot = self.newest_processed_slot;
loop {
if let Some(data) = self.slots.get_mut(&slot) {
data.chain = self.newest_processed_slot;
if data.status == SlotStatus::Rooted {
break;
}
if let Some(parent) = data.parent {
slot = parent;
continue;
}
}
break;
}
}
if new_rooted_head {
// for each account, preserve only writes > newest_rooted_slot, or the newest
// rooted write
for (_, writes) in self.accounts.iter_mut() {
let newest_rooted_write = writes
.iter()
.rev()
.find(|w| {
w.slot <= self.newest_rooted_slot
&& self
.slots
.get(&w.slot)
.map(|s| {
// sometimes we seem not to get notifications about slots
// getting rooted, hence assume non-uncle slots < newest_rooted_slot
// are rooted too
s.status == SlotStatus::Rooted
|| s.chain == self.newest_processed_slot
})
// preserved account writes for deleted slots <= newest_rooted_slot
// are expected to be rooted
.unwrap_or(true)
})
.map(|w| w.slot)
// no rooted write found: produce no effect, since writes > newest_rooted_slot are retained anyway
.unwrap_or(self.newest_rooted_slot + 1);
writes
.retain(|w| w.slot == newest_rooted_write || w.slot > self.newest_rooted_slot);
}
// now it's fine to drop any slots before the new rooted head
// as account writes for non-rooted slots before it have been dropped
self.slots.retain(|s, _| *s >= self.newest_rooted_slot);
}
}
fn update_account(&mut self, pubkey: Pubkey, account: AccountData) {
use std::collections::hash_map::Entry;
match self.accounts.entry(pubkey) {
Entry::Vacant(v) => {
v.insert(vec![account]);
}
Entry::Occupied(o) => {
let v = o.into_mut();
// v is ordered by slot ascending. find the right position
// overwrite if an entry for the slot already exists, otherwise insert
let rev_pos = v
.iter()
.rev()
.position(|d| d.slot <= account.slot)
.unwrap_or(v.len());
let pos = v.len() - rev_pos;
if pos > 0 && v[pos - 1].slot == account.slot {
v[pos - 1] = account;
} else {
v.insert(pos, account);
}
}
};
}
fn update_from_websocket(&mut self, message: websocket_source::Message) {
match message {
websocket_source::Message::Account(account_write) => {
info!("single message");
self.update_account(
account_write.pubkey,
AccountData {
slot: account_write.slot,
account: account_write.account,
},
);
}
websocket_source::Message::Slot(slot_update) => {
info!("slot message");
let slot_update = match *slot_update {
solana_client::rpc_response::SlotUpdate::CreatedBank {
slot, parent, ..
} => Some(SlotData {
slot,
parent: Some(parent),
status: SlotStatus::Processed,
chain: 0,
}),
solana_client::rpc_response::SlotUpdate::OptimisticConfirmation {
slot,
..
} => Some(SlotData {
slot,
parent: None,
status: SlotStatus::Confirmed,
chain: 0,
}),
solana_client::rpc_response::SlotUpdate::Root { slot, .. } => Some(SlotData {
slot,
parent: None,
status: SlotStatus::Rooted,
chain: 0,
}),
_ => None,
};
if let Some(update) = slot_update {
self.update_slot(update);
}
}
}
}
}
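// A minimal illustrative sketch of the ChainData bookkeeping above: the slot numbers are
// made up, and Account::default() / Pubkey::new_unique() from solana-sdk stand in for real data.
#[cfg(test)]
mod chain_data_tests {
    use super::*;

    #[test]
    fn rooting_a_slot_prunes_older_account_writes() {
        let mut chain = ChainData::default();
        let key = Pubkey::new_unique();

        // two processed slots on one fork, each with a write for `key`
        chain.update_slot(SlotData { slot: 10, parent: Some(9), status: SlotStatus::Processed, chain: 0 });
        chain.update_slot(SlotData { slot: 11, parent: Some(10), status: SlotStatus::Processed, chain: 0 });
        chain.update_account(key, AccountData { slot: 10, account: Account::default() });
        chain.update_account(key, AccountData { slot: 11, account: Account::default() });

        // rooting slot 11 drops the now-obsolete write at slot 10 and keeps the rooted one
        chain.update_slot(SlotData { slot: 11, parent: Some(10), status: SlotStatus::Rooted, chain: 0 });
        assert_eq!(chain.accounts[&key].len(), 1);
        assert_eq!(chain.accounts[&key][0].slot, 11);
    }
}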
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args: Vec<String> = std::env::args().collect();
if args.len() < 2 {
anyhow::bail!("requires a config file argument");
}
let config: Config = {
let mut file = File::open(&args[1])?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
toml::from_str(&contents)?
};
let mango_program_id = Pubkey::from_str(&config.mango_program_id)?;
solana_logger::setup_with_default("info");
info!("startup");
let metrics_tx = metrics::start();
let (websocket_sender, websocket_receiver) =
async_channel::unbounded::<websocket_source::Message>();
websocket_source::start(config, websocket_sender);
// TODO: Also have a snapshot source
let mut chain_data = ChainData::default();
loop {
let message = websocket_receiver.recv().await.unwrap();
info!("got message");
// build a model of slots and accounts in `chain_data`
// this code should be generic so it can be reused in future projects
chain_data.update_from_websocket(message.clone());
// specific program logic using the mirrored data
match message {
websocket_source::Message::Account(account_write) => {
// TODO: Do we need to check health when open orders accounts change?
if account_write.account.owner != mango_program_id || account_write.account.data.len() == 0 {
continue;
}
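// in mango v3 the first account data byte is the MetaData data_type discriminator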
let kind = account_write.account.data[0];
match kind {
// MangoAccount
1 => {
// TODO: Check that it belongs to the right group
// TODO: track mango account pubkeys in a set - need to iterate them for health checks
// TODO: check health of this particular one
},
// MangoCache
7 => {
// TODO: Check that it belongs to the right group, for that, we need to check the MangoGroup account
// TODO: check health of all accounts
},
_ => {},
}
},
_ => {}
}
}
}

src/metrics.rs (196 lines added)

@@ -0,0 +1,196 @@
use {
log::*,
std::collections::HashMap,
std::sync::{atomic, Arc, Mutex, RwLock},
tokio::time,
};
#[derive(Debug)]
enum Value {
U64(Arc<atomic::AtomicU64>),
I64(Arc<atomic::AtomicI64>),
String(Arc<Mutex<String>>),
}
#[derive(Debug)]
enum PrevValue {
U64(u64),
I64(i64),
String(String),
}
#[derive(Clone)]
pub struct MetricU64 {
value: Arc<atomic::AtomicU64>,
}
impl MetricU64 {
pub fn value(&self) -> u64 {
self.value.load(atomic::Ordering::Acquire)
}
pub fn set(&mut self, value: u64) {
self.value.store(value, atomic::Ordering::Release);
}
pub fn set_max(&mut self, value: u64) {
self.value.fetch_max(value, atomic::Ordering::AcqRel);
}
pub fn add(&mut self, value: u64) {
self.value.fetch_add(value, atomic::Ordering::AcqRel);
}
pub fn increment(&mut self) {
self.value.fetch_add(1, atomic::Ordering::AcqRel);
}
pub fn decrement(&mut self) {
self.value.fetch_sub(1, atomic::Ordering::AcqRel);
}
}
#[derive(Clone)]
pub struct MetricI64 {
value: Arc<atomic::AtomicI64>,
}
impl MetricI64 {
pub fn set(&mut self, value: i64) {
self.value.store(value, atomic::Ordering::Release);
}
pub fn increment(&mut self) {
self.value.fetch_add(1, atomic::Ordering::AcqRel);
}
pub fn decrement(&mut self) {
self.value.fetch_sub(1, atomic::Ordering::AcqRel);
}
}
#[derive(Clone)]
pub struct MetricString {
value: Arc<Mutex<String>>,
}
impl MetricString {
pub fn set(&self, value: String) {
*self.value.lock().unwrap() = value;
}
}
#[derive(Clone)]
pub struct Metrics {
registry: Arc<RwLock<HashMap<String, Value>>>,
}
impl Metrics {
pub fn register_u64(&self, name: String) -> MetricU64 {
let mut registry = self.registry.write().unwrap();
let value = registry
.entry(name)
.or_insert(Value::U64(Arc::new(atomic::AtomicU64::new(0))));
MetricU64 {
value: match value {
Value::U64(v) => v.clone(),
_ => panic!("bad metric type"),
},
}
}
pub fn register_i64(&self, name: String) -> MetricI64 {
let mut registry = self.registry.write().unwrap();
let value = registry
.entry(name)
.or_insert(Value::I64(Arc::new(atomic::AtomicI64::new(0))));
MetricI64 {
value: match value {
Value::I64(v) => v.clone(),
_ => panic!("bad metric type"),
},
}
}
pub fn register_string(&self, name: String) -> MetricString {
let mut registry = self.registry.write().unwrap();
let value = registry
.entry(name)
.or_insert(Value::String(Arc::new(Mutex::new(String::new()))));
MetricString {
value: match value {
Value::String(v) => v.clone(),
_ => panic!("bad metric type"),
},
}
}
}
pub fn start() -> Metrics {
let mut write_interval = time::interval(time::Duration::from_secs(60));
let registry = Arc::new(RwLock::new(HashMap::<String, Value>::new()));
let registry_c = Arc::clone(&registry);
tokio::spawn(async move {
let mut previous_values = HashMap::<String, PrevValue>::new();
loop {
write_interval.tick().await;
// Nested locking! Safe because the only other user locks registry for writing and doesn't
// acquire any interior locks.
let metrics = registry_c.read().unwrap();
for (name, value) in metrics.iter() {
let previous_value = previous_values.get_mut(name);
match value {
Value::U64(v) => {
let new_value = v.load(atomic::Ordering::Acquire);
let previous_value = if let Some(PrevValue::U64(v)) = previous_value {
let prev = *v;
*v = new_value;
prev
} else {
previous_values.insert(name.clone(), PrevValue::U64(new_value));
0
};
let diff = new_value.wrapping_sub(previous_value) as i64;
info!("metric: {}: {} ({:+})", name, new_value, diff);
}
Value::I64(v) => {
let new_value = v.load(atomic::Ordering::Acquire);
let previous_value = if let Some(PrevValue::I64(v)) = previous_value {
let prev = *v;
*v = new_value;
prev
} else {
previous_values.insert(name.clone(), PrevValue::I64(new_value));
0
};
let diff = new_value - previous_value;
info!("metric: {}: {} ({:+})", name, new_value, diff);
}
Value::String(v) => {
let new_value = v.lock().unwrap();
let previous_value = if let Some(PrevValue::String(v)) = previous_value {
let mut prev = new_value.clone();
std::mem::swap(&mut prev, v);
prev
} else {
previous_values
.insert(name.clone(), PrevValue::String(new_value.clone()));
"".into()
};
if *new_value == previous_value {
info!("metric: {}: {} (unchanged)", name, &*new_value);
} else {
info!(
"metric: {}: {} (before: {})",
name, &*new_value, previous_value
);
}
}
}
}
}
});
Metrics { registry }
}
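// Illustrative usage sketch (the metric name below is made up): register a named counter once,
// keep the handle, and bump it from the hot path; the task spawned in start() logs the value
// and its delta every 60 seconds.
//
//     let metrics = metrics::start();
//     let mut account_writes = metrics.register_u64("account_write_count".into());
//     account_writes.increment();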

src/websocket_source.rs (175 lines added)

@@ -0,0 +1,175 @@
use jsonrpc_core::futures::StreamExt;
use jsonrpc_core_client::transports::{http, ws};
use solana_account_decoder::UiAccountEncoding;
use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType},
rpc_response::{Response, RpcKeyedAccount},
};
use solana_rpc::{rpc::rpc_full::FullClient, rpc::OptionalContext, rpc_pubsub::RpcSolPubSubClient};
use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey};
use log::*;
use std::{
str::FromStr,
sync::Arc,
time::{Duration, Instant},
};
use crate::{AnyhowWrap, Config};
#[derive(Clone)]
pub struct AccountUpdate {
pub pubkey: Pubkey,
pub slot: u64,
pub account: Account, // possibly use AccountSharedData instead?
}
impl AccountUpdate {
pub fn from_rpc(rpc: Response<RpcKeyedAccount>) -> anyhow::Result<Self> {
let pubkey = Pubkey::from_str(&rpc.value.pubkey)?;
let account: Account = rpc
.value
.account
.decode()
.ok_or(anyhow::anyhow!("could not decode account"))?;
Ok(AccountUpdate {
pubkey,
slot: rpc.context.slot,
account,
})
}
}
#[derive(Clone)]
pub enum Message {
Account(AccountUpdate),
Slot(Arc<solana_client::rpc_response::SlotUpdate>),
}
async fn feed_data(config: &Config, sender: async_channel::Sender<Message>) -> anyhow::Result<()> {
let mango_program_id = Pubkey::from_str(&config.mango_program_id)?;
let serum_program_id = Pubkey::from_str(&config.serum_program_id)?;
let mango_signer_id = Pubkey::from_str(&config.mango_signer_id)?;
let snapshot_duration = Duration::from_secs(300);
let connect = ws::try_connect::<RpcSolPubSubClient>(&config.rpc_ws_url).map_err_anyhow()?;
let client = connect.await.map_err_anyhow()?;
let rpc_client = http::connect_with_options::<FullClient>(&config.rpc_http_url, true)
.await
.map_err_anyhow()?;
let account_info_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::processed()),
data_slice: None,
};
let all_accounts_config = RpcProgramAccountsConfig {
filters: None,
with_context: Some(true),
account_config: account_info_config.clone(),
};
let open_orders_accounts_config = RpcProgramAccountsConfig {
// filter for only OpenOrders with mango_signer as owner
filters: Some(vec![
RpcFilterType::DataSize(3228), // open orders size
RpcFilterType::Memcmp(Memcmp {
offset: 0,
// "serum" + u64 that is Initialized (1) + OpenOrders (4)
bytes: MemcmpEncodedBytes::Base58("AcUQf4PGf6fCHGwmpB".into()),
encoding: None,
}),
RpcFilterType::Memcmp(Memcmp {
offset: 45, // owner is the 4th field, after "serum" (header), account_flags: u64 and market: Pubkey
bytes: MemcmpEncodedBytes::Bytes(mango_signer_id.to_bytes().into()),
encoding: None,
}),
]),
with_context: Some(true),
account_config: account_info_config.clone(),
};
let mut mango_sub = client
.program_subscribe(
mango_program_id.to_string(),
Some(all_accounts_config.clone()),
)
.map_err_anyhow()?;
let mut open_orders_sub = client
.program_subscribe(
serum_program_id.to_string(),
Some(open_orders_accounts_config.clone()),
)
.map_err_anyhow()?;
let mut slot_sub = client.slots_updates_subscribe().map_err_anyhow()?;
let mut last_snapshot = Instant::now() - snapshot_duration;
loop {
// occasionally cause a new snapshot to be produced
// including the first time
/*
if last_snapshot + snapshot_duration <= Instant::now() {
let account_snapshot = rpc_client
.get_program_accounts(
mango_program_id.to_string(),
Some(all_accounts_config.clone()),
)
.await
.map_err_anyhow()?;
if let OptionalContext::Context(account_snapshot_response) = account_snapshot {
sender
.send(Message::SnapshotUpdate(account_snapshot_response))
.await
.expect("sending must succeed");
}
last_snapshot = Instant::now();
}
*/
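// wait for the next update on any subscription; if a stream closes or nothing arrives
// for 60 seconds, return so the reconnect loop in start() opens fresh connections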
tokio::select! {
message = mango_sub.next() => {
if let Some(data) = message {
let response = data.map_err_anyhow()?;
sender.send(Message::Account(AccountUpdate::from_rpc(response)?)).await.expect("sending must succeed");
} else {
warn!("mango stream closed");
return Ok(());
}
},
message = open_orders_sub.next() => {
if let Some(data) = message {
let response = data.map_err_anyhow()?;
sender.send(Message::Account(AccountUpdate::from_rpc(response)?)).await.expect("sending must succeed");
} else {
warn!("serum stream closed");
return Ok(());
}
},
message = slot_sub.next() => {
if let Some(data) = message {
sender.send(Message::Slot(data.map_err_anyhow()?)).await.expect("sending must succeed");
} else {
warn!("slot update stream closed");
return Ok(());
}
},
_ = tokio::time::sleep(Duration::from_secs(60)) => {
warn!("websocket timeout");
return Ok(())
}
}
}
}
pub fn start(config: Config, sender: async_channel::Sender<Message>) {
tokio::spawn(async move {
// if the websocket disconnects, we get no data in a while etc, reconnect and try again
loop {
let out = feed_data(&config, sender.clone());
let _ = out.await;
}
});
}