Prepare mango-feeds for usage in mango-simulation (#2)

* update yellowstone

* Bump yellowstone-grpc-proto min version

Can't compile with the old version anymore since a new field was added
to the SubscribeRequestFilterAccounts struct.

* Bump connector version to 0.1.1

* Add "solana-1-15" feature that works with solana 1.15 versions

* reexport solana sdk for pubkey access

* add more logs

* fix bug in account write filter

* use gma snapshots for websocket source

* handle account write filter shutdown w/o crash

* Bump ci rust version

* Fix clippy complaints

---------

Co-authored-by: Christian Kamm <mail@ckamm.de>
Co-authored-by: Riordan Panayides <riordan@panayid.es>
This commit is contained in:
Maximilian Schneider 2023-04-20 20:24:01 +02:00 committed by GitHub
parent 079eb7efde
commit bc78b86cec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 1528 additions and 1337 deletions

2090
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -23,7 +23,7 @@
if [[ -n $RUST_STABLE_VERSION ]]; then
stable_version="$RUST_STABLE_VERSION"
else
stable_version=1.59.0
stable_version=1.67.0
fi
if [[ -n $RUST_NIGHTLY_VERSION ]]; then
@ -91,7 +91,7 @@ export rust_nightly_docker_image=solanalabs/rust-nightly:"$nightly_version"
if [[ -n $RUST_STABLE_VERSION ]]; then
stable_version="$RUST_STABLE_VERSION"
else
stable_version=1.59.0
stable_version=1.67.0
fi
if [[ -n $RUST_NIGHTLY_VERSION ]]; then

View File

@ -1,6 +1,6 @@
[package]
name = "mango-feeds-connector"
version = "0.1.0"
version = "0.1.1"
authors = ["Christian Kamm <mail@ckamm.de>"]
edition = "2021"
license = "AGPL-3.0-or-later"
@ -8,15 +8,19 @@ description = "Listen to Solana account updates via geyser or websockets"
[lib]
[features]
default = ["solana-1-14"]
solana-1-14 = []
solana-1-15 = []
[dependencies]
jsonrpc-core = "18.0.0"
jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] }
solana-rpc = "~1.14.9"
solana-client = "~1.14.9"
solana-account-decoder = "~1.14.9"
solana-sdk = "~1.14.9"
solana-rpc = "1.14.9"
solana-client = "1.14.9"
solana-account-decoder = "1.14.9"
solana-sdk = "1.14.9"
tokio = { version = "1", features = ["full"] }
rustls = "0.20.8"
@ -34,4 +38,4 @@ async-trait = "0.1"
warp = "0.3"
yellowstone-grpc-proto = "1.0.1"
yellowstone-grpc-proto = "1.1.0"

View File

@ -5,6 +5,7 @@ use crate::{
};
use async_trait::async_trait;
use log::*;
use solana_sdk::{account::WritableAccount, pubkey::Pubkey, stake_history::Epoch};
use std::{
collections::{BTreeSet, HashMap},
@ -53,7 +54,7 @@ pub fn init(
let all_queue_pks: BTreeSet<Pubkey> = routes
.iter()
.flat_map(|r| r.matched_pubkeys.iter())
.map(|pk| pk.clone())
.copied()
.collect();
// update handling thread, reads both slots and account updates
@ -61,8 +62,11 @@ pub fn init(
loop {
tokio::select! {
Ok(account_write) = account_write_queue_receiver.recv() => {
if all_queue_pks.contains(&account_write.pubkey) {
if !all_queue_pks.contains(&account_write.pubkey) {
trace!("account write skipped {:?}", account_write.pubkey);
continue;
} else {
trace!("account write processed {:?}", account_write.pubkey);
}
chain_data.update_account(
@ -81,7 +85,8 @@ pub fn init(
);
}
Ok(slot_update) = slot_queue_receiver.recv() => {
chain_data.update_slot(SlotData {
trace!("slot update processed {:?}", slot_update);
chain_data.update_slot(SlotData {
slot: slot_update.slot,
parent: slot_update.parent,
status: slot_update.status,
@ -89,13 +94,18 @@ pub fn init(
});
}
else => {
warn!("channels closed, filter shutting down pks={all_queue_pks:?}");
break;
}
}
chain_data_metrics.report(&chain_data);
for route in routes.iter() {
for pk in route.matched_pubkeys.iter() {
match chain_data.account(&pk) {
match chain_data.account(pk) {
Ok(account_info) => {
let pk_b58 = pk.to_string();
if let Some(record) = last_updated.get(&pk_b58) {
@ -104,6 +114,7 @@ pub fn init(
let is_throttled =
record.timestamp.elapsed() < route.timeout_interval;
if is_unchanged || is_throttled {
trace!("skipped is_unchanged={is_unchanged} is_throttled={is_throttled} pk={pk_b58}");
continue;
}
};
@ -120,12 +131,14 @@ pub fn init(
},
);
}
Err(_skip_reason) => {
Err(skip_reason) => {
debug!("sink process skipped reason={skip_reason} pk={pk_b58}");
// todo: metrics
}
}
}
Err(_) => {
debug!("could not find pk in chain data pk={:?}", pk);
// todo: metrics
}
}

View File

@ -144,10 +144,8 @@ impl ChainData {
writes
.retain(|w| w.slot == newest_rooted_write || w.slot > self.newest_rooted_slot);
self.account_versions_stored += writes.len();
self.account_bytes_stored += writes
.iter()
.map(|w| w.account.data().len())
.fold(0, |acc, l| acc + l)
self.account_bytes_stored +=
writes.iter().map(|w| w.account.data().len()).sum::<usize>()
}
// now it's fine to drop any slots before the new rooted head

View File

@ -1,12 +1,9 @@
use futures::stream::once;
use jsonrpc_core::futures::StreamExt;
use jsonrpc_core_client::transports::http;
use solana_account_decoder::{UiAccount, UiAccountEncoding};
use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
use solana_client::rpc_response::{OptionalContext, RpcKeyedAccount};
use solana_rpc::rpc::rpc_accounts::AccountsDataClient;
use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey};
use solana_account_decoder::UiAccount;
use solana_client::rpc_response::OptionalContext;
use solana_sdk::{account::Account, pubkey::Pubkey};
use futures::{future, future::FutureExt};
use yellowstone_grpc_proto::tonic::{
@ -26,12 +23,12 @@ use yellowstone_grpc_proto::prelude::{
SubscribeUpdateSlotStatus,
};
use crate::snapshot::{get_snapshot_gma, get_snapshot_gpa};
use crate::FilterConfig;
use crate::{
chain_data::SlotStatus,
metrics::{MetricType, Metrics},
AccountWrite, AnyhowWrap, GrpcSourceConfig, SlotUpdate, SnapshotSourceConfig, SourceConfig,
TlsConfig,
AccountWrite, GrpcSourceConfig, SlotUpdate, SnapshotSourceConfig, SourceConfig, TlsConfig,
};
struct SnapshotData {
@ -43,59 +40,6 @@ enum Message {
Snapshot(SnapshotData),
}
async fn get_snapshot_gpa(
rpc_http_url: String,
program_id: String,
) -> anyhow::Result<OptionalContext<Vec<RpcKeyedAccount>>> {
let rpc_client = http::connect::<AccountsDataClient>(&rpc_http_url)
.await
.map_err_anyhow()?;
let account_info_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::finalized()),
data_slice: None,
min_context_slot: None,
};
let program_accounts_config = RpcProgramAccountsConfig {
filters: None,
with_context: Some(true),
account_config: account_info_config.clone(),
};
info!("requesting snapshot {}", program_id);
let account_snapshot = rpc_client
.get_program_accounts(program_id.clone(), Some(program_accounts_config.clone()))
.await
.map_err_anyhow()?;
info!("snapshot received {}", program_id);
Ok(account_snapshot)
}
async fn get_snapshot_gma(
rpc_http_url: String,
ids: Vec<String>,
) -> anyhow::Result<solana_client::rpc_response::Response<Vec<Option<UiAccount>>>> {
let rpc_client = http::connect::<AccountsDataClient>(&rpc_http_url)
.await
.map_err_anyhow()?;
let account_info_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::finalized()),
data_slice: None,
min_context_slot: None,
};
info!("requesting snapshot {:?}", ids);
let account_snapshot = rpc_client
.get_multiple_accounts(ids.clone(), Some(account_info_config))
.await
.map_err_anyhow()?;
info!("snapshot received {:?}", ids);
Ok(account_snapshot)
}
async fn feed_data_geyser(
grpc_config: &GrpcSourceConfig,
tls_config: Option<ClientTlsConfig>,
@ -139,6 +83,7 @@ async fn feed_data_geyser(
SubscribeRequestFilterAccounts {
account: filter_config.account_ids.clone(),
owner: filter_config.program_ids.clone(),
filters: vec![],
},
);
let mut slots = HashMap::new();
@ -238,9 +183,9 @@ async fn feed_data_geyser(
if snapshot_needed && max_rooted_slot - rooted_to_finalized_slots > first_full_slot {
snapshot_needed = false;
if filter_config.account_ids.len() > 0 {
if !filter_config.account_ids.is_empty() {
snapshot_gma = tokio::spawn(get_snapshot_gma(rpc_http_url.clone(), filter_config.account_ids.clone())).fuse();
} else if filter_config.program_ids.len() > 0 {
} else if !filter_config.program_ids.is_empty() {
snapshot_gpa = tokio::spawn(get_snapshot_gpa(rpc_http_url.clone(), filter_config.program_ids[0].clone())).fuse();
}
}
@ -268,7 +213,7 @@ async fn feed_data_geyser(
},
};
let pubkey_bytes = Pubkey::new(&write.pubkey).to_bytes();
let pubkey_bytes = Pubkey::try_from(write.pubkey).unwrap().to_bytes();
let write_version_mapping = pubkey_writes.entry(pubkey_bytes).or_insert(WriteVersion {
global: write.write_version,
slot: 1, // write version 0 is reserved for snapshots
@ -401,7 +346,7 @@ pub async fn process_events(
// Make TLS config if configured
let tls_config = grpc_source.tls.as_ref().map(make_tls_config).or_else(|| {
if grpc_source.connection_string.starts_with(&"https") {
if grpc_source.connection_string.starts_with("https") {
Some(ClientTlsConfig::new())
} else {
None
@ -499,7 +444,8 @@ pub async fn process_events(
// Skip writes that a different server has already sent
let pubkey_writes = latest_write.entry(info.slot).or_default();
let pubkey_bytes = Pubkey::new(&update.pubkey).to_bytes();
let pubkey_bytes =
Pubkey::try_from(update.pubkey.clone()).unwrap().to_bytes();
let writes = pubkey_writes.entry(pubkey_bytes).or_insert(0);
if update.write_version <= *writes {
continue;
@ -510,11 +456,11 @@ pub async fn process_events(
// zstd_decompress(&update.data, &mut uncompressed).unwrap();
account_write_queue_sender
.send(AccountWrite {
pubkey: Pubkey::new(&update.pubkey),
pubkey: Pubkey::try_from(update.pubkey.clone()).unwrap(),
slot: info.slot,
write_version: update.write_version,
lamports: update.lamports,
owner: Pubkey::new(&update.owner),
owner: Pubkey::try_from(update.owner.clone()).unwrap(),
executable: update.executable,
rent_epoch: update.rent_epoch,
data: update.data,

View File

@ -2,6 +2,7 @@ pub mod account_write_filter;
pub mod chain_data;
pub mod grpc_plugin_source;
pub mod metrics;
pub mod snapshot;
pub mod websocket_source;
use {
@ -9,6 +10,18 @@ use {
solana_sdk::{account::Account, pubkey::Pubkey},
};
#[cfg(all(feature = "solana-1-14", feature = "solana-1-15"))]
compile_error!(
"feature \"solana-1-14\" and feature \"solana-1-15\" cannot be enabled at the same time"
);
#[cfg(feature = "solana-1-14")]
use solana_rpc::rpc::rpc_accounts::AccountsDataClient as GetProgramAccountsClient;
#[cfg(feature = "solana-1-15")]
use solana_rpc::rpc::rpc_accounts_scan::AccountsScanClient as GetProgramAccountsClient;
pub use solana_sdk;
trait AnyhowWrap {
type Value;
fn map_err_anyhow(self) -> anyhow::Result<Self::Value>;
@ -38,7 +51,7 @@ impl AccountWrite {
fn from(pubkey: Pubkey, slot: u64, write_version: u64, account: Account) -> AccountWrite {
AccountWrite {
pubkey,
slot: slot,
slot,
write_version,
lamports: account.lamports,
owner: account.owner,

View File

@ -120,7 +120,7 @@ impl Metrics {
let mut registry = self.registry.write().unwrap();
let value = registry.entry(name).or_insert(Value::U64 {
value: Arc::new(atomic::AtomicU64::new(0)),
metric_type: metric_type,
metric_type,
});
MetricU64 {
value: match value {
@ -137,7 +137,7 @@ impl Metrics {
let mut registry = self.registry.write().unwrap();
let value = registry.entry(name).or_insert(Value::I64 {
value: Arc::new(atomic::AtomicI64::new(0)),
metric_type: metric_type,
metric_type,
});
MetricI64 {
value: match value {
@ -191,7 +191,7 @@ impl Metrics {
metric_type: t,
} => {
let bool_to_int = if *v.lock().unwrap() { 1 } else { 0 };
(format!("{}", bool_to_int), t.to_string())
(format!("{bool_to_int}"), t.to_string())
}
};
vec.push((name.clone(), value_str, type_str));
@ -205,7 +205,7 @@ async fn handle_prometheus_poll(metrics: Metrics) -> Result<impl Reply, Rejectio
let label_strings_vec: Vec<String> = metrics
.labels
.iter()
.map(|(name, value)| format!("{}=\"{}\"", name, value))
.map(|(name, value)| format!("{name}=\"{value}\""))
.collect();
let lines: Vec<String> = metrics
.get_registry_vec()
@ -301,12 +301,11 @@ pub fn start(config: MetricsConfig, process_name: String) -> Metrics {
} => {
let new_value = v.lock().unwrap();
let previous_value = if let Some(PrevValue::Bool(v)) = previous_value {
let mut prev = new_value.clone();
let mut prev = *new_value;
std::mem::swap(&mut prev, v);
prev
} else {
previous_values
.insert(name.clone(), PrevValue::Bool(new_value.clone()));
previous_values.insert(name.clone(), PrevValue::Bool(*new_value));
false
};
if *new_value == previous_value {

104
connector/src/snapshot.rs Normal file
View File

@ -0,0 +1,104 @@
use anyhow::anyhow;
use jsonrpc_core_client::transports::http;
use log::*;
use solana_account_decoder::{UiAccount, UiAccountEncoding};
use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_response::{OptionalContext, RpcKeyedAccount},
};
use solana_rpc::rpc::rpc_accounts::AccountsDataClient;
use solana_sdk::{commitment_config::CommitmentConfig, slot_history::Slot};
use crate::{AnyhowWrap, FilterConfig};
pub async fn get_snapshot_gpa(
rpc_http_url: String,
program_id: String,
) -> anyhow::Result<OptionalContext<Vec<RpcKeyedAccount>>> {
let rpc_client = http::connect::<crate::GetProgramAccountsClient>(&rpc_http_url)
.await
.map_err_anyhow()?;
let account_info_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::finalized()),
data_slice: None,
min_context_slot: None,
};
let program_accounts_config = RpcProgramAccountsConfig {
filters: None,
with_context: Some(true),
account_config: account_info_config.clone(),
};
info!("requesting snapshot {}", program_id);
let account_snapshot = rpc_client
.get_program_accounts(program_id.clone(), Some(program_accounts_config.clone()))
.await
.map_err_anyhow()?;
info!("snapshot received {}", program_id);
Ok(account_snapshot)
}
pub async fn get_snapshot_gma(
rpc_http_url: String,
ids: Vec<String>,
) -> anyhow::Result<solana_client::rpc_response::Response<Vec<Option<UiAccount>>>> {
let rpc_client = http::connect::<AccountsDataClient>(&rpc_http_url)
.await
.map_err_anyhow()?;
let account_info_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::finalized()),
data_slice: None,
min_context_slot: None,
};
info!("requesting snapshot {:?}", ids);
let account_snapshot = rpc_client
.get_multiple_accounts(ids.clone(), Some(account_info_config))
.await
.map_err_anyhow()?;
info!("snapshot received {:?}", ids);
Ok(account_snapshot)
}
pub async fn get_snapshot(
rpc_http_url: String,
filter_config: &FilterConfig,
) -> anyhow::Result<(Slot, Vec<(String, Option<UiAccount>)>)> {
if !filter_config.account_ids.is_empty() {
let response =
get_snapshot_gma(rpc_http_url.clone(), filter_config.account_ids.clone()).await;
if let Ok(snapshot) = response {
let accounts: Vec<(String, Option<UiAccount>)> = filter_config
.account_ids
.iter()
.zip(snapshot.value)
.map(|x| (x.0.clone(), x.1))
.collect();
Ok((snapshot.context.slot, accounts))
} else {
Err(anyhow!("invalid gma response {:?}", response))
}
} else if !filter_config.program_ids.is_empty() {
let response =
get_snapshot_gpa(rpc_http_url.clone(), filter_config.program_ids[0].clone()).await;
if let Ok(OptionalContext::Context(snapshot)) = response {
let accounts: Vec<(String, Option<UiAccount>)> = snapshot
.value
.iter()
.map(|x| {
let deref = x.clone();
(deref.pubkey, Some(deref.account))
})
.collect();
Ok((snapshot.context.slot, accounts))
} else {
Err(anyhow!("invalid gpa response {:?}", response))
}
} else {
Err(anyhow!("invalid filter_config"))
}
}

View File

@ -1,13 +1,15 @@
use jsonrpc_core::futures::StreamExt;
use jsonrpc_core_client::transports::{http, ws};
use jsonrpc_core_client::transports::ws;
use solana_account_decoder::UiAccountEncoding;
use solana_account_decoder::{UiAccount, UiAccountEncoding};
use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_response::{OptionalContext, Response, RpcKeyedAccount},
rpc_response::{Response, RpcKeyedAccount},
};
use solana_rpc::rpc_pubsub::RpcSolPubSubClient;
use solana_sdk::{
account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey, slot_history::Slot,
};
use solana_rpc::{rpc::rpc_accounts::AccountsDataClient, rpc_pubsub::RpcSolPubSubClient};
use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey};
use log::*;
use std::{
@ -16,29 +18,31 @@ use std::{
time::{Duration, Instant},
};
use crate::{chain_data::SlotStatus, AccountWrite, AnyhowWrap, SlotUpdate, SourceConfig};
use crate::{
chain_data::SlotStatus, snapshot::get_snapshot, AccountWrite, AnyhowWrap, FilterConfig,
SlotUpdate, SourceConfig,
};
enum WebsocketMessage {
SingleUpdate(Response<RpcKeyedAccount>),
SnapshotUpdate(Response<Vec<RpcKeyedAccount>>),
SnapshotUpdate((Slot, Vec<(String, Option<UiAccount>)>)),
SlotUpdate(Arc<solana_client::rpc_response::SlotUpdate>),
}
// TODO: the reconnecting should be part of this
async fn feed_data(
config: &SourceConfig,
filter_config: &FilterConfig,
sender: async_channel::Sender<WebsocketMessage>,
) -> anyhow::Result<()> {
debug!("feed_data {config:?}");
let program_id = Pubkey::from_str(&config.snapshot.program_id)?;
let snapshot_duration = Duration::from_secs(300);
let connect = ws::try_connect::<RpcSolPubSubClient>(&config.rpc_ws_url).map_err_anyhow()?;
let client = connect.await.map_err_anyhow()?;
let rpc_client = http::connect::<AccountsDataClient>(&config.snapshot.rpc_http_url)
.await
.map_err_anyhow()?;
let account_info_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::processed()),
@ -59,24 +63,25 @@ async fn feed_data(
.map_err_anyhow()?;
let mut slot_sub = client.slots_updates_subscribe().map_err_anyhow()?;
let mut last_snapshot = Instant::now() - snapshot_duration;
let mut last_snapshot = Instant::now().checked_sub(snapshot_duration).unwrap();
loop {
// occasionally cause a new snapshot to be produced
// including the first time
if last_snapshot + snapshot_duration <= Instant::now() {
let account_snapshot = rpc_client
.get_program_accounts(
program_id.to_string(),
Some(program_accounts_config.clone()),
)
.await
.map_err_anyhow()?;
if let OptionalContext::Context(account_snapshot_response) = account_snapshot {
let snapshot = get_snapshot(config.snapshot.rpc_http_url.clone(), filter_config).await;
if let Ok((slot, accounts)) = snapshot {
debug!(
"fetched new snapshot slot={slot} len={:?} time={:?}",
accounts.len(),
Instant::now() - snapshot_duration - last_snapshot
);
sender
.send(WebsocketMessage::SnapshotUpdate(account_snapshot_response))
.send(WebsocketMessage::SnapshotUpdate((slot, accounts)))
.await
.expect("sending must succeed");
} else {
error!("failed to parse snapshot")
}
last_snapshot = Instant::now();
}
@ -115,16 +120,18 @@ async fn feed_data(
// TODO: rename / split / rework
pub async fn process_events(
config: &SourceConfig,
filter_config: &FilterConfig,
account_write_queue_sender: async_channel::Sender<AccountWrite>,
slot_queue_sender: async_channel::Sender<SlotUpdate>,
) {
// Subscribe to program account updates websocket
let (update_sender, update_receiver) = async_channel::unbounded::<WebsocketMessage>();
let config = config.clone();
let filter_config = filter_config.clone();
tokio::spawn(async move {
// if the websocket disconnects, we get no data in a while etc, reconnect and try again
loop {
let out = feed_data(&config, update_sender.clone());
let out = feed_data(&config, &filter_config, update_sender.clone());
let _ = out.await;
}
});
@ -148,15 +155,21 @@ pub async fn process_events(
.await
.expect("send success");
}
WebsocketMessage::SnapshotUpdate(update) => {
trace!("snapshot update");
for keyed_account in update.value {
let account: Account = keyed_account.account.decode().unwrap();
let pubkey = Pubkey::from_str(&keyed_account.pubkey).unwrap();
account_write_queue_sender
.send(AccountWrite::from(pubkey, update.context.slot, 0, account))
.await
.expect("send success");
WebsocketMessage::SnapshotUpdate((slot, accounts)) => {
trace!("snapshot update {slot}");
for (pubkey, account) in accounts {
if let Some(account) = account {
let pubkey = Pubkey::from_str(&pubkey).unwrap();
account_write_queue_sender
.send(AccountWrite::from(
pubkey,
slot,
0,
account.decode().unwrap(),
))
.await
.expect("send success");
}
}
}
WebsocketMessage::SlotUpdate(update) => {

View File

@ -137,7 +137,7 @@ impl ToSql for SqlNumericI128 {
_: &postgres_types::Type,
out: &mut BytesMut,
) -> Result<IsNull, Box<dyn error::Error + 'static + Sync + Send>> {
let abs_val = self.0.abs() as u128;
let abs_val = self.0.unsigned_abs();
let decimals = if self.0 != 0 {
int_log::u128(abs_val)
} else {

View File

@ -28,7 +28,7 @@ tokio-tungstenite = "0.17"
bytemuck = "1.7.2"
mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
mango-v4-client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
serum_dex = { git = "https://github.com/openbook-dex/program" }
anchor-lang = "0.25.0"
anchor-client = "0.25.0"

View File

@ -41,5 +41,5 @@ pub async fn init(client: Arc<RpcClient>) -> Arc<RwLock<Hash>> {
spawn(async move { poll_loop(blockhash_c, client).await })
};
return blockhash;
blockhash
}

View File

@ -10,11 +10,12 @@ use anchor_client::{
};
use anchor_lang::prelude::Pubkey;
use bytemuck::bytes_of;
use client::{Client, MangoGroupContext};
use log::*;
use mango_v4_client::{Client, MangoGroupContext, TransactionBuilderConfig};
use solana_client::nonblocking::rpc_client::RpcClient;
use std::{
collections::HashSet,
convert::TryFrom,
fs::File,
io::Read,
str::FromStr,
@ -67,9 +68,11 @@ async fn main() -> anyhow::Result<()> {
let client = Client::new(
cluster.clone(),
CommitmentConfig::processed(),
&Keypair::new(),
Arc::new(Keypair::new()),
Some(rpc_timeout),
0,
TransactionBuilderConfig {
prioritization_micro_lamports: None,
},
);
let group_pk = Pubkey::from_str(&config.mango_group).unwrap();
let group_context =
@ -77,15 +80,15 @@ async fn main() -> anyhow::Result<()> {
let perp_queue_pks: Vec<_> = group_context
.perp_markets
.iter()
.map(|(_, context)| (context.address, context.market.event_queue))
.values()
.map(|context| (context.address, context.market.event_queue))
.collect();
// fetch all serum/openbook markets to find their event queues
let serum_market_pks: Vec<_> = group_context
.serum3_markets
.iter()
.map(|(_, context)| context.market.serum_market_external)
.values()
.map(|context| context.market.serum_market_external)
.collect();
let serum_market_ais = client
@ -109,7 +112,10 @@ async fn main() -> anyhow::Result<()> {
&pair.1.data[5..5 + std::mem::size_of::<serum_dex::state::MarketState>()],
);
let event_q = market_state.event_q;
(serum_market_pks[pair.0], Pubkey::new(bytes_of(&event_q)))
(
serum_market_pks[pair.0],
Pubkey::try_from(bytes_of(&event_q)).unwrap(),
)
})
.collect();
@ -162,6 +168,7 @@ async fn main() -> anyhow::Result<()> {
} else {
websocket_source::process_events(
&config.source,
&filter_config,
account_write_queue_sender,
slot_queue_sender,
)

View File

@ -33,7 +33,7 @@ impl MangoV4PerpCrankSink {
instruction_sender: Sender<Vec<Instruction>>,
) -> Self {
Self {
pks: pks.iter().map(|e| e.clone()).collect(),
pks: pks.iter().copied().collect(),
group_pk,
instruction_sender,
}
@ -50,8 +50,7 @@ impl AccountWriteSink for MangoV4PerpCrankSink {
// only crank if at least 1 fill or a sufficient events of other categories are buffered
let contains_fill_events = event_queue
.iter()
.find(|e| e.event_type == mango_v4::state::EventType::Fill as u8)
.is_some();
.any(|e| e.event_type == mango_v4::state::EventType::Fill as u8);
let has_backlog = event_queue.iter().count() > MAX_BACKLOG;
if !contains_fill_events && !has_backlog {
return Err("throttled".into());
@ -78,7 +77,7 @@ impl AccountWriteSink for MangoV4PerpCrankSink {
let mkt_pk = self
.pks
.get(pk)
.expect(&format!("{pk:?} is a known public key"));
.unwrap_or_else(|| panic!("{:?} is a known public key", pk));
let mut ams: Vec<_> = anchor_lang::ToAccountMetas::to_account_metas(
&mango_v4::accounts::PerpConsumeEvents {
group: self.group_pk,

View File

@ -26,7 +26,7 @@ pub struct OpenbookCrankSink {
impl OpenbookCrankSink {
pub fn new(pks: Vec<(Pubkey, Pubkey)>, instruction_sender: Sender<Vec<Instruction>>) -> Self {
Self {
pks: pks.iter().map(|e| e.clone()).collect(),
pks: pks.iter().copied().collect(),
instruction_sender,
}
}
@ -60,8 +60,7 @@ impl AccountWriteSink for OpenbookCrankSink {
// only crank if at least 1 fill or a sufficient events of other categories are buffered
let contains_fill_events = events
.iter()
.find(|e| matches!(e, serum_dex::state::EventView::Fill { .. }))
.is_some();
.any(|e| matches!(e, serum_dex::state::EventView::Fill { .. }));
let has_backlog = events.len() > MAX_BACKLOG;
if !contains_fill_events && !has_backlog {
@ -86,7 +85,7 @@ impl AccountWriteSink for OpenbookCrankSink {
let mkt_pk = self
.pks
.get(pk)
.expect(&format!("{pk:?} is a known public key"));
.unwrap_or_else(|| panic!("{:?} is a known public key", pk));
ams.append(
&mut [mkt_pk, pk, /*coin_pk*/ pk, /*pc_pk*/ pk]
.iter()

View File

@ -11,6 +11,7 @@ use crate::{
mango_v4_perp_crank_sink::MangoV4PerpCrankSink, openbook_crank_sink::OpenbookCrankSink,
};
#[allow(clippy::type_complexity)]
pub fn init(
perp_queue_pks: Vec<(Pubkey, Pubkey)>,
serum_queue_pks: Vec<(Pubkey, Pubkey)>,
@ -26,10 +27,7 @@ pub fn init(
let routes = vec![
AccountWriteRoute {
matched_pubkeys: serum_queue_pks
.iter()
.map(|(_, evq_pk)| evq_pk.clone())
.collect(),
matched_pubkeys: serum_queue_pks.iter().map(|(_, evq_pk)| *evq_pk).collect(),
sink: Arc::new(OpenbookCrankSink::new(
serum_queue_pks,
instruction_sender.clone(),
@ -37,21 +35,18 @@ pub fn init(
timeout_interval: Duration::default(),
},
AccountWriteRoute {
matched_pubkeys: perp_queue_pks
.iter()
.map(|(_, evq_pk)| evq_pk.clone())
.collect(),
matched_pubkeys: perp_queue_pks.iter().map(|(_, evq_pk)| *evq_pk).collect(),
sink: Arc::new(MangoV4PerpCrankSink::new(
perp_queue_pks,
group_pk,
instruction_sender.clone(),
instruction_sender,
)),
timeout_interval: Duration::default(),
},
];
let (account_write_queue_sender, slot_queue_sender) =
account_write_filter::init(routes, metrics_sender.clone())?;
account_write_filter::init(routes, metrics_sender)?;
Ok((
account_write_queue_sender,

View File

@ -40,7 +40,7 @@ postgres_query = { git = "https://github.com/nolanderc/rust-postgres-query", rev
base64 = "0.21.0"
mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
mango-v4-client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
serum_dex = { git = "https://github.com/jup-ag/openbook-program", branch = "feat/expose-things", features = ["no-entrypoint"] }
anchor-lang = "0.25.0"
anchor-client = "0.25.0"

View File

@ -32,6 +32,7 @@ use service_mango_fills::*;
// couldn't compile the correct struct size / math on m1, fixed sizes resolve this issue
type EventQueueEvents = [AnyEvent; MAX_NUM_EVENTS as usize];
#[allow(clippy::too_many_arguments)]
fn publish_changes_perp(
slot: u64,
write_version: u64,
@ -46,12 +47,9 @@ fn publish_changes_perp(
metric_events_change: &mut MetricU64,
metric_events_drop: &mut MetricU64,
metric_head_update: &mut MetricU64,
metric_head_revoke: &mut MetricU64,
) {
// seq_num = N means that events (N-QUEUE_LEN) until N-1 are available
let start_seq_num = max(prev_seq_num, header.seq_num)
.checked_sub(MAX_NUM_EVENTS as u64)
.unwrap_or(0);
let start_seq_num = max(prev_seq_num, header.seq_num).saturating_sub(MAX_NUM_EVENTS as u64);
let mut checkpoint = Vec::new();
let mkt_pk_string = mkt.0.to_string();
let evq_pk_string = mkt.1.event_queue.to_string();
@ -191,7 +189,7 @@ fn publish_changes_perp(
// publish a head update event if the head changed (events were consumed)
if head != prev_head {
metric_head_update.increment();
fill_update_sender
.try_send(FillEventFilterMessage::HeadUpdate(HeadUpdate {
head,
@ -218,6 +216,7 @@ fn publish_changes_perp(
.unwrap()
}
#[allow(clippy::too_many_arguments)]
fn publish_changes_serum(
_slot: u64,
_write_version: u64,
@ -404,7 +403,7 @@ pub async fn init(
async_channel::Sender<SlotUpdate>,
async_channel::Receiver<FillEventFilterMessage>,
)> {
let metrics_sender = metrics_sender.clone();
let metrics_sender = metrics_sender;
let mut metric_events_new =
metrics_sender.register_u64("fills_feed_events_new".into(), MetricType::Counter);
@ -420,8 +419,6 @@ pub async fn init(
metrics_sender.register_u64("fills_feed_events_drop_serum".into(), MetricType::Counter);
let mut metrics_head_update =
metrics_sender.register_u64("fills_feed_head_update".into(), MetricType::Counter);
let mut metrics_head_revoke =
metrics_sender.register_u64("fills_feed_head_revoke".into(), MetricType::Counter);
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
let (account_write_queue_sender, account_write_queue_receiver) =
@ -435,7 +432,7 @@ pub async fn init(
let (fill_update_sender, fill_update_receiver) =
async_channel::unbounded::<FillEventFilterMessage>();
let account_write_queue_receiver_c = account_write_queue_receiver.clone();
let account_write_queue_receiver_c = account_write_queue_receiver;
let mut chain_cache = ChainData::new();
let mut chain_data_metrics = ChainDataMetrics::new(&metrics_sender);
@ -455,7 +452,7 @@ pub async fn init(
.map(|x| x.1.event_queue)
.collect();
let all_queue_pks: HashSet<Pubkey> =
HashSet::from_iter([perp_queue_pks.clone(), spot_queue_pks.clone()].concat());
HashSet::from_iter([perp_queue_pks, spot_queue_pks].concat());
// update handling thread, reads both sloths and account updates
tokio::spawn(async move {
@ -545,7 +542,7 @@ pub async fn init(
Some(prev_events) => publish_changes_perp(
account_info.slot,
account_info.write_version,
&mkt,
mkt,
&event_queue.header,
&event_queue.buf,
*prev_seq_num,
@ -556,7 +553,6 @@ pub async fn init(
&mut metric_events_change,
&mut metrics_events_drop,
&mut metrics_head_update,
&mut metrics_head_revoke,
),
_ => {
info!("perp_events_cache could not find {}", evq_pk_string)
@ -565,12 +561,9 @@ pub async fn init(
_ => info!("seq_num/head cache could not find {}", evq_pk_string),
}
seq_num_cache
.insert(evq_pk_string.clone(), event_queue.header.seq_num.clone());
head_cache
.insert(evq_pk_string.clone(), event_queue.header.head());
perp_events_cache
.insert(evq_pk_string.clone(), event_queue.buf.clone());
seq_num_cache.insert(evq_pk_string.clone(), event_queue.header.seq_num);
head_cache.insert(evq_pk_string.clone(), event_queue.header.head());
perp_events_cache.insert(evq_pk_string.clone(), event_queue.buf);
} else {
let inner_data = &account.data()[5..&account.data().len() - 7];
let header_span = std::mem::size_of::<SerumEventQueueHeader>();
@ -583,7 +576,7 @@ pub async fn init(
let new_len = rest.len() - slop;
let events = &rest[..new_len];
debug!("evq {} header_span {} header_seq_num {} header_count {} inner_len {} events_len {} sizeof Event {}", evq_pk_string, header_span, seq_num, count, inner_data.len(), events.len(), std::mem::size_of::<serum_dex::state::Event>());
let events: &[serum_dex::state::Event] = bytemuck::cast_slice(&events);
let events: &[serum_dex::state::Event] = bytemuck::cast_slice(events);
match seq_num_cache.get(&evq_pk_string) {
Some(prev_seq_num) => {
@ -593,7 +586,7 @@ pub async fn init(
account_info.write_version,
mkt,
&header,
&events,
events,
*prev_seq_num,
prev_events,
&fill_update_sender,
@ -612,10 +605,9 @@ pub async fn init(
_ => debug!("seq_num_cache could not find {}", evq_pk_string),
}
seq_num_cache.insert(evq_pk_string.clone(), seq_num.clone());
seq_num_cache.insert(evq_pk_string.clone(), seq_num);
head_cache.insert(evq_pk_string.clone(), header.head as usize);
serum_events_cache
.insert(evq_pk_string.clone(), events.clone().to_vec());
serum_events_cache.insert(evq_pk_string.clone(), events.to_vec());
}
}
Err(_) => debug!("chain_cache could not find {}", mkt.1.event_queue),

View File

@ -144,8 +144,8 @@ async fn process_update(client: &Caching<Client>, update: &FillUpdate) -> anyhow
let market = &update.market_key;
let seq_num = update.event.seq_num as i64;
let fill_timestamp = Utc.timestamp_opt(update.event.timestamp as i64, 0).unwrap();
let price = update.event.price as f64;
let quantity = update.event.quantity as f64;
let price = update.event.price;
let quantity = update.event.quantity;
let slot = update.slot as i64;
let write_version = update.write_version as i64;
@ -251,7 +251,7 @@ pub async fn init(
.await;
let mut iter = results.iter();
batch.retain(|_| iter.next().unwrap().is_err());
if batch.len() > 0 {
if !batch.is_empty() {
metric_retries.add(batch.len() as u64);
error_count += 1;
if error_count - 1 < config.retry_query_max_count {

View File

@ -1,4 +1,4 @@
use std::convert::identity;
use std::convert::{identity, TryFrom};
use anchor_lang::prelude::Pubkey;
use bytemuck::cast_slice;
@ -110,15 +110,15 @@ impl FillEvent {
event_type: FillEventType::Perp,
maker: event.maker.to_string(),
taker: event.taker.to_string(),
taker_side: taker_side,
taker_side,
timestamp: event.timestamp,
seq_num: event.seq_num,
maker_client_order_id: event.maker_client_order_id,
taker_client_order_id: event.taker_client_order_id,
maker_fee: event.maker_fee,
taker_fee: event.taker_fee,
price: price,
quantity: quantity,
price,
quantity,
}
}
@ -167,8 +167,8 @@ impl FillEvent {
None => 0u64,
};
let base_multiplier = 10u64.pow(config.base_decimals.into()) as u64;
let quote_multiplier = 10u64.pow(config.quote_decimals.into()) as u64;
let base_multiplier = 10u64.pow(config.base_decimals.into());
let quote_multiplier = 10u64.pow(config.quote_decimals.into());
let (price, quantity) = match maker_side {
OrderbookSide::Bid => {
@ -197,9 +197,13 @@ impl FillEvent {
FillEvent {
event_type: FillEventType::Spot,
maker: Pubkey::new(cast_slice(&identity(maker_owner) as &[_])).to_string(),
taker: Pubkey::new(cast_slice(&identity(taker_owner) as &[_])).to_string(),
taker_side: taker_side,
maker: Pubkey::try_from(cast_slice(&identity(maker_owner) as &[_]))
.unwrap()
.to_string(),
taker: Pubkey::try_from(cast_slice(&identity(taker_owner) as &[_]))
.unwrap()
.to_string(),
taker_side,
timestamp,
seq_num,
maker_client_order_id,

View File

@ -6,7 +6,6 @@ use anchor_client::{
Cluster,
};
use anchor_lang::prelude::Pubkey;
use client::{Client, MangoGroupContext};
use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{
future::{self, Ready},
@ -19,6 +18,7 @@ use mango_feeds_lib::{
websocket_source, FilterConfig, MarketConfig, MetricsConfig, PostgresConfig, SourceConfig,
StatusResponse,
};
use mango_v4_client::{Client, MangoGroupContext, TransactionBuilderConfig};
use service_mango_fills::{Command, FillCheckpoint, FillEventFilterMessage, FillEventType};
use std::{
collections::{HashMap, HashSet},
@ -147,7 +147,7 @@ fn handle_commands(
checkpoint_map: CheckpointMap,
market_ids: HashMap<String, String>,
) -> Ready<Result<(), Error>> {
let msg_str = msg.clone().into_text().unwrap();
let msg_str = msg.into_text().unwrap();
let command: Result<Command, serde_json::Error> = serde_json::from_str(&msg_str);
let mut peers = peer_map.lock().unwrap();
let peer = peers.get_mut(&addr).expect("peer should be in map");
@ -156,42 +156,77 @@ fn handle_commands(
Ok(Command::Subscribe(cmd)) => {
let mut wildcard = true;
// DEPRECATED
match cmd.market_id {
Some(market_id) => {
wildcard = false;
match market_ids.get(&market_id) {
None => {
let res = StatusResponse {
success: false,
message: "market not found",
};
peer.sender
.unbounded_send(Message::Text(serde_json::to_string(&res).unwrap()))
.unwrap();
return future::ok(());
}
_ => {}
}
let subscribed = peer.market_subscriptions.insert(market_id.clone());
let res = if subscribed {
StatusResponse {
success: true,
message: "subscribed",
}
} else {
StatusResponse {
success: false,
message: "already subscribed",
}
if let Some(market_id) = cmd.market_id {
wildcard = false;
if market_ids.get(&market_id).is_none() {
let res = StatusResponse {
success: false,
message: "market not found",
};
peer.sender
.unbounded_send(Message::Text(serde_json::to_string(&res).unwrap()))
.unwrap();
return future::ok(());
}
let subscribed = peer.market_subscriptions.insert(market_id.clone());
if subscribed {
let res = if subscribed {
StatusResponse {
success: true,
message: "subscribed",
}
} else {
StatusResponse {
success: false,
message: "already subscribed",
}
};
peer.sender
.unbounded_send(Message::Text(serde_json::to_string(&res).unwrap()))
.unwrap();
if subscribed {
let checkpoint_map = checkpoint_map.lock().unwrap();
let checkpoint = checkpoint_map.get(&market_id);
match checkpoint {
Some(checkpoint) => {
peer.sender
.unbounded_send(Message::Text(
serde_json::to_string(&checkpoint).unwrap(),
))
.unwrap();
}
None => info!(
"no checkpoint available on client subscription for market {}",
&market_id
),
};
}
}
if let Some(cmd_market_ids) = cmd.market_ids {
wildcard = false;
for market_id in cmd_market_ids {
if market_ids.get(&market_id).is_none() {
let res = StatusResponse {
success: false,
message: &format!("market {} not found", &market_id),
};
peer.sender
.unbounded_send(Message::Text(serde_json::to_string(&res).unwrap()))
.unwrap();
return future::ok(());
}
if peer.market_subscriptions.insert(market_id.clone()) {
let checkpoint_map = checkpoint_map.lock().unwrap();
let checkpoint = checkpoint_map.get(&market_id);
let res = StatusResponse {
success: true,
message: &format!("subscribed to market {}", &market_id),
};
peer.sender
.unbounded_send(Message::Text(serde_json::to_string(&res).unwrap()))
.unwrap();
match checkpoint {
Some(checkpoint) => {
peer.sender
@ -207,73 +242,21 @@ fn handle_commands(
};
}
}
None => {}
}
match cmd.market_ids {
Some(cmd_market_ids) => {
wildcard = false;
for market_id in cmd_market_ids {
match market_ids.get(&market_id) {
None => {
let res = StatusResponse {
success: false,
message: &format!("market {} not found", &market_id),
};
peer.sender
.unbounded_send(Message::Text(
serde_json::to_string(&res).unwrap(),
))
.unwrap();
return future::ok(());
}
_ => {}
}
if peer.market_subscriptions.insert(market_id.clone()) {
let checkpoint_map = checkpoint_map.lock().unwrap();
let checkpoint = checkpoint_map.get(&market_id);
let res = StatusResponse {
success: true,
message: &format!("subscribed to market {}", &market_id),
};
if let Some(account_ids) = cmd.account_ids {
wildcard = false;
for account_id in account_ids {
if peer.account_subscriptions.insert(account_id.clone()) {
let res = StatusResponse {
success: true,
message: &format!("subscribed to account {}", &account_id),
};
peer.sender
.unbounded_send(Message::Text(serde_json::to_string(&res).unwrap()))
.unwrap();
match checkpoint {
Some(checkpoint) => {
peer.sender
.unbounded_send(Message::Text(
serde_json::to_string(&checkpoint).unwrap(),
))
.unwrap();
}
None => info!(
"no checkpoint available on client subscription for market {}",
&market_id
),
};
}
peer.sender
.unbounded_send(Message::Text(serde_json::to_string(&res).unwrap()))
.unwrap();
}
}
None => {}
}
match cmd.account_ids {
Some(account_ids) => {
wildcard = false;
for account_id in account_ids {
if peer.account_subscriptions.insert(account_id.clone()) {
let res = StatusResponse {
success: true,
message: &format!("subscribed to account {}", &account_id),
};
peer.sender
.unbounded_send(Message::Text(serde_json::to_string(&res).unwrap()))
.unwrap();
}
}
}
None => {}
}
if wildcard {
for (market_id, market_name) in market_ids {
@ -377,9 +360,11 @@ async fn main() -> anyhow::Result<()> {
let client = Client::new(
cluster.clone(),
CommitmentConfig::processed(),
&Keypair::new(),
Arc::new(Keypair::new()),
Some(rpc_timeout),
0,
TransactionBuilderConfig {
prioritization_micro_lamports: None,
},
);
let group_context = Arc::new(
MangoGroupContext::new_from_rpc(
@ -392,8 +377,8 @@ async fn main() -> anyhow::Result<()> {
// todo: reload markets at intervals
let perp_market_configs: Vec<(Pubkey, MarketConfig)> = group_context
.perp_markets
.iter()
.map(|(_, context)| {
.values()
.map(|context| {
let quote_decimals = match group_context.tokens.get(&context.market.settle_token_index)
{
Some(token) => token.decimals,
@ -417,8 +402,8 @@ async fn main() -> anyhow::Result<()> {
let spot_market_configs: Vec<(Pubkey, MarketConfig)> = group_context
.serum3_markets
.iter()
.map(|(_, context)| {
.values()
.map(|context| {
let base_decimals = match group_context.tokens.get(&context.market.base_token_index) {
Some(token) => token.decimals,
None => panic!("token not found for market"), // todo: default?
@ -445,14 +430,14 @@ async fn main() -> anyhow::Result<()> {
let perp_queue_pks: Vec<(Pubkey, Pubkey)> = group_context
.perp_markets
.iter()
.map(|(_, context)| (context.address, context.market.event_queue))
.values()
.map(|context| (context.address, context.market.event_queue))
.collect();
let _a: Vec<(String, String)> = group_context
.serum3_markets
.iter()
.map(|(_, context)| {
.values()
.map(|context| {
(
context.market.serum_market_external.to_string(),
context.market.name().to_owned(),
@ -461,8 +446,8 @@ async fn main() -> anyhow::Result<()> {
.collect();
let b: Vec<(String, String)> = group_context
.perp_markets
.iter()
.map(|(_, context)| {
.values()
.map(|context| {
(
context.address.to_string(),
context.market.name().to_owned(),
@ -524,11 +509,10 @@ async fn main() -> anyhow::Result<()> {
}
// send fills to db
let update_c = update.clone();
match (postgres_update_sender.clone(), update_c.event.event_type) {
(Some(sender), FillEventType::Perp) => {
sender.send(update_c).await.unwrap();
}
_ => {}
if let (Some(sender), FillEventType::Perp) =
(postgres_update_sender.clone(), update_c.event.event_type)
{
sender.send(update_c).await.unwrap();
}
}
FillEventFilterMessage::Checkpoint(checkpoint) => {
@ -641,6 +625,7 @@ async fn main() -> anyhow::Result<()> {
} else {
websocket_source::process_events(
&config.source,
&filter_config,
account_write_queue_sender,
slot_queue_sender,
)

View File

@ -29,7 +29,7 @@ itertools = "0.10.5"
solana-sdk = "~1.14.9"
mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
mango-v4-client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
serum_dex = { git = "https://github.com/jup-ag/openbook-program", branch = "feat/expose-things", features = ["no-entrypoint"] }
anchor-lang = "0.25.0"
anchor-client = "0.25.0"

View File

@ -5,13 +5,13 @@ use anchor_client::{
Cluster,
};
use anchor_lang::prelude::Pubkey;
use client::{Client, MangoGroupContext};
use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{
future::{self, Ready},
pin_mut, SinkExt, StreamExt, TryStreamExt,
};
use log::*;
use mango_v4_client::{Client, MangoGroupContext, TransactionBuilderConfig};
use std::{
collections::{HashMap, HashSet},
fs::File,
@ -157,25 +157,22 @@ fn handle_commands(
checkpoint_map: CheckpointMap,
market_ids: HashMap<String, String>,
) -> Ready<Result<(), Error>> {
let msg_str = msg.clone().into_text().unwrap();
let msg_str = msg.into_text().unwrap();
let command: Result<Command, serde_json::Error> = serde_json::from_str(&msg_str);
let mut peers = peer_map.lock().unwrap();
let peer = peers.get_mut(&addr).expect("peer should be in map");
match command {
Ok(Command::Subscribe(cmd)) => {
let market_id = cmd.clone().market_id;
match market_ids.get(&market_id) {
None => {
let res = StatusResponse {
success: false,
message: "market not found",
};
peer.sender
.unbounded_send(Message::Text(serde_json::to_string(&res).unwrap()))
.unwrap();
return future::ok(());
}
_ => {}
let market_id = cmd.market_id;
if market_ids.get(&market_id).is_none() {
let res = StatusResponse {
success: false,
message: "market not found",
};
peer.sender
.unbounded_send(Message::Text(serde_json::to_string(&res).unwrap()))
.unwrap();
return future::ok(());
}
let subscribed = peer.subscriptions.insert(market_id.clone());
@ -286,9 +283,11 @@ async fn main() -> anyhow::Result<()> {
let client = Client::new(
cluster.clone(),
CommitmentConfig::processed(),
&Keypair::new(),
Arc::new(Keypair::new()),
Some(rpc_timeout),
0,
TransactionBuilderConfig {
prioritization_micro_lamports: None,
},
);
let group_context = Arc::new(
MangoGroupContext::new_from_rpc(
@ -301,8 +300,8 @@ async fn main() -> anyhow::Result<()> {
// todo: reload markets at intervals
let market_configs: Vec<(Pubkey, MarketConfig)> = group_context
.perp_markets
.iter()
.map(|(_, context)| {
.values()
.map(|context| {
let quote_decimals = match group_context.tokens.get(&context.market.settle_token_index)
{
Some(token) => token.decimals,
@ -326,8 +325,8 @@ async fn main() -> anyhow::Result<()> {
let serum_market_configs: Vec<(Pubkey, MarketConfig)> = group_context
.serum3_markets
.iter()
.map(|(_, context)| {
.values()
.map(|context| {
let base_decimals = match group_context.tokens.get(&context.market.base_token_index) {
Some(token) => token.decimals,
None => panic!("token not found for market"), // todo: default?
@ -432,17 +431,18 @@ async fn main() -> anyhow::Result<()> {
.map(|c| c.connection_string.clone())
.collect::<String>()
);
let relevant_pubkeys = [market_configs.clone()]
.concat()
.iter()
.flat_map(|m| [m.1.bids.to_string(), m.1.asks.to_string()])
.collect();
let filter_config = FilterConfig {
program_ids: vec![],
account_ids: relevant_pubkeys,
};
let use_geyser = true;
if use_geyser {
let relevant_pubkeys = [market_configs.clone()]
.concat()
.iter()
.flat_map(|m| [m.1.bids.to_string(), m.1.asks.to_string()])
.collect();
let filter_config = FilterConfig {
program_ids: vec![],
account_ids: relevant_pubkeys,
};
grpc_plugin_source::process_events(
&config.source,
&filter_config,
@ -455,6 +455,7 @@ async fn main() -> anyhow::Result<()> {
} else {
websocket_source::process_events(
&config.source,
&filter_config,
account_write_queue_sender,
slot_queue_sender,
)

View File

@ -31,6 +31,7 @@ use std::{
time::{SystemTime, UNIX_EPOCH},
};
#[allow(clippy::too_many_arguments)]
fn publish_changes(
slot: u64,
write_version: u64,
@ -80,11 +81,11 @@ fn publish_changes(
"C {} {} -> {}",
current_order[0], previous_order[1], current_order[1]
);
update.push(current_order.clone());
update.push(*current_order);
}
None => {
info!("A {} {}", current_order[0], current_order[1]);
update.push(current_order.clone())
update.push(*current_order)
}
}
}
@ -108,14 +109,14 @@ fn publish_changes(
None => info!("other bookside not in cache"),
}
if update.len() == 0 {
if update.is_empty() {
return;
}
orderbook_update_sender
.try_send(OrderbookFilterMessage::Update(OrderbookUpdate {
market: mkt.0.to_string(),
side: side.clone(),
side,
update,
slot,
write_version,
@ -133,8 +134,6 @@ pub async fn init(
async_channel::Sender<SlotUpdate>,
async_channel::Receiver<OrderbookFilterMessage>,
)> {
let metrics_sender = metrics_sender.clone();
let mut metric_events_new =
metrics_sender.register_u64("orderbook_updates".into(), MetricType::Counter);
@ -150,8 +149,6 @@ pub async fn init(
let (fill_update_sender, fill_update_receiver) =
async_channel::unbounded::<OrderbookFilterMessage>();
let account_write_queue_receiver_c = account_write_queue_receiver.clone();
let mut chain_cache = ChainData::new();
let mut chain_data_metrics = ChainDataMetrics::new(&metrics_sender);
let mut bookside_cache: HashMap<String, Vec<OrderbookLevel>> = HashMap::new();
@ -168,7 +165,7 @@ pub async fn init(
tokio::spawn(async move {
loop {
tokio::select! {
Ok(account_write) = account_write_queue_receiver_c.recv() => {
Ok(account_write) = account_write_queue_receiver.recv() => {
if !relevant_pubkeys.contains(&account_write.pubkey) {
continue;
}
@ -256,9 +253,7 @@ pub async fn init(
mkt.1.quote_lot_size,
),
base_lots_to_ui_perp(
group
.map(|(_, quantity)| quantity)
.fold(0, |acc, x| acc + x),
group.map(|(_, quantity)| quantity).sum(),
mkt.1.base_decimals,
mkt.1.base_lot_size,
),
@ -275,7 +270,7 @@ pub async fn init(
mkt,
side,
&bookside,
&old_bookside,
old_bookside,
other_bookside,
&fill_update_sender,
&mut metric_events_new,
@ -330,9 +325,7 @@ pub async fn init(
mkt.1.quote_decimals,
),
base_lots_to_ui(
group
.map(|(_, quantity)| quantity)
.fold(0, |acc, x| acc + x),
group.map(|(_, quantity)| quantity).sum(),
mkt.1.base_decimals,
mkt.1.base_lot_size,
),

View File

@ -23,6 +23,6 @@ tokio = { version = "1", features = ["full"] }
serde = "1.0.130"
serde_derive = "1.0.130"
mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
mango-v4-client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
anchor-lang = "0.25.0"
anchor-client = "0.25.0"

View File

@ -16,10 +16,12 @@ use {
use anchor_client::Cluster;
use anchor_lang::Discriminator;
use client::{chain_data, health_cache, AccountFetcher, Client, MangoGroupContext};
use fixed::types::I80F48;
use mango_feeds_lib::metrics::*;
use mango_v4::state::{MangoAccount, MangoAccountValue, PerpMarketIndex};
use mango_v4_client::{
chain_data, health_cache, AccountFetcher, Client, MangoGroupContext, TransactionBuilderConfig,
};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::{account::ReadableAccount, signature::Keypair};
#[derive(Clone, Debug, Deserialize)]
@ -67,7 +69,7 @@ async fn compute_pnl(
} else {
return None;
};
Some((pp.market_index, settleable_pnl))
Some((pp.market_index, I80F48::from_bits(settleable_pnl.to_bits())))
})
.collect::<Vec<(PerpMarketIndex, I80F48)>>();
@ -126,7 +128,7 @@ fn start_pnl_updater(
// Alternatively, we could prepare the sorted and limited lists for each
// market here. That would be faster and cause less contention on the pnl_data
// lock, but it looks like it's very far from being an issue.
pnls.push((pubkey.clone(), pnl_vals));
pnls.push((*pubkey, pnl_vals));
}
*pnl_data.write().unwrap() = pnls;
@ -248,9 +250,11 @@ async fn main() -> anyhow::Result<()> {
let client = Client::new(
cluster.clone(),
commitment,
&Keypair::new(),
Arc::new(Keypair::new()),
Some(rpc_timeout),
0,
TransactionBuilderConfig {
prioritization_micro_lamports: None,
},
);
let group_context = Arc::new(
MangoGroupContext::new_from_rpc(