ported over grpc_fullstream.rs from lite-rpc commit 54cb41ee

This commit is contained in:
GroovieGermanikus 2023-12-14 10:58:26 +01:00
parent 7c50bf53af
commit babf35e122
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
5 changed files with 549 additions and 216 deletions

149
Cargo.lock generated
View File

@ -281,9 +281,9 @@ checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
[[package]]
name = "async-compression"
version = "0.4.5"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc2d0cfb2a7388d34f590e76686704c494ed7aaceed62ee1ba35cbf363abc2a5"
checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a"
dependencies = [
"brotli",
"flate2",
@ -1224,6 +1224,8 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-stream",
"base64 0.21.5",
"bincode",
"futures",
"itertools",
"log",
@ -1437,16 +1439,15 @@ dependencies = [
[[package]]
name = "hyper-rustls"
version = "0.24.2"
version = "0.23.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590"
checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c"
dependencies = [
"futures-util",
"http",
"hyper",
"rustls",
"rustls 0.20.9",
"tokio",
"tokio-rustls",
"tokio-rustls 0.23.4",
]
[[package]]
@ -2369,9 +2370,9 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]]
name = "reqwest"
version = "0.11.22"
version = "0.11.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b"
checksum = "13293b639a097af28fc8a90f22add145a9c954e49d77da06263d58cf44d5fb91"
dependencies = [
"async-compression",
"base64 0.21.5",
@ -2391,14 +2392,13 @@ dependencies = [
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls",
"rustls 0.20.9",
"rustls-pemfile",
"serde",
"serde_json",
"serde_urlencoded",
"system-configuration",
"tokio",
"tokio-rustls",
"tokio-rustls 0.23.4",
"tokio-util",
"tower-service",
"url",
@ -2409,6 +2409,21 @@ dependencies = [
"winreg",
]
[[package]]
name = "ring"
version = "0.16.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc"
dependencies = [
"cc",
"libc",
"once_cell",
"spin 0.5.2",
"untrusted 0.7.1",
"web-sys",
"winapi",
]
[[package]]
name = "ring"
version = "0.17.7"
@ -2418,8 +2433,8 @@ dependencies = [
"cc",
"getrandom 0.2.11",
"libc",
"spin",
"untrusted",
"spin 0.9.8",
"untrusted 0.9.0",
"windows-sys 0.48.0",
]
@ -2457,6 +2472,18 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "rustls"
version = "0.20.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b80e3dec595989ea8510028f30c408a4630db12c9cbb8de34203b89d6577e99"
dependencies = [
"log",
"ring 0.16.20",
"sct",
"webpki",
]
[[package]]
name = "rustls"
version = "0.21.10"
@ -2464,7 +2491,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba"
dependencies = [
"log",
"ring",
"ring 0.17.7",
"rustls-webpki",
"sct",
]
@ -2496,8 +2523,8 @@ version = "0.101.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765"
dependencies = [
"ring",
"untrusted",
"ring 0.17.7",
"untrusted 0.9.0",
]
[[package]]
@ -2553,8 +2580,8 @@ version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414"
dependencies = [
"ring",
"untrusted",
"ring 0.17.7",
"untrusted 0.9.0",
]
[[package]]
@ -3138,6 +3165,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "spin"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spin"
version = "0.9.8"
@ -3376,27 +3409,6 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "system-configuration"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7"
dependencies = [
"bitflags 1.3.2",
"core-foundation",
"system-configuration-sys",
]
[[package]]
name = "system-configuration-sys"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "tempfile"
version = "3.8.1"
@ -3523,21 +3535,32 @@ dependencies = [
"syn 2.0.40",
]
[[package]]
name = "tokio-rustls"
version = "0.23.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
dependencies = [
"rustls 0.20.9",
"tokio",
"webpki",
]
[[package]]
name = "tokio-rustls"
version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081"
dependencies = [
"rustls",
"rustls 0.21.10",
"tokio",
]
[[package]]
name = "tokio-stream"
version = "0.1.14"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842"
checksum = "8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313"
dependencies = [
"futures-core",
"pin-project-lite",
@ -3546,9 +3569,9 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.7.10"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15"
checksum = "f988a1a1adc2fb21f9c12aa96441da33a1728193ae0b95d2be22dbd17fcb4e5c"
dependencies = [
"bytes",
"futures-core",
@ -3615,11 +3638,11 @@ dependencies = [
"percent-encoding",
"pin-project",
"prost",
"rustls",
"rustls 0.21.10",
"rustls-native-certs",
"rustls-pemfile",
"tokio",
"tokio-rustls",
"tokio-rustls 0.24.1",
"tokio-stream",
"tower",
"tower-layer",
@ -3794,6 +3817,12 @@ dependencies = [
"void",
]
[[package]]
name = "untrusted"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "untrusted"
version = "0.9.0"
@ -3937,10 +3966,23 @@ dependencies = [
]
[[package]]
name = "webpki-roots"
version = "0.25.3"
name = "webpki"
version = "0.22.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10"
checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53"
dependencies = [
"ring 0.17.7",
"untrusted 0.9.0",
]
[[package]]
name = "webpki-roots"
version = "0.22.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87"
dependencies = [
"webpki",
]
[[package]]
name = "which"
@ -4137,12 +4179,11 @@ dependencies = [
[[package]]
name = "winreg"
version = "0.50.0"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1"
checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d"
dependencies = [
"cfg-if",
"windows-sys 0.48.0",
"winapi",
]
[[package]]

View File

@ -10,7 +10,7 @@ yellowstone-grpc-proto = "1.11.0"
solana-sdk = "~1.16.17"
async-stream = "0.3.5"
tokio = { version = "1.32" , features = ["full"] }
tokio = { version = "1.28.2" , features = ["full"] }
futures = "0.3.28"
anyhow = "1.0.70"
log = "0.4.17"
@ -18,3 +18,8 @@ tracing = "0.1.37"
tracing-subscriber = "0.3.16"
prometheus = "0.13.3"
itertools = "0.10.5"
base64 = "0.21.5"
bincode = "1.3.3"
#[dev-dependencies]
#solana-test-validator = "~1.16.17"

View File

@ -0,0 +1,226 @@
use base64::Engine;
use itertools::Itertools;
use solana_sdk::borsh0_10::try_from_slice_unchecked;
/// This file mocks the core model of the RPC server.
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::compute_budget;
use solana_sdk::compute_budget::ComputeBudgetInstruction;
use solana_sdk::hash::Hash;
use solana_sdk::instruction::CompiledInstruction;
use solana_sdk::message::{MessageHeader, v0, VersionedMessage};
use solana_sdk::message::v0::MessageAddressTableLookup;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::reward_type::RewardType;
use solana_sdk::signature::Signature;
use solana_sdk::transaction::TransactionError;
use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock;
#[derive(Default, Debug, Clone)]
pub struct ProducedBlock {
pub transactions: Vec<TransactionInfo>,
// pub leader_id: Option<String>,
pub blockhash: String,
pub block_height: u64,
pub slot: Slot,
pub parent_slot: Slot,
pub block_time: u64,
pub commitment_config: CommitmentConfig,
pub previous_blockhash: String,
// pub rewards: Option<Vec<Reward>>,
}
#[derive(Debug, Clone)]
pub struct TransactionInfo {
pub signature: String,
pub err: Option<TransactionError>,
pub cu_requested: Option<u32>,
pub prioritization_fees: Option<u64>,
pub cu_consumed: Option<u64>,
pub recent_blockhash: String,
pub message: String,
}
pub fn map_produced_block(
block: SubscribeUpdateBlock,
commitment_config: CommitmentConfig,
) -> ProducedBlock {
let txs: Vec<TransactionInfo> = block
.transactions
.into_iter()
.filter_map(|tx| {
let Some(meta) = tx.meta else {
return None;
};
let Some(transaction) = tx.transaction else {
return None;
};
let Some(message) = transaction.message else {
return None;
};
let Some(header) = message.header else {
return None;
};
let signatures = transaction
.signatures
.into_iter()
.filter_map(|sig| match Signature::try_from(sig) {
Ok(sig) => Some(sig),
Err(_) => {
log::warn!(
"Failed to read signature from transaction in block {} - skipping",
block.blockhash
);
None
}
})
.collect_vec();
let err = meta.err.map(|x| {
bincode::deserialize::<TransactionError>(&x.err)
.expect("TransactionError should be deserialized")
});
let signature = signatures[0];
let compute_units_consumed = meta.compute_units_consumed;
let message = VersionedMessage::V0(v0::Message {
header: MessageHeader {
num_required_signatures: header.num_required_signatures as u8,
num_readonly_signed_accounts: header.num_readonly_signed_accounts as u8,
num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as u8,
},
account_keys: message
.account_keys
.into_iter()
.map(|key| {
let bytes: [u8; 32] =
key.try_into().unwrap_or(Pubkey::default().to_bytes());
Pubkey::new_from_array(bytes)
})
.collect(),
recent_blockhash: Hash::new(&message.recent_blockhash),
instructions: message
.instructions
.into_iter()
.map(|ix| CompiledInstruction {
program_id_index: ix.program_id_index as u8,
accounts: ix.accounts,
data: ix.data,
})
.collect(),
address_table_lookups: message
.address_table_lookups
.into_iter()
.map(|table| {
let bytes: [u8; 32] = table
.account_key
.try_into()
.unwrap_or(Pubkey::default().to_bytes());
MessageAddressTableLookup {
account_key: Pubkey::new_from_array(bytes),
writable_indexes: table.writable_indexes,
readonly_indexes: table.readonly_indexes,
}
})
.collect(),
});
let legacy_compute_budget: Option<(u32, Option<u64>)> =
message.instructions().iter().find_map(|i| {
if i.program_id(message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
units,
additional_fee,
}) = try_from_slice_unchecked(i.data.as_slice())
{
if additional_fee > 0 {
return Some((
units,
Some(((units * 1000) / additional_fee) as u64),
));
} else {
return Some((units, None));
}
}
}
None
});
let legacy_cu_requested = legacy_compute_budget.map(|x| x.0);
let legacy_prioritization_fees = legacy_compute_budget.map(|x| x.1).unwrap_or(None);
let cu_requested = message
.instructions()
.iter()
.find_map(|i| {
if i.program_id(message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) =
try_from_slice_unchecked(i.data.as_slice())
{
return Some(limit);
}
}
None
})
.or(legacy_cu_requested);
let prioritization_fees = message
.instructions()
.iter()
.find_map(|i| {
if i.program_id(message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) =
try_from_slice_unchecked(i.data.as_slice())
{
return Some(price);
}
}
None
})
.or(legacy_prioritization_fees);
Some(TransactionInfo {
signature: signature.to_string(),
err,
cu_requested,
prioritization_fees,
cu_consumed: compute_units_consumed,
recent_blockhash: message.recent_blockhash().to_string(),
message: base64::engine::general_purpose::STANDARD.encode(message.serialize()),
})
})
.collect();
// removed rewards
ProducedBlock {
transactions: txs,
block_height: block
.block_height
.map(|block_height| block_height.block_height)
.unwrap(),
block_time: block.block_time.map(|time| time.timestamp).unwrap() as u64,
blockhash: block.blockhash,
previous_blockhash: block.parent_blockhash,
commitment_config,
// leader_id,
parent_slot: block.parent_slot,
slot: block.slot,
// rewards,
}
}

View File

@ -1,20 +1,64 @@
mod literpc_core_model;
use std::collections::HashMap;
use std::pin::pin;
use futures::{Stream, StreamExt};
use log::{info};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use tokio::sync::broadcast::{Receiver};
use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeUpdateBlock};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{create_multiplex, GrpcSourceConfig};
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeUpdate, SubscribeUpdateBlock};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use geyser_grpc_connector::grpcmultiplex_fastestwins::{create_multiplex, ExtractBlockFromStream, GrpcSourceConfig};
use crate::literpc_core_model::{map_produced_block, ProducedBlock};
fn start_example_consumer(blocks_notifier: Receiver<Box<SubscribeUpdateBlock>>) {
fn start_example_consumer(mut block_stream: impl Stream<Item=ProducedBlock> + Send + 'static) {
tokio::spawn(async move {
let mut blocks_notifier = blocks_notifier;
loop {
let block = blocks_notifier.recv().await.unwrap();
info!("received block #{} with {} txs", block.slot, block.transactions.len());
let mut block_stream = pin!(block_stream);
while let Some(block) = block_stream.next().await {
info!("received block #{}", block.slot,);
}
});
}
struct ExtractBlock(CommitmentConfig);
impl ExtractBlockFromStream for ExtractBlock {
type Block = ProducedBlock;
fn extract(&self, update: SubscribeUpdate, current_slot: Slot) -> Option<(Slot, Self::Block)> {
match update.update_oneof {
Some(UpdateOneof::Block(update_block_message))
if update_block_message.slot > current_slot =>
{
let block = map_produced_block(update_block_message, self.0);
Some((block.slot, block))
}
_ => None,
}
}
fn get_block_subscription_filter(&self) -> HashMap<String, SubscribeRequestFilterBlocks> {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);
blocks_subs
}
fn get_blockmeta_subscription_filter(
&self,
) -> HashMap<String, SubscribeRequestFilterBlocksMeta> {
HashMap::new()
}
}
#[tokio::main]
pub async fn main() {
// RUST_LOG=info,grpc_using_streams=debug
@ -30,18 +74,17 @@ pub async fn main() {
// testnet - NOTE: this connection has terrible lags (almost 5 minutes)
// let grpc_addr = "http://147.28.169.13:10000".to_string();
let (block_sx, blocks_notifier) = tokio::sync::broadcast::channel(1000);
let green_config = GrpcSourceConfig::new("triton".to_string(), grpc_addr_mainnet_triton, None);
let blue_config = GrpcSourceConfig::new("mangoams81".to_string(), grpc_addr_mainnet_ams81, None);
let toxiproxy_config = GrpcSourceConfig::new("toxiproxy".to_string(), grpc_addr_mainnet_triton_toxi, None);
create_multiplex(
let multiplex_stream = create_multiplex(
vec![green_config, blue_config, toxiproxy_config],
CommitmentConfig::finalized(),
block_sx).await;
ExtractBlock(CommitmentConfig::confirmed()),);
start_example_consumer(blocks_notifier);
start_example_consumer(multiplex_stream);
// "infinite" sleep
sleep(Duration::from_secs(1800)).await;

View File

@ -1,111 +1,96 @@
use std::collections::{HashMap};
use std::ops::{Add};
use anyhow::{Context};
use async_stream::stream;
use futures::{Stream, StreamExt};
use itertools::{Itertools};
use log::{debug, info, warn};
use itertools::Itertools;
use log::{info, warn};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::commitment_config::CommitmentLevel;
use tokio::{select};
use tokio::sync::broadcast::{Sender};
use tokio::time::{Duration, Instant, sleep_until};
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::{SubscribeRequestFilterBlocks, SubscribeUpdate, SubscribeUpdateBlock};
use std::collections::HashMap;
use std::pin::pin;
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdateBlockMeta;
use yellowstone_grpc_proto::geyser::{
CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeUpdate,
};
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta;
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
use yellowstone_grpc_proto::tonic::Status;
// use solana_lite_rpc_cluster_endpoints::grpc_subscription::{create_block_processing_task, map_produced_block};
// use solana_lite_rpc_core::AnyhowJoinHandle;
// use solana_lite_rpc_core::structures::produced_block::ProducedBlock;
pub trait ExtractBlockFromStream {
type Block;
fn extract(&self, update: SubscribeUpdate, current_slot: Slot) -> Option<(Slot, Self::Block)>;
fn get_block_subscription_filter(&self) -> HashMap<String, SubscribeRequestFilterBlocks>;
fn get_blockmeta_subscription_filter(
&self,
) -> HashMap<String, SubscribeRequestFilterBlocksMeta>;
}
// TODO map SubscribeUpdateBlock
pub async fn create_multiplex(
struct ExtractBlock(CommitmentConfig);
struct ExtractBlockMeta(CommitmentConfig);
pub fn create_multiplex<E>(
grpc_sources: Vec<GrpcSourceConfig>,
commitment_config: CommitmentConfig,
block_sx: Sender<Box<SubscribeUpdateBlock>>,
) -> tokio::task::JoinHandle<anyhow::Result<()>> {
extractor: E,
) -> impl Stream<Item = E::Block>
where
E: ExtractBlockFromStream,
{
assert!(
commitment_config == CommitmentConfig::confirmed()
|| commitment_config == CommitmentConfig::finalized(),
"Only CONFIRMED and FINALIZED is supported");
// note: PROCESSED blocks are not sequential in presense of forks; this will break the logic
if grpc_sources.is_empty() {
if grpc_sources.len() < 1 {
panic!("Must have at least one source");
}
tokio::spawn(async move {
info!("Starting multiplexer with {} sources: {}",
grpc_sources.len(),
grpc_sources.iter().map(|source| source.label.clone()).join(", "));
info!(
"Starting multiplexer with {} sources: {}",
grpc_sources.len(),
grpc_sources
.iter()
.map(|source| source.label.clone())
.join(", ")
);
let mut futures = futures::stream::SelectAll::new();
for grpc_source in grpc_sources {
// note: stream never terminates
let stream = create_geyser_reconnecting_stream(grpc_source.clone(), commitment_config).await;
futures.push(Box::pin(stream));
}
let mut futures = futures::stream::SelectAll::new();
let mut current_slot: Slot = 0;
'main_loop: loop {
let block_cmd = select! {
message = futures.next() => {
match message {
Some(message) => {
map_filter_block_message(current_slot, message, commitment_config)
}
None => {
panic!("source stream is not supposed to terminate");
}
}
}
};
match block_cmd {
BlockCmd::ForwardBlock(block) => {
current_slot = block.slot;
block_sx.send(block).context("send block to downstream")?;
}
BlockCmd::DiscardBlockBehindTip(slot) => {
debug!(". discarding redundant block #{}", slot);
}
BlockCmd::SkipMessage => {
debug!(". skipping this message by type");
}
}
}
})
}
// look Ma, no Clone!
#[derive(Debug)]
enum BlockCmd {
ForwardBlock(Box<SubscribeUpdateBlock>),
DiscardBlockBehindTip(Slot),
// skip geyser messages which are not block related updates
SkipMessage,
}
fn map_filter_block_message(current_slot: Slot, update_message: SubscribeUpdate, _commitment_config: CommitmentConfig) -> BlockCmd {
if let Some(UpdateOneof::Block(update_block_message)) = update_message.update_oneof {
if update_block_message.slot <= current_slot && current_slot != 0 {
// no progress - skip this
return BlockCmd::DiscardBlockBehindTip(update_block_message.slot);
}
// expensive
// let produced_block = map_produced_block(update_block_message, commitment_config);
BlockCmd::ForwardBlock(Box::new(update_block_message))
} else {
BlockCmd::SkipMessage
for grpc_source in grpc_sources {
futures.push(Box::pin(create_geyser_reconnecting_stream(
grpc_source.clone(),
(
extractor.get_block_subscription_filter(),
extractor.get_blockmeta_subscription_filter(),
),
commitment_config,
)));
}
filter_blocks(futures, extractor)
}
fn filter_blocks<S, E>(geyser_stream: S, extractor: E) -> impl Stream<Item = E::Block>
where
S: Stream<Item = Option<SubscribeUpdate>>,
E: ExtractBlockFromStream,
{
let mut current_slot: Slot = 0;
stream! {
for await update in geyser_stream {
if let Some(update) = update {
if let Some((new_slot, block)) = extractor.extract(update, current_slot) {
current_slot = new_slot;
yield block;
}
}
}
}
}
#[derive(Clone, Debug)]
@ -128,11 +113,24 @@ impl GrpcSourceConfig {
}
}
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
NotConnected,
Connecting(JoinHandle<GeyserGrpcClientResult<S>>),
Ready(S),
WaitReconnect,
}
// TODO use GrpcSource
// note: stream never terminates
async fn create_geyser_reconnecting_stream(
fn create_geyser_reconnecting_stream(
grpc_source: GrpcSourceConfig,
commitment_config: CommitmentConfig) -> impl Stream<Item = SubscribeUpdate> {
blocks_filters: (
HashMap<String, SubscribeRequestFilterBlocks>,
HashMap<String, SubscribeRequestFilterBlocksMeta>,
),
commitment_config: CommitmentConfig,
) -> impl Stream<Item = Option<SubscribeUpdate>> {
let label = grpc_source.label.clone();
// solana_sdk -> yellowstone
let commitment_level = match commitment_config.commitment {
@ -141,75 +139,95 @@ async fn create_geyser_reconnecting_stream(
_ => panic!("Only CONFIRMED and FINALIZED is supported/suggested"),
};
let label = grpc_source.label.clone();
// NOT_CONNECTED; CONNECTING
let mut state = ConnectionState::NotConnected;
// in case of cancellation, we restart from here:
// thus we want to keep the progression in a state object outside the stream! makro
stream! {
let mut throttle_barrier = Instant::now();
'reconnect_loop: loop {
sleep_until(throttle_barrier).await;
throttle_barrier = Instant::now().add(Duration::from_millis(1000));
loop{
let yield_value;
(state, yield_value) = match state {
ConnectionState::NotConnected => {
let connect_result = GeyserGrpcClient::connect_with_timeout(
grpc_source.grpc_addr.clone(), grpc_source.grpc_x_token.clone(), grpc_source.tls_config.clone(),
Some(Duration::from_secs(2)), Some(Duration::from_secs(2)), false).await;
let connection_task = tokio::spawn({
let addr = grpc_source.grpc_addr.clone();
let token = grpc_source.grpc_x_token.clone();
let config = grpc_source.tls_config.clone();
let (block_filter, blockmeta_filter) = blocks_filters.clone();
async move {
let mut client = match connect_result {
Ok(connected_client) => connected_client,
Err(geyser_grpc_client_error) => {
// TODO identify non-recoverable errors and cancel stream
warn!("Connect failed on {} - retrying: {:?}", label, geyser_grpc_client_error);
continue 'reconnect_loop;
let connect_result = GeyserGrpcClient::connect_with_timeout(
addr, token, config,
Some(Duration::from_secs(2)), Some(Duration::from_secs(2)), false).await;
let mut client = connect_result?;
// Connected;
let subscribe_result = client
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
block_filter,
blockmeta_filter,
Some(commitment_level),
Default::default(),
None,
).await;
subscribe_result
}
});
(ConnectionState::Connecting(connection_task), None)
}
};
ConnectionState::Connecting(connection_task) => {
let subscribe_result = connection_task.await;
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);
let subscribe_result = client
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
blocks_subs,
Default::default(),
Some(commitment_level),
Default::default(),
None,
).await;
let geyser_stream = match subscribe_result {
Ok(subscribed_stream) => subscribed_stream,
Err(geyser_grpc_client_error) => {
// TODO identify non-recoverable errors and cancel stream
warn!("Subscribe failed on {} - retrying: {:?}", label, geyser_grpc_client_error);
continue 'reconnect_loop;
}
};
for await update_message in geyser_stream {
match update_message {
Ok(update_message) => {
info!(">message on {}", label);
yield update_message;
match subscribe_result {
Ok(Ok(subscribed_stream)) => (ConnectionState::Ready(subscribed_stream), None),
Ok(Err(geyser_error)) => {
// TODO identify non-recoverable errors and cancel stream
warn!("Subscribe failed on {} - retrying: {:?}", label, geyser_error);
(ConnectionState::WaitReconnect, None)
},
Err(geyser_grpc_task_error) => {
panic!("Task aborted - should not happen :{geyser_grpc_task_error}");
}
}
Err(tonic_status) => {
// TODO identify non-recoverable errors and cancel stream
warn!("Receive error on {} - retrying: {:?}", label, tonic_status);
continue 'reconnect_loop;
}
}
} // -- production loop
warn!("stream consumer loop {} terminated", label);
} // -- main loop
}
ConnectionState::Ready(mut geyser_stream) => {
//for await update_message in geyser_stream {
match geyser_stream.next().await {
Some(Ok(update_message)) => {
info!(">message on {}", label);
(ConnectionState::Ready(geyser_stream), Some(update_message))
}
Some(Err(tonic_status)) => {
// TODO identify non-recoverable errors and cancel stream
warn!("Receive error on {} - retrying: {:?}", label, tonic_status);
(ConnectionState::WaitReconnect, None)
}
None => {
//TODO should not arrive. Mean the stream close.
warn!("Geyzer stream close on {} - retrying", label);
(ConnectionState::WaitReconnect, None)
}
}
//} // -- production loop
}
ConnectionState::WaitReconnect => {
// TODO implement backoff
sleep(Duration::from_secs(1)).await;
(ConnectionState::NotConnected, None)
}
}; // -- match
yield yield_value
}
} // -- stream!
}