Add custom filtering

This commit is contained in:
Serge Farny 2024-03-13 10:53:12 +01:00
parent 5e72a9e7eb
commit 1096ee33a8
13 changed files with 121 additions and 33 deletions

2
.gitignore vendored
View File

@ -4,6 +4,8 @@
.DS_Store
.idea/
*.pem
.vscode
.idea
node_modules
dist

4
Cargo.lock generated
View File

@ -2708,7 +2708,7 @@ dependencies = [
[[package]]
name = "mango-feeds-connector"
version = "0.3.0"
version = "0.3.1"
dependencies = [
"anyhow",
"async-channel",
@ -2718,7 +2718,6 @@ dependencies = [
"itertools 0.10.5",
"jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonrpc-core-client",
"log 0.4.21",
"rustls 0.20.9",
"serde",
"serde_derive",
@ -2728,6 +2727,7 @@ dependencies = [
"solana-rpc",
"solana-sdk",
"tokio",
"tracing",
"warp",
"yellowstone-grpc-client",
"yellowstone-grpc-proto",

View File

@ -23,7 +23,7 @@ jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] }
bs58 = "0.5"
base64 = "0.21.0"
log = "0.4"
tracing = "0.1.40"
rand = "0.7"
anyhow = "1.0"
toml = "0.5"

View File

@ -1,6 +1,6 @@
[package]
name = "mango-feeds-connector"
version = "0.3.0"
version = "0.3.1"
authors = ["Christian Kamm <mail@ckamm.de>"]
edition = "2021"
license = "AGPL-3.0-or-later"
@ -27,7 +27,6 @@ rustls = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
log = { workspace = true }
anyhow = { workspace = true }
itertools = { workspace = true }
@ -40,6 +39,7 @@ async-trait = { workspace = true }
warp = { workspace = true }
# 1.9.0+solana.1.16.1
tracing = { workspace = true }
yellowstone-grpc-client = { workspace = true }
yellowstone-grpc-proto = { workspace = true }

View File

@ -95,8 +95,8 @@ async fn main() -> anyhow::Result<()> {
let filter_config = filter_config1;
grpc_plugin_source::process_events(
&config,
&filter_config,
config,
filter_config,
account_write_queue_sender,
slot_queue_sender,
metrics_tx.clone(),

View File

@ -95,8 +95,8 @@ async fn main() -> anyhow::Result<()> {
let filter_config = filter_config1;
grpc_plugin_source::process_events(
&config,
&filter_config,
config,
filter_config,
account_write_queue_sender,
slot_queue_sender,
metrics_tx.clone(),

View File

@ -77,8 +77,8 @@ async fn main() -> anyhow::Result<()> {
});
websocket_source::process_events(
&config,
&filter_config,
config,
filter_config,
account_write_queue_sender,
slot_queue_sender,
)

View File

@ -5,13 +5,13 @@ use crate::{
};
use async_trait::async_trait;
use log::*;
use solana_sdk::{account::WritableAccount, pubkey::Pubkey, stake_history::Epoch};
use std::{
collections::{BTreeSet, HashMap},
sync::Arc,
time::{Duration, Instant},
};
use tracing::*;
#[async_trait]
pub trait AccountWriteSink {
@ -83,7 +83,7 @@ pub fn init(
),
},
);
}
},
Ok(slot_update) = slot_queue_receiver.recv() => {
trace!("slot update processed {:?}", slot_update);
chain_data.update_slot(SlotData {

View File

@ -11,21 +11,26 @@ use yellowstone_grpc_proto::tonic::{
Request,
};
use log::*;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{collections::HashMap, env, str::FromStr, time::Duration};
use tracing::*;
use yellowstone_grpc_proto::geyser::{
subscribe_request_filter_accounts_filter, subscribe_request_filter_accounts_filter_memcmp,
SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterAccountsFilterMemcmp,
};
use yellowstone_grpc_proto::prelude::{
geyser_client::GeyserClient, subscribe_update, CommitmentLevel, SubscribeRequest,
SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots, SubscribeUpdate,
};
use crate::snapshot::{get_snapshot_gma, get_snapshot_gpa};
use crate::snapshot::{get_filtered_snapshot_gpa, get_snapshot_gma, get_snapshot_gpa};
use crate::{
chain_data::SlotStatus,
metrics::{MetricType, Metrics},
AccountWrite, GrpcSourceConfig, SlotUpdate, SnapshotSourceConfig, SourceConfig, TlsConfig,
AccountWrite, FeedFilterType, GrpcSourceConfig, SlotUpdate, SnapshotSourceConfig, SourceConfig,
TlsConfig,
};
use crate::{EntityFilter, FilterConfig};
@ -113,6 +118,29 @@ async fn feed_data_geyser(
},
);
}
EntityFilter::FilterByProgramIdSelective(program_id, criteria) => {
accounts.insert(
"client".to_owned(),
SubscribeRequestFilterAccounts {
account: vec![],
owner: vec![program_id.to_string()],
filters: criteria.iter().map(|c| {
SubscribeRequestFilterAccountsFilter {
filter: Some(match c {
FeedFilterType::DataSize(ds) => subscribe_request_filter_accounts_filter::Filter::Datasize(*ds),
FeedFilterType::Memcmp(cmp) => {
subscribe_request_filter_accounts_filter::Filter::Memcmp(SubscribeRequestFilterAccountsFilterMemcmp {
offset: cmp.offset as u64,
data: Some(subscribe_request_filter_accounts_filter_memcmp::Data::Bytes(cmp.bytes.clone()))
})
},
FeedFilterType::TokenAccountState => subscribe_request_filter_accounts_filter::Filter::TokenAccountState(true)
})
}
}).collect(),
},
);
}
}
slots.insert(
@ -225,6 +253,9 @@ async fn feed_data_geyser(
EntityFilter::FilterByProgramId(program_id) => {
snapshot_gpa = tokio::spawn(get_snapshot_gpa(snapshot_rpc_http_url.clone(), program_id.to_string())).fuse();
},
EntityFilter::FilterByProgramIdSelective(program_id,filters) => {
snapshot_gpa = tokio::spawn(get_filtered_snapshot_gpa(snapshot_rpc_http_url.clone(), program_id.to_string(), Some(filters.clone()))).fuse();
}
};
}
}
@ -363,8 +394,8 @@ fn make_tls_config(config: &TlsConfig) -> ClientTlsConfig {
}
pub async fn process_events(
config: &SourceConfig,
filter_config: &FilterConfig,
config: SourceConfig,
filter_config: FilterConfig,
account_write_queue_sender: async_channel::Sender<AccountWrite>,
slot_queue_sender: async_channel::Sender<SlotUpdate>,
metrics_sender: Metrics,
@ -544,6 +575,7 @@ pub async fn process_events(
Message::Snapshot(update) => {
metric_snapshots.increment();
info!("processing snapshot...");
for account in update.accounts.iter() {
metric_snapshot_account_writes.increment();
metric_account_queue.set(account_write_queue_sender.len() as u64);
@ -553,6 +585,7 @@ pub async fn process_events(
// TODO: Resnapshot on invalid data?
let pubkey = Pubkey::from_str(key).unwrap();
let account: Account = ui_account.decode().unwrap();
account_write_queue_sender
.send(AccountWrite::from(pubkey, update.slot, 0, account))
.await
@ -561,6 +594,7 @@ pub async fn process_events(
(key, None) => warn!("account not found {}", key),
}
}
info!("processing snapshot done");
}
}

View File

@ -6,9 +6,10 @@ pub mod snapshot;
pub mod websocket_source;
use itertools::Itertools;
use solana_client::rpc_filter::RpcFilterType;
use std::str::FromStr;
use {
serde_derive::Deserialize,
serde_derive::{Deserialize, Serialize},
solana_sdk::{account::Account, pubkey::Pubkey},
};
@ -94,10 +95,36 @@ pub struct SnapshotSourceConfig {
pub rpc_http_url: String,
}
/// A single criterion used to narrow a program-account feed.
///
/// Each variant is translated one-to-one into the `RpcFilterType` variant of
/// the same name by `FeedFilterType::to_rpc_filter`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum FeedFilterType {
/// Data-size criterion; forwarded unchanged as `RpcFilterType::DataSize`.
DataSize(u64),
/// Byte-comparison criterion; see [`Memcmp`] for the offset/bytes payload.
Memcmp(Memcmp),
/// Forwarded as `RpcFilterType::TokenAccountState` (carries no payload).
TokenAccountState,
}
impl FeedFilterType {
    /// Builds the `RpcFilterType` equivalent of this feed filter.
    ///
    /// The result is what gets placed into `RpcProgramAccountsConfig::filters`
    /// when requesting program accounts. The conversion only borrows `self`,
    /// so the comparison bytes of a `Memcmp` filter are cloned.
    fn to_rpc_filter(&self) -> RpcFilterType {
        match self {
            Self::DataSize(size) => RpcFilterType::DataSize(*size),
            Self::TokenAccountState => RpcFilterType::TokenAccountState,
            Self::Memcmp(cmp) => {
                // Hand the raw bytes over as-is; no encoding is applied here.
                let raw_bytes = cmp.bytes.clone();
                let rpc_memcmp =
                    solana_client::rpc_filter::Memcmp::new_raw_bytes(cmp.offset, raw_bytes);
                RpcFilterType::Memcmp(rpc_memcmp)
            }
        }
    }
}
/// Offset/bytes payload carried by `FeedFilterType::Memcmp`.
///
/// Both fields are forwarded to
/// `solana_client::rpc_filter::Memcmp::new_raw_bytes(offset, bytes)` when the
/// filter is converted to an RPC filter.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Memcmp {
/// Offset handed to the memcmp filter.
/// NOTE(review): presumably a byte offset into the account data — confirm
/// against the RPC filter documentation.
pub offset: usize,
/// Raw bytes handed to the memcmp filter as the comparison value.
pub bytes: Vec<u8>,
}
/// Selects which accounts a feed source should follow.
#[derive(Clone, Debug, Deserialize)]
pub enum EntityFilter {
/// Follow an explicit list of account public keys.
FilterByAccountIds(Vec<Pubkey>),
/// Follow accounts selected by a single program id.
FilterByProgramId(Pubkey),
/// Follow accounts selected by a program id, further narrowed by the given
/// `FeedFilterType` criteria.
FilterByProgramIdSelective(Pubkey, Vec<FeedFilterType>),
}
impl EntityFilter {
pub fn filter_by_program_id(program_id: &str) -> Self {

View File

@ -1,10 +1,10 @@
use {
crate::MetricsConfig,
log::*,
std::collections::HashMap,
std::fmt,
std::sync::{atomic, Arc, Mutex, RwLock},
tokio::time,
tracing::*,
warp::{Filter, Rejection, Reply},
};

View File

@ -1,14 +1,15 @@
use jsonrpc_core_client::transports::http;
use log::*;
use solana_account_decoder::{UiAccount, UiAccountEncoding};
use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_response::{OptionalContext, RpcKeyedAccount},
};
use solana_rpc::rpc::rpc_accounts::AccountsDataClient;
use solana_rpc::rpc::rpc_accounts_scan::AccountsScanClient;
use solana_sdk::{commitment_config::CommitmentConfig, slot_history::Slot};
use tracing::*;
use crate::AnyhowWrap;
use crate::{AnyhowWrap, FeedFilterType};
/// gPA snapshot struct
pub struct SnapshotProgramAccounts {
@ -27,7 +28,15 @@ pub async fn get_snapshot_gpa(
rpc_http_url: String,
program_id: String,
) -> anyhow::Result<SnapshotProgramAccounts> {
let rpc_client = http::connect::<crate::GetProgramAccountsClient>(&rpc_http_url)
get_filtered_snapshot_gpa(rpc_http_url, program_id, None).await
}
pub async fn get_filtered_snapshot_gpa(
rpc_http_url: String,
program_id: String,
filters: Option<Vec<FeedFilterType>>,
) -> anyhow::Result<SnapshotProgramAccounts> {
let rpc_client = http::connect_with_options::<AccountsScanClient>(&rpc_http_url, true)
.await
.map_err_anyhow()?;
@ -38,7 +47,7 @@ pub async fn get_snapshot_gpa(
min_context_slot: None,
};
let program_accounts_config = RpcProgramAccountsConfig {
filters: None,
filters: filters.map(|v| v.iter().map(|f| f.to_rpc_filter()).collect()),
with_context: Some(true),
account_config: account_info_config.clone(),
};

View File

@ -12,7 +12,6 @@ use solana_sdk::{
};
use anyhow::Context;
use log::*;
use std::ops::Sub;
use std::{
str::FromStr,
@ -20,13 +19,14 @@ use std::{
time::{Duration, Instant},
};
use tokio::time::timeout;
use tracing::*;
use crate::snapshot::{
get_snapshot_gma, get_snapshot_gpa, SnapshotMultipleAccounts, SnapshotProgramAccounts,
};
use crate::{
chain_data::SlotStatus, AccountWrite, AnyhowWrap, EntityFilter, FilterConfig, SlotUpdate,
SourceConfig,
chain_data::SlotStatus, AccountWrite, AnyhowWrap, EntityFilter, FeedFilterType, FilterConfig,
SlotUpdate, SourceConfig,
};
const SNAPSHOT_REFRESH_INTERVAL: Duration = Duration::from_secs(300);
@ -54,6 +54,15 @@ async fn feed_data(
EntityFilter::FilterByProgramId(program_id) => {
feed_data_by_program(config, program_id.to_string(), sender).await
}
EntityFilter::FilterByProgramIdSelective(program_id, filters) => {
feed_data_by_program_and_filters(
config,
program_id.to_string(),
sender,
Some(filters.clone()),
)
.await
}
}
}
@ -182,6 +191,15 @@ async fn feed_data_by_program(
config: &SourceConfig,
program_id: String,
sender: async_channel::Sender<WebsocketMessage>,
) -> anyhow::Result<()> {
feed_data_by_program_and_filters(config, program_id, sender, None).await
}
async fn feed_data_by_program_and_filters(
config: &SourceConfig,
program_id: String,
sender: async_channel::Sender<WebsocketMessage>,
filters: Option<Vec<FeedFilterType>>,
) -> anyhow::Result<()> {
debug!("feed_data_by_program");
@ -198,7 +216,7 @@ async fn feed_data_by_program(
min_context_slot: None,
};
let program_accounts_config = RpcProgramAccountsConfig {
filters: None,
filters: filters.map(|v| v.iter().map(|f| f.to_rpc_filter()).collect()),
with_context: Some(true),
account_config: account_info_config.clone(),
};
@ -292,16 +310,14 @@ async fn feed_data_by_program(
// TODO: rename / split / rework
pub async fn process_events(
config: &SourceConfig,
filter_config: &FilterConfig,
config: SourceConfig,
filter_config: FilterConfig,
account_write_queue_sender: async_channel::Sender<AccountWrite>,
slot_queue_sender: async_channel::Sender<SlotUpdate>,
) {
// Subscribe to program account updates websocket
let (update_sender, update_receiver) = async_channel::unbounded::<WebsocketMessage>();
info!("using config {config:?}");
let config = config.clone();
let filter_config = filter_config.clone();
tokio::spawn(async move {
// if the websocket disconnects, we get no data in a while etc, reconnect and try again
loop {
@ -342,7 +358,7 @@ pub async fn process_events(
.expect("send success");
}
WebsocketMessage::SnapshotUpdate((slot, accounts)) => {
trace!("snapshot update {slot}");
debug!("snapshot update {slot}");
for (pubkey, account) in accounts {
if let Some(account) = account {
let pubkey = Pubkey::from_str(&pubkey).unwrap();