combine nicely

This commit is contained in:
GroovieGermanikus 2024-06-05 18:44:19 +02:00
parent d66b7d08cb
commit c73b5a783b
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 53 additions and 19 deletions

View File

@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::future::Future;
use std::pin::pin;
use std::str::FromStr;
use std::sync::Arc;
@ -18,11 +19,11 @@ use solana_sdk::pubkey::Pubkey;
use tokio::{join, select};
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::Sender;
use tokio::task::{JoinHandle, JoinSet};
use tokio::time::Instant;
use tokio::task::{JoinError, JoinHandle, JoinSet};
use tokio::time::{Instant, timeout};
use tokio_stream::{Stream, StreamExt};
use tokio_stream::wrappers::{BroadcastStream, ReceiverStream};
use tracing::{info, trace, warn};
use tracing::{debug, info, trace, warn};
use websocket_tungstenite_retry::websocket_stable::{StableWebSocket, WsMessage};
use url::Url;
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots, SubscribeUpdate};
@ -30,6 +31,14 @@ use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
type Slot = u64;
const TASK_TIMEOUT: Duration = Duration::from_millis(5000);
enum CheckResult {
Success(Check),
Timeout(Check),
}
#[derive(Debug)]
enum Check {
Gpa,
TokenAccouns,
@ -61,27 +70,52 @@ async fn main() {
let geyser_grpc_config = GrpcSourceConfig::new(grpc_addr.to_string(), None, None, geyser_grpc_timeouts.clone());
let mut all_check_tasks: JoinSet<Check> = JoinSet::new();
let mut all_check_tasks: JoinSet<CheckResult> = JoinSet::new();
all_check_tasks.spawn(rpc_gpa(rpc_client.clone()).then(|_| async { Check::Gpa }));
all_check_tasks.spawn(rpc_get_token_accounts_by_owner(rpc_client.clone()).then(|_| async { Check::TokenAccouns }));
all_check_tasks.spawn(rpc_get_signatures_for_address(rpc_client.clone()).then(|_| async { Check::Gsfa }));
all_check_tasks.spawn(rpc_get_account_info(rpc_client.clone()).then(|_| async { Check::GetAccountInfo }));
add_task(Check::Gpa, rpc_gpa(rpc_client.clone()), &mut all_check_tasks);
add_task(Check::TokenAccouns, rpc_get_token_accounts_by_owner(rpc_client.clone()), &mut all_check_tasks);
add_task(Check::Gsfa, rpc_get_signatures_for_address(rpc_client.clone()), &mut all_check_tasks);
add_task(Check::GetAccountInfo, rpc_get_account_info(rpc_client.clone()), &mut all_check_tasks);
all_check_tasks.spawn(websocket_account_subscribe(Url::parse(ws_url.as_str()).unwrap()).then(|_| async { Check::WebsocketAccount }));
add_task(Check::WebsocketAccount, websocket_account_subscribe(Url::parse(ws_url.as_str()).unwrap()), &mut all_check_tasks);
all_check_tasks.spawn(create_geyser_all_accounts_task(geyser_grpc_config.clone()).then(|_| async { Check::GeyserAllAccounts }));
all_check_tasks.spawn(create_geyser_token_account_task(geyser_grpc_config.clone()).then(|_| async { Check::GeyserTokenAccount }));
add_task(Check::GeyserAllAccounts, create_geyser_all_accounts_task(geyser_grpc_config.clone()), &mut all_check_tasks);
add_task(Check::GeyserTokenAccount, create_geyser_token_account_task(geyser_grpc_config.clone()), &mut all_check_tasks);
info!("all tasks started...");
while let Some(res) = all_check_tasks.join_next().await {
info!("one more Task completed: {} left", all_check_tasks.len());
match res {
Ok(CheckResult::Success(check)) => {
info!("one more Task completed: {:?}, {} left", check, all_check_tasks.len());
}
Ok(CheckResult::Timeout(check)) => {
info!("timeout running Task {:?}", check);
}
Err(_) => {
warn!("Task failed");
}
}
}
info!("all tasks completed...");
}
fn add_task(check: Check, task: impl Future<Output = ()> + Send + 'static, all_check_tasks: &mut JoinSet<CheckResult>) {
let timeout =
timeout(TASK_TIMEOUT, task).then(|res| async move {
match res {
Ok(()) => {
CheckResult::Success(check)
}
Err(_) => {
CheckResult::Timeout(check)
}
}
});
all_check_tasks.spawn(timeout);
}
// note: this might fail if the yellowstone plugin does not allow "any broadcast filter"
async fn create_geyser_all_accounts_task(config: GrpcSourceConfig) {
@ -97,7 +131,7 @@ async fn create_geyser_all_accounts_task(config: GrpcSourceConfig) {
Message::GeyserSubscribeUpdate(subscriber_update) => {
match subscriber_update.update_oneof {
Some(UpdateOneof::Account(update)) => {
info!("Account from geyser: {:?}", update.account.unwrap().pubkey);
debug!("Account from geyser: {:?}", update.account.unwrap().pubkey);
count += 1;
if count > 3 {
return;
@ -124,7 +158,7 @@ async fn create_geyser_token_account_task(config: GrpcSourceConfig) {
Message::GeyserSubscribeUpdate(subscriber_update) => {
match subscriber_update.update_oneof {
Some(UpdateOneof::Account(update)) => {
info!("Token Account: {:?}", update.account.unwrap().pubkey);
debug!("Token Account: {:?}", update.account.unwrap().pubkey);
count += 1;
if count > 3 {
return;
@ -150,7 +184,7 @@ async fn rpc_gpa(rpc_client: Arc<RpcClient>) {
.await
.unwrap();
info!("Program accounts: {:?}", program_accounts.len());
debug!("Program accounts: {:?}", program_accounts.len());
// mango 12400 on mainnet
// CPL: 107 on mainnet
@ -164,7 +198,7 @@ async fn rpc_get_account_info(rpc_client: Arc<RpcClient>) {
.await
.unwrap();
info!("Account info: {:?}", account_info);
debug!("Account info: {:?}", account_info);
}
@ -181,7 +215,7 @@ async fn rpc_get_token_accounts_by_owner(rpc_client: Arc<RpcClient>) {
.unwrap();
// 1 account
info!("Token accounts: {:?}", token_accounts.len());
debug!("Token accounts: {:?}", token_accounts.len());
}
async fn rpc_get_signatures_for_address(rpc_client: Arc<RpcClient>) {
@ -200,7 +234,7 @@ async fn rpc_get_signatures_for_address(rpc_client: Arc<RpcClient>) {
.unwrap();
// 42
info!("Signatures: {:?}", signatures.len());
debug!("Signatures: {:?}", signatures.len());
}
@ -231,7 +265,7 @@ async fn websocket_account_subscribe(
let mut count = 0;
while let Ok(msg) = channel.recv().await {
if let WsMessage::Text(payload) = msg {
info!("SysvarC1ock: {:?}", payload);
debug!("SysvarC1ock: {:?}", payload);
count += 1;
}
if count > 3 {