count data
This commit is contained in:
parent
087625de3f
commit
88f8d71ba8
|
@ -19,11 +19,11 @@ use tokio::time::{sleep, Duration};
|
||||||
use tracing::field::debug;
|
use tracing::field::debug;
|
||||||
use tracing::warn;
|
use tracing::warn;
|
||||||
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
|
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
|
||||||
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeUpdate};
|
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots, SubscribeUpdate};
|
||||||
use yellowstone_grpc_proto::prost::Message as _;
|
use yellowstone_grpc_proto::prost::Message as _;
|
||||||
|
|
||||||
|
|
||||||
const ENABLE_TIMESTAMP_TAGGING: bool = true;
|
const ENABLE_TIMESTAMP_TAGGING: bool = false;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
pub async fn main() {
|
pub async fn main() {
|
||||||
|
@ -63,16 +63,27 @@ pub async fn main() {
|
||||||
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
let mut bytes_per_slot: HashMap<Slot, u64> = HashMap::new();
|
||||||
|
let mut updates_per_slot: HashMap<Slot, u64> = HashMap::new();
|
||||||
|
|
||||||
|
let mut changing_slot = 0;
|
||||||
|
let mut current_slot = 0;
|
||||||
|
|
||||||
|
|
||||||
let mut green_stream = pin!(green_stream);
|
let mut green_stream = pin!(green_stream);
|
||||||
while let Some(message) = green_stream.next().await {
|
while let Some(message) = green_stream.next().await {
|
||||||
match message {
|
match message {
|
||||||
Message::GeyserSubscribeUpdate(subscriber_update) => {
|
Message::GeyserSubscribeUpdate(subscriber_update) => {
|
||||||
match subscriber_update.update_oneof {
|
match subscriber_update.update_oneof {
|
||||||
|
Some(UpdateOneof::Slot(update)) => {
|
||||||
|
current_slot = update.slot;
|
||||||
|
}
|
||||||
Some(UpdateOneof::Account(update)) => {
|
Some(UpdateOneof::Account(update)) => {
|
||||||
|
let slot = update.slot as Slot;
|
||||||
let account = update.account.unwrap();
|
let account = update.account.unwrap();
|
||||||
let account_pk = Pubkey::try_from(account.pubkey).unwrap();
|
let account_pk = Pubkey::try_from(account.pubkey).unwrap();
|
||||||
let size = account.data.len();
|
let size = account.data.len() as u64;
|
||||||
info!("got account update (green)!!! {} - {:?} - {} bytes",
|
trace!("got account update (green)!!! {} - {:?} - {} bytes",
|
||||||
update.slot, account_pk, account.data.len());
|
update.slot, account_pk, account.data.len());
|
||||||
|
|
||||||
if ENABLE_TIMESTAMP_TAGGING {
|
if ENABLE_TIMESTAMP_TAGGING {
|
||||||
|
@ -120,6 +131,22 @@ pub async fn main() {
|
||||||
.or_insert_with(DashMap::new)
|
.or_insert_with(DashMap::new)
|
||||||
.insert(mint, account_ui);
|
.insert(mint, account_ui);
|
||||||
|
|
||||||
|
bytes_per_slot.entry(slot)
|
||||||
|
.and_modify(|total| *total += size).or_insert(size);
|
||||||
|
|
||||||
|
updates_per_slot.entry(slot)
|
||||||
|
.and_modify(|total| *total += 1).or_insert(1);
|
||||||
|
|
||||||
|
info!("delta: {}", (slot as i64) - (current_slot as i64));
|
||||||
|
|
||||||
|
if slot != changing_slot && changing_slot != 0 {
|
||||||
|
let total_bytes = bytes_per_slot.get(&changing_slot).unwrap();
|
||||||
|
let updates_count = updates_per_slot.get(&changing_slot).unwrap();
|
||||||
|
// info!("Slot {} - Total bytes: {} in {} updates", slot, total_bytes, updates_count);
|
||||||
|
}
|
||||||
|
changing_slot = slot;
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
Ok(TokenAccountType::Mint(mint)) => {
|
Ok(TokenAccountType::Mint(mint)) => {
|
||||||
// not interesting
|
// not interesting
|
||||||
|
@ -173,16 +200,23 @@ pub fn token_accounts() -> SubscribeRequest {
|
||||||
accounts_subs.insert(
|
accounts_subs.insert(
|
||||||
"client".to_string(),
|
"client".to_string(),
|
||||||
SubscribeRequestFilterAccounts {
|
SubscribeRequestFilterAccounts {
|
||||||
account: vec!["4DoNfFBfF7UokCC2FQzriy7yHK6DY6NVdYpuekQ5pRgg".to_string()],
|
account: vec![],
|
||||||
owner: vec![],
|
// vec!["4DoNfFBfF7UokCC2FQzriy7yHK6DY6NVdYpuekQ5pRgg".to_string()],
|
||||||
// spl_token_ids().iter().map(|pubkey| pubkey.to_string()).collect(),
|
owner:
|
||||||
|
spl_token_ids().iter().map(|pubkey| pubkey.to_string()).collect(),
|
||||||
filters: vec![],
|
filters: vec![],
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
||||||
|
let mut slots_subs = HashMap::new();
|
||||||
|
slots_subs.insert("client".to_string(), SubscribeRequestFilterSlots {
|
||||||
|
filter_by_commitment: Some(true),
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
SubscribeRequest {
|
SubscribeRequest {
|
||||||
slots: HashMap::new(),
|
slots: slots_subs,
|
||||||
accounts: accounts_subs,
|
accounts: accounts_subs,
|
||||||
transactions: HashMap::new(),
|
transactions: HashMap::new(),
|
||||||
entry: Default::default(),
|
entry: Default::default(),
|
||||||
|
|
Loading…
Reference in New Issue