geyser-grpc-connector/examples/subscribe_accounts.rs

194 lines
6.2 KiB
Rust

//
// ```
// ssh eclipse-rpc -Nv
// ```
//
use futures::{Stream, StreamExt};
use itertools::Itertools;
use log::{debug, info};
use solana_account_decoder::parse_token::spl_token_ids;
use solana_sdk::clock::{Slot, UnixTimestamp};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::hash::{hash, Hash};
use solana_sdk::pubkey::Pubkey;
use std::cmp::min;
use std::collections::{HashMap, VecDeque};
use std::pin::pin;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use std::{env, iter};
use tokio::sync::mpsc::Receiver;
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use tokio::time::{sleep, Duration};
use tracing::{trace, warn};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{
SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocksMeta,
SubscribeRequestFilterSlots, SubscribeUpdate,
};
use yellowstone_grpc_proto::prost::Message as _;
type AtomicSlot = Arc<AtomicU64>;
#[tokio::main]
pub async fn main() {
// RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace
tracing_subscriber::fmt::init();
// console_subscriber::init();
let grpc_addr = env::var("GRPC_ADDR").expect("need grpc url");
let grpc_x_token = env::var("GRPC_X_TOKEN").ok();
info!(
"Using grpc source on {} ({})",
grpc_addr,
grpc_x_token.is_some()
);
let timeouts = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(25),
request_timeout: Duration::from_secs(25),
subscribe_timeout: Duration::from_secs(25),
receive_timeout: Duration::from_secs(25),
};
let config = GrpcSourceConfig::new(grpc_addr, grpc_x_token, None, timeouts.clone());
let (autoconnect_tx, geyser_messages_rx) = tokio::sync::mpsc::channel(10);
let (_exit_tx, exit_rx) = tokio::sync::broadcast::channel::<()>(1);
let _all_accounts = create_geyser_autoconnection_task_with_mpsc(
config.clone(),
all_accounts(),
autoconnect_tx.clone(),
exit_rx.resubscribe(),
);
let current_processed_slot = AtomicSlot::default();
start_tracking_account_consumer(geyser_messages_rx, current_processed_slot.clone());
// "infinite" sleep
sleep(Duration::from_secs(1800)).await;
}
// note: this keeps track of lot of data and might blow up memory
fn start_tracking_account_consumer(
mut geyser_messages_rx: Receiver<Message>,
current_processed_slot: Arc<AtomicU64>,
) {
tokio::spawn(async move {
loop {
match geyser_messages_rx.recv().await {
Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
Some(UpdateOneof::Account(update)) => {
let started_at = Instant::now();
let now = SystemTime::now();
let account_info = update.account.unwrap();
let account_pk = Pubkey::try_from(account_info.pubkey).unwrap();
let account_owner_pk = Pubkey::try_from(account_info.owner).unwrap();
// note: slot is referencing the block that is just built while the slot number reported from BlockMeta/Slot uses the slot after the block is built
let slot = update.slot;
let account_receive_time = get_epoch_sec();
info!(
"Account update: slot: {}, account_pk: {}, account_owner_pk: {}, account_receive_time: {}",
slot, account_pk, account_owner_pk, account_receive_time
);
}
None => {}
_ => {}
},
None => {
log::warn!("multiplexer channel closed - aborting");
return;
}
Some(Message::Connecting(_)) => {}
}
}
});
}
fn get_epoch_sec() -> UnixTimestamp {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as UnixTimestamp
}
pub fn token_accounts() -> SubscribeRequest {
let mut accounts_subs = HashMap::new();
accounts_subs.insert(
"client".to_string(),
SubscribeRequestFilterAccounts {
account: vec![],
// vec!["4DoNfFBfF7UokCC2FQzriy7yHK6DY6NVdYpuekQ5pRgg".to_string()],
owner: spl_token_ids()
.iter()
.map(|pubkey| pubkey.to_string())
.collect(),
filters: vec![],
},
);
SubscribeRequest {
accounts: accounts_subs,
..Default::default()
}
}
pub fn all_accounts_and_blocksmeta() -> SubscribeRequest {
let mut accounts_subs = HashMap::new();
accounts_subs.insert(
"client".to_string(),
SubscribeRequestFilterAccounts {
account: vec![],
owner: vec![],
filters: vec![],
},
);
let mut slots_subs = HashMap::new();
slots_subs.insert(
"client".to_string(),
SubscribeRequestFilterSlots {
filter_by_commitment: Some(true),
},
);
let mut blocks_meta_subs = HashMap::new();
blocks_meta_subs.insert("client".to_string(), SubscribeRequestFilterBlocksMeta {});
SubscribeRequest {
slots: slots_subs,
accounts: accounts_subs,
blocks_meta: blocks_meta_subs,
..Default::default()
}
}
pub fn all_accounts() -> SubscribeRequest {
let mut accounts_subs = HashMap::new();
accounts_subs.insert(
"client".to_string(),
SubscribeRequestFilterAccounts {
account: vec![],
owner: vec![],
filters: vec![],
},
);
SubscribeRequest {
accounts: accounts_subs,
..Default::default()
}
}