add log and calcul on last epoch slot

This commit is contained in:
musitdev 2023-09-02 17:52:28 +02:00
parent d386a17b16
commit f1c1b02dd7
3 changed files with 43 additions and 9 deletions

View File

@ -56,7 +56,7 @@ fn calculate_leader_schedule(
.map(|(pubkey, stake)| (*pubkey, *stake))
.collect();
sort_stakes(&mut stakes);
log::trace!("calculate_leader_schedule stakes:{stakes:?} epoch:{current_epoch_info:?}");
log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{current_epoch_info:?}");
Ok(LeaderSchedule::new(
&stakes,
seed,

View File

@ -20,13 +20,15 @@ use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::CommitmentLevel;
use yellowstone_grpc_proto::geyser::SubscribeUpdateAccount;
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterAccounts;
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocks;
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta;
use yellowstone_grpc_proto::{
prelude::{subscribe_update::UpdateOneof, SubscribeRequestFilterSlots, SubscribeUpdateSlot},
tonic::service::Interceptor,
};
mod leader_schedule;
mod rpc;
//mod rpc;
mod stakestore;
type Slot = u64;
@ -42,6 +44,13 @@ const RPC_URL: &str = "http://localhost:8899";
const STAKESTORE_INITIAL_CAPACITY: usize = 600000;
pub fn log_end_epoch(current_slot: Slot, end_epoch_slot: Slot, msg: String) {
//log 50 end slot.
if current_slot != 0 && current_slot + 50 > end_epoch_slot {
log::info!("{current_slot}/{end_epoch_slot} {}", msg);
}
}
/// Connect to yellow stone plugin using yellow stone gRpc Client
#[tokio::main]
async fn main() -> anyhow::Result<()> {
@ -89,11 +98,12 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetPa));
//subscribe Geyser grpc
//slot subscription
let mut slots = HashMap::new();
slots.insert("client".to_string(), SubscribeRequestFilterSlots {});
//account subscription
let mut accounts: HashMap<String, SubscribeRequestFilterAccounts> = HashMap::new();
accounts.insert(
"client".to_owned(),
SubscribeRequestFilterAccounts {
@ -106,13 +116,30 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
},
);
//block subscription
let mut blocks = HashMap::new();
blocks.insert(
"client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
},
);
//block Meta subscription filter
let mut blocks_meta = HashMap::new();
blocks_meta.insert("client".to_string(), SubscribeRequestFilterBlocksMeta {});
let mut confirmed_stream = client
.subscribe_once(
slots.clone(),
accounts.clone(), //accounts
Default::default(), //tx
Default::default(), //entry
//blocks, //full block
Default::default(), //full block
//blocks_meta, //block meta
Default::default(), //block meta
Some(CommitmentLevel::Confirmed),
vec![],
@ -280,9 +307,11 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
Some(UpdateOneof::Slot(slot)) => {
//update current slot
//log::info!("Processing slot: {:?} current slot:{:?}", slot, current_slot);
log_end_epoch(current_slot.confirmed_slot, next_epoch_start_slot, format!("Receive slot: {:?} at commitment:{:?}", slot.slot, slot.status()));
current_slot.update_slot(&slot);
if current_slot.confirmed_slot >= next_epoch_start_slot { //slot can be non consecutif.
if current_slot.confirmed_slot >= next_epoch_start_slot-1 { //slot can be non consecutif.
log::info!("End epoch slot. Calculate schedule at current slot:{}", current_slot.confirmed_slot);
let Ok(stake_map) = extract_stakestore(&mut stakestore) else {
log::info!("Epoch schedule aborted because a getPA is currently running.");
@ -305,6 +334,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
let move_epoch = current_epoch.clone();
move || {
let schedule = crate::leader_schedule::calculate_leader_schedule_from_stake_map(&stake_map, &move_epoch);
log::info!("End calculate leader schedule at slot:{}", current_slot.confirmed_slot);
TaskResult::ScheduleResult(schedule.ok(), stake_map)
}
});
@ -315,6 +345,12 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
}
}
Some(UpdateOneof::BlockMeta(block_meta)) => {
log::info!("Receive Block Meta at slot: {}", block_meta.slot);
}
Some(UpdateOneof::Block(block)) => {
println!("Receive Block at slot: {}", block.slot);
}
Some(UpdateOneof::Ping(_)) => log::trace!("UpdateOneof::Ping"),
bad_msg => {
log::info!("Geyser stream unexpected message received:{:?}",bad_msg);
@ -425,7 +461,7 @@ fn read_account(
};
if geyser_account.slot != current_slot {
log::trace!(
log::warn!(
"Get geyser account on a different slot:{} of the current:{current_slot}",
geyser_account.slot
);

View File

@ -30,13 +30,13 @@ fn stake_map_insert_stake(map: &mut StakeMap, stake_account: Pubkey, stake: Stor
std::collections::hash_map::Entry::Occupied(occupied) => {
let strstake = occupied.into_mut(); // <-- get mut reference to existing value
if strstake.last_update_slot < stake.last_update_slot {
log::trace!("Stake updated for: {stake_account} stake:{stake:?}");
log::info!("Stake updated for: {stake_account} stake:{stake:?}");
*strstake = stake;
}
}
// If value doesn't exist yet, then insert a new value of 1
std::collections::hash_map::Entry::Vacant(vacant) => {
log::trace!("New stake added for: {stake_account} stake:{stake:?}");
log::info!("New stake added for: {stake_account} stake:{stake:?}");
vacant.insert(stake);
}
};
@ -118,8 +118,6 @@ impl StakeStore {
last_update_slot: new_account.slot,
write_version: new_account.write_version,
};
log::trace!("add_stake ststake:{ststake:?}");
//during extract push the new update or
//don't add account change that has been done in next epoch.
let insert_stake = !self.extracted || ststake.last_update_slot > current_end_epoch_slot;