make checks configurable

This commit is contained in:
GroovieGermanikus 2024-06-07 08:40:48 +02:00
parent 786874f0bb
commit f1bb5dfba9
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
3 changed files with 88 additions and 61 deletions

1
Cargo.lock generated
View File

@ -2027,6 +2027,7 @@ dependencies = [
"enum-iterator 2.1.0",
"futures-util",
"geyser-grpc-connector",
"itertools 0.10.5",
"jsonrpsee-types",
"reqwest 0.12.4",
"serde",

View File

@ -37,3 +37,4 @@ enum-iterator = "2.1.0"
url = "2.5.0"
config = "0.14.0"
itertools = "0.10.5"

View File

@ -45,7 +45,7 @@ enum CheckResult {
Timeout(Check),
}
#[derive(Debug, PartialEq, Sequence)]
#[derive(Clone, Debug, PartialEq, Sequence)]
enum Check {
Gpa,
TokenAccouns,
@ -73,7 +73,6 @@ async fn send_webook_discord() {
error!("webhook failed: {:?}", e);
}
}
}
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
@ -83,18 +82,19 @@ async fn main() -> ExitCode {
// send_webook_discord().await;
// name of rpc node for logging/discord (e.g. hostname)
let rpcnode_label = std::env::var("RPCNODE_LABEL").unwrap();
let map_checks_by_name: HashMap<String, Check> =
enum_iterator::all::<Check>().map(|check| {
(format!("{:?}", check), check)
}).collect();
info!("map_checks_by_name: {:?}", map_checks_by_name);
// comma separated
let checks_enabled = std::env::var("CHECKS_ENABLED").unwrap();
debug!("checks_enabled unparsed: {}", checks_enabled);
let checks_enabled = checks_enabled.split(",").map(|s| {
let checks_enabled: Vec<Check> = checks_enabled.split(",").map(|s| {
let s = s.trim();
match map_checks_by_name.get(s) {
@ -104,50 +104,48 @@ async fn main() -> ExitCode {
exit(1);
}
}
}).collect_vec();
}).cloned().collect_vec();
info!("checks enabled: {:?}", checks_enabled);
info!("checks enabled for rpcnode <{}>: {:?}", rpcnode_label, checks_enabled);
// name of rpc node for logging/discord (e.g. hostname)
let rpcnode_label = std::env::var("RPCNODE_LABEL").unwrap();
// http://...
let rpc_addr = std::env::var("RPC_HTTP_ADDR").unwrap();
// wss://...
let ws_addr = std::env::var("RPC_WS_ADDR").unwrap();
// http://...
let grpc_addr = std::env::var("GRPC_ADDR").unwrap();
let geyser_grpc_timeouts = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(10),
request_timeout: Duration::from_secs(10),
subscribe_timeout: Duration::from_secs(10),
receive_timeout: Duration::from_secs(10),
};
info!("rpcnode_check_alive against {}: (rpc {}, ws {}, gprc {})", rpcnode_label, rpc_addr, ws_addr, grpc_addr);
let rpc_url = Url::parse(rpc_addr.as_str()).unwrap();
let rpc_client = Arc::new(RpcClient::new(rpc_url.to_string()));
let geyser_grpc_config = GrpcSourceConfig::new(grpc_addr.to_string(), None, None, geyser_grpc_timeouts.clone());
let mut all_check_tasks: JoinSet<CheckResult> = JoinSet::new();
if checks_enabled.contains(&Check::Gpa) {
let rpc_client = read_rpc_config();
add_task(Check::Gpa, rpc_gpa(rpc_client.clone()), &mut all_check_tasks);
}
if checks_enabled.contains(&Check::TokenAccouns) {
let rpc_client = read_rpc_config();
add_task(Check::TokenAccouns, rpc_get_token_accounts_by_owner(rpc_client.clone()), &mut all_check_tasks);
}
if checks_enabled.contains(&Check::Gsfa) {
let rpc_client = read_rpc_config();
add_task(Check::Gsfa, rpc_get_signatures_for_address(rpc_client.clone()), &mut all_check_tasks);
}
if checks_enabled.contains(&Check::GetAccountInfo) {
let rpc_client = read_rpc_config();
add_task(Check::GetAccountInfo, rpc_get_account_info(rpc_client.clone()), &mut all_check_tasks);
}
if checks_enabled.contains(&Check::GeyserAllAccounts) {
let geyser_grpc_config = read_geyser_config();
add_task(Check::GeyserAllAccounts, create_geyser_all_accounts_task(geyser_grpc_config), &mut all_check_tasks);
}
if checks_enabled.contains(&Check::GeyserTokenAccount) {
let geyser_grpc_config = read_geyser_config();
add_task(Check::GeyserTokenAccount, create_geyser_token_account_task(geyser_grpc_config), &mut all_check_tasks);
}
if checks_enabled.contains(&Check::WebsocketAccount) {
let ws_addr = read_ws_config();
add_task(Check::WebsocketAccount, websocket_account_subscribe(Url::parse(ws_addr.as_str()).unwrap()), &mut all_check_tasks);
}
add_task(Check::GeyserAllAccounts, create_geyser_all_accounts_task(geyser_grpc_config.clone()), &mut all_check_tasks);
add_task(Check::GeyserTokenAccount, create_geyser_token_account_task(geyser_grpc_config.clone()), &mut all_check_tasks);
info!("all tasks started...");
let tasks_total = all_check_tasks.len();
info!("all {} tasks started...", tasks_total);
let mut tasks_success = Vec::new();
let mut tasks_successful = 0;
@ -157,7 +155,7 @@ async fn main() -> ExitCode {
match res {
Ok(CheckResult::Success(check)) => {
tasks_successful += 1;
info!("one more task completed <{:?}>, {} left", check, all_check_tasks.len());
info!("one more task completed <{:?}>, {}/{} left", check, all_check_tasks.len(), tasks_total);
tasks_success.push(check);
}
Ok(CheckResult::Timeout(check)) => {
@ -176,7 +174,8 @@ async fn main() -> ExitCode {
if tasks_failed + tasks_timeout > 0 {
warn!("tasks failed ({}) or timed out ({}) of {} total", tasks_failed, tasks_timeout, tasks_total);
warn!("rpcnode <{}> - tasks failed ({}) or timed out ({}) of {} total",
rpcnode_label, tasks_failed, tasks_timeout, tasks_total);
for check in enum_iterator::all::<Check>() {
if !tasks_success.contains(&check) {
warn!("!! did not complet task <{:?}>", check);
@ -184,10 +183,41 @@ async fn main() -> ExitCode {
}
return ExitCode::SUCCESS;
} else {
info!("all {} tasks completed...", tasks_total);
info!("rpcnode <{}> - all {} tasks completed: {:?}", rpcnode_label, tasks_total, tasks_success);
return ExitCode::FAILURE;
}
}
fn read_rpc_config() -> Arc<RpcClient> {
// http://...
let rpc_addr = std::env::var("RPC_HTTP_ADDR").unwrap();
let rpc_url = Url::parse(rpc_addr.as_str()).unwrap();
let rpc_client = Arc::new(RpcClient::new(rpc_url.to_string()));
rpc_client
}
fn read_ws_config() -> String {
// wss://...
let ws_addr = std::env::var("RPC_WS_ADDR").unwrap();
ws_addr
}
fn read_geyser_config() -> GrpcSourceConfig {
let grpc_addr = std::env::var("GRPC_ADDR").unwrap();
let geyser_grpc_timeouts = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(10),
request_timeout: Duration::from_secs(10),
subscribe_timeout: Duration::from_secs(10),
receive_timeout: Duration::from_secs(10),
};
let geyser_grpc_config = GrpcSourceConfig::new(grpc_addr.to_string(), None, None, geyser_grpc_timeouts.clone());
geyser_grpc_config
}
fn add_task(check: Check, task: impl Future<Output=()> + Send + 'static, all_check_tasks: &mut JoinSet<CheckResult>) {
@ -266,7 +296,6 @@ async fn create_geyser_token_account_task(config: GrpcSourceConfig) {
}
async fn rpc_gpa(rpc_client: Arc<RpcClient>) {
let program_pubkey = Pubkey::from_str("4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg").unwrap();
let filter = RpcFilterType::Memcmp(Memcmp::new_raw_bytes(30, vec![42]));
@ -308,7 +337,6 @@ async fn rpc_get_account_info(rpc_client: Arc<RpcClient>) {
debug!("Account info: {:?}", account_info);
assert!(account_info.lamports > 0, "account lamports is zero");
}
async fn rpc_get_token_accounts_by_owner(rpc_client: Arc<RpcClient>) {
@ -356,7 +384,6 @@ async fn rpc_get_signatures_for_address(rpc_client: Arc<RpcClient>) {
async fn websocket_account_subscribe(
rpc_url: Url
) {
let sysvar_subscribe =
json!({
"jsonrpc": "2.0",
@ -393,7 +420,6 @@ async fn websocket_account_subscribe(
}
pub fn all_accounts() -> SubscribeRequest {
let mut accounts_subs = HashMap::new();
accounts_subs.insert(
@ -419,7 +445,6 @@ pub fn all_accounts() -> SubscribeRequest {
}
pub fn token_accounts() -> SubscribeRequest {
let mut accounts_subs = HashMap::new();
accounts_subs.insert(