Compare commits

...

3 Commits

Author SHA1 Message Date
musitdev 2fff2b0824 verify block when a block is received 2023-11-21 17:22:16 +01:00
musitdev f882194049 add some logs 2023-11-21 17:03:19 +01:00
musitdev cd67496690 add block and slot confirm verification 2023-11-21 16:39:15 +01:00
3 changed files with 55 additions and 186 deletions

View File

@ -17,7 +17,8 @@ use std::time::Duration;
//const STAKE_FILE: &str = "epoch528_leader_schedule_stakes.txt"; //const STAKE_FILE: &str = "epoch528_leader_schedule_stakes.txt";
//const RPC_URL: &str = "http://localhost:8899"; //const RPC_URL: &str = "http://localhost:8899";
const RPC_URL: &str = "https://api.testnet.solana.com"; //const RPC_URL: &str = "https://api.testnet.solana.com";
const RPC_URL: &str = "https://api.mainnet-beta.solana.com";
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {

View File

@ -55,6 +55,7 @@ use yellowstone_grpc_proto::geyser::SubscribeUpdateAccount;
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterAccounts; use yellowstone_grpc_proto::prelude::SubscribeRequestFilterAccounts;
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocks; use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocks;
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta; use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta;
use yellowstone_grpc_proto::prelude::SubscribeUpdateBlock;
use yellowstone_grpc_proto::{ use yellowstone_grpc_proto::{
prelude::{subscribe_update::UpdateOneof, SubscribeRequestFilterSlots}, prelude::{subscribe_update::UpdateOneof, SubscribeRequestFilterSlots},
tonic::service::Interceptor, tonic::service::Interceptor,
@ -254,11 +255,13 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
//For DEBUG TODO remove: //For DEBUG TODO remove:
//start stake verification loop //start stake verification loop
let mut stake_verification_sender = // let mut stake_verification_sender =
crate::stakestore::start_stake_verification_loop(RPC_URL.to_string()).await; // crate::stakestore::start_stake_verification_loop(RPC_URL.to_string()).await;
//TODO remove. Store parent hash to see if we don't miss a block. //use to process block at confirm slot.
let mut parent_block_slot = None; //at confirm slot are send before the block.
//Verify that the last confirmed slot receive is the block slot.
let mut last_confirmed_slot = 0;
loop { loop {
tokio::select! { tokio::select! {
Some(req) = request_rx.recv() => { Some(req) = request_rx.recv() => {
@ -409,6 +412,11 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
} }
} }
if let CommitmentLevel::Confirmed = slot.status() {
log::trace!("Receive confirmed slot:{}", slot.slot);
last_confirmed_slot = slot.slot;
}
} }
Some(UpdateOneof::BlockMeta(block_meta)) => { Some(UpdateOneof::BlockMeta(block_meta)) => {
log::info!("Receive Block Meta at slot: {}", block_meta.slot); log::info!("Receive Block Meta at slot: {}", block_meta.slot);
@ -420,50 +428,11 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
block.parent_slot, block.parent_slot,
); );
//TODO remove; Detect missing block let slot = block.slot;
if let Some(parent_block_slot) = parent_block_slot { if last_confirmed_slot != 0 && last_confirmed_slot != slot {
if parent_block_slot != block.parent_slot { log::error!("No block found for slot:{last_confirmed_slot}. Get bloc for slot:{slot}");
log::error!("Bad parent slot stored:{} block:{}, miss a block"
,parent_block_slot,block.parent_slot
);
}
}
parent_block_slot = Some(block.slot);
//parse to detect stake merge tx.
//first in the main thread then in a specific thread.
let stake_public_key: Vec<u8> = solana_sdk::stake::program::id().to_bytes().to_vec();
for notif_tx in block.transactions {
if !notif_tx.is_vote {
if let Some(message) = notif_tx.transaction.and_then(|tx| tx.message) {
for instruction in message.instructions {
//filter stake tx
if message.account_keys[instruction.program_id_index as usize] == stake_public_key {
let source_bytes: [u8; 64] = notif_tx.signature[..solana_sdk::signature::SIGNATURE_BYTES]
.try_into()
.unwrap();
log::info!("New stake Tx sign:{} at block slot:{:?} current_slot:{} accounts:{:?}"
, solana_sdk::signature::Signature::from(source_bytes).to_string()
, block.slot
, current_epoch_state.current_slot.confirmed_slot
, instruction.accounts
);
let program_index = instruction.program_id_index;
crate::stakestore::process_stake_tx_message(
&mut stake_verification_sender,
&mut stakestore
, &message.account_keys
, instruction
, program_index
, block.slot
, current_epoch_state.current_epoch_end_slot(),
).await;
}
}
}
}
} }
process_block(block, slot);
} }
Some(UpdateOneof::Ping(_)) => log::trace!("UpdateOneof::Ping"), Some(UpdateOneof::Ping(_)) => log::trace!("UpdateOneof::Ping"),
@ -509,6 +478,42 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
Ok(()) Ok(())
} }
fn process_block(block: SubscribeUpdateBlock, confirmed_slot: Slot) {
//parse to detect stake merge tx.
//first in the main thread then in a specific thread.
let stake_public_key: Vec<u8> = solana_sdk::stake::program::id().to_bytes().to_vec();
for notif_tx in block.transactions {
if !notif_tx.is_vote {
if let Some(message) = notif_tx.transaction.and_then(|tx| tx.message) {
for instruction in message.instructions {
//filter stake tx
if message.account_keys[instruction.program_id_index as usize]
== stake_public_key
{
let source_bytes: [u8; 64] = notif_tx.signature
[..solana_sdk::signature::SIGNATURE_BYTES]
.try_into()
.unwrap();
log::info!(
"New stake Tx sign:{} at block slot:{:?} current_slot:{} accounts:{:?}",
solana_sdk::signature::Signature::from(source_bytes).to_string(),
block.slot,
confirmed_slot,
instruction.accounts
);
let program_index = instruction.program_id_index;
crate::stakestore::process_stake_tx_message(
&message.account_keys,
instruction,
program_index,
);
}
}
}
}
}
}
#[derive(Debug)] #[derive(Debug)]
#[allow(dead_code)] #[allow(dead_code)]
pub struct AccountPretty { pub struct AccountPretty {

View File

@ -346,19 +346,6 @@ pub async fn start_stake_verification_loop(
request_tx request_tx
} }
async fn send_verification(
_stake_sender: &mut Sender<(String, Pubkey, Option<StoredStake>)>,
_stakestore: &mut StakeStore,
_instr: &str,
_stake_pybkey: Pubkey,
) {
// let current_stake = stakestore.stakes.get(&stake_pybkey).cloned();
// stake_sender
// .send((instr.to_string(), stake_pybkey, current_stake))
// .await
// .unwrap();
}
fn verify_account_len(account_keys: &[Pubkey], instr_accounts: &[u8], indexes: Vec<usize>) -> bool { fn verify_account_len(account_keys: &[Pubkey], instr_accounts: &[u8], indexes: Vec<usize>) -> bool {
!indexes !indexes
.into_iter() .into_iter()
@ -367,15 +354,11 @@ fn verify_account_len(account_keys: &[Pubkey], instr_accounts: &[u8], indexes: V
.is_some() .is_some()
} }
pub async fn process_stake_tx_message( pub fn process_stake_tx_message(
stake_sender: &mut Sender<(String, Pubkey, Option<StoredStake>)>,
stakestore: &mut StakeStore,
account_keys_vec: &[Vec<u8>], account_keys_vec: &[Vec<u8>],
instruction: CompiledInstruction, instruction: CompiledInstruction,
//for debug and trace purpose. //for debug and trace purpose.
program_id_index: u32, program_id_index: u32,
_tx_slot: Slot,
_current_end_epoch_slot: u64,
) { ) {
//for tracing purpose //for tracing purpose
let account_keys: Vec<Pubkey> = account_keys_vec let account_keys: Vec<Pubkey> = account_keys_vec
@ -427,14 +410,6 @@ pub async fn process_stake_tx_message(
} else { } else {
log::warn!("StakeInstruction::Initialize authorized:{authorized} lockup:{lockup} Index error in instruction:{:?}", instruction.accounts); log::warn!("StakeInstruction::Initialize authorized:{authorized} lockup:{lockup} Index error in instruction:{:?}", instruction.accounts);
} }
send_verification(
stake_sender,
stakestore,
"Initialize",
account_keys[instruction.accounts[0] as usize],
)
.await;
} }
StakeInstruction::Authorize(new_authorized, authority_type) => { StakeInstruction::Authorize(new_authorized, authority_type) => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) {
@ -458,14 +433,6 @@ pub async fn process_stake_tx_message(
) )
}); });
log::info!("StakeInstruction::Authorize value:{value} custodian:{custodian:?}"); log::info!("StakeInstruction::Authorize value:{value} custodian:{custodian:?}");
send_verification(
stake_sender,
stakestore,
"Authorize",
account_keys[instruction.accounts[0] as usize],
)
.await;
} }
StakeInstruction::DelegateStake => { StakeInstruction::DelegateStake => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4, 5]) { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4, 5]) {
@ -484,14 +451,6 @@ pub async fn process_stake_tx_message(
"stakeAuthority": account_keys[instruction.accounts[5] as usize].to_string(), "stakeAuthority": account_keys[instruction.accounts[5] as usize].to_string(),
}); });
log::info!("StakeInstruction::DelegateStake infos:{info}"); log::info!("StakeInstruction::DelegateStake infos:{info}");
send_verification(
stake_sender,
stakestore,
"DelegateStake",
account_keys[instruction.accounts[0] as usize],
)
.await;
} }
StakeInstruction::Split(lamports) => { StakeInstruction::Split(lamports) => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) {
@ -508,14 +467,6 @@ pub async fn process_stake_tx_message(
"lamports": lamports, "lamports": lamports,
}); });
log::info!("StakeInstruction::Split infos:{info}"); log::info!("StakeInstruction::Split infos:{info}");
send_verification(
stake_sender,
stakestore,
"Split",
account_keys[instruction.accounts[0] as usize],
)
.await;
} }
StakeInstruction::Withdraw(lamports) => { StakeInstruction::Withdraw(lamports) => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) {
@ -536,14 +487,6 @@ pub async fn process_stake_tx_message(
let custodian = (instruction.accounts.len() >= 6) let custodian = (instruction.accounts.len() >= 6)
.then(|| json!(account_keys[instruction.accounts[5] as usize].to_string())); .then(|| json!(account_keys[instruction.accounts[5] as usize].to_string()));
log::info!("StakeInstruction::Withdraw custodian:{custodian:?}infos:{info}"); log::info!("StakeInstruction::Withdraw custodian:{custodian:?}infos:{info}");
send_verification(
stake_sender,
stakestore,
"Withdraw",
account_keys[instruction.accounts[0] as usize],
)
.await;
} }
StakeInstruction::Deactivate => { StakeInstruction::Deactivate => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) {
@ -585,14 +528,6 @@ pub async fn process_stake_tx_message(
"custodian": account_keys[instruction.accounts[1] as usize].to_string(), "custodian": account_keys[instruction.accounts[1] as usize].to_string(),
}); });
log::info!("StakeInstruction::SetLockup unixTimestamp:{unix_timestamp} epoch:{epoch} custodian:{custodian} infos:{info}"); log::info!("StakeInstruction::SetLockup unixTimestamp:{unix_timestamp} epoch:{epoch} custodian:{custodian} infos:{info}");
send_verification(
stake_sender,
stakestore,
"SetLockup",
account_keys[instruction.accounts[0] as usize],
)
.await;
} }
StakeInstruction::AuthorizeWithSeed(args) => { StakeInstruction::AuthorizeWithSeed(args) => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1]) { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1]) {
@ -615,14 +550,6 @@ pub async fn process_stake_tx_message(
let custodian = (instruction.accounts.len() >= 4) let custodian = (instruction.accounts.len() >= 4)
.then(|| json!(account_keys[instruction.accounts[3] as usize].to_string())); .then(|| json!(account_keys[instruction.accounts[3] as usize].to_string()));
log::info!("StakeInstruction::AuthorizeWithSeed clockSysvar:{clock_sysvar:?} custodian:{custodian:?} infos:{info}"); log::info!("StakeInstruction::AuthorizeWithSeed clockSysvar:{clock_sysvar:?} custodian:{custodian:?} infos:{info}");
send_verification(
stake_sender,
stakestore,
"AuthorizeWithSeed",
account_keys[instruction.accounts[0] as usize],
)
.await;
} }
StakeInstruction::InitializeChecked => { StakeInstruction::InitializeChecked => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) {
@ -639,14 +566,6 @@ pub async fn process_stake_tx_message(
"withdrawer": account_keys[instruction.accounts[3] as usize].to_string(), "withdrawer": account_keys[instruction.accounts[3] as usize].to_string(),
}); });
log::info!("StakeInstruction::InitializeChecked infos:{info}"); log::info!("StakeInstruction::InitializeChecked infos:{info}");
send_verification(
stake_sender,
stakestore,
"InitializeChecked",
account_keys[instruction.accounts[0] as usize],
)
.await;
} }
StakeInstruction::AuthorizeChecked(authority_type) => { StakeInstruction::AuthorizeChecked(authority_type) => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) {
@ -666,14 +585,6 @@ pub async fn process_stake_tx_message(
let custodian = (instruction.accounts.len() >= 5) let custodian = (instruction.accounts.len() >= 5)
.then(|| json!(account_keys[instruction.accounts[4] as usize].to_string())); .then(|| json!(account_keys[instruction.accounts[4] as usize].to_string()));
log::info!("StakeInstruction::AuthorizeChecked custodian:{custodian:?} infos:{info}"); log::info!("StakeInstruction::AuthorizeChecked custodian:{custodian:?} infos:{info}");
send_verification(
stake_sender,
stakestore,
"AuthorizeChecked",
account_keys[instruction.accounts[0] as usize],
)
.await;
} }
StakeInstruction::AuthorizeCheckedWithSeed(args) => { StakeInstruction::AuthorizeCheckedWithSeed(args) => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) {
@ -697,14 +608,6 @@ pub async fn process_stake_tx_message(
log::info!( log::info!(
"StakeInstruction::AuthorizeCheckedWithSeed custodian:{custodian:?} infos:{info}" "StakeInstruction::AuthorizeCheckedWithSeed custodian:{custodian:?} infos:{info}"
); );
send_verification(
stake_sender,
stakestore,
"AuthorizeCheckedWithSeed",
account_keys[instruction.accounts[0] as usize],
)
.await;
} }
StakeInstruction::SetLockupChecked(lockup_args) => { StakeInstruction::SetLockupChecked(lockup_args) => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) {
@ -729,14 +632,6 @@ pub async fn process_stake_tx_message(
"custodian": account_keys[instruction.accounts[1] as usize].to_string(), "custodian": account_keys[instruction.accounts[1] as usize].to_string(),
}); });
log::info!("StakeInstruction::SetLockupChecked unixTimestamp:{unix_timestamp} epoch:{epoch} custodian:{custodian:?} infos:{info}"); log::info!("StakeInstruction::SetLockupChecked unixTimestamp:{unix_timestamp} epoch:{epoch} custodian:{custodian:?} infos:{info}");
send_verification(
stake_sender,
stakestore,
"SetLockupChecked",
account_keys[instruction.accounts[0] as usize],
)
.await;
} }
StakeInstruction::GetMinimumDelegation => { StakeInstruction::GetMinimumDelegation => {
log::info!("StakeInstruction::GetMinimumDelegation"); log::info!("StakeInstruction::GetMinimumDelegation");
@ -755,14 +650,6 @@ pub async fn process_stake_tx_message(
"referenceVoteAccount": account_keys[instruction.accounts[2] as usize].to_string(), "referenceVoteAccount": account_keys[instruction.accounts[2] as usize].to_string(),
}); });
log::info!("StakeInstruction::DeactivateDelinquent infos:{info}"); log::info!("StakeInstruction::DeactivateDelinquent infos:{info}");
send_verification(
stake_sender,
stakestore,
"DeactivateDelinquent",
account_keys[instruction.accounts[0] as usize],
)
.await;
} }
StakeInstruction::Redelegate => { StakeInstruction::Redelegate => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) {
@ -780,14 +667,6 @@ pub async fn process_stake_tx_message(
"stakeAuthority": account_keys[instruction.accounts[4] as usize].to_string(), "stakeAuthority": account_keys[instruction.accounts[4] as usize].to_string(),
}); });
log::info!("StakeInstruction::Redelegate infos:{info}"); log::info!("StakeInstruction::Redelegate infos:{info}");
send_verification(
stake_sender,
stakestore,
"Redelegate",
account_keys[instruction.accounts[0] as usize],
)
.await;
} }
StakeInstruction::Merge => { StakeInstruction::Merge => {
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) { if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) {
@ -827,22 +706,6 @@ pub async fn process_stake_tx_message(
// }, // },
// current_end_epoch_slot, // current_end_epoch_slot,
// ); // );
// send_verification(
// stake_sender,
// stakestore,
// "Merge Destination",
// account_keys[instruction.accounts[0] as usize],
// )
// .await;
// send_verification(
// stake_sender,
// stakestore,
// "Merge Source",
// account_keys[instruction.accounts[1] as usize],
// )
// .await;
} }
} }
} }