Merge pull request #5 from blockworks-foundation/aggregate_stake
Add cluster info
This commit is contained in:
commit
37e2fb23f0
|
@ -0,0 +1,45 @@
|
||||||
|
[package]
|
||||||
|
name = "clusterinfo"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
anyhow = "1.0.71"
|
||||||
|
bincode = "1.3.3"
|
||||||
|
borsh = "0.10.3"
|
||||||
|
bs58 = "0.4.0"
|
||||||
|
hex = "0.4.3"
|
||||||
|
log = "0.4.17"
|
||||||
|
tracing-subscriber = "0.3.16"
|
||||||
|
nom = "7.1.3"
|
||||||
|
itertools = "0.11.0"
|
||||||
|
reqwest = "0.11"
|
||||||
|
serde = "1.0"
|
||||||
|
serde_json = "1.0"
|
||||||
|
thiserror = "1.0.40"
|
||||||
|
|
||||||
|
futures = { version = "0.3.28", default-features = false }
|
||||||
|
futures-util = "0.3.28"
|
||||||
|
tokio = { version = "1.*", features = ["full"] }
|
||||||
|
async-stream = "0.3.5"
|
||||||
|
|
||||||
|
yellowstone-grpc-client = { path = "../../yellowstone-grpc-clusterinfo/yellowstone-grpc-client" }
|
||||||
|
yellowstone-grpc-proto = { path = "../../yellowstone-grpc-clusterinfo/yellowstone-grpc-proto" }
|
||||||
|
|
||||||
|
#patch to build locally
|
||||||
|
solana-program = { path = "../../solana-geyzer-cluster/sdk/program"}
|
||||||
|
solana-sdk = { path = "../../solana-geyzer-cluster/sdk"}
|
||||||
|
solana-client = { path = "../../solana-geyzer-cluster/client"}
|
||||||
|
solana-rpc-client-api = { path = "../../solana-geyzer-cluster/rpc-client-api" }
|
||||||
|
solana-version = { path = "../../solana-geyzer-cluster/version"}
|
||||||
|
solana-account-decoder = { path = "../../solana-geyzer-cluster/account-decoder" }
|
||||||
|
solana-ledger = { path = "../../solana-geyzer-cluster/ledger"}
|
||||||
|
solana-streamer = { path = "../../solana-geyzer-cluster/streamer"}
|
||||||
|
|
||||||
|
|
||||||
|
#patch to build locally
|
||||||
|
[patch.crates-io]
|
||||||
|
solana-program = { path = "../../solana-geyzer-cluster/sdk/program"}
|
||||||
|
solana-zk-token-sdk = { path = "../../solana-geyzer-cluster/zk-token-sdk"}
|
|
@ -0,0 +1,226 @@
|
||||||
|
// RUST_LOG=info cargo run
|
||||||
|
|
||||||
|
use async_stream::stream;
|
||||||
|
use futures::{Stream, StreamExt};
|
||||||
|
use solana_client::rpc_client::RpcClient;
|
||||||
|
use solana_sdk::commitment_config::CommitmentConfig;
|
||||||
|
use solana_streamer::socket::SocketAddrSpace;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::collections::HashSet;
|
||||||
|
use std::pin::pin;
|
||||||
|
use std::str::FromStr;
|
||||||
|
use yellowstone_grpc_client::GeyserGrpcClient;
|
||||||
|
use yellowstone_grpc_proto::geyser::CommitmentLevel;
|
||||||
|
use yellowstone_grpc_proto::prelude::subscribe_update_cluster_info;
|
||||||
|
use yellowstone_grpc_proto::prelude::SubscribeUpdate;
|
||||||
|
use yellowstone_grpc_proto::prelude::SubscribeUpdateClusterInfo;
|
||||||
|
use yellowstone_grpc_proto::prelude::SubscribeUpdateClusterInfoUpdate;
|
||||||
|
use yellowstone_grpc_proto::prelude::{subscribe_update::UpdateOneof, SubscribeRequestFilterSlots};
|
||||||
|
use yellowstone_grpc_proto::tonic::Status;
|
||||||
|
|
||||||
|
const RPC_URL: &str = "http://localhost:8899";
|
||||||
|
const GRPC_URL: &str = "http://localhost:10000";
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
tracing_subscriber::fmt::init();
|
||||||
|
|
||||||
|
let ctrl_c_signal = tokio::signal::ctrl_c();
|
||||||
|
|
||||||
|
//config
|
||||||
|
let allow_private_addr = false;
|
||||||
|
let addr_verifier = SocketAddrSpace::new(allow_private_addr);
|
||||||
|
//let genesis_config_hash = solana_sdk::Hash();
|
||||||
|
// let shred_version =
|
||||||
|
// solana_sdk::shred_version::compute_shred_version(&genesis_config.hash(), Some(&hard_forks));
|
||||||
|
|
||||||
|
let cluster_info_stream = create_geyser_clusterinfo_stream().await?;
|
||||||
|
|
||||||
|
let jh = tokio::spawn({
|
||||||
|
let shred_version = TESTNET_SHRED_VERSION;
|
||||||
|
|
||||||
|
async move {
|
||||||
|
run_loop(cluster_info_stream, shred_version, addr_verifier).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
|
res = jh => {
|
||||||
|
log::error!("Process quit unexpectedly {res:?}");
|
||||||
|
|
||||||
|
}
|
||||||
|
_ = ctrl_c_signal => {
|
||||||
|
log::info!("Received ctrl+c signal");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
//let socket_addr_space = SocketAddrSpace::new(matches.is_present("allow_private_addr"));
|
||||||
|
// pub shred_version: u16 = 0,
|
||||||
|
|
||||||
|
const TESTNET_SHRED_VERSION: u16 = 5106;
|
||||||
|
const TESTNET_GENESIS_HASH: &str = "4uhcVJyU9pJkvQyS88uRDiswHXSCkY3zQawwpjk2NsNY";
|
||||||
|
|
||||||
|
const MAINNET_SHRED_VERSION: u16 = 5106;
|
||||||
|
const MAINNET_GENESIS_HASH: &str = "5eykt4UsFv8P8NJdTREpY1vzqKqZKvdpKuc147dw2N9d";
|
||||||
|
|
||||||
|
fn verify_node(
|
||||||
|
node: &SubscribeUpdateClusterInfoUpdate,
|
||||||
|
addr_verifier: &SocketAddrSpace,
|
||||||
|
shred_version: u16,
|
||||||
|
) -> bool {
|
||||||
|
node.gossip
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|addr| std::net::SocketAddr::from_str(addr).ok())
|
||||||
|
.map(|addr| addr_verifier.check(&addr))
|
||||||
|
.unwrap_or(false)
|
||||||
|
&& node.shred_version as u16 == shred_version
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_loop<S>(cluster_info_stream: S, shred_version: u16, socket_addr_space: SocketAddrSpace)
|
||||||
|
where
|
||||||
|
S: Stream<Item = SubscribeUpdateClusterInfo>,
|
||||||
|
{
|
||||||
|
let mut node_list: HashMap<String, SubscribeUpdateClusterInfoUpdate> = HashMap::new();
|
||||||
|
let mut cluster_info_stream = pin!(cluster_info_stream);
|
||||||
|
|
||||||
|
//Log current size of the cluster node info list size.
|
||||||
|
let mut log_interval = tokio::time::interval(tokio::time::Duration::from_secs(15));
|
||||||
|
//spik first tick
|
||||||
|
log_interval.tick().await;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
//log interval TODO remove
|
||||||
|
_ = log_interval.tick() => {
|
||||||
|
log::info!("Current cluster info list size:{}", node_list.len());
|
||||||
|
//verify cluster_infos
|
||||||
|
let geyzer_clluster = node_list.clone();
|
||||||
|
tokio::task::spawn_blocking(move ||{
|
||||||
|
match get_rpc_cluster_info() {
|
||||||
|
Ok(rpc_cluster) => {
|
||||||
|
verify_clusters(geyzer_clluster, rpc_cluster);
|
||||||
|
},
|
||||||
|
Err(err) => log::warn!("Error during get RPC cluster nodes: {err}"),
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Some(update) = cluster_info_stream.next() => {
|
||||||
|
match update.data {
|
||||||
|
Some(subscribe_update_cluster_info::Data::Update(update)) => {
|
||||||
|
log::trace!("{}, ClusterInfo update", update.pubkey);
|
||||||
|
if verify_node(&update, &socket_addr_space, shred_version) {
|
||||||
|
node_list.insert(update.pubkey.clone(), update);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(subscribe_update_cluster_info::Data::Remove(pk)) => {
|
||||||
|
log::info!("{}, ClusterInfo remove", pk);
|
||||||
|
node_list.remove(&pk);
|
||||||
|
}
|
||||||
|
None => {}
|
||||||
|
}
|
||||||
|
|
||||||
|
// match verify_node(&update, &socket_addr_space, shred_version) {
|
||||||
|
// true => {node_list.insert(update.pubkey.clone(), update);},
|
||||||
|
// false=> log::info!("verify_node fail for:{} for addr:{}", update.pubkey, update.gossip.unwrap_or("None".to_string())),
|
||||||
|
// }
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create_geyser_clusterinfo_stream(
|
||||||
|
) -> anyhow::Result<impl Stream<Item = SubscribeUpdateClusterInfo>> {
|
||||||
|
let geyzer_stream = subscribe_geyzer_clusterinfo(GRPC_URL.to_string()).await?;
|
||||||
|
|
||||||
|
Ok(stream! {
|
||||||
|
for await update_message in geyzer_stream {
|
||||||
|
match update_message {
|
||||||
|
Ok(update_message) => {
|
||||||
|
match update_message.update_oneof {
|
||||||
|
Some(UpdateOneof::ClusterInfo(update_cluster_info)) =>
|
||||||
|
{
|
||||||
|
yield update_cluster_info;
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
Err(tonic_status) => {
|
||||||
|
// TODO identify non-recoverable errors and cancel stream
|
||||||
|
panic!("Receive error on geyzer stream {:?}", tonic_status);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} // -- production loop
|
||||||
|
}) // -- stream!
|
||||||
|
}
|
||||||
|
|
||||||
|
//subscribe Geyser grpc
|
||||||
|
async fn subscribe_geyzer_clusterinfo(
|
||||||
|
grpc_url: String,
|
||||||
|
) -> anyhow::Result<impl Stream<Item = Result<SubscribeUpdate, Status>>> {
|
||||||
|
let mut client = GeyserGrpcClient::connect(grpc_url, None::<&'static str>, None)?;
|
||||||
|
|
||||||
|
let mut slots = HashMap::new();
|
||||||
|
slots.insert("client".to_string(), SubscribeRequestFilterSlots::default());
|
||||||
|
|
||||||
|
//account subscription
|
||||||
|
let mut cluster_filter: HashSet<String> = HashSet::new();
|
||||||
|
cluster_filter.insert("client".to_owned());
|
||||||
|
|
||||||
|
let confirmed_stream = client
|
||||||
|
.subscribe_once(
|
||||||
|
slots.clone(),
|
||||||
|
Default::default(), //accounts
|
||||||
|
Default::default(), //tx
|
||||||
|
Default::default(), //entry
|
||||||
|
Default::default(), //full block
|
||||||
|
Default::default(), //block meta
|
||||||
|
Some(CommitmentLevel::Processed),
|
||||||
|
vec![],
|
||||||
|
None,
|
||||||
|
Some(cluster_filter),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(confirmed_stream)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_rpc_cluster_info() -> anyhow::Result<Vec<solana_client::rpc_response::RpcContactInfo>> {
|
||||||
|
let rpc_client = RpcClient::new_with_timeout_and_commitment(
|
||||||
|
RPC_URL.to_string(),
|
||||||
|
tokio::time::Duration::from_secs(600),
|
||||||
|
CommitmentConfig::confirmed(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let cluster_nodes = rpc_client.get_cluster_nodes()?;
|
||||||
|
log::info!("RPC cluster nodes size:{}", cluster_nodes.len());
|
||||||
|
Ok(cluster_nodes)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn verify_clusters(
|
||||||
|
mut geyzer_cluster: HashMap<String, SubscribeUpdateClusterInfoUpdate>,
|
||||||
|
rpc_cluster: Vec<solana_client::rpc_response::RpcContactInfo>,
|
||||||
|
) {
|
||||||
|
//verify if all rpc are in geyzer
|
||||||
|
for rpc in &rpc_cluster {
|
||||||
|
if let None = geyzer_cluster.remove(&rpc.pubkey) {
|
||||||
|
log::info!("Rpc node not present in geyzer: {} ", rpc.pubkey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
geyzer_cluster.iter().for_each(|(key, node)| {
|
||||||
|
log::info!(
|
||||||
|
"Geyzer node not present in RPC: {} gossip:{} shred_version:{} node:{:?}",
|
||||||
|
key,
|
||||||
|
node.gossip
|
||||||
|
.as_ref()
|
||||||
|
.map(|addr| addr.to_string())
|
||||||
|
.unwrap_or("None".to_string()),
|
||||||
|
node.shred_version,
|
||||||
|
node
|
||||||
|
)
|
||||||
|
});
|
||||||
|
}
|
|
@ -1,2 +1,2 @@
|
||||||
[toolchain]
|
[toolchain]
|
||||||
channel = "1.70.0"
|
channel = "1.72.0"
|
||||||
|
|
|
@ -27,6 +27,10 @@ path = "bin/sysvaraccount.rs"
|
||||||
name = "send_get_vote_account"
|
name = "send_get_vote_account"
|
||||||
path = "bin/send_get_vote_account.rs"
|
path = "bin/send_get_vote_account.rs"
|
||||||
|
|
||||||
|
[[bin]]
|
||||||
|
name = "testdeleteacc"
|
||||||
|
path = "bin/testdeleteacc.rs"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
@ -55,21 +59,21 @@ jsonrpsee = { version = "0.20.0", features = ["macros", "server", "full"] }
|
||||||
#jsonrpsee-types = "0.20.0"
|
#jsonrpsee-types = "0.20.0"
|
||||||
thiserror = "1.0.40"
|
thiserror = "1.0.40"
|
||||||
|
|
||||||
#yellowstone-grpc-client = { path = "../../yellowstone-grpc/yellowstone-grpc-client" }
|
#yellowstone-grpc-client = { path = "../../yellowstone-grpc-delete-account/yellowstone-grpc-client" }
|
||||||
#yellowstone-grpc-proto = { path = "../../yellowstone-grpc/yellowstone-grpc-proto" }
|
#yellowstone-grpc-proto = { path = "../../yellowstone-grpc-delete-account/yellowstone-grpc-proto" }
|
||||||
|
|
||||||
|
|
||||||
#yellowstone-grpc-client = { git = "http://github.com/rpcpool/yellowstone-grpc", rev = "c89b89dfc5f03f11f45ac4a6e832386a1d94cb67" }
|
#yellowstone-grpc-client = { git = "http://github.com/rpcpool/yellowstone-grpc", rev = "c89b89dfc5f03f11f45ac4a6e832386a1d94cb67" }
|
||||||
#yellowstone-grpc-proto = { git = "http://github.com/rpcpool/yellowstone-grpc", rev = "c89b89dfc5f03f11f45ac4a6e832386a1d94cb67" }
|
#yellowstone-grpc-proto = { git = "http://github.com/rpcpool/yellowstone-grpc", rev = "c89b89dfc5f03f11f45ac4a6e832386a1d94cb67" }
|
||||||
|
|
||||||
#yellowstone-grpc-client = "1.11.0+solana.1.16.14"
|
yellowstone-grpc-client = "1.11.0+solana.1.16.14"
|
||||||
#yellowstone-grpc-proto = "1.10.0+solana.1.16.14"
|
yellowstone-grpc-proto = "1.10.0+solana.1.16.14"
|
||||||
|
|
||||||
#yellowstone-grpc-client = "v1.10.0+solana.1.16.17"
|
#yellowstone-grpc-client = "v1.10.0+solana.1.16.17"
|
||||||
#yellowstone-grpc-proto = "v1.10.0+solana.1.16.17"
|
#yellowstone-grpc-proto = "v1.10.0+solana.1.16.17"
|
||||||
|
|
||||||
yellowstone-grpc-client = "1.10.0"
|
#yellowstone-grpc-client = "1.10.0"
|
||||||
yellowstone-grpc-proto = "1.10.0"
|
#yellowstone-grpc-proto = "1.10.0"
|
||||||
|
|
||||||
solana-sdk = "1.16.14"
|
solana-sdk = "1.16.14"
|
||||||
solana-client = "1.16.14"
|
solana-client = "1.16.14"
|
||||||
|
@ -78,3 +82,19 @@ solana-rpc-client-api = "1.16.14"
|
||||||
solana-version = "1.16.14"
|
solana-version = "1.16.14"
|
||||||
solana-account-decoder = "1.16.14"
|
solana-account-decoder = "1.16.14"
|
||||||
solana-program = "1.16.14"
|
solana-program = "1.16.14"
|
||||||
|
|
||||||
|
|
||||||
|
#patch to build locally
|
||||||
|
#solana-program = { path = "../../solana/sdk/program" , version = "=1.18.0"}
|
||||||
|
#solana-sdk = { path = "../../solana/sdk" , version = "=1.18.0"}
|
||||||
|
#solana-client = { path = "../../solana/client" , version = "=1.18.0"}
|
||||||
|
#solana-rpc-client-api = { path = "../../solana/rpc-client-api" , version = "=1.18.0"}
|
||||||
|
#solana-version = { path = "../../solana/version" , version = "=1.18.0"}
|
||||||
|
#solana-account-decoder = { path = "../../solana/account-decoder" , version = "=1.18.0"}
|
||||||
|
#solana-ledger = { path = "../../solana/ledger" , version = "=1.18.0"}
|
||||||
|
|
||||||
|
|
||||||
|
#patch to build locally
|
||||||
|
#[patch.crates-io]
|
||||||
|
#solana-program = { path = "../../solana/sdk/program" , version = "=1.18.0"}
|
||||||
|
#solana-zk-token-sdk = { path = "../../solana/zk-token-sdk" , version = "=1.18.0"}
|
||||||
|
|
|
@ -18,6 +18,7 @@ use std::time::Duration;
|
||||||
//const STAKE_FILE: &str = "epoch528_leader_schedule_stakes.txt";
|
//const STAKE_FILE: &str = "epoch528_leader_schedule_stakes.txt";
|
||||||
//const RPC_URL: &str = "http://localhost:8899";
|
//const RPC_URL: &str = "http://localhost:8899";
|
||||||
const RPC_URL: &str = "https://api.testnet.solana.com";
|
const RPC_URL: &str = "https://api.testnet.solana.com";
|
||||||
|
//const RPC_URL: &str = "https://api.mainnet-beta.solana.com";
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
|
|
@ -62,7 +62,7 @@ async fn run_loop<F: Interceptor>(
|
||||||
//subscribe Geyser grpc
|
//subscribe Geyser grpc
|
||||||
//slot subscription
|
//slot subscription
|
||||||
let mut slots = HashMap::new();
|
let mut slots = HashMap::new();
|
||||||
slots.insert("client".to_string(), SubscribeRequestFilterSlots {});
|
slots.insert("client".to_string(), SubscribeRequestFilterSlots::default());
|
||||||
|
|
||||||
//account subscription
|
//account subscription
|
||||||
let mut accounts_filter: HashMap<String, SubscribeRequestFilterAccounts> = HashMap::new();
|
let mut accounts_filter: HashMap<String, SubscribeRequestFilterAccounts> = HashMap::new();
|
||||||
|
@ -87,6 +87,7 @@ async fn run_loop<F: Interceptor>(
|
||||||
Default::default(), //block meta
|
Default::default(), //block meta
|
||||||
Some(CommitmentLevel::Confirmed),
|
Some(CommitmentLevel::Confirmed),
|
||||||
vec![],
|
vec![],
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,104 @@
|
||||||
|
// cargo run --bin testdeleteacc
|
||||||
|
|
||||||
|
use anyhow::bail;
|
||||||
|
use futures_util::StreamExt;
|
||||||
|
use solana_sdk::pubkey::Pubkey;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use yellowstone_grpc_client::GeyserGrpcClient;
|
||||||
|
use yellowstone_grpc_proto::geyser::CommitmentLevel;
|
||||||
|
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterAccounts;
|
||||||
|
use yellowstone_grpc_proto::{
|
||||||
|
prelude::{subscribe_update::UpdateOneof, SubscribeRequestFilterSlots},
|
||||||
|
tonic::service::Interceptor,
|
||||||
|
};
|
||||||
|
|
||||||
|
const GRPC_URL: &str = "http://localhost:10000";
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
tracing_subscriber::fmt::init();
|
||||||
|
|
||||||
|
let client = GeyserGrpcClient::connect(GRPC_URL, None::<&'static str>, None)?;
|
||||||
|
|
||||||
|
let ctrl_c_signal = tokio::signal::ctrl_c();
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
|
res = run_loop(client) => {
|
||||||
|
// This should never happen
|
||||||
|
log::error!("Services quit unexpectedly {res:?}");
|
||||||
|
}
|
||||||
|
_ = ctrl_c_signal => {
|
||||||
|
log::info!("Received ctrl+c signal");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Result<()> {
|
||||||
|
//subscribe Geyser grpc
|
||||||
|
//slot subscription
|
||||||
|
let mut slots = HashMap::new();
|
||||||
|
slots.insert("client".to_string(), SubscribeRequestFilterSlots::default());
|
||||||
|
|
||||||
|
//account subscription
|
||||||
|
let mut accounts_filter: HashMap<String, SubscribeRequestFilterAccounts> = HashMap::new();
|
||||||
|
accounts_filter.insert(
|
||||||
|
"client".to_owned(),
|
||||||
|
SubscribeRequestFilterAccounts {
|
||||||
|
account: vec![],
|
||||||
|
owner: vec![solana_sdk::stake::program::ID.to_string()],
|
||||||
|
filters: vec![],
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut confirmed_stream = client
|
||||||
|
.subscribe_once(
|
||||||
|
slots.clone(),
|
||||||
|
accounts_filter, //accounts
|
||||||
|
Default::default(), //tx
|
||||||
|
Default::default(), //entry
|
||||||
|
Default::default(), //full block
|
||||||
|
Default::default(), //block meta
|
||||||
|
Some(CommitmentLevel::Confirmed),
|
||||||
|
vec![],
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
while let Some(Ok(update)) = confirmed_stream.next().await {
|
||||||
|
match update.update_oneof {
|
||||||
|
Some(UpdateOneof::Account(account)) => {
|
||||||
|
log::info!("Geyser receive account at slot:{}", account.slot);
|
||||||
|
log::info!(
|
||||||
|
"Geyser notif with previous account owner:{:?}",
|
||||||
|
account
|
||||||
|
.previous_account_state
|
||||||
|
.map(|acc| Pubkey::try_from(acc.owner).expect("valid pubkey"))
|
||||||
|
);
|
||||||
|
if let Some(account) = account.account {
|
||||||
|
log::info!(
|
||||||
|
"Geyser notif for account account:{:?}",
|
||||||
|
Pubkey::try_from(account.pubkey).expect("valid pubkey")
|
||||||
|
);
|
||||||
|
log::info!(
|
||||||
|
"Geyser notif account owner:{:?}",
|
||||||
|
Pubkey::try_from(account.owner).expect("valid pubkey")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(UpdateOneof::Slot(slot)) => log::trace!(
|
||||||
|
"Geyser receive slot:{} commitment:{:?}",
|
||||||
|
slot.slot,
|
||||||
|
slot.status()
|
||||||
|
),
|
||||||
|
Some(UpdateOneof::Ping(_)) => {
|
||||||
|
log::trace!("GRPC Ping");
|
||||||
|
}
|
||||||
|
k => {
|
||||||
|
bail!("Unexpected update: {k:?}");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -0,0 +1,2 @@
|
||||||
|
[toolchain]
|
||||||
|
channel = "1.72.0"
|
|
@ -7,10 +7,92 @@ use solana_client::nonblocking::rpc_client::RpcClient;
|
||||||
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
|
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
|
||||||
use solana_sdk::epoch_info::EpochInfo;
|
use solana_sdk::epoch_info::EpochInfo;
|
||||||
use solana_sdk::sysvar::epoch_schedule::EpochSchedule;
|
use solana_sdk::sysvar::epoch_schedule::EpochSchedule;
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
use std::collections::BTreeSet;
|
||||||
|
use std::ops::Bound;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use yellowstone_grpc_proto::geyser::CommitmentLevel as GeyserCommitmentLevel;
|
use yellowstone_grpc_proto::geyser::CommitmentLevel as GeyserCommitmentLevel;
|
||||||
|
use yellowstone_grpc_proto::prelude::SubscribeUpdateBlock;
|
||||||
use yellowstone_grpc_proto::prelude::SubscribeUpdateSlot;
|
use yellowstone_grpc_proto::prelude::SubscribeUpdateSlot;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct BlockSlotVerifier {
|
||||||
|
block_cache: BTreeMap<u64, SubscribeUpdateBlock>,
|
||||||
|
slot_cache: BTreeSet<u64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BlockSlotVerifier {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
BlockSlotVerifier {
|
||||||
|
block_cache: BTreeMap::new(),
|
||||||
|
slot_cache: BTreeSet::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn process_slot(&mut self, slot: u64) -> Option<(u64, SubscribeUpdateBlock)> {
|
||||||
|
match self.block_cache.remove(&slot) {
|
||||||
|
//the block is already seen. Return slot/block.
|
||||||
|
Some(block) => Some((slot, block)),
|
||||||
|
None => {
|
||||||
|
self.slot_cache.insert(slot);
|
||||||
|
self.verify(slot);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn process_block(
|
||||||
|
&mut self,
|
||||||
|
block: SubscribeUpdateBlock,
|
||||||
|
) -> Option<(u64, SubscribeUpdateBlock)> {
|
||||||
|
let slot = block.slot;
|
||||||
|
if self.slot_cache.remove(&slot) {
|
||||||
|
//the slot is already seen. Return slot/block.
|
||||||
|
Some((slot, block))
|
||||||
|
} else {
|
||||||
|
//Cache block and wait for the slot
|
||||||
|
let old = self.block_cache.insert(slot, block);
|
||||||
|
if old.is_some() {
|
||||||
|
log::warn!("Receive 2 blocks for the same slot:{slot}");
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn verify(&mut self, current_slot: u64) {
|
||||||
|
//do some verification on cached block and slot
|
||||||
|
let old_slot: Vec<_> = self
|
||||||
|
.slot_cache
|
||||||
|
.range((
|
||||||
|
Bound::Unbounded,
|
||||||
|
Bound::Included(current_slot.saturating_sub(2)),
|
||||||
|
))
|
||||||
|
.copied()
|
||||||
|
.collect();
|
||||||
|
if old_slot.len() > 0 {
|
||||||
|
log::error!("Missing block for slots:{:?}", old_slot);
|
||||||
|
for slot in &old_slot {
|
||||||
|
self.slot_cache.remove(&slot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//verify that there's no too old block.
|
||||||
|
let old_block_slots: Vec<_> = self
|
||||||
|
.block_cache
|
||||||
|
.range((
|
||||||
|
Bound::Unbounded,
|
||||||
|
Bound::Included(current_slot.saturating_sub(2)),
|
||||||
|
))
|
||||||
|
.map(|(slot, _)| slot)
|
||||||
|
.copied()
|
||||||
|
.collect();
|
||||||
|
if old_block_slots.len() > 0 {
|
||||||
|
log::error!("Missing slot for block slot:{:?}", old_slot);
|
||||||
|
for slot in old_block_slots {
|
||||||
|
self.block_cache.remove(&slot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default, Copy, Clone, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize)]
|
#[derive(Debug, Default, Copy, Clone, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize)]
|
||||||
pub struct Epoch {
|
pub struct Epoch {
|
||||||
pub epoch: u64,
|
pub epoch: u64,
|
||||||
|
|
|
@ -36,6 +36,7 @@ curl http://localhost:3001 -X POST -H "Content-Type: application/json" -d '
|
||||||
|
|
||||||
use crate::bootstrap::BootstrapData;
|
use crate::bootstrap::BootstrapData;
|
||||||
use crate::bootstrap::BootstrapEvent;
|
use crate::bootstrap::BootstrapEvent;
|
||||||
|
use crate::epoch::BlockSlotVerifier;
|
||||||
use crate::leader_schedule::LeaderScheduleData;
|
use crate::leader_schedule::LeaderScheduleData;
|
||||||
use crate::stakestore::StakeStore;
|
use crate::stakestore::StakeStore;
|
||||||
use crate::votestore::VoteStore;
|
use crate::votestore::VoteStore;
|
||||||
|
@ -55,6 +56,7 @@ use yellowstone_grpc_proto::geyser::SubscribeUpdateAccount;
|
||||||
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterAccounts;
|
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterAccounts;
|
||||||
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocks;
|
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocks;
|
||||||
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta;
|
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta;
|
||||||
|
use yellowstone_grpc_proto::prelude::SubscribeUpdateBlock;
|
||||||
use yellowstone_grpc_proto::{
|
use yellowstone_grpc_proto::{
|
||||||
prelude::{subscribe_update::UpdateOneof, SubscribeRequestFilterSlots},
|
prelude::{subscribe_update::UpdateOneof, SubscribeRequestFilterSlots},
|
||||||
tonic::service::Interceptor,
|
tonic::service::Interceptor,
|
||||||
|
@ -184,7 +186,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
||||||
//subscribe Geyser grpc
|
//subscribe Geyser grpc
|
||||||
//slot subscription
|
//slot subscription
|
||||||
let mut slots = HashMap::new();
|
let mut slots = HashMap::new();
|
||||||
slots.insert("client".to_string(), SubscribeRequestFilterSlots {});
|
slots.insert("client".to_string(), SubscribeRequestFilterSlots::default());
|
||||||
|
|
||||||
//account subscription
|
//account subscription
|
||||||
let mut accounts: HashMap<String, SubscribeRequestFilterAccounts> = HashMap::new();
|
let mut accounts: HashMap<String, SubscribeRequestFilterAccounts> = HashMap::new();
|
||||||
|
@ -195,8 +197,6 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
||||||
owner: vec![
|
owner: vec![
|
||||||
solana_sdk::stake::program::ID.to_string(),
|
solana_sdk::stake::program::ID.to_string(),
|
||||||
solana_sdk::vote::program::ID.to_string(),
|
solana_sdk::vote::program::ID.to_string(),
|
||||||
solana_sdk::sysvar::stake_history::ID.to_string(),
|
|
||||||
// solana_sdk::system_program::ID.to_string(),
|
|
||||||
],
|
],
|
||||||
filters: vec![],
|
filters: vec![],
|
||||||
},
|
},
|
||||||
|
@ -230,6 +230,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
||||||
Default::default(), //block meta
|
Default::default(), //block meta
|
||||||
Some(CommitmentLevel::Confirmed),
|
Some(CommitmentLevel::Confirmed),
|
||||||
vec![],
|
vec![],
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
@ -254,11 +255,13 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
||||||
|
|
||||||
//For DEBUG TODO remove:
|
//For DEBUG TODO remove:
|
||||||
//start stake verification loop
|
//start stake verification loop
|
||||||
let mut stake_verification_sender =
|
// let mut stake_verification_sender =
|
||||||
crate::stakestore::start_stake_verification_loop(RPC_URL.to_string()).await;
|
// crate::stakestore::start_stake_verification_loop(RPC_URL.to_string()).await;
|
||||||
|
|
||||||
//TODO remove. Store parent hash to see if we don't miss a block.
|
//use to process block at confirm slot.
|
||||||
let mut parent_block_slot = None;
|
//at confirm slot are send before the block.
|
||||||
|
//Verify that the last confirmed slot receive is the block slot.
|
||||||
|
let mut block_slot_verifier = BlockSlotVerifier::new();
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
Some(req) = request_rx.recv() => {
|
Some(req) = request_rx.recv() => {
|
||||||
|
@ -409,6 +412,12 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let CommitmentLevel::Confirmed = slot.status() {
|
||||||
|
log::trace!("Receive confirmed slot:{}", slot.slot);
|
||||||
|
if let Some((slot, block)) = block_slot_verifier.process_slot(slot.slot) {
|
||||||
|
process_block(block, slot);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Some(UpdateOneof::BlockMeta(block_meta)) => {
|
Some(UpdateOneof::BlockMeta(block_meta)) => {
|
||||||
log::info!("Receive Block Meta at slot: {}", block_meta.slot);
|
log::info!("Receive Block Meta at slot: {}", block_meta.slot);
|
||||||
|
@ -420,49 +429,8 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
||||||
block.parent_slot,
|
block.parent_slot,
|
||||||
);
|
);
|
||||||
|
|
||||||
//TODO remove; Detect missing block
|
if let Some((slot, block)) = block_slot_verifier.process_block(block) {
|
||||||
if let Some(parent_block_slot) = parent_block_slot {
|
process_block(block, slot);
|
||||||
if parent_block_slot != block.parent_slot {
|
|
||||||
log::error!("Bad parent slot stored:{} block:{}, miss a block"
|
|
||||||
,parent_block_slot,block.parent_slot
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
parent_block_slot = Some(block.slot);
|
|
||||||
|
|
||||||
//parse to detect stake merge tx.
|
|
||||||
//first in the main thread then in a specific thread.
|
|
||||||
let stake_public_key: Vec<u8> = solana_sdk::stake::program::id().to_bytes().to_vec();
|
|
||||||
for notif_tx in block.transactions {
|
|
||||||
if !notif_tx.is_vote {
|
|
||||||
if let Some(message) = notif_tx.transaction.and_then(|tx| tx.message) {
|
|
||||||
for instruction in message.instructions {
|
|
||||||
//filter stake tx
|
|
||||||
if message.account_keys[instruction.program_id_index as usize] == stake_public_key {
|
|
||||||
let source_bytes: [u8; 64] = notif_tx.signature[..solana_sdk::signature::SIGNATURE_BYTES]
|
|
||||||
.try_into()
|
|
||||||
.unwrap();
|
|
||||||
log::info!("New stake Tx sign:{} at block slot:{:?} current_slot:{} accounts:{:?}"
|
|
||||||
, solana_sdk::signature::Signature::from(source_bytes).to_string()
|
|
||||||
, block.slot
|
|
||||||
, current_epoch_state.current_slot.confirmed_slot
|
|
||||||
, instruction.accounts
|
|
||||||
);
|
|
||||||
let program_index = instruction.program_id_index;
|
|
||||||
crate::stakestore::process_stake_tx_message(
|
|
||||||
&mut stake_verification_sender,
|
|
||||||
&mut stakestore
|
|
||||||
, &message.account_keys
|
|
||||||
, instruction
|
|
||||||
, program_index
|
|
||||||
, block.slot
|
|
||||||
, current_epoch_state.current_epoch_end_slot(),
|
|
||||||
).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -509,6 +477,42 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn process_block(block: SubscribeUpdateBlock, confirmed_slot: Slot) {
|
||||||
|
//parse to detect stake merge tx.
|
||||||
|
//first in the main thread then in a specific thread.
|
||||||
|
let stake_public_key: Vec<u8> = solana_sdk::stake::program::id().to_bytes().to_vec();
|
||||||
|
for notif_tx in block.transactions {
|
||||||
|
if !notif_tx.is_vote {
|
||||||
|
if let Some(message) = notif_tx.transaction.and_then(|tx| tx.message) {
|
||||||
|
for instruction in message.instructions {
|
||||||
|
//filter stake tx
|
||||||
|
if message.account_keys[instruction.program_id_index as usize]
|
||||||
|
== stake_public_key
|
||||||
|
{
|
||||||
|
let source_bytes: [u8; 64] = notif_tx.signature
|
||||||
|
[..solana_sdk::signature::SIGNATURE_BYTES]
|
||||||
|
.try_into()
|
||||||
|
.unwrap();
|
||||||
|
log::info!(
|
||||||
|
"New stake Tx sign:{} at block slot:{:?} current_slot:{} accounts:{:?}",
|
||||||
|
solana_sdk::signature::Signature::from(source_bytes).to_string(),
|
||||||
|
block.slot,
|
||||||
|
confirmed_slot,
|
||||||
|
instruction.accounts
|
||||||
|
);
|
||||||
|
let program_index = instruction.program_id_index;
|
||||||
|
crate::stakestore::process_stake_tx_message(
|
||||||
|
&message.account_keys,
|
||||||
|
instruction,
|
||||||
|
program_index,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub struct AccountPretty {
|
pub struct AccountPretty {
|
||||||
|
|
|
@ -346,19 +346,6 @@ pub async fn start_stake_verification_loop(
|
||||||
request_tx
|
request_tx
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_verification(
|
|
||||||
_stake_sender: &mut Sender<(String, Pubkey, Option<StoredStake>)>,
|
|
||||||
_stakestore: &mut StakeStore,
|
|
||||||
_instr: &str,
|
|
||||||
_stake_pybkey: Pubkey,
|
|
||||||
) {
|
|
||||||
// let current_stake = stakestore.stakes.get(&stake_pybkey).cloned();
|
|
||||||
// stake_sender
|
|
||||||
// .send((instr.to_string(), stake_pybkey, current_stake))
|
|
||||||
// .await
|
|
||||||
// .unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
fn verify_account_len(account_keys: &[Pubkey], instr_accounts: &[u8], indexes: Vec<usize>) -> bool {
|
fn verify_account_len(account_keys: &[Pubkey], instr_accounts: &[u8], indexes: Vec<usize>) -> bool {
|
||||||
!indexes
|
!indexes
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
@ -367,15 +354,11 @@ fn verify_account_len(account_keys: &[Pubkey], instr_accounts: &[u8], indexes: V
|
||||||
.is_some()
|
.is_some()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn process_stake_tx_message(
|
pub fn process_stake_tx_message(
|
||||||
stake_sender: &mut Sender<(String, Pubkey, Option<StoredStake>)>,
|
|
||||||
stakestore: &mut StakeStore,
|
|
||||||
account_keys_vec: &[Vec<u8>],
|
account_keys_vec: &[Vec<u8>],
|
||||||
instruction: CompiledInstruction,
|
instruction: CompiledInstruction,
|
||||||
//for debug and trace purpose.
|
//for debug and trace purpose.
|
||||||
program_id_index: u32,
|
program_id_index: u32,
|
||||||
_tx_slot: Slot,
|
|
||||||
_current_end_epoch_slot: u64,
|
|
||||||
) {
|
) {
|
||||||
//for tracing purpose
|
//for tracing purpose
|
||||||
let account_keys: Vec<Pubkey> = account_keys_vec
|
let account_keys: Vec<Pubkey> = account_keys_vec
|
||||||
|
@ -427,14 +410,6 @@ pub async fn process_stake_tx_message(
|
||||||
} else {
|
} else {
|
||||||
log::warn!("StakeInstruction::Initialize authorized:{authorized} lockup:{lockup} Index error in instruction:{:?}", instruction.accounts);
|
log::warn!("StakeInstruction::Initialize authorized:{authorized} lockup:{lockup} Index error in instruction:{:?}", instruction.accounts);
|
||||||
}
|
}
|
||||||
|
|
||||||
send_verification(
|
|
||||||
stake_sender,
|
|
||||||
stakestore,
|
|
||||||
"Initialize",
|
|
||||||
account_keys[instruction.accounts[0] as usize],
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
StakeInstruction::Authorize(new_authorized, authority_type) => {
|
StakeInstruction::Authorize(new_authorized, authority_type) => {
|
||||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) {
|
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) {
|
||||||
|
@ -458,14 +433,6 @@ pub async fn process_stake_tx_message(
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
log::info!("StakeInstruction::Authorize value:{value} custodian:{custodian:?}");
|
log::info!("StakeInstruction::Authorize value:{value} custodian:{custodian:?}");
|
||||||
|
|
||||||
send_verification(
|
|
||||||
stake_sender,
|
|
||||||
stakestore,
|
|
||||||
"Authorize",
|
|
||||||
account_keys[instruction.accounts[0] as usize],
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
StakeInstruction::DelegateStake => {
|
StakeInstruction::DelegateStake => {
|
||||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4, 5]) {
|
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4, 5]) {
|
||||||
|
@ -484,14 +451,6 @@ pub async fn process_stake_tx_message(
|
||||||
"stakeAuthority": account_keys[instruction.accounts[5] as usize].to_string(),
|
"stakeAuthority": account_keys[instruction.accounts[5] as usize].to_string(),
|
||||||
});
|
});
|
||||||
log::info!("StakeInstruction::DelegateStake infos:{info}");
|
log::info!("StakeInstruction::DelegateStake infos:{info}");
|
||||||
|
|
||||||
send_verification(
|
|
||||||
stake_sender,
|
|
||||||
stakestore,
|
|
||||||
"DelegateStake",
|
|
||||||
account_keys[instruction.accounts[0] as usize],
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
StakeInstruction::Split(lamports) => {
|
StakeInstruction::Split(lamports) => {
|
||||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) {
|
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) {
|
||||||
|
@ -508,14 +467,6 @@ pub async fn process_stake_tx_message(
|
||||||
"lamports": lamports,
|
"lamports": lamports,
|
||||||
});
|
});
|
||||||
log::info!("StakeInstruction::Split infos:{info}");
|
log::info!("StakeInstruction::Split infos:{info}");
|
||||||
|
|
||||||
send_verification(
|
|
||||||
stake_sender,
|
|
||||||
stakestore,
|
|
||||||
"Split",
|
|
||||||
account_keys[instruction.accounts[0] as usize],
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
StakeInstruction::Withdraw(lamports) => {
|
StakeInstruction::Withdraw(lamports) => {
|
||||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) {
|
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) {
|
||||||
|
@ -536,14 +487,6 @@ pub async fn process_stake_tx_message(
|
||||||
let custodian = (instruction.accounts.len() >= 6)
|
let custodian = (instruction.accounts.len() >= 6)
|
||||||
.then(|| json!(account_keys[instruction.accounts[5] as usize].to_string()));
|
.then(|| json!(account_keys[instruction.accounts[5] as usize].to_string()));
|
||||||
log::info!("StakeInstruction::Withdraw custodian:{custodian:?}infos:{info}");
|
log::info!("StakeInstruction::Withdraw custodian:{custodian:?}infos:{info}");
|
||||||
|
|
||||||
send_verification(
|
|
||||||
stake_sender,
|
|
||||||
stakestore,
|
|
||||||
"Withdraw",
|
|
||||||
account_keys[instruction.accounts[0] as usize],
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
StakeInstruction::Deactivate => {
|
StakeInstruction::Deactivate => {
|
||||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) {
|
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) {
|
||||||
|
@ -585,14 +528,6 @@ pub async fn process_stake_tx_message(
|
||||||
"custodian": account_keys[instruction.accounts[1] as usize].to_string(),
|
"custodian": account_keys[instruction.accounts[1] as usize].to_string(),
|
||||||
});
|
});
|
||||||
log::info!("StakeInstruction::SetLockup unixTimestamp:{unix_timestamp} epoch:{epoch} custodian:{custodian} infos:{info}");
|
log::info!("StakeInstruction::SetLockup unixTimestamp:{unix_timestamp} epoch:{epoch} custodian:{custodian} infos:{info}");
|
||||||
|
|
||||||
send_verification(
|
|
||||||
stake_sender,
|
|
||||||
stakestore,
|
|
||||||
"SetLockup",
|
|
||||||
account_keys[instruction.accounts[0] as usize],
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
StakeInstruction::AuthorizeWithSeed(args) => {
|
StakeInstruction::AuthorizeWithSeed(args) => {
|
||||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1]) {
|
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1]) {
|
||||||
|
@ -615,14 +550,6 @@ pub async fn process_stake_tx_message(
|
||||||
let custodian = (instruction.accounts.len() >= 4)
|
let custodian = (instruction.accounts.len() >= 4)
|
||||||
.then(|| json!(account_keys[instruction.accounts[3] as usize].to_string()));
|
.then(|| json!(account_keys[instruction.accounts[3] as usize].to_string()));
|
||||||
log::info!("StakeInstruction::AuthorizeWithSeed clockSysvar:{clock_sysvar:?} custodian:{custodian:?} infos:{info}");
|
log::info!("StakeInstruction::AuthorizeWithSeed clockSysvar:{clock_sysvar:?} custodian:{custodian:?} infos:{info}");
|
||||||
|
|
||||||
send_verification(
|
|
||||||
stake_sender,
|
|
||||||
stakestore,
|
|
||||||
"AuthorizeWithSeed",
|
|
||||||
account_keys[instruction.accounts[0] as usize],
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
StakeInstruction::InitializeChecked => {
|
StakeInstruction::InitializeChecked => {
|
||||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) {
|
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) {
|
||||||
|
@ -639,14 +566,6 @@ pub async fn process_stake_tx_message(
|
||||||
"withdrawer": account_keys[instruction.accounts[3] as usize].to_string(),
|
"withdrawer": account_keys[instruction.accounts[3] as usize].to_string(),
|
||||||
});
|
});
|
||||||
log::info!("StakeInstruction::InitializeChecked infos:{info}");
|
log::info!("StakeInstruction::InitializeChecked infos:{info}");
|
||||||
|
|
||||||
send_verification(
|
|
||||||
stake_sender,
|
|
||||||
stakestore,
|
|
||||||
"InitializeChecked",
|
|
||||||
account_keys[instruction.accounts[0] as usize],
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
StakeInstruction::AuthorizeChecked(authority_type) => {
|
StakeInstruction::AuthorizeChecked(authority_type) => {
|
||||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) {
|
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) {
|
||||||
|
@ -666,14 +585,6 @@ pub async fn process_stake_tx_message(
|
||||||
let custodian = (instruction.accounts.len() >= 5)
|
let custodian = (instruction.accounts.len() >= 5)
|
||||||
.then(|| json!(account_keys[instruction.accounts[4] as usize].to_string()));
|
.then(|| json!(account_keys[instruction.accounts[4] as usize].to_string()));
|
||||||
log::info!("StakeInstruction::AuthorizeChecked custodian:{custodian:?} infos:{info}");
|
log::info!("StakeInstruction::AuthorizeChecked custodian:{custodian:?} infos:{info}");
|
||||||
|
|
||||||
send_verification(
|
|
||||||
stake_sender,
|
|
||||||
stakestore,
|
|
||||||
"AuthorizeChecked",
|
|
||||||
account_keys[instruction.accounts[0] as usize],
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
StakeInstruction::AuthorizeCheckedWithSeed(args) => {
|
StakeInstruction::AuthorizeCheckedWithSeed(args) => {
|
||||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) {
|
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3]) {
|
||||||
|
@ -697,14 +608,6 @@ pub async fn process_stake_tx_message(
|
||||||
log::info!(
|
log::info!(
|
||||||
"StakeInstruction::AuthorizeCheckedWithSeed custodian:{custodian:?} infos:{info}"
|
"StakeInstruction::AuthorizeCheckedWithSeed custodian:{custodian:?} infos:{info}"
|
||||||
);
|
);
|
||||||
|
|
||||||
send_verification(
|
|
||||||
stake_sender,
|
|
||||||
stakestore,
|
|
||||||
"AuthorizeCheckedWithSeed",
|
|
||||||
account_keys[instruction.accounts[0] as usize],
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
StakeInstruction::SetLockupChecked(lockup_args) => {
|
StakeInstruction::SetLockupChecked(lockup_args) => {
|
||||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) {
|
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2]) {
|
||||||
|
@ -729,14 +632,6 @@ pub async fn process_stake_tx_message(
|
||||||
"custodian": account_keys[instruction.accounts[1] as usize].to_string(),
|
"custodian": account_keys[instruction.accounts[1] as usize].to_string(),
|
||||||
});
|
});
|
||||||
log::info!("StakeInstruction::SetLockupChecked unixTimestamp:{unix_timestamp} epoch:{epoch} custodian:{custodian:?} infos:{info}");
|
log::info!("StakeInstruction::SetLockupChecked unixTimestamp:{unix_timestamp} epoch:{epoch} custodian:{custodian:?} infos:{info}");
|
||||||
|
|
||||||
send_verification(
|
|
||||||
stake_sender,
|
|
||||||
stakestore,
|
|
||||||
"SetLockupChecked",
|
|
||||||
account_keys[instruction.accounts[0] as usize],
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
StakeInstruction::GetMinimumDelegation => {
|
StakeInstruction::GetMinimumDelegation => {
|
||||||
log::info!("StakeInstruction::GetMinimumDelegation");
|
log::info!("StakeInstruction::GetMinimumDelegation");
|
||||||
|
@ -755,14 +650,6 @@ pub async fn process_stake_tx_message(
|
||||||
"referenceVoteAccount": account_keys[instruction.accounts[2] as usize].to_string(),
|
"referenceVoteAccount": account_keys[instruction.accounts[2] as usize].to_string(),
|
||||||
});
|
});
|
||||||
log::info!("StakeInstruction::DeactivateDelinquent infos:{info}");
|
log::info!("StakeInstruction::DeactivateDelinquent infos:{info}");
|
||||||
|
|
||||||
send_verification(
|
|
||||||
stake_sender,
|
|
||||||
stakestore,
|
|
||||||
"DeactivateDelinquent",
|
|
||||||
account_keys[instruction.accounts[0] as usize],
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
StakeInstruction::Redelegate => {
|
StakeInstruction::Redelegate => {
|
||||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) {
|
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) {
|
||||||
|
@ -780,14 +667,6 @@ pub async fn process_stake_tx_message(
|
||||||
"stakeAuthority": account_keys[instruction.accounts[4] as usize].to_string(),
|
"stakeAuthority": account_keys[instruction.accounts[4] as usize].to_string(),
|
||||||
});
|
});
|
||||||
log::info!("StakeInstruction::Redelegate infos:{info}");
|
log::info!("StakeInstruction::Redelegate infos:{info}");
|
||||||
|
|
||||||
send_verification(
|
|
||||||
stake_sender,
|
|
||||||
stakestore,
|
|
||||||
"Redelegate",
|
|
||||||
account_keys[instruction.accounts[0] as usize],
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
StakeInstruction::Merge => {
|
StakeInstruction::Merge => {
|
||||||
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) {
|
if !verify_account_len(&account_keys, &instruction.accounts, vec![0, 1, 2, 3, 4]) {
|
||||||
|
@ -827,22 +706,6 @@ pub async fn process_stake_tx_message(
|
||||||
// },
|
// },
|
||||||
// current_end_epoch_slot,
|
// current_end_epoch_slot,
|
||||||
// );
|
// );
|
||||||
|
|
||||||
// send_verification(
|
|
||||||
// stake_sender,
|
|
||||||
// stakestore,
|
|
||||||
// "Merge Destination",
|
|
||||||
// account_keys[instruction.accounts[0] as usize],
|
|
||||||
// )
|
|
||||||
// .await;
|
|
||||||
|
|
||||||
// send_verification(
|
|
||||||
// stake_sender,
|
|
||||||
// stakestore,
|
|
||||||
// "Merge Source",
|
|
||||||
// account_keys[instruction.accounts[1] as usize],
|
|
||||||
// )
|
|
||||||
// .await;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue