From e342e0b4e5bb095cb18d6dff914e4a78604285af Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Sun, 26 Mar 2023 21:40:39 -0300 Subject: [PATCH] add memcmp/datasize filter to accounts (#101) --- Cargo.lock | 19 +++- README.md | 3 +- examples/rust/Cargo.toml | 9 +- examples/rust/src/bin/client.rs | 100 ++++++++++++++++- yellowstone-grpc-client/Cargo.toml | 4 +- yellowstone-grpc-geyser/Cargo.toml | 4 +- yellowstone-grpc-geyser/src/filters.rs | 128 ++++++++++++++++++---- yellowstone-grpc-proto/Cargo.toml | 2 +- yellowstone-grpc-proto/proto/geyser.proto | 17 +++ 9 files changed, 249 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9aa2b8c..db06a27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1403,6 +1403,12 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "hmac" version = "0.8.1" @@ -4123,7 +4129,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-client" -version = "1.0.1+solana.1.15.2" +version = "1.1.0+solana.1.15.2" dependencies = [ "bytes", "futures", @@ -4136,14 +4142,17 @@ dependencies = [ [[package]] name = "yellowstone-grpc-client-simple" -version = "1.0.0+solana.1.15.2" +version = "1.1.0+solana.1.15.2" dependencies = [ "anyhow", "backoff", + "bs58", "clap", "env_logger 0.10.0", "futures", + "hex", "log", + "solana-sdk", "tokio", "yellowstone-grpc-client", "yellowstone-grpc-proto", @@ -4151,10 +4160,12 @@ dependencies = [ [[package]] name = "yellowstone-grpc-geyser" -version = "0.5.3+solana.1.15.2" +version = "0.6.0+solana.1.15.2" dependencies = [ "anyhow", + "base64 0.21.0", "bincode", + "bs58", "cargo-lock", "clap", "futures", @@ -4179,7 +4190,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-proto" -version = "1.0.1+solana.1.15.2" +version = "1.1.0+solana.1.15.2" dependencies = [ "anyhow", "prost", diff --git a/README.md b/README.md index 294c2cc..7ebd28f 100644 --- a/README.md +++ b/README.md @@ -32,8 +32,9 @@ Accounts can be filtered by: - `account` — acount Pubkey, match to any Pubkey from the array - `owner` — account owner Pubkey, match to any Pubkey from the array + - `filters` — same as `getProgramAccounts` filters, array of `dataSize` or `Memcmp` (bytes, base58, base64 are supported) -If all fields are empty then all accounts are broadcasted. Otherwise fields works as logical `AND` and values in arrays as logical `OR`. +If all fields are empty then all accounts are broadcasted. Otherwise fields works as logical `AND` and values in arrays as logical `OR` (except values in `filters` which works as logical `AND`). #### Transactions diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml index 5de01c9..254d595 100644 --- a/examples/rust/Cargo.toml +++ b/examples/rust/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-client-simple" -version = "1.0.0+solana.1.15.2" +version = "1.1.0+solana.1.15.2" authors = ["Triton One"] edition = "2021" publish = false @@ -11,10 +11,13 @@ name = "client" [dependencies] anyhow = "1.0.62" backoff = { version = "0.4.0", features = ["tokio"] } +bs58 = "0.4.0" clap = { version = "3.2.22", features = ["cargo", "derive"] } env_logger = "0.10.0" futures = "0.3.24" +hex = "0.4.3" log = { version = "0.4.14", features = ["std"] } -yellowstone-grpc-proto = { path = "../../yellowstone-grpc-proto" } -yellowstone-grpc-client = { path = "../../yellowstone-grpc-client" } +solana-sdk = "=1.15.2" tokio = { version = "1.21.2", features = ["rt-multi-thread", "macros", "time"] } +yellowstone-grpc-client = { path = "../../yellowstone-grpc-client" } +yellowstone-grpc-proto = { path = "../../yellowstone-grpc-proto" } diff --git a/examples/rust/src/bin/client.rs b/examples/rust/src/bin/client.rs index 37d81d7..17b103c 100644 --- a/examples/rust/src/bin/client.rs +++ b/examples/rust/src/bin/client.rs @@ -3,12 +3,16 @@ use { clap::Parser, futures::{sink::SinkExt, stream::StreamExt}, log::{error, info}, + solana_sdk::pubkey::Pubkey, std::collections::HashMap, yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError}, yellowstone_grpc_proto::prelude::{ - SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks, - SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, - SubscribeRequestFilterTransactions, + subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof, + subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof, + subscribe_update::UpdateOneof, SubscribeRequest, SubscribeRequestFilterAccounts, + SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterAccountsFilterMemcmp, + SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, + SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, SubscribeUpdateAccount, }, }; @@ -34,6 +38,14 @@ struct Args { #[clap(long)] accounts_owner: Vec, + /// Filter by Offset and Data, format: `offset,data in base58` + #[clap(long)] + accounts_memcmp: Vec, + + /// Filter by Data size + #[clap(long)] + accounts_datasize: Option, + /// Subscribe on slots updates #[clap(long)] slots: bool, @@ -81,6 +93,45 @@ type TransactionsFilterMap = HashMap type BlocksFilterMap = HashMap; type BlocksMetaFilterMap = HashMap; +#[derive(Debug)] +#[allow(dead_code)] +pub struct AccountPretty { + is_startup: bool, + slot: u64, + pubkey: Pubkey, + lamports: u64, + owner: Pubkey, + executable: bool, + rent_epoch: u64, + data: String, + write_version: u64, + txn_signature: String, +} + +impl From for AccountPretty { + fn from( + SubscribeUpdateAccount { + is_startup, + slot, + account, + }: SubscribeUpdateAccount, + ) -> Self { + let account = account.expect("should be defined"); + Self { + is_startup, + slot, + pubkey: Pubkey::try_from(account.pubkey).expect("valid pubkey"), + lamports: account.lamports, + owner: Pubkey::try_from(account.owner).expect("valid pubkey"), + executable: account.executable, + rent_epoch: account.rent_epoch, + data: hex::encode(account.data), + write_version: account.write_version, + txn_signature: bs58::encode(account.txn_signature.unwrap_or_default()).into_string(), + } + } +} + #[tokio::main] async fn main() -> anyhow::Result<()> { std::env::set_var("RUST_LOG", "info"); @@ -90,11 +141,38 @@ async fn main() -> anyhow::Result<()> { let mut accounts: AccountFilterMap = HashMap::new(); if args.accounts { + let mut filters = vec![]; + for filter in args.accounts_memcmp { + match filter.split_once(',') { + Some((offset, data)) => { + filters.push(SubscribeRequestFilterAccountsFilter { + filter: Some(AccountsFilterDataOneof::Memcmp( + SubscribeRequestFilterAccountsFilterMemcmp { + offset: offset + .parse() + .map_err(|_| anyhow::anyhow!("invalid offset"))?, + data: Some(AccountsFilterMemcmpOneof::Base58( + data.trim().to_string(), + )), + }, + )), + }); + } + _ => anyhow::bail!("invalid memcmp"), + } + } + if let Some(datasize) = args.accounts_datasize { + filters.push(SubscribeRequestFilterAccountsFilter { + filter: Some(AccountsFilterDataOneof::Datasize(datasize)), + }); + } + accounts.insert( "client".to_owned(), SubscribeRequestFilterAccounts { account: args.accounts_account, owner: args.accounts_owner, + filters, }, ); } @@ -162,7 +240,21 @@ async fn main() -> anyhow::Result<()> { let mut counter = 0; while let Some(message) = stream.next().await { match message { - Ok(message) => info!("new message: {:?}", message), + Ok(msg) => { + #[allow(clippy::single_match)] + match msg.update_oneof { + Some(UpdateOneof::Account(account)) => { + let account: AccountPretty = account.into(); + info!( + "new account update: filters {:?}, account: {:#?}", + msg.filters, account + ); + continue; + } + _ => {} + } + info!("new message: {:?}", msg) + } Err(error) => error!("error: {:?}", error), } diff --git a/yellowstone-grpc-client/Cargo.toml b/yellowstone-grpc-client/Cargo.toml index e041629..251d4aa 100644 --- a/yellowstone-grpc-client/Cargo.toml +++ b/yellowstone-grpc-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-client" -version = "1.0.1+solana.1.15.2" +version = "1.1.0+solana.1.15.2" authors = ["Triton One"] edition = "2021" description = "Yellowstone gRPC Geyser Simple Client" @@ -15,7 +15,7 @@ futures = "0.3.24" http = "0.2.8" thiserror = "1.0" tonic = { version = "0.8.2", features = ["gzip", "tls", "tls-roots"] } -yellowstone-grpc-proto = { path = "../yellowstone-grpc-proto", version = "1.0.1+solana.1.15.2" } +yellowstone-grpc-proto = { path = "../yellowstone-grpc-proto", version = "1.1.0+solana.1.15.2" } [dev-dependencies] tokio = { version = "1.21.2", features = ["macros"] } diff --git a/yellowstone-grpc-geyser/Cargo.toml b/yellowstone-grpc-geyser/Cargo.toml index 30e9021..ec01281 100644 --- a/yellowstone-grpc-geyser/Cargo.toml +++ b/yellowstone-grpc-geyser/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-geyser" -version = "0.5.3+solana.1.15.2" +version = "0.6.0+solana.1.15.2" authors = ["Triton One"] edition = "2021" description = "Yellowstone gRPC Geyser Plugin" @@ -14,7 +14,9 @@ name = "config-check" [dependencies] anyhow = "1.0.62" +base64 = "0.21.0" bincode = "1.3.3" +bs58 = "0.4.0" clap = { version = "3.2.22", features = ["cargo", "derive"] } futures = "0.3.24" hyper = { version = "0.14.20", features = ["server"] } diff --git a/yellowstone-grpc-geyser/src/filters.rs b/yellowstone-grpc-geyser/src/filters.rs index 7c4ec9d..de437c8 100644 --- a/yellowstone-grpc-geyser/src/filters.rs +++ b/yellowstone-grpc-geyser/src/filters.rs @@ -9,15 +9,17 @@ use { MessageTransaction, }, proto::{ - SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks, - SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, - SubscribeRequestFilterTransactions, + subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof, + subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof, + SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterAccountsFilter, + SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, + SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, }, }, + base64::{engine::general_purpose::STANDARD as base64_engine, Engine}, solana_sdk::{pubkey::Pubkey, signature::Signature}, std::{ collections::{HashMap, HashSet}, - hash::Hash, iter::FromIterator, str::FromStr, }, @@ -80,7 +82,7 @@ impl Filter { #[derive(Debug, Default)] struct FilterAccounts { - filters: Vec, + filters: Vec<(String, FilterAccountsData)>, account: HashMap>, account_required: HashSet, owner: HashMap>, @@ -121,7 +123,8 @@ impl FilterAccounts { Filter::decode_pubkeys(&filter.owner, limit.map(|v| &v.owner_reject))?, ); - this.filters.push(name.clone()); + this.filters + .push((name.clone(), FilterAccountsData::new(&filter.filters)?)); } Ok(this) } @@ -149,15 +152,93 @@ impl FilterAccounts { let mut filter = FilterAccountsMatch::new(self); filter.match_account(&message.account.pubkey); filter.match_owner(&message.account.owner); + filter.match_data(&message.account.data); filter.get_filters() } } +#[derive(Debug, Default)] +struct FilterAccountsData { + memcmp: Vec<(usize, Vec)>, + datasize: Option, +} + +impl FilterAccountsData { + fn new(filters: &[SubscribeRequestFilterAccountsFilter]) -> anyhow::Result { + const MAX_FILTERS: usize = 4; + const MAX_DATA_SIZE: usize = 128; + const MAX_DATA_BASE58_SIZE: usize = 175; + const MAX_DATA_BASE64_SIZE: usize = 172; + + anyhow::ensure!( + filters.len() <= MAX_FILTERS, + "Too many filters provided; max {MAX_FILTERS}" + ); + + let mut this = Self::default(); + for filter in filters { + match &filter.filter { + Some(AccountsFilterDataOneof::Memcmp(memcmp)) => { + let data = match &memcmp.data { + Some(AccountsFilterMemcmpOneof::Bytes(data)) => data.clone(), + Some(AccountsFilterMemcmpOneof::Base58(data)) => { + anyhow::ensure!(data.len() <= MAX_DATA_BASE58_SIZE, "data too large"); + bs58::decode(data) + .into_vec() + .map_err(|_| anyhow::anyhow!("invalid base58"))? + } + Some(AccountsFilterMemcmpOneof::Base64(data)) => { + anyhow::ensure!(data.len() <= MAX_DATA_BASE64_SIZE, "data too large"); + base64_engine + .decode(data) + .map_err(|_| anyhow::anyhow!("invalid base64"))? + } + None => anyhow::bail!("data for memcmp should be defined"), + }; + anyhow::ensure!(data.len() <= MAX_DATA_SIZE, "data too large"); + this.memcmp.push((memcmp.offset as usize, data)); + } + Some(AccountsFilterDataOneof::Datasize(datasize)) => { + anyhow::ensure!( + this.datasize.replace(*datasize as usize).is_none(), + "datasize used more than once", + ); + } + None => { + anyhow::bail!("filter should be defined"); + } + } + } + Ok(this) + } + + fn is_empty(&self) -> bool { + self.memcmp.is_empty() && self.datasize.is_none() + } + + fn is_match(&self, data: &[u8]) -> bool { + if matches!(self.datasize, Some(datasize) if data.len() != datasize) { + return false; + } + for (offset, bytes) in self.memcmp.iter() { + if data.len() < *offset + bytes.len() { + return false; + } + let data = &data[*offset..*offset + bytes.len()]; + if data != bytes { + return false; + } + } + true + } +} + #[derive(Debug)] pub struct FilterAccountsMatch<'a> { filter: &'a FilterAccounts, - account: HashSet, - owner: HashSet, + account: HashSet<&'a str>, + owner: HashSet<&'a str>, + data: HashSet<&'a str>, } impl<'a> FilterAccountsMatch<'a> { @@ -166,39 +247,41 @@ impl<'a> FilterAccountsMatch<'a> { filter, account: Default::default(), owner: Default::default(), + data: Default::default(), } } - fn extend( - set: &mut HashSet, - map: &HashMap>, - key: &Q, - ) -> bool { + fn extend(set: &mut HashSet<&'a str>, map: &'a HashMap>, key: &Pubkey) { if let Some(names) = map.get(key) { for name in names { - if !set.contains(name) { - set.insert(name.clone()); + if !set.contains(name.as_str()) { + set.insert(name); } } - true - } else { - false } } - pub fn match_account(&mut self, pubkey: &Pubkey) -> bool { + pub fn match_account(&mut self, pubkey: &Pubkey) { Self::extend(&mut self.account, &self.filter.account, pubkey) } - pub fn match_owner(&mut self, pubkey: &Pubkey) -> bool { + pub fn match_owner(&mut self, pubkey: &Pubkey) { Self::extend(&mut self.owner, &self.filter.owner, pubkey) } + pub fn match_data(&mut self, data: &[u8]) { + for (name, filter) in self.filter.filters.iter() { + if filter.is_match(data) { + self.data.insert(name); + } + } + } + pub fn get_filters(&self) -> Vec { self.filter .filters .iter() - .filter_map(|name| { + .filter_map(|(name, filter)| { let name = name.as_str(); let af = &self.filter; @@ -209,6 +292,9 @@ impl<'a> FilterAccountsMatch<'a> { if af.owner_required.contains(name) && !self.owner.contains(name) { return None; } + if !filter.is_empty() && !self.data.contains(name) { + return None; + } Some(name.to_string()) }) diff --git a/yellowstone-grpc-proto/Cargo.toml b/yellowstone-grpc-proto/Cargo.toml index 685e973..26baa97 100644 --- a/yellowstone-grpc-proto/Cargo.toml +++ b/yellowstone-grpc-proto/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-proto" -version = "1.0.1+solana.1.15.2" +version = "1.1.0+solana.1.15.2" authors = ["Triton One"] edition = "2021" description = "Yellowstone gRPC Geyser Protobuf Definitions" diff --git a/yellowstone-grpc-proto/proto/geyser.proto b/yellowstone-grpc-proto/proto/geyser.proto index 292d750..d21a863 100644 --- a/yellowstone-grpc-proto/proto/geyser.proto +++ b/yellowstone-grpc-proto/proto/geyser.proto @@ -21,6 +21,23 @@ message SubscribeRequest { message SubscribeRequestFilterAccounts { repeated string account = 2; repeated string owner = 3; + repeated SubscribeRequestFilterAccountsFilter filters = 4; +} + +message SubscribeRequestFilterAccountsFilter { + oneof filter { + SubscribeRequestFilterAccountsFilterMemcmp memcmp = 1; + uint64 datasize = 2; + } +} + +message SubscribeRequestFilterAccountsFilterMemcmp { + uint64 offset = 1; + oneof data { + bytes bytes = 2; + string base58 = 3; + string base64 = 4; + } } message SubscribeRequestFilterSlots {}