add memcmp/datasize filter to accounts (#101)

This commit is contained in:
Kirill Fomichev 2023-03-26 21:40:39 -03:00 committed by GitHub
parent 8c4f71648c
commit e342e0b4e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 249 additions and 37 deletions

19
Cargo.lock generated
View File

@ -1403,6 +1403,12 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286"
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "hmac"
version = "0.8.1"
@ -4123,7 +4129,7 @@ dependencies = [
[[package]]
name = "yellowstone-grpc-client"
version = "1.0.1+solana.1.15.2"
version = "1.1.0+solana.1.15.2"
dependencies = [
"bytes",
"futures",
@ -4136,14 +4142,17 @@ dependencies = [
[[package]]
name = "yellowstone-grpc-client-simple"
version = "1.0.0+solana.1.15.2"
version = "1.1.0+solana.1.15.2"
dependencies = [
"anyhow",
"backoff",
"bs58",
"clap",
"env_logger 0.10.0",
"futures",
"hex",
"log",
"solana-sdk",
"tokio",
"yellowstone-grpc-client",
"yellowstone-grpc-proto",
@ -4151,10 +4160,12 @@ dependencies = [
[[package]]
name = "yellowstone-grpc-geyser"
version = "0.5.3+solana.1.15.2"
version = "0.6.0+solana.1.15.2"
dependencies = [
"anyhow",
"base64 0.21.0",
"bincode",
"bs58",
"cargo-lock",
"clap",
"futures",
@ -4179,7 +4190,7 @@ dependencies = [
[[package]]
name = "yellowstone-grpc-proto"
version = "1.0.1+solana.1.15.2"
version = "1.1.0+solana.1.15.2"
dependencies = [
"anyhow",
"prost",

View File

@ -32,8 +32,9 @@ Accounts can be filtered by:
- `account` — acount Pubkey, match to any Pubkey from the array
- `owner` — account owner Pubkey, match to any Pubkey from the array
- `filters` — same as `getProgramAccounts` filters, array of `dataSize` or `Memcmp` (bytes, base58, base64 are supported)
If all fields are empty then all accounts are broadcasted. Otherwise fields works as logical `AND` and values in arrays as logical `OR`.
If all fields are empty then all accounts are broadcasted. Otherwise fields works as logical `AND` and values in arrays as logical `OR` (except values in `filters` which works as logical `AND`).
#### Transactions

View File

@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-client-simple"
version = "1.0.0+solana.1.15.2"
version = "1.1.0+solana.1.15.2"
authors = ["Triton One"]
edition = "2021"
publish = false
@ -11,10 +11,13 @@ name = "client"
[dependencies]
anyhow = "1.0.62"
backoff = { version = "0.4.0", features = ["tokio"] }
bs58 = "0.4.0"
clap = { version = "3.2.22", features = ["cargo", "derive"] }
env_logger = "0.10.0"
futures = "0.3.24"
hex = "0.4.3"
log = { version = "0.4.14", features = ["std"] }
yellowstone-grpc-proto = { path = "../../yellowstone-grpc-proto" }
yellowstone-grpc-client = { path = "../../yellowstone-grpc-client" }
solana-sdk = "=1.15.2"
tokio = { version = "1.21.2", features = ["rt-multi-thread", "macros", "time"] }
yellowstone-grpc-client = { path = "../../yellowstone-grpc-client" }
yellowstone-grpc-proto = { path = "../../yellowstone-grpc-proto" }

View File

@ -3,12 +3,16 @@ use {
clap::Parser,
futures::{sink::SinkExt, stream::StreamExt},
log::{error, info},
solana_sdk::pubkey::Pubkey,
std::collections::HashMap,
yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError},
yellowstone_grpc_proto::prelude::{
SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks,
SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots,
SubscribeRequestFilterTransactions,
subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof,
subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof,
subscribe_update::UpdateOneof, SubscribeRequest, SubscribeRequestFilterAccounts,
SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterAccountsFilterMemcmp,
SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta,
SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, SubscribeUpdateAccount,
},
};
@ -34,6 +38,14 @@ struct Args {
#[clap(long)]
accounts_owner: Vec<String>,
/// Filter by Offset and Data, format: `offset,data in base58`
#[clap(long)]
accounts_memcmp: Vec<String>,
/// Filter by Data size
#[clap(long)]
accounts_datasize: Option<u64>,
/// Subscribe on slots updates
#[clap(long)]
slots: bool,
@ -81,6 +93,45 @@ type TransactionsFilterMap = HashMap<String, SubscribeRequestFilterTransactions>
type BlocksFilterMap = HashMap<String, SubscribeRequestFilterBlocks>;
type BlocksMetaFilterMap = HashMap<String, SubscribeRequestFilterBlocksMeta>;
#[derive(Debug)]
#[allow(dead_code)]
pub struct AccountPretty {
is_startup: bool,
slot: u64,
pubkey: Pubkey,
lamports: u64,
owner: Pubkey,
executable: bool,
rent_epoch: u64,
data: String,
write_version: u64,
txn_signature: String,
}
impl From<SubscribeUpdateAccount> for AccountPretty {
fn from(
SubscribeUpdateAccount {
is_startup,
slot,
account,
}: SubscribeUpdateAccount,
) -> Self {
let account = account.expect("should be defined");
Self {
is_startup,
slot,
pubkey: Pubkey::try_from(account.pubkey).expect("valid pubkey"),
lamports: account.lamports,
owner: Pubkey::try_from(account.owner).expect("valid pubkey"),
executable: account.executable,
rent_epoch: account.rent_epoch,
data: hex::encode(account.data),
write_version: account.write_version,
txn_signature: bs58::encode(account.txn_signature.unwrap_or_default()).into_string(),
}
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "info");
@ -90,11 +141,38 @@ async fn main() -> anyhow::Result<()> {
let mut accounts: AccountFilterMap = HashMap::new();
if args.accounts {
let mut filters = vec![];
for filter in args.accounts_memcmp {
match filter.split_once(',') {
Some((offset, data)) => {
filters.push(SubscribeRequestFilterAccountsFilter {
filter: Some(AccountsFilterDataOneof::Memcmp(
SubscribeRequestFilterAccountsFilterMemcmp {
offset: offset
.parse()
.map_err(|_| anyhow::anyhow!("invalid offset"))?,
data: Some(AccountsFilterMemcmpOneof::Base58(
data.trim().to_string(),
)),
},
)),
});
}
_ => anyhow::bail!("invalid memcmp"),
}
}
if let Some(datasize) = args.accounts_datasize {
filters.push(SubscribeRequestFilterAccountsFilter {
filter: Some(AccountsFilterDataOneof::Datasize(datasize)),
});
}
accounts.insert(
"client".to_owned(),
SubscribeRequestFilterAccounts {
account: args.accounts_account,
owner: args.accounts_owner,
filters,
},
);
}
@ -162,7 +240,21 @@ async fn main() -> anyhow::Result<()> {
let mut counter = 0;
while let Some(message) = stream.next().await {
match message {
Ok(message) => info!("new message: {:?}", message),
Ok(msg) => {
#[allow(clippy::single_match)]
match msg.update_oneof {
Some(UpdateOneof::Account(account)) => {
let account: AccountPretty = account.into();
info!(
"new account update: filters {:?}, account: {:#?}",
msg.filters, account
);
continue;
}
_ => {}
}
info!("new message: {:?}", msg)
}
Err(error) => error!("error: {:?}", error),
}

View File

@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-client"
version = "1.0.1+solana.1.15.2"
version = "1.1.0+solana.1.15.2"
authors = ["Triton One"]
edition = "2021"
description = "Yellowstone gRPC Geyser Simple Client"
@ -15,7 +15,7 @@ futures = "0.3.24"
http = "0.2.8"
thiserror = "1.0"
tonic = { version = "0.8.2", features = ["gzip", "tls", "tls-roots"] }
yellowstone-grpc-proto = { path = "../yellowstone-grpc-proto", version = "1.0.1+solana.1.15.2" }
yellowstone-grpc-proto = { path = "../yellowstone-grpc-proto", version = "1.1.0+solana.1.15.2" }
[dev-dependencies]
tokio = { version = "1.21.2", features = ["macros"] }

View File

@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-geyser"
version = "0.5.3+solana.1.15.2"
version = "0.6.0+solana.1.15.2"
authors = ["Triton One"]
edition = "2021"
description = "Yellowstone gRPC Geyser Plugin"
@ -14,7 +14,9 @@ name = "config-check"
[dependencies]
anyhow = "1.0.62"
base64 = "0.21.0"
bincode = "1.3.3"
bs58 = "0.4.0"
clap = { version = "3.2.22", features = ["cargo", "derive"] }
futures = "0.3.24"
hyper = { version = "0.14.20", features = ["server"] }

View File

@ -9,15 +9,17 @@ use {
MessageTransaction,
},
proto::{
SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks,
SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots,
SubscribeRequestFilterTransactions,
subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof,
subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof,
SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterAccountsFilter,
SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta,
SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions,
},
},
base64::{engine::general_purpose::STANDARD as base64_engine, Engine},
solana_sdk::{pubkey::Pubkey, signature::Signature},
std::{
collections::{HashMap, HashSet},
hash::Hash,
iter::FromIterator,
str::FromStr,
},
@ -80,7 +82,7 @@ impl Filter {
#[derive(Debug, Default)]
struct FilterAccounts {
filters: Vec<String>,
filters: Vec<(String, FilterAccountsData)>,
account: HashMap<Pubkey, HashSet<String>>,
account_required: HashSet<String>,
owner: HashMap<Pubkey, HashSet<String>>,
@ -121,7 +123,8 @@ impl FilterAccounts {
Filter::decode_pubkeys(&filter.owner, limit.map(|v| &v.owner_reject))?,
);
this.filters.push(name.clone());
this.filters
.push((name.clone(), FilterAccountsData::new(&filter.filters)?));
}
Ok(this)
}
@ -149,15 +152,93 @@ impl FilterAccounts {
let mut filter = FilterAccountsMatch::new(self);
filter.match_account(&message.account.pubkey);
filter.match_owner(&message.account.owner);
filter.match_data(&message.account.data);
filter.get_filters()
}
}
#[derive(Debug, Default)]
struct FilterAccountsData {
memcmp: Vec<(usize, Vec<u8>)>,
datasize: Option<usize>,
}
impl FilterAccountsData {
fn new(filters: &[SubscribeRequestFilterAccountsFilter]) -> anyhow::Result<Self> {
const MAX_FILTERS: usize = 4;
const MAX_DATA_SIZE: usize = 128;
const MAX_DATA_BASE58_SIZE: usize = 175;
const MAX_DATA_BASE64_SIZE: usize = 172;
anyhow::ensure!(
filters.len() <= MAX_FILTERS,
"Too many filters provided; max {MAX_FILTERS}"
);
let mut this = Self::default();
for filter in filters {
match &filter.filter {
Some(AccountsFilterDataOneof::Memcmp(memcmp)) => {
let data = match &memcmp.data {
Some(AccountsFilterMemcmpOneof::Bytes(data)) => data.clone(),
Some(AccountsFilterMemcmpOneof::Base58(data)) => {
anyhow::ensure!(data.len() <= MAX_DATA_BASE58_SIZE, "data too large");
bs58::decode(data)
.into_vec()
.map_err(|_| anyhow::anyhow!("invalid base58"))?
}
Some(AccountsFilterMemcmpOneof::Base64(data)) => {
anyhow::ensure!(data.len() <= MAX_DATA_BASE64_SIZE, "data too large");
base64_engine
.decode(data)
.map_err(|_| anyhow::anyhow!("invalid base64"))?
}
None => anyhow::bail!("data for memcmp should be defined"),
};
anyhow::ensure!(data.len() <= MAX_DATA_SIZE, "data too large");
this.memcmp.push((memcmp.offset as usize, data));
}
Some(AccountsFilterDataOneof::Datasize(datasize)) => {
anyhow::ensure!(
this.datasize.replace(*datasize as usize).is_none(),
"datasize used more than once",
);
}
None => {
anyhow::bail!("filter should be defined");
}
}
}
Ok(this)
}
fn is_empty(&self) -> bool {
self.memcmp.is_empty() && self.datasize.is_none()
}
fn is_match(&self, data: &[u8]) -> bool {
if matches!(self.datasize, Some(datasize) if data.len() != datasize) {
return false;
}
for (offset, bytes) in self.memcmp.iter() {
if data.len() < *offset + bytes.len() {
return false;
}
let data = &data[*offset..*offset + bytes.len()];
if data != bytes {
return false;
}
}
true
}
}
#[derive(Debug)]
pub struct FilterAccountsMatch<'a> {
filter: &'a FilterAccounts,
account: HashSet<String>,
owner: HashSet<String>,
account: HashSet<&'a str>,
owner: HashSet<&'a str>,
data: HashSet<&'a str>,
}
impl<'a> FilterAccountsMatch<'a> {
@ -166,39 +247,41 @@ impl<'a> FilterAccountsMatch<'a> {
filter,
account: Default::default(),
owner: Default::default(),
data: Default::default(),
}
}
fn extend<Q: Hash + Eq>(
set: &mut HashSet<String>,
map: &HashMap<Q, HashSet<String>>,
key: &Q,
) -> bool {
fn extend(set: &mut HashSet<&'a str>, map: &'a HashMap<Pubkey, HashSet<String>>, key: &Pubkey) {
if let Some(names) = map.get(key) {
for name in names {
if !set.contains(name) {
set.insert(name.clone());
if !set.contains(name.as_str()) {
set.insert(name);
}
}
true
} else {
false
}
}
pub fn match_account(&mut self, pubkey: &Pubkey) -> bool {
pub fn match_account(&mut self, pubkey: &Pubkey) {
Self::extend(&mut self.account, &self.filter.account, pubkey)
}
pub fn match_owner(&mut self, pubkey: &Pubkey) -> bool {
pub fn match_owner(&mut self, pubkey: &Pubkey) {
Self::extend(&mut self.owner, &self.filter.owner, pubkey)
}
pub fn match_data(&mut self, data: &[u8]) {
for (name, filter) in self.filter.filters.iter() {
if filter.is_match(data) {
self.data.insert(name);
}
}
}
pub fn get_filters(&self) -> Vec<String> {
self.filter
.filters
.iter()
.filter_map(|name| {
.filter_map(|(name, filter)| {
let name = name.as_str();
let af = &self.filter;
@ -209,6 +292,9 @@ impl<'a> FilterAccountsMatch<'a> {
if af.owner_required.contains(name) && !self.owner.contains(name) {
return None;
}
if !filter.is_empty() && !self.data.contains(name) {
return None;
}
Some(name.to_string())
})

View File

@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-proto"
version = "1.0.1+solana.1.15.2"
version = "1.1.0+solana.1.15.2"
authors = ["Triton One"]
edition = "2021"
description = "Yellowstone gRPC Geyser Protobuf Definitions"

View File

@ -21,6 +21,23 @@ message SubscribeRequest {
message SubscribeRequestFilterAccounts {
repeated string account = 2;
repeated string owner = 3;
repeated SubscribeRequestFilterAccountsFilter filters = 4;
}
message SubscribeRequestFilterAccountsFilter {
oneof filter {
SubscribeRequestFilterAccountsFilterMemcmp memcmp = 1;
uint64 datasize = 2;
}
}
message SubscribeRequestFilterAccountsFilterMemcmp {
uint64 offset = 1;
oneof data {
bytes bytes = 2;
string base58 = 3;
string base64 = 4;
}
}
message SubscribeRequestFilterSlots {}