diff --git a/examples/dump_txs_stream_samples.rs b/examples/dump_txs_stream_samples.rs new file mode 100644 index 0000000..0dbab71 --- /dev/null +++ b/examples/dump_txs_stream_samples.rs @@ -0,0 +1,116 @@ +use log::{info, warn}; +use solana_sdk::clock::Slot; +use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; +use std::collections::HashMap; +use std::env; +use std::str::FromStr; +use std::time::SystemTime; + +use base64::Engine; +use csv::ReaderBuilder; +use itertools::Itertools; +use solana_sdk::borsh0_10::try_from_slice_unchecked; +/// This file mocks the core model of the RPC server. +use solana_sdk::compute_budget; +use solana_sdk::compute_budget::ComputeBudgetInstruction; +use solana_sdk::hash::Hash; +use solana_sdk::instruction::CompiledInstruction; +use solana_sdk::message::v0::MessageAddressTableLookup; +use solana_sdk::message::{v0, MessageHeader, VersionedMessage}; +use solana_sdk::pubkey::Pubkey; + +use solana_sdk::signature::Signature; +use solana_sdk::transaction::TransactionError; +use tokio::sync::broadcast; +use tokio::sync::mpsc::Receiver; +use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, SubscribeUpdateSlot}; + +use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc; +use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor; +use geyser_grpc_connector::{ + map_commitment_level, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message, +}; +use tokio::time::{sleep, Duration}; +use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; +use yellowstone_grpc_proto::geyser::SubscribeUpdate; +use yellowstone_grpc_proto::prelude::SubscribeRequestFilterAccounts; + + + +#[tokio::main] +pub async fn main() { + tracing_subscriber::fmt::init(); + + let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green"); + let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok(); + + let (_foo, exit_notify) = broadcast::channel(1); + + info!( + "Using gRPC source {} ({})", + grpc_addr_green, + grpc_x_token_green.is_some() + ); + + let timeouts = GrpcConnectionTimeouts { + connect_timeout: Duration::from_secs(5), + request_timeout: Duration::from_secs(5), + subscribe_timeout: Duration::from_secs(5), + receive_timeout: Duration::from_secs(5), + }; + + let green_config = + GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone()); + + + let (autoconnect_tx, mut transactions_rx) = tokio::sync::mpsc::channel(10); + let _tx_source_ah = create_geyser_autoconnection_task_with_mpsc( + green_config.clone(), + jupyter_trades(), + autoconnect_tx.clone(), + exit_notify, + ); + + loop { + let message = transactions_rx.recv().await; + match message { + Some(Message::GeyserSubscribeUpdate(update)) => + match update.update_oneof { + Some(UpdateOneof::Transaction(update)) => { + let tx = update.transaction.unwrap(); + let sig = Signature::try_from(tx.signature.as_slice()).unwrap(); + info!("tx {}", sig); + } + _ => {} // FIXME + }, + _ => {} // FIXME + } + } + + +} + + +fn jupyter_trades() -> SubscribeRequest { + let mut transaction_subs = HashMap::new(); + transaction_subs.insert("client".to_string(), + SubscribeRequestFilterTransactions { + vote: Some(false), + failed: Some(false), + signature: None, + account_include: vec![ + "JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4".to_string(), + ], + account_exclude: vec![], + account_required: vec![], + }); + + SubscribeRequest { + transactions: transaction_subs, + ping: None, + commitment: Some(map_commitment_level(CommitmentConfig { + commitment: CommitmentLevel::Confirmed, + }) as i32), + ..Default::default() + } +} \ No newline at end of file diff --git a/examples/stream_token_accounts.rs b/examples/stream_token_accounts.rs index 3b7ef2c..db63338 100644 --- a/examples/stream_token_accounts.rs +++ b/examples/stream_token_accounts.rs @@ -2,7 +2,7 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use futures::{Stream, StreamExt}; use log::{debug, info, trace}; -use solana_sdk::clock::Slot; +use solana_sdk::clock::{Clock, Slot}; use solana_sdk::commitment_config::CommitmentConfig; use std::env; use std::pin::pin; @@ -13,6 +13,7 @@ use dashmap::DashMap; use solana_account_decoder::parse_token::{parse_token, spl_token_ids, TokenAccountType, UiTokenAccount}; use solana_account_decoder::parse_token::UiAccountState::Initialized; use solana_sdk::pubkey::Pubkey; +use solana_sdk::sysvar::clock; use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream; use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor; @@ -100,6 +101,14 @@ pub async fn main() { let account_pk = Pubkey::try_from(account.pubkey).unwrap(); let size = account.data.len() as u64; + info!("got account update: {} - {:?} - {} bytes", + update.slot, account_pk, account.data.len()); + + if clock::id() == account_pk { + let clock: Clock = bincode::deserialize(&account.data).unwrap(); + info!("clock: {:#?}", clock); + } + info!("got account write: {}", account.write_version); match account_write_first_timestamp.entry(account.write_version) { Entry::Occupied(o) => { @@ -232,10 +241,10 @@ pub fn token_accounts() -> SubscribeRequest { accounts_subs.insert( "client".to_string(), SubscribeRequestFilterAccounts { - account: vec![], - // vec!["4DoNfFBfF7UokCC2FQzriy7yHK6DY6NVdYpuekQ5pRgg".to_string()], - owner: - spl_token_ids().iter().map(|pubkey| pubkey.to_string()).collect(), + // account: vec!["4DoNfFBfF7UokCC2FQzriy7yHK6DY6NVdYpuekQ5pRgg".to_string()], + account: vec![clock::id().to_string()], + owner: vec![], + // spl_token_ids().iter().map(|pubkey| pubkey.to_string()).collect(), filters: vec![], }, ); @@ -266,8 +275,8 @@ pub fn token_accounts_finalized() -> SubscribeRequest { accounts_subs.insert( "client".to_string(), SubscribeRequestFilterAccounts { - account: vec![], - // vec!["4DoNfFBfF7UokCC2FQzriy7yHK6DY6NVdYpuekQ5pRgg".to_string()], + account: + vec!["4DoNfFBfF7UokCC2FQzriy7yHK6DY6NVdYpuekQ5pRgg".to_string()], owner: spl_token_ids().iter().map(|pubkey| pubkey.to_string()).collect(), filters: vec![], diff --git a/examples/stream_vote_transactions.rs b/examples/stream_vote_transactions.rs index 462ac4a..8fd37c3 100644 --- a/examples/stream_vote_transactions.rs +++ b/examples/stream_vote_transactions.rs @@ -144,4 +144,3 @@ pub fn transaction_filter() -> SubscribeRequest { ping: None, } } - diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index 991a1b9..494be04 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -231,6 +231,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( ConnectionState::FatalError(_attempt, reason) => match reason { FatalErrorReason::DownstreamChannelClosed => { warn!("downstream closed - aborting"); + // TODO break 'main_loop instead of returning return; } FatalErrorReason::ConfigurationError => {