clippy+fmt

This commit is contained in:
GroovieGermanikus 2024-08-07 14:21:38 +02:00
parent 9d00c38731
commit b0357b5441
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
3 changed files with 83 additions and 45 deletions

View File

@ -6,10 +6,10 @@ use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyse
use geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use serde_json::json;
use solana_account_decoder::parse_token::spl_token_ids;
use solana_account_decoder::{UiAccountEncoding, UiDataSliceConfig};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client::rpc_client::GetConfirmedSignaturesForAddress2Config;
use solana_rpc_client_api::config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
use solana_rpc_client_api::filter::{Memcmp, RpcFilterType};
use solana_rpc_client_api::request::TokenAccountsFilter;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
@ -19,7 +19,6 @@ use std::pin::pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use solana_account_decoder::{UiAccountEncoding, UiDataSliceConfig};
use tokio::task::JoinSet;
use tokio::time::timeout;
use tracing::debug;
@ -235,7 +234,8 @@ async fn rpc_get_account_info(rpc_client: Arc<RpcClient>) {
async fn rpc_get_token_accounts_by_owner(rpc_client: Arc<RpcClient>) {
let owner_pubkey = Pubkey::from_str("gmgLgwHZbRxbPHuGtE2cVVAXL6yrS8SvvFkDNjmWfkj").unwrap();
let mint_usdc: Pubkey = Pubkey::from_str("EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v").unwrap();
let mint_usdc: Pubkey =
Pubkey::from_str("EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v").unwrap();
let token_accounts = rpc_client
.get_token_accounts_by_owner(&owner_pubkey, TokenAccountsFilter::Mint(mint_usdc))

View File

@ -1,35 +1,24 @@
mod utils;
use crate::utils::configure_panic_hook;
use enum_iterator::Sequence;
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use itertools::Itertools;
use serde_json::json;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client::rpc_client::GetConfirmedSignaturesForAddress2Config;
use solana_rpc_client_api::request::TokenAccountsFilter;
use solana_rpc_client_api::response::SlotInfo;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use std::collections::{HashMap, HashSet};
use std::env;
use std::pin::pin;
use std::str::FromStr;
use std::thread::sleep;
use std::time::Duration;
use enum_iterator::Sequence;
use itertools::Itertools;
use tokio::{io, select};
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::error::SendError;
use tokio::time::Instant;
use tokio_stream::StreamExt;
use tracing::{error, info};
use url::Url;
use websocket_tungstenite_retry::websocket_stable::{StableWebSocket, WsMessage};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{
SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots, SubscribeUpdate,
};
use crate::utils::configure_panic_hook;
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterSlots};
type Slot = u64;
@ -42,6 +31,7 @@ enum SlotSource {
YellowstoneGrpc,
}
#[allow(dead_code)]
struct SlotDatapoint {
source: SlotSource,
slot: Slot,
@ -61,11 +51,12 @@ impl SlotDatapoint {
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
async fn main() {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO).init();
.with_max_level(tracing::Level::INFO)
.init();
configure_panic_hook();
let solana_rpc_url = format!("https://api.mainnet-beta.solana.com");
let solana_ws_url = format!("wss://api.mainnet-beta.solana.com");
let solana_rpc_url = "https://api.mainnet-beta.solana.com".to_string();
let solana_ws_url = "wss://api.mainnet-beta.solana.com".to_string();
let triton_ws_url = format!(
"wss://mango.rpcpool.com/{MAINNET_API_TOKEN}",
MAINNET_API_TOKEN = std::env::var("MAINNET_API_TOKEN").unwrap()
@ -77,8 +68,6 @@ async fn main() {
let solana_rpc_url = Url::parse(solana_rpc_url.as_str()).unwrap();
let triton_rpc_url = Url::parse(triton_rpc_url.as_str()).unwrap();
let grpc_addr = std::env::var("GRPC_ADDR").expect("require env variable GRPC_ADDR");
let grpc_x_token = env::var("GRPC_X_TOKEN").ok();
@ -93,7 +82,11 @@ async fn main() {
let (slots_tx, mut slots_rx) = tokio::sync::mpsc::channel::<SlotDatapoint>(100);
start_geyser_slots_task(config.clone(), SlotSource::YellowstoneGrpc, slots_tx.clone());
start_geyser_slots_task(
config.clone(),
SlotSource::YellowstoneGrpc,
slots_tx.clone(),
);
tokio::spawn(websocket_source(
Url::parse(solana_ws_url.as_str()).unwrap(),
@ -105,10 +98,17 @@ async fn main() {
SlotSource::TritonWebsocket,
slots_tx.clone(),
));
tokio::spawn(rpc_getslot_source(solana_rpc_url, SlotSource::SolanaRpc, slots_tx.clone()));
tokio::spawn(rpc_getslot_source(triton_rpc_url, SlotSource::TritonRpc, slots_tx.clone()));
tokio::spawn(rpc_getslot_source(
solana_rpc_url,
SlotSource::SolanaRpc,
slots_tx.clone(),
));
tokio::spawn(rpc_getslot_source(
triton_rpc_url,
SlotSource::TritonRpc,
slots_tx.clone(),
));
let started_at = Instant::now();
let mut latest_slot_per_source: HashMap<SlotSource, Slot> = HashMap::new();
while let Some(SlotDatapoint { slot, source, .. }) = slots_rx.recv().await {
// println!("Slot from {:?}: {}", source, slot);
@ -120,10 +120,13 @@ async fn main() {
// break;
// }
}
}
async fn rpc_getslot_source(rpc_url: Url, slot_source: SlotSource, mpsc_downstream: tokio::sync::mpsc::Sender<SlotDatapoint>) {
async fn rpc_getslot_source(
rpc_url: Url,
slot_source: SlotSource,
mpsc_downstream: tokio::sync::mpsc::Sender<SlotDatapoint>,
) {
let rpc = RpcClient::new_with_timeout(rpc_url.to_string(), Duration::from_secs(5));
loop {
tokio::time::sleep(Duration::from_millis(800)).await;
@ -133,7 +136,10 @@ async fn rpc_getslot_source(rpc_url: Url, slot_source: SlotSource, mpsc_downstre
match res {
Ok(slot) => {
match mpsc_downstream.send(SlotDatapoint::new(slot_source.clone(), slot)).await {
match mpsc_downstream
.send(SlotDatapoint::new(slot_source.clone(), slot))
.await
{
Ok(_) => {}
Err(_) => return,
}
@ -143,13 +149,14 @@ async fn rpc_getslot_source(rpc_url: Url, slot_source: SlotSource, mpsc_downstre
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
}
async fn websocket_source(rpc_url: Url, slot_source: SlotSource,
mpsc_downstream: tokio::sync::mpsc::Sender<SlotDatapoint>) {
async fn websocket_source(
rpc_url: Url,
slot_source: SlotSource,
mpsc_downstream: tokio::sync::mpsc::Sender<SlotDatapoint>,
) {
let processed_slot_subscribe = json!({
"jsonrpc": "2.0",
"id": 1,
@ -171,7 +178,10 @@ async fn websocket_source(rpc_url: Url, slot_source: SlotSource,
let ws_result: jsonrpsee_types::SubscriptionResponse<SlotInfo> =
serde_json::from_str(&payload).unwrap();
let slot_info = ws_result.params.result;
match mpsc_downstream.send(SlotDatapoint::new(slot_source.clone(), slot_info.slot)).await {
match mpsc_downstream
.send(SlotDatapoint::new(slot_source.clone(), slot_info.slot))
.await
{
Ok(_) => {}
Err(_) => panic!("downstream error"),
}
@ -194,7 +204,10 @@ fn start_geyser_slots_task(
while let Some(message) = green_stream.next().await {
if let Message::GeyserSubscribeUpdate(subscriber_update) = message {
if let Some(UpdateOneof::Slot(slot_info)) = subscriber_update.update_oneof {
match mpsc_downstream.send(SlotDatapoint::new(slot_source.clone(), slot_info.slot)).await {
match mpsc_downstream
.send(SlotDatapoint::new(slot_source.clone(), slot_info.slot))
.await
{
Ok(_) => {}
Err(_) => return,
}
@ -226,7 +239,6 @@ pub fn slots() -> SubscribeRequest {
}
}
async fn visualize_slots(latest_slot_per_source: &HashMap<SlotSource, Slot>) {
// println!("Slots: {:?}", latest_slot_per_source);
@ -234,19 +246,25 @@ async fn visualize_slots(latest_slot_per_source: &HashMap<SlotSource, Slot>) {
.map(|check| (format!("{:?}", check), check))
.collect();
let sorted_by_time: Vec<(&SlotSource, &Slot)> = latest_slot_per_source.iter().sorted_by_key(|(_, slot)| *slot).collect_vec();
let deltas = sorted_by_time.windows(2).map(|window| {
let (_source1, slot1) = window[0];
let (_source2, slot2) = window[1];
slot2 - slot1
}).collect_vec();
let sorted_by_time: Vec<(&SlotSource, &Slot)> = latest_slot_per_source
.iter()
.sorted_by_key(|(_, slot)| *slot)
.collect_vec();
let deltas = sorted_by_time
.windows(2)
.map(|window| {
let (_source1, slot1) = window[0];
let (_source2, slot2) = window[1];
slot2 - slot1
})
.collect_vec();
for i in 0..(sorted_by_time.len() + deltas.len()) {
if i % 2 == 0 {
let (source, slot) = sorted_by_time.get(i / 2).unwrap();
print!("{}({:?})", slot, source);
} else {
let edge = deltas.get(i / 2).unwrap().clone();
let edge = *deltas.get(i / 2).unwrap();
if edge == 0 {
print!(" = ");
} else if edge < 20 {
@ -269,5 +287,4 @@ async fn visualize_slots(latest_slot_per_source: &HashMap<SlotSource, Slot>) {
println!();
// print!("{}[2K\r", 27 as char);
}

21
src/utils.rs Normal file
View File

@ -0,0 +1,21 @@
use std::process;
use tracing::error;
pub fn configure_panic_hook() {
let default_panic = std::panic::take_hook();
std::panic::set_hook(Box::new(move |panic_info| {
default_panic(panic_info);
error!("{}", panic_info);
eprintln!("{}", panic_info);
if let Some(location) = panic_info.location() {
error!(
"panic occurred in file '{}' at line {}",
location.file(),
location.line(),
);
} else {
error!("panic occurred but can't get location information...");
}
process::exit(12);
}));
}