Compare commits
9 Commits
88f8d71ba8
...
32aeb332b1
Author | SHA1 | Date |
---|---|---|
GroovieGermanikus | 32aeb332b1 | |
GroovieGermanikus | 6ffc8d0f0d | |
GroovieGermanikus | 61b202af9f | |
GroovieGermanikus | 430847b2c1 | |
GroovieGermanikus | 86412cfd71 | |
GroovieGermanikus | 78b8197668 | |
GroovieGermanikus | bd439a3390 | |
GroovieGermanikus | 193a1b6878 | |
GroovieGermanikus | 1d1a01c8ae |
|
@ -1378,6 +1378,7 @@ dependencies = [
|
||||||
"solana-logger",
|
"solana-logger",
|
||||||
"solana-sdk",
|
"solana-sdk",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
"tonic",
|
||||||
"tonic-health",
|
"tonic-health",
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
|
@ -3759,6 +3760,7 @@ dependencies = [
|
||||||
"axum",
|
"axum",
|
||||||
"base64 0.21.7",
|
"base64 0.21.7",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
"flate2",
|
||||||
"h2",
|
"h2",
|
||||||
"http",
|
"http",
|
||||||
"http-body",
|
"http-body",
|
||||||
|
|
|
@ -33,6 +33,7 @@ csv = "1.3.0"
|
||||||
|
|
||||||
dashmap = "5.5.3"
|
dashmap = "5.5.3"
|
||||||
|
|
||||||
|
tonic = { version="0.10.2", features=["gzip"] }
|
||||||
tonic-health = "0.10.2"
|
tonic-health = "0.10.2"
|
||||||
regex = "1.10.4"
|
regex = "1.10.4"
|
||||||
clap = { version = "4.2", features = ["derive"] }
|
clap = { version = "4.2", features = ["derive"] }
|
||||||
|
|
|
@ -1,25 +1,30 @@
|
||||||
use std::collections::{HashMap, VecDeque};
|
use std::collections::{HashMap, VecDeque};
|
||||||
use futures::{Stream, StreamExt};
|
use futures::{Stream, StreamExt};
|
||||||
use log::info;
|
use log::{debug, info};
|
||||||
use solana_sdk::clock::{Slot, UnixTimestamp};
|
use solana_sdk::clock::{Slot, UnixTimestamp};
|
||||||
use solana_sdk::commitment_config::CommitmentConfig;
|
use solana_sdk::commitment_config::CommitmentConfig;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::pin::pin;
|
use std::pin::pin;
|
||||||
use std::time::{SystemTime, UNIX_EPOCH};
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
|
use std::time::{Instant, SystemTime, UNIX_EPOCH};
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
|
use solana_account_decoder::parse_token::spl_token_ids;
|
||||||
use tokio::sync::mpsc::Receiver;
|
use tokio::sync::mpsc::Receiver;
|
||||||
|
|
||||||
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
|
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
|
||||||
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
|
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
|
||||||
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
|
use geyser_grpc_connector::grpcmultiplex_fastestwins::{create_multiplexed_stream, FromYellowstoneExtractor};
|
||||||
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, histogram_percentiles, Message};
|
use geyser_grpc_connector::{AtomicSlot, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, histogram_percentiles, Message};
|
||||||
use tokio::time::{sleep, Duration};
|
use tokio::time::{sleep, Duration};
|
||||||
use tracing::warn;
|
use tracing::{trace, warn};
|
||||||
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
|
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
|
||||||
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
|
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, SubscribeUpdate};
|
||||||
use yellowstone_grpc_proto::prost::Message as _;
|
use yellowstone_grpc_proto::prost::Message as _;
|
||||||
|
|
||||||
|
mod debouncer;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
pub async fn main() {
|
pub async fn main() {
|
||||||
// RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace
|
// RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace
|
||||||
|
@ -49,30 +54,115 @@ pub async fn main() {
|
||||||
let (autoconnect_tx, geyser_messages_rx) = tokio::sync::mpsc::channel(10);
|
let (autoconnect_tx, geyser_messages_rx) = tokio::sync::mpsc::channel(10);
|
||||||
let (_exit, exit_notify) = tokio::sync::broadcast::channel(1);
|
let (_exit, exit_notify) = tokio::sync::broadcast::channel(1);
|
||||||
|
|
||||||
let _accounts_task = create_geyser_autoconnection_task_with_mpsc(
|
// let _accounts_task = create_geyser_autoconnection_task_with_mpsc(
|
||||||
|
// config.clone(),
|
||||||
|
// GeyserFilter(CommitmentConfig::processed()).accounts(),
|
||||||
|
// autoconnect_tx.clone(),
|
||||||
|
// exit_notify.resubscribe(),
|
||||||
|
// );
|
||||||
|
//
|
||||||
|
// let _blocksmeta_task = create_geyser_autoconnection_task_with_mpsc(
|
||||||
|
// config.clone(),
|
||||||
|
// GeyserFilter(CommitmentConfig::processed()).blocks_meta(),
|
||||||
|
// autoconnect_tx.clone(),
|
||||||
|
// exit_notify.resubscribe(),
|
||||||
|
// );
|
||||||
|
|
||||||
|
// let _all_accounts = create_geyser_autoconnection_task_with_mpsc(
|
||||||
|
// config.clone(),
|
||||||
|
// all_accounts(),
|
||||||
|
// autoconnect_tx.clone(),
|
||||||
|
// exit_notify.resubscribe(),
|
||||||
|
// );
|
||||||
|
|
||||||
|
let _token_accounts_task = create_geyser_autoconnection_task_with_mpsc(
|
||||||
config.clone(),
|
config.clone(),
|
||||||
GeyserFilter(CommitmentConfig::processed()).accounts(),
|
token_accounts(),
|
||||||
autoconnect_tx.clone(),
|
autoconnect_tx.clone(),
|
||||||
exit_notify.resubscribe(),
|
exit_notify.resubscribe(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let _blocksmeta_task = create_geyser_autoconnection_task_with_mpsc(
|
let current_processed_slot = AtomicSlot::default();
|
||||||
config.clone(),
|
start_tracking_slots(current_processed_slot.clone());
|
||||||
GeyserFilter(CommitmentConfig::processed()).blocks_meta(),
|
start_tracking_account_consumer(geyser_messages_rx, current_processed_slot.clone());
|
||||||
autoconnect_tx.clone(),
|
|
||||||
exit_notify.resubscribe(),
|
|
||||||
);
|
|
||||||
|
|
||||||
start_tracking_account_consumer(geyser_messages_rx);
|
|
||||||
|
|
||||||
// "infinite" sleep
|
// "infinite" sleep
|
||||||
sleep(Duration::from_secs(1800)).await;
|
sleep(Duration::from_secs(1800)).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
// note: this keeps track of lot of data and might blow up memory
|
|
||||||
fn start_tracking_account_consumer(mut geyser_messages_rx: Receiver<Message>) {
|
|
||||||
const RECENT_SLOTS_LIMIT: usize = 30;
|
|
||||||
|
|
||||||
|
|
||||||
|
// note processed might return a slot that night end up on a fork
|
||||||
|
fn start_tracking_slots(current_processed_slot: AtomicSlot) {
|
||||||
|
|
||||||
|
let grpc_slot_source1 = env::var("GRPC_SLOT1_ADDR").expect("need grpc url for green");
|
||||||
|
let grpc_slot_source2 = env::var("GRPC_SLOT2_ADDR").expect("need grpc url for green");
|
||||||
|
|
||||||
|
info!("Using grpc sources for slot: {}, {}",
|
||||||
|
grpc_slot_source1, grpc_slot_source2
|
||||||
|
);
|
||||||
|
|
||||||
|
let timeouts = GrpcConnectionTimeouts {
|
||||||
|
connect_timeout: Duration::from_secs(5),
|
||||||
|
request_timeout: Duration::from_secs(5),
|
||||||
|
subscribe_timeout: Duration::from_secs(5),
|
||||||
|
receive_timeout: Duration::from_secs(5),
|
||||||
|
};
|
||||||
|
|
||||||
|
let config1 = GrpcSourceConfig::new(grpc_slot_source1, None, None, timeouts.clone());
|
||||||
|
let config2 = GrpcSourceConfig::new(grpc_slot_source2, None, None, timeouts.clone());
|
||||||
|
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
debug!("start tracking slots..");
|
||||||
|
|
||||||
|
let (multiplex_tx, mut multiplex_rx) = tokio::sync::mpsc::channel(10);
|
||||||
|
// TODO expose
|
||||||
|
let (_exit, exit_notify) = tokio::sync::broadcast::channel(1);
|
||||||
|
|
||||||
|
let _blocksmeta_task1 = create_geyser_autoconnection_task_with_mpsc(
|
||||||
|
config1.clone(),
|
||||||
|
GeyserFilter(CommitmentConfig::processed()).slots(),
|
||||||
|
multiplex_tx.clone(),
|
||||||
|
exit_notify.resubscribe(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let _blocksmeta_task2 = create_geyser_autoconnection_task_with_mpsc(
|
||||||
|
config2.clone(),
|
||||||
|
GeyserFilter(CommitmentConfig::processed()).slots(),
|
||||||
|
multiplex_tx.clone(),
|
||||||
|
exit_notify.resubscribe(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut tip: Slot = 0;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match multiplex_rx.recv().await {
|
||||||
|
Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
|
||||||
|
Some(UpdateOneof::Slot(update)) => {
|
||||||
|
let slot = update.slot;
|
||||||
|
if slot > tip {
|
||||||
|
tip = slot;
|
||||||
|
current_processed_slot.store(slot, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => {}
|
||||||
|
_ => {}
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
log::warn!("multiplexer channel closed - aborting");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Some(Message::Connecting(_)) => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// note: this keeps track of lot of data and might blow up memory
|
||||||
|
fn start_tracking_account_consumer(mut geyser_messages_rx: Receiver<Message>, current_processed_slot: Arc<AtomicU64>) {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
|
||||||
let mut bytes_per_slot = HashMap::<Slot, usize>::new();
|
let mut bytes_per_slot = HashMap::<Slot, usize>::new();
|
||||||
|
@ -80,19 +170,19 @@ fn start_tracking_account_consumer(mut geyser_messages_rx: Receiver<Message>) {
|
||||||
let mut wallclock_updates_per_slot_account = HashMap::<(Slot, Pubkey), Vec<SystemTime>>::new();
|
let mut wallclock_updates_per_slot_account = HashMap::<(Slot, Pubkey), Vec<SystemTime>>::new();
|
||||||
// slot written by account update
|
// slot written by account update
|
||||||
let mut current_slot: Slot = 0;
|
let mut current_slot: Slot = 0;
|
||||||
// slot from slot stream
|
|
||||||
let mut slot_just_completed: Slot = 0;
|
|
||||||
|
|
||||||
// seconds since epoch
|
// seconds since epoch
|
||||||
let mut block_time_per_slot = HashMap::<Slot, UnixTimestamp>::new();
|
let mut block_time_per_slot = HashMap::<Slot, UnixTimestamp>::new();
|
||||||
// wall clock time of block completion (i.e. processed) reported by the block meta stream
|
// wall clock time of block completion (i.e. processed) reported by the block meta stream
|
||||||
let mut block_completion_notification_time_per_slot = HashMap::<Slot, SystemTime>::new();
|
let mut block_completion_notification_time_per_slot = HashMap::<Slot, SystemTime>::new();
|
||||||
let mut recent_slot_deltas: VecDeque<i64> = VecDeque::with_capacity(RECENT_SLOTS_LIMIT);
|
|
||||||
|
let debouncer = debouncer::Debouncer::new(Duration::from_millis(5));
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match geyser_messages_rx.recv().await {
|
match geyser_messages_rx.recv().await {
|
||||||
Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
|
Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
|
||||||
Some(UpdateOneof::Account(update)) => {
|
Some(UpdateOneof::Account(update)) => {
|
||||||
|
let started_at = Instant::now();
|
||||||
let now = SystemTime::now();
|
let now = SystemTime::now();
|
||||||
let account_info = update.account.unwrap();
|
let account_info = update.account.unwrap();
|
||||||
let account_pk = Pubkey::try_from(account_info.pubkey).unwrap();
|
let account_pk = Pubkey::try_from(account_info.pubkey).unwrap();
|
||||||
|
@ -100,16 +190,21 @@ fn start_tracking_account_consumer(mut geyser_messages_rx: Receiver<Message>) {
|
||||||
let slot = update.slot;
|
let slot = update.slot;
|
||||||
let account_receive_time = get_epoch_sec();
|
let account_receive_time = get_epoch_sec();
|
||||||
|
|
||||||
if slot_just_completed != slot {
|
let latest_slot = current_processed_slot.load(Ordering::Relaxed);
|
||||||
if slot_just_completed != 0 {
|
|
||||||
// the perfect is value "-1"
|
if latest_slot != 0 {
|
||||||
recent_slot_deltas.push_back((slot_just_completed as i64) - (slot as i64));
|
// the perfect is value "-1"
|
||||||
if recent_slot_deltas.len() > RECENT_SLOTS_LIMIT {
|
let delta = (latest_slot as i64) - (slot as i64);
|
||||||
recent_slot_deltas.pop_front();
|
if debouncer.can_fire() {
|
||||||
}
|
debug!("Account info for upcoming slot {} was {} behind current processed slot", slot, delta);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if account_info.data.len() > 1000 {
|
||||||
|
// trace!("got account update!!! {} - {:?} - {} bytes",
|
||||||
|
// slot, account_pk, account_info.data.len());
|
||||||
|
// }
|
||||||
|
|
||||||
bytes_per_slot.entry(slot)
|
bytes_per_slot.entry(slot)
|
||||||
.and_modify(|entry| *entry += account_info.data.len())
|
.and_modify(|entry| *entry += account_info.data.len())
|
||||||
.or_insert(account_info.data.len());
|
.or_insert(account_info.data.len());
|
||||||
|
@ -120,56 +215,38 @@ fn start_tracking_account_consumer(mut geyser_messages_rx: Receiver<Message>) {
|
||||||
.and_modify(|entry| entry.push(now))
|
.and_modify(|entry| entry.push(now))
|
||||||
.or_insert(vec![now]);
|
.or_insert(vec![now]);
|
||||||
|
|
||||||
if current_slot != slot {
|
if current_slot != slot && current_slot != 0 {
|
||||||
info!("Slot: {}", slot);
|
info!("New Slot: {}", slot);
|
||||||
if current_slot != 0 {
|
info!("Slot: {} - account data transferred: {:.2} MiB", slot, *bytes_per_slot.get(¤t_slot).unwrap() as f64 / 1024.0 / 1024.0 );
|
||||||
info!("Slot: {} - account data transferred: {:.2} MiB", slot, *bytes_per_slot.get(¤t_slot).unwrap() as f64 / 1024.0 / 1024.0 );
|
|
||||||
|
|
||||||
info!("Slot: {} - num of update messages: {}", slot, updates_per_slot.get(¤t_slot).unwrap());
|
info!("Slot: {} - num of update messages: {}", slot, updates_per_slot.get(¤t_slot).unwrap());
|
||||||
|
|
||||||
let counters = wallclock_updates_per_slot_account.iter()
|
|
||||||
.filter(|((slot, _pubkey), _)| slot == ¤t_slot)
|
|
||||||
.map(|((_slot, _pubkey), updates)| updates.len() as f64)
|
|
||||||
.sorted_by(|a, b| a.partial_cmp(b).unwrap())
|
|
||||||
.collect_vec();
|
|
||||||
let count_histogram = histogram_percentiles::calculate_percentiles(&counters);
|
|
||||||
info!("Count histogram: {}", count_histogram);
|
|
||||||
|
|
||||||
let deltas = recent_slot_deltas.iter()
|
|
||||||
.map(|x| *x as f64)
|
|
||||||
.sorted_by(|a, b| a.partial_cmp(b).unwrap())
|
|
||||||
.collect_vec();
|
|
||||||
let deltas_histogram = histogram_percentiles::calculate_percentiles(&deltas);
|
|
||||||
info!("Deltas slots list: {:?}", recent_slot_deltas);
|
|
||||||
info!("Deltas histogram: {}", deltas_histogram);
|
|
||||||
|
|
||||||
if let Some(actual_block_time) = block_time_per_slot.get(¤t_slot) {
|
|
||||||
info!("Block time for slot {}: delta {} seconds", current_slot, account_receive_time - *actual_block_time);
|
|
||||||
}
|
|
||||||
|
|
||||||
let wallclock_minmax = wallclock_updates_per_slot_account.iter()
|
|
||||||
.filter(|((slot, _pubkey), _)| slot == ¤t_slot)
|
|
||||||
.flat_map(|((_slot, _pubkey), updates)| updates)
|
|
||||||
.minmax();
|
|
||||||
if let Some((min, max)) = wallclock_minmax.into_option() {
|
|
||||||
info!("Wallclock timestamp between first and last account update received for slot {}: {:.2}s",
|
|
||||||
current_slot,
|
|
||||||
max.duration_since(*min).unwrap().as_secs_f64()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
let counters = wallclock_updates_per_slot_account.iter()
|
||||||
|
.filter(|((slot, _pubkey), _)| slot == ¤t_slot)
|
||||||
|
.map(|((_slot, _pubkey), updates)| updates.len() as f64)
|
||||||
|
.sorted_by(|a, b| a.partial_cmp(b).unwrap())
|
||||||
|
.collect_vec();
|
||||||
|
let count_histogram = histogram_percentiles::calculate_percentiles(&counters);
|
||||||
|
info!("Count histogram: {}", count_histogram);
|
||||||
|
|
||||||
|
if let Some(actual_block_time) = block_time_per_slot.get(¤t_slot) {
|
||||||
|
info!("Block time for slot {}: delta {} seconds", current_slot, account_receive_time - *actual_block_time);
|
||||||
}
|
}
|
||||||
current_slot = slot;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
let wallclock_minmax = wallclock_updates_per_slot_account.iter()
|
||||||
Some(UpdateOneof::BlockMeta(update)) => {
|
.filter(|((slot, _pubkey), _)| slot == ¤t_slot)
|
||||||
let now = SystemTime::now();
|
.flat_map(|((_slot, _pubkey), updates)| updates)
|
||||||
// completed depends on commitment level which is processed ATM
|
.minmax();
|
||||||
slot_just_completed = update.slot;
|
if let Some((min, max)) = wallclock_minmax.into_option() {
|
||||||
block_time_per_slot.insert(slot_just_completed, update.block_time.unwrap().timestamp);
|
info!("Wallclock timestamp between first and last account update received for slot {}: {:.2}s",
|
||||||
block_completion_notification_time_per_slot.insert(slot_just_completed, now);
|
current_slot,
|
||||||
|
max.duration_since(*min).unwrap().as_secs_f64()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
} // -- slot changed
|
||||||
|
current_slot = slot;
|
||||||
|
|
||||||
}
|
}
|
||||||
None => {}
|
None => {}
|
||||||
_ => {}
|
_ => {}
|
||||||
|
@ -190,3 +267,74 @@ fn get_epoch_sec() -> UnixTimestamp {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.as_secs() as UnixTimestamp
|
.as_secs() as UnixTimestamp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub fn token_accounts() -> SubscribeRequest {
|
||||||
|
let mut accounts_subs = HashMap::new();
|
||||||
|
accounts_subs.insert(
|
||||||
|
"client".to_string(),
|
||||||
|
SubscribeRequestFilterAccounts {
|
||||||
|
account: vec![],
|
||||||
|
// vec!["4DoNfFBfF7UokCC2FQzriy7yHK6DY6NVdYpuekQ5pRgg".to_string()],
|
||||||
|
owner:
|
||||||
|
spl_token_ids().iter().map(|pubkey| pubkey.to_string()).collect(),
|
||||||
|
filters: vec![],
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
SubscribeRequest {
|
||||||
|
accounts: accounts_subs,
|
||||||
|
..Default::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub fn all_accounts_and_blocksmeta() -> SubscribeRequest {
|
||||||
|
let mut accounts_subs = HashMap::new();
|
||||||
|
accounts_subs.insert(
|
||||||
|
"client".to_string(),
|
||||||
|
SubscribeRequestFilterAccounts {
|
||||||
|
account: vec![],
|
||||||
|
owner: vec![],
|
||||||
|
filters: vec![],
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
|
||||||
|
let mut slots_subs = HashMap::new();
|
||||||
|
slots_subs.insert("client".to_string(), SubscribeRequestFilterSlots {
|
||||||
|
filter_by_commitment: Some(true),
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut blocks_meta_subs = HashMap::new();
|
||||||
|
blocks_meta_subs.insert("client".to_string(), SubscribeRequestFilterBlocksMeta {});
|
||||||
|
|
||||||
|
SubscribeRequest {
|
||||||
|
slots: slots_subs,
|
||||||
|
accounts: accounts_subs,
|
||||||
|
blocks_meta: blocks_meta_subs,
|
||||||
|
..Default::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub fn all_accounts() -> SubscribeRequest {
|
||||||
|
let mut accounts_subs = HashMap::new();
|
||||||
|
accounts_subs.insert(
|
||||||
|
"client".to_string(),
|
||||||
|
SubscribeRequestFilterAccounts {
|
||||||
|
account: vec![],
|
||||||
|
owner: vec![],
|
||||||
|
filters: vec![],
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
SubscribeRequest {
|
||||||
|
accounts: accounts_subs,
|
||||||
|
..Default::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
use std::sync::atomic::{AtomicI64, Ordering};
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Debouncer {
|
||||||
|
started_at: Instant,
|
||||||
|
cooldown_ms: i64,
|
||||||
|
last: AtomicI64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Debouncer {
|
||||||
|
pub fn new(cooldown: Duration) -> Self {
|
||||||
|
Self {
|
||||||
|
started_at: Instant::now(),
|
||||||
|
cooldown_ms: cooldown.as_millis() as i64,
|
||||||
|
last: AtomicI64::new(0),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn can_fire(&self) -> bool {
|
||||||
|
let passed_total_ms = self.started_at.elapsed().as_millis() as i64;
|
||||||
|
|
||||||
|
let results = self.last.fetch_update(Ordering::SeqCst, Ordering::SeqCst,
|
||||||
|
|last| {
|
||||||
|
|
||||||
|
if passed_total_ms - last > self.cooldown_ms {
|
||||||
|
Some(passed_total_ms)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
results.is_ok()
|
||||||
|
}
|
||||||
|
}
|
|
@ -14,7 +14,7 @@ use solana_sdk::compute_budget::ComputeBudgetInstruction;
|
||||||
use solana_sdk::hash::Hash;
|
use solana_sdk::hash::Hash;
|
||||||
use solana_sdk::instruction::CompiledInstruction;
|
use solana_sdk::instruction::CompiledInstruction;
|
||||||
use solana_sdk::message::v0::MessageAddressTableLookup;
|
use solana_sdk::message::v0::MessageAddressTableLookup;
|
||||||
use solana_sdk::message::{v0, MessageHeader, VersionedMessage};
|
use solana_sdk::message::{MessageHeader, v0, VersionedMessage};
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
|
|
||||||
use solana_sdk::signature::Signature;
|
use solana_sdk::signature::Signature;
|
||||||
|
@ -26,10 +26,12 @@ use geyser_grpc_connector::grpcmultiplex_fastestwins::{
|
||||||
create_multiplexed_stream, FromYellowstoneExtractor,
|
create_multiplexed_stream, FromYellowstoneExtractor,
|
||||||
};
|
};
|
||||||
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig};
|
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig};
|
||||||
use tokio::time::{sleep, Duration};
|
use tokio::time::{Duration, sleep};
|
||||||
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
|
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
|
||||||
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
|
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
|
||||||
|
|
||||||
|
pub mod debouncer;
|
||||||
|
|
||||||
fn start_example_block_consumer(
|
fn start_example_block_consumer(
|
||||||
multiplex_stream: impl Stream<Item = ProducedBlock> + Send + 'static,
|
multiplex_stream: impl Stream<Item = ProducedBlock> + Send + 'static,
|
||||||
) {
|
) {
|
||||||
|
|
36
src/lib.rs
36
src/lib.rs
|
@ -1,6 +1,8 @@
|
||||||
use solana_sdk::commitment_config::CommitmentConfig;
|
use solana_sdk::commitment_config::CommitmentConfig;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::fmt::{Debug, Display};
|
use std::fmt::{Debug, Display};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::AtomicU64;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, SubscribeUpdate};
|
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, SubscribeUpdate};
|
||||||
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
|
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
|
||||||
|
@ -15,6 +17,8 @@ mod obfuscate;
|
||||||
pub mod histogram_percentiles;
|
pub mod histogram_percentiles;
|
||||||
pub mod yellowstone_grpc_util;
|
pub mod yellowstone_grpc_util;
|
||||||
|
|
||||||
|
pub type AtomicSlot = Arc<AtomicU64>;
|
||||||
|
|
||||||
// 1-based attempt counter
|
// 1-based attempt counter
|
||||||
type Attempt = u32;
|
type Attempt = u32;
|
||||||
|
|
||||||
|
@ -101,15 +105,9 @@ impl GeyserFilter {
|
||||||
);
|
);
|
||||||
|
|
||||||
SubscribeRequest {
|
SubscribeRequest {
|
||||||
slots: HashMap::new(),
|
|
||||||
accounts: Default::default(),
|
|
||||||
transactions: HashMap::new(),
|
|
||||||
entry: Default::default(),
|
|
||||||
blocks: blocks_subs,
|
blocks: blocks_subs,
|
||||||
blocks_meta: HashMap::new(),
|
|
||||||
commitment: Some(map_commitment_level(self.0) as i32),
|
commitment: Some(map_commitment_level(self.0) as i32),
|
||||||
accounts_data_slice: Default::default(),
|
..Default::default()
|
||||||
ping: None,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,15 +116,9 @@ impl GeyserFilter {
|
||||||
blocksmeta_subs.insert("client".to_string(), SubscribeRequestFilterBlocksMeta {});
|
blocksmeta_subs.insert("client".to_string(), SubscribeRequestFilterBlocksMeta {});
|
||||||
|
|
||||||
SubscribeRequest {
|
SubscribeRequest {
|
||||||
slots: HashMap::new(),
|
|
||||||
accounts: Default::default(),
|
|
||||||
transactions: HashMap::new(),
|
|
||||||
entry: Default::default(),
|
|
||||||
blocks: HashMap::new(),
|
|
||||||
blocks_meta: blocksmeta_subs,
|
blocks_meta: blocksmeta_subs,
|
||||||
commitment: Some(map_commitment_level(self.0) as i32),
|
commitment: Some(map_commitment_level(self.0) as i32),
|
||||||
accounts_data_slice: Default::default(),
|
..Default::default()
|
||||||
ping: None,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,14 +133,8 @@ impl GeyserFilter {
|
||||||
|
|
||||||
SubscribeRequest {
|
SubscribeRequest {
|
||||||
slots: slots_subs,
|
slots: slots_subs,
|
||||||
accounts: Default::default(),
|
|
||||||
transactions: HashMap::new(),
|
|
||||||
entry: Default::default(),
|
|
||||||
blocks: HashMap::new(),
|
|
||||||
blocks_meta: HashMap::new(),
|
|
||||||
commitment: Some(map_commitment_level(self.0) as i32),
|
commitment: Some(map_commitment_level(self.0) as i32),
|
||||||
accounts_data_slice: Default::default(),
|
..Default::default()
|
||||||
ping: None,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,15 +150,9 @@ impl GeyserFilter {
|
||||||
);
|
);
|
||||||
|
|
||||||
SubscribeRequest {
|
SubscribeRequest {
|
||||||
slots: HashMap::new(),
|
|
||||||
accounts: accounts_subs,
|
accounts: accounts_subs,
|
||||||
transactions: HashMap::new(),
|
|
||||||
entry: Default::default(),
|
|
||||||
blocks: Default::default(),
|
|
||||||
blocks_meta: HashMap::new(),
|
|
||||||
commitment: Some(map_commitment_level(self.0) as i32),
|
commitment: Some(map_commitment_level(self.0) as i32),
|
||||||
accounts_data_slice: Default::default(),
|
..Default::default()
|
||||||
ping: None,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
use tonic::codec::CompressionEncoding;
|
||||||
use tonic_health::pb::health_client::HealthClient;
|
use tonic_health::pb::health_client::HealthClient;
|
||||||
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult, InterceptorXToken};
|
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult, InterceptorXToken};
|
||||||
use yellowstone_grpc_proto::geyser::geyser_client::GeyserClient;
|
use yellowstone_grpc_proto::geyser::geyser_client::GeyserClient;
|
||||||
|
@ -110,6 +111,7 @@ where
|
||||||
let client = GeyserGrpcClient::new(
|
let client = GeyserGrpcClient::new(
|
||||||
HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
|
HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
|
||||||
GeyserClient::with_interceptor(channel, interceptor)
|
GeyserClient::with_interceptor(channel, interceptor)
|
||||||
|
.accept_compressed(CompressionEncoding::Gzip)
|
||||||
.max_decoding_message_size(GeyserGrpcClient::max_decoding_message_size()),
|
.max_decoding_message_size(GeyserGrpcClient::max_decoding_message_size()),
|
||||||
);
|
);
|
||||||
Ok(client)
|
Ok(client)
|
||||||
|
|
Loading…
Reference in New Issue