From f1c1b02dd75d77ae7e02e8d61e031238faec7fb1 Mon Sep 17 00:00:00 2001 From: musitdev Date: Sat, 2 Sep 2023 17:52:28 +0200 Subject: [PATCH] add log and calcul on last epoch slot --- stake_aggregate/src/leader_schedule.rs | 2 +- stake_aggregate/src/main.rs | 44 +++++++++++++++++++++++--- stake_aggregate/src/stakestore.rs | 6 ++-- 3 files changed, 43 insertions(+), 9 deletions(-) diff --git a/stake_aggregate/src/leader_schedule.rs b/stake_aggregate/src/leader_schedule.rs index 9c2f03b..ed8061f 100644 --- a/stake_aggregate/src/leader_schedule.rs +++ b/stake_aggregate/src/leader_schedule.rs @@ -56,7 +56,7 @@ fn calculate_leader_schedule( .map(|(pubkey, stake)| (*pubkey, *stake)) .collect(); sort_stakes(&mut stakes); - log::trace!("calculate_leader_schedule stakes:{stakes:?} epoch:{current_epoch_info:?}"); + log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{current_epoch_info:?}"); Ok(LeaderSchedule::new( &stakes, seed, diff --git a/stake_aggregate/src/main.rs b/stake_aggregate/src/main.rs index bf5696f..c20f34d 100644 --- a/stake_aggregate/src/main.rs +++ b/stake_aggregate/src/main.rs @@ -20,13 +20,15 @@ use yellowstone_grpc_client::GeyserGrpcClient; use yellowstone_grpc_proto::geyser::CommitmentLevel; use yellowstone_grpc_proto::geyser::SubscribeUpdateAccount; use yellowstone_grpc_proto::prelude::SubscribeRequestFilterAccounts; +use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocks; +use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta; use yellowstone_grpc_proto::{ prelude::{subscribe_update::UpdateOneof, SubscribeRequestFilterSlots, SubscribeUpdateSlot}, tonic::service::Interceptor, }; mod leader_schedule; -mod rpc; +//mod rpc; mod stakestore; type Slot = u64; @@ -42,6 +44,13 @@ const RPC_URL: &str = "http://localhost:8899"; const STAKESTORE_INITIAL_CAPACITY: usize = 600000; +pub fn log_end_epoch(current_slot: Slot, end_epoch_slot: Slot, msg: String) { + //log 50 end slot. + if current_slot != 0 && current_slot + 50 > end_epoch_slot { + log::info!("{current_slot}/{end_epoch_slot} {}", msg); + } +} + /// Connect to yellow stone plugin using yellow stone gRpc Client #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -89,11 +98,12 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetPa)); //subscribe Geyser grpc + //slot subscription let mut slots = HashMap::new(); slots.insert("client".to_string(), SubscribeRequestFilterSlots {}); + //account subscription let mut accounts: HashMap = HashMap::new(); - accounts.insert( "client".to_owned(), SubscribeRequestFilterAccounts { @@ -106,13 +116,30 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re }, ); + //block subscription + let mut blocks = HashMap::new(); + blocks.insert( + "client".to_string(), + SubscribeRequestFilterBlocks { + account_include: Default::default(), + include_transactions: Some(true), + include_accounts: Some(false), + }, + ); + + //block Meta subscription filter + let mut blocks_meta = HashMap::new(); + blocks_meta.insert("client".to_string(), SubscribeRequestFilterBlocksMeta {}); + let mut confirmed_stream = client .subscribe_once( slots.clone(), accounts.clone(), //accounts Default::default(), //tx Default::default(), //entry + //blocks, //full block Default::default(), //full block + //blocks_meta, //block meta Default::default(), //block meta Some(CommitmentLevel::Confirmed), vec![], @@ -280,9 +307,11 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re Some(UpdateOneof::Slot(slot)) => { //update current slot //log::info!("Processing slot: {:?} current slot:{:?}", slot, current_slot); + log_end_epoch(current_slot.confirmed_slot, next_epoch_start_slot, format!("Receive slot: {:?} at commitment:{:?}", slot.slot, slot.status())); + current_slot.update_slot(&slot); - if current_slot.confirmed_slot >= next_epoch_start_slot { //slot can be non consecutif. + if current_slot.confirmed_slot >= next_epoch_start_slot-1 { //slot can be non consecutif. log::info!("End epoch slot. Calculate schedule at current slot:{}", current_slot.confirmed_slot); let Ok(stake_map) = extract_stakestore(&mut stakestore) else { log::info!("Epoch schedule aborted because a getPA is currently running."); @@ -305,6 +334,7 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re let move_epoch = current_epoch.clone(); move || { let schedule = crate::leader_schedule::calculate_leader_schedule_from_stake_map(&stake_map, &move_epoch); + log::info!("End calculate leader schedule at slot:{}", current_slot.confirmed_slot); TaskResult::ScheduleResult(schedule.ok(), stake_map) } }); @@ -315,6 +345,12 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re } } + Some(UpdateOneof::BlockMeta(block_meta)) => { + log::info!("Receive Block Meta at slot: {}", block_meta.slot); + } + Some(UpdateOneof::Block(block)) => { + println!("Receive Block at slot: {}", block.slot); + } Some(UpdateOneof::Ping(_)) => log::trace!("UpdateOneof::Ping"), bad_msg => { log::info!("Geyser stream unexpected message received:{:?}",bad_msg); @@ -425,7 +461,7 @@ fn read_account( }; if geyser_account.slot != current_slot { - log::trace!( + log::warn!( "Get geyser account on a different slot:{} of the current:{current_slot}", geyser_account.slot ); diff --git a/stake_aggregate/src/stakestore.rs b/stake_aggregate/src/stakestore.rs index ab71482..dee4eb1 100644 --- a/stake_aggregate/src/stakestore.rs +++ b/stake_aggregate/src/stakestore.rs @@ -30,13 +30,13 @@ fn stake_map_insert_stake(map: &mut StakeMap, stake_account: Pubkey, stake: Stor std::collections::hash_map::Entry::Occupied(occupied) => { let strstake = occupied.into_mut(); // <-- get mut reference to existing value if strstake.last_update_slot < stake.last_update_slot { - log::trace!("Stake updated for: {stake_account} stake:{stake:?}"); + log::info!("Stake updated for: {stake_account} stake:{stake:?}"); *strstake = stake; } } // If value doesn't exist yet, then insert a new value of 1 std::collections::hash_map::Entry::Vacant(vacant) => { - log::trace!("New stake added for: {stake_account} stake:{stake:?}"); + log::info!("New stake added for: {stake_account} stake:{stake:?}"); vacant.insert(stake); } }; @@ -118,8 +118,6 @@ impl StakeStore { last_update_slot: new_account.slot, write_version: new_account.write_version, }; - - log::trace!("add_stake ststake:{ststake:?}"); //during extract push the new update or //don't add account change that has been done in next epoch. let insert_stake = !self.extracted || ststake.last_update_slot > current_end_epoch_slot;