minimal working version
This commit is contained in:
commit
2247116fc9
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,33 @@
|
|||
[package]
|
||||
name = "geyser-grpc-connector"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
yellowstone-grpc-client = "1.11.0"
|
||||
yellowstone-grpc-proto = "1.11.0"
|
||||
|
||||
solana-sdk = "~1.16.3"
|
||||
|
||||
async-trait = "0.1.68"
|
||||
async-stream = "0.3.5"
|
||||
tokio = { version = "1.32" , features = ["full"] }
|
||||
serde = { version = "1.0.160", features = ["derive"] }
|
||||
serde_json = "1.0.96"
|
||||
bincode = "1.3.3"
|
||||
bs58 = "0.4.0"
|
||||
base64 = "0.21.0"
|
||||
thiserror = "1.0.40"
|
||||
futures = "0.3.28"
|
||||
bytes = "1.4.0"
|
||||
anyhow = "1.0.70"
|
||||
log = "0.4.17"
|
||||
dashmap = "5.4.0"
|
||||
const_env = "0.1.2"
|
||||
tracing = "0.1.37"
|
||||
tracing-subscriber = "0.3.16"
|
||||
chrono = "0.4.24"
|
||||
prometheus = "0.13.3"
|
||||
lazy_static = "1.4.0"
|
||||
async-channel = "1.8.0"
|
||||
itertools = "0.10.5"
|
|
@ -0,0 +1,66 @@
|
|||
use std::collections::{HashMap, HashSet};
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::ops::{Add, Deref, Sub};
|
||||
use std::pin::{pin, Pin};
|
||||
use anyhow::{bail, Context};
|
||||
use async_stream::stream;
|
||||
use futures::{Stream, StreamExt};
|
||||
use itertools::{Itertools};
|
||||
|
||||
use log::{debug, error, info, warn};
|
||||
use solana_sdk::clock::Slot;
|
||||
use solana_sdk::commitment_config::CommitmentConfig;
|
||||
use tokio::{select};
|
||||
use tokio::sync::broadcast::{Receiver, Sender};
|
||||
use tokio::task::{JoinHandle, JoinSet};
|
||||
use tokio::time::{sleep, Duration, timeout, Instant, sleep_until};
|
||||
use yellowstone_grpc_client::GeyserGrpcClient;
|
||||
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeUpdate, SubscribeUpdateBlock, SubscribeUpdateBlockMeta};
|
||||
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
|
||||
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
|
||||
use geyser_grpc_connector::grpcmultiplex_fastestwins::{create_multiplex, GrpcSourceConfig};
|
||||
|
||||
|
||||
fn start_example_consumer(blocks_notifier: Receiver<SubscribeUpdateBlock>) {
|
||||
tokio::spawn(async move {
|
||||
let mut blocks_notifier = blocks_notifier;
|
||||
loop {
|
||||
let block = blocks_notifier.recv().await.unwrap();
|
||||
info!("received block #{} with {} txs", block.slot, block.transactions.len());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
pub async fn main() {
|
||||
// RUST_LOG=info,grpc_using_streams=debug
|
||||
tracing_subscriber::fmt::init();
|
||||
// console_subscriber::init();
|
||||
|
||||
// mango validator (mainnet)
|
||||
let grpc_addr_mainnet_triton = "http://202.8.9.108:10000".to_string();
|
||||
// via toxiproxy
|
||||
let grpc_addr_mainnet_triton_toxi = "http://127.0.0.1:10001".to_string();
|
||||
// ams81 (mainnet)
|
||||
let grpc_addr_mainnet_ams81 = "http://202.8.8.12:10000".to_string();
|
||||
// testnet - NOTE: this connection has terrible lags (almost 5 minutes)
|
||||
// let grpc_addr = "http://147.28.169.13:10000".to_string();
|
||||
|
||||
let (block_sx, blocks_notifier) = tokio::sync::broadcast::channel(1000);
|
||||
|
||||
let green_config = GrpcSourceConfig::new("triton".to_string(), grpc_addr_mainnet_triton, None);
|
||||
let blue_config = GrpcSourceConfig::new("mangoams81".to_string(), grpc_addr_mainnet_ams81, None);
|
||||
let toxiproxy_config = GrpcSourceConfig::new("toxiproxy".to_string(), grpc_addr_mainnet_triton_toxi, None);
|
||||
|
||||
create_multiplex(
|
||||
vec![green_config, blue_config, toxiproxy_config],
|
||||
CommitmentLevel::Confirmed,
|
||||
block_sx);
|
||||
|
||||
start_example_consumer(blocks_notifier);
|
||||
|
||||
// "infinite" sleep
|
||||
sleep(Duration::from_secs(1800)).await;
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,95 @@
|
|||
// use crate::{
|
||||
// endpoint_stremers::EndpointStreaming,
|
||||
// rpc_polling::vote_accounts_and_cluster_info_polling::poll_vote_accounts_and_cluster_info,
|
||||
// };
|
||||
use anyhow::{bail, Context};
|
||||
use futures::{Stream, StreamExt};
|
||||
use itertools::Itertools;
|
||||
use solana_sdk::{
|
||||
borsh0_10::try_from_slice_unchecked,
|
||||
commitment_config::CommitmentConfig,
|
||||
compute_budget::{self, ComputeBudgetInstruction},
|
||||
hash::Hash,
|
||||
instruction::CompiledInstruction,
|
||||
message::{
|
||||
v0::{self, MessageAddressTableLookup},
|
||||
MessageHeader, VersionedMessage,
|
||||
},
|
||||
pubkey::Pubkey,
|
||||
signature::Signature,
|
||||
transaction::TransactionError,
|
||||
};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use tokio::sync::broadcast::Sender;
|
||||
use yellowstone_grpc_client::GeyserGrpcClient;
|
||||
use yellowstone_grpc_proto::prelude::{
|
||||
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks,
|
||||
SubscribeRequestFilterSlots, SubscribeUpdateBlock,
|
||||
};
|
||||
|
||||
pub fn create_block_processing_task(
|
||||
grpc_addr: String,
|
||||
grpc_x_token: Option<String>,
|
||||
block_sx: Sender<SubscribeUpdateBlock>,
|
||||
commitment_level: CommitmentLevel,
|
||||
) -> tokio::task::JoinHandle<anyhow::Result<()>> {
|
||||
let mut blocks_subs = HashMap::new();
|
||||
blocks_subs.insert(
|
||||
"client".to_string(),
|
||||
SubscribeRequestFilterBlocks {
|
||||
account_include: Default::default(),
|
||||
include_transactions: Some(true),
|
||||
include_accounts: Some(false),
|
||||
include_entries: Some(false),
|
||||
},
|
||||
);
|
||||
|
||||
let commitment_config = match commitment_level {
|
||||
CommitmentLevel::Confirmed => CommitmentConfig::confirmed(),
|
||||
CommitmentLevel::Finalized => CommitmentConfig::finalized(),
|
||||
CommitmentLevel::Processed => CommitmentConfig::processed(),
|
||||
};
|
||||
|
||||
tokio::spawn(async move {
|
||||
// connect to grpc
|
||||
let mut client = GeyserGrpcClient::connect(grpc_addr, grpc_x_token, None)?;
|
||||
let mut stream = client
|
||||
.subscribe_once(
|
||||
HashMap::new(),
|
||||
Default::default(),
|
||||
HashMap::new(),
|
||||
Default::default(),
|
||||
blocks_subs,
|
||||
Default::default(),
|
||||
Some(commitment_level),
|
||||
Default::default(),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
while let Some(message) = stream.next().await {
|
||||
let message = message?;
|
||||
|
||||
let Some(update) = message.update_oneof else {
|
||||
continue;
|
||||
};
|
||||
|
||||
match update {
|
||||
UpdateOneof::Block(block) => {
|
||||
// let block = map_produced_block(block, commitment_config);
|
||||
|
||||
block_sx
|
||||
.send(block)
|
||||
.context("Grpc failed to send a block")?;
|
||||
}
|
||||
UpdateOneof::Ping(_) => {
|
||||
log::trace!("GRPC Ping");
|
||||
}
|
||||
u => {
|
||||
bail!("Unexpected update: {u:?}");
|
||||
}
|
||||
};
|
||||
}
|
||||
bail!("geyser slot stream ended");
|
||||
})
|
||||
}
|
|
@ -0,0 +1,220 @@
|
|||
use std::collections::{HashMap, HashSet};
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::ops::{Add, Deref, Sub};
|
||||
use std::pin::{pin, Pin};
|
||||
use anyhow::{bail, Context};
|
||||
use async_stream::stream;
|
||||
use futures::{Stream, StreamExt};
|
||||
use itertools::{Itertools};
|
||||
|
||||
use log::{debug, error, info, warn};
|
||||
use solana_sdk::clock::Slot;
|
||||
use solana_sdk::commitment_config::CommitmentConfig;
|
||||
use tokio::{select};
|
||||
use tokio::sync::broadcast::{Receiver, Sender};
|
||||
use tokio::task::{JoinHandle, JoinSet};
|
||||
use tokio::time::{sleep, Duration, timeout, Instant, sleep_until};
|
||||
use yellowstone_grpc_client::GeyserGrpcClient;
|
||||
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeUpdate, SubscribeUpdateBlock, SubscribeUpdateBlockMeta};
|
||||
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
|
||||
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
|
||||
|
||||
// use solana_lite_rpc_cluster_endpoints::grpc_subscription::{create_block_processing_task, map_produced_block};
|
||||
// use solana_lite_rpc_core::AnyhowJoinHandle;
|
||||
// use solana_lite_rpc_core::structures::produced_block::ProducedBlock;
|
||||
|
||||
|
||||
// TODO map SubscribeUpdateBlock
|
||||
pub fn create_multiplex(
|
||||
grpc_sources: Vec<GrpcSourceConfig>,
|
||||
commitment_level: CommitmentLevel,
|
||||
block_sx: Sender<SubscribeUpdateBlock>,
|
||||
) -> tokio::task::JoinHandle<anyhow::Result<()>> {
|
||||
assert!(
|
||||
commitment_level == CommitmentLevel::Confirmed
|
||||
|| commitment_level == CommitmentLevel::Finalized,
|
||||
"Only CONFIRMED and FINALIZED is supported");
|
||||
// note: PROCESSED blocks are not sequential in presense of forks; this will break the logic
|
||||
|
||||
if grpc_sources.len() < 1 {
|
||||
panic!("Must have at least one source");
|
||||
}
|
||||
|
||||
let commitment_config = match commitment_level {
|
||||
CommitmentLevel::Confirmed => CommitmentConfig::confirmed(),
|
||||
CommitmentLevel::Finalized => CommitmentConfig::finalized(),
|
||||
// not used, not supported!
|
||||
CommitmentLevel::Processed => CommitmentConfig::processed(),
|
||||
};
|
||||
|
||||
let jh = tokio::spawn(async move {
|
||||
info!("Starting multiplexer with {} sources: {}",
|
||||
grpc_sources.len(),
|
||||
grpc_sources.iter().map(|source| source.label.clone()).join(", "));
|
||||
|
||||
let mut futures = futures::stream::SelectAll::new();
|
||||
for grpc_source in grpc_sources {
|
||||
// note: stream never terminates
|
||||
let stream = create_geyser_reconnecting_stream(grpc_source.clone(), commitment_level).await;
|
||||
futures.push(Box::pin(stream));
|
||||
}
|
||||
|
||||
let mut current_slot: Slot = 0;
|
||||
|
||||
let mut start_stream33 = false;
|
||||
'main_loop: loop {
|
||||
|
||||
let block_cmd = select! {
|
||||
message = futures.next() => {
|
||||
match message {
|
||||
Some(message) => {
|
||||
map_filter_block_message(current_slot, message, commitment_config)
|
||||
}
|
||||
None => {
|
||||
panic!("source stream is not supposed to terminate");
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
match block_cmd {
|
||||
BlockCmd::ForwardBlock(block) => {
|
||||
current_slot = block.slot;
|
||||
block_sx.send(block).context("send block to downstream")?;
|
||||
}
|
||||
BlockCmd::DiscardBlockBehindTip(slot) => {
|
||||
debug!(". discarding redundant block #{}", slot);
|
||||
}
|
||||
BlockCmd::SkipMessage => {
|
||||
debug!(". skipping this message by type");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
});
|
||||
|
||||
return jh;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum BlockCmd {
|
||||
ForwardBlock(SubscribeUpdateBlock),
|
||||
DiscardBlockBehindTip(Slot),
|
||||
// skip geyser messages which are not block related updates
|
||||
SkipMessage,
|
||||
}
|
||||
|
||||
fn map_filter_block_message(current_slot: Slot, update_message: SubscribeUpdate, commitment_config: CommitmentConfig) -> BlockCmd {
|
||||
if let Some(UpdateOneof::Block(update_block_message)) = update_message.update_oneof {
|
||||
if update_block_message.slot <= current_slot && current_slot != 0 {
|
||||
// no progress - skip this
|
||||
return BlockCmd::DiscardBlockBehindTip(update_block_message.slot);
|
||||
}
|
||||
|
||||
// expensive
|
||||
// let produced_block = map_produced_block(update_block_message, commitment_config);
|
||||
|
||||
BlockCmd::ForwardBlock(update_block_message)
|
||||
} else {
|
||||
return BlockCmd::SkipMessage;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct GrpcSourceConfig {
|
||||
// symbolic name used in logs
|
||||
label: String,
|
||||
grpc_addr: String,
|
||||
grpc_x_token: Option<String>,
|
||||
tls_config: Option<ClientTlsConfig>,
|
||||
}
|
||||
|
||||
impl GrpcSourceConfig {
|
||||
pub fn new(label: String, grpc_addr: String, grpc_x_token: Option<String>) -> Self {
|
||||
Self {
|
||||
label,
|
||||
grpc_addr,
|
||||
grpc_x_token,
|
||||
tls_config: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO use GrpcSource
|
||||
// note: stream never terminates
|
||||
async fn create_geyser_reconnecting_stream(
|
||||
grpc_source: GrpcSourceConfig,
|
||||
commitment_level: CommitmentLevel) -> impl Stream<Item = SubscribeUpdate> {
|
||||
let label = grpc_source.label.clone();
|
||||
stream! {
|
||||
let mut throttle_barrier = Instant::now();
|
||||
'reconnect_loop: loop {
|
||||
sleep_until(throttle_barrier).await;
|
||||
throttle_barrier = Instant::now().add(Duration::from_millis(1000));
|
||||
|
||||
let connect_result = GeyserGrpcClient::connect_with_timeout(
|
||||
grpc_source.grpc_addr.clone(), grpc_source.grpc_x_token.clone(), grpc_source.tls_config.clone(),
|
||||
Some(Duration::from_secs(2)), Some(Duration::from_secs(2)), false).await;
|
||||
|
||||
let mut client = match connect_result {
|
||||
Ok(connected_client) => connected_client,
|
||||
Err(geyser_grpc_client_error) => {
|
||||
// TODO identify non-recoverable errors and cancel stream
|
||||
warn!("Connect failed on {} - retrying: {:?}", label, geyser_grpc_client_error);
|
||||
continue 'reconnect_loop;
|
||||
}
|
||||
};
|
||||
|
||||
let mut blocks_subs = HashMap::new();
|
||||
blocks_subs.insert(
|
||||
"client".to_string(),
|
||||
SubscribeRequestFilterBlocks {
|
||||
account_include: Default::default(),
|
||||
include_transactions: Some(true),
|
||||
include_accounts: Some(false),
|
||||
include_entries: Some(false),
|
||||
},
|
||||
);
|
||||
|
||||
let subscribe_result = client
|
||||
.subscribe_once(
|
||||
HashMap::new(),
|
||||
Default::default(),
|
||||
HashMap::new(),
|
||||
Default::default(),
|
||||
blocks_subs,
|
||||
Default::default(),
|
||||
Some(commitment_level),
|
||||
Default::default(),
|
||||
None,
|
||||
).await;
|
||||
|
||||
let geyser_stream = match subscribe_result {
|
||||
Ok(subscribed_stream) => subscribed_stream,
|
||||
Err(geyser_grpc_client_error) => {
|
||||
// TODO identify non-recoverable errors and cancel stream
|
||||
warn!("Subscribe failed on {} - retrying: {:?}", label, geyser_grpc_client_error);
|
||||
continue 'reconnect_loop;
|
||||
}
|
||||
};
|
||||
|
||||
for await update_message in geyser_stream {
|
||||
match update_message {
|
||||
Ok(update_message) => {
|
||||
info!(">message on {}", label);
|
||||
yield update_message;
|
||||
}
|
||||
Err(tonic_status) => {
|
||||
// TODO identify non-recoverable errors and cancel stream
|
||||
warn!("Receive error on {} - retrying: {:?}", label, tonic_status);
|
||||
continue 'reconnect_loop;
|
||||
}
|
||||
}
|
||||
} // -- production loop
|
||||
|
||||
warn!("stream consumer loop terminated for {}", label);
|
||||
} // -- main loop
|
||||
} // -- stream!
|
||||
|
||||
}
|
|
@ -0,0 +1,3 @@
|
|||
pub mod grpcmultiplex_fastestwins;
|
||||
pub mod grpc_subscription;
|
||||
|
Loading…
Reference in New Issue