diff --git a/Cargo.lock b/Cargo.lock index df3c9b7..ec59b1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -842,6 +842,27 @@ dependencies = [ "subtle", ] +[[package]] +name = "csv" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" +dependencies = [ + "memchr", +] + [[package]] name = "ctr" version = "0.8.0" @@ -1242,6 +1263,7 @@ dependencies = [ "async-stream", "base64 0.21.5", "bincode", + "csv", "derive_more", "futures", "itertools 0.10.5", diff --git a/Cargo.toml b/Cargo.toml index f4a69f3..1d4ece6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,8 @@ derive_more = "0.99.17" base64 = "0.21.5" bincode = "1.3.3" +csv = "1.3.0" + [dev-dependencies] tracing-subscriber = "0.3.16" solana-logger = "1" diff --git a/examples/stream_blocks_single.rs b/examples/stream_blocks_single.rs index 47f2ff6..bff3b4e 100644 --- a/examples/stream_blocks_single.rs +++ b/examples/stream_blocks_single.rs @@ -4,6 +4,7 @@ use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig; use std::env; use std::pin::pin; +use solana_sdk::pubkey::Pubkey; use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream; use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor; @@ -13,6 +14,7 @@ use tracing::warn; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; use yellowstone_grpc_proto::prost::Message as _; +use csv::Writer; #[allow(dead_code)] fn start_example_blockmini_consumer( @@ -94,7 +96,7 @@ pub async fn main() { let green_stream = create_geyser_reconnecting_stream( config.clone(), - GeyserFilter(CommitmentConfig::finalized()).blocks_and_txs(), + GeyserFilter(CommitmentConfig::processed()).accounts(), ); let blue_stream = create_geyser_reconnecting_stream( @@ -103,13 +105,23 @@ pub async fn main() { ); tokio::spawn(async move { + let mut wtr = csv::Writer::from_path("accounts-mainnet.csv").unwrap(); + let mut green_stream = pin!(green_stream); while let Some(message) = green_stream.next().await { match message { Message::GeyserSubscribeUpdate(subscriber_update) => { - let mapped = map_block_update(*subscriber_update); - if let Some(slot) = mapped { - info!("got update (green)!!! slot: {}", slot); + match subscriber_update.update_oneof { + Some(UpdateOneof::Account(update)) => { + info!("got update (green)!!! slot: {}", update.slot); + let key = update.account.unwrap().pubkey; + let bytes: [u8; 32] = + key.try_into().unwrap_or(Pubkey::default().to_bytes()); + let pubkey = Pubkey::new_from_array(bytes); + wtr.write_record(&[pubkey.to_string()]).unwrap(); + wtr.flush().unwrap(); + } + _ => {} } } Message::Connecting(attempt) => { diff --git a/src/lib.rs b/src/lib.rs index a5c1690..64edd90 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,10 +2,7 @@ use solana_sdk::commitment_config::CommitmentConfig; use std::collections::HashMap; use std::fmt::{Debug, Display}; use std::time::Duration; -use yellowstone_grpc_proto::geyser::{ - CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, - SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, SubscribeUpdate, -}; +use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, SubscribeUpdate}; use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; pub mod channel_plugger; @@ -150,6 +147,30 @@ impl GeyserFilter { ping: None, } } + + pub fn accounts(&self) -> SubscribeRequest { + let mut accounts_subs = HashMap::new(); + accounts_subs.insert( + "client".to_string(), + SubscribeRequestFilterAccounts { + account: vec![], + owner: vec![], + filters: vec![], + }, + ); + + SubscribeRequest { + slots: HashMap::new(), + accounts: accounts_subs, + transactions: HashMap::new(), + entry: Default::default(), + blocks: Default::default(), + blocks_meta: HashMap::new(), + commitment: Some(map_commitment_level(self.0) as i32), + accounts_data_slice: Default::default(), + ping: None, + } + } } fn map_commitment_level(commitment_config: CommitmentConfig) -> CommitmentLevel {