diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ba8d4ce --- /dev/null +++ b/.gitignore @@ -0,0 +1,15 @@ +# Generated by Cargo +# will have compiled files and executables +**/target + +# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries +# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html +**/Cargo.lock + +# These are backup files generated by rustfmt +**/*.rs.bk + +*.DS_Store +*.vscode +*.idea + diff --git a/leadershedule/Cargo.toml b/leadershedule/Cargo.toml new file mode 100644 index 0000000..6806db5 --- /dev/null +++ b/leadershedule/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "leadershedule" +version = "0.2.2" +authors = ["phild@mango.markets>"] +repository = "https://github.com/blockworks-foundation/solana-rpc-v2" +license = "AGPL" +edition = "2021" +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.70" +log = "0.4.17" +tracing-subscriber = "0.3.16" +tokio = { version = "1.*", features = ["full", "fs"]} +tokio-stream = "0.1.*" +borsh = "0.10.3" +time = { version = "0.3.25", features=["local-offset"]} +chrono = "0.4.26" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +serde_derive = "1.0" + +solana-client = "1.16.2" +solana-sdk = "1.16.2" +solana-transaction-status = "1.16.2" +solana-ledger = "1.16.2" diff --git a/leadershedule/src/main.rs b/leadershedule/src/main.rs new file mode 100644 index 0000000..7f0b73d --- /dev/null +++ b/leadershedule/src/main.rs @@ -0,0 +1,309 @@ +use borsh::BorshDeserialize; +use chrono::{Datelike, Local, NaiveDate, NaiveTime, Timelike}; +use core::str::FromStr; +use serde_json; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_ledger::leader_schedule::LeaderSchedule; +use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS; +use solana_sdk::commitment_config::CommitmentConfig; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::stake::state::StakeState; +use std::collections::HashMap; +use std::env; +use std::time::Duration; +use time::{Duration as TimeDuration, OffsetDateTime, UtcOffset}; +use tokio::fs::File; +use tokio::io::AsyncWriteExt; + +const RPC_URL: &str = "https://api.mainnet-beta.solana.com"; +//const RPC_URL: &str = "https://api.testnet.solana.com"; +//const RPC_URL: &str = "https://api.devnet.solana.com"; + +const SLOTS_IN_EPOCH: u64 = 432000; + +#[tokio::main(flavor = "multi_thread", worker_threads = 16)] +pub async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + let args: Vec = env::args().collect(); + + if args.len() != 4 { + eprintln!("Please provide 3 arguments: hour, minute and seconds"); + std::process::exit(1); + } + + let target_hour: u32 = args[1] + .parse() + .expect("First argument should be a number representing the hour"); + let target_minute: u32 = args[2] + .parse() + .expect("Second argument should be a number representing the minute"); + let target_second: u32 = args[3] + .parse() + .expect("Third argument should be a number representing the seconds"); + + let seconds_until_target = seconds_until_target_time(target_hour, target_minute, target_second); + log::info!("seconds_until_target:{}", seconds_until_target); + let to_wait = Duration::from_secs(seconds_until_target as u64 - 30); + tokio::time::sleep(to_wait).await; + + let mut counter = 0; + let mut schedule_counter = 0; + let mut epoch_offset = 1; + + loop { + match write_schedule(0).await { + Ok(()) => { + epoch_offset = 0; + schedule_counter += 1; + if schedule_counter == 3 { + break; + } + tokio::time::sleep(Duration::from_secs(30)).await; + } + Err(err) => { + log::info!("error:{err}"); + tokio::time::sleep(tokio::time::Duration::from_millis(2)).await; + counter += 1; + if counter == 5 { + break; + } + } + } + } + Ok(()) +} + +async fn write_schedule(epoch_offset: u64) -> anyhow::Result<()> { + let schedule = process_schedule(epoch_offset).await?; + let serialized_map = serde_json::to_string(&schedule).unwrap(); + let now = Local::now(); + let date_string = format!( + "{}_{}_{}-{}_{}_{}", + now.year(), + now.month(), + now.day(), + now.hour(), + now.minute(), + now.second() + ); + + // Create the file name + let file_name = format!("output_{}.json", date_string); + + // Write to the file + let mut file = File::create(file_name).await?; + file.write_all(serialized_map.as_bytes()).await?; + //show all schedule aggregated. + let mut print_finalized = schedule + .into_iter() + .map(|(key, values)| format!("{key}:{:?}", values)) + .collect::>(); + print_finalized.sort(); + log::info!("leader_schedule_finalized:{:?}", print_finalized); + Ok(()) +} + +async fn process_schedule(epoch_offset: u64) -> anyhow::Result> { + let rpc_client = + RpcClient::new_with_commitment(RPC_URL.to_string(), CommitmentConfig::finalized()); + + let slot = rpc_client.get_slot().await?; + + // Fetch current epoch + let epoch_info = rpc_client.get_epoch_info().await?; + + let current_epoch = epoch_info.epoch; + + // Fetch stakes in current epoch + let response = rpc_client + .get_program_accounts(&solana_sdk::stake::program::id()) + .await?; + + log::info!("current_slot:{slot:?}"); + log::info!("epoch_info:{epoch_info:?}"); + log::info!("get_program_accounts:{:?}", response.len()); + + let mut stakes = HashMap::::new(); + + for (pubkey, account) in response { + // Zero-length accounts owned by the stake program are system accounts that were re-assigned and are to be + // ignored + if account.data.len() == 0 { + continue; + } + + match StakeState::deserialize(&mut account.data.as_slice())? { + StakeState::Stake(_, stake) => { + // Ignore stake accounts activated in this epoch (or later, to include activation_epoch of + // u64::MAX which indicates no activation ever happened) + if stake.delegation.activation_epoch >= current_epoch { + continue; + } + // Ignore stake accounts deactivated before this epoch + if stake.delegation.deactivation_epoch < current_epoch { + continue; + } + // Add the stake in this stake account to the total for the delegated-to vote account + *(stakes + .entry(stake.delegation.voter_pubkey.clone()) + .or_insert(0)) += stake.delegation.stake; + } + _ => (), + } + } + + let leader_schedule = calculate_leader_schedule(current_epoch + epoch_offset, stakes); + + let mut leader_schedule_aggregated: HashMap = leader_schedule + .get_slot_leaders() + .iter() + .fold(HashMap::new(), |mut sc, l| { + sc.entry(l.to_string()).or_insert((0, 0, 0)).1 += 1; + sc + }); + // for (leader, nb) in leader_schedule_aggregated { + // println!("{leader}:{nb}"); + // } + + //build vote account node key association table + let vote_account = rpc_client.get_vote_accounts().await?; + let note_vote_table = vote_account + .current + .iter() + .chain(vote_account.delinquent.iter()) + .map(|va| (va.node_pubkey.clone(), va.vote_pubkey.clone())) + .collect::>(); + + //get leader schedule from rpc + let leader_schedule_finalized = rpc_client.get_leader_schedule(Some(slot)).await?; //Some(slot) + + let binding = "Vote key not found".to_string(); + leader_schedule_finalized + .unwrap() + .into_iter() + .for_each(|(key, slots)| { + let vote_key = note_vote_table.get(&key.to_string()).unwrap_or(&binding); + leader_schedule_aggregated + .entry(vote_key.clone()) + .or_insert((0, 0, 0)) + .0 += slots.len() as u64 + }); + + //build schedule from vote account. + let vote_stackes: HashMap = vote_account + .current + .iter() + .chain(vote_account.delinquent.iter()) + .map(|va| { + ( + Pubkey::from_str(&va.vote_pubkey).unwrap(), + va.activated_stake, + ) + }) + .collect(); + let leader_schedule_va = calculate_leader_schedule(current_epoch + epoch_offset, vote_stackes); + leader_schedule_va.get_slot_leaders().iter().for_each(|l| { + leader_schedule_aggregated + .entry(l.to_string()) + .or_insert((0, 0, 0)) + .2 += 1; + }); + + // log::info!( + // "vote account current:{:?}", + // vote_account + // .current + // .iter() + // .map(|va| format!("{}/{}", va.vote_pubkey, va.node_pubkey)) + // .collect::>() + // ); + // log::info!( + // "vote account delinquent:{:?}", + // vote_account + // .delinquent + // .iter() + // .map(|va| format!("{}/{}", va.vote_pubkey, va.node_pubkey)) + // .collect::>() + // ); + + Ok(leader_schedule_aggregated) +} + +//Copied from leader_schedule_utils.rs +// Mostly cribbed from leader_schedule_utils +fn calculate_leader_schedule(epoch: u64, stakes: HashMap) -> LeaderSchedule { + let mut seed = [0u8; 32]; + seed[0..8].copy_from_slice(&epoch.to_le_bytes()); + let mut stakes: Vec<_> = stakes + .iter() + .map(|(pubkey, stake)| (*pubkey, *stake)) + .collect(); + sort_stakes(&mut stakes); + LeaderSchedule::new(&stakes, seed, SLOTS_IN_EPOCH, NUM_CONSECUTIVE_LEADER_SLOTS) +} + +// Cribbed from leader_schedule_utils +fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) { + // Sort first by stake. If stakes are the same, sort by pubkey to ensure a + // deterministic result. + // Note: Use unstable sort, because we dedup right after to remove the equal elements. + stakes.sort_unstable_by(|(l_pubkey, l_stake), (r_pubkey, r_stake)| { + if r_stake == l_stake { + r_pubkey.cmp(l_pubkey) + } else { + r_stake.cmp(l_stake) + } + }); + + // Now that it's sorted, we can do an O(n) dedup. + stakes.dedup(); +} + +fn seconds_until_target_time_with_time( + target_hour: u8, + target_minute: u8, + target_second: u8, +) -> i64 { + //let local_offset = UtcOffset::local_offset_at(OffsetDateTime::UNIX_EPOCH); + //log::info!("{local_offset:?}"); + //set UTC+2 + let utcp2 = UtcOffset::from_hms(2, 0, 0).unwrap(); + let now = OffsetDateTime::now_utc().to_offset(utcp2); + //let now = OffsetDateTime::now_utc(); + log::info!("now:{now:?}"); + let mut target_time = now + .date() + .with_hms(target_hour, target_minute, target_second) + .unwrap() + .assume_offset(utcp2); + + // If the target time has passed for today, calculate for next day + if now > target_time { + log::info!("add one day"); + target_time = target_time + TimeDuration::days(1); + } + log::info!("target_time:{target_time:?}"); + + let duration_until_target = target_time - now; + duration_until_target.whole_seconds() +} + +fn seconds_until_target_time(target_hour: u32, target_minute: u32, target_second: u32) -> u64 { + let now = Local::now(); + log::info!("now:{now:?}"); + let today = now.date_naive(); + let target_naive_time = + NaiveTime::from_hms_opt(target_hour, target_minute, target_second).unwrap(); + let mut target_time = NaiveDate::and_time(&today, target_naive_time); + + // If the target time has passed for today, calculate for next day + if target_time < now.naive_local() { + target_time = NaiveDate::and_time(&(today + chrono::Duration::days(1)), target_naive_time); + } + + log::info!("target_time:{target_time:?}"); + let duration_until_target = target_time + .signed_duration_since(now.naive_local()) + .num_seconds() as u64; + duration_until_target +} diff --git a/stake_aggregate/Cargo.toml b/stake_aggregate/Cargo.toml new file mode 100644 index 0000000..abbd4b7 --- /dev/null +++ b/stake_aggregate/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "stake_aggregate" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.71" +borsh = "0.10.3" +bs58 = "0.4.0" +futures = { version = "0.3.28", default-features = false } +futures-util = "0.3.28" +hex = "0.4.3" +log = "0.4.17" +tracing-subscriber = "0.3.16" +tokio = { version = "1.*", features = ["full"] } + +yellowstone-grpc-client = { path = "../../yellowstone-grpc/yellowstone-grpc-client" } +yellowstone-grpc-proto = { path = "../../yellowstone-grpc/yellowstone-grpc-proto" } + + +#yellowstone-grpc-client = "1.8.0+solana.1.16.1" +#yellowstone-grpc-proto = "1.8.0+solana.1.16.1" + +solana-sdk = "1.16.*" +solana-client = "1.16.*" +solana-ledger = "1.16.*" \ No newline at end of file diff --git a/stake_aggregate/src/leader_schedule.rs b/stake_aggregate/src/leader_schedule.rs new file mode 100644 index 0000000..ea8a027 --- /dev/null +++ b/stake_aggregate/src/leader_schedule.rs @@ -0,0 +1,64 @@ +use solana_ledger::leader_schedule::LeaderSchedule; +use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS; +use solana_sdk::epoch_info::EpochInfo; +use solana_sdk::pubkey::Pubkey; +use std::collections::HashMap; + +pub fn calculate_leader_schedule_from_stake_map( + stake_map: &crate::stakestore::StakeMap, + current_epoch_info: &EpochInfo, +) -> LeaderSchedule { + let mut stakes = HashMap::::new(); + for storestake in stake_map.values() { + // Ignore stake accounts activated in this epoch (or later, to include activation_epoch of + // u64::MAX which indicates no activation ever happened) + if storestake.stake.activation_epoch >= current_epoch_info.epoch { + continue; + } + // Ignore stake accounts deactivated before this epoch + if storestake.stake.deactivation_epoch < current_epoch_info.epoch { + continue; + } + // Add the stake in this stake account to the total for the delegated-to vote account + *(stakes.entry(storestake.stake.voter_pubkey).or_insert(0)) += storestake.stake.stake; + } + calculate_leader_schedule(stakes, current_epoch_info) +} + +//Copied from leader_schedule_utils.rs +// Mostly cribbed from leader_schedule_utils +fn calculate_leader_schedule( + stakes: HashMap, + current_epoch_info: &EpochInfo, +) -> LeaderSchedule { + let mut seed = [0u8; 32]; + seed[0..8].copy_from_slice(¤t_epoch_info.epoch.to_le_bytes()); + let mut stakes: Vec<_> = stakes + .iter() + .map(|(pubkey, stake)| (*pubkey, *stake)) + .collect(); + sort_stakes(&mut stakes); + LeaderSchedule::new( + &stakes, + seed, + current_epoch_info.slots_in_epoch, + NUM_CONSECUTIVE_LEADER_SLOTS, + ) +} + +// Cribbed from leader_schedule_utils +fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) { + // Sort first by stake. If stakes are the same, sort by pubkey to ensure a + // deterministic result. + // Note: Use unstable sort, because we dedup right after to remove the equal elements. + stakes.sort_unstable_by(|(l_pubkey, l_stake), (r_pubkey, r_stake)| { + if r_stake == l_stake { + r_pubkey.cmp(l_pubkey) + } else { + r_stake.cmp(l_stake) + } + }); + + // Now that it's sorted, we can do an O(n) dedup. + stakes.dedup(); +} diff --git a/stake_aggregate/src/main.rs b/stake_aggregate/src/main.rs new file mode 100644 index 0000000..9a53034 --- /dev/null +++ b/stake_aggregate/src/main.rs @@ -0,0 +1,332 @@ +use crate::stakestore::StakeStore; +use futures_util::stream::FuturesUnordered; +use futures_util::StreamExt; +use solana_client::client_error::ClientError; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_ledger::leader_schedule::LeaderSchedule; +use solana_sdk::account::Account; +use solana_sdk::commitment_config::CommitmentConfig; +use solana_sdk::epoch_info::EpochInfo; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::stake::state::Delegation; +use solana_sdk::stake::state::StakeState; +use std::collections::HashMap; +use yellowstone_grpc_client::GeyserGrpcClient; +use yellowstone_grpc_proto::geyser::CommitmentLevel; +use yellowstone_grpc_proto::geyser::SubscribeUpdateAccount; +use yellowstone_grpc_proto::prelude::SubscribeRequestFilterAccounts; +use yellowstone_grpc_proto::{ + prelude::{subscribe_update::UpdateOneof, SubscribeRequestFilterSlots, SubscribeUpdateSlot}, + tonic::service::Interceptor, +}; + +mod leader_schedule; +mod stakestore; + +type Slot = u64; + +const GRPC_URL: &str = "http://127.0.0.0:10000"; +const RPC_URL: &str = "https://api.mainnet-beta.solana.com"; +//const RPC_URL: &str = "https://api.testnet.solana.com"; +//const RPC_URL: &str = "https://api.devnet.solana.com"; + +const STAKESTORE_INITIAL_CAPACITY: usize = 600000; + +/// Connect to yellow stone plugin using yellow stone gRpc Client +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + + let mut client = GeyserGrpcClient::connect(GRPC_URL, None::<&'static str>, None)?; + + let version = client.get_version().await?; + println!("Version: {:?}", version); + + let ctrl_c_signal = tokio::signal::ctrl_c(); + + tokio::select! { + res = run_loop(client) => { + // This should never happen + log::error!("Services quit unexpectedly {res:?}"); + } + _ = ctrl_c_signal => { + log::info!("Received ctrl+c signal"); + } + } + + Ok(()) +} + +async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Result<()> { + //local vars + let mut current_slot: CurrentSlot = Default::default(); + let mut stakestore = StakeStore::new(STAKESTORE_INITIAL_CAPACITY); + let mut current_epoch = { + let rpc_client = + RpcClient::new_with_commitment(RPC_URL.to_string(), CommitmentConfig::finalized()); + + // Fetch current epoch + rpc_client.get_epoch_info().await? + }; + log::trace!("Run_loop init current_epoch:{current_epoch:?}"); + + let mut spawned_task_toexec = FuturesUnordered::new(); + let mut spawned_task_result = FuturesUnordered::new(); + + spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetPa)); + + let mut slots = HashMap::new(); + slots.insert("client".to_string(), SubscribeRequestFilterSlots {}); + + let mut accounts: HashMap = HashMap::new(); + + accounts.insert( + "client".to_owned(), + SubscribeRequestFilterAccounts { + account: vec![], + owner: vec![solana_sdk::stake::program::id().to_string()], + filters: vec![], + }, + ); + + let mut confirmed_stream = client + .subscribe_once( + slots, + accounts, //accounts + Default::default(), //tx + Default::default(), //entry + Default::default(), //full block + Default::default(), //block meta + Some(CommitmentLevel::Confirmed), + vec![], + ) + .await?; + + loop { + tokio::select! { + //Execute RPC call in another task + Some(to_exec) = spawned_task_toexec.next() => { + let jh = tokio::spawn(async move { + match to_exec { + TaskToExec::RpcGetPa => { + let rpc_client = RpcClient::new_with_commitment(RPC_URL.to_string(), CommitmentConfig::finalized()); + let res = rpc_client.get_program_accounts(&solana_sdk::stake::program::id()).await; + TaskResult::RpcGetPa(res) + }, + TaskToExec::RpcGetCurrentEpoch => { + let rpc_client = RpcClient::new_with_commitment(RPC_URL.to_string(), CommitmentConfig::finalized()); + let res = rpc_client.get_epoch_info().await; + TaskResult::CurrentEpoch(res) + } + } + }); + spawned_task_result.push(jh); + } + //Manage RPC call result execution + Some(some_res) = spawned_task_result.next() => { + match some_res { + Ok(TaskResult::RpcGetPa(Ok(pa_list))) => { + let new_store = std::mem::take(&mut stakestore); + let Ok((new_store, mut stake_map)) = new_store.extract() else { + //retry later, epoch schedule is currently processed + spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetPa)); + continue; + }; + stakestore = new_store; + //merge new PA with stake map in a specific thread + log::trace!("Run_loop before Program account stake merge"); + + let jh = tokio::task::spawn_blocking(|| { + crate::stakestore::merge_program_account_in_strake_map(&mut stake_map, pa_list); + TaskResult::MergePAList(stake_map) + }); + spawned_task_result.push(jh); + } + Ok(TaskResult::CurrentEpoch(Ok(epoch_info))) => { + log::trace!("Run_loop update new epoch:{epoch_info:?}"); + current_epoch = epoch_info; + } + Ok(TaskResult::MergePAList(stake_map)) => { + let new_store = std::mem::take(&mut stakestore); + let Ok(new_store) = new_store.merge_stake(stake_map) else { + //should never occurs because only getPA can merge and only one occurs at time. + // during PA no epoch schedule can be done. + log::warn!("merge stake on a non extract stake map"); + //restart the getPA. + spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetPa)); + continue; + }; + stakestore = new_store; + log::trace!("Run_loop end Program account stake merge"); + } + _ => log::warn!("RPC call return invalid result: {some_res:?}"), + } + + } + + //get confirmed slot or account + ret = confirmed_stream.next() => { + match ret { + Some(message) => { + //process the message + match message { + Ok(msg) => { + log::info!("new message: {msg:?}"); + match msg.update_oneof { + Some(UpdateOneof::Account(account)) => { + //store new account stake. + if let Some(account) = read_account(account, current_slot.confirmed_slot) { + if let Err(err) = stakestore.add_stake(account) { + log::warn!("Can't add new stake from account data err:{}", err); + continue; + } + } + } + Some(UpdateOneof::Slot(slot)) => { + //update current slot + log::info!("Processing slot: {:?}", slot); + current_slot.update_slot(&slot); + + if current_slot.confirmed_slot == current_epoch.slot_index + current_epoch.slots_in_epoch { + log::info!("End epoch slot. Calculate schedule."); + let new_store = std::mem::take(&mut stakestore); + let Ok((new_store, stake_map)) = new_store.extract() else { + log::info!("Epoch schedule aborted because a getPA is currently running."); + continue; + }; + stakestore = new_store; + + //calculate schedule in a dedicated thread. + let move_epoch = current_epoch.clone(); + let jh = tokio::task::spawn_blocking(move || { + let schedule = crate::leader_schedule::calculate_leader_schedule_from_stake_map(&stake_map, &move_epoch); + TaskResult::ScheduleResult(schedule) + }); + spawned_task_result.push(jh); + + //change epoch. Change manually then update using RPC. + current_epoch.epoch +=1; + current_epoch.slot_index += current_epoch.slots_in_epoch + 1; + spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetCurrentEpoch)); + + + } + } + bad_msg => { + log::info!("Geyser stream unexpected message received:{:?}",bad_msg); + } + } + } + Err(error) => { + log::error!("Geyser stream receive an error has message: {error:?}, try to reconnect and resynchronize."); + break; + } + } + } + None => { + log::warn!("The geyser stream close try to reconnect and resynchronize."); + break; //TODO reconnect. + } + } + } + } + } + + Ok(()) +} + +#[derive(Default)] +struct CurrentSlot { + processed_slot: u64, + confirmed_slot: u64, + finalized_slot: u64, +} + +impl CurrentSlot { + fn update_slot(&mut self, slot: &SubscribeUpdateSlot) { + match slot.status() { + CommitmentLevel::Processed => self.processed_slot = slot.slot, + CommitmentLevel::Confirmed => self.confirmed_slot = slot.slot, + CommitmentLevel::Finalized => self.finalized_slot = slot.slot, + } + } +} + +#[derive(Debug)] +#[allow(dead_code)] +pub struct AccountPretty { + is_startup: bool, + slot: u64, + pubkey: Pubkey, + lamports: u64, + owner: Pubkey, + executable: bool, + rent_epoch: u64, + data: Vec, + write_version: u64, + txn_signature: String, +} + +impl AccountPretty { + fn read_stake(&self) -> anyhow::Result> { + crate::stakestore::read_stake_from_account_data(self.data.as_slice()) + } +} + +// fn update_stakes(stakes: &mut StakeStore, stake: Delegation, current_epoch: u64) { +// // Ignore stake accounts activated in this epoch (or later, to include activation_epoch of +// // u64::MAX which indicates no activation ever happened) +// if stake.activation_epoch >= current_epoch { +// return; +// } +// // Ignore stake accounts deactivated before this epoch +// if stake.deactivation_epoch < current_epoch { +// return; +// } +// // Add the stake in this stake account to the total for the delegated-to vote account +// *(stakes.entry(stake.voter_pubkey.clone()).or_insert(0)) += stake.stake; +// } + +fn read_account( + geyser_account: SubscribeUpdateAccount, + current_slot: u64, +) -> Option { + let Some(inner_account) = geyser_account.account else { + log::warn!("Receive a SubscribeUpdateAccount without account."); + return None; + }; + + if geyser_account.slot != current_slot { + log::info!( + "Get geyser account on a different slot:{} of the current:{current_slot}", + geyser_account.slot + ); + } + + Some(AccountPretty { + is_startup: geyser_account.is_startup, + slot: geyser_account.slot, + pubkey: Pubkey::try_from(inner_account.pubkey).expect("valid pubkey"), + lamports: inner_account.lamports, + owner: Pubkey::try_from(inner_account.owner).expect("valid pubkey"), + executable: inner_account.executable, + rent_epoch: inner_account.rent_epoch, + data: inner_account.data, + write_version: inner_account.write_version, + txn_signature: bs58::encode(inner_account.txn_signature.unwrap_or_default()).into_string(), + }) +} + +#[derive(Debug)] +enum TaskToExec { + RpcGetPa, + RpcGetCurrentEpoch, +} + +#[derive(Debug)] +enum TaskResult { + RpcGetPa(Result, ClientError>), + CurrentEpoch(Result), + MergePAList(crate::stakestore::StakeMap), + ScheduleResult(LeaderSchedule), +} diff --git a/stake_aggregate/src/stakestore.rs b/stake_aggregate/src/stakestore.rs new file mode 100644 index 0000000..811474d --- /dev/null +++ b/stake_aggregate/src/stakestore.rs @@ -0,0 +1,140 @@ +use crate::AccountPretty; +use crate::Slot; +use crate::StakeState; +use anyhow::bail; +use borsh::BorshDeserialize; +use solana_sdk::account::Account; +use solana_sdk::stake::state::Delegation; +use std::collections::HashMap; + +use crate::Pubkey; + +pub type StakeMap = HashMap; + +fn stake_map_insert_stake(map: &mut StakeMap, stake_account: Pubkey, stake: StoredStake) { + match map.entry(stake_account) { + // If value already exists, then increment it by one + std::collections::hash_map::Entry::Occupied(occupied) => { + let strstake = occupied.into_mut(); // <-- get mut reference to existing value + if strstake.last_update_slot < stake.last_update_slot { + *strstake = stake; + } + } + // If value doesn't exist yet, then insert a new value of 1 + std::collections::hash_map::Entry::Vacant(vacant) => { + vacant.insert(stake); + } + }; +} + +#[derive(Debug, Default)] +pub struct StoredStake { + pub stake: Delegation, + pub last_update_slot: Slot, + pub write_version: u64, +} + +#[derive(Debug, Default)] +pub struct StakeStore { + stakes: StakeMap, + updates: Vec<(Pubkey, StoredStake)>, + extracted: bool, +} + +impl StakeStore { + pub fn new(capacity: usize) -> Self { + StakeStore { + stakes: HashMap::with_capacity(capacity), + updates: vec![], + extracted: false, + } + } + + //return the contained stake map to do an external update. + // During extract period (between extract and merge) added stake a stored to be processed later. + //if the store is already extracted return an error. + pub fn extract(self) -> anyhow::Result<(Self, StakeMap)> { + if self.extracted { + bail!("StakeStore already extracted. Try later"); + } + let stakestore = StakeStore { + stakes: HashMap::new(), + updates: self.updates, + extracted: true, + }; + Ok((stakestore, self.stakes)) + } + + pub fn merge_stake(self, stakes: StakeMap) -> anyhow::Result { + if !self.extracted { + bail!("StakeStore merge of non extracted map. Try later"); + } + let mut stakestore = StakeStore { + stakes, + updates: vec![], + extracted: false, + }; + + //apply stake added during extraction. + for (stake_account, stake) in self.updates { + stakestore.insert_stake(stake_account, stake); + } + Ok(stakestore) + } + + pub fn add_stake(&mut self, new_account: AccountPretty) -> anyhow::Result<()> { + let Ok(Some(delegated_stake)) = new_account.read_stake() else { + bail!("Can't read stake from account data"); + }; + + let ststake = StoredStake { + stake: delegated_stake, + last_update_slot: new_account.slot, + write_version: new_account.write_version, + }; + match self.extracted { + true => self.updates.push((new_account.pubkey, ststake)), + false => self.insert_stake(new_account.pubkey, ststake), + } + Ok(()) + } + + fn insert_stake(&mut self, stake_account: Pubkey, stake: StoredStake) { + stake_map_insert_stake(&mut self.stakes, stake_account, stake); + } +} + +pub fn merge_program_account_in_strake_map( + stake_map: &mut StakeMap, + pa_list: Vec<(Pubkey, Account)>, +) { + pa_list + .into_iter() + .filter_map( + |(pk, account)| match read_stake_from_account_data(&account.data) { + Ok(opt_stake) => opt_stake.map(|stake| (pk, stake)), + Err(err) => { + log::warn!("Error during pa account data deserialisation:{err}"); + None + } + }, + ) + .for_each(|(pk, delegated_stake)| { + let stake = StoredStake { + stake: delegated_stake, + last_update_slot: 0, + write_version: 0, + }; + stake_map_insert_stake(stake_map, pk, stake); + }); +} + +pub fn read_stake_from_account_data(mut data: &[u8]) -> anyhow::Result> { + if data.is_empty() { + bail!("Error: read strake of PA account with empty data"); + } + match StakeState::deserialize(&mut data)? { + StakeState::Stake(_, stake) => Ok(Some(stake.delegation)), + _ => Ok(None), + } +}