diff --git a/Cargo.lock b/Cargo.lock index ff2537befe..31f03d48b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4569,6 +4569,15 @@ dependencies = [ "solana-sdk", ] +[[package]] +name = "solana-notifier" +version = "1.2.0" +dependencies = [ + "log 0.4.8", + "reqwest", + "serde_json", +] + [[package]] name = "solana-ownable" version = "1.2.0" @@ -4618,6 +4627,7 @@ dependencies = [ "solana-logger", "solana-metrics", "solana-net-utils", + "solana-notifier", "solana-sdk", "solana-stake-program", "tar", @@ -4993,14 +5003,13 @@ dependencies = [ "clap", "humantime 2.0.0", "log 0.4.8", - "reqwest", - "serde_json", "solana-clap-utils", "solana-cli", "solana-cli-config", "solana-client", "solana-logger", "solana-metrics", + "solana-notifier", "solana-sdk", "solana-transaction-status", "solana-version", diff --git a/Cargo.toml b/Cargo.toml index 61389a8afb..9211fd611b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ members = [ "measure", "metrics", "net-shaper", + "notifier", "programs/bpf_loader", "programs/budget", "programs/btc_spv", diff --git a/notifier/.gitignore b/notifier/.gitignore new file mode 100644 index 0000000000..5404b132db --- /dev/null +++ b/notifier/.gitignore @@ -0,0 +1,2 @@ +/target/ +/farf/ diff --git a/notifier/Cargo.toml b/notifier/Cargo.toml new file mode 100644 index 0000000000..c7f2e94e8b --- /dev/null +++ b/notifier/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "solana-notifier" +version = "1.2.0" +description = "Solana Notifier" +authors = ["Solana Maintainers "] +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +edition = "2018" + +[dependencies] +log = "0.4.8" +reqwest = { version = "0.10.4", default-features = false, features = ["blocking", "rustls-tls", "json"] } +serde_json = "1.0" + +[lib] +name = "solana_notifier" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/watchtower/src/notifier.rs b/notifier/src/lib.rs similarity index 67% rename from watchtower/src/notifier.rs rename to notifier/src/lib.rs index 0553f464b2..ed1d418aa8 100644 --- a/watchtower/src/notifier.rs +++ b/notifier/src/lib.rs @@ -1,7 +1,26 @@ +/// To activate Slack, Discord and/or Telegram notifications, define these environment variables +/// before using the `Notifier` +/// ```bash +/// export SLACK_WEBHOOK=... +/// export DISCORD_WEBHOOK=... +/// ``` +/// +/// Telegram requires the following two variables: +/// ```bash +/// export TELEGRAM_BOT_TOKEN=... +/// export TELEGRAM_CHAT_ID=... +/// ``` +/// +/// To receive a Twilio SMS notification on failure, having a Twilio account, +/// and a sending number owned by that account, +/// define environment variable before running `solana-watchtower`: +/// ```bash +/// export TWILIO_CONFIG='ACCOUNT=,TOKEN=,TO=,FROM=' +/// ``` use log::*; -use reqwest::blocking::Client; +use reqwest::{blocking::Client, StatusCode}; use serde_json::json; -use std::env; +use std::{env, thread::sleep, time::Duration}; struct TelegramWebHook { bot_token: String, @@ -65,11 +84,11 @@ pub struct Notifier { } impl Notifier { - pub fn new() -> Self { - Self::new_with_env_prefix("") + pub fn default() -> Self { + Self::new("") } - pub fn new_with_env_prefix(env_prefix: &str) -> Self { + pub fn new(env_prefix: &str) -> Self { info!("Initializing {}Notifier", env_prefix); let discord_webhook = env::var(format!("{}DISCORD_WEBHOOK", env_prefix)) @@ -107,9 +126,30 @@ impl Notifier { pub fn send(&self, msg: &str) { if let Some(webhook) = &self.discord_webhook { - let data = json!({ "content": msg }); - if let Err(err) = self.client.post(webhook).json(&data).send() { - warn!("Failed to send Discord message: {:?}", err); + for line in msg.split('\n') { + // Discord rate limiting is aggressive, limit to 1 message a second + sleep(Duration::from_millis(1000)); + + info!("Sending {}", line); + let data = json!({ "content": line }); + + loop { + let response = self.client.post(webhook).json(&data).send(); + + if let Err(err) = response { + warn!("Failed to send Discord message: \"{}\": {:?}", line, err); + break; + } else if let Ok(response) = response { + info!("response status: {}", response.status()); + if response.status() == StatusCode::TOO_MANY_REQUESTS { + warn!("rate limited!..."); + warn!("response text: {:?}", response.text()); + sleep(Duration::from_secs(2)); + } else { + break; + } + } + } } } diff --git a/ramp-tps/Cargo.toml b/ramp-tps/Cargo.toml index 9b877b848e..be4e344582 100644 --- a/ramp-tps/Cargo.toml +++ b/ramp-tps/Cargo.toml @@ -21,6 +21,7 @@ solana-client = { path = "../client", version = "1.2.0" } solana-logger = { path = "../logger", version = "1.2.0" } solana-metrics = { path = "../metrics", version = "1.2.0" } solana-net-utils = { path = "../net-utils", version = "1.2.0" } +solana-notifier = { path = "../notifier", version = "1.2.0" } solana-sdk = { path = "../sdk", version = "1.2.0" } solana-stake-program = { path = "../programs/stake", version = "1.2.0" } tar = "0.4.26" diff --git a/ramp-tps/src/main.rs b/ramp-tps/src/main.rs index dcc8132795..0735dcb51c 100644 --- a/ramp-tps/src/main.rs +++ b/ramp-tps/src/main.rs @@ -1,6 +1,5 @@ //! Ramp up TPS for Tour de SOL until all validators drop out -mod notifier; mod results; mod stake; mod tps; @@ -54,7 +53,7 @@ fn gift_for_round(tps_round: u32, initial_balance: u64) -> u64 { fn main() { solana_logger::setup_with_default("solana=debug"); solana_metrics::set_panic_hook("ramp-tps"); - let mut notifier = notifier::Notifier::new(); + let mut notifier = solana_notifier::Notifier::default(); let matches = App::new(crate_name!()) .about(crate_description!()) @@ -199,7 +198,7 @@ fn main() { let _ = fs::remove_dir_all(&tmp_ledger_path); fs::create_dir_all(&tmp_ledger_path).expect("failed to create temp ledger path"); - notifier.notify("Hi!"); + notifier.send("Hi!"); datapoint_info!("ramp-tps", ("event", "boot", String),); let entrypoint_str = matches.value_of("entrypoint").unwrap(); @@ -219,7 +218,7 @@ fn main() { debug!("First normal slot: {}", first_normal_slot); let sleep_slots = first_normal_slot.saturating_sub(current_slot); if sleep_slots > 0 { - notifier.notify(&format!( + notifier.send(&format!( "Waiting for warm-up epochs to complete (epoch {})", epoch_schedule.first_normal_epoch )); @@ -291,7 +290,7 @@ fn main() { let mut tps_sampler = tps::Sampler::new(&entrypoint_addr); loop { - notifier.notify(&format!("Round {}!", tps_round)); + notifier.send(&format!("Round {}!", tps_round)); let tx_count = tx_count_for_round(tps_round, tx_count_baseline, tx_count_increment); datapoint_info!( "ramp-tps", @@ -328,7 +327,7 @@ fn main() { ("validators", starting_validators.len(), i64) ); - notifier.buffer(format!( + notifier.send(&format!( "There are {} validators present:", starting_validators.len() )); @@ -338,11 +337,10 @@ fn main() { .map(|node_pubkey| format!("* {}", pubkey_to_keybase(&node_pubkey))) .collect(); validators.sort(); - notifier.buffer_vec(validators); - notifier.flush(); + notifier.send(&validators.join("\n")); let client_tx_count = tx_count / NUM_BENCH_CLIENTS as u64; - notifier.notify(&format!( + notifier.send(&format!( "Starting transactions for {} minutes (batch size={})", round_minutes, tx_count, )); @@ -393,7 +391,7 @@ fn main() { ("round", tps_round, i64), ); - notifier.notify("Transactions stopped"); + notifier.send("Transactions stopped"); tps_sampler.report_results(¬ifier); let remaining_validators = voters::fetch_active_validators(&rpc_client); diff --git a/ramp-tps/src/notifier.rs b/ramp-tps/src/notifier.rs deleted file mode 100644 index 7f723bea36..0000000000 --- a/ramp-tps/src/notifier.rs +++ /dev/null @@ -1,92 +0,0 @@ -use log::*; -use reqwest::{blocking::Client, StatusCode}; -use serde_json::json; -use std::{env, thread::sleep, time::Duration}; - -/// For each notification -/// 1) Log an info level message -/// 2) Notify Slack channel if Slack is configured -/// 3) Notify Discord channel if Discord is configured -pub struct Notifier { - buffer: Vec, - client: Client, - discord_webhook: Option, - slack_webhook: Option, -} - -impl Notifier { - pub fn new() -> Self { - let discord_webhook = env::var("DISCORD_WEBHOOK") - .map_err(|_| { - warn!("Discord notifications disabled"); - }) - .ok(); - let slack_webhook = env::var("SLACK_WEBHOOK") - .map_err(|_| { - warn!("Slack notifications disabled"); - }) - .ok(); - Notifier { - buffer: Vec::new(), - client: Client::new(), - discord_webhook, - slack_webhook, - } - } - - fn send(&self, msg: &str) { - if let Some(webhook) = &self.discord_webhook { - for line in msg.split('\n') { - // Discord rate limiting is aggressive, limit to 1 message a second to keep - // it from getting mad at us... - sleep(Duration::from_millis(1000)); - - info!("Sending {}", line); - let data = json!({ "content": line }); - - loop { - let response = self.client.post(webhook).json(&data).send(); - - if let Err(err) = response { - warn!("Failed to send Discord message: \"{}\": {:?}", line, err); - break; - } else if let Ok(response) = response { - info!("response status: {}", response.status()); - if response.status() == StatusCode::TOO_MANY_REQUESTS { - warn!("rate limited!..."); - warn!("response text: {:?}", response.text()); - std::thread::sleep(Duration::from_secs(2)); - } else { - break; - } - } - } - } - } - - if let Some(webhook) = &self.slack_webhook { - let data = json!({ "text": msg }); - if let Err(err) = self.client.post(webhook).json(&data).send() { - warn!("Failed to send Slack message: {:?}", err); - } - } - } - - pub fn buffer(&mut self, msg: String) { - self.buffer.push(msg); - } - - pub fn buffer_vec(&mut self, mut msgs: Vec) { - self.buffer.append(&mut msgs); - } - - pub fn flush(&mut self) { - self.notify(&self.buffer.join("\n")); - self.buffer.clear(); - } - - pub fn notify(&self, msg: &str) { - info!("{}", msg); - self.send(msg); - } -} diff --git a/ramp-tps/src/stake.rs b/ramp-tps/src/stake.rs index 8289035baa..9b3ce1bc42 100644 --- a/ramp-tps/src/stake.rs +++ b/ramp-tps/src/stake.rs @@ -1,4 +1,4 @@ -use crate::{notifier, utils}; +use crate::utils; use log::*; use solana_client::{rpc_client::RpcClient, rpc_response::RpcEpochInfo}; use solana_sdk::{ @@ -64,11 +64,11 @@ pub fn wait_for_warm_up( rpc_client: &RpcClient, stake_config: &StakeConfig, genesis_config: &GenesisConfig, - notifier: ¬ifier::Notifier, + notifier: &solana_notifier::Notifier, ) { // Sleep until activation_epoch has finished if epoch_info.epoch <= activation_epoch { - notifier.notify(&format!( + notifier.send(&format!( "Waiting until epoch {} is finished...", activation_epoch )); @@ -105,7 +105,7 @@ pub fn wait_for_warm_up( let warm_up_epochs = calculate_stake_warmup(stake_entry, stake_config); let stake_warmed_up_epoch = latest_epoch + warm_up_epochs; if stake_warmed_up_epoch > current_epoch { - notifier.notify(&format!( + notifier.send(&format!( "Waiting until epoch {} for stake to warmup (current epoch is {})...", stake_warmed_up_epoch, current_epoch )); diff --git a/ramp-tps/src/tps.rs b/ramp-tps/src/tps.rs index 316baac7fb..96db434a37 100644 --- a/ramp-tps/src/tps.rs +++ b/ramp-tps/src/tps.rs @@ -1,7 +1,7 @@ -use crate::notifier::Notifier; use log::*; use solana_client::perf_utils::{sample_txs, SampleStats}; use solana_client::thin_client::ThinClient; +use solana_notifier::Notifier; use solana_sdk::timing::duration_as_s; use std::{ net::SocketAddr, @@ -64,7 +64,7 @@ impl Sampler { pub fn report_results(&self, notifier: &Notifier) { let SampleStats { tps, elapsed, txs } = self.maxes.read().unwrap()[0].1; let avg_tps = txs as f32 / duration_as_s(&elapsed); - notifier.notify(&format!( + notifier.send(&format!( "Highest TPS: {:.0}, Average TPS: {:.0}", tps, avg_tps )); diff --git a/ramp-tps/src/utils.rs b/ramp-tps/src/utils.rs index dbc97a693b..6a6052e472 100644 --- a/ramp-tps/src/utils.rs +++ b/ramp-tps/src/utils.rs @@ -1,8 +1,8 @@ -use crate::notifier::Notifier; use bzip2::bufread::BzDecoder; use log::*; use solana_client::rpc_client::RpcClient; use solana_net_utils::parse_host; +use solana_notifier::Notifier; use solana_sdk::{ clock::{Epoch, Slot}, genesis_config::GenesisConfig, @@ -89,8 +89,8 @@ pub fn is_host(string: String) -> Result<(), String> { Ok(()) } -pub fn bail(notifier: &crate::notifier::Notifier, msg: &str) -> ! { - notifier.notify(msg); +pub fn bail(notifier: &Notifier, msg: &str) -> ! { + notifier.send(msg); sleep(Duration::from_secs(30)); // Wait for notifications to send std::process::exit(1); } diff --git a/ramp-tps/src/voters.rs b/ramp-tps/src/voters.rs index 2eae40a822..ce7a80f72e 100644 --- a/ramp-tps/src/voters.rs +++ b/ramp-tps/src/voters.rs @@ -1,7 +1,7 @@ -use crate::notifier::Notifier; use crate::utils; use log::*; use solana_client::{client_error::Result as ClientResult, rpc_client::RpcClient}; +use solana_notifier::Notifier; use solana_sdk::{ clock::Slot, epoch_schedule::EpochSchedule, @@ -183,7 +183,7 @@ pub fn announce_results( ) { let buffer_records = |keys: Vec<&Pubkey>, notifier: &mut Notifier| { if keys.is_empty() { - notifier.buffer("* None".to_string()); + notifier.send("* None"); return; } @@ -199,7 +199,7 @@ pub fn announce_results( } } validators.sort(); - notifier.buffer_vec(validators); + notifier.send(&validators.join("\n")); }; let healthy: Vec<_> = remaining_validators @@ -217,14 +217,12 @@ pub fn announce_results( .filter(|k| !remaining_validators.contains_key(k)) .collect(); - notifier.buffer("Healthy Validators:".to_string()); + notifier.send("Healthy Validators:"); buffer_records(healthy, notifier); - notifier.buffer("Unhealthy Validators:".to_string()); + notifier.send("Unhealthy Validators:"); buffer_records(unhealthy, notifier); - notifier.buffer("Inactive Validators:".to_string()); + notifier.send("Inactive Validators:"); buffer_records(inactive, notifier); - - notifier.flush(); } /// Award stake to the surviving validators by delegating stake to their vote account @@ -235,10 +233,12 @@ pub fn award_stake( sol_gift: u64, notifier: &mut Notifier, ) { + let mut buffer = vec![]; + for (node_pubkey, vote_account_pubkey) in voters { info!("Delegate {} SOL to {}", sol_gift, node_pubkey); delegate_stake(rpc_client, faucet_keypair, vote_account_pubkey, sol_gift); - notifier.buffer(format!("Delegated {} SOL to {}", sol_gift, node_pubkey)); + buffer.push(format!("Delegated {} SOL to {}", sol_gift, node_pubkey)); } - notifier.flush(); + notifier.send(&buffer.join("\n")); } diff --git a/watchtower/Cargo.toml b/watchtower/Cargo.toml index 6b9e8157c2..09a345fd34 100644 --- a/watchtower/Cargo.toml +++ b/watchtower/Cargo.toml @@ -12,14 +12,13 @@ homepage = "https://solana.com/" clap = "2.33.1" log = "0.4.8" humantime = "2.0.0" -reqwest = { version = "0.10.4", default-features = false, features = ["blocking", "rustls-tls", "json"] } -serde_json = "1.0" solana-clap-utils = { path = "../clap-utils", version = "1.2.0" } solana-cli-config = { path = "../cli-config", version = "1.2.0" } solana-cli = { path = "../cli", version = "1.2.0" } solana-client = { path = "../client", version = "1.2.0" } solana-logger = { path = "../logger", version = "1.2.0" } solana-metrics = { path = "../metrics", version = "1.2.0" } +solana-notifier = { path = "../notifier", version = "1.2.0" } solana-sdk = { path = "../sdk", version = "1.2.0" } solana-transaction-status = { path = "../transaction-status", version = "1.2.0" } solana-version = { path = "../version", version = "1.2.0" } diff --git a/watchtower/README.md b/watchtower/README.md index cc4230937d..33bad3e458 100644 --- a/watchtower/README.md +++ b/watchtower/README.md @@ -23,25 +23,3 @@ On failure this data point contains details about the specific test that failed the following fields: * `test`: name of the sanity test that failed * `err`: exact sanity failure message - - -### Sanity failure push notification -To receive a Slack, Discord and/or Telegram notification on sanity failure, -define environment variables before running `solana-watchtower`: -``` -export SLACK_WEBHOOK=... -export DISCORD_WEBHOOK=... -``` - -Telegram requires the following two variables: -``` -export TELEGRAM_BOT_TOKEN=... -export TELEGRAM_CHAT_ID=... -``` - -To receive a Twilio SMS notification on failure, having a Twilio account, -and a sending number owned by that account, -define environment variable before running `solana-watchtower`: -``` -export TWILIO_CONFIG='ACCOUNT=,TOKEN=,TO=,FROM=' -``` diff --git a/watchtower/src/main.rs b/watchtower/src/main.rs index 499f23da26..621f32fece 100644 --- a/watchtower/src/main.rs +++ b/watchtower/src/main.rs @@ -1,8 +1,5 @@ //! A command-line executable for monitoring the health of a cluster -mod notifier; - -use crate::notifier::Notifier; use clap::{crate_description, crate_name, value_t, value_t_or_exit, App, Arg}; use log::*; use solana_clap_utils::{ @@ -13,6 +10,7 @@ use solana_client::{ client_error::Result as ClientResult, rpc_client::RpcClient, rpc_response::RpcVoteAccountStatus, }; use solana_metrics::{datapoint_error, datapoint_info}; +use solana_notifier::Notifier; use solana_sdk::{ clock::Slot, hash::Hash, native_token::lamports_to_sol, program_utils::limited_deserialize, pubkey::Pubkey, @@ -232,7 +230,7 @@ fn load_blocks( } fn transaction_monitor(rpc_client: RpcClient) { - let notifier = Notifier::new_with_env_prefix("TRANSACTION_NOTIFIER_"); + let notifier = Notifier::new("TRANSACTION_NOTIFIER_"); let mut start_slot = loop { match rpc_client.get_slot() { Ok(slot) => break slot, @@ -303,7 +301,7 @@ fn main() -> Result<(), Box> { }; let rpc_client = RpcClient::new(config.json_rpc_url.clone()); - let notifier = Notifier::new(); + let notifier = Notifier::default(); let mut last_transaction_count = 0; let mut last_recent_blockhash = Hash::default(); let mut last_notification_msg = "".into();