clippy+fmt

This commit is contained in:
GroovieGermanikus 2024-06-07 09:38:59 +02:00
parent be5fa011b1
commit fad4f65569
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
3 changed files with 227 additions and 262 deletions

View File

@ -1,45 +1,14 @@
mod rpcnode_define_checks;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::pin;
use std::process::{exit, ExitCode};
use std::str::FromStr;
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
use futures_util::FutureExt;
use geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use serde_json::{json, Value};
use solana_account_decoder::parse_token::spl_token_ids;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client::rpc_client::GetConfirmedSignaturesForAddress2Config;
use solana_rpc_client_api::request::TokenAccountsFilter;
use solana_rpc_client_api::response::SlotInfo;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use tokio::{join, select};
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::Sender;
use tokio::task::{JoinError, JoinHandle, JoinSet};
use tokio::time::{Instant, timeout};
use tokio_stream::{Stream, StreamExt};
use tokio_stream::wrappers::{BroadcastStream, ReceiverStream};
use tracing::{debug, error, info, trace, warn};
use websocket_tungstenite_retry::websocket_stable::{StableWebSocket, WsMessage};
use url::Url;
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots, SubscribeUpdate};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use anyhow::Context;
use enum_iterator::Sequence;
use gethostname::gethostname;
use solana_account_decoder::UiAccountEncoding;
use solana_rpc_client_api::config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
use solana_rpc_client_api::filter::{Memcmp, RpcFilterType};
use itertools::Itertools;
type Slot = u64;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::process::{exit, ExitCode};
use std::time::Duration;
use tokio::task::JoinSet;
use tracing::{debug, error, info, warn};
const TASK_TIMEOUT: Duration = Duration::from_millis(15000);
@ -59,7 +28,6 @@ enum Check {
WebsocketAccount,
}
async fn send_webook_discord(discord_body: Value) {
let Ok(url) = std::env::var("DISCORD_WEBHOOK") else {
info!("sending to discord is disabled");
@ -67,9 +35,7 @@ async fn send_webook_discord(discord_body: Value) {
};
let client = reqwest::Client::new();
let res = client.post(url)
.json(&discord_body)
.send().await;
let res = client.post(url).json(&discord_body).send().await;
match res {
Ok(_) => {
info!("webhook sent");
@ -87,16 +53,17 @@ async fn main() -> ExitCode {
// name of rpc node for logging/discord (e.g. hostname)
let rpcnode_label = std::env::var("RPCNODE_LABEL").unwrap();
let map_checks_by_name: HashMap<String, Check> =
enum_iterator::all::<Check>().map(|check| {
(format!("{:?}", check), check)
}).collect();
let map_checks_by_name: HashMap<String, Check> = enum_iterator::all::<Check>()
.map(|check| (format!("{:?}", check), check))
.collect();
// comma separated
let checks_enabled = std::env::var("CHECKS_ENABLED").unwrap();
debug!("checks_enabled unparsed: {}", checks_enabled);
let checks_enabled: Vec<Check> = checks_enabled.split(",").map(|s| {
let checks_enabled: Vec<Check> = checks_enabled
.split(',')
.map(|s| {
let s = s.trim();
match map_checks_by_name.get(s) {
@ -106,9 +73,14 @@ async fn main() -> ExitCode {
exit(1);
}
}
}).cloned().collect_vec();
})
.cloned()
.collect_vec();
info!("checks enabled for rpcnode <{}>: {:?}", rpcnode_label, checks_enabled);
info!(
"checks enabled for rpcnode <{}>: {:?}",
rpcnode_label, checks_enabled
);
let mut all_check_tasks: JoinSet<CheckResult> = JoinSet::new();
@ -126,7 +98,12 @@ async fn main() -> ExitCode {
match res {
Ok(CheckResult::Success(check)) => {
tasks_successful += 1;
info!("one more task completed <{:?}>, {}/{} left", check, all_check_tasks.len(), tasks_total);
info!(
"one more task completed <{:?}>, {}/{} left",
check,
all_check_tasks.len(),
tasks_total
);
tasks_success.push(check);
}
Ok(CheckResult::Timeout(check)) => {
@ -145,13 +122,20 @@ async fn main() -> ExitCode {
assert!(tasks_total > 0, "no results");
let discord_body = create_discord_message(&rpcnode_label, checks_enabled, &mut tasks_success, tasks_timedout, success);
let discord_body = create_discord_message(
&rpcnode_label,
checks_enabled,
&mut tasks_success,
tasks_timedout,
success,
);
send_webook_discord(discord_body).await;
if !success {
warn!("rpcnode <{}> - tasks failed ({}) or timed out ({}) of {} total",
rpcnode_label, tasks_failed, tasks_timeout, tasks_total);
warn!(
"rpcnode <{}> - tasks failed ({}) or timed out ({}) of {} total",
rpcnode_label, tasks_failed, tasks_timeout, tasks_total
);
for check in enum_iterator::all::<Check>() {
if !tasks_success.contains(&check) {
warn!("!! did not complet task <{:?}>", check);
@ -159,14 +143,23 @@ async fn main() -> ExitCode {
}
return ExitCode::FAILURE;
} else {
info!("rpcnode <{}> - all {} tasks completed: {:?}", rpcnode_label, tasks_total, tasks_success);
info!(
"rpcnode <{}> - all {} tasks completed: {:?}",
rpcnode_label, tasks_total, tasks_success
);
return ExitCode::SUCCESS;
}
}
fn create_discord_message(
rpcnode_label: &str, checks_enabled: Vec<Check>, mut tasks_success: &mut Vec<Check>, mut tasks_timedout: Vec<Check>, success: bool) -> Value {
let result_per_check = enum_iterator::all::<Check>().map(|check| {
rpcnode_label: &str,
checks_enabled: Vec<Check>,
tasks_success: &mut [Check],
tasks_timedout: Vec<Check>,
success: bool,
) -> Value {
let result_per_check = enum_iterator::all::<Check>()
.map(|check| {
let name = format!("{:?}", check);
let disabled = !checks_enabled.contains(&check);
let timedout = tasks_timedout.contains(&check);
@ -186,15 +179,12 @@ fn create_discord_message(
"value": value
}
}
}).collect_vec();
})
.collect_vec();
let fields = result_per_check;
let status_color = if success {
0x00FF00
} else {
0xFC4100
};
let status_color = if success { 0x00FF00 } else { 0xFC4100 };
let hostname_executed = gethostname();
@ -219,4 +209,3 @@ fn create_discord_message(
};
body
}

View File

@ -1,9 +1,4 @@
use std::collections::HashMap;
use std::future::Future;
use std::pin::pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use crate::{Check, CheckResult, TASK_TIMEOUT};
use anyhow::Context;
use futures_util::{FutureExt, StreamExt};
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
@ -17,6 +12,12 @@ use solana_rpc_client_api::filter::{Memcmp, RpcFilterType};
use solana_rpc_client_api::request::TokenAccountsFilter;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use std::collections::HashMap;
use std::future::Future;
use std::pin::pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinSet;
use tokio::time::timeout;
use tracing::debug;
@ -24,57 +25,76 @@ use url::Url;
use websocket_tungstenite_retry::websocket_stable::{StableWebSocket, WsMessage};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts};
use crate::{Check, CheckResult, TASK_TIMEOUT};
pub fn define_checks(checks_enabled: &Vec<Check>, mut all_check_tasks: &mut JoinSet<CheckResult>) {
pub fn define_checks(checks_enabled: &[Check], all_check_tasks: &mut JoinSet<CheckResult>) {
if checks_enabled.contains(&Check::Gpa) {
let rpc_client = read_rpc_config();
add_task(Check::Gpa, rpc_gpa(rpc_client.clone()), &mut all_check_tasks);
add_task(Check::Gpa, rpc_gpa(rpc_client.clone()), all_check_tasks);
}
if checks_enabled.contains(&Check::TokenAccouns) {
let rpc_client = read_rpc_config();
add_task(Check::TokenAccouns, rpc_get_token_accounts_by_owner(rpc_client.clone()), &mut all_check_tasks);
add_task(
Check::TokenAccouns,
rpc_get_token_accounts_by_owner(rpc_client.clone()),
all_check_tasks,
);
}
if checks_enabled.contains(&Check::Gsfa) {
let rpc_client = read_rpc_config();
add_task(Check::Gsfa, rpc_get_signatures_for_address(rpc_client.clone()), &mut all_check_tasks);
add_task(
Check::Gsfa,
rpc_get_signatures_for_address(rpc_client.clone()),
all_check_tasks,
);
}
if checks_enabled.contains(&Check::GetAccountInfo) {
let rpc_client = read_rpc_config();
add_task(Check::GetAccountInfo, rpc_get_account_info(rpc_client.clone()), &mut all_check_tasks);
add_task(
Check::GetAccountInfo,
rpc_get_account_info(rpc_client.clone()),
all_check_tasks,
);
}
if checks_enabled.contains(&Check::GeyserAllAccounts) {
let geyser_grpc_config = read_geyser_config();
add_task(Check::GeyserAllAccounts, create_geyser_all_accounts_task(geyser_grpc_config), &mut all_check_tasks);
add_task(
Check::GeyserAllAccounts,
create_geyser_all_accounts_task(geyser_grpc_config),
all_check_tasks,
);
}
if checks_enabled.contains(&Check::GeyserTokenAccount) {
let geyser_grpc_config = read_geyser_config();
add_task(Check::GeyserTokenAccount, create_geyser_token_account_task(geyser_grpc_config), &mut all_check_tasks);
add_task(
Check::GeyserTokenAccount,
create_geyser_token_account_task(geyser_grpc_config),
all_check_tasks,
);
}
if checks_enabled.contains(&Check::WebsocketAccount) {
let ws_addr = read_ws_config();
add_task(Check::WebsocketAccount, websocket_account_subscribe(Url::parse(ws_addr.as_str()).unwrap()), &mut all_check_tasks);
add_task(
Check::WebsocketAccount,
websocket_account_subscribe(Url::parse(ws_addr.as_str()).unwrap()),
all_check_tasks,
);
}
}
fn read_rpc_config() -> Arc<RpcClient> {
// http://...
let rpc_addr = std::env::var("RPC_HTTP_ADDR").unwrap();
let rpc_url = Url::parse(rpc_addr.as_str()).unwrap();
let rpc_client = Arc::new(RpcClient::new(rpc_url.to_string()));
rpc_client
Arc::new(RpcClient::new(rpc_url.to_string()))
}
fn read_ws_config() -> String {
// wss://...
let ws_addr = std::env::var("RPC_WS_ADDR").unwrap();
ws_addr
std::env::var("RPC_WS_ADDR").unwrap()
}
fn read_geyser_config() -> GrpcSourceConfig {
@ -86,84 +106,66 @@ fn read_geyser_config() -> GrpcSourceConfig {
subscribe_timeout: Duration::from_secs(10),
receive_timeout: Duration::from_secs(10),
};
let geyser_grpc_config = GrpcSourceConfig::new(grpc_addr.to_string(), None, None, geyser_grpc_timeouts.clone());
geyser_grpc_config
GrpcSourceConfig::new(
grpc_addr.to_string(),
None,
None,
geyser_grpc_timeouts.clone(),
)
}
fn add_task(check: Check, task: impl Future<Output=()> + Send + 'static, all_check_tasks: &mut JoinSet<CheckResult>) {
let timeout =
timeout(TASK_TIMEOUT, task).then(|res| async move {
fn add_task(
check: Check,
task: impl Future<Output = ()> + Send + 'static,
all_check_tasks: &mut JoinSet<CheckResult>,
) {
let timeout = timeout(TASK_TIMEOUT, task).then(|res| async move {
match res {
Ok(()) => {
CheckResult::Success(check)
}
Err(_) => {
CheckResult::Timeout(check)
}
Ok(()) => CheckResult::Success(check),
Err(_) => CheckResult::Timeout(check),
}
});
all_check_tasks.spawn(timeout);
}
// note: this might fail if the yellowstone plugin does not allow "any broadcast filter"
async fn create_geyser_all_accounts_task(config: GrpcSourceConfig) {
let green_stream = create_geyser_reconnecting_stream(
config.clone(),
all_accounts(),
);
let green_stream = create_geyser_reconnecting_stream(config.clone(), all_accounts());
let mut count = 0;
let mut green_stream = pin!(green_stream);
while let Some(message) = green_stream.next().await {
match message {
Message::GeyserSubscribeUpdate(subscriber_update) => {
match subscriber_update.update_oneof {
Some(UpdateOneof::Account(update)) => {
if let Message::GeyserSubscribeUpdate(subscriber_update) = message {
if let Some(UpdateOneof::Account(update)) = subscriber_update.update_oneof {
debug!("Account from geyser: {:?}", update.account.unwrap().pubkey);
count += 1;
if count > 3 {
return;
}
}
_ => {}
}
}
_ => {}
}
};
panic!("failed to receive the requested accounts");
}
async fn create_geyser_token_account_task(config: GrpcSourceConfig) {
let green_stream = create_geyser_reconnecting_stream(
config.clone(),
token_accounts(),
);
let green_stream = create_geyser_reconnecting_stream(config.clone(), token_accounts());
let mut count = 0;
let mut green_stream = pin!(green_stream);
while let Some(message) = green_stream.next().await {
match message {
Message::GeyserSubscribeUpdate(subscriber_update) => {
match subscriber_update.update_oneof {
Some(UpdateOneof::Account(update)) => {
if let Message::GeyserSubscribeUpdate(subscriber_update) = message {
if let Some(UpdateOneof::Account(update)) = subscriber_update.update_oneof {
debug!("Token Account: {:?}", update.account.unwrap().pubkey);
count += 1;
if count > 3 {
return;
}
}
_ => {}
}
}
_ => {}
}
};
panic!("failed to receive the requested token accounts");
}
@ -171,7 +173,7 @@ async fn create_geyser_token_account_task(config: GrpcSourceConfig) {
async fn rpc_gpa(rpc_client: Arc<RpcClient>) {
let program_pubkey = Pubkey::from_str("4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg").unwrap();
let filter = RpcFilterType::Memcmp(Memcmp::new_raw_bytes(30, vec![42]));
let _filter = RpcFilterType::Memcmp(Memcmp::new_raw_bytes(30, vec![42]));
let _config = RpcProgramAccountsConfig {
// filters: Some(vec![filter]),
@ -196,16 +198,16 @@ async fn rpc_gpa(rpc_client: Arc<RpcClient>) {
debug!("Program accounts: {:?}", program_accounts.len());
// mango 12400 on mainnet
assert!(program_accounts.len() > 1000, "program accounts count is too low");
assert!(
program_accounts.len() > 1000,
"program accounts count is too low"
);
}
async fn rpc_get_account_info(rpc_client: Arc<RpcClient>) {
let program_pubkey = Pubkey::from_str("4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg").unwrap();
let account_info = rpc_client
.get_account(&program_pubkey)
.await
.unwrap();
let account_info = rpc_client.get_account(&program_pubkey).await.unwrap();
debug!("Account info: {:?}", account_info);
@ -217,10 +219,7 @@ async fn rpc_get_token_accounts_by_owner(rpc_client: Arc<RpcClient>) {
let mint = Pubkey::from_str("EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v").unwrap();
let token_accounts = rpc_client
.get_token_accounts_by_owner(
&owner_pubkey,
TokenAccountsFilter::Mint(mint),
)
.get_token_accounts_by_owner(&owner_pubkey, TokenAccountsFilter::Mint(mint))
.await
.context("rpc_get_token_accounts_by_owner")
.unwrap();
@ -228,7 +227,7 @@ async fn rpc_get_token_accounts_by_owner(rpc_client: Arc<RpcClient>) {
// 1 account
debug!("Token accounts: {:?}", token_accounts.len());
assert!(token_accounts.len() > 0, "token accounts count is zero");
assert!(!token_accounts.is_empty(), "token accounts count is zero");
}
async fn rpc_get_signatures_for_address(rpc_client: Arc<RpcClient>) {
@ -253,12 +252,8 @@ async fn rpc_get_signatures_for_address(rpc_client: Arc<RpcClient>) {
assert!(signatures.len() > 10, "signatures count is too low");
}
async fn websocket_account_subscribe(
rpc_url: Url
) {
let sysvar_subscribe =
json!({
async fn websocket_account_subscribe(rpc_url: Url) {
let sysvar_subscribe = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "accountSubscribe",
@ -292,7 +287,6 @@ async fn websocket_account_subscribe(
panic!("failed to receive the requested sysvar clock accounts");
}
pub fn all_accounts() -> SubscribeRequest {
let mut accounts_subs = HashMap::new();
accounts_subs.insert(
@ -317,7 +311,6 @@ pub fn all_accounts() -> SubscribeRequest {
}
}
pub fn token_accounts() -> SubscribeRequest {
let mut accounts_subs = HashMap::new();
accounts_subs.insert(
@ -325,13 +318,14 @@ pub fn token_accounts() -> SubscribeRequest {
SubscribeRequestFilterAccounts {
account: vec![],
// vec!["4DoNfFBfF7UokCC2FQzriy7yHK6DY6NVdYpuekQ5pRgg".to_string()],
owner:
spl_token_ids().iter().map(|pubkey| pubkey.to_string()).collect(),
owner: spl_token_ids()
.iter()
.map(|pubkey| pubkey.to_string())
.collect(),
filters: vec![],
},
);
SubscribeRequest {
slots: HashMap::new(),
accounts: accounts_subs,
@ -344,4 +338,3 @@ pub fn token_accounts() -> SubscribeRequest {
ping: None,
}
}

View File

@ -1,10 +1,5 @@
use std::collections::HashMap;
use std::pin::pin;
use std::str::FromStr;
use std::thread::sleep;
use std::time::Duration;
use geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use serde_json::json;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client::rpc_client::GetConfirmedSignaturesForAddress2Config;
@ -12,17 +7,22 @@ use solana_rpc_client_api::request::TokenAccountsFilter;
use solana_rpc_client_api::response::SlotInfo;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use std::collections::HashMap;
use std::pin::pin;
use std::str::FromStr;
use std::thread::sleep;
use std::time::Duration;
use tokio::select;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::Sender;
use tokio::time::Instant;
use tokio_stream::{Stream, StreamExt};
use tokio_stream::wrappers::{BroadcastStream, ReceiverStream};
use tracing::{info, trace, warn};
use websocket_tungstenite_retry::websocket_stable::{StableWebSocket, WsMessage};
use tokio_stream::StreamExt;
use tracing::info;
use url::Url;
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots, SubscribeUpdate};
use websocket_tungstenite_retry::websocket_stable::{StableWebSocket, WsMessage};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{
SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots, SubscribeUpdate,
};
type Slot = u64;
@ -31,15 +31,18 @@ async fn main() {
tracing_subscriber::fmt::init();
let ws_url1 = format!("wss://api.mainnet-beta.solana.com");
let ws_url2 = format!("wss://mango.rpcpool.com/{MAINNET_API_TOKEN}",
MAINNET_API_TOKEN = std::env::var("MAINNET_API_TOKEN").unwrap());
let rpc_url = format!("https://mango.rpcpool.com/{MAINNET_API_TOKEN}",
MAINNET_API_TOKEN = std::env::var("MAINNET_API_TOKEN").unwrap());
let ws_url2 = format!(
"wss://mango.rpcpool.com/{MAINNET_API_TOKEN}",
MAINNET_API_TOKEN = std::env::var("MAINNET_API_TOKEN").unwrap()
);
let rpc_url = format!(
"https://mango.rpcpool.com/{MAINNET_API_TOKEN}",
MAINNET_API_TOKEN = std::env::var("MAINNET_API_TOKEN").unwrap()
);
let rpc_url = Url::parse(rpc_url.as_str()).unwrap();
let grpc_addr = std::env::var("GRPC_ADDR").unwrap();
let timeouts = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(10),
request_timeout: Duration::from_secs(10),
@ -47,18 +50,22 @@ async fn main() {
receive_timeout: Duration::from_secs(10),
};
let config = GrpcSourceConfig::new(grpc_addr.to_string(), None, None, timeouts.clone());
let (slots_tx, mut slots_rx) = tokio::sync::mpsc::channel(100);
start_geyser_slots_task(config.clone(), slots_tx.clone());
tokio::spawn(websocket_source(Url::parse(ws_url1.as_str()).unwrap(), slots_tx.clone()));
tokio::spawn(websocket_source(Url::parse(ws_url2.as_str()).unwrap(), slots_tx.clone()));
tokio::spawn(websocket_source(
Url::parse(ws_url1.as_str()).unwrap(),
slots_tx.clone(),
));
tokio::spawn(websocket_source(
Url::parse(ws_url2.as_str()).unwrap(),
slots_tx.clone(),
));
tokio::spawn(rpc_getslot_source(rpc_url, slots_tx.clone()));
let started_at = Instant::now();
while let Some(slot) = slots_rx.recv().await {
println!("Slot: {}", slot);
@ -71,13 +78,7 @@ async fn main() {
sleep(Duration::from_secs(15));
}
async fn rpc_getslot_source(
rpc_url: Url,
mpsc_downstream: tokio::sync::mpsc::Sender<Slot>,
) {
async fn rpc_getslot_source(rpc_url: Url, mpsc_downstream: tokio::sync::mpsc::Sender<Slot>) {
let rpc = RpcClient::new(rpc_url.to_string());
loop {
tokio::time::sleep(Duration::from_millis(100)).await;
@ -87,20 +88,13 @@ async fn rpc_getslot_source(
.unwrap();
match mpsc_downstream.send(slot).await {
Ok(_) => {}
Err(_) => return
Err(_) => return,
}
}
}
}
async fn websocket_source(
rpc_url: Url,
mpsc_downstream: tokio::sync::mpsc::Sender<Slot>,
) {
let processed_slot_subscribe =
json!({
async fn websocket_source(rpc_url: Url, mpsc_downstream: tokio::sync::mpsc::Sender<Slot>) {
let processed_slot_subscribe = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "slotSubscribe",
@ -118,51 +112,40 @@ async fn websocket_source(
while let Ok(msg) = channel.recv().await {
if let WsMessage::Text(payload) = msg {
let ws_result: jsonrpsee_types::SubscriptionResponse<SlotInfo> = serde_json::from_str(&payload).unwrap();
let ws_result: jsonrpsee_types::SubscriptionResponse<SlotInfo> =
serde_json::from_str(&payload).unwrap();
let slot_info = ws_result.params.result;
match mpsc_downstream.send(slot_info.slot).await {
Ok(_) => {}
Err(_) => return
Err(_) => return,
}
}
}
}
// note: this might fail if the yellowstone plugin does not allow "any broadcast filter"
fn start_geyser_slots_task(config: GrpcSourceConfig,
fn start_geyser_slots_task(
config: GrpcSourceConfig,
mpsc_downstream: tokio::sync::mpsc::Sender<Slot>,
) {
let green_stream = create_geyser_reconnecting_stream(
config.clone(),
slots(),
);
let green_stream = create_geyser_reconnecting_stream(config.clone(), slots());
tokio::spawn(async move {
let mut green_stream = pin!(green_stream);
while let Some(message) = green_stream.next().await {
match message {
Message::GeyserSubscribeUpdate(subscriber_update) => {
match subscriber_update.update_oneof {
Some(UpdateOneof::Slot(slot_info)) => {
if let Message::GeyserSubscribeUpdate(subscriber_update) = message {
if let Some(UpdateOneof::Slot(slot_info)) = subscriber_update.update_oneof {
info!("Slot from geyser: {:?}", slot_info.slot);
match mpsc_downstream.send(slot_info.slot).await {
Ok(_) => {}
Err(_) => return
Err(_) => return,
}
}
_ => {}
}
}
_ => {}
}
}
});
}
pub fn slots() -> SubscribeRequest {
let mut slot_subs = HashMap::new();
slot_subs.insert(