Merge pull request #4 from blockworks-foundation/aggregate_stake

Aggregate stake
This commit is contained in:
Philippe Delrieu 2023-11-17 17:37:04 +01:00 committed by GitHub
commit 66ea7c949f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 213 additions and 43 deletions

View File

@ -23,6 +23,10 @@ path = "bin/stakehistory.rs"
name = "sysvaraccount" name = "sysvaraccount"
path = "bin/sysvaraccount.rs" path = "bin/sysvaraccount.rs"
[[bin]]
name = "send_get_vote_account"
path = "bin/send_get_vote_account.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

View File

@ -98,7 +98,7 @@ async fn main() -> anyhow::Result<()> {
}; };
let leader_schedule = calculate_leader_schedule(validator_stakes, &epoch).unwrap(); let leader_schedule = calculate_leader_schedule(validator_stakes, &epoch).unwrap();
verify_schedule(leader_schedule).unwrap(); verify_schedule(leader_schedule, &aggregate_stake_file_path).unwrap();
Ok(()) Ok(())
} }
@ -128,7 +128,7 @@ fn calculate_leader_schedule(
)) ))
} }
fn verify_schedule(schedule: LeaderSchedule) -> anyhow::Result<()> { fn verify_schedule(schedule: LeaderSchedule, file_name: &str) -> anyhow::Result<()> {
log::info!("verify_schedule Start."); log::info!("verify_schedule Start.");
let rpc_client = RpcClient::new_with_timeout_and_commitment( let rpc_client = RpcClient::new_with_timeout_and_commitment(
RPC_URL.to_string(), RPC_URL.to_string(),
@ -154,6 +154,16 @@ fn verify_schedule(schedule: LeaderSchedule) -> anyhow::Result<()> {
//map leaderscheudle to HashMap<PubKey, Vec<slot>> //map leaderscheudle to HashMap<PubKey, Vec<slot>>
let slot_leaders = schedule.get_slot_leaders(); let slot_leaders = schedule.get_slot_leaders();
//save_slot_leaders
let string_slot_leader: Vec<String> = slot_leaders.iter().map(|pk| pk.to_string()).collect();
let serialized_schedule = serde_json::to_string(&string_slot_leader).unwrap();
let filename = format!("{file_name}.schedule.json",);
// Write to the file
let mut file = File::create(filename).unwrap();
file.write_all(serialized_schedule.as_bytes()).unwrap();
file.flush().unwrap();
log::info!("aggregate_leader_schedule len:{}", slot_leaders.len()); log::info!("aggregate_leader_schedule len:{}", slot_leaders.len());
let mut input_leader_schedule: HashMap<String, Vec<usize>> = HashMap::new(); let mut input_leader_schedule: HashMap<String, Vec<usize>> = HashMap::new();
for (slot, pubkey) in slot_leaders.iter().copied().enumerate() { for (slot, pubkey) in slot_leaders.iter().copied().enumerate() {

View File

@ -0,0 +1,45 @@
use futures_util::future::join_all;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
const LITE_RPC_URL: &str = "http://127.0.0.1:8890";
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
log::info!("Start send get vote account");
let mut handles = vec![];
for _ in 0..10 {
let h = tokio::task::spawn(async move {
call_get_vote_account().await;
});
handles.push(h);
}
join_all(handles).await;
//std::thread::sleep(std::time::Duration::from_millis(1000000));
Ok(())
}
async fn call_get_vote_account() {
let rpc_client =
RpcClient::new_with_commitment(LITE_RPC_URL.to_string(), CommitmentConfig::confirmed());
loop {
match tokio::time::timeout(
std::time::Duration::from_millis(1000),
rpc_client.get_vote_accounts(),
)
.await
{
Ok(Ok(accounts)) => {
if accounts.current.len() == 0 {
log::info!("get vote account len:0",);
}
}
Err(_) => panic!("get_vote_account timeout"),
Ok(Err(err)) => panic!("get_vote_account rpc error:{err}"),
}
std::thread::sleep(std::time::Duration::from_millis(10));
}
}

View File

@ -46,14 +46,14 @@ fn stake_map_notify_stake(map: &mut StakeMap, stake: StoredStake) {
//doesn't erase new state with an old one. Can arrive during bootstrapping. //doesn't erase new state with an old one. Can arrive during bootstrapping.
//several instructions can be done in the same slot. //several instructions can be done in the same slot.
if strstake.last_update_slot <= stake.last_update_slot { if strstake.last_update_slot <= stake.last_update_slot {
log::trace!("stake_map_notify_stake Stake store updated stake: {} old_stake:{strstake:?} stake:{stake:?}", stake.pubkey); log::info!("stake_map_notify_stake updated stake: {} old_stake:{strstake:?} stake:{stake:?}", stake.pubkey);
*strstake = stake; *strstake = stake;
} }
} }
// If value doesn't exist yet, then insert a new value of 1 // If value doesn't exist yet, then insert a new value of 1
std::collections::hash_map::Entry::Vacant(vacant) => { std::collections::hash_map::Entry::Vacant(vacant) => {
log::trace!( log::info!(
"stake_map_notify_stake Stake store insert stake: {} stake:{stake:?}", "stake_map_notify_stake insert stake: {} stake:{stake:?}",
stake.pubkey stake.pubkey
); );
vacant.insert(stake); vacant.insert(stake);
@ -347,16 +347,24 @@ pub async fn start_stake_verification_loop(
} }
async fn send_verification( async fn send_verification(
stake_sender: &mut Sender<(String, Pubkey, Option<StoredStake>)>, _stake_sender: &mut Sender<(String, Pubkey, Option<StoredStake>)>,
stakestore: &mut StakeStore, _stakestore: &mut StakeStore,
instr: &str, _instr: &str,
stake_pybkey: Pubkey, _stake_pybkey: Pubkey,
) { ) {
let current_stake = stakestore.stakes.get(&stake_pybkey).cloned(); // let current_stake = stakestore.stakes.get(&stake_pybkey).cloned();
stake_sender // stake_sender
.send((instr.to_string(), stake_pybkey, current_stake)) // .send((instr.to_string(), stake_pybkey, current_stake))
.await // .await
.unwrap(); // .unwrap();
}
fn verify_account_len(account_keys: &[Pubkey], instr_accounts: &[u8], indexes: Vec<usize>) -> bool {
!indexes
.into_iter()
.filter(|index| (instr_accounts[*index] as usize) >= account_keys.len())
.next()
.is_some()
} }
pub async fn process_stake_tx_message( pub async fn process_stake_tx_message(
@ -366,8 +374,8 @@ pub async fn process_stake_tx_message(
instruction: CompiledInstruction, instruction: CompiledInstruction,
//for debug and trace purpose. //for debug and trace purpose.
program_id_index: u32, program_id_index: u32,
tx_slot: Slot, _tx_slot: Slot,
current_end_epoch_slot: u64, _current_end_epoch_slot: u64,
) { ) {
//for tracing purpose //for tracing purpose
let account_keys: Vec<Pubkey> = account_keys_vec let account_keys: Vec<Pubkey> = account_keys_vec
@ -381,20 +389,21 @@ pub async fn process_stake_tx_message(
.collect(); .collect();
//for debug get stake account index //for debug get stake account index
log::info!( log::trace!(
"Found tx with stake account accounts:{:?} stake account index:{program_id_index:?}", "Found tx with stake account accounts:{:?} stake account index:{program_id_index:?}",
account_keys, account_keys,
); );
//merge and delegate has 1 instruction. Create as 2 instructions and the first is not a StakeInstruction. //merge and delegate has 1 instruction. Create as 2 instructions and the first is not a StakeInstruction.
log::info!( log::trace!(
"Before read instruction of program_id_index:{}", "Before read instruction of program_id_index:{}",
instruction.program_id_index instruction.program_id_index
); );
let Ok(stake_inst) = bincode::deserialize::<StakeInstruction>(&instruction.data) else { let Ok(stake_inst) = bincode::deserialize::<StakeInstruction>(&instruction.data) else {
log::info!( log::warn!(
"Error during stake instruction decoding :{:?}", "Error during stake instruction decoding :{:?} stake_account:{:?}",
&instruction.data &instruction.data,
account_keys,
); );
return; return;
}; };
@ -410,10 +419,14 @@ pub async fn process_stake_tx_message(
"custodian": lockup.custodian.to_string(), "custodian": lockup.custodian.to_string(),
}); });
log::info!("StakeInstruction::Initialize authorized:{authorized} lockup:{lockup} stakeAccount:{} rentSysvar:{}" if verify_account_len(&account_keys, &instruction.accounts, vec![0, 1]) {
log::info!("StakeInstruction::Initialize authorized:{authorized} lockup:{lockup} stakeAccount:{} rentSysvar:{}"
, account_keys[instruction.accounts[0] as usize].to_string() , account_keys[instruction.accounts[0] as usize].to_string()
, account_keys[instruction.accounts[1] as usize].to_string() , account_keys[instruction.accounts[1] as usize].to_string()
); );
} else {
log::warn!("StakeInstruction::Initialize authorized:{authorized} lockup:{lockup} Index error in instruction:{:?}", instruction.accounts);
}
send_verification( send_verification(
stake_sender, stake_sender,
@ -424,6 +437,13 @@ pub async fn process_stake_tx_message(
.await; .await;
} }
StakeInstruction::Authorize(new_authorized, authority_type) => { StakeInstruction::Authorize(new_authorized, authority_type) => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) {
log::warn!(
"StakeInstruction::Authorize authorized Index error in instruction:{:?}",
instruction.accounts
);
return;
}
let value = json!({ let value = json!({
"stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(), "stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(),
"clockSysvar": account_keys[instruction.accounts[1] as usize].to_string(), "clockSysvar": account_keys[instruction.accounts[1] as usize].to_string(),
@ -448,6 +468,13 @@ pub async fn process_stake_tx_message(
.await; .await;
} }
StakeInstruction::DelegateStake => { StakeInstruction::DelegateStake => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4, 5]) {
log::warn!(
"StakeInstruction::DelegateStake authorized Index error in instruction:{:?}",
instruction.accounts
);
return;
}
let info = json!({ let info = json!({
"stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(), "stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(),
"voteAccount": account_keys[instruction.accounts[1] as usize].to_string(), "voteAccount": account_keys[instruction.accounts[1] as usize].to_string(),
@ -467,6 +494,13 @@ pub async fn process_stake_tx_message(
.await; .await;
} }
StakeInstruction::Split(lamports) => { StakeInstruction::Split(lamports) => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) {
log::warn!(
"StakeInstruction::Split authorized Index error in instruction:{:?}",
instruction.accounts
);
return;
}
let info = json!({ let info = json!({
"stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(), "stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(),
"newSplitAccount": account_keys[instruction.accounts[1] as usize].to_string(), "newSplitAccount": account_keys[instruction.accounts[1] as usize].to_string(),
@ -484,6 +518,13 @@ pub async fn process_stake_tx_message(
.await; .await;
} }
StakeInstruction::Withdraw(lamports) => { StakeInstruction::Withdraw(lamports) => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) {
log::warn!(
"StakeInstruction::Withdraw authorized Index error in instruction:{:?}",
instruction.accounts
);
return;
}
let info = json!({ let info = json!({
"stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(), "stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(),
"destination": account_keys[instruction.accounts[1] as usize].to_string(), "destination": account_keys[instruction.accounts[1] as usize].to_string(),
@ -505,6 +546,13 @@ pub async fn process_stake_tx_message(
.await; .await;
} }
StakeInstruction::Deactivate => { StakeInstruction::Deactivate => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) {
log::warn!(
"StakeInstruction::Deactivate authorized Index error in instruction:{:?}",
instruction.accounts
);
return;
}
let info = json!({ let info = json!({
"stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(), "stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(),
"clockSysvar": account_keys[instruction.accounts[1] as usize].to_string(), "clockSysvar": account_keys[instruction.accounts[1] as usize].to_string(),
@ -513,6 +561,13 @@ pub async fn process_stake_tx_message(
log::info!("StakeInstruction::Deactivate infos:{info}"); log::info!("StakeInstruction::Deactivate infos:{info}");
} }
StakeInstruction::SetLockup(lockup_args) => { StakeInstruction::SetLockup(lockup_args) => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1]) {
log::warn!(
"StakeInstruction::SetLockup authorized Index error in instruction:{:?}",
instruction.accounts
);
return;
}
let unix_timestamp = lockup_args let unix_timestamp = lockup_args
.unix_timestamp .unix_timestamp
.map(|timestamp| json!(timestamp)) .map(|timestamp| json!(timestamp))
@ -540,6 +595,13 @@ pub async fn process_stake_tx_message(
.await; .await;
} }
StakeInstruction::AuthorizeWithSeed(args) => { StakeInstruction::AuthorizeWithSeed(args) => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1]) {
log::warn!(
"StakeInstruction::AuthorizeWithSeed authorized Index error in instruction:{:?}",
instruction.accounts
);
return;
}
let info = json!({ let info = json!({
"stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(), "stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(),
"authorityBase": account_keys[instruction.accounts[1] as usize].to_string(), "authorityBase": account_keys[instruction.accounts[1] as usize].to_string(),
@ -563,6 +625,13 @@ pub async fn process_stake_tx_message(
.await; .await;
} }
StakeInstruction::InitializeChecked => { StakeInstruction::InitializeChecked => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) {
log::warn!(
"StakeInstruction::InitializeChecked authorized Index error in instruction:{:?}",
instruction.accounts
);
return;
}
let info = json!({ let info = json!({
"stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(), "stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(),
"rentSysvar": account_keys[instruction.accounts[1] as usize].to_string(), "rentSysvar": account_keys[instruction.accounts[1] as usize].to_string(),
@ -580,6 +649,13 @@ pub async fn process_stake_tx_message(
.await; .await;
} }
StakeInstruction::AuthorizeChecked(authority_type) => { StakeInstruction::AuthorizeChecked(authority_type) => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) {
log::warn!(
"StakeInstruction::AuthorizeChecked authorized Index error in instruction:{:?}",
instruction.accounts
);
return;
}
let info = json!({ let info = json!({
"stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(), "stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(),
"clockSysvar": account_keys[instruction.accounts[1] as usize].to_string(), "clockSysvar": account_keys[instruction.accounts[1] as usize].to_string(),
@ -600,6 +676,13 @@ pub async fn process_stake_tx_message(
.await; .await;
} }
StakeInstruction::AuthorizeCheckedWithSeed(args) => { StakeInstruction::AuthorizeCheckedWithSeed(args) => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) {
log::warn!(
"StakeInstruction::AuthorizeCheckedWithSeed authorized Index error in instruction:{:?}",
instruction.accounts
);
return;
}
let info = json!({ let info = json!({
"stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(), "stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(),
"authorityBase": account_keys[instruction.accounts[1] as usize].to_string(), "authorityBase": account_keys[instruction.accounts[1] as usize].to_string(),
@ -624,6 +707,13 @@ pub async fn process_stake_tx_message(
.await; .await;
} }
StakeInstruction::SetLockupChecked(lockup_args) => { StakeInstruction::SetLockupChecked(lockup_args) => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) {
log::warn!(
"StakeInstruction::SetLockupChecked authorized Index error in instruction:{:?}",
instruction.accounts
);
return;
}
let unix_timestamp = lockup_args let unix_timestamp = lockup_args
.unix_timestamp .unix_timestamp
.map(|timestamp| json!(timestamp)) .map(|timestamp| json!(timestamp))
@ -652,6 +742,13 @@ pub async fn process_stake_tx_message(
log::info!("StakeInstruction::GetMinimumDelegation"); log::info!("StakeInstruction::GetMinimumDelegation");
} }
StakeInstruction::DeactivateDelinquent => { StakeInstruction::DeactivateDelinquent => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) {
log::warn!(
"StakeInstruction::DeactivateDelinquent authorized Index error in instruction:{:?}",
instruction.accounts
);
return;
}
let info = json!({ let info = json!({
"stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(), "stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(),
"voteAccount": account_keys[instruction.accounts[1] as usize].to_string(), "voteAccount": account_keys[instruction.accounts[1] as usize].to_string(),
@ -668,6 +765,13 @@ pub async fn process_stake_tx_message(
.await; .await;
} }
StakeInstruction::Redelegate => { StakeInstruction::Redelegate => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) {
log::warn!(
"StakeInstruction::Redelegate authorized Index error in instruction:{:?}",
instruction.accounts
);
return;
}
let info = json!({ let info = json!({
"stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(), "stakeAccount": account_keys[instruction.accounts[0] as usize].to_string(),
"newStakeAccount": account_keys[instruction.accounts[1] as usize].to_string(), "newStakeAccount": account_keys[instruction.accounts[1] as usize].to_string(),
@ -686,6 +790,13 @@ pub async fn process_stake_tx_message(
.await; .await;
} }
StakeInstruction::Merge => { StakeInstruction::Merge => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) {
log::warn!(
"StakeInstruction::Merge authorized Index error in instruction:{:?}",
instruction.accounts
);
return;
}
let info = json!({ let info = json!({
"destination": account_keys[instruction.accounts[0] as usize].to_string(), "destination": account_keys[instruction.accounts[0] as usize].to_string(),
"source": account_keys[instruction.accounts[1] as usize].to_string(), "source": account_keys[instruction.accounts[1] as usize].to_string(),
@ -694,28 +805,28 @@ pub async fn process_stake_tx_message(
"stakeAuthority": account_keys[instruction.accounts[4] as usize].to_string(), "stakeAuthority": account_keys[instruction.accounts[4] as usize].to_string(),
}); });
log::info!("StakeInstruction::Merge infos:{info}"); log::info!("StakeInstruction::Merge infos:{info}");
let source_account = &account_keys_vec[instruction.accounts[1] as usize]; // let source_account = &account_keys_vec[instruction.accounts[1] as usize];
let source_bytes: [u8; 32] = source_account[..solana_sdk::pubkey::PUBKEY_BYTES] // let source_bytes: [u8; 32] = source_account[..solana_sdk::pubkey::PUBKEY_BYTES]
.try_into() // .try_into()
.unwrap(); // .unwrap();
let source_pubkey = Pubkey::new_from_array(source_bytes); //let source_pubkey = Pubkey::new_from_array(source_bytes);
log::info!( // log::info!(
"DETECT MERGE for source account:{}", // "DETECT MERGE for source account:{}",
source_pubkey.to_string() // source_pubkey.to_string()
); // );
stakestore.notify_stake_action( // stakestore.notify_stake_action(
ExtractedAction::Remove(source_pubkey, tx_slot), // ExtractedAction::Remove(source_pubkey, tx_slot),
current_end_epoch_slot, // current_end_epoch_slot,
); // );
stakestore.notify_stake_action( // stakestore.notify_stake_action(
ExtractedAction::Merge { // ExtractedAction::Merge {
source_account: account_keys[instruction.accounts[1] as usize], // source_account: account_keys[instruction.accounts[1] as usize],
destination_account: account_keys[instruction.accounts[0] as usize], // destination_account: account_keys[instruction.accounts[0] as usize],
update_slot: tx_slot, // update_slot: tx_slot,
}, // },
current_end_epoch_slot, // current_end_epoch_slot,
); // );
// send_verification( // send_verification(
// stake_sender, // stake_sender,