diff --git a/README.md b/README.md index 6e35185..18a5165 100644 --- a/README.md +++ b/README.md @@ -27,24 +27,26 @@ See [proto/geyser.proto](proto/geyser.proto). #### Slots - - `enabled` — broadcast slots updates +Currently all slots are broadcasted. #### Account Accounts can be filtered by: - - `any` — stream all accounts, if active all other filters should be disabled - `account` — acount Pubkey, match to any Pubkey from the array - `owner` — account owner Pubkey, match to any Pubkey from the array -All fields in filter are optional but at least 1 is required. Fields works as logical `AND`. Values in the arrays works as logical `OR`. +If all fields are empty then all accounts are broadcasted. Otherwise fields works as logical `AND` and values in arrays as logical `OR`. #### Transactions - - `any` — stream all transactions - `vote` — enable/disable broadcast `vote` transactions - `failed` — enable/disable broadcast `failed` transactions + - `accounts_include` — filter transactions which use any account + - `accounts_exclude` — filter transactions which do not use any account + +If all fields are empty then all transactions are broadcasted. Otherwise fields works as logical `AND` and values in arrays as logical `OR`. #### Blocks - - `any` — stream all blocks +Currently all blocks are broadcasted. diff --git a/proto/geyser.proto b/proto/geyser.proto index 5abf984..367c05d 100644 --- a/proto/geyser.proto +++ b/proto/geyser.proto @@ -16,24 +16,20 @@ message SubscribeRequest { } message SubscribeRequestFilterAccounts { - bool any = 1; repeated string account = 2; repeated string owner = 3; } -message SubscribeRequestFilterSlots { - bool any = 1; -} +message SubscribeRequestFilterSlots {} message SubscribeRequestFilterTransactions { - bool any = 1; - bool vote = 2; - bool failed = 3; + optional bool vote = 1; + optional bool failed = 2; + repeated string accounts_include = 3; + repeated string accounts_exclude = 4; } -message SubscribeRequestFilterBlocks { - bool any = 1; -} +message SubscribeRequestFilterBlocks {} message SubscribeUpdate { repeated string filters = 1; diff --git a/src/bin/client.rs b/src/bin/client.rs index 881af78..8d2862b 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -39,11 +39,11 @@ struct Args { #[clap(short, long)] /// Filter vote transactions - vote: bool, + vote: Option, #[clap(short, long)] /// Filter failed transactions - failed: bool, + failed: Option, #[clap(long)] /// Subscribe on block updates @@ -59,7 +59,6 @@ async fn main() -> anyhow::Result<()> { accounts.insert( "client".to_owned(), SubscribeRequestFilterAccounts { - any: args.account.is_empty() && args.owner.is_empty(), account: args.account, owner: args.owner, }, @@ -68,10 +67,7 @@ async fn main() -> anyhow::Result<()> { let mut slots = HashMap::new(); if args.slots { - slots.insert( - "client".to_owned(), - SubscribeRequestFilterSlots { any: true }, - ); + slots.insert("client".to_owned(), SubscribeRequestFilterSlots {}); } let mut transactions = HashMap::new(); @@ -79,19 +75,17 @@ async fn main() -> anyhow::Result<()> { transactions.insert( "client".to_string(), SubscribeRequestFilterTransactions { - any: true, vote: args.vote, failed: args.failed, + accounts_include: vec![], + accounts_exclude: vec![], }, ); } let mut blocks = HashMap::new(); if args.blocks { - blocks.insert( - "client".to_owned(), - SubscribeRequestFilterBlocks { any: true }, - ); + blocks.insert("client".to_owned(), SubscribeRequestFilterBlocks {}); } let mut client = GeyserClient::connect(args.endpoint).await?; diff --git a/src/filters.rs b/src/filters.rs index c8da9ed..33e6bb7 100644 --- a/src/filters.rs +++ b/src/filters.rs @@ -47,22 +47,9 @@ impl Filter { } } -#[derive(Debug)] -struct FilterAccountsExistence { - any: bool, - account: bool, - owner: bool, -} - -impl FilterAccountsExistence { - fn is_empty(&self) -> bool { - !(self.account || self.owner) - } -} - #[derive(Debug, Default)] struct FilterAccounts { - filters: HashMap, + filters: Vec, account: HashMap>, account_required: HashSet, owner: HashMap>, @@ -77,38 +64,31 @@ impl TryFrom<&HashMap> for FilterAccount ) -> Result { let mut this = Self::default(); for (name, filter) in configs { - anyhow::ensure!( - !filter.any || filter.account.is_empty() && filter.owner.is_empty(), - "`any` does not allow non-empty `accout` and `owner`" + Self::set( + &mut this.account, + &mut this.account_required, + name, + filter + .account + .iter() + .map(|v| Pubkey::from_str(v)) + .collect::, _>>()? + .into_iter(), ); - let existence = FilterAccountsExistence { - any: filter.any, - account: Self::set( - &mut this.account, - &mut this.account_required, - name, - filter - .account - .iter() - .map(|v| Pubkey::from_str(v)) - .collect::, _>>()? - .into_iter(), - ), - owner: Self::set( - &mut this.owner, - &mut this.owner_required, - name, - filter - .owner - .iter() - .map(|v| Pubkey::from_str(v)) - .collect::, _>>()? - .into_iter(), - ), - }; + Self::set( + &mut this.owner, + &mut this.owner_required, + name, + filter + .owner + .iter() + .map(|v| Pubkey::from_str(v)) + .collect::, _>>()? + .into_iter(), + ); - this.filters.insert(name.clone(), existence); + this.filters.push(name.clone()); } Ok(this) } @@ -191,15 +171,7 @@ impl<'a> FilterAccountsMatch<'a> { self.filter .filters .iter() - .filter_map(|(name, existence)| { - if existence.any { - return Some(name.clone()); - } - - if existence.is_empty() { - return None; - } - + .filter_map(|name| { let name = name.as_str(); let af = &self.filter; @@ -231,15 +203,8 @@ impl TryFrom<&HashMap> for FilterSlots { Ok(FilterSlots { filters: configs .iter() - .filter_map( - |(name, filter)| { - if filter.any { - Some(name.clone()) - } else { - None - } - }, - ) + // .filter_map(|(name, _filter)| Some(name.clone())) + .map(|(name, _filter)| name.clone()) .collect(), }) } @@ -253,9 +218,10 @@ impl FilterSlots { #[derive(Debug)] pub struct FilterTransactionsInner { - any: bool, - vote: bool, - failed: bool, + vote: Option, + failed: Option, + accounts_include: HashSet, + accounts_exclude: HashSet, } #[derive(Debug, Default)] @@ -274,9 +240,18 @@ impl TryFrom<&HashMap> for FilterTra this.filters.insert( name.clone(), FilterTransactionsInner { - any: filter.any, vote: filter.vote, failed: filter.failed, + accounts_include: filter + .accounts_include + .iter() + .map(|v| Pubkey::from_str(v)) + .collect::>()?, + accounts_exclude: filter + .accounts_exclude + .iter() + .map(|v| Pubkey::from_str(v)) + .collect::>()?, }, ); } @@ -285,30 +260,48 @@ impl TryFrom<&HashMap> for FilterTra } impl FilterTransactions { - pub fn get_filters(&self, message: &MessageTransaction) -> Vec { + pub fn get_filters( + &self, + MessageTransaction { transaction, .. }: &MessageTransaction, + ) -> Vec { self.filters .iter() .filter_map(|(name, inner)| { - let is_vote = message.transaction.is_vote; - let is_failed = message.transaction.meta.status.is_err(); - - if inner.any { - if is_vote && !inner.vote { + if let Some(is_vote) = inner.vote { + if is_vote != transaction.is_vote { return None; } - - if is_failed && !inner.failed { - return None; - } - - Some(name.clone()) - } else { - if is_vote == inner.vote && is_failed == inner.failed { - return Some(name.clone()); - } - - None } + + if let Some(is_failed) = inner.failed { + if is_failed != transaction.meta.status.is_err() { + return None; + } + } + + if !inner.accounts_include.is_empty() + && transaction + .transaction + .message() + .account_keys() + .iter() + .all(|pubkey| !inner.accounts_include.contains(pubkey)) + { + return None; + } + + if !inner.accounts_exclude.is_empty() + && transaction + .transaction + .message() + .account_keys() + .iter() + .any(|pubkey| inner.accounts_exclude.contains(pubkey)) + { + return None; + } + + Some(name.clone()) }) .collect() } @@ -328,15 +321,8 @@ impl TryFrom<&HashMap> for FilterBlocks { Ok(FilterBlocks { filters: configs .iter() - .filter_map( - |(name, filter)| { - if filter.any { - Some(name.clone()) - } else { - None - } - }, - ) + // .filter_map(|(name, _filter)| Some(name.clone())) + .map(|(name, _filter)| name.clone()) .collect(), }) } diff --git a/src/grpc.rs b/src/grpc.rs index ca45c48..60f6aae 100644 --- a/src/grpc.rs +++ b/src/grpc.rs @@ -18,7 +18,7 @@ use { }, solana_sdk::{ clock::UnixTimestamp, pubkey::Pubkey, signature::Signature, - transaction::VersionedTransaction, + transaction::SanitizedTransaction, }, solana_transaction_status::{Reward, TransactionStatusMeta}, std::{ @@ -100,7 +100,7 @@ impl From<(u64, Option, SlotStatus)> for MessageSlot { pub struct MessageTransactionInfo { pub signature: Signature, pub is_vote: bool, - pub transaction: VersionedTransaction, + pub transaction: SanitizedTransaction, pub meta: TransactionStatusMeta, // pub index: usize, } @@ -118,7 +118,7 @@ impl<'a> From<(ReplicaTransactionInfoVersions<'a>, u64)> for MessageTransaction ReplicaTransactionInfoVersions::V0_0_1(info) => MessageTransactionInfo { signature: *info.signature, is_vote: info.is_vote, - transaction: info.transaction.to_versioned_transaction(), + transaction: info.transaction.clone(), meta: info.transaction_status_meta.clone(), // index: info.index, }, diff --git a/src/proto.rs b/src/proto.rs index 6c13711..e9f5bd7 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -19,27 +19,28 @@ mod convert { clock::UnixTimestamp, instruction::CompiledInstruction, message::{ - legacy::Message as LegacyMessage, v0::MessageAddressTableLookup, MessageHeader, - VersionedMessage, + legacy::Message as LegacyMessage, + v0::{LoadedMessage, MessageAddressTableLookup}, + MessageHeader, SanitizedMessage, }, pubkey::Pubkey, signature::Signature, - transaction::VersionedTransaction, + transaction::SanitizedTransaction, }, solana_transaction_status::{ InnerInstructions, Reward, RewardType, TransactionStatusMeta, TransactionTokenBalance, }, }; - impl From<&VersionedTransaction> for super::Transaction { - fn from(value: &VersionedTransaction) -> Self { + impl From<&SanitizedTransaction> for super::Transaction { + fn from(value: &SanitizedTransaction) -> Self { Self { signatures: value - .signatures + .signatures() .iter() .map(|signature| >::as_ref(signature).into()) .collect(), - message: Some((&value.message).into()), + message: Some(value.message().into()), } } } @@ -61,11 +62,11 @@ mod convert { } } - impl From<&VersionedMessage> for super::Message { - fn from(message: &VersionedMessage) -> Self { + impl From<&SanitizedMessage> for super::Message { + fn from(message: &SanitizedMessage) -> Self { match message { - VersionedMessage::Legacy(message) => Self::from(message), - VersionedMessage::V0(message) => Self { + SanitizedMessage::Legacy(message) => Self::from(message), + SanitizedMessage::V0(LoadedMessage { message, .. }) => Self { header: Some((&message.header).into()), account_keys: message .account_keys