From a0aa76b51d4952f9a13521638d130cc9ced58945 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Wed, 19 Oct 2022 19:19:59 -0300 Subject: [PATCH] change accounts filter --- Cargo.lock | 2 +- Cargo.toml | 2 +- README.md | 17 ++++ config.json | 2 +- proto/geyser.proto | 23 +++-- src/bin/client.rs | 31 ++++-- src/filters.rs | 239 +++++++++++++++++++++++++++++++++++++-------- src/grpc.rs | 157 +++++++++++++++++++++++++---- src/plugin.rs | 44 ++------- 9 files changed, 403 insertions(+), 114 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c7d7ba8..6655974 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2398,7 +2398,6 @@ name = "solana-geyser-grpc" version = "0.1.0" dependencies = [ "anyhow", - "bs58", "cargo-lock", "clap", "futures", @@ -2412,6 +2411,7 @@ dependencies = [ "serde_json", "solana-geyser-plugin-interface", "solana-logger", + "solana-sdk", "tokio", "tokio-stream", "tonic", diff --git a/Cargo.toml b/Cargo.toml index 6d69bb1..0b55a49 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,6 @@ crate-type = ["cdylib", "rlib"] [dependencies] anyhow = "1" -bs58 = "0.4" clap = { version = "3", features = ["cargo", "derive"] } futures = "0.3" hyper = { version = "0.14", features = ["server"] } @@ -21,6 +20,7 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" solana-geyser-plugin-interface = "=1.10.40" solana-logger = "=1.10.40" +solana-sdk = "=1.10.40" tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "time"] } tokio-stream = "0.1" tonic = { version = "0.8", features = ["gzip"] } diff --git a/README.md b/README.md index bf6dcf7..84ed8f5 100644 --- a/README.md +++ b/README.md @@ -22,3 +22,20 @@ new message: Ok(SubscribeUpdate { update_oneof: Some(Account(SubscribeUpdateAcco new message: Ok(SubscribeUpdate { update_oneof: Some(Slot(SubscribeUpdateSlot { slot: 3159, parent: None, status: Confirmed })) }) ^C ``` + +### Filters + +See [proto/geyser.proto](proto/geyser.proto). + +#### Slots + + - `enabled` — broadcast slots updates + +#### Account + +Accounts can be filtered by: + + - `account` — acount Pubkey, match to any Pubkey from the array + - `owner` — account owner Pubkey, match to any Pubkey from the array + +All fields in filter are optional, if all filters are empty then all accounts broadcasted. Fields works as logical `AND`. Values in the arrays works as logical `OR`. diff --git a/config.json b/config.json index fe76690..37ef6ce 100644 --- a/config.json +++ b/config.json @@ -1,5 +1,5 @@ { - "libpath": "../target/debug/libsolana_geyser_grpc.so", + "libpath": "target/debug/libsolana_geyser_grpc.so", "log": { "level": "info" }, diff --git a/proto/geyser.proto b/proto/geyser.proto index 654c68b..6369dd7 100644 --- a/proto/geyser.proto +++ b/proto/geyser.proto @@ -7,9 +7,18 @@ service Geyser { } message SubscribeRequest { - bool any = 1; - repeated string accounts = 2; - repeated string owners = 3; + SubscribeRequestSlots slots = 1; + repeated SubscribeRequestAccounts accounts = 2; +} + +message SubscribeRequestSlots { + bool enabled = 1; +} + +message SubscribeRequestAccounts { + string filter = 1; + repeated string account = 2; + repeated string owner = 3; } message SubscribeUpdate { @@ -20,9 +29,10 @@ message SubscribeUpdate { } message SubscribeUpdateAccount { - SubscribeUpdateAccountInfo account = 1; - uint64 slot = 2; - bool is_startup = 3; + repeated string filters = 1; + SubscribeUpdateAccountInfo account = 2; + uint64 slot = 3; + bool is_startup = 4; } message SubscribeUpdateAccountInfo { @@ -33,6 +43,7 @@ message SubscribeUpdateAccountInfo { uint64 rent_epoch = 5; bytes data = 6; uint64 write_version = 7; + optional bytes txn_signature = 8; } message SubscribeUpdateSlot { diff --git a/src/bin/client.rs b/src/bin/client.rs index cfdf734..51498be 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -1,7 +1,10 @@ use { clap::Parser, futures::stream::StreamExt, - solana_geyser_grpc::grpc::proto::{geyser_client::GeyserClient, SubscribeRequest}, + solana_geyser_grpc::grpc::proto::{ + geyser_client::GeyserClient, SubscribeRequest, SubscribeRequestAccounts, + SubscribeRequestSlots, + }, tonic::Request, }; @@ -13,14 +16,14 @@ struct Args { endpoint: String, #[clap(short, long)] - /// Stream all accounts - any: bool, + /// Subscribe on slots updates + slots: bool, - #[clap(short, long, conflicts_with = "any")] + #[clap(short, long)] /// Filter by Account Pubkey - accounts: Vec, + account: Vec, - #[clap(short, long, conflicts_with = "any")] + #[clap(short, long)] /// Filter by Owner Pubkey owner: Vec, } @@ -31,16 +34,24 @@ async fn main() -> anyhow::Result<()> { let mut client = GeyserClient::connect(args.endpoint).await?; let request = Request::new(SubscribeRequest { - any: args.any, - accounts: args.accounts, - owners: args.owner, + slots: Some(SubscribeRequestSlots { + enabled: args.slots, + }), + accounts: vec![SubscribeRequestAccounts { + filter: "client".to_owned(), + account: args.account, + owner: args.owner, + }], }); let response = client.subscribe(request).await?; let mut stream = response.into_inner(); println!("stream opened"); while let Some(message) = stream.next().await { - println!("new message: {:?}", message); + match message { + Ok(message) => println!("new message: {:?}", message), + Err(error) => eprintln!("error: {:?}", error), + } } println!("stream closed"); diff --git a/src/filters.rs b/src/filters.rs index 47abdf1..1fe5f6a 100644 --- a/src/filters.rs +++ b/src/filters.rs @@ -1,49 +1,210 @@ -use std::collections::HashSet; +use { + crate::grpc::proto::{SubscribeRequest, SubscribeRequestAccounts, SubscribeRequestSlots}, + solana_sdk::pubkey::Pubkey, + std::{ + collections::{HashMap, HashSet}, + convert::{From, TryFrom}, + hash::Hash, + str::FromStr, + }, +}; -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct AccountsFilter { - any: bool, - accounts: HashSet>, - owners: HashSet>, +#[derive(Debug, Default)] +struct FilterSlots { + enabled: bool, } -impl Default for AccountsFilter { - fn default() -> Self { - Self { - any: true, - accounts: HashSet::default(), - owners: HashSet::default(), +impl From<&SubscribeRequestSlots> for FilterSlots { + fn from(config: &SubscribeRequestSlots) -> Self { + FilterSlots { + enabled: config.enabled, } } } -impl AccountsFilter { - pub fn new(any: bool, accounts: &[T1], owners: &[T2]) -> anyhow::Result - where - for<'a> T1: AsRef<[u8]> + std::cmp::PartialEq<&'a str> + std::fmt::Debug, - T2: AsRef<[u8]> + std::fmt::Debug, - { - anyhow::ensure!( - !any || accounts.is_empty() && owners.is_empty(), - "`any` is not allow non-empty `accouts` and `owners`" - ); - anyhow::ensure!(accounts.len() < 10_000, "Maximum 10k accounts are allowed"); - anyhow::ensure!(owners.len() < 10_000, "Maximum 10k owners are allowed"); +#[derive(Debug)] +struct FilterAccountsExistence { + account: bool, + owner: bool, +} - Ok(AccountsFilter { - any, - accounts: accounts - .iter() - .map(|key| bs58::decode(key).into_vec()) - .collect::>()?, - owners: owners - .iter() - .map(|key| bs58::decode(key).into_vec()) - .collect::>()?, - }) - } - - pub fn is_account_selected(&self, account: &[u8], owner: &[u8]) -> bool { - self.any || self.accounts.contains(account) || self.owners.contains(owner) +impl FilterAccountsExistence { + fn is_empty(&self) -> bool { + !(self.account || self.owner) + } +} + +#[derive(Debug, Default)] +struct FilterAccounts { + filters: HashMap, + account: HashMap>, + account_required: HashSet, + owner: HashMap>, + owner_required: HashSet, +} + +impl TryFrom<&Vec> for FilterAccounts { + type Error = anyhow::Error; + + fn try_from(configs: &Vec) -> Result { + let mut this = Self::default(); + for config in configs { + let existence = FilterAccountsExistence { + account: Self::set( + &mut this.account, + &mut this.account_required, + &config.filter, + config + .account + .iter() + .map(|v| Pubkey::from_str(v)) + .collect::, _>>()? + .into_iter(), + ), + owner: Self::set( + &mut this.owner, + &mut this.owner_required, + &config.filter, + config + .owner + .iter() + .map(|v| Pubkey::from_str(v)) + .collect::, _>>()? + .into_iter(), + ), + }; + + anyhow::ensure!( + this.filters + .insert(config.filter.clone(), existence) + .is_none(), + "filter {} duplicated", + config.filter + ); + } + Ok(this) + } +} + +impl FilterAccounts { + fn set( + map: &mut HashMap>, + map_required: &mut HashSet, + name: &str, + keys: I, + ) -> bool + where + Q: Hash + Eq + Clone, + I: Iterator, + { + let mut required = false; + for key in keys { + if map.entry(key).or_default().insert(name.to_string()) { + required = true; + } + } + + if required { + map_required.insert(name.to_string()); + } + required + } +} + +#[derive(Debug)] +pub struct Filter { + slots: FilterSlots, + accounts: FilterAccounts, +} + +impl TryFrom<&SubscribeRequest> for Filter { + type Error = anyhow::Error; + + fn try_from(config: &SubscribeRequest) -> Result { + Ok(Self { + slots: config + .slots + .as_ref() + .map(FilterSlots::from) + .unwrap_or_default(), + accounts: FilterAccounts::try_from(&config.accounts)?, + }) + } +} + +impl Filter { + pub fn is_slots_enabled(&self) -> bool { + self.slots.enabled + } + + pub fn create_accounts_match(&self) -> FilterAccountsMatch { + FilterAccountsMatch::new(&self.accounts) + } +} + +#[derive(Debug)] +pub struct FilterAccountsMatch<'a> { + filter: &'a FilterAccounts, + account: HashSet, + owner: HashSet, +} + +impl<'a> FilterAccountsMatch<'a> { + fn new(filter: &'a FilterAccounts) -> Self { + Self { + filter, + account: Default::default(), + owner: Default::default(), + } + } + + fn extend( + set: &mut HashSet, + map: &HashMap>, + key: &Q, + ) -> bool { + if let Some(names) = map.get(key) { + for name in names { + if !set.contains(name) { + set.insert(name.clone()); + } + } + true + } else { + false + } + } + + pub fn match_account(&mut self, pubkey: &Pubkey) -> bool { + Self::extend(&mut self.account, &self.filter.account, pubkey) + } + + pub fn match_owner(&mut self, pubkey: &Pubkey) -> bool { + Self::extend(&mut self.owner, &self.filter.owner, pubkey) + } + + pub fn get_filters(&self) -> Vec { + self.filter + .filters + .iter() + .filter_map(|(name, existence)| { + if existence.is_empty() { + return Some(name.clone()); + } + + let name = name.as_str(); + let af = &self.filter; + + // If filter name in required but not in matched => return `false` + if af.account_required.contains(name) && !self.account.contains(name) { + return None; + } + if af.owner_required.contains(name) && !self.owner.contains(name) { + return None; + } + + Some(name.to_string()) + }) + .collect() } } diff --git a/src/grpc.rs b/src/grpc.rs index a0f58bf..8865a5a 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -1,15 +1,20 @@ use { crate::{ config::ConfigGrpc, - filters::AccountsFilter, + filters::Filter, grpc::proto::{ geyser_server::{Geyser, GeyserServer}, subscribe_update::UpdateOneof, SubscribeRequest, SubscribeUpdate, SubscribeUpdateAccount, SubscribeUpdateAccountInfo, + SubscribeUpdateSlot, SubscribeUpdateSlotStatus, }, prom::CONNECTIONS_TOTAL, }, log::*, + solana_geyser_plugin_interface::geyser_plugin_interface::{ + ReplicaAccountInfoVersions, SlotStatus, + }, + solana_sdk::{pubkey::Pubkey, signature::Signature}, std::{ collections::HashMap, sync::atomic::{AtomicUsize, Ordering}, @@ -28,10 +33,111 @@ pub mod proto { tonic::include_proto!("geyser"); } +#[derive(Debug)] +pub struct UpdateAccountMessageAccount { + pub pubkey: Pubkey, + pub lamports: u64, + pub owner: Pubkey, + pub executable: bool, + pub rent_epoch: u64, + pub data: Vec, + pub write_version: u64, + pub txn_signature: Option, +} + +#[derive(Debug)] +pub struct UpdateAccountMessage { + pub account: UpdateAccountMessageAccount, + pub slot: u64, + pub is_startup: bool, +} + +impl<'a> From<(ReplicaAccountInfoVersions<'a>, u64, bool)> for UpdateAccountMessage { + fn from((account, slot, is_startup): (ReplicaAccountInfoVersions<'a>, u64, bool)) -> Self { + Self { + account: match account { + ReplicaAccountInfoVersions::V0_0_1(info) => UpdateAccountMessageAccount { + pubkey: Pubkey::new(info.pubkey), + lamports: info.lamports, + owner: Pubkey::new(info.owner), + executable: info.executable, + rent_epoch: info.rent_epoch, + data: info.data.into(), + write_version: info.write_version, + txn_signature: None, + }, + }, + slot, + is_startup, + } + } +} + +impl UpdateAccountMessage { + fn with_filters(&self, filters: Vec) -> SubscribeUpdate { + SubscribeUpdate { + update_oneof: Some(UpdateOneof::Account(SubscribeUpdateAccount { + account: Some(SubscribeUpdateAccountInfo { + pubkey: self.account.pubkey.as_ref().into(), + lamports: self.account.lamports, + owner: self.account.owner.as_ref().into(), + executable: self.account.executable, + rent_epoch: self.account.rent_epoch, + data: self.account.data.clone(), + write_version: self.account.write_version, + txn_signature: self.account.txn_signature.map(|sig| sig.as_ref().into()), + }), + slot: self.slot, + is_startup: self.is_startup, + filters, + })), + } + } +} + +#[derive(Debug)] +pub struct UpdateSlotMessage { + slot: u64, + parent: Option, + status: SubscribeUpdateSlotStatus, +} + +impl From<(u64, Option, SlotStatus)> for UpdateSlotMessage { + fn from((slot, parent, status): (u64, Option, SlotStatus)) -> Self { + Self { + slot, + parent, + status: match status { + SlotStatus::Processed => SubscribeUpdateSlotStatus::Processed, + SlotStatus::Confirmed => SubscribeUpdateSlotStatus::Confirmed, + SlotStatus::Rooted => SubscribeUpdateSlotStatus::Rooted, + }, + } + } +} + +impl From<&UpdateSlotMessage> for SubscribeUpdate { + fn from(msg: &UpdateSlotMessage) -> Self { + Self { + update_oneof: Some(UpdateOneof::Slot(SubscribeUpdateSlot { + slot: msg.slot, + parent: msg.parent, + status: msg.status as i32, + })), + } + } +} + +#[derive(Debug)] +pub enum Message { + UpdateAccount(UpdateAccountMessage), + UpdateSlot(UpdateSlotMessage), +} + #[derive(Debug)] struct ClientConnection { id: usize, - accounts_filter: AccountsFilter, + filter: Filter, stream_tx: mpsc::Sender>, } @@ -46,7 +152,7 @@ impl GrpcService { pub fn create( config: ConfigGrpc, ) -> Result< - (mpsc::UnboundedSender, oneshot::Sender<()>), + (mpsc::UnboundedSender, oneshot::Sender<()>), Box, > { // Bind service address @@ -86,7 +192,7 @@ impl GrpcService { } async fn send_loop( - mut update_channel_rx: mpsc::UnboundedReceiver, + mut update_channel_rx: mpsc::UnboundedReceiver, mut new_clients_rx: mpsc::UnboundedReceiver, ) { let mut clients: HashMap = HashMap::new(); @@ -97,20 +203,32 @@ impl GrpcService { let mut ids_closed = vec![]; for client in clients.values() { - let message = match &message.update_oneof { - Some(UpdateOneof::Account(SubscribeUpdateAccount { - account: Some(SubscribeUpdateAccountInfo { pubkey, owner,.. }), - .. - })) if !client.accounts_filter.is_account_selected(pubkey, owner) => { - continue; - } - _ => message.clone(), - }; + if let Some(message) = match &message { + Message::UpdateAccount(message) => { + let mut filter = client.filter.create_accounts_match(); + filter.match_account(&message.account.pubkey); + filter.match_owner(&message.account.owner); - match client.stream_tx.try_send(Ok(message)) { - Ok(()) => {}, - Err(mpsc::error::TrySendError::Full(_)) => ids_full.push(client.id), - Err(mpsc::error::TrySendError::Closed(_)) => ids_closed.push(client.id), + let filters = filter.get_filters(); + if !filters.is_empty() { + Some(message.with_filters(filters)) + } else { + None + } + } + Message::UpdateSlot(message) => { + if client.filter.is_slots_enabled() { + Some(message.into()) + } else { + None + } + } + } { + match client.stream_tx.try_send(Ok(message)) { + Ok(()) => {}, + Err(mpsc::error::TrySendError::Full(_)) => ids_full.push(client.id), + Err(mpsc::error::TrySendError::Closed(_)) => ids_closed.push(client.id), + } } } @@ -152,8 +270,7 @@ impl Geyser for GrpcService { let id = self.subscribe_id.fetch_add(1, Ordering::SeqCst); info!("{}, new subscriber", id); - let data = request.get_ref(); - let accounts_filter = match AccountsFilter::new(data.any, &data.accounts, &data.owners) { + let filter = match Filter::try_from(request.get_ref()) { Ok(filter) => filter, Err(error) => { let message = format!("failed to create filter: {:?}", error); @@ -165,7 +282,7 @@ impl Geyser for GrpcService { let (stream_tx, stream_rx) = mpsc::channel(self.config.channel_capacity); if let Err(_error) = self.new_clients_tx.send(ClientConnection { id, - accounts_filter, + filter, stream_tx, }) { return Err(Status::internal("")); diff --git a/src/plugin.rs b/src/plugin.rs index 6622653..744d64f 100644 --- a/src/plugin.rs +++ b/src/plugin.rs @@ -1,13 +1,7 @@ use { crate::{ config::Config, - grpc::{ - proto::{ - subscribe_update::UpdateOneof, SubscribeUpdate, SubscribeUpdateAccount, - SubscribeUpdateAccountInfo, SubscribeUpdateSlot, SubscribeUpdateSlotStatus, - }, - GrpcService, - }, + grpc::{GrpcService, Message}, prom::{PrometheusService, SLOT_STATUS}, }, solana_geyser_plugin_interface::geyser_plugin_interface::{ @@ -24,7 +18,7 @@ use { #[derive(Debug)] pub struct PluginInner { runtime: Runtime, - grpc_channel: mpsc::UnboundedSender, + grpc_channel: mpsc::UnboundedSender, grpc_shutdown_tx: oneshot::Sender<()>, prometheus: PrometheusService, } @@ -81,23 +75,9 @@ impl GeyserPlugin for Plugin { is_startup: bool, ) -> PluginResult<()> { let inner = self.inner.as_ref().expect("initialized"); - let _ = inner.grpc_channel.send(match account { - ReplicaAccountInfoVersions::V0_0_1(info) => SubscribeUpdate { - update_oneof: Some(UpdateOneof::Account(SubscribeUpdateAccount { - account: Some(SubscribeUpdateAccountInfo { - pubkey: info.pubkey.into(), - lamports: info.lamports, - owner: info.owner.into(), - executable: info.executable, - rent_epoch: info.rent_epoch, - data: info.data.into(), - write_version: info.write_version, - }), - slot, - is_startup, - })), - }, - }); + let _ = inner + .grpc_channel + .send(Message::UpdateAccount((account, slot, is_startup).into())); Ok(()) } @@ -109,17 +89,9 @@ impl GeyserPlugin for Plugin { status: SlotStatus, ) -> PluginResult<()> { let inner = self.inner.as_ref().expect("initialized"); - let _ = inner.grpc_channel.send(SubscribeUpdate { - update_oneof: Some(UpdateOneof::Slot(SubscribeUpdateSlot { - slot, - parent, - status: match status { - SlotStatus::Processed => SubscribeUpdateSlotStatus::Processed, - SlotStatus::Confirmed => SubscribeUpdateSlotStatus::Confirmed, - SlotStatus::Rooted => SubscribeUpdateSlotStatus::Rooted, - } as i32, - })), - }); + let _ = inner + .grpc_channel + .send(Message::UpdateSlot((slot, parent, status).into())); SLOT_STATUS .with_label_values(&[match status {