notify discord tx measurements

This commit is contained in:
TovarishFin 2024-06-12 14:48:58 +02:00
parent da82125621
commit f750037166
9 changed files with 326 additions and 207 deletions

2
Cargo.lock generated
View File

@ -2439,6 +2439,7 @@ name = "mangorpc-latency-tester"
version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"clap 4.5.7",
"config",
"dirs",
@ -2450,6 +2451,7 @@ dependencies = [
"geyser-grpc-connector",
"itertools 0.10.5",
"jsonrpsee-types",
"once_cell",
"reqwest 0.12.4",
"serde",
"serde_derive",

View File

@ -35,3 +35,5 @@ serde_derive = "1.0.203"
clap = { version = "4.5.7", features = ["derive"] }
solana-program = "1.17.31"
solana-transaction-status = "1.17.31"
once_cell = "1.19.0"
chrono = "0.4.38"

View File

@ -25,16 +25,6 @@ TODO: document usage
cargo run measure-slot-latency -- --help
```
### measure tx mined speeds
configuration can be provided by a `config.toml` file, a `.env` file, or by manually giving env args.
see `config.example.toml` and/or `.env.example` for usage.
```bash
cargo run measure-send-transaction
```
### watch measure tx mined speeds
the same as above but will do the action every n seconds
@ -77,6 +67,3 @@ cargo run watch-measure-send-transaction -- --watch-interval 600
```
![example discord message](discord1.png)
## TODOs
- notify to discord

View File

@ -1,6 +1,9 @@
[general]
discord_webhook = "https://discord.com/api/webhooks/<chan>/<key>"
[measure_txs]
user_key = "1,2,3,4,5..."
pubsub_url="wss://<url>"
pubsub_url="wss://<>"
rpc_url="https://<url>"
helius_url = "https://<url>"

View File

@ -7,11 +7,19 @@ use std::iter::zip;
use std::{collections::HashMap, env};
use tracing::error;
use crate::discord::DISCORD_WEBHOOK_URL;
pub struct ParsedConfig {
pub general: GeneralConfig,
pub measure_txs: MeasureTxsConfig,
pub user: Keypair,
}
#[derive(Debug, Deserialize)]
pub struct GeneralConfig {
pub discord_webhook: Option<String>,
}
#[derive(Debug, Deserialize)]
pub struct MeasureTxsConfig {
pub pubsub_url: String,
@ -23,6 +31,7 @@ pub struct MeasureTxsConfig {
#[derive(Debug, Deserialize)]
pub struct Config {
pub general: GeneralConfig,
pub measure_txs: MeasureTxsConfig,
}
@ -53,6 +62,7 @@ pub fn try_parse_toml() -> Result<ParsedConfig> {
let config: Config = toml::from_str(&config_path)?;
let user = parse_user_key(config.measure_txs.user_key.clone())?;
let parsed_config = ParsedConfig {
general: config.general,
measure_txs: config.measure_txs,
user,
};
@ -61,6 +71,7 @@ pub fn try_parse_toml() -> Result<ParsedConfig> {
}
pub fn try_parse_env() -> Result<ParsedConfig> {
let discord_webhook = env::var("DISCORD_WEBHOOK").ok();
let pubsub_url = env::var("PUBSUB_URL")?;
let rpc_url = env::var("RPC_URL")?;
let helius_url = env::var("HELIUS_URL")?;
@ -79,6 +90,7 @@ pub fn try_parse_env() -> Result<ParsedConfig> {
}
Ok(ParsedConfig {
general: GeneralConfig { discord_webhook },
measure_txs: MeasureTxsConfig {
pubsub_url,
rpc_url,
@ -95,9 +107,17 @@ pub fn setup() -> Result<ParsedConfig> {
setup_logging();
if let Ok(config) = try_parse_toml() {
return Ok(config);
let config = if let Ok(config) = try_parse_toml() {
config
} else {
try_parse_env()?
};
if let Some(ref discord_webhook) = config.general.discord_webhook {
DISCORD_WEBHOOK_URL
.set(discord_webhook.clone())
.expect("DISCORD_WEBHOOK_URL previously unset");
}
try_parse_env()
return Ok(config);
}

192
src/discord.rs Normal file
View File

@ -0,0 +1,192 @@
use crate::{measure_txs::WatchTxResult, rpcnode_define_checks::Check};
use chrono::SecondsFormat;
use chrono::Utc;
use gethostname::gethostname;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use reqwest::Client;
use serde_json::{json, Value};
use tracing::{error, info, warn};
pub static DISCORD_WEBHOOK_URL: OnceCell<String> = OnceCell::new();
fn create_slot_warning_msg(slot_sent: u64, slot_confirmed: u64) -> Value {
let status_color = 0xFF0000;
let content = format!(
r#"
slot_sent is greater than slot_confirmed.
an rpc node is likely severely lagging...
slot values:
slot_sent: {slot_sent}
slot_confirmed: {slot_confirmed}
"#
);
json! {
{
"embeds": [
{
"title": "error: unusual slot values",
"description": content,
"color": status_color,
}
]
}
}
}
fn create_tx_measurement_msg(notify_results: &[WatchTxResult]) -> Value {
let status_color = 0x0000FF;
let mut description = String::new();
description.push_str("```\n");
for result in notify_results {
description.push_str(&format!("<{}>", result.label));
if let Some(tx_result) = &result.result {
description.push_str(&format!(
r#"
tx_sig: {}
slot_sent: {}
slot_confirmed: {}
slot_diff: {}"#,
tx_result.signature,
tx_result.slot_sent,
tx_result.slot_confirmed,
tx_result.slot_confirmed - tx_result.slot_sent,
));
} else if let Some(error) = &result.error {
description.push_str(&format!(
r#"
error: {}"#,
error
));
}
description.push_str(&format!(
r#"
"lifetime_avg_slot_diff: {}
lifetime_fails: {}
</{}>
"#,
result.lifetime_avg, result.lifetime_fails, result.label
));
}
description.push_str("```\n");
json! {
{
"embeds": [
{
"title": "measure-tx results",
"description": description,
"username": "RPC Node Check",
"color": status_color,
"timestamp": Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true),
}
]
}
}
}
pub fn create_check_alive_discord_message(
rpcnode_label: &str,
checks_enabled: Vec<Check>,
tasks_success: &mut [Check],
tasks_timedout: Vec<Check>,
success: bool,
) -> Value {
let result_per_check = enum_iterator::all::<Check>()
.map(|check| {
let name = format!("{:?}", check);
let disabled = !checks_enabled.contains(&check);
let timedout = tasks_timedout.contains(&check);
let success = tasks_success.contains(&check);
let value = if disabled {
"disabled"
} else if timedout {
"timed out"
} else if success {
"OK"
} else {
"failed"
};
json! {
{
"name": name,
"value": value
}
}
})
.collect_vec();
let fields = result_per_check;
let status_color = if success { 0x00FF00 } else { 0xFC4100 };
let hostname_executed = gethostname();
let content = if success {
format!("OK rpc node check for <{}>", rpcnode_label)
} else {
let userid_groovie = 933275947124273182u128;
let role_id_alerts_mangolana = 1100752577307619368u128;
let mentions = format!("<@{}> <@&{}>", userid_groovie, role_id_alerts_mangolana);
format!("Failed rpc node check for <{}> {}", rpcnode_label, mentions)
};
let body = json! {
{
"content": content,
"description": format!("executed on {}", hostname_executed.to_string_lossy()),
"username": "RPC Node Check",
"embeds": [
{
"title": "Check Results",
"description": "",
"color": status_color,
"fields":
fields
,
"footer": {
"text": format!("github: mangorpc-latency-tester, author: groovie")
}
}
]
}
};
body
}
pub async fn send_webook_discord(url: String, discord_body: Value) {
let client = Client::new();
let res = client.post(url).json(&discord_body).send().await;
match res {
Ok(_) => {
info!("webhook sent");
}
Err(e) => {
error!("webhook failed: {:?}", e);
}
}
}
pub async fn notify_slot_warning(slot_sent: u64, slot_confirmed: u64) {
if let Some(url) = DISCORD_WEBHOOK_URL.get() {
let msg = create_slot_warning_msg(slot_sent, slot_confirmed);
send_webook_discord(url.clone(), msg).await;
} else {
warn!("notify_slot_warning: discord notifications disabled");
}
}
pub async fn notify_tx_measurement(notify_results: &[WatchTxResult]) {
if let Some(url) = DISCORD_WEBHOOK_URL.get() {
let msg = create_tx_measurement_msg(notify_results);
send_webook_discord(url.clone(), msg).await;
} else {
warn!("notify_tx_measurement: discord notifications disabled");
}
}

View File

@ -1,4 +1,5 @@
pub mod config;
pub mod discord;
pub mod measure_txs;
pub mod rpcnode_check_alive;
pub mod rpcnode_define_checks;
@ -9,7 +10,6 @@ use anyhow::Result;
use clap::{Args, Parser, Subcommand};
use config::MeasureTxsConfig;
use config::ParsedConfig;
use measure_txs::measure_txs;
use measure_txs::watch_measure_txs;
use rpcnode_check_alive::check;
use slot_latency_tester::measure_slot_latency;
@ -47,9 +47,7 @@ enum Commands {
CheckAlive(CheckAlive),
#[clap(aliases = &["l"], about = "measure slot latency between different nodes")]
MeasureSlotLatency,
#[clap(aliases = &["t"], about = "measure tx submission times to different nodes")]
MeasureSendTransaction,
#[clap(aliases = &["w"], about = "measure tx submission times to different nodes every n seconds")]
#[clap(aliases = &["wt"], about = "measure tx submission times to different nodes every n seconds")]
WatchMeasureSendTransaction(WatchMeasureSendTransaction),
}
@ -65,6 +63,7 @@ async fn main() -> Result<()> {
..
},
user,
.. // general is set globally using OnceCell
} = config::setup()?;
let cli = Cli::parse();
match cli.command {
@ -78,9 +77,6 @@ async fn main() -> Result<()> {
Ok(())
}
Commands::MeasureSlotLatency => measure_slot_latency().await,
Commands::MeasureSendTransaction => {
measure_txs(user, pubsub_url, rpc_url, helius_url, urls_by_label).await
}
Commands::WatchMeasureSendTransaction(WatchMeasureSendTransaction {
watch_interval_seconds,
}) => {

View File

@ -1,5 +1,6 @@
use crate::discord::{notify_slot_warning, notify_tx_measurement};
use anyhow::{bail, Result};
use futures_util::future::join_all;
use futures_util::future::{join, join_all};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use solana_client::{
@ -20,6 +21,7 @@ use solana_transaction_status::UiTransactionEncoding;
use std::{
collections::HashMap,
iter::zip,
ops::AddAssign,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
@ -29,12 +31,12 @@ use std::{
use tokio::{
sync::Notify,
task::JoinHandle,
time::{sleep, timeout},
time::{self, sleep, timeout},
};
use tokio_stream::StreamExt;
use tracing::{debug, error, info, warn};
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct TxSendResult {
pub label: String,
pub signature: Signature,
@ -42,6 +44,20 @@ pub struct TxSendResult {
pub slot_confirmed: u64,
}
#[derive(Debug, Clone)]
pub struct TxSendError {
pub label: String,
pub error: String,
}
pub struct WatchTxResult {
pub label: String,
pub result: Option<TxSendResult>,
pub error: Option<String>,
pub lifetime_avg: u64,
pub lifetime_fails: u64,
}
#[derive(Serialize)]
struct RequestParams {
jsonrpc: String,
@ -185,80 +201,6 @@ async fn send_and_confirm_self_transfer_tx(
})
}
pub async fn measure_txs(
user: Keypair,
pubsub_url: String,
rpc_url: String,
helius_url: String,
urls_by_label: HashMap<String, String>,
) -> Result<()> {
info!("measuring txs...");
let user = Arc::new(user);
let atomic_slot = AtomicU64::new(0);
let atomic_slot = Arc::new(atomic_slot);
let slot_notifier = Arc::new(Notify::new());
let a_slot = Arc::clone(&atomic_slot);
let s_notifier = Arc::clone(&slot_notifier);
let _handle = watch_slots_retry(pubsub_url, a_slot, s_notifier);
info!("waiting for first slot...");
slot_notifier.notified().await;
let mut clients_by_label = HashMap::<String, Arc<RpcClient>>::new();
for (label, url) in urls_by_label.into_iter() {
let rpc_client = RpcClient::new(url);
clients_by_label.insert(label, Arc::new(rpc_client));
}
let rpc_client = RpcClient::new(rpc_url);
let recent_blockhash = rpc_client.get_latest_blockhash().await?;
info!("got recent_blockhash: {}", recent_blockhash);
let priority_fee = get_priority_fee_estimate(&helius_url).await?;
info!("using priority fee: {}", priority_fee);
let mut sig_futs = Vec::new();
for (i, (label, client)) in clients_by_label.iter().enumerate() {
let label = label.clone();
let rpc_client = Arc::clone(client);
let user = Arc::clone(&user);
let a_slot = Arc::clone(&atomic_slot);
let fut = send_and_confirm_self_transfer_tx(
user,
a_slot,
label,
rpc_client,
recent_blockhash,
priority_fee,
i as u64,
);
sig_futs.push(fut);
}
let results = join_all(sig_futs).await;
for ((label, _), r) in zip(clients_by_label, results) {
if let Ok(TxSendResult {
label,
signature,
slot_sent,
slot_confirmed,
}) = r
{
info!("label: {}", label);
info!("txSig: https://solscan.io/tx/{}", signature);
info!("slot_sent: {}", slot_sent);
info!("slot_confirmed: {}", slot_confirmed);
info!("slot_diff: {}", slot_confirmed - slot_sent);
} else {
error!("label: {}", label);
error!("error: {:?}", r);
}
}
Ok(())
}
pub async fn watch_measure_txs(
user: Keypair,
pubsub_url: String,
@ -288,11 +230,18 @@ pub async fn watch_measure_txs(
let rpc_client = RpcClient::new(rpc_url);
let mut interval = time::interval(Duration::from_secs(watch_interval_seconds));
let mut slot_diffs_by_label: HashMap<String, Vec<u64>> = HashMap::new();
let mut fails_by_label: HashMap<String, u64> = HashMap::new();
loop {
let recent_blockhash = rpc_client.get_latest_blockhash().await?;
info!("got recent_blockhash: {}", recent_blockhash);
interval.tick().await;
let rb_fut = rpc_client.get_latest_blockhash();
let pf_fut = get_priority_fee_estimate(&helius_url);
let (recent_blockhash, priority_fee) = join(rb_fut, pf_fut).await;
let priority_fee = get_priority_fee_estimate(&helius_url).await?;
let recent_blockhash = recent_blockhash?;
info!("got recent_blockhash: {}", recent_blockhash);
let priority_fee = priority_fee?;
info!("using priority fee: {}", priority_fee);
let mut sig_futs = Vec::new();
@ -315,25 +264,74 @@ pub async fn watch_measure_txs(
}
let results = join_all(sig_futs).await;
for ((label, _), r) in zip(c_by_l, results) {
if let Ok(TxSendResult {
let mut notify_results: Vec<WatchTxResult> = Vec::new();
for ((label, _), r) in zip(clients_by_label.clone(), results) {
match r {
Ok(tx_result) => {
let TxSendResult {
label,
signature,
slot_sent,
slot_confirmed,
}) = r
{
} = tx_result.clone();
if slot_sent > slot_confirmed {
warn!(
"slot_sent: {} > slot_confirmed: {}",
slot_sent, slot_confirmed
);
notify_slot_warning(slot_sent, slot_confirmed).await;
}
let slot_diff = slot_confirmed.saturating_sub(slot_sent);
let slot_diffs = slot_diffs_by_label.entry(label.clone()).or_default();
slot_diffs.push(slot_diff);
let (sum, count) = slot_diffs
.iter()
.fold((0, 0), |(sum, count), diff| (sum + diff, count + 1));
let lifetime_avg = if count > 0 { sum / count } else { u64::MAX };
let lifetime_fails = fails_by_label.entry(label.clone()).or_insert(0).clone();
notify_results.push(WatchTxResult {
label: label.clone(),
result: Some(tx_result),
error: None,
lifetime_avg,
lifetime_fails,
});
info!("label: {}", label);
info!("txSig: https://solscan.io/tx/{}", signature);
info!("slot_sent: {}", slot_sent);
info!("slot_confirmed: {}", slot_confirmed);
info!("slot_diff: {}", slot_confirmed - slot_sent);
} else {
info!("slot_diff: {}", slot_diff);
}
Err(e) => {
fails_by_label
.entry(label.clone())
.or_default()
.add_assign(1);
let slot_diffs = slot_diffs_by_label.entry(label.clone()).or_default();
let (sum, count) = slot_diffs
.iter()
.fold((0, 0), |(sum, count), diff| (sum + diff, count + 1));
let lifetime_avg = if count > 0 { sum / count } else { u64::MAX };
let lifetime_fails = fails_by_label.entry(label.clone()).or_insert(0).clone();
notify_results.push(WatchTxResult {
label: label.clone(),
result: None,
error: Some(e.to_string()),
lifetime_avg,
lifetime_fails,
});
error!("label: {}", label);
error!("error: {:?}", r);
error!("error: {:?}", e);
}
}
}
sleep(Duration::from_secs(watch_interval_seconds)).await;
notify_results.sort_by_key(|k| k.lifetime_avg);
notify_tx_measurement(&notify_results).await;
}
}

View File

@ -1,27 +1,15 @@
use crate::rpcnode_define_checks::{define_checks, Check, CheckResult};
use crate::{
discord::{create_check_alive_discord_message, send_webook_discord},
rpcnode_define_checks::{define_checks, Check, CheckResult},
};
use anyhow::{bail, Result};
use gethostname::gethostname;
use itertools::Itertools;
use serde_json::{json, Value};
use std::{collections::HashMap, env, process::exit, time::Duration};
use tokio::task::JoinSet;
use tracing::{debug, error, info, warn};
pub const TASK_TIMEOUT: Duration = Duration::from_millis(15_000);
async fn send_webook_discord(url: String, discord_body: Value) {
let client = reqwest::Client::new();
let res = client.post(url).json(&discord_body).send().await;
match res {
Ok(_) => {
info!("webhook sent");
}
Err(e) => {
error!("webhook failed: {:?}", e);
}
}
}
pub async fn check(
discord_webhook: Option<String>,
rpcnode_label: Option<String>,
@ -110,7 +98,7 @@ pub async fn check(
assert!(tasks_total > 0, "no results");
let discord_body = create_discord_message(
let discord_body = create_check_alive_discord_message(
&rpcnode_label,
checks_enabled,
&mut tasks_success,
@ -150,72 +138,3 @@ pub async fn check(
);
}
}
fn create_discord_message(
rpcnode_label: &str,
checks_enabled: Vec<Check>,
tasks_success: &mut [Check],
tasks_timedout: Vec<Check>,
success: bool,
) -> Value {
let result_per_check = enum_iterator::all::<Check>()
.map(|check| {
let name = format!("{:?}", check);
let disabled = !checks_enabled.contains(&check);
let timedout = tasks_timedout.contains(&check);
let success = tasks_success.contains(&check);
let value = if disabled {
"disabled"
} else if timedout {
"timed out"
} else if success {
"OK"
} else {
"failed"
};
json! {
{
"name": name,
"value": value
}
}
})
.collect_vec();
let fields = result_per_check;
let status_color = if success { 0x00FF00 } else { 0xFC4100 };
let hostname_executed = gethostname();
let content = if success {
format!("OK rpc node check for <{}>", rpcnode_label)
} else {
let userid_groovie = 933275947124273182u128;
let role_id_alerts_mangolana = 1100752577307619368u128;
let mentions = format!("<@{}> <@&{}>", userid_groovie, role_id_alerts_mangolana);
format!("Failed rpc node check for <{}> {}", rpcnode_label, mentions)
};
let body = json! {
{
"content": content,
"description": format!("executed on {}", hostname_executed.to_string_lossy()),
"username": "RPC Node Check",
"embeds": [
{
"title": "Check Results",
"description": "",
"color": status_color,
"fields":
fields
,
"footer": {
"text": format!("github: mangorpc-latency-tester, author: groovie")
}
}
]
}
};
body
}