Watchtower can now emit a notifiation on all non-vote transactions (#9845)
This commit is contained in:
parent
894549f002
commit
de04563f18
|
@ -5045,11 +5045,14 @@ dependencies = [
|
|||
"reqwest",
|
||||
"serde_json",
|
||||
"solana-clap-utils",
|
||||
"solana-cli",
|
||||
"solana-cli-config",
|
||||
"solana-client",
|
||||
"solana-logger",
|
||||
"solana-metrics",
|
||||
"solana-sdk",
|
||||
"solana-transaction-status",
|
||||
"solana-vote-program",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
@ -5,7 +5,7 @@ use solana_sdk::{
|
|||
transaction::Transaction,
|
||||
};
|
||||
use solana_transaction_status::RpcTransactionStatusMeta;
|
||||
use std::fmt;
|
||||
use std::{fmt, io};
|
||||
|
||||
// Pretty print a "name value"
|
||||
pub fn println_name_value(name: &str, value: &str) {
|
||||
|
@ -64,33 +64,44 @@ pub fn println_signers(
|
|||
println!();
|
||||
}
|
||||
|
||||
pub fn println_transaction(
|
||||
pub fn write_transaction<W: io::Write>(
|
||||
w: &mut W,
|
||||
transaction: &Transaction,
|
||||
transaction_status: &Option<RpcTransactionStatusMeta>,
|
||||
prefix: &str,
|
||||
) {
|
||||
) -> io::Result<()> {
|
||||
let message = &transaction.message;
|
||||
println!("{}Recent Blockhash: {:?}", prefix, message.recent_blockhash);
|
||||
writeln!(
|
||||
w,
|
||||
"{}Recent Blockhash: {:?}",
|
||||
prefix, message.recent_blockhash
|
||||
)?;
|
||||
for (signature_index, signature) in transaction.signatures.iter().enumerate() {
|
||||
println!("{}Signature {}: {:?}", prefix, signature_index, signature);
|
||||
writeln!(
|
||||
w,
|
||||
"{}Signature {}: {:?}",
|
||||
prefix, signature_index, signature
|
||||
)?;
|
||||
}
|
||||
println!("{}{:?}", prefix, message.header);
|
||||
writeln!(w, "{}{:?}", prefix, message.header)?;
|
||||
for (account_index, account) in message.account_keys.iter().enumerate() {
|
||||
println!("{}Account {}: {:?}", prefix, account_index, account);
|
||||
writeln!(w, "{}Account {}: {:?}", prefix, account_index, account)?;
|
||||
}
|
||||
for (instruction_index, instruction) in message.instructions.iter().enumerate() {
|
||||
let program_pubkey = message.account_keys[instruction.program_id_index as usize];
|
||||
println!("{}Instruction {}", prefix, instruction_index);
|
||||
println!(
|
||||
writeln!(w, "{}Instruction {}", prefix, instruction_index)?;
|
||||
writeln!(
|
||||
w,
|
||||
"{} Program: {} ({})",
|
||||
prefix, program_pubkey, instruction.program_id_index
|
||||
);
|
||||
)?;
|
||||
for (account_index, account) in instruction.accounts.iter().enumerate() {
|
||||
let account_pubkey = message.account_keys[*account as usize];
|
||||
println!(
|
||||
writeln!(
|
||||
w,
|
||||
"{} Account {}: {} ({})",
|
||||
prefix, account_index, account_pubkey, account
|
||||
);
|
||||
)?;
|
||||
}
|
||||
|
||||
let mut raw = true;
|
||||
|
@ -99,7 +110,7 @@ pub fn println_transaction(
|
|||
solana_vote_program::vote_instruction::VoteInstruction,
|
||||
>(&instruction.data)
|
||||
{
|
||||
println!("{} {:?}", prefix, vote_instruction);
|
||||
writeln!(w, "{} {:?}", prefix, vote_instruction)?;
|
||||
raw = false;
|
||||
}
|
||||
} else if program_pubkey == solana_stake_program::id() {
|
||||
|
@ -107,7 +118,7 @@ pub fn println_transaction(
|
|||
solana_stake_program::stake_instruction::StakeInstruction,
|
||||
>(&instruction.data)
|
||||
{
|
||||
println!("{} {:?}", prefix, stake_instruction);
|
||||
writeln!(w, "{} {:?}", prefix, stake_instruction)?;
|
||||
raw = false;
|
||||
}
|
||||
} else if program_pubkey == solana_sdk::system_program::id() {
|
||||
|
@ -115,26 +126,27 @@ pub fn println_transaction(
|
|||
solana_sdk::system_instruction::SystemInstruction,
|
||||
>(&instruction.data)
|
||||
{
|
||||
println!("{} {:?}", prefix, system_instruction);
|
||||
writeln!(w, "{} {:?}", prefix, system_instruction)?;
|
||||
raw = false;
|
||||
}
|
||||
}
|
||||
|
||||
if raw {
|
||||
println!("{} Data: {:?}", prefix, instruction.data);
|
||||
writeln!(w, "{} Data: {:?}", prefix, instruction.data)?;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(transaction_status) = transaction_status {
|
||||
println!(
|
||||
writeln!(
|
||||
w,
|
||||
"{}Status: {}",
|
||||
prefix,
|
||||
match &transaction_status.status {
|
||||
Ok(_) => "Ok".into(),
|
||||
Err(err) => err.to_string(),
|
||||
}
|
||||
);
|
||||
println!("{} Fee: {}", prefix, transaction_status.fee);
|
||||
)?;
|
||||
writeln!(w, "{} Fee: {}", prefix, transaction_status.fee)?;
|
||||
assert_eq!(
|
||||
transaction_status.pre_balances.len(),
|
||||
transaction_status.post_balances.len()
|
||||
|
@ -146,23 +158,40 @@ pub fn println_transaction(
|
|||
.enumerate()
|
||||
{
|
||||
if pre == post {
|
||||
println!(
|
||||
writeln!(
|
||||
w,
|
||||
"{} Account {} balance: {} SOL",
|
||||
prefix,
|
||||
i,
|
||||
lamports_to_sol(*pre)
|
||||
);
|
||||
)?;
|
||||
} else {
|
||||
println!(
|
||||
writeln!(
|
||||
w,
|
||||
"{} Account {} balance: {} SOL -> {} SOL",
|
||||
prefix,
|
||||
i,
|
||||
lamports_to_sol(*pre),
|
||||
lamports_to_sol(*post)
|
||||
);
|
||||
)?;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
println!("{}Status: Unavailable", prefix);
|
||||
writeln!(w, "{}Status: Unavailable", prefix)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn println_transaction(
|
||||
transaction: &Transaction,
|
||||
transaction_status: &Option<RpcTransactionStatusMeta>,
|
||||
prefix: &str,
|
||||
) {
|
||||
let mut w = Vec::new();
|
||||
if write_transaction(&mut w, transaction, transaction_status, prefix).is_ok() {
|
||||
if let Ok(s) = String::from_utf8(w) {
|
||||
print!("{}", s);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,10 +16,13 @@ reqwest = { version = "0.10.4", default-features = false, features = ["blocking"
|
|||
serde_json = "1.0"
|
||||
solana-clap-utils = { path = "../clap-utils", version = "1.2.0" }
|
||||
solana-cli-config = { path = "../cli-config", version = "1.2.0" }
|
||||
solana-cli = { path = "../cli", version = "1.2.0" }
|
||||
solana-client = { path = "../client", version = "1.2.0" }
|
||||
solana-logger = { path = "../logger", version = "1.2.0" }
|
||||
solana-metrics = { path = "../metrics", version = "1.2.0" }
|
||||
solana-sdk = { path = "../sdk", version = "1.2.0" }
|
||||
solana-transaction-status = { path = "../transaction-status", version = "1.2.0" }
|
||||
solana-vote-program = { path = "../programs/vote", version = "1.2.0" }
|
||||
|
||||
[[bin]]
|
||||
name = "solana-watchtower"
|
||||
|
|
|
@ -13,7 +13,12 @@ use solana_client::{
|
|||
client_error::Result as ClientResult, rpc_client::RpcClient, rpc_response::RpcVoteAccountStatus,
|
||||
};
|
||||
use solana_metrics::{datapoint_error, datapoint_info};
|
||||
use solana_sdk::{hash::Hash, native_token::lamports_to_sol, pubkey::Pubkey};
|
||||
use solana_sdk::{
|
||||
clock::Slot, hash::Hash, native_token::lamports_to_sol, program_utils::limited_deserialize,
|
||||
pubkey::Pubkey,
|
||||
};
|
||||
use solana_transaction_status::{ConfirmedBlock, TransactionEncoding};
|
||||
use solana_vote_program::vote_instruction::VoteInstruction;
|
||||
use std::{
|
||||
error,
|
||||
str::FromStr,
|
||||
|
@ -27,6 +32,7 @@ struct Config {
|
|||
validator_identity_pubkeys: Vec<String>,
|
||||
no_duplicate_notifications: bool,
|
||||
monitor_active_stake: bool,
|
||||
notify_on_transactions: bool,
|
||||
}
|
||||
|
||||
fn get_config() -> Config {
|
||||
|
@ -101,6 +107,14 @@ fn get_config() -> Config {
|
|||
.takes_value(false)
|
||||
.help("Alert when the current stake for the cluster drops below 80%"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("notify_on_transactions")
|
||||
.long("notify-on-transactions")
|
||||
.takes_value(false)
|
||||
.help("Send a notification on all non-vote transactions. This can be very verbose!\
|
||||
Note that the notification environment variables used by this feature all require a \
|
||||
TRANSACTION_NOTIFIER_ prefix. For example: TRANSACTION_NOTIFIER_SLACK_WEBHOOK"),
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
let config = if let Some(config_file) = matches.value_of("config_file") {
|
||||
|
@ -120,6 +134,7 @@ fn get_config() -> Config {
|
|||
|
||||
let no_duplicate_notifications = matches.is_present("no_duplicate_notifications");
|
||||
let monitor_active_stake = matches.is_present("monitor_active_stake");
|
||||
let notify_on_transactions = matches.is_present("notify_on_transactions");
|
||||
|
||||
let config = Config {
|
||||
interval,
|
||||
|
@ -127,6 +142,7 @@ fn get_config() -> Config {
|
|||
validator_identity_pubkeys,
|
||||
no_duplicate_notifications,
|
||||
monitor_active_stake,
|
||||
notify_on_transactions,
|
||||
};
|
||||
|
||||
info!("RPC URL: {}", config.json_rpc_url);
|
||||
|
@ -139,6 +155,133 @@ fn get_config() -> Config {
|
|||
config
|
||||
}
|
||||
|
||||
fn process_confirmed_block(notifier: &Notifier, slot: Slot, confirmed_block: ConfirmedBlock) {
|
||||
let mut vote_transactions = 0;
|
||||
|
||||
for rpc_transaction in &confirmed_block.transactions {
|
||||
if let Some(transaction) = rpc_transaction.transaction.decode() {
|
||||
if transaction.verify().is_ok() {
|
||||
let mut notify = true;
|
||||
|
||||
// Ignore simple Vote transactions since they are too prevalent
|
||||
if transaction.message.instructions.len() == 1 {
|
||||
let instruction = &transaction.message.instructions[0];
|
||||
let program_pubkey =
|
||||
transaction.message.account_keys[instruction.program_id_index as usize];
|
||||
if program_pubkey == solana_vote_program::id() {
|
||||
if let Ok(VoteInstruction::Vote(_)) =
|
||||
limited_deserialize::<VoteInstruction>(&instruction.data)
|
||||
{
|
||||
vote_transactions += 1;
|
||||
notify = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if notify {
|
||||
let mut w = Vec::new();
|
||||
if solana_cli::display::write_transaction(
|
||||
&mut w,
|
||||
&transaction,
|
||||
&rpc_transaction.meta,
|
||||
"",
|
||||
)
|
||||
.is_ok()
|
||||
{
|
||||
if let Ok(s) = String::from_utf8(w) {
|
||||
notifier.send(&format!("```Slot: {}\n{}```", slot, s));
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
datapoint_error!(
|
||||
"watchtower-sanity-failure",
|
||||
("slot", slot, i64),
|
||||
("err", "Transaction signature verification failed", String)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
info!(
|
||||
"Process slot {} with {} regular transactions (and {} votes)",
|
||||
slot,
|
||||
confirmed_block.transactions.len() - vote_transactions,
|
||||
vote_transactions
|
||||
);
|
||||
}
|
||||
|
||||
fn load_blocks(
|
||||
rpc_client: &RpcClient,
|
||||
start_slot: Slot,
|
||||
end_slot: Slot,
|
||||
) -> ClientResult<Vec<(Slot, ConfirmedBlock)>> {
|
||||
info!(
|
||||
"Loading confirmed blocks between slots: {} - {}",
|
||||
start_slot, end_slot
|
||||
);
|
||||
|
||||
let slots = rpc_client.get_confirmed_blocks(start_slot, Some(end_slot))?;
|
||||
|
||||
let mut blocks = vec![];
|
||||
for slot in slots.into_iter() {
|
||||
let block =
|
||||
rpc_client.get_confirmed_block_with_encoding(slot, TransactionEncoding::Binary)?;
|
||||
blocks.push((slot, block));
|
||||
}
|
||||
Ok(blocks)
|
||||
}
|
||||
|
||||
fn transaction_monitor(rpc_client: RpcClient) {
|
||||
let notifier = Notifier::new_with_env_prefix("TRANSACTION_NOTIFIER_");
|
||||
let mut start_slot = loop {
|
||||
match rpc_client.get_slot() {
|
||||
Ok(slot) => break slot,
|
||||
Err(err) => {
|
||||
warn!("Failed to get current slot: {}", err);
|
||||
}
|
||||
}
|
||||
sleep(Duration::from_secs(1));
|
||||
};
|
||||
|
||||
loop {
|
||||
let end_slot = start_slot + 50;
|
||||
info!("start_slot:{} - end_slot:{}", start_slot, end_slot);
|
||||
|
||||
let latest_available_slot = rpc_client.get_slot().unwrap_or_else(|err| {
|
||||
info!("get_slot() failed: {}", err);
|
||||
0
|
||||
});
|
||||
|
||||
if latest_available_slot <= start_slot {
|
||||
info!("Waiting for a slot greater than {}...", start_slot);
|
||||
sleep(Duration::from_secs(5));
|
||||
continue;
|
||||
}
|
||||
|
||||
match load_blocks(&rpc_client, start_slot + 1, end_slot) {
|
||||
Ok(blocks) => {
|
||||
info!("Loaded {} blocks", blocks.len());
|
||||
|
||||
if blocks.is_empty() && end_slot < latest_available_slot {
|
||||
start_slot = end_slot;
|
||||
} else {
|
||||
for (slot, block) in blocks.into_iter() {
|
||||
process_confirmed_block(¬ifier, slot, block);
|
||||
start_slot = slot;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
info!(
|
||||
"failed to get blocks in range ({},{}): {}",
|
||||
start_slot, end_slot, err
|
||||
);
|
||||
sleep(Duration::from_secs(1));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_cluster_info(rpc_client: &RpcClient) -> ClientResult<(u64, Hash, RpcVoteAccountStatus)> {
|
||||
let transaction_count = rpc_client.get_transaction_count()?;
|
||||
let recent_blockhash = rpc_client.get_recent_blockhash()?.0;
|
||||
|
@ -152,8 +295,14 @@ fn main() -> Result<(), Box<dyn error::Error>> {
|
|||
solana_logger::setup_with_default("solana=info");
|
||||
solana_metrics::set_panic_hook("watchtower");
|
||||
|
||||
let rpc_client = RpcClient::new(config.json_rpc_url);
|
||||
let _notify_thread = if config.notify_on_transactions {
|
||||
let rpc_client = RpcClient::new(config.json_rpc_url.clone());
|
||||
Some(std::thread::spawn(move || transaction_monitor(rpc_client)))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let rpc_client = RpcClient::new(config.json_rpc_url.clone());
|
||||
let notifier = Notifier::new();
|
||||
let mut last_transaction_count = 0;
|
||||
let mut last_recent_blockhash = Hash::default();
|
||||
|
|
|
@ -66,19 +66,27 @@ pub struct Notifier {
|
|||
|
||||
impl Notifier {
|
||||
pub fn new() -> Self {
|
||||
let discord_webhook = env::var("DISCORD_WEBHOOK")
|
||||
Self::new_with_env_prefix("")
|
||||
}
|
||||
|
||||
pub fn new_with_env_prefix(env_prefix: &str) -> Self {
|
||||
info!("Initializing {}Notifier", env_prefix);
|
||||
|
||||
let discord_webhook = env::var(format!("{}DISCORD_WEBHOOK", env_prefix))
|
||||
.map_err(|_| {
|
||||
info!("Discord notifications disabled");
|
||||
})
|
||||
.ok();
|
||||
let slack_webhook = env::var("SLACK_WEBHOOK")
|
||||
let slack_webhook = env::var(format!("{}SLACK_WEBHOOK", env_prefix))
|
||||
.map_err(|_| {
|
||||
info!("Slack notifications disabled");
|
||||
})
|
||||
.ok();
|
||||
let telegram_webhook = if let (Ok(bot_token), Ok(chat_id)) =
|
||||
(env::var("TELEGRAM_BOT_TOKEN"), env::var("TELEGRAM_CHAT_ID"))
|
||||
{
|
||||
|
||||
let telegram_webhook = if let (Ok(bot_token), Ok(chat_id)) = (
|
||||
env::var(format!("{}TELEGRAM_BOT_TOKEN", env_prefix)),
|
||||
env::var(format!("{}TELEGRAM_CHAT_ID", env_prefix)),
|
||||
) {
|
||||
Some(TelegramWebHook { bot_token, chat_id })
|
||||
} else {
|
||||
info!("Telegram notifications disabled");
|
||||
|
|
Loading…
Reference in New Issue