geyser: add `transactions_status` filter (#310)

This commit is contained in:
Kirill Fomichev 2024-04-02 20:46:43 -04:00
parent 834b986d47
commit f8d2d30382
No known key found for this signature in database
GPG Key ID: 6AA0144D5E0C0C0A
17 changed files with 3543 additions and 1522 deletions

View File

@ -15,6 +15,7 @@ The minor version will be incremented upon a breaking change and the patch versi
### Features ### Features
- client: add gRPC channel options to Node.js ([#306](https://github.com/rpcpool/yellowstone-grpc/pull/306)) - client: add gRPC channel options to Node.js ([#306](https://github.com/rpcpool/yellowstone-grpc/pull/306))
- geyser: add `transactions_status` filter ([#310](https://github.com/rpcpool/yellowstone-grpc/pull/310))
### Breaking ### Breaking

File diff suppressed because it is too large Load Diff

View File

@ -3,7 +3,7 @@ use {
clap::{Parser, Subcommand, ValueEnum}, clap::{Parser, Subcommand, ValueEnum},
futures::{future::TryFutureExt, sink::SinkExt, stream::StreamExt}, futures::{future::TryFutureExt, sink::SinkExt, stream::StreamExt},
log::{error, info}, log::{error, info},
solana_sdk::{pubkey::Pubkey, signature::Signature}, solana_sdk::{pubkey::Pubkey, signature::Signature, transaction::TransactionError},
solana_transaction_status::{EncodedTransactionWithStatusMeta, UiTransactionEncoding}, solana_transaction_status::{EncodedTransactionWithStatusMeta, UiTransactionEncoding},
std::{ std::{
collections::HashMap, collections::HashMap,
@ -23,7 +23,7 @@ use {
SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta,
SubscribeRequestFilterEntry, SubscribeRequestFilterSlots, SubscribeRequestFilterEntry, SubscribeRequestFilterSlots,
SubscribeRequestFilterTransactions, SubscribeRequestPing, SubscribeUpdateAccount, SubscribeRequestFilterTransactions, SubscribeRequestPing, SubscribeUpdateAccount,
SubscribeUpdateTransaction, SubscribeUpdateTransaction, SubscribeUpdateTransactionStatus,
}, },
tonic::service::Interceptor, tonic::service::Interceptor,
}, },
@ -32,6 +32,7 @@ use {
type SlotsFilterMap = HashMap<String, SubscribeRequestFilterSlots>; type SlotsFilterMap = HashMap<String, SubscribeRequestFilterSlots>;
type AccountFilterMap = HashMap<String, SubscribeRequestFilterAccounts>; type AccountFilterMap = HashMap<String, SubscribeRequestFilterAccounts>;
type TransactionsFilterMap = HashMap<String, SubscribeRequestFilterTransactions>; type TransactionsFilterMap = HashMap<String, SubscribeRequestFilterTransactions>;
type TransactionsStatusFilterMap = HashMap<String, SubscribeRequestFilterTransactions>;
type EntryFilterMap = HashMap<String, SubscribeRequestFilterEntry>; type EntryFilterMap = HashMap<String, SubscribeRequestFilterEntry>;
type BlocksFilterMap = HashMap<String, SubscribeRequestFilterBlocks>; type BlocksFilterMap = HashMap<String, SubscribeRequestFilterBlocks>;
type BlocksMetaFilterMap = HashMap<String, SubscribeRequestFilterBlocksMeta>; type BlocksMetaFilterMap = HashMap<String, SubscribeRequestFilterBlocksMeta>;
@ -167,6 +168,34 @@ struct ActionSubscribe {
#[clap(long)] #[clap(long)]
transactions_account_required: Vec<String>, transactions_account_required: Vec<String>,
/// Subscribe on transactions_status updates
#[clap(long)]
transactions_status: bool,
/// Filter vote transactions for transactions_status
#[clap(long)]
transactions_status_vote: Option<bool>,
/// Filter failed transactions for transactions_status
#[clap(long)]
transactions_status_failed: Option<bool>,
/// Filter by transaction signature for transactions_status
#[clap(long)]
transactions_status_signature: Option<String>,
/// Filter included account in transactions for transactions_status
#[clap(long)]
transactions_status_account_include: Vec<String>,
/// Filter excluded account in transactions for transactions_status
#[clap(long)]
transactions_status_account_exclude: Vec<String>,
/// Filter required account in transactions for transactions_status
#[clap(long)]
transactions_status_account_required: Vec<String>,
#[clap(long)] #[clap(long)]
entry: bool, entry: bool,
@ -287,6 +316,21 @@ impl Action {
); );
} }
let mut transactions_status: TransactionsStatusFilterMap = HashMap::new();
if args.transactions_status {
transactions_status.insert(
"client".to_string(),
SubscribeRequestFilterTransactions {
vote: args.transactions_status_vote,
failed: args.transactions_status_failed,
signature: args.transactions_status_signature.clone(),
account_include: args.transactions_status_account_include.clone(),
account_exclude: args.transactions_status_account_exclude.clone(),
account_required: args.transactions_status_account_required.clone(),
},
);
}
let mut entry: EntryFilterMap = HashMap::new(); let mut entry: EntryFilterMap = HashMap::new();
if args.entry { if args.entry {
entry.insert("client".to_owned(), SubscribeRequestFilterEntry {}); entry.insert("client".to_owned(), SubscribeRequestFilterEntry {});
@ -331,6 +375,7 @@ impl Action {
slots, slots,
accounts, accounts,
transactions, transactions,
transactions_status,
entry, entry,
blocks, blocks,
blocks_meta, blocks_meta,
@ -427,6 +472,29 @@ impl From<SubscribeUpdateTransaction> for TransactionPretty {
} }
} }
#[allow(dead_code)]
#[derive(Debug)]
pub struct TransactionStatusPretty {
slot: u64,
signature: Signature,
is_vote: bool,
index: u64,
err: Option<TransactionError>,
}
impl From<SubscribeUpdateTransactionStatus> for TransactionStatusPretty {
fn from(status: SubscribeUpdateTransactionStatus) -> Self {
Self {
slot: status.slot,
signature: Signature::try_from(status.signature.as_slice()).expect("valid signature"),
is_vote: status.is_vote,
index: status.index,
err: yellowstone_grpc_proto::convert_from::create_tx_error(status.err.as_ref())
.expect("valid tx err"),
}
}
}
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
env::set_var( env::set_var(
@ -563,6 +631,14 @@ async fn geyser_subscribe(
); );
continue; continue;
} }
Some(UpdateOneof::TransactionStatus(status)) => {
let status: TransactionStatusPretty = status.into();
info!(
"new transaction update: filters {:?}, transaction status: {:?}",
msg.filters, status
);
continue;
}
Some(UpdateOneof::Ping(_)) => { Some(UpdateOneof::Ping(_)) => {
// This is necessary to keep load balancers that expect client pings alive. If your load balancer doesn't // This is necessary to keep load balancers that expect client pings alive. If your load balancer doesn't
// require periodic client pings then this is unnecessary // require periodic client pings then this is unnecessary
@ -594,6 +670,7 @@ async fn geyser_subscribe(
slots: new_slots.clone(), slots: new_slots.clone(),
accounts: HashMap::default(), accounts: HashMap::default(),
transactions: HashMap::default(), transactions: HashMap::default(),
transactions_status: HashMap::default(),
entry: HashMap::default(), entry: HashMap::default(),
blocks: HashMap::default(), blocks: HashMap::default(),
blocks_meta: HashMap::default(), blocks_meta: HashMap::default(),

View File

@ -91,7 +91,8 @@ async fn main() -> anyhow::Result<()> {
.send(SubscribeRequest { .send(SubscribeRequest {
slots: HashMap::new(), slots: HashMap::new(),
accounts: HashMap::new(), accounts: HashMap::new(),
transactions: hashmap! { "".to_owned() => SubscribeRequestFilterTransactions { transactions: HashMap::new(),
transactions_status: hashmap! { "".to_owned() => SubscribeRequestFilterTransactions {
vote: args.vote, vote: args.vote,
failed: args.failed, failed: args.failed,
signature: args.signature, signature: args.signature,
@ -113,9 +114,9 @@ async fn main() -> anyhow::Result<()> {
match message { match message {
Ok(msg) => { Ok(msg) => {
match msg.update_oneof { match msg.update_oneof {
Some(UpdateOneof::Transaction(tx)) => { Some(UpdateOneof::TransactionStatus(tx)) => {
let entry = messages.entry(tx.slot).or_default(); let entry = messages.entry(tx.slot).or_default();
let sig = Signature::try_from(tx.transaction.unwrap().signature.as_slice()) let sig = Signature::try_from(tx.signature.as_slice())
.expect("valid signature from transaction") .expect("valid signature from transaction")
.to_string(); .to_string();
if let Some(timestamp) = entry.0 { if let Some(timestamp) = entry.0 {

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -92,6 +92,7 @@ export default class Client {
accounts: { [key: string]: SubscribeRequestFilterAccounts }, accounts: { [key: string]: SubscribeRequestFilterAccounts },
slots: { [key: string]: SubscribeRequestFilterSlots }, slots: { [key: string]: SubscribeRequestFilterSlots },
transactions: { [key: string]: SubscribeRequestFilterTransactions }, transactions: { [key: string]: SubscribeRequestFilterTransactions },
transactionsStatus: { [key: string]: SubscribeRequestFilterTransactions },
entry: { [key: string]: SubscribeRequestFilterEntry }, entry: { [key: string]: SubscribeRequestFilterEntry },
blocks: { [key: string]: SubscribeRequestFilterBlocks }, blocks: { [key: string]: SubscribeRequestFilterBlocks },
blocksMeta: { [key: string]: SubscribeRequestFilterBlocksMeta }, blocksMeta: { [key: string]: SubscribeRequestFilterBlocksMeta },
@ -106,6 +107,7 @@ export default class Client {
accounts, accounts,
slots, slots,
transactions, transactions,
transactionsStatus,
entry, entry,
blocks, blocks,
blocksMeta, blocksMeta,

View File

@ -212,6 +212,7 @@ impl<F: Interceptor> GeyserGrpcClient<F> {
slots: HashMap<String, SubscribeRequestFilterSlots>, slots: HashMap<String, SubscribeRequestFilterSlots>,
accounts: HashMap<String, SubscribeRequestFilterAccounts>, accounts: HashMap<String, SubscribeRequestFilterAccounts>,
transactions: HashMap<String, SubscribeRequestFilterTransactions>, transactions: HashMap<String, SubscribeRequestFilterTransactions>,
transactions_status: HashMap<String, SubscribeRequestFilterTransactions>,
entry: HashMap<String, SubscribeRequestFilterEntry>, entry: HashMap<String, SubscribeRequestFilterEntry>,
blocks: HashMap<String, SubscribeRequestFilterBlocks>, blocks: HashMap<String, SubscribeRequestFilterBlocks>,
blocks_meta: HashMap<String, SubscribeRequestFilterBlocksMeta>, blocks_meta: HashMap<String, SubscribeRequestFilterBlocksMeta>,
@ -223,6 +224,7 @@ impl<F: Interceptor> GeyserGrpcClient<F> {
slots, slots,
accounts, accounts,
transactions, transactions,
transactions_status,
entry, entry,
blocks, blocks,
blocks_meta, blocks_meta,

View File

@ -35,6 +35,14 @@
"account_exclude_max": 10, "account_exclude_max": 10,
"account_required_max": 10 "account_required_max": 10
}, },
"transactions_status": {
"max": 1,
"any": false,
"account_include_max": 10,
"account_include_reject": ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"],
"account_exclude_max": 10,
"account_required_max": 10
},
"blocks": { "blocks": {
"max": 1, "max": 1,
"account_include_max": 10, "account_include_max": 10,

View File

@ -138,6 +138,7 @@ pub struct ConfigGrpcFilters {
pub accounts: ConfigGrpcFiltersAccounts, pub accounts: ConfigGrpcFiltersAccounts,
pub slots: ConfigGrpcFiltersSlots, pub slots: ConfigGrpcFiltersSlots,
pub transactions: ConfigGrpcFiltersTransactions, pub transactions: ConfigGrpcFiltersTransactions,
pub transactions_status: ConfigGrpcFiltersTransactions,
pub blocks: ConfigGrpcFiltersBlocks, pub blocks: ConfigGrpcFiltersBlocks,
pub blocks_meta: ConfigGrpcFiltersBlocksMeta, pub blocks_meta: ConfigGrpcFiltersBlocksMeta,
pub entry: ConfigGrpcFiltersEntry, pub entry: ConfigGrpcFiltersEntry,

View File

@ -33,6 +33,7 @@ pub struct Filter {
accounts: FilterAccounts, accounts: FilterAccounts,
slots: FilterSlots, slots: FilterSlots,
transactions: FilterTransactions, transactions: FilterTransactions,
transactions_status: FilterTransactions,
entry: FilterEntry, entry: FilterEntry,
blocks: FilterBlocks, blocks: FilterBlocks,
blocks_meta: FilterBlocksMeta, blocks_meta: FilterBlocksMeta,
@ -46,7 +47,16 @@ impl Filter {
Ok(Self { Ok(Self {
accounts: FilterAccounts::new(&config.accounts, &limit.accounts)?, accounts: FilterAccounts::new(&config.accounts, &limit.accounts)?,
slots: FilterSlots::new(&config.slots, &limit.slots)?, slots: FilterSlots::new(&config.slots, &limit.slots)?,
transactions: FilterTransactions::new(&config.transactions, &limit.transactions)?, transactions: FilterTransactions::new(
&config.transactions,
&limit.transactions,
FilterTransactionsType::Transaction,
)?,
transactions_status: FilterTransactions::new(
&config.transactions_status,
&limit.transactions_status,
FilterTransactionsType::TransactionStatus,
)?,
entry: FilterEntry::new(&config.entry, &limit.entry)?, entry: FilterEntry::new(&config.entry, &limit.entry)?,
blocks: FilterBlocks::new(&config.blocks, &limit.blocks)?, blocks: FilterBlocks::new(&config.blocks, &limit.blocks)?,
blocks_meta: FilterBlocksMeta::new(&config.blocks_meta, &limit.blocks_meta)?, blocks_meta: FilterBlocksMeta::new(&config.blocks_meta, &limit.blocks_meta)?,
@ -91,38 +101,42 @@ impl Filter {
} }
pub fn get_filters<'a>( pub fn get_filters<'a>(
&self, &'a self,
message: &'a Message, message: &'a Message,
commitment: Option<CommitmentLevel>, commitment: Option<CommitmentLevel>,
) -> Vec<(Vec<String>, MessageRef<'a>)> { ) -> Box<dyn Iterator<Item = (Vec<String>, MessageRef<'a>)> + Send + 'a> {
match message { match message {
Message::Account(message) => self.accounts.get_filters(message), Message::Account(message) => self.accounts.get_filters(message),
Message::Slot(message) => self.slots.get_filters(message, commitment), Message::Slot(message) => self.slots.get_filters(message, commitment),
Message::Transaction(message) => self.transactions.get_filters(message), Message::Transaction(message) => Box::new(
self.transactions
.get_filters(message)
.chain(self.transactions_status.get_filters(message)),
),
Message::Entry(message) => self.entry.get_filters(message), Message::Entry(message) => self.entry.get_filters(message),
Message::Block(message) => self.blocks.get_filters(message), Message::Block(message) => self.blocks.get_filters(message),
Message::BlockMeta(message) => self.blocks_meta.get_filters(message), Message::BlockMeta(message) => self.blocks_meta.get_filters(message),
} }
} }
pub fn get_update( pub fn get_update<'a>(
&self, &'a self,
message: &Message, message: &'a Message,
commitment: Option<CommitmentLevel>, commitment: Option<CommitmentLevel>,
) -> Vec<SubscribeUpdate> { ) -> Box<dyn Iterator<Item = SubscribeUpdate> + Send + 'a> {
self.get_filters(message, commitment) Box::new(
.into_iter() self.get_filters(message, commitment)
.filter_map(|(filters, message)| { .filter_map(|(filters, message)| {
if filters.is_empty() { if filters.is_empty() {
None None
} else { } else {
Some(SubscribeUpdate { Some(SubscribeUpdate {
filters, filters,
update_oneof: Some(message.to_proto(&self.accounts_data_slice)), update_oneof: Some(message.to_proto(&self.accounts_data_slice)),
}) })
} }
}) }),
.collect() )
} }
pub fn get_pong_msg(&self) -> Option<SubscribeUpdate> { pub fn get_pong_msg(&self) -> Option<SubscribeUpdate> {
@ -197,12 +211,18 @@ impl FilterAccounts {
Ok(required) Ok(required)
} }
fn get_filters<'a>(&self, message: &'a MessageAccount) -> Vec<(Vec<String>, MessageRef<'a>)> { fn get_filters<'a>(
&'a self,
message: &'a MessageAccount,
) -> Box<dyn Iterator<Item = (Vec<String>, MessageRef<'a>)> + Send + 'a> {
let mut filter = FilterAccountsMatch::new(self); let mut filter = FilterAccountsMatch::new(self);
filter.match_account(&message.account.pubkey); filter.match_account(&message.account.pubkey);
filter.match_owner(&message.account.owner); filter.match_owner(&message.account.owner);
filter.match_data(&message.account.data); filter.match_data(&message.account.data);
vec![(filter.get_filters(), MessageRef::Account(message))] Box::new(std::iter::once((
filter.get_filters(),
MessageRef::Account(message),
)))
} }
} }
@ -391,11 +411,11 @@ impl FilterSlots {
} }
fn get_filters<'a>( fn get_filters<'a>(
&self, &'a self,
message: &'a MessageSlot, message: &'a MessageSlot,
commitment: Option<CommitmentLevel>, commitment: Option<CommitmentLevel>,
) -> Vec<(Vec<String>, MessageRef<'a>)> { ) -> Box<dyn Iterator<Item = (Vec<String>, MessageRef<'a>)> + Send + 'a> {
vec![( Box::new(std::iter::once((
self.filters self.filters
.iter() .iter()
.filter_map(|(name, inner)| { .filter_map(|(name, inner)| {
@ -407,10 +427,16 @@ impl FilterSlots {
}) })
.collect(), .collect(),
MessageRef::Slot(message), MessageRef::Slot(message),
)] )))
} }
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterTransactionsType {
Transaction,
TransactionStatus,
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct FilterTransactionsInner { pub struct FilterTransactionsInner {
vote: Option<bool>, vote: Option<bool>,
@ -421,8 +447,9 @@ pub struct FilterTransactionsInner {
account_required: Vec<Pubkey>, account_required: Vec<Pubkey>,
} }
#[derive(Debug, Default, Clone)] #[derive(Debug, Clone)]
pub struct FilterTransactions { pub struct FilterTransactions {
filter_type: FilterTransactionsType,
filters: HashMap<String, FilterTransactionsInner>, filters: HashMap<String, FilterTransactionsInner>,
} }
@ -430,10 +457,11 @@ impl FilterTransactions {
fn new( fn new(
configs: &HashMap<String, SubscribeRequestFilterTransactions>, configs: &HashMap<String, SubscribeRequestFilterTransactions>,
limit: &ConfigGrpcFiltersTransactions, limit: &ConfigGrpcFiltersTransactions,
filter_type: FilterTransactionsType,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
ConfigGrpcFilters::check_max(configs.len(), limit.max)?; ConfigGrpcFilters::check_max(configs.len(), limit.max)?;
let mut this = Self::default(); let mut filters = HashMap::new();
for (name, filter) in configs { for (name, filter) in configs {
ConfigGrpcFilters::check_any( ConfigGrpcFilters::check_any(
filter.vote.is_none() filter.vote.is_none()
@ -456,7 +484,7 @@ impl FilterTransactions {
limit.account_required_max, limit.account_required_max,
)?; )?;
this.filters.insert( filters.insert(
name.clone(), name.clone(),
FilterTransactionsInner { FilterTransactionsInner {
vote: filter.vote, vote: filter.vote,
@ -485,13 +513,16 @@ impl FilterTransactions {
}, },
); );
} }
Ok(this) Ok(Self {
filter_type,
filters,
})
} }
pub fn get_filters<'a>( pub fn get_filters<'a>(
&self, &'a self,
message: &'a MessageTransaction, message: &'a MessageTransaction,
) -> Vec<(Vec<String>, MessageRef<'a>)> { ) -> Box<dyn Iterator<Item = (Vec<String>, MessageRef<'a>)> + Send + 'a> {
let filters = self let filters = self
.filters .filters
.iter() .iter()
@ -565,7 +596,11 @@ impl FilterTransactions {
Some(name.clone()) Some(name.clone())
}) })
.collect(); .collect();
vec![(filters, MessageRef::Transaction(message))] let message = match self.filter_type {
FilterTransactionsType::Transaction => MessageRef::Transaction(message),
FilterTransactionsType::TransactionStatus => MessageRef::TransactionStatus(message),
};
Box::new(std::iter::once((filters, message)))
} }
} }
@ -590,8 +625,14 @@ impl FilterEntry {
}) })
} }
fn get_filters<'a>(&self, message: &'a MessageEntry) -> Vec<(Vec<String>, MessageRef<'a>)> { fn get_filters<'a>(
vec![(self.filters.clone(), MessageRef::Entry(message))] &'a self,
message: &'a MessageEntry,
) -> Box<dyn Iterator<Item = (Vec<String>, MessageRef<'a>)> + Send + 'a> {
Box::new(std::iter::once((
self.filters.clone(),
MessageRef::Entry(message),
)))
} }
} }
@ -654,68 +695,68 @@ impl FilterBlocks {
Ok(this) Ok(this)
} }
fn get_filters<'a>(&self, message: &'a MessageBlock) -> Vec<(Vec<String>, MessageRef<'a>)> { fn get_filters<'a>(
self.filters &'a self,
.iter() message: &'a MessageBlock,
.map(|(filter, inner)| { ) -> Box<dyn Iterator<Item = (Vec<String>, MessageRef<'a>)> + Send + 'a> {
#[allow(clippy::unnecessary_filter_map)] Box::new(self.filters.iter().map(move |(filter, inner)| {
let transactions = #[allow(clippy::unnecessary_filter_map)]
if matches!(inner.include_transactions, None | Some(true)) { let transactions = if matches!(inner.include_transactions, None | Some(true)) {
message message
.transactions .transactions
.iter() .iter()
.filter_map(|tx| { .filter_map(|tx| {
if !inner.account_include.is_empty() if !inner.account_include.is_empty()
&& tx.transaction.message().account_keys().iter().all( && tx
|pubkey| { .transaction
inner.account_include.binary_search(pubkey).is_err() .message()
}, .account_keys()
) .iter()
{ .all(|pubkey| inner.account_include.binary_search(pubkey).is_err())
return None; {
} return None;
}
Some(tx) Some(tx)
}) })
.collect::<Vec<_>>() .collect::<Vec<_>>()
} else { } else {
vec![] vec![]
}; };
#[allow(clippy::unnecessary_filter_map)] #[allow(clippy::unnecessary_filter_map)]
let accounts = if inner.include_accounts == Some(true) { let accounts = if inner.include_accounts == Some(true) {
message message
.accounts .accounts
.iter() .iter()
.filter_map(|account| { .filter_map(|account| {
if !inner.account_include.is_empty() if !inner.account_include.is_empty()
&& inner && inner
.account_include .account_include
.binary_search(&account.pubkey) .binary_search(&account.pubkey)
.is_err() .is_err()
{ {
return None; return None;
} }
Some(account) Some(account)
}) })
.collect::<Vec<_>>() .collect::<Vec<_>>()
} else { } else {
vec![] vec![]
}; };
let entries = if inner.include_entries == Some(true) { let entries = if inner.include_entries == Some(true) {
message.entries.iter().collect::<Vec<_>>() message.entries.iter().collect::<Vec<_>>()
} else { } else {
vec![] vec![]
}; };
( (
vec![filter.clone()], vec![filter.clone()],
MessageRef::Block((message, transactions, accounts, entries).into()), MessageRef::Block((message, transactions, accounts, entries).into()),
) )
}) }))
.collect()
} }
} }
@ -740,8 +781,14 @@ impl FilterBlocksMeta {
}) })
} }
fn get_filters<'a>(&self, message: &'a MessageBlockMeta) -> Vec<(Vec<String>, MessageRef<'a>)> { fn get_filters<'a>(
vec![(self.filters.clone(), MessageRef::BlockMeta(message))] &'a self,
message: &'a MessageBlockMeta,
) -> Box<dyn Iterator<Item = (Vec<String>, MessageRef<'a>)> + Send + 'a> {
Box::new(std::iter::once((
self.filters.clone(),
MessageRef::BlockMeta(message),
)))
} }
} }
@ -788,7 +835,7 @@ mod tests {
crate::{ crate::{
config::ConfigGrpcFilters, config::ConfigGrpcFilters,
filters::Filter, filters::Filter,
grpc::{Message, MessageTransaction, MessageTransactionInfo}, grpc::{Message, MessageRef, MessageTransaction, MessageTransactionInfo},
}, },
solana_sdk::{ solana_sdk::{
hash::Hash, hash::Hash,
@ -855,6 +902,7 @@ mod tests {
accounts: HashMap::new(), accounts: HashMap::new(),
slots: HashMap::new(), slots: HashMap::new(),
transactions: HashMap::new(), transactions: HashMap::new(),
transactions_status: HashMap::new(),
blocks: HashMap::new(), blocks: HashMap::new(),
blocks_meta: HashMap::new(), blocks_meta: HashMap::new(),
entry: HashMap::new(), entry: HashMap::new(),
@ -884,6 +932,7 @@ mod tests {
accounts, accounts,
slots: HashMap::new(), slots: HashMap::new(),
transactions: HashMap::new(), transactions: HashMap::new(),
transactions_status: HashMap::new(),
blocks: HashMap::new(), blocks: HashMap::new(),
blocks_meta: HashMap::new(), blocks_meta: HashMap::new(),
entry: HashMap::new(), entry: HashMap::new(),
@ -918,6 +967,7 @@ mod tests {
accounts: HashMap::new(), accounts: HashMap::new(),
slots: HashMap::new(), slots: HashMap::new(),
transactions, transactions,
transactions_status: HashMap::new(),
blocks: HashMap::new(), blocks: HashMap::new(),
blocks_meta: HashMap::new(), blocks_meta: HashMap::new(),
entry: HashMap::new(), entry: HashMap::new(),
@ -951,6 +1001,7 @@ mod tests {
accounts: HashMap::new(), accounts: HashMap::new(),
slots: HashMap::new(), slots: HashMap::new(),
transactions, transactions,
transactions_status: HashMap::new(),
blocks: HashMap::new(), blocks: HashMap::new(),
blocks_meta: HashMap::new(), blocks_meta: HashMap::new(),
entry: HashMap::new(), entry: HashMap::new(),
@ -990,6 +1041,7 @@ mod tests {
accounts: HashMap::new(), accounts: HashMap::new(),
slots: HashMap::new(), slots: HashMap::new(),
transactions, transactions,
transactions_status: HashMap::new(),
blocks: HashMap::new(), blocks: HashMap::new(),
blocks_meta: HashMap::new(), blocks_meta: HashMap::new(),
entry: HashMap::new(), entry: HashMap::new(),
@ -1003,9 +1055,12 @@ mod tests {
let message_transaction = let message_transaction =
create_message_transaction(&keypair_b, vec![account_key_b, account_key_a]); create_message_transaction(&keypair_b, vec![account_key_b, account_key_a]);
let message = Message::Transaction(message_transaction); let message = Message::Transaction(message_transaction);
for (filters, _message) in filter.get_filters(&message, None) { let updates = filter.get_filters(&message, None).collect::<Vec<_>>();
assert!(!filters.is_empty()); assert_eq!(updates.len(), 2);
} assert_eq!(updates[0].0, vec!["serum"]);
assert!(matches!(updates[0].1, MessageRef::Transaction(_)));
assert_eq!(updates[1].0, Vec::<String>::new());
assert!(matches!(updates[1].1, MessageRef::TransactionStatus(_)));
} }
#[test] #[test]
@ -1033,6 +1088,7 @@ mod tests {
accounts: HashMap::new(), accounts: HashMap::new(),
slots: HashMap::new(), slots: HashMap::new(),
transactions, transactions,
transactions_status: HashMap::new(),
blocks: HashMap::new(), blocks: HashMap::new(),
blocks_meta: HashMap::new(), blocks_meta: HashMap::new(),
entry: HashMap::new(), entry: HashMap::new(),
@ -1046,9 +1102,12 @@ mod tests {
let message_transaction = let message_transaction =
create_message_transaction(&keypair_b, vec![account_key_b, account_key_a]); create_message_transaction(&keypair_b, vec![account_key_b, account_key_a]);
let message = Message::Transaction(message_transaction); let message = Message::Transaction(message_transaction);
for (filters, _message) in filter.get_filters(&message, None) { let updates = filter.get_filters(&message, None).collect::<Vec<_>>();
assert!(!filters.is_empty()); assert_eq!(updates.len(), 2);
} assert_eq!(updates[0].0, vec!["serum"]);
assert!(matches!(updates[0].1, MessageRef::Transaction(_)));
assert_eq!(updates[1].0, Vec::<String>::new());
assert!(matches!(updates[1].1, MessageRef::TransactionStatus(_)));
} }
#[test] #[test]
@ -1076,6 +1135,7 @@ mod tests {
accounts: HashMap::new(), accounts: HashMap::new(),
slots: HashMap::new(), slots: HashMap::new(),
transactions, transactions,
transactions_status: HashMap::new(),
blocks: HashMap::new(), blocks: HashMap::new(),
blocks_meta: HashMap::new(), blocks_meta: HashMap::new(),
entry: HashMap::new(), entry: HashMap::new(),
@ -1125,6 +1185,7 @@ mod tests {
accounts: HashMap::new(), accounts: HashMap::new(),
slots: HashMap::new(), slots: HashMap::new(),
transactions, transactions,
transactions_status: HashMap::new(),
blocks: HashMap::new(), blocks: HashMap::new(),
blocks_meta: HashMap::new(), blocks_meta: HashMap::new(),
entry: HashMap::new(), entry: HashMap::new(),
@ -1140,9 +1201,12 @@ mod tests {
vec![account_key_x, account_key_y, account_key_z], vec![account_key_x, account_key_y, account_key_z],
); );
let message = Message::Transaction(message_transaction); let message = Message::Transaction(message_transaction);
for (filters, _message) in filter.get_filters(&message, None) { let updates = filter.get_filters(&message, None).collect::<Vec<_>>();
assert!(!filters.is_empty()); assert_eq!(updates.len(), 2);
} assert_eq!(updates[0].0, vec!["serum"]);
assert!(matches!(updates[0].1, MessageRef::Transaction(_)));
assert_eq!(updates[1].0, Vec::<String>::new());
assert!(matches!(updates[1].1, MessageRef::TransactionStatus(_)));
} }
#[test] #[test]
@ -1176,6 +1240,7 @@ mod tests {
accounts: HashMap::new(), accounts: HashMap::new(),
slots: HashMap::new(), slots: HashMap::new(),
transactions, transactions,
transactions_status: HashMap::new(),
blocks: HashMap::new(), blocks: HashMap::new(),
blocks_meta: HashMap::new(), blocks_meta: HashMap::new(),
entry: HashMap::new(), entry: HashMap::new(),

View File

@ -51,6 +51,7 @@ use {
SubscribeUpdateAccount, SubscribeUpdateAccountInfo, SubscribeUpdateBlock, SubscribeUpdateAccount, SubscribeUpdateAccountInfo, SubscribeUpdateBlock,
SubscribeUpdateBlockMeta, SubscribeUpdateEntry, SubscribeUpdatePing, SubscribeUpdateBlockMeta, SubscribeUpdateEntry, SubscribeUpdatePing,
SubscribeUpdateSlot, SubscribeUpdateTransaction, SubscribeUpdateTransactionInfo, SubscribeUpdateSlot, SubscribeUpdateTransaction, SubscribeUpdateTransactionInfo,
SubscribeUpdateTransactionStatus, TransactionError as SubscribeUpdateTransactionError,
}, },
}, },
}; };
@ -393,6 +394,7 @@ pub enum MessageRef<'a> {
Slot(&'a MessageSlot), Slot(&'a MessageSlot),
Account(&'a MessageAccount), Account(&'a MessageAccount),
Transaction(&'a MessageTransaction), Transaction(&'a MessageTransaction),
TransactionStatus(&'a MessageTransaction),
Entry(&'a MessageEntry), Entry(&'a MessageEntry),
Block(MessageBlockRef<'a>), Block(MessageBlockRef<'a>),
BlockMeta(&'a MessageBlockMeta), BlockMeta(&'a MessageBlockMeta),
@ -415,6 +417,21 @@ impl<'a> MessageRef<'a> {
transaction: Some(message.transaction.to_proto()), transaction: Some(message.transaction.to_proto()),
slot: message.slot, slot: message.slot,
}), }),
Self::TransactionStatus(message) => {
UpdateOneof::TransactionStatus(SubscribeUpdateTransactionStatus {
slot: message.slot,
signature: message.transaction.signature.as_ref().into(),
is_vote: message.transaction.is_vote,
index: message.transaction.index as u64,
err: match &message.transaction.meta.status {
Ok(()) => None,
Err(err) => Some(SubscribeUpdateTransactionError {
err: bincode::serialize(&err)
.expect("transaction error to serialize to bytes"),
}),
},
})
}
Self::Entry(message) => UpdateOneof::Entry(message.to_proto()), Self::Entry(message) => UpdateOneof::Entry(message.to_proto()),
Self::Block(message) => UpdateOneof::Block(SubscribeUpdateBlock { Self::Block(message) => UpdateOneof::Block(SubscribeUpdateBlock {
slot: message.slot, slot: message.slot,
@ -1204,6 +1221,7 @@ impl Geyser for GrpcService {
accounts: HashMap::new(), accounts: HashMap::new(),
slots: HashMap::new(), slots: HashMap::new(),
transactions: HashMap::new(), transactions: HashMap::new(),
transactions_status: HashMap::new(),
blocks: HashMap::new(), blocks: HashMap::new(),
blocks_meta: HashMap::new(), blocks_meta: HashMap::new(),
entry: HashMap::new(), entry: HashMap::new(),

View File

@ -26,6 +26,7 @@ message SubscribeRequest {
map<string, SubscribeRequestFilterAccounts> accounts = 1; map<string, SubscribeRequestFilterAccounts> accounts = 1;
map<string, SubscribeRequestFilterSlots> slots = 2; map<string, SubscribeRequestFilterSlots> slots = 2;
map<string, SubscribeRequestFilterTransactions> transactions = 3; map<string, SubscribeRequestFilterTransactions> transactions = 3;
map<string, SubscribeRequestFilterTransactions> transactions_status = 10;
map<string, SubscribeRequestFilterBlocks> blocks = 4; map<string, SubscribeRequestFilterBlocks> blocks = 4;
map<string, SubscribeRequestFilterBlocksMeta> blocks_meta = 5; map<string, SubscribeRequestFilterBlocksMeta> blocks_meta = 5;
map<string, SubscribeRequestFilterEntry> entry = 8; map<string, SubscribeRequestFilterEntry> entry = 8;
@ -96,6 +97,7 @@ message SubscribeUpdate {
SubscribeUpdateAccount account = 2; SubscribeUpdateAccount account = 2;
SubscribeUpdateSlot slot = 3; SubscribeUpdateSlot slot = 3;
SubscribeUpdateTransaction transaction = 4; SubscribeUpdateTransaction transaction = 4;
SubscribeUpdateTransactionStatus transaction_status = 10;
SubscribeUpdateBlock block = 5; SubscribeUpdateBlock block = 5;
SubscribeUpdatePing ping = 6; SubscribeUpdatePing ping = 6;
SubscribeUpdatePong pong = 9; SubscribeUpdatePong pong = 9;
@ -140,6 +142,14 @@ message SubscribeUpdateTransactionInfo {
uint64 index = 5; uint64 index = 5;
} }
message SubscribeUpdateTransactionStatus {
uint64 slot = 1;
bytes signature = 2;
bool is_vote = 3;
uint64 index = 4;
solana.storage.ConfirmedBlock.TransactionError err = 5;
}
message SubscribeUpdateBlock { message SubscribeUpdateBlock {
uint64 slot = 1; uint64 slot = 1;
string blockhash = 2; string blockhash = 2;

View File

@ -282,6 +282,9 @@ impl ArgsAction {
Some(UpdateOneof::Transaction(msg)) => { Some(UpdateOneof::Transaction(msg)) => {
info!("#{}, transaction", msg.slot) info!("#{}, transaction", msg.slot)
} }
Some(UpdateOneof::TransactionStatus(msg)) => {
info!("#{}, transaction status", msg.slot)
}
Some(UpdateOneof::Block(msg)) => info!("#{}, block", msg.slot), Some(UpdateOneof::Block(msg)) => info!("#{}, block", msg.slot),
Some(UpdateOneof::Ping(_)) => {} Some(UpdateOneof::Ping(_)) => {}
Some(UpdateOneof::Pong(_)) => {} Some(UpdateOneof::Pong(_)) => {}

View File

@ -275,6 +275,7 @@ impl ArgsAction {
UpdateOneof::Account(msg) => msg.slot, UpdateOneof::Account(msg) => msg.slot,
UpdateOneof::Slot(msg) => msg.slot, UpdateOneof::Slot(msg) => msg.slot,
UpdateOneof::Transaction(msg) => msg.slot, UpdateOneof::Transaction(msg) => msg.slot,
UpdateOneof::TransactionStatus(msg) => msg.slot,
UpdateOneof::Block(msg) => msg.slot, UpdateOneof::Block(msg) => msg.slot,
UpdateOneof::Ping(_) => continue, UpdateOneof::Ping(_) => continue,
UpdateOneof::Pong(_) => continue, UpdateOneof::Pong(_) => continue,

View File

@ -44,6 +44,7 @@ pub struct ConfigGrpcRequest {
pub slots: HashMap<String, ConfigGrpcRequestSlots>, pub slots: HashMap<String, ConfigGrpcRequestSlots>,
pub accounts: HashMap<String, ConfigGrpcRequestAccounts>, pub accounts: HashMap<String, ConfigGrpcRequestAccounts>,
pub transactions: HashMap<String, ConfigGrpcRequestTransactions>, pub transactions: HashMap<String, ConfigGrpcRequestTransactions>,
pub transactions_status: HashMap<String, ConfigGrpcRequestTransactions>,
pub entries: HashSet<String>, pub entries: HashSet<String>,
pub blocks: HashMap<String, ConfigGrpcRequestBlocks>, pub blocks: HashMap<String, ConfigGrpcRequestBlocks>,
pub blocks_meta: HashSet<String>, pub blocks_meta: HashSet<String>,
@ -71,6 +72,7 @@ impl GrpcRequestToProto<SubscribeRequest> for ConfigGrpcRequest {
slots: ConfigGrpcRequest::map_to_proto(self.slots), slots: ConfigGrpcRequest::map_to_proto(self.slots),
accounts: ConfigGrpcRequest::map_to_proto(self.accounts), accounts: ConfigGrpcRequest::map_to_proto(self.accounts),
transactions: ConfigGrpcRequest::map_to_proto(self.transactions), transactions: ConfigGrpcRequest::map_to_proto(self.transactions),
transactions_status: ConfigGrpcRequest::map_to_proto(self.transactions_status),
entry: ConfigGrpcRequest::set_to_proto(self.entries), entry: ConfigGrpcRequest::set_to_proto(self.entries),
blocks: ConfigGrpcRequest::map_to_proto(self.blocks), blocks: ConfigGrpcRequest::map_to_proto(self.blocks),
blocks_meta: ConfigGrpcRequest::set_to_proto(self.blocks_meta), blocks_meta: ConfigGrpcRequest::set_to_proto(self.blocks_meta),

View File

@ -111,6 +111,7 @@ pub enum GprcMessageKind {
Account, Account,
Slot, Slot,
Transaction, Transaction,
TransactionStatus,
Block, Block,
Ping, Ping,
Pong, Pong,
@ -125,6 +126,7 @@ impl From<&UpdateOneof> for GprcMessageKind {
UpdateOneof::Account(_) => Self::Account, UpdateOneof::Account(_) => Self::Account,
UpdateOneof::Slot(_) => Self::Slot, UpdateOneof::Slot(_) => Self::Slot,
UpdateOneof::Transaction(_) => Self::Transaction, UpdateOneof::Transaction(_) => Self::Transaction,
UpdateOneof::TransactionStatus(_) => Self::TransactionStatus,
UpdateOneof::Block(_) => Self::Block, UpdateOneof::Block(_) => Self::Block,
UpdateOneof::Ping(_) => Self::Ping, UpdateOneof::Ping(_) => Self::Ping,
UpdateOneof::Pong(_) => Self::Pong, UpdateOneof::Pong(_) => Self::Pong,
@ -140,6 +142,7 @@ impl GprcMessageKind {
GprcMessageKind::Account => "account", GprcMessageKind::Account => "account",
GprcMessageKind::Slot => "slot", GprcMessageKind::Slot => "slot",
GprcMessageKind::Transaction => "transaction", GprcMessageKind::Transaction => "transaction",
GprcMessageKind::TransactionStatus => "transactionstatus",
GprcMessageKind::Block => "block", GprcMessageKind::Block => "block",
GprcMessageKind::Ping => "ping", GprcMessageKind::Ping => "ping",
GprcMessageKind::Pong => "pong", GprcMessageKind::Pong => "pong",