Creating seperate filters for all accounts subscription and all transactions subscription

This commit is contained in:
godmodegalactus 2024-05-19 19:35:35 +02:00
parent 95bd1f99fa
commit 17590072b1
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
5 changed files with 73 additions and 42 deletions

View File

@ -36,8 +36,8 @@ client.subscribe(vec![
.unwrap();
```
You can also subscibe to all the account updates by setting owner : 'SystemProgram'.
Similarly you can also subscibe to all transaction update by setting filter : `Filter::Transaction(Signature::default()),`.
You can also subscibe to all the account updates by using filter `Filter::AccountsAll`
Similarly you can also subscibe to all transaction update by setting filter : `Filter::TransactionsAll,`.
### Tester

View File

@ -153,14 +153,7 @@ mod tests {
)
.await
.unwrap();
client
.subscribe(vec![Filter::Account(AccountFilter {
owner: Some(Pubkey::default()),
accounts: None,
filter: None,
})])
.await
.unwrap();
client.subscribe(vec![Filter::AccountsAll]).await.unwrap();
notify_subscription.notify_one();

View File

@ -9,31 +9,30 @@ use crate::message::Message;
#[repr(C)]
pub enum Filter {
Account(AccountFilter),
AccountsAll,
Slot,
BlockMeta,
Transaction(Signature),
TransactionsAll,
}
impl Filter {
pub fn allows(&self, message: &Message) -> bool {
match &self {
Filter::Account(account) => account.allows(message),
Filter::AccountsAll => matches!(message, Message::AccountMsg(_)),
Filter::Slot => matches!(message, Message::SlotMsg(_)),
Filter::BlockMeta => matches!(message, Message::BlockMetaMsg(_)),
Filter::Transaction(signature) => {
match message {
Message::TransactionMsg(transaction) => {
if signature == &Signature::default() {
// subscibe to all the signatures
true
} else {
// just check the first signature
transaction.signatures.iter().any(|x| x == signature)
}
// just check the first signature
transaction.signatures[0] == *signature
}
_ => false,
}
}
Filter::TransactionsAll => matches!(message, Message::TransactionMsg(_)),
}
}
}
@ -71,8 +70,7 @@ impl AccountFilter {
pub fn allows(&self, message: &Message) -> bool {
if let Message::AccountMsg(account) = message {
if let Some(owner) = self.owner {
// check if filter subscribes to all the accounts
if owner == Pubkey::default() || owner == account.owner {
if owner == account.owner {
// to do move the filtering somewhere else because here we need to decode the account data
// but cannot be avoided for now, this will lag the client is abusing this filter
// lagged clients will be dropped

View File

@ -11,6 +11,9 @@ pub struct Account {
pub slot_identifier: SlotIdentifier,
pub pubkey: Pubkey,
pub owner: Pubkey,
pub lamports: u64,
pub executable: bool,
pub rent_epoch: u64,
pub write_version: u64,
pub data: Vec<u8>,
pub compression_type: CompressionType,
@ -24,6 +27,9 @@ impl Account {
pubkey: Pubkey::new_unique(),
owner: Pubkey::new_unique(),
write_version: 0,
lamports: 12345,
rent_epoch: u64::MAX,
executable: false,
data: vec![178; data_size],
compression_type: CompressionType::None,
data_length: data_size as u64,
@ -37,19 +43,18 @@ impl Account {
slot_identifier: SlotIdentifier,
write_version: u64,
) -> Self {
let binary = bincode::serialize(&solana_account).expect("account should be serializable");
let data_length = solana_account.data.len() as u64;
let data = match compression_type {
CompressionType::None => binary,
CompressionType::None => solana_account.data,
CompressionType::Lz4Fast(speed) => lz4::block::compress(
&binary,
&solana_account.data,
Some(lz4::block::CompressionMode::FAST(speed as i32)),
true,
)
.expect("Compression should work"),
CompressionType::Lz4(compression) => lz4::block::compress(
&binary,
&solana_account.data,
Some(lz4::block::CompressionMode::HIGHCOMPRESSION(
compression as i32,
)),
@ -65,16 +70,31 @@ impl Account {
data,
compression_type,
data_length,
lamports: solana_account.lamports,
executable: solana_account.executable,
rent_epoch: solana_account.rent_epoch,
}
}
pub fn solana_account(&self) -> SolanaAccount {
match self.compression_type {
CompressionType::None => bincode::deserialize(&self.data).expect("Should deserialize"),
CompressionType::None => SolanaAccount {
lamports: self.lamports,
data: self.data.clone(),
owner: self.owner,
executable: self.executable,
rent_epoch: self.rent_epoch,
},
CompressionType::Lz4(_) | CompressionType::Lz4Fast(_) => {
let uncompressed =
let uncompressed_data =
lz4::block::decompress(&self.data, None).expect("should uncompress");
bincode::deserialize(&uncompressed).expect("Should deserialize")
SolanaAccount {
lamports: self.lamports,
data: uncompressed_data,
owner: self.owner,
executable: self.executable,
rent_epoch: self.rent_epoch,
}
}
}
}

View File

@ -7,13 +7,10 @@ use clap::Parser;
use cli::Args;
use futures::StreamExt;
use quic_geyser_client::client::Client;
use quic_geyser_common::{
filters::{AccountFilter, Filter},
types::connections_parameters::ConnectionParameters,
};
use quic_geyser_common::{filters::Filter, types::connections_parameters::ConnectionParameters};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature};
use tokio::pin;
use solana_sdk::commitment_config::CommitmentConfig;
use tokio::{pin, time::Instant};
pub mod cli;
@ -43,9 +40,16 @@ pub mod cli;
async fn main() {
let args = Args::parse();
println!("Connecting");
let client = Client::new(args.url, ConnectionParameters::default())
.await
.unwrap();
let client = Client::new(
args.url,
ConnectionParameters {
max_number_of_streams: 100_000,
streams_for_slot_data: 128,
streams_for_transactions: 10_000,
},
)
.await
.unwrap();
println!("Connected");
let bytes_transfered = Arc::new(AtomicU64::new(0));
@ -123,14 +127,10 @@ async fn main() {
println!("Subscribing");
client
.subscribe(vec![
Filter::Account(AccountFilter {
owner: Some(Pubkey::default()),
accounts: None,
filter: None,
}),
Filter::AccountsAll,
Filter::TransactionsAll,
Filter::Slot,
Filter::BlockMeta,
Filter::Transaction(Signature::default()),
])
.await
.unwrap();
@ -138,6 +138,7 @@ async fn main() {
let stream = client.create_stream();
pin!(stream);
let instant = Instant::now();
while let Some(message) = stream.next().await {
let message_size = bincode::serialize(&message).unwrap().len();
@ -146,9 +147,24 @@ async fn main() {
quic_geyser_common::message::Message::AccountMsg(account) => {
log::debug!("got account notification : {} ", account.pubkey);
account_notification.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let data_len = account.data_length as usize;
total_accounts_size
.fetch_add(account.data_length, std::sync::atomic::Ordering::Relaxed);
let _account = account.solana_account();
let solana_account = account.solana_account();
if solana_account.data.len() != data_len {
println!("data length different");
println!(
"Account : {}, owner: {}=={}, datalen: {}=={}, lamports: {}",
account.pubkey,
account.owner,
solana_account.owner,
data_len,
solana_account.data.len(),
solana_account.lamports
);
panic!("Wrong account data");
}
account_slot.store(
account.slot_identifier.slot,
std::sync::atomic::Ordering::Relaxed,
@ -179,4 +195,8 @@ async fn main() {
}
}
}
println!(
"Conection closed and streaming stopped in {} seconds",
instant.elapsed().as_secs()
);
}