Use new geyser plugin, add orderbook service

This commit is contained in:
Riordan Panayides 2022-12-16 11:00:42 +00:00
parent c00935bae7
commit 49bcdeee77
11 changed files with 1862 additions and 759 deletions

1408
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -3,6 +3,7 @@ members = [
"lib",
"service-mango-fills",
"service-mango-pnl",
"service-mango-orderbook",
]

View File

@ -44,6 +44,7 @@ log = "0.4"
rand = "0.7"
anyhow = "1.0"
bytes = "1.0"
itertools = "0.10.5"
futures = "0.3.17"
futures-core = "0.3"

View File

@ -1,35 +1,51 @@
use futures::stream::once;
use geyser::geyser_client::GeyserClient;
use jsonrpc_core::futures::StreamExt;
use jsonrpc_core_client::transports::http;
use solana_account_decoder::UiAccountEncoding;
use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
use solana_client::rpc_response::{Response, RpcKeyedAccount};
use solana_rpc::{rpc::rpc_accounts::AccountsDataClient, rpc::OptionalContext};
use solana_client::rpc_response::{OptionalContext, Response, RpcKeyedAccount};
use solana_rpc::{rpc::rpc_accounts::AccountsDataClient};
use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey};
use futures::{future, future::FutureExt};
use tonic::transport::{Certificate, ClientTlsConfig, Endpoint, Identity};
use tonic::{
metadata::MetadataValue, Request,
transport::{Channel, Certificate, Identity, ClientTlsConfig},
};
use log::*;
use std::{collections::HashMap, env, str::FromStr, time::Duration};
pub mod geyser_proto {
tonic::include_proto!("accountsdb");
pub mod geyser {
tonic::include_proto!("geyser");
}
use geyser_proto::accounts_db_client::AccountsDbClient;
pub mod solana {
pub mod storage {
pub mod confirmed_block {
tonic::include_proto!("solana.storage.confirmed_block");
}
}
}
pub use geyser::*;
pub use solana::storage::confirmed_block::*;
use crate::FilterConfig;
use crate::{
metrics::{MetricType, Metrics},
AccountWrite, AnyhowWrap, GrpcSourceConfig, SlotStatus, SlotUpdate, SnapshotSourceConfig,
SourceConfig, TlsConfig,
};
use solana_geyser_connector_plugin_grpc::compression::zstd_decompress;
//use solana_geyser_connector_plugin_grpc::compression::zstd_decompress;
type SnapshotData = Response<Vec<RpcKeyedAccount>>;
enum Message {
GrpcUpdate(geyser_proto::Update),
GrpcUpdate(geyser::SubscribeUpdate),
Snapshot(SnapshotData),
}
@ -69,6 +85,7 @@ async fn feed_data_geyser(
grpc_config: &GrpcSourceConfig,
tls_config: Option<ClientTlsConfig>,
snapshot_config: &SnapshotSourceConfig,
filter_config: &FilterConfig,
sender: async_channel::Sender<Message>,
) -> anyhow::Result<()> {
let program_id = Pubkey::from_str(&snapshot_config.program_id)?;
@ -82,7 +99,8 @@ async fn feed_data_geyser(
.expect("reading connection string from env"),
_ => snapshot_config.rpc_http_url.clone(),
};
let endpoint = Endpoint::from_str(&connection_string)?;
info!("connecting {}", connection_string);
let endpoint = Channel::from_shared(connection_string)?;
let channel = if let Some(tls) = tls_config {
endpoint.tls_config(tls)?
} else {
@ -90,12 +108,38 @@ async fn feed_data_geyser(
}
.connect()
.await?;
let mut client = AccountsDbClient::new(channel);
let token: MetadataValue<_> = "dbbf36253d0b2e6a85618a4ef2fa".parse()?;
let mut client = GeyserClient::with_interceptor(channel, move |mut req: Request<()>| {
req.metadata_mut().insert("x-token", token.clone());
Ok(req)
});
let mut update_stream = client
.subscribe(geyser_proto::SubscribeRequest {})
.await?
.into_inner();
let mut accounts = HashMap::new();
accounts.insert(
"client".to_owned(),
SubscribeRequestFilterAccounts {
account: Vec::new(),
owner: filter_config.program_ids.clone(),
},
);
let mut slots = HashMap::new();
slots.insert(
"client".to_owned(),
SubscribeRequestFilterSlots {},
);
let blocks = HashMap::new();
let transactions = HashMap::new();
let request = SubscribeRequest {
accounts,
blocks,
slots,
transactions,
};
info!("Going to send request: {:?}", request);
let response = client.subscribe(once(async move { request })).await?;
let mut update_stream = response.into_inner();
// We can't get a snapshot immediately since the finalized snapshot would be for a
// slot in the past and we'd be missing intermediate updates.
@ -155,15 +199,16 @@ async fn feed_data_geyser(
loop {
tokio::select! {
update = update_stream.next() => {
use geyser_proto::{update::UpdateOneof, slot_update::Status};
use geyser::{subscribe_update::UpdateOneof};
let mut update = update.ok_or(anyhow::anyhow!("geyser plugin has closed the stream"))??;
match update.update_oneof.as_mut().expect("invalid grpc") {
UpdateOneof::SubscribeResponse(subscribe_response) => {
first_full_slot = subscribe_response.highest_write_slot + 1;
},
UpdateOneof::SlotUpdate(slot_update) => {
UpdateOneof::Slot(slot_update) => {
let status = slot_update.status;
if status == Status::Rooted as i32 {
if status == SubscribeUpdateSlotStatus::Finalized as i32 {
if first_full_slot == u64::MAX {
// TODO: is this equivalent to before? what was highest_write_slot?
first_full_slot = slot_update.slot + 1;
}
if slot_update.slot > max_rooted_slot {
max_rooted_slot = slot_update.slot;
@ -176,21 +221,28 @@ async fn feed_data_geyser(
}
}
},
UpdateOneof::AccountWrite(write) => {
if write.slot < first_full_slot {
UpdateOneof::Account(info) => {
if info.slot < first_full_slot {
// Don't try to process data for slots where we may have missed writes:
// We could not map the write_version correctly for them.
continue;
}
if write.slot > newest_write_slot {
newest_write_slot = write.slot;
} else if max_rooted_slot > 0 && write.slot < max_rooted_slot - max_out_of_order_slots {
anyhow::bail!("received write {} slots back from max rooted slot {}", max_rooted_slot - write.slot, max_rooted_slot);
if info.slot > newest_write_slot {
newest_write_slot = info.slot;
} else if max_rooted_slot > 0 && info.slot < max_rooted_slot - max_out_of_order_slots {
anyhow::bail!("received write {} slots back from max rooted slot {}", max_rooted_slot - info.slot, max_rooted_slot);
}
let pubkey_writes = slot_pubkey_writes.entry(write.slot).or_default();
let pubkey_writes = slot_pubkey_writes.entry(info.slot).or_default();
let mut write = match info.account.clone() {
Some(x) => x,
None => {
// TODO: handle error
continue;
},
};
let pubkey_bytes = Pubkey::new(&write.pubkey).to_bytes();
let write_version_mapping = pubkey_writes.entry(pubkey_bytes).or_insert(WriteVersion {
global: write.write_version,
@ -208,7 +260,8 @@ async fn feed_data_geyser(
write.write_version = write_version_mapping.slot as u64;
write_version_mapping.slot += 1;
},
geyser_proto::update::UpdateOneof::Ping(_) => {},
UpdateOneof::Block(_) => {},
UpdateOneof::Transaction(_) => {},
}
sender.send(Message::GrpcUpdate(update)).await.expect("send success");
},
@ -275,6 +328,7 @@ fn make_tls_config(config: &TlsConfig) -> ClientTlsConfig {
pub async fn process_events(
config: &SourceConfig,
filter_config: &FilterConfig,
account_write_queue_sender: async_channel::Sender<AccountWrite>,
slot_queue_sender: async_channel::Sender<SlotUpdate>,
metrics_sender: Metrics,
@ -285,6 +339,7 @@ pub async fn process_events(
let msg_sender = msg_sender.clone();
let snapshot_source = config.snapshot.clone();
let metrics_sender = metrics_sender.clone();
let f = filter_config.clone();
// Make TLS config if configured
let tls_config = grpc_source.tls.as_ref().map(make_tls_config);
@ -304,6 +359,7 @@ pub async fn process_events(
&grpc_source,
tls_config.clone(),
&snapshot_source,
&f,
msg_sender.clone(),
);
let result = out.await;
@ -351,11 +407,18 @@ pub async fn process_events(
loop {
let msg = msg_receiver.recv().await.expect("sender must not close");
use geyser::{subscribe_update::UpdateOneof};
match msg {
Message::GrpcUpdate(update) => {
match update.update_oneof.expect("invalid grpc") {
geyser_proto::update::UpdateOneof::AccountWrite(update) => {
UpdateOneof::Account(info) => {
let update = match info.account.clone() {
Some(x) => x,
None => {
// TODO: handle error
continue;
},
};
assert!(update.pubkey.len() == 32);
assert!(update.owner.len() == 32);
@ -363,40 +426,40 @@ pub async fn process_events(
metric_account_queue.set(account_write_queue_sender.len() as u64);
// Skip writes that a different server has already sent
let pubkey_writes = latest_write.entry(update.slot).or_default();
let pubkey_writes = latest_write.entry(info.slot).or_default();
let pubkey_bytes = Pubkey::new(&update.pubkey).to_bytes();
let writes = pubkey_writes.entry(pubkey_bytes).or_insert(0);
if update.write_version <= *writes {
continue;
}
*writes = update.write_version;
latest_write.retain(|&k, _| k >= update.slot - latest_write_retention);
let mut uncompressed: Vec<u8> = Vec::new();
zstd_decompress(&update.data, &mut uncompressed).unwrap();
latest_write.retain(|&k, _| k >= info.slot - latest_write_retention);
// let mut uncompressed: Vec<u8> = Vec::new();
// zstd_decompress(&update.data, &mut uncompressed).unwrap();
account_write_queue_sender
.send(AccountWrite {
pubkey: Pubkey::new(&update.pubkey),
slot: update.slot,
slot: info.slot,
write_version: update.write_version,
lamports: update.lamports,
owner: Pubkey::new(&update.owner),
executable: update.executable,
rent_epoch: update.rent_epoch,
data: uncompressed,
is_selected: update.is_selected,
data: update.data,
// TODO: what should this be? related to account deletes?
is_selected: true,
})
.await
.expect("send success");
}
geyser_proto::update::UpdateOneof::SlotUpdate(update) => {
UpdateOneof::Slot(update) => {
metric_slot_updates.increment();
metric_slot_queue.set(slot_queue_sender.len() as u64);
use geyser_proto::slot_update::Status;
let status = Status::from_i32(update.status).map(|v| match v {
Status::Processed => SlotStatus::Processed,
Status::Confirmed => SlotStatus::Confirmed,
Status::Rooted => SlotStatus::Rooted,
let status = SubscribeUpdateSlotStatus::from_i32(update.status).map(|v| match v {
SubscribeUpdateSlotStatus::Processed => SlotStatus::Processed,
SubscribeUpdateSlotStatus::Confirmed => SlotStatus::Confirmed,
SubscribeUpdateSlotStatus::Finalized => SlotStatus::Rooted,
});
if status.is_none() {
error!("unexpected slot status: {}", update.status);
@ -413,8 +476,8 @@ pub async fn process_events(
.await
.expect("send success");
}
geyser_proto::update::UpdateOneof::Ping(_) => {}
geyser_proto::update::UpdateOneof::SubscribeResponse(_) => {}
UpdateOneof::Block(_) => {},
UpdateOneof::Transaction(_) => {},
}
}
Message::Snapshot(update) => {

View File

@ -1,5 +1,6 @@
pub mod chain_data;
pub mod fill_event_filter;
pub mod orderbook_filter;
pub mod grpc_plugin_source;
pub mod memory_target;
pub mod metrics;
@ -8,6 +9,7 @@ pub mod postgres_types_numeric;
pub mod websocket_source;
pub use chain_data::SlotStatus;
use serde::{Serialize, Serializer, ser::SerializeStruct};
use {
async_trait::async_trait,
@ -117,6 +119,30 @@ pub struct SourceConfig {
pub rpc_ws_url: String,
}
/// Restricts which accounts the grpc source subscribes to.
#[derive(Clone, Debug, Deserialize)]
pub struct FilterConfig {
    // Owner program ids: accounts owned by these programs are streamed
    // (used as the `owner` filter of the geyser account subscription).
    pub program_ids: Vec<String>,
}
/// Generic success/failure reply sent back to websocket clients.
#[derive(Clone, Debug)]
pub struct StatusResponse<'a> {
    pub success: bool,
    // Human-readable status text; borrowed so callers can use string literals.
    pub message: &'a str,
}
impl<'a> Serialize for StatusResponse<'a> {
    /// Serializes as a struct named "Status" with "success" and "message" fields.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut state = serializer.serialize_struct("Status", 2)?;
        state.serialize_field("success", &self.success)?;
        state.serialize_field("message", &self.message)?;
        state.end()
    }
}
#[derive(Clone, Debug, Deserialize)]
pub struct SnapshotSourceConfig {
pub rpc_http_url: String,

365
lib/src/orderbook_filter.rs Normal file
View File

@ -0,0 +1,365 @@
use crate::{
chain_data::{AccountData, ChainData, SlotData},
metrics::{MetricType, Metrics},
AccountWrite, SlotUpdate,
};
use log::*;
use serde::{ser::SerializeStruct, Serialize, Serializer};
use solana_sdk::{
account::{ReadableAccount, WritableAccount},
clock::Epoch,
pubkey::Pubkey,
};
use std::{
borrow::BorrowMut,
collections::{HashMap, HashSet}, time::{UNIX_EPOCH, SystemTime},
};
use itertools::Itertools;
use crate::metrics::MetricU64;
use anchor_lang::AccountDeserialize;
use mango_v4::state::{BookSide, OrderTreeType};
/// Which side of the book an update refers to.
/// Discriminants (0 = bid, 1 = ask) match the unit-variant indices used by
/// the `Serialize` impl below.
#[derive(Clone, Debug)]
pub enum OrderbookSide {
    Bid = 0,
    Ask = 1,
}
impl Serialize for OrderbookSide {
    /// Serializes the side as a lowercase unit variant of "Side":
    /// `Bid` -> "bid" (index 0), `Ask` -> "ask" (index 1).
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let (index, name) = match self {
            OrderbookSide::Bid => (0, "bid"),
            OrderbookSide::Ask => (1, "ask"),
        };
        serializer.serialize_unit_variant("Side", index, name)
    }
}
/// One aggregated price level of an L2 book.
#[derive(Clone, Debug)]
pub struct OrderbookLevel {
    // Price in lots (`price_data()` cast to i64 by the filter).
    pub price: i64,
    // Total quantity resting at this price; 0 signals level removal in diffs.
    pub size: i64,
}
impl Serialize for OrderbookLevel {
    /// Serializes as a struct with explicit "price" and "size" fields.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut state = serializer.serialize_struct("OrderbookLevel", 2)?;
        state.serialize_field("price", &self.price)?;
        state.serialize_field("size", &self.size)?;
        state.end()
    }
}
/// Incremental L2 diff for one side of one market.
#[derive(Clone, Debug)]
pub struct OrderbookUpdate {
    // Market account pubkey in base58 string form.
    pub market: String,
    pub side: OrderbookSide,
    // Changed levels only; a level with size 0 means it was removed.
    pub update: Vec<OrderbookLevel>,
    pub slot: u64,
    pub write_version: u64,
}
impl Serialize for OrderbookUpdate {
    /// Serializes all five fields under their snake_case names.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut state = serializer.serialize_struct("OrderbookUpdate", 5)?;
        state.serialize_field("market", &self.market)?;
        state.serialize_field("side", &self.side)?;
        state.serialize_field("update", &self.update)?;
        state.serialize_field("slot", &self.slot)?;
        state.serialize_field("write_version", &self.write_version)?;
        state.end()
    }
}
/// Full snapshot of both sides of one market's book, sent to clients on
/// subscription and whenever the filter republishes state.
#[derive(Clone, Debug)]
pub struct OrderbookCheckpoint {
    // Market account pubkey in base58 string form.
    pub market: String,
    pub bids: Vec<OrderbookLevel>,
    pub asks: Vec<OrderbookLevel>,
    pub slot: u64,
    pub write_version: u64,
}
impl Serialize for OrderbookCheckpoint {
    /// Serializes all five checkpoint fields.
    ///
    /// Fixes two defects in the previous implementation:
    /// - the declared field count was 3 although five fields are written;
    /// - the asks vector was emitted under a duplicate "bids" key, so
    ///   consumers could never see the ask side of a checkpoint.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut state = serializer.serialize_struct("OrderbookCheckpoint", 5)?;
        state.serialize_field("market", &self.market)?;
        state.serialize_field("bids", &self.bids)?;
        state.serialize_field("asks", &self.asks)?;
        state.serialize_field("slot", &self.slot)?;
        state.serialize_field("write_version", &self.write_version)?;
        state.end()
    }
}
/// Messages published by the orderbook filter: incremental L2 diffs and
/// full checkpoints of both sides of a book.
pub enum OrderbookFilterMessage {
    Update(OrderbookUpdate),
    Checkpoint(OrderbookCheckpoint),
}
/// Per-market configuration: display name plus the bids/asks account keys
/// the filter watches.
pub struct MarketConfig {
    pub name: String,
    pub bids: Pubkey,
    pub asks: Pubkey,
}
/// Computes the L2 (price-level) diff between the previous and current
/// state of one side of a book and publishes it. When the opposite side is
/// available in the cache, also publishes a full checkpoint of both sides.
///
/// * `slot` / `write_version` - provenance of the account write that
///   triggered this update.
/// * `mkt` - (market pubkey, market config) pair, used for naming/logging.
/// * `bookside` / `old_bookside` - current and previously cached state of
///   the side being processed.
/// * `other_bookside` - cached state of the opposite side, if any.
fn publish_changes(
    slot: u64,
    write_version: u64,
    mkt: &(Pubkey, MarketConfig),
    bookside: &BookSide,
    old_bookside: &BookSide,
    other_bookside: Option<&BookSide>,
    orderbook_update_sender: &async_channel::Sender<OrderbookFilterMessage>,
    metric_updates: &mut MetricU64,
) {
    let time_now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();
    let oracle_price_lots = 0; // todo: does this matter? where to find it?
    let side = match bookside.nodes.order_tree_type() {
        OrderTreeType::Bids => OrderbookSide::Bid,
        OrderTreeType::Asks => OrderbookSide::Ask,
    };

    // Aggregate a bookside into L2 levels: valid orders grouped by price,
    // quantities summed per level. (Shared by current/previous/other side.)
    let l2_snapshot = |book: &BookSide| -> Vec<OrderbookLevel> {
        book.iter_valid(time_now, oracle_price_lots)
            .map(|item| (item.node.price_data() as i64, item.node.quantity))
            .group_by(|(price, _)| *price)
            .into_iter()
            .map(|(price, group)| OrderbookLevel {
                price,
                size: group.map(|(_, quantity)| quantity).sum(),
            })
            .collect()
    };

    let current_l2_snapshot = l2_snapshot(bookside);
    let previous_l2_snapshot = l2_snapshot(old_bookside);

    let mut update: Vec<OrderbookLevel> = Vec::new();

    // Push a zero-sized diff entry for levels that are no longer present.
    for previous_order in previous_l2_snapshot.iter() {
        let peer = current_l2_snapshot
            .iter()
            .find(|level| previous_order.price == level.price);
        match peer {
            None => {
                info!("level removed {}", previous_order.price);
                update.push(OrderbookLevel {
                    price: previous_order.price,
                    size: 0,
                });
            }
            _ => continue,
        }
    }

    // Push a diff entry where a level is new or its size has changed.
    for current_order in &current_l2_snapshot {
        let peer = previous_l2_snapshot
            .iter()
            .find(|item| item.price == current_order.price);
        match peer {
            Some(previous_order) => {
                if previous_order.size == current_order.size {
                    continue;
                }
                debug!("size changed {} -> {}", previous_order.size, current_order.size);
                update.push(current_order.clone());
            }
            None => {
                debug!("new level {},{}", current_order.price, current_order.size);
                update.push(current_order.clone())
            }
        }
    }

    // Publish a full checkpoint whenever the opposite side is cached, even
    // if this side produced no diff (preserves original behavior).
    match other_bookside {
        Some(other_bookside) => {
            let other_l2_snapshot = l2_snapshot(other_bookside);
            let (bids, asks) = match side {
                OrderbookSide::Bid => (current_l2_snapshot, other_l2_snapshot),
                OrderbookSide::Ask => (other_l2_snapshot, current_l2_snapshot),
            };
            orderbook_update_sender
                .try_send(OrderbookFilterMessage::Checkpoint(OrderbookCheckpoint {
                    slot,
                    write_version,
                    bids,
                    asks,
                    market: mkt.0.to_string(),
                }))
                .unwrap()
        }
        None => info!("other bookside not in cache"),
    }

    if update.is_empty() {
        return;
    }

    info!("diff {} {:?}", mkt.1.name, update);
    orderbook_update_sender
        .try_send(OrderbookFilterMessage::Update(OrderbookUpdate {
            market: mkt.0.to_string(),
            side: side.clone(),
            update,
            slot,
            write_version,
        }))
        .unwrap(); // TODO: use anyhow to bubble up error
    metric_updates.increment();
}
/// Spawns the orderbook filter worker.
///
/// Returns the write ends of the account/slot queues that a source feeds,
/// plus the read end of the channel on which `OrderbookFilterMessage`s
/// (diffs and checkpoints) are published.
pub async fn init(
    market_configs: Vec<(Pubkey, MarketConfig)>,
    metrics_sender: Metrics,
) -> anyhow::Result<(
    async_channel::Sender<AccountWrite>,
    async_channel::Sender<SlotUpdate>,
    async_channel::Receiver<OrderbookFilterMessage>,
)> {
    let metrics_sender = metrics_sender.clone();
    let mut metric_events_new =
        metrics_sender.register_u64("orderbook_updates".into(), MetricType::Counter);

    // The actual message may want to also contain a retry count, if it self-reinserts on failure?
    let (account_write_queue_sender, account_write_queue_receiver) =
        async_channel::unbounded::<AccountWrite>();

    // Slot updates flowing from the outside into the single processing thread.
    let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();

    // Orderbook updates flow out to client connections; the channel carries
    // diffs and checkpoints for all markets.
    let (fill_update_sender, fill_update_receiver) =
        async_channel::unbounded::<OrderbookFilterMessage>();

    let account_write_queue_receiver_c = account_write_queue_receiver.clone();

    let mut chain_cache = ChainData::new();
    // Latest deserialized BookSide per bids/asks pubkey (string form).
    let mut bookside_cache: HashMap<String, BookSide> = HashMap::new();
    // (slot, write_version) of the last processed write per bids/asks pubkey;
    // used to skip accounts that have not changed since last inspection.
    let mut last_write_versions = HashMap::<String, (u64, u64)>::new();

    // Only bids/asks accounts of the configured markets are relevant.
    let relevant_pubkeys = market_configs
        .iter()
        .flat_map(|m| [m.1.bids, m.1.asks])
        .collect::<HashSet<Pubkey>>();
    info!("relevant_pubkeys {:?}", relevant_pubkeys);

    // Update handling thread: reads both slot and account updates.
    tokio::spawn(async move {
        loop {
            tokio::select! {
                Ok(account_write) = account_write_queue_receiver_c.recv() => {
                    if !relevant_pubkeys.contains(&account_write.pubkey) {
                        continue;
                    }
                    info!("updating account {}", &account_write.pubkey);
                    chain_cache.update_account(
                        account_write.pubkey,
                        AccountData {
                            slot: account_write.slot,
                            write_version: account_write.write_version,
                            account: WritableAccount::create(
                                account_write.lamports,
                                account_write.data.clone(),
                                account_write.owner,
                                account_write.executable,
                                account_write.rent_epoch as Epoch,
                            ),
                        },
                    );
                }
                Ok(slot_update) = slot_queue_receiver.recv() => {
                    chain_cache.update_slot(SlotData {
                        slot: slot_update.slot,
                        parent: slot_update.parent,
                        status: slot_update.status,
                        chain: 0,
                    });
                }
            }

            // After every applied update, re-check both sides of every market
            // for newer account state and publish the resulting diffs.
            for mkt in market_configs.iter() {
                for side in 0..2 {
                    let mkt_pk = mkt.0;
                    // side 0 = bids, side 1 = asks.
                    let side_pk = if side == 0 {
                        mkt.1.bids
                    } else {
                        mkt.1.asks
                    };
                    let other_side_pk = if side == 0 {
                        mkt.1.asks
                    } else {
                        mkt.1.bids
                    };
                    let last_write_version = last_write_versions
                        .get(&side_pk.to_string())
                        .unwrap_or(&(0, 0));
                    match chain_cache.account(&side_pk) {
                        Ok(account_info) => {
                            let side_pk_string = side_pk.to_string();
                            let write_version = (account_info.slot, account_info.write_version);
                            // todo: should this be <= so we don't overwrite with old data received late?
                            if write_version == *last_write_version {
                                continue;
                            }
                            last_write_versions.insert(side_pk_string.clone(), write_version);

                            let account = &account_info.account;
                            let bookside =
                                BookSide::try_deserialize(account.data().borrow_mut()).unwrap();
                            let other_bookside = bookside_cache.get(&other_side_pk.to_string());
                            // Only publish a diff when we have a previous state
                            // to diff against; always refresh the cache below.
                            match bookside_cache.get(&side_pk_string) {
                                Some(old_bookside) => publish_changes(
                                    account_info.slot,
                                    account_info.write_version,
                                    mkt,
                                    &bookside,
                                    &old_bookside,
                                    other_bookside,
                                    &fill_update_sender,
                                    &mut metric_events_new,
                                ),
                                _ => info!("bookside_cache could not find {}", side_pk_string),
                            }
                            bookside_cache.insert(side_pk_string.clone(), bookside.clone());
                        }
                        Err(_) => info!("chain_cache could not find {}", mkt_pk),
                    }
                }
            }
        }
    });

    Ok((
        account_write_queue_sender,
        slot_queue_sender,
        fill_update_receiver,
    ))
}

View File

@ -4,11 +4,10 @@ use jsonrpc_core_client::transports::{http, ws};
use solana_account_decoder::UiAccountEncoding;
use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
//rpc_filter::RpcFilterType,
rpc_response::{Response, RpcKeyedAccount},
rpc_response::{OptionalContext, Response, RpcKeyedAccount},
};
use solana_rpc::{
rpc::rpc_accounts::AccountsDataClient, rpc::OptionalContext, rpc_pubsub::RpcSolPubSubClient,
rpc::rpc_accounts::AccountsDataClient, rpc_pubsub::RpcSolPubSubClient,
};
use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey};

View File

@ -1,58 +1,92 @@
syntax = "proto3";
option java_multiple_files = true;
option java_package = "mango.v3.geyser";
option java_outer_classname = "GeyserProto";
import public "solana-storage-v1.10.40.proto";
package accountsdb;
package geyser;
service AccountsDb {
rpc Subscribe(SubscribeRequest) returns (stream Update) {}
service Geyser {
rpc Subscribe(stream SubscribeRequest) returns (stream SubscribeUpdate) {}
}
message SubscribeRequest {
map<string, SubscribeRequestFilterAccounts> accounts = 1;
map<string, SubscribeRequestFilterSlots> slots = 2;
map<string, SubscribeRequestFilterTransactions> transactions = 3;
map<string, SubscribeRequestFilterBlocks> blocks = 4;
}
message Update {
message SubscribeRequestFilterAccounts {
repeated string account = 2;
repeated string owner = 3;
}
message SubscribeRequestFilterSlots {}
message SubscribeRequestFilterTransactions {
optional bool vote = 1;
optional bool failed = 2;
repeated string account_include = 3;
repeated string account_exclude = 4;
}
message SubscribeRequestFilterBlocks {}
message SubscribeUpdate {
repeated string filters = 1;
oneof update_oneof {
AccountWrite account_write = 1;
SlotUpdate slot_update = 2;
Ping ping = 3;
SubscribeResponse subscribe_response = 4;
SubscribeUpdateAccount account = 2;
SubscribeUpdateSlot slot = 3;
SubscribeUpdateTransaction transaction = 4;
SubscribeUpdateBlock block = 5;
}
}
message AccountWrite {
uint64 slot = 1;
bytes pubkey = 2;
uint64 lamports = 3;
bytes owner = 4;
bool executable = 5;
uint64 rent_epoch = 6;
bytes data = 7;
uint64 write_version = 8;
// Is this write part of the set of writes sent on startup?
bool is_startup = 9;
// Is this write sent because it matches the connector criterion?
// If false, then it is sent because this address previously matched
// the criterion (i.e. account is closed/reused)
bool is_selected = 10;
message SubscribeUpdateAccount {
SubscribeUpdateAccountInfo account = 1;
uint64 slot = 2;
bool is_startup = 3;
}
message SlotUpdate {
message SubscribeUpdateAccountInfo {
bytes pubkey = 1;
uint64 lamports = 2;
bytes owner = 3;
bool executable = 4;
uint64 rent_epoch = 5;
bytes data = 6;
uint64 write_version = 7;
// bytes txn_signature = 8;
}
message SubscribeUpdateSlot {
uint64 slot = 1;
optional uint64 parent = 2;
enum Status {
PROCESSED = 0;
ROOTED = 1;
CONFIRMED = 2;
}
Status status = 3;
SubscribeUpdateSlotStatus status = 3;
}
message Ping {
enum SubscribeUpdateSlotStatus {
PROCESSED = 0;
CONFIRMED = 1;
FINALIZED = 2;
}
message SubscribeResponse {
uint64 highest_write_slot = 1;
message SubscribeUpdateTransaction {
SubscribeUpdateTransactionInfo transaction = 1;
uint64 slot = 2;
}
message SubscribeUpdateTransactionInfo {
bytes signature = 1;
bool is_vote = 2;
solana.storage.ConfirmedBlock.Transaction transaction = 3;
solana.storage.ConfirmedBlock.TransactionStatusMeta meta = 4;
// uint64 index = 5;
}
message SubscribeUpdateBlock {
uint64 slot = 1;
string blockhash = 2;
solana.storage.ConfirmedBlock.Rewards rewards = 3;
solana.storage.ConfirmedBlock.UnixTimestamp block_time = 4;
solana.storage.ConfirmedBlock.BlockHeight block_height = 5;
}

View File

@ -0,0 +1,118 @@
// Solana confirmed-block storage types (transactions, status metadata,
// rewards). These messages are referenced by the geyser subscription
// schema for transaction and block updates.
syntax = "proto3";

package solana.storage.ConfirmedBlock;

// A full confirmed block: header hashes plus its transactions and rewards.
message ConfirmedBlock {
  string previous_blockhash = 1;
  string blockhash = 2;
  uint64 parent_slot = 3;
  repeated ConfirmedTransaction transactions = 4;
  repeated Reward rewards = 5;
  UnixTimestamp block_time = 6;
  BlockHeight block_height = 7;
}

// A transaction paired with its execution status metadata.
message ConfirmedTransaction {
  Transaction transaction = 1;
  TransactionStatusMeta meta = 2;
}

message Transaction {
  repeated bytes signatures = 1;
  Message message = 2;
}

message Message {
  MessageHeader header = 1;
  repeated bytes account_keys = 2;
  bytes recent_blockhash = 3;
  repeated CompiledInstruction instructions = 4;
  bool versioned = 5;
  repeated MessageAddressTableLookup address_table_lookups = 6;
}

message MessageHeader {
  uint32 num_required_signatures = 1;
  uint32 num_readonly_signed_accounts = 2;
  uint32 num_readonly_unsigned_accounts = 3;
}

// Lookup into an on-chain address table (versioned transactions).
message MessageAddressTableLookup {
  bytes account_key = 1;
  bytes writable_indexes = 2;
  bytes readonly_indexes = 3;
}

// Execution results: fee, balance changes, logs, inner instructions.
message TransactionStatusMeta {
  TransactionError err = 1;
  uint64 fee = 2;
  repeated uint64 pre_balances = 3;
  repeated uint64 post_balances = 4;
  repeated InnerInstructions inner_instructions = 5;
  // *_none flags distinguish an empty list from "not recorded".
  bool inner_instructions_none = 10;
  repeated string log_messages = 6;
  bool log_messages_none = 11;
  repeated TokenBalance pre_token_balances = 7;
  repeated TokenBalance post_token_balances = 8;
  repeated Reward rewards = 9;
  repeated bytes loaded_writable_addresses = 12;
  repeated bytes loaded_readonly_addresses = 13;
}

// Opaque encoded transaction error.
message TransactionError {
  bytes err = 1;
}

message InnerInstructions {
  uint32 index = 1;
  repeated CompiledInstruction instructions = 2;
}

message CompiledInstruction {
  uint32 program_id_index = 1;
  bytes accounts = 2;
  bytes data = 3;
}

// Token balance snapshot for one account referenced by the transaction.
message TokenBalance {
  uint32 account_index = 1;
  string mint = 2;
  UiTokenAmount ui_token_amount = 3;
  string owner = 4;
  string program_id = 5;
}

message UiTokenAmount {
  double ui_amount = 1;
  uint32 decimals = 2;
  string amount = 3;
  string ui_amount_string = 4;
}

enum RewardType {
  Unspecified = 0;
  Fee = 1;
  Rent = 2;
  Staking = 3;
  Voting = 4;
}

// A lamport balance change credited to one account.
message Reward {
  string pubkey = 1;
  int64 lamports = 2;
  uint64 post_balance = 3;
  RewardType reward_type = 4;
  string commission = 5;
}

message Rewards {
  repeated Reward rewards = 1;
}

message UnixTimestamp {
  int64 timestamp = 1;
}

message BlockHeight {
  uint64 block_height = 1;
}

View File

@ -0,0 +1,31 @@
[package]
name = "service-mango-orderbook"
version = "0.1.0"
authors = ["Riordan Panayides <riordan@panayid.es>"]
edition = "2021"
[dependencies]
solana-geyser-connector-lib = { path = "../lib" }
solana-logger = "*"
bs58 = "*"
log = "*"
anyhow = "*"
toml = "*"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
futures-channel = "0.3"
futures-util = "0.3"
ws = "^0.9.2"
async-channel = "1.6"
async-trait = "0.1"
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.17"
bytemuck = "1.7.2"
mango-v4 = { git = "ssh://git@github.com/blockworks-foundation/mango-v4", branch = "dev" }
client = { git = "ssh://git@github.com/blockworks-foundation/mango-v4", branch = "dev" }
serum_dex = { git = "ssh://git@github.com/openbook-dex/program", branch = "master" }
anchor-lang = "0.25.0"
anchor-client = "0.25.0"

View File

@ -0,0 +1,405 @@
use anchor_client::{
solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair},
Cluster,
};
use anchor_lang::prelude::Pubkey;
use client::{Client, MangoGroupContext};
use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{
future::{self, Ready},
pin_mut,
SinkExt, StreamExt, TryStreamExt,
};
use log::*;
use std::{
collections::{HashMap, HashSet},
fs::File,
io::Read,
net::SocketAddr,
str::FromStr,
sync::Arc,
sync::Mutex,
time::Duration,
};
use tokio::{
net::{TcpListener, TcpStream},
pin,
};
use tokio_tungstenite::tungstenite::{protocol::Message, Error};
use serde::{Deserialize};
use solana_geyser_connector_lib::{
metrics::{MetricType, MetricU64},
orderbook_filter::{self, MarketConfig, OrderbookCheckpoint, OrderbookFilterMessage},
FilterConfig, StatusResponse,
};
use solana_geyser_connector_lib::{
grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig,
};
type CheckpointMap = Arc<Mutex<HashMap<String, OrderbookCheckpoint>>>;
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Peer>>>;
/// Client -> server websocket commands, dispatched by the JSON "command" tag.
#[derive(Clone, Debug, Deserialize)]
#[serde(tag = "command")]
pub enum Command {
    #[serde(rename = "subscribe")]
    Subscribe(SubscribeCommand),
    #[serde(rename = "unsubscribe")]
    Unsubscribe(UnsubscribeCommand),
    #[serde(rename = "getMarkets")]
    GetMarkets,
}
/// Payload of a "subscribe" command ("marketId" in JSON).
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SubscribeCommand {
    pub market_id: String,
}
/// Payload of an "unsubscribe" command ("marketId" in JSON).
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UnsubscribeCommand {
    pub market_id: String,
}
/// State kept per connected websocket client.
#[derive(Clone, Debug)]
pub struct Peer {
    // Channel used to push outgoing messages to this client's socket.
    pub sender: UnboundedSender<Message>,
    // Market ids this client has subscribed to.
    pub subscriptions: HashSet<String>,
}
/// Service configuration, deserialized from the TOML file given on the
/// command line.
#[derive(Clone, Debug, Deserialize)]
pub struct Config {
    pub source: SourceConfig,
    pub metrics: MetricsConfig,
    // Address the websocket server listens on.
    pub bind_ws_addr: String,
    pub rpc_http_url: String,
    pub mango_group: String,
}
async fn handle_connection_error(
checkpoint_map: CheckpointMap,
peer_map: PeerMap,
market_ids: Vec<String>,
raw_stream: TcpStream,
addr: SocketAddr,
metrics_opened_connections: MetricU64,
metrics_closed_connections: MetricU64,
) {
metrics_opened_connections.clone().increment();
let result = handle_connection(
checkpoint_map,
peer_map.clone(),
market_ids,
raw_stream,
addr,
)
.await;
if result.is_err() {
error!("connection {} error {}", addr, result.unwrap_err());
};
metrics_closed_connections.clone().increment();
peer_map.lock().unwrap().remove(&addr);
}
/// Accepts one websocket client, registers it in the peer map, and runs its
/// receive (commands) and send (updates) halves until either side finishes.
async fn handle_connection(
    checkpoint_map: CheckpointMap,
    peer_map: PeerMap,
    market_ids: Vec<String>,
    raw_stream: TcpStream,
    addr: SocketAddr,
) -> Result<(), Error> {
    info!("ws connected: {}", addr);
    let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
    let (ws_tx, ws_rx) = ws_stream.split();

    // 1: publish channel in peer map
    let (chan_tx, chan_rx) = unbounded();
    {
        // Scoped so the map lock is released before any await below.
        peer_map.lock().unwrap().insert(
            addr,
            Peer {
                sender: chan_tx,
                subscriptions: HashSet::<String>::new(),
            },
        );
    }

    // Incoming frames are interpreted as commands; outgoing updates arrive
    // on the peer's unbounded channel and are forwarded to the socket.
    let receive_commands = ws_rx.try_for_each(|msg| {
        handle_commands(
            addr,
            msg,
            peer_map.clone(),
            checkpoint_map.clone(),
            market_ids.clone(),
        )
    });
    let forward_updates = chan_rx.map(Ok).forward(ws_tx);

    pin_mut!(receive_commands, forward_updates);
    // Run until either direction completes (client closed or errored).
    future::select(receive_commands, forward_updates).await;

    peer_map.lock().unwrap().remove(&addr);
    info!("ws disconnected: {}", &addr);
    Ok(())
}
/// Parses one incoming websocket frame as a `Command` and executes it,
/// replying to the peer with a `StatusResponse` (or the market list).
///
/// Fixes: the market id is now validated BEFORE touching the peer's
/// subscription set — previously a failed subscribe to an unknown market
/// still inserted it, leaving a phantom subscription behind. Also drops a
/// redundant `msg.clone()` before the consuming `into_text()`.
fn handle_commands(
    addr: SocketAddr,
    msg: Message,
    peer_map: PeerMap,
    checkpoint_map: CheckpointMap,
    market_ids: Vec<String>,
) -> Ready<Result<(), Error>> {
    let msg_str = msg.into_text().unwrap();
    let command: Result<Command, serde_json::Error> = serde_json::from_str(&msg_str);
    let mut peers = peer_map.lock().unwrap();
    let peer = peers.get_mut(&addr).expect("peer should be in map");
    match command {
        Ok(Command::Subscribe(cmd)) => {
            let market_id = cmd.clone().market_id;
            // Reject unknown markets before recording the subscription.
            if !market_ids.contains(&market_id) {
                let res = StatusResponse {
                    success: false,
                    message: "market not found",
                };
                peer.sender
                    .unbounded_send(Message::Text(serde_json::to_string(&res).unwrap()))
                    .unwrap();
                return future::ok(());
            }
            let subscribed = peer.subscriptions.insert(market_id.clone());
            let res = if subscribed {
                StatusResponse {
                    success: true,
                    message: "subscribed",
                }
            } else {
                StatusResponse {
                    success: false,
                    message: "already subscribed",
                }
            };
            peer.sender
                .unbounded_send(Message::Text(serde_json::to_string(&res).unwrap()))
                .unwrap();

            // On a fresh subscription, immediately send the latest checkpoint
            // so the client starts from a full book before applying diffs.
            if subscribed {
                let checkpoint_map = checkpoint_map.lock().unwrap();
                let checkpoint = checkpoint_map.get(&market_id);
                match checkpoint {
                    Some(checkpoint) => {
                        peer.sender
                            .unbounded_send(Message::Text(
                                serde_json::to_string(&checkpoint).unwrap(),
                            ))
                            .unwrap();
                    }
                    None => info!("no checkpoint available on client subscription"), // todo: what to do here?
                }
            }
        }
        Ok(Command::Unsubscribe(cmd)) => {
            info!("unsubscribe {}", cmd.market_id);
            let unsubscribed = peer.subscriptions.remove(&cmd.market_id);
            let res = if unsubscribed {
                StatusResponse {
                    success: true,
                    message: "unsubscribed",
                }
            } else {
                StatusResponse {
                    success: false,
                    message: "not subscribed",
                }
            };
            peer.sender
                .unbounded_send(Message::Text(serde_json::to_string(&res).unwrap()))
                .unwrap();
        }
        Ok(Command::GetMarkets) => {
            info!("getMarkets");
            peer.sender
                .unbounded_send(Message::Text(serde_json::to_string(&market_ids).unwrap()))
                .unwrap();
        }
        Err(err) => {
            info!("error deserializing user input {:?}", err);
            let res = StatusResponse {
                success: false,
                message: "invalid input",
            };
            peer.sender
                .unbounded_send(Message::Text(serde_json::to_string(&res).unwrap()))
                .unwrap();
        }
    };
    future::ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args: Vec<String> = std::env::args().collect();
if args.len() < 2 {
eprintln!("Please enter a config file path argument");
return Ok(());
}
let config: Config = {
let mut file = File::open(&args[1])?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
toml::from_str(&contents).unwrap()
};
solana_logger::setup_with_default("info");
let metrics_tx = metrics::start(config.metrics, "orderbook".into());
let metrics_opened_connections =
metrics_tx.register_u64("orderbook_opened_connections".into(), MetricType::Counter);
let metrics_closed_connections =
metrics_tx.register_u64("orderbook_closed_connections".into(), MetricType::Counter);
let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new()));
let peers = PeerMap::new(Mutex::new(HashMap::new()));
let rpc_url = config.rpc_http_url;
let ws_url = rpc_url.replace("https", "wss");
let rpc_timeout = Duration::from_secs(10);
let cluster = Cluster::Custom(rpc_url.clone(), ws_url.clone());
let client = Client::new(
cluster.clone(),
CommitmentConfig::processed(),
&Keypair::new(),
Some(rpc_timeout),
);
let group_context = Arc::new(MangoGroupContext::new_from_rpc(
Pubkey::from_str(&config.mango_group).unwrap(),
client.cluster.clone(),
client.commitment,
)?);
// todo: reload markets at intervals
let market_pubkey_strings: Vec<String> = group_context
.perp_markets
.iter()
.map(|(_, context)| context.address.to_string())
.collect();
let market_configs: Vec<(Pubkey, MarketConfig)> = group_context
.perp_markets
.iter()
.map(|(_, context)| {
(
context.address,
MarketConfig {
name: context.market.name().to_owned(),
bids: context.market.bids,
asks: context.market.asks,
},
)
})
.collect();
let (account_write_queue_sender, slot_queue_sender, orderbook_receiver) =
orderbook_filter::init(market_configs, metrics_tx.clone()).await?;
let checkpoints_ref_thread = checkpoints.clone();
let peers_ref_thread = peers.clone();
tokio::spawn(async move {
pin!(orderbook_receiver);
loop {
let message = orderbook_receiver.recv().await.unwrap();
match message {
OrderbookFilterMessage::Update(update) => {
debug!("ws update {} {:?}", update.market, update.side);
let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
for (addr, peer) in peer_copy.iter_mut() {
let json = serde_json::to_string(&update).unwrap();
// only send updates if the peer is subscribed
if peer.subscriptions.contains(&update.market) {
let result = peer.sender.send(Message::Text(json)).await;
if result.is_err() {
error!(
"ws update {} {:?} fill could not reach {}",
update.market, update.side, addr
);
}
}
}
}
OrderbookFilterMessage::Checkpoint(checkpoint) => {
checkpoints_ref_thread
.lock()
.unwrap()
.insert(checkpoint.market.clone(), checkpoint);
}
}
}
});
info!("ws listen: {}", config.bind_ws_addr);
let try_socket = TcpListener::bind(&config.bind_ws_addr).await;
let listener = try_socket.expect("Failed to bind");
tokio::spawn(async move {
// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, addr)) = listener.accept().await {
tokio::spawn(handle_connection_error(
checkpoints.clone(),
peers.clone(),
market_pubkey_strings.clone(),
stream,
addr,
metrics_opened_connections.clone(),
metrics_closed_connections.clone(),
));
}
});
info!(
"rpc connect: {}",
config
.source
.grpc_sources
.iter()
.map(|c| c.connection_string.clone())
.collect::<String>()
);
let use_geyser = true;
if use_geyser {
let filter_config = FilterConfig {
program_ids: vec![
"4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into(),
//"srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX".into(),
],
};
grpc_plugin_source::process_events(
&config.source,
&filter_config,
account_write_queue_sender,
slot_queue_sender,
metrics_tx.clone(),
)
.await;
} else {
websocket_source::process_events(
&config.source,
account_write_queue_sender,
slot_queue_sender,
)
.await;
}
Ok(())
}