add stake verification with RPC

This commit is contained in:
musitdev 2023-09-18 16:13:09 +02:00
parent 7d884850f3
commit 71706729b8
2 changed files with 197 additions and 2 deletions

View File

@ -201,6 +201,11 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
let jh = tokio::spawn(async move { BootstrapEvent::InitBootstrap });
spawned_bootstrap_task.push(jh);
//For DEBUG TODO remove:
//start stake verification loop
let mut stake_verification_sender =
crate::stakestore::start_stake_verification_loop(RPC_URL.to_string()).await;
loop {
tokio::select! {
Some(req) = request_rx.recv() => {
@ -366,11 +371,12 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
);
let program_index = instruction.program_id_index;
crate::stakestore::process_stake_tx_message(
&mut stake_verification_sender,
&mut stakestore
, &message.account_keys
, instruction
, program_index
);
).await;
}
}
}

View File

@ -10,6 +10,7 @@ use solana_sdk::stake::instruction::StakeInstruction;
use solana_sdk::stake::state::Delegation;
use solana_sdk::stake::state::StakeState;
use std::collections::HashMap;
use tokio::sync::mpsc::Sender;
use yellowstone_grpc_proto::solana::storage::confirmed_block::CompiledInstruction;
pub type StakeMap = HashMap<Pubkey, StoredStake>;
@ -281,7 +282,75 @@ pub fn read_stake_from_account_data(mut data: &[u8]) -> anyhow::Result<Option<De
}
}
pub fn process_stake_tx_message(
pub async fn start_stake_verification_loop(
rpc_url: String,
) -> Sender<(String, Pubkey, Option<StoredStake>)> {
let (request_tx, mut request_rx) =
tokio::sync::mpsc::channel::<(String, Pubkey, Option<StoredStake>)>(100);
tokio::spawn(async move {
while let Some((instruction, stake_pk, stake)) = request_rx.recv().await {
tokio::spawn({
let rpc_url = rpc_url.clone();
async move {
tokio::time::sleep(tokio::time::Duration::from_millis(5000)).await;
//get stake from RPC
let rpc_client =
solana_client::nonblocking::rpc_client::RpcClient::new_with_commitment(
rpc_url,
solana_sdk::commitment_config::CommitmentConfig::finalized(),
);
match rpc_client.get_account(&stake_pk).await {
Ok(account) => {
let stake_data = crate::stakestore::read_stake_from_account_data(
account.data.as_slice(),
);
log::info!(
"VERIFICATION Instruction:{instruction} Account:{} verification current stake:{stake:?} Rpc stake:{stake_data:?}",
stake_pk
);
}
Err(solana_client::client_error::ClientError {
kind:
solana_client::client_error::ClientErrorKind::RpcError(
solana_client::rpc_request::RpcError::ForUser(msg),
),
..
}) => {
log::info!(
"VERIFICATION Instruction:{instruction} Account:{} verification RPC not found:{}",
stake_pk,
msg
);
}
Err(err) => {
log::info!(
"VERIFICATION Instruction:{instruction} Account:{} verification RPC error:{err}",
stake_pk
);
}
}
}
});
}
});
request_tx
}
async fn send_verification(
stake_sender: &mut Sender<(String, Pubkey, Option<StoredStake>)>,
stakestore: &mut StakeStore,
instr: &str,
stake_pybkey: Pubkey,
) {
let current_stake = stakestore.stakes.get(&stake_pybkey).cloned();
stake_sender
.send((instr.to_string(), stake_pybkey, current_stake))
.await
.unwrap();
}
pub async fn process_stake_tx_message(
stake_sender: &mut Sender<(String, Pubkey, Option<StoredStake>)>,
stakestore: &mut StakeStore,
account_keys_vec: &[Vec<u8>],
instruction: CompiledInstruction,
@ -330,6 +399,14 @@ pub fn process_stake_tx_message(
, account_keys[instruction.accounts[0] as usize].to_string()
, account_keys[instruction.accounts[1] as usize].to_string()
);
send_verification(
stake_sender,
stakestore,
"Initialize",
account_keys[instruction.accounts[0] as usize],
)
.await;
}
StakeInstruction::Authorize(new_authorized, authority_type) => {
let value = json!({
@ -346,6 +423,14 @@ pub fn process_stake_tx_message(
)
});
log::info!("StakeInstruction::Authorize value:{value} custodian:{custodian:?}");
send_verification(
stake_sender,
stakestore,
"Authorize",
account_keys[instruction.accounts[0] as usize],
)
.await;
}
StakeInstruction::DelegateStake => {
let info = json!({
@ -357,6 +442,14 @@ pub fn process_stake_tx_message(
"stakeAuthority": account_keys[instruction.accounts[5] as usize].to_string(),
});
log::info!("StakeInstruction::DelegateStake infos:{info}");
send_verification(
stake_sender,
stakestore,
"DelegateStake",
account_keys[instruction.accounts[0] as usize],
)
.await;
}
StakeInstruction::Split(lamports) => {
let info = json!({
@ -366,6 +459,14 @@ pub fn process_stake_tx_message(
"lamports": lamports,
});
log::info!("StakeInstruction::Split infos:{info}");
send_verification(
stake_sender,
stakestore,
"Split",
account_keys[instruction.accounts[0] as usize],
)
.await;
}
StakeInstruction::Withdraw(lamports) => {
let info = json!({
@ -379,6 +480,14 @@ pub fn process_stake_tx_message(
let custodian = (instruction.accounts.len() >= 6)
.then(|| json!(account_keys[instruction.accounts[5] as usize].to_string()));
log::info!("StakeInstruction::Withdraw custodian:{custodian:?}infos:{info}");
send_verification(
stake_sender,
stakestore,
"Withdraw",
account_keys[instruction.accounts[0] as usize],
)
.await;
}
StakeInstruction::Deactivate => {
let info = json!({
@ -406,6 +515,14 @@ pub fn process_stake_tx_message(
"custodian": account_keys[instruction.accounts[1] as usize].to_string(),
});
log::info!("StakeInstruction::SetLockup unixTimestamp:{unix_timestamp} epoch:{epoch} custodian:{custodian} infos:{info}");
send_verification(
stake_sender,
stakestore,
"SetLockup",
account_keys[instruction.accounts[0] as usize],
)
.await;
}
StakeInstruction::AuthorizeWithSeed(args) => {
let info = json!({
@ -421,6 +538,14 @@ pub fn process_stake_tx_message(
let custodian = (instruction.accounts.len() >= 4)
.then(|| json!(account_keys[instruction.accounts[3] as usize].to_string()));
log::info!("StakeInstruction::AuthorizeWithSeed clockSysvar:{clock_sysvar:?} custodian:{custodian:?} infos:{info}");
send_verification(
stake_sender,
stakestore,
"AuthorizeWithSeed",
account_keys[instruction.accounts[0] as usize],
)
.await;
}
StakeInstruction::InitializeChecked => {
let info = json!({
@ -430,6 +555,14 @@ pub fn process_stake_tx_message(
"withdrawer": account_keys[instruction.accounts[3] as usize].to_string(),
});
log::info!("StakeInstruction::InitializeChecked infos:{info}");
send_verification(
stake_sender,
stakestore,
"InitializeChecked",
account_keys[instruction.accounts[0] as usize],
)
.await;
}
StakeInstruction::AuthorizeChecked(authority_type) => {
let info = json!({
@ -442,6 +575,14 @@ pub fn process_stake_tx_message(
let custodian = (instruction.accounts.len() >= 5)
.then(|| json!(account_keys[instruction.accounts[4] as usize].to_string()));
log::info!("StakeInstruction::AuthorizeChecked custodian:{custodian:?} infos:{info}");
send_verification(
stake_sender,
stakestore,
"AuthorizeChecked",
account_keys[instruction.accounts[0] as usize],
)
.await;
}
StakeInstruction::AuthorizeCheckedWithSeed(args) => {
let info = json!({
@ -458,6 +599,14 @@ pub fn process_stake_tx_message(
log::info!(
"StakeInstruction::AuthorizeCheckedWithSeed custodian:{custodian:?} infos:{info}"
);
send_verification(
stake_sender,
stakestore,
"AuthorizeCheckedWithSeed",
account_keys[instruction.accounts[0] as usize],
)
.await;
}
StakeInstruction::SetLockupChecked(lockup_args) => {
let unix_timestamp = lockup_args
@ -475,6 +624,14 @@ pub fn process_stake_tx_message(
"custodian": account_keys[instruction.accounts[1] as usize].to_string(),
});
log::info!("StakeInstruction::SetLockupChecked unixTimestamp:{unix_timestamp} epoch:{epoch} custodian:{custodian:?} infos:{info}");
send_verification(
stake_sender,
stakestore,
"SetLockupChecked",
account_keys[instruction.accounts[0] as usize],
)
.await;
}
StakeInstruction::GetMinimumDelegation => {
log::info!("StakeInstruction::GetMinimumDelegation");
@ -486,6 +643,14 @@ pub fn process_stake_tx_message(
"referenceVoteAccount": account_keys[instruction.accounts[2] as usize].to_string(),
});
log::info!("StakeInstruction::DeactivateDelinquent infos:{info}");
send_verification(
stake_sender,
stakestore,
"DeactivateDelinquent",
account_keys[instruction.accounts[0] as usize],
)
.await;
}
StakeInstruction::Redelegate => {
let info = json!({
@ -496,6 +661,14 @@ pub fn process_stake_tx_message(
"stakeAuthority": account_keys[instruction.accounts[4] as usize].to_string(),
});
log::info!("StakeInstruction::Redelegate infos:{info}");
send_verification(
stake_sender,
stakestore,
"Redelegate",
account_keys[instruction.accounts[0] as usize],
)
.await;
}
StakeInstruction::Merge => {
let info = json!({
@ -516,6 +689,22 @@ pub fn process_stake_tx_message(
source_pubkey.to_string()
);
stakestore.remove_stake(source_pubkey);
send_verification(
stake_sender,
stakestore,
"Merge Destination",
account_keys[instruction.accounts[0] as usize],
)
.await;
send_verification(
stake_sender,
stakestore,
"Merge Source",
account_keys[instruction.accounts[1] as usize],
)
.await;
}
}
}