From 2cef5ef87b86f4bfdef10f123fe1b9b699490a6c Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Fri, 3 May 2024 16:07:12 +0200 Subject: [PATCH] token account --- examples/stream_token_accounts.rs | 130 ++++++++---------------------- 1 file changed, 33 insertions(+), 97 deletions(-) diff --git a/examples/stream_token_accounts.rs b/examples/stream_token_accounts.rs index ab70157..4f71e87 100644 --- a/examples/stream_token_accounts.rs +++ b/examples/stream_token_accounts.rs @@ -6,6 +6,7 @@ use solana_sdk::commitment_config::CommitmentConfig; use std::env; use std::pin::pin; use std::str::FromStr; +use solana_account_decoder::parse_token::{parse_token, spl_token_ids, TokenAccountType}; use solana_sdk::pubkey::Pubkey; use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream; @@ -17,65 +18,12 @@ use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeUpdate}; use yellowstone_grpc_proto::prost::Message as _; -#[allow(dead_code)] -fn start_example_blockmini_consumer( - multiplex_stream: impl Stream + Send + 'static, -) { - tokio::spawn(async move { - let mut blockmeta_stream = pin!(multiplex_stream); - while let Some(mini) = blockmeta_stream.next().await { - info!( - "emitted block mini #{}@{} with {} bytes from multiplexer", - mini.slot, mini.commitment_config.commitment, mini.blocksize - ); - } - }); -} - -pub struct BlockMini { - pub blocksize: usize, - pub slot: Slot, - pub commitment_config: CommitmentConfig, -} - -struct BlockMiniExtractor(CommitmentConfig); - -impl FromYellowstoneExtractor for BlockMiniExtractor { - type Target = BlockMini; - fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)> { - match update.update_oneof { - Some(UpdateOneof::Block(update_block_message)) => { - let blocksize = update_block_message.encoded_len(); - let slot = update_block_message.slot; - let mini = BlockMini { - blocksize, - slot, - commitment_config: self.0, - }; - Some((slot, mini)) - } - Some(UpdateOneof::BlockMeta(update_blockmeta_message)) => { - let blocksize = update_blockmeta_message.encoded_len(); - let slot = update_blockmeta_message.slot; - let mini = BlockMini { - blocksize, - slot, - commitment_config: self.0, - }; - Some((slot, mini)) - } - _ => None, - } - } -} - #[tokio::main] pub async fn main() { // RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace tracing_subscriber::fmt::init(); // console_subscriber::init(); - let COMMITMENT_LEVEL = CommitmentConfig::processed(); let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green"); let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok(); @@ -98,30 +46,47 @@ pub async fn main() { let green_stream = create_geyser_reconnecting_stream( config.clone(), - // GeyserFilter(COMMITMENT_LEVEL).accounts(), token_accounts(), ); - let blue_stream = create_geyser_reconnecting_stream( - config.clone(), - GeyserFilter(COMMITMENT_LEVEL).blocks_and_txs(), - ); tokio::spawn(async move { - let token_pk = Pubkey::from_str("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA").unwrap(); let mut green_stream = pin!(green_stream); while let Some(message) = green_stream.next().await { match message { Message::GeyserSubscribeUpdate(subscriber_update) => { match subscriber_update.update_oneof { Some(UpdateOneof::Account(update)) => { - let account_info = update.account.unwrap(); - let account_pk = Pubkey::try_from(account_info.pubkey).unwrap(); + let account = update.account.unwrap(); + let account_pk = Pubkey::try_from(account.pubkey).unwrap(); - // if account_pk == token_pk { - info!("got account update (green)!!! {} - {:?} - {} bytes", - update.slot, account_pk, account_info.data.len()); - // } + let account_type = parse_token(&account.data, Some(6)).unwrap(); + match account_type { + TokenAccountType::Account(account_ui) => { + // UiTokenAccount { + // mint: "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", + // owner: "7XDMxfmzmL2Hqhh8ABp4Byc5UEsMWfQdWo6r5vEVQNb8", + // token_amount: UiTokenAmount { + // ui_amount: Some(0.0), decimals: 6, amount: "0", ui_amount_string: "0" + // }, + // delegate: None, + // state: Initialized, + // is_native: false, + // rent_exempt_reserve: None, + // delegated_amount: None, + // close_authority: None, + // extensions: [] + // } + info!("it's an Account: {:?}", account_ui); + } + TokenAccountType::Mint(mint) => { + // not interesting + } + TokenAccountType::Multisig(_) => {} + } + + info!("got account update (green)!!! {} - {:?} - {} bytes", + update.slot, account_pk, account.data.len()); let bytes: [u8; 32] = account_pk.to_bytes(); } @@ -136,52 +101,23 @@ pub async fn main() { warn!("Stream aborted"); }); - tokio::spawn(async move { - let mut blue_stream = pin!(blue_stream); - let extractor = BlockMiniExtractor(COMMITMENT_LEVEL); - while let Some(message) = blue_stream.next().await { - match message { - Message::GeyserSubscribeUpdate(subscriber_update) => { - let mapped = extractor.map_yellowstone_update(*subscriber_update); - if let Some((slot, block_mini)) = mapped { - info!("got update (blue)!!! block: {} - {} bytes", slot, block_mini.blocksize); - } - } - Message::Connecting(attempt) => { - warn!("Connection attempt: {}", attempt); - } - } - } - warn!("Stream aborted"); - }); - // "infinite" sleep sleep(Duration::from_secs(1800)).await; } -fn map_block_update(update: SubscribeUpdate) -> Option { - match update.update_oneof { - Some(UpdateOneof::Block(update_block_message)) => { - let slot = update_block_message.slot; - Some(slot) - } - _ => None, - } -} - pub fn token_accounts() -> SubscribeRequest { let mut accounts_subs = HashMap::new(); accounts_subs.insert( "client".to_string(), SubscribeRequestFilterAccounts { account: vec![], - owner: vec![ - "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA".to_string(), - ], + owner: + spl_token_ids().iter().map(|pubkey| pubkey.to_string()).collect(), filters: vec![], }, ); + SubscribeRequest { slots: HashMap::new(), accounts: accounts_subs,