From 39199b842a89edf2f7039472d69cc752142ad283 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Fri, 3 May 2024 14:52:29 +0200 Subject: [PATCH] stream token accounts --- examples/stream_token_accounts.rs | 197 ++++++++++++++++++++++++++++++ 1 file changed, 197 insertions(+) create mode 100644 examples/stream_token_accounts.rs diff --git a/examples/stream_token_accounts.rs b/examples/stream_token_accounts.rs new file mode 100644 index 0000000..ab70157 --- /dev/null +++ b/examples/stream_token_accounts.rs @@ -0,0 +1,197 @@ +use std::collections::HashMap; +use futures::{Stream, StreamExt}; +use log::info; +use solana_sdk::clock::Slot; +use solana_sdk::commitment_config::CommitmentConfig; +use std::env; +use std::pin::pin; +use std::str::FromStr; +use solana_sdk::pubkey::Pubkey; + +use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream; +use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor; +use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message}; +use tokio::time::{sleep, Duration}; +use tracing::warn; +use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; +use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeUpdate}; +use yellowstone_grpc_proto::prost::Message as _; + +#[allow(dead_code)] +fn start_example_blockmini_consumer( + multiplex_stream: impl Stream + Send + 'static, +) { + tokio::spawn(async move { + let mut blockmeta_stream = pin!(multiplex_stream); + while let Some(mini) = blockmeta_stream.next().await { + info!( + "emitted block mini #{}@{} with {} bytes from multiplexer", + mini.slot, mini.commitment_config.commitment, mini.blocksize + ); + } + }); +} + +pub struct BlockMini { + pub blocksize: usize, + pub slot: Slot, + pub commitment_config: CommitmentConfig, +} + +struct BlockMiniExtractor(CommitmentConfig); + +impl FromYellowstoneExtractor for BlockMiniExtractor { + type Target = BlockMini; + fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)> { + match update.update_oneof { + Some(UpdateOneof::Block(update_block_message)) => { + let blocksize = update_block_message.encoded_len(); + let slot = update_block_message.slot; + let mini = BlockMini { + blocksize, + slot, + commitment_config: self.0, + }; + Some((slot, mini)) + } + Some(UpdateOneof::BlockMeta(update_blockmeta_message)) => { + let blocksize = update_blockmeta_message.encoded_len(); + let slot = update_blockmeta_message.slot; + let mini = BlockMini { + blocksize, + slot, + commitment_config: self.0, + }; + Some((slot, mini)) + } + _ => None, + } + } +} + +#[tokio::main] +pub async fn main() { + // RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace + tracing_subscriber::fmt::init(); + // console_subscriber::init(); + + let COMMITMENT_LEVEL = CommitmentConfig::processed(); + let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green"); + let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok(); + + info!( + "Using grpc source on {} ({})", + grpc_addr_green, + grpc_x_token_green.is_some() + ); + + let timeouts = GrpcConnectionTimeouts { + connect_timeout: Duration::from_secs(25), + request_timeout: Duration::from_secs(25), + subscribe_timeout: Duration::from_secs(25), + receive_timeout: Duration::from_secs(25), + }; + + let config = GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone()); + + info!("Write Block stream.."); + + let green_stream = create_geyser_reconnecting_stream( + config.clone(), + // GeyserFilter(COMMITMENT_LEVEL).accounts(), + token_accounts(), + ); + + let blue_stream = create_geyser_reconnecting_stream( + config.clone(), + GeyserFilter(COMMITMENT_LEVEL).blocks_and_txs(), + ); + + tokio::spawn(async move { + let token_pk = Pubkey::from_str("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA").unwrap(); + let mut green_stream = pin!(green_stream); + while let Some(message) = green_stream.next().await { + match message { + Message::GeyserSubscribeUpdate(subscriber_update) => { + match subscriber_update.update_oneof { + Some(UpdateOneof::Account(update)) => { + let account_info = update.account.unwrap(); + let account_pk = Pubkey::try_from(account_info.pubkey).unwrap(); + + // if account_pk == token_pk { + info!("got account update (green)!!! {} - {:?} - {} bytes", + update.slot, account_pk, account_info.data.len()); + // } + let bytes: [u8; 32] = + account_pk.to_bytes(); + } + _ => {} + } + } + Message::Connecting(attempt) => { + warn!("Connection attempt: {}", attempt); + } + } + } + warn!("Stream aborted"); + }); + + tokio::spawn(async move { + let mut blue_stream = pin!(blue_stream); + let extractor = BlockMiniExtractor(COMMITMENT_LEVEL); + while let Some(message) = blue_stream.next().await { + match message { + Message::GeyserSubscribeUpdate(subscriber_update) => { + let mapped = extractor.map_yellowstone_update(*subscriber_update); + if let Some((slot, block_mini)) = mapped { + info!("got update (blue)!!! block: {} - {} bytes", slot, block_mini.blocksize); + } + } + Message::Connecting(attempt) => { + warn!("Connection attempt: {}", attempt); + } + } + } + warn!("Stream aborted"); + }); + + // "infinite" sleep + sleep(Duration::from_secs(1800)).await; +} + +fn map_block_update(update: SubscribeUpdate) -> Option { + match update.update_oneof { + Some(UpdateOneof::Block(update_block_message)) => { + let slot = update_block_message.slot; + Some(slot) + } + _ => None, + } +} + +pub fn token_accounts() -> SubscribeRequest { + let mut accounts_subs = HashMap::new(); + accounts_subs.insert( + "client".to_string(), + SubscribeRequestFilterAccounts { + account: vec![], + owner: vec![ + "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA".to_string(), + ], + filters: vec![], + }, + ); + + SubscribeRequest { + slots: HashMap::new(), + accounts: accounts_subs, + transactions: HashMap::new(), + entry: Default::default(), + blocks: Default::default(), + blocks_meta: HashMap::new(), + commitment: None, + accounts_data_slice: Default::default(), + ping: None, + } +} +