tag slot sources

This commit is contained in:
GroovieGermanikus 2024-07-25 12:57:39 +02:00
parent b12ce518a7
commit 6c1fc7df59
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 52 additions and 18 deletions

View File

@ -26,20 +26,50 @@ use yellowstone_grpc_proto::geyser::{
type Slot = u64;
#[derive(Debug, Clone)]
enum SlotSource {
SolanaWebsocket,
SolanaRpc,
TritonRpc,
TritonWebsocket,
YellowstoneGrpc,
}
struct SlotDatapoint {
source: SlotSource,
slot: Slot,
timestamp: Instant,
}
impl SlotDatapoint {
fn new(source: SlotSource, slot: Slot) -> Self {
Self {
source,
slot,
timestamp: Instant::now(),
}
}
}
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
async fn main() {
tracing_subscriber::fmt::init();
let ws_url1 = format!("wss://api.mainnet-beta.solana.com");
let ws_url2 = format!(
// TODO add solana rpc
let solana_rpc_url = format!("https://api.mainnet-beta.solana.com");
let solana_ws_url = format!("wss://api.mainnet-beta.solana.com");
let triton_ws_url = format!(
"wss://mango.rpcpool.com/{MAINNET_API_TOKEN}",
MAINNET_API_TOKEN = std::env::var("MAINNET_API_TOKEN").unwrap()
);
let rpc_url = format!(
let triton_rpc_url = format!(
"https://mango.rpcpool.com/{MAINNET_API_TOKEN}",
MAINNET_API_TOKEN = std::env::var("MAINNET_API_TOKEN").unwrap()
);
let rpc_url = Url::parse(rpc_url.as_str()).unwrap();
let solana_rpc_url = Url::parse(solana_rpc_url.as_str()).unwrap();
let triton_rpc_url = Url::parse(triton_rpc_url.as_str()).unwrap();
let grpc_addr = std::env::var("GRPC_ADDR").unwrap();
@ -52,23 +82,26 @@ async fn main() {
let config = GrpcSourceConfig::new(grpc_addr.to_string(), None, None, timeouts.clone());
let (slots_tx, mut slots_rx) = tokio::sync::mpsc::channel(100);
let (slots_tx, mut slots_rx) = tokio::sync::mpsc::channel::<SlotDatapoint>(100);
start_geyser_slots_task(config.clone(), slots_tx.clone());
start_geyser_slots_task(config.clone(), SlotSource::YellowstoneGrpc, slots_tx.clone());
tokio::spawn(websocket_source(
Url::parse(ws_url1.as_str()).unwrap(),
Url::parse(solana_ws_url.as_str()).unwrap(),
SlotSource::SolanaWebsocket,
slots_tx.clone(),
));
tokio::spawn(websocket_source(
Url::parse(ws_url2.as_str()).unwrap(),
Url::parse(triton_ws_url.as_str()).unwrap(),
SlotSource::TritonWebsocket,
slots_tx.clone(),
));
tokio::spawn(rpc_getslot_source(rpc_url, slots_tx.clone()));
tokio::spawn(rpc_getslot_source(solana_rpc_url, SlotSource::SolanaRpc, slots_tx.clone()));
tokio::spawn(rpc_getslot_source(triton_rpc_url, SlotSource::TritonRpc, slots_tx.clone()));
let started_at = Instant::now();
while let Some(slot) = slots_rx.recv().await {
println!("Slot: {}", slot);
while let Some(SlotDatapoint { slot, source, .. }) = slots_rx.recv().await {
println!("Slot from {:?}: {}", source, slot);
if Instant::now().duration_since(started_at) > Duration::from_secs(2) {
break;
@ -78,7 +111,7 @@ async fn main() {
sleep(Duration::from_secs(15));
}
async fn rpc_getslot_source(rpc_url: Url, mpsc_downstream: tokio::sync::mpsc::Sender<Slot>) {
async fn rpc_getslot_source(rpc_url: Url, slot_source: SlotSource, mpsc_downstream: tokio::sync::mpsc::Sender<SlotDatapoint>) {
let rpc = RpcClient::new(rpc_url.to_string());
loop {
tokio::time::sleep(Duration::from_millis(100)).await;
@ -86,14 +119,15 @@ async fn rpc_getslot_source(rpc_url: Url, mpsc_downstream: tokio::sync::mpsc::Se
.get_slot_with_commitment(CommitmentConfig::processed())
.await
.unwrap();
match mpsc_downstream.send(slot).await {
match mpsc_downstream.send(SlotDatapoint::new(slot_source.clone(), slot)).await {
Ok(_) => {}
Err(_) => return,
}
}
}
async fn websocket_source(rpc_url: Url, mpsc_downstream: tokio::sync::mpsc::Sender<Slot>) {
async fn websocket_source(rpc_url: Url, slot_source: SlotSource,
mpsc_downstream: tokio::sync::mpsc::Sender<SlotDatapoint>) {
let processed_slot_subscribe = json!({
"jsonrpc": "2.0",
"id": 1,
@ -115,7 +149,7 @@ async fn websocket_source(rpc_url: Url, mpsc_downstream: tokio::sync::mpsc::Send
let ws_result: jsonrpsee_types::SubscriptionResponse<SlotInfo> =
serde_json::from_str(&payload).unwrap();
let slot_info = ws_result.params.result;
match mpsc_downstream.send(slot_info.slot).await {
match mpsc_downstream.send(SlotDatapoint::new(slot_source.clone(), slot_info.slot)).await {
Ok(_) => {}
Err(_) => return,
}
@ -126,7 +160,8 @@ async fn websocket_source(rpc_url: Url, mpsc_downstream: tokio::sync::mpsc::Send
// note: this might fail if the yellowstone plugin does not allow "any broadcast filter"
fn start_geyser_slots_task(
config: GrpcSourceConfig,
mpsc_downstream: tokio::sync::mpsc::Sender<Slot>,
slot_source: SlotSource,
mpsc_downstream: tokio::sync::mpsc::Sender<SlotDatapoint>,
) {
let green_stream = create_geyser_reconnecting_stream(config.clone(), slots());
@ -135,8 +170,7 @@ fn start_geyser_slots_task(
while let Some(message) = green_stream.next().await {
if let Message::GeyserSubscribeUpdate(subscriber_update) = message {
if let Some(UpdateOneof::Slot(slot_info)) = subscriber_update.update_oneof {
info!("Slot from geyser: {:?}", slot_info.slot);
match mpsc_downstream.send(slot_info.slot).await {
match mpsc_downstream.send(SlotDatapoint::new(slot_source.clone(), slot_info.slot)).await {
Ok(_) => {}
Err(_) => return,
}