Add include-exclude accounts to transactions filter (#3)

This commit is contained in:
Kirill Fomichev 2022-10-24 18:11:08 -03:00 committed by GitHub
parent db50e2a4bc
commit fc8f8afb05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 112 additions and 133 deletions

View File

@ -27,24 +27,26 @@ See [proto/geyser.proto](proto/geyser.proto).
#### Slots
- `enabled` — broadcast slots updates
Currently all slots are broadcasted.
#### Account
Accounts can be filtered by:
- `any` — stream all accounts, if active all other filters should be disabled
- `account` — acount Pubkey, match to any Pubkey from the array
- `owner` — account owner Pubkey, match to any Pubkey from the array
All fields in filter are optional but at least 1 is required. Fields works as logical `AND`. Values in the arrays works as logical `OR`.
If all fields are empty then all accounts are broadcasted. Otherwise fields works as logical `AND` and values in arrays as logical `OR`.
#### Transactions
- `any` — stream all transactions
- `vote` — enable/disable broadcast `vote` transactions
- `failed` — enable/disable broadcast `failed` transactions
- `accounts_include` — filter transactions which use any account
- `accounts_exclude` — filter transactions which do not use any account
If all fields are empty then all transactions are broadcasted. Otherwise fields works as logical `AND` and values in arrays as logical `OR`.
#### Blocks
- `any` — stream all blocks
Currently all blocks are broadcasted.

View File

@ -16,24 +16,20 @@ message SubscribeRequest {
}
message SubscribeRequestFilterAccounts {
bool any = 1;
repeated string account = 2;
repeated string owner = 3;
}
message SubscribeRequestFilterSlots {
bool any = 1;
}
message SubscribeRequestFilterSlots {}
message SubscribeRequestFilterTransactions {
bool any = 1;
bool vote = 2;
bool failed = 3;
optional bool vote = 1;
optional bool failed = 2;
repeated string accounts_include = 3;
repeated string accounts_exclude = 4;
}
message SubscribeRequestFilterBlocks {
bool any = 1;
}
message SubscribeRequestFilterBlocks {}
message SubscribeUpdate {
repeated string filters = 1;

View File

@ -39,11 +39,11 @@ struct Args {
#[clap(short, long)]
/// Filter vote transactions
vote: bool,
vote: Option<bool>,
#[clap(short, long)]
/// Filter failed transactions
failed: bool,
failed: Option<bool>,
#[clap(long)]
/// Subscribe on block updates
@ -59,7 +59,6 @@ async fn main() -> anyhow::Result<()> {
accounts.insert(
"client".to_owned(),
SubscribeRequestFilterAccounts {
any: args.account.is_empty() && args.owner.is_empty(),
account: args.account,
owner: args.owner,
},
@ -68,10 +67,7 @@ async fn main() -> anyhow::Result<()> {
let mut slots = HashMap::new();
if args.slots {
slots.insert(
"client".to_owned(),
SubscribeRequestFilterSlots { any: true },
);
slots.insert("client".to_owned(), SubscribeRequestFilterSlots {});
}
let mut transactions = HashMap::new();
@ -79,19 +75,17 @@ async fn main() -> anyhow::Result<()> {
transactions.insert(
"client".to_string(),
SubscribeRequestFilterTransactions {
any: true,
vote: args.vote,
failed: args.failed,
accounts_include: vec![],
accounts_exclude: vec![],
},
);
}
let mut blocks = HashMap::new();
if args.blocks {
blocks.insert(
"client".to_owned(),
SubscribeRequestFilterBlocks { any: true },
);
blocks.insert("client".to_owned(), SubscribeRequestFilterBlocks {});
}
let mut client = GeyserClient::connect(args.endpoint).await?;

View File

@ -47,22 +47,9 @@ impl Filter {
}
}
#[derive(Debug)]
struct FilterAccountsExistence {
any: bool,
account: bool,
owner: bool,
}
impl FilterAccountsExistence {
fn is_empty(&self) -> bool {
!(self.account || self.owner)
}
}
#[derive(Debug, Default)]
struct FilterAccounts {
filters: HashMap<String, FilterAccountsExistence>,
filters: Vec<String>,
account: HashMap<Pubkey, HashSet<String>>,
account_required: HashSet<String>,
owner: HashMap<Pubkey, HashSet<String>>,
@ -77,38 +64,31 @@ impl TryFrom<&HashMap<String, SubscribeRequestFilterAccounts>> for FilterAccount
) -> Result<Self, Self::Error> {
let mut this = Self::default();
for (name, filter) in configs {
anyhow::ensure!(
!filter.any || filter.account.is_empty() && filter.owner.is_empty(),
"`any` does not allow non-empty `accout` and `owner`"
Self::set(
&mut this.account,
&mut this.account_required,
name,
filter
.account
.iter()
.map(|v| Pubkey::from_str(v))
.collect::<Result<Vec<_>, _>>()?
.into_iter(),
);
let existence = FilterAccountsExistence {
any: filter.any,
account: Self::set(
&mut this.account,
&mut this.account_required,
name,
filter
.account
.iter()
.map(|v| Pubkey::from_str(v))
.collect::<Result<Vec<_>, _>>()?
.into_iter(),
),
owner: Self::set(
&mut this.owner,
&mut this.owner_required,
name,
filter
.owner
.iter()
.map(|v| Pubkey::from_str(v))
.collect::<Result<Vec<_>, _>>()?
.into_iter(),
),
};
Self::set(
&mut this.owner,
&mut this.owner_required,
name,
filter
.owner
.iter()
.map(|v| Pubkey::from_str(v))
.collect::<Result<Vec<_>, _>>()?
.into_iter(),
);
this.filters.insert(name.clone(), existence);
this.filters.push(name.clone());
}
Ok(this)
}
@ -191,15 +171,7 @@ impl<'a> FilterAccountsMatch<'a> {
self.filter
.filters
.iter()
.filter_map(|(name, existence)| {
if existence.any {
return Some(name.clone());
}
if existence.is_empty() {
return None;
}
.filter_map(|name| {
let name = name.as_str();
let af = &self.filter;
@ -231,15 +203,8 @@ impl TryFrom<&HashMap<String, SubscribeRequestFilterSlots>> for FilterSlots {
Ok(FilterSlots {
filters: configs
.iter()
.filter_map(
|(name, filter)| {
if filter.any {
Some(name.clone())
} else {
None
}
},
)
// .filter_map(|(name, _filter)| Some(name.clone()))
.map(|(name, _filter)| name.clone())
.collect(),
})
}
@ -253,9 +218,10 @@ impl FilterSlots {
#[derive(Debug)]
pub struct FilterTransactionsInner {
any: bool,
vote: bool,
failed: bool,
vote: Option<bool>,
failed: Option<bool>,
accounts_include: HashSet<Pubkey>,
accounts_exclude: HashSet<Pubkey>,
}
#[derive(Debug, Default)]
@ -274,9 +240,18 @@ impl TryFrom<&HashMap<String, SubscribeRequestFilterTransactions>> for FilterTra
this.filters.insert(
name.clone(),
FilterTransactionsInner {
any: filter.any,
vote: filter.vote,
failed: filter.failed,
accounts_include: filter
.accounts_include
.iter()
.map(|v| Pubkey::from_str(v))
.collect::<Result<_, _>>()?,
accounts_exclude: filter
.accounts_exclude
.iter()
.map(|v| Pubkey::from_str(v))
.collect::<Result<_, _>>()?,
},
);
}
@ -285,30 +260,48 @@ impl TryFrom<&HashMap<String, SubscribeRequestFilterTransactions>> for FilterTra
}
impl FilterTransactions {
pub fn get_filters(&self, message: &MessageTransaction) -> Vec<String> {
pub fn get_filters(
&self,
MessageTransaction { transaction, .. }: &MessageTransaction,
) -> Vec<String> {
self.filters
.iter()
.filter_map(|(name, inner)| {
let is_vote = message.transaction.is_vote;
let is_failed = message.transaction.meta.status.is_err();
if inner.any {
if is_vote && !inner.vote {
if let Some(is_vote) = inner.vote {
if is_vote != transaction.is_vote {
return None;
}
if is_failed && !inner.failed {
return None;
}
Some(name.clone())
} else {
if is_vote == inner.vote && is_failed == inner.failed {
return Some(name.clone());
}
None
}
if let Some(is_failed) = inner.failed {
if is_failed != transaction.meta.status.is_err() {
return None;
}
}
if !inner.accounts_include.is_empty()
&& transaction
.transaction
.message()
.account_keys()
.iter()
.all(|pubkey| !inner.accounts_include.contains(pubkey))
{
return None;
}
if !inner.accounts_exclude.is_empty()
&& transaction
.transaction
.message()
.account_keys()
.iter()
.any(|pubkey| inner.accounts_exclude.contains(pubkey))
{
return None;
}
Some(name.clone())
})
.collect()
}
@ -328,15 +321,8 @@ impl TryFrom<&HashMap<String, SubscribeRequestFilterBlocks>> for FilterBlocks {
Ok(FilterBlocks {
filters: configs
.iter()
.filter_map(
|(name, filter)| {
if filter.any {
Some(name.clone())
} else {
None
}
},
)
// .filter_map(|(name, _filter)| Some(name.clone()))
.map(|(name, _filter)| name.clone())
.collect(),
})
}

View File

@ -18,7 +18,7 @@ use {
},
solana_sdk::{
clock::UnixTimestamp, pubkey::Pubkey, signature::Signature,
transaction::VersionedTransaction,
transaction::SanitizedTransaction,
},
solana_transaction_status::{Reward, TransactionStatusMeta},
std::{
@ -100,7 +100,7 @@ impl From<(u64, Option<u64>, SlotStatus)> for MessageSlot {
pub struct MessageTransactionInfo {
pub signature: Signature,
pub is_vote: bool,
pub transaction: VersionedTransaction,
pub transaction: SanitizedTransaction,
pub meta: TransactionStatusMeta,
// pub index: usize,
}
@ -118,7 +118,7 @@ impl<'a> From<(ReplicaTransactionInfoVersions<'a>, u64)> for MessageTransaction
ReplicaTransactionInfoVersions::V0_0_1(info) => MessageTransactionInfo {
signature: *info.signature,
is_vote: info.is_vote,
transaction: info.transaction.to_versioned_transaction(),
transaction: info.transaction.clone(),
meta: info.transaction_status_meta.clone(),
// index: info.index,
},

View File

@ -19,27 +19,28 @@ mod convert {
clock::UnixTimestamp,
instruction::CompiledInstruction,
message::{
legacy::Message as LegacyMessage, v0::MessageAddressTableLookup, MessageHeader,
VersionedMessage,
legacy::Message as LegacyMessage,
v0::{LoadedMessage, MessageAddressTableLookup},
MessageHeader, SanitizedMessage,
},
pubkey::Pubkey,
signature::Signature,
transaction::VersionedTransaction,
transaction::SanitizedTransaction,
},
solana_transaction_status::{
InnerInstructions, Reward, RewardType, TransactionStatusMeta, TransactionTokenBalance,
},
};
impl From<&VersionedTransaction> for super::Transaction {
fn from(value: &VersionedTransaction) -> Self {
impl From<&SanitizedTransaction> for super::Transaction {
fn from(value: &SanitizedTransaction) -> Self {
Self {
signatures: value
.signatures
.signatures()
.iter()
.map(|signature| <Signature as AsRef<[u8]>>::as_ref(signature).into())
.collect(),
message: Some((&value.message).into()),
message: Some(value.message().into()),
}
}
}
@ -61,11 +62,11 @@ mod convert {
}
}
impl From<&VersionedMessage> for super::Message {
fn from(message: &VersionedMessage) -> Self {
impl From<&SanitizedMessage> for super::Message {
fn from(message: &SanitizedMessage) -> Self {
match message {
VersionedMessage::Legacy(message) => Self::from(message),
VersionedMessage::V0(message) => Self {
SanitizedMessage::Legacy(message) => Self::from(message),
SanitizedMessage::V0(LoadedMessage { message, .. }) => Self {
header: Some((&message.header).into()),
account_keys: message
.account_keys