Compare commits
No commits in common. "2fff2b082422790061bf1ad3b63e6a5fdc7aa3ea" and "c12392170956f10b534aff95e78198596ee95df1" have entirely different histories.
2fff2b0824
...
c123921709
|
@ -17,8 +17,7 @@ use std::time::Duration;
|
|||
|
||||
//const STAKE_FILE: &str = "epoch528_leader_schedule_stakes.txt";
|
||||
//const RPC_URL: &str = "http://localhost:8899";
|
||||
//const RPC_URL: &str = "https://api.testnet.solana.com";
|
||||
const RPC_URL: &str = "https://api.mainnet-beta.solana.com";
|
||||
const RPC_URL: &str = "https://api.testnet.solana.com";
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
|
|
|
@ -55,7 +55,6 @@ use yellowstone_grpc_proto::geyser::SubscribeUpdateAccount;
|
|||
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterAccounts;
|
||||
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocks;
|
||||
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta;
|
||||
use yellowstone_grpc_proto::prelude::SubscribeUpdateBlock;
|
||||
use yellowstone_grpc_proto::{
|
||||
prelude::{subscribe_update::UpdateOneof, SubscribeRequestFilterSlots},
|
||||
tonic::service::Interceptor,
|
||||
|
@ -255,13 +254,11 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
|||
|
||||
//For DEBUG TODO remove:
|
||||
//start stake verification loop
|
||||
// let mut stake_verification_sender =
|
||||
// crate::stakestore::start_stake_verification_loop(RPC_URL.to_string()).await;
|
||||
let mut stake_verification_sender =
|
||||
crate::stakestore::start_stake_verification_loop(RPC_URL.to_string()).await;
|
||||
|
||||
//use to process block at confirm slot.
|
||||
//at confirm slot are send before the block.
|
||||
//Verify that the last confirmed slot receive is the block slot.
|
||||
let mut last_confirmed_slot = 0;
|
||||
//TODO remove. Store parent hash to see if we don't miss a block.
|
||||
let mut parent_block_slot = None;
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(req) = request_rx.recv() => {
|
||||
|
@ -412,11 +409,6 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
|||
}
|
||||
}
|
||||
|
||||
if let CommitmentLevel::Confirmed = slot.status() {
|
||||
log::trace!("Receive confirmed slot:{}", slot.slot);
|
||||
last_confirmed_slot = slot.slot;
|
||||
}
|
||||
|
||||
}
|
||||
Some(UpdateOneof::BlockMeta(block_meta)) => {
|
||||
log::info!("Receive Block Meta at slot: {}", block_meta.slot);
|
||||
|
@ -428,11 +420,50 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
|||
block.parent_slot,
|
||||
);
|
||||
|
||||
let slot = block.slot;
|
||||
if last_confirmed_slot != 0 && last_confirmed_slot != slot {
|
||||
log::error!("No block found for slot:{last_confirmed_slot}. Get bloc for slot:{slot}");
|
||||
//TODO remove; Detect missing block
|
||||
if let Some(parent_block_slot) = parent_block_slot {
|
||||
if parent_block_slot != block.parent_slot {
|
||||
log::error!("Bad parent slot stored:{} block:{}, miss a block"
|
||||
,parent_block_slot,block.parent_slot
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
parent_block_slot = Some(block.slot);
|
||||
|
||||
//parse to detect stake merge tx.
|
||||
//first in the main thread then in a specific thread.
|
||||
let stake_public_key: Vec<u8> = solana_sdk::stake::program::id().to_bytes().to_vec();
|
||||
for notif_tx in block.transactions {
|
||||
if !notif_tx.is_vote {
|
||||
if let Some(message) = notif_tx.transaction.and_then(|tx| tx.message) {
|
||||
for instruction in message.instructions {
|
||||
//filter stake tx
|
||||
if message.account_keys[instruction.program_id_index as usize] == stake_public_key {
|
||||
let source_bytes: [u8; 64] = notif_tx.signature[..solana_sdk::signature::SIGNATURE_BYTES]
|
||||
.try_into()
|
||||
.unwrap();
|
||||
log::info!("New stake Tx sign:{} at block slot:{:?} current_slot:{} accounts:{:?}"
|
||||
, solana_sdk::signature::Signature::from(source_bytes).to_string()
|
||||
, block.slot
|
||||
, current_epoch_state.current_slot.confirmed_slot
|
||||
, instruction.accounts
|
||||
);
|
||||
let program_index = instruction.program_id_index;
|
||||
crate::stakestore::process_stake_tx_message(
|
||||
&mut stake_verification_sender,
|
||||
&mut stakestore
|
||||
, &message.account_keys
|
||||
, instruction
|
||||
, program_index
|
||||
, block.slot
|
||||
, current_epoch_state.current_epoch_end_slot(),
|
||||
).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
process_block(block, slot);
|
||||
|
||||
}
|
||||
Some(UpdateOneof::Ping(_)) => log::trace!("UpdateOneof::Ping"),
|
||||
|
@ -478,42 +509,6 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn process_block(block: SubscribeUpdateBlock, confirmed_slot: Slot) {
|
||||
//parse to detect stake merge tx.
|
||||
//first in the main thread then in a specific thread.
|
||||
let stake_public_key: Vec<u8> = solana_sdk::stake::program::id().to_bytes().to_vec();
|
||||
for notif_tx in block.transactions {
|
||||
if !notif_tx.is_vote {
|
||||
if let Some(message) = notif_tx.transaction.and_then(|tx| tx.message) {
|
||||
for instruction in message.instructions {
|
||||
//filter stake tx
|
||||
if message.account_keys[instruction.program_id_index as usize]
|
||||
== stake_public_key
|
||||
{
|
||||
let source_bytes: [u8; 64] = notif_tx.signature
|
||||
[..solana_sdk::signature::SIGNATURE_BYTES]
|
||||
.try_into()
|
||||
.unwrap();
|
||||
log::info!(
|
||||
"New stake Tx sign:{} at block slot:{:?} current_slot:{} accounts:{:?}",
|
||||
solana_sdk::signature::Signature::from(source_bytes).to_string(),
|
||||
block.slot,
|
||||
confirmed_slot,
|
||||
instruction.accounts
|
||||
);
|
||||
let program_index = instruction.program_id_index;
|
||||
crate::stakestore::process_stake_tx_message(
|
||||
&message.account_keys,
|
||||
instruction,
|
||||
program_index,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[allow(dead_code)]
|
||||
pub struct AccountPretty {
|
||||
|
|
|
@ -346,6 +346,19 @@ pub async fn start_stake_verification_loop(
|
|||
request_tx
|
||||
}
|
||||
|
||||
async fn send_verification(
|
||||
_stake_sender: &mut Sender<(String, Pubkey, Option<StoredStake>)>,
|
||||
_stakestore: &mut StakeStore,
|
||||
_instr: &str,
|
||||
_stake_pybkey: Pubkey,
|
||||
) {
|
||||
// let current_stake = stakestore.stakes.get(&stake_pybkey).cloned();
|
||||
// stake_sender
|
||||
// .send((instr.to_string(), stake_pybkey, current_stake))
|
||||
// .await
|
||||
// .unwrap();
|
||||
}
|
||||
|
||||
fn verify_account_len(account_keys: &[Pubkey], instr_accounts: &[u8], indexes: Vec<usize>) -> bool {
|
||||
!indexes
|
||||
.into_iter()
|
||||
|
@ -354,11 +367,15 @@ fn verify_account_len(account_keys: &[Pubkey], instr_accounts: &[u8], indexes: V
|
|||
.is_some()
|
||||
}
|
||||
|
||||
pub fn process_stake_tx_message(
|
||||
pub async fn process_stake_tx_message(
|
||||
stake_sender: &mut Sender<(String, Pubkey, Option<StoredStake>)>,
|
||||
stakestore: &mut StakeStore,
|
||||
account_keys_vec: &[Vec<u8>],
|
||||
instruction: CompiledInstruction,
|
||||
//for debug and trace purpose.
|
||||
program_id_index: u32,
|
||||
_tx_slot: Slot,
|
||||
_current_end_epoch_slot: u64,
|
||||
) {
|
||||
//for tracing purpose
|
||||
let account_keys: Vec<Pubkey> = account_keys_vec
|
||||
|
@ -410,6 +427,14 @@ pub fn process_stake_tx_message(
|
|||
} else {
|
||||
log::warn!("StakeInstruction::Initialize authorized:{authorized} lockup:{lockup} Index error in instruction:{:?}", instruction.accounts);
|
||||
}
|
||||
|
||||
send_verification(
|
||||
stake_sender,
|
||||
stakestore,
|
||||
"Initialize",
|
||||
account_keys[instruction.accounts[0] as usize],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
StakeInstruction::Authorize(new_authorized, authority_type) => {
|
||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) {
|
||||
|
@ -433,6 +458,14 @@ pub fn process_stake_tx_message(
|
|||
)
|
||||
});
|
||||
log::info!("StakeInstruction::Authorize value:{value} custodian:{custodian:?}");
|
||||
|
||||
send_verification(
|
||||
stake_sender,
|
||||
stakestore,
|
||||
"Authorize",
|
||||
account_keys[instruction.accounts[0] as usize],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
StakeInstruction::DelegateStake => {
|
||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4, 5]) {
|
||||
|
@ -451,6 +484,14 @@ pub fn process_stake_tx_message(
|
|||
"stakeAuthority": account_keys[instruction.accounts[5] as usize].to_string(),
|
||||
});
|
||||
log::info!("StakeInstruction::DelegateStake infos:{info}");
|
||||
|
||||
send_verification(
|
||||
stake_sender,
|
||||
stakestore,
|
||||
"DelegateStake",
|
||||
account_keys[instruction.accounts[0] as usize],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
StakeInstruction::Split(lamports) => {
|
||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) {
|
||||
|
@ -467,6 +508,14 @@ pub fn process_stake_tx_message(
|
|||
"lamports": lamports,
|
||||
});
|
||||
log::info!("StakeInstruction::Split infos:{info}");
|
||||
|
||||
send_verification(
|
||||
stake_sender,
|
||||
stakestore,
|
||||
"Split",
|
||||
account_keys[instruction.accounts[0] as usize],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
StakeInstruction::Withdraw(lamports) => {
|
||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) {
|
||||
|
@ -487,6 +536,14 @@ pub fn process_stake_tx_message(
|
|||
let custodian = (instruction.accounts.len() >= 6)
|
||||
.then(|| json!(account_keys[instruction.accounts[5] as usize].to_string()));
|
||||
log::info!("StakeInstruction::Withdraw custodian:{custodian:?}infos:{info}");
|
||||
|
||||
send_verification(
|
||||
stake_sender,
|
||||
stakestore,
|
||||
"Withdraw",
|
||||
account_keys[instruction.accounts[0] as usize],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
StakeInstruction::Deactivate => {
|
||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) {
|
||||
|
@ -528,6 +585,14 @@ pub fn process_stake_tx_message(
|
|||
"custodian": account_keys[instruction.accounts[1] as usize].to_string(),
|
||||
});
|
||||
log::info!("StakeInstruction::SetLockup unixTimestamp:{unix_timestamp} epoch:{epoch} custodian:{custodian} infos:{info}");
|
||||
|
||||
send_verification(
|
||||
stake_sender,
|
||||
stakestore,
|
||||
"SetLockup",
|
||||
account_keys[instruction.accounts[0] as usize],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
StakeInstruction::AuthorizeWithSeed(args) => {
|
||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1]) {
|
||||
|
@ -550,6 +615,14 @@ pub fn process_stake_tx_message(
|
|||
let custodian = (instruction.accounts.len() >= 4)
|
||||
.then(|| json!(account_keys[instruction.accounts[3] as usize].to_string()));
|
||||
log::info!("StakeInstruction::AuthorizeWithSeed clockSysvar:{clock_sysvar:?} custodian:{custodian:?} infos:{info}");
|
||||
|
||||
send_verification(
|
||||
stake_sender,
|
||||
stakestore,
|
||||
"AuthorizeWithSeed",
|
||||
account_keys[instruction.accounts[0] as usize],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
StakeInstruction::InitializeChecked => {
|
||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) {
|
||||
|
@ -566,6 +639,14 @@ pub fn process_stake_tx_message(
|
|||
"withdrawer": account_keys[instruction.accounts[3] as usize].to_string(),
|
||||
});
|
||||
log::info!("StakeInstruction::InitializeChecked infos:{info}");
|
||||
|
||||
send_verification(
|
||||
stake_sender,
|
||||
stakestore,
|
||||
"InitializeChecked",
|
||||
account_keys[instruction.accounts[0] as usize],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
StakeInstruction::AuthorizeChecked(authority_type) => {
|
||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) {
|
||||
|
@ -585,6 +666,14 @@ pub fn process_stake_tx_message(
|
|||
let custodian = (instruction.accounts.len() >= 5)
|
||||
.then(|| json!(account_keys[instruction.accounts[4] as usize].to_string()));
|
||||
log::info!("StakeInstruction::AuthorizeChecked custodian:{custodian:?} infos:{info}");
|
||||
|
||||
send_verification(
|
||||
stake_sender,
|
||||
stakestore,
|
||||
"AuthorizeChecked",
|
||||
account_keys[instruction.accounts[0] as usize],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
StakeInstruction::AuthorizeCheckedWithSeed(args) => {
|
||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) {
|
||||
|
@ -608,6 +697,14 @@ pub fn process_stake_tx_message(
|
|||
log::info!(
|
||||
"StakeInstruction::AuthorizeCheckedWithSeed custodian:{custodian:?} infos:{info}"
|
||||
);
|
||||
|
||||
send_verification(
|
||||
stake_sender,
|
||||
stakestore,
|
||||
"AuthorizeCheckedWithSeed",
|
||||
account_keys[instruction.accounts[0] as usize],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
StakeInstruction::SetLockupChecked(lockup_args) => {
|
||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) {
|
||||
|
@ -632,6 +729,14 @@ pub fn process_stake_tx_message(
|
|||
"custodian": account_keys[instruction.accounts[1] as usize].to_string(),
|
||||
});
|
||||
log::info!("StakeInstruction::SetLockupChecked unixTimestamp:{unix_timestamp} epoch:{epoch} custodian:{custodian:?} infos:{info}");
|
||||
|
||||
send_verification(
|
||||
stake_sender,
|
||||
stakestore,
|
||||
"SetLockupChecked",
|
||||
account_keys[instruction.accounts[0] as usize],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
StakeInstruction::GetMinimumDelegation => {
|
||||
log::info!("StakeInstruction::GetMinimumDelegation");
|
||||
|
@ -650,6 +755,14 @@ pub fn process_stake_tx_message(
|
|||
"referenceVoteAccount": account_keys[instruction.accounts[2] as usize].to_string(),
|
||||
});
|
||||
log::info!("StakeInstruction::DeactivateDelinquent infos:{info}");
|
||||
|
||||
send_verification(
|
||||
stake_sender,
|
||||
stakestore,
|
||||
"DeactivateDelinquent",
|
||||
account_keys[instruction.accounts[0] as usize],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
StakeInstruction::Redelegate => {
|
||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) {
|
||||
|
@ -667,6 +780,14 @@ pub fn process_stake_tx_message(
|
|||
"stakeAuthority": account_keys[instruction.accounts[4] as usize].to_string(),
|
||||
});
|
||||
log::info!("StakeInstruction::Redelegate infos:{info}");
|
||||
|
||||
send_verification(
|
||||
stake_sender,
|
||||
stakestore,
|
||||
"Redelegate",
|
||||
account_keys[instruction.accounts[0] as usize],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
StakeInstruction::Merge => {
|
||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) {
|
||||
|
@ -706,6 +827,22 @@ pub fn process_stake_tx_message(
|
|||
// },
|
||||
// current_end_epoch_slot,
|
||||
// );
|
||||
|
||||
// send_verification(
|
||||
// stake_sender,
|
||||
// stakestore,
|
||||
// "Merge Destination",
|
||||
// account_keys[instruction.accounts[0] as usize],
|
||||
// )
|
||||
// .await;
|
||||
|
||||
// send_verification(
|
||||
// stake_sender,
|
||||
// stakestore,
|
||||
// "Merge Source",
|
||||
// account_keys[instruction.accounts[1] as usize],
|
||||
// )
|
||||
// .await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue