Merge pull request #8 from blockworks-foundation/feature/channel-based-autoconnect

Feature/channel based autoconnect
This commit is contained in:
Groovie | Mango 2024-02-01 09:38:41 +01:00 committed by GitHub
commit 6c26802864
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 1019 additions and 350 deletions

191
Cargo.lock generated
View File

@ -76,9 +76,9 @@ dependencies = [
[[package]]
name = "ahash"
version = "0.8.6"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91429305e9f0a25f6205c5b8e0d2db09e0708a7a6df0f42212bb56c32c8ac97a"
checksum = "72832d73be48bac96a5d7944568f305d829ed55b0ce3b483647089dfaf6cf704"
dependencies = [
"cfg-if",
"getrandom 0.2.11",
@ -249,12 +249,6 @@ dependencies = [
"rand 0.8.5",
]
[[package]]
name = "array-bytes"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ad284aeb45c13f2fb4f084de4a420ebf447423bdf9386c0540ce33cb3ef4b8c"
[[package]]
name = "arrayref"
version = "0.3.7"
@ -444,6 +438,9 @@ name = "bitflags"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07"
dependencies = [
"serde",
]
[[package]]
name = "bitmaps"
@ -1239,7 +1236,7 @@ dependencies = [
[[package]]
name = "geyser-grpc-connector"
version = "0.7.2+yellowstone.1.11"
version = "0.10.1+yellowstone.1.12"
dependencies = [
"anyhow",
"async-stream",
@ -1250,6 +1247,7 @@ dependencies = [
"itertools 0.10.5",
"log",
"merge-streams",
"solana-logger",
"solana-sdk",
"tokio",
"tracing",
@ -1325,7 +1323,7 @@ version = "0.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e"
dependencies = [
"ahash 0.8.6",
"ahash 0.8.4",
]
[[package]]
@ -1676,6 +1674,18 @@ dependencies = [
"libsecp256k1-core",
]
[[package]]
name = "light-poseidon"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c9a85a9752c549ceb7578064b4ed891179d20acd85f27318573b64d2d7ee7ee"
dependencies = [
"ark-bn254",
"ark-ff",
"num-bigint 0.4.4",
"thiserror",
]
[[package]]
name = "linux-raw-sys"
version = "0.4.12"
@ -2241,6 +2251,17 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "qualifier_attr"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e2e25ee72f5b24d773cae88422baddefff7714f97aab68d96fe2b6fc4a28fb2"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.41",
]
[[package]]
name = "quote"
version = "1.0.33"
@ -2779,9 +2800,9 @@ dependencies = [
[[package]]
name = "solana-account-decoder"
version = "1.16.17"
version = "1.17.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "121e55656c2094950f374247e1303dd09517f1ed49c91bf60bf114760b286eb4"
checksum = "22ea4bedfcc8686ae6d01a3d8288f5b9746cd00ec63f0ce9a6415849d35add50"
dependencies = [
"Inflector",
"base64 0.21.5",
@ -2792,42 +2813,21 @@ dependencies = [
"serde",
"serde_derive",
"serde_json",
"solana-address-lookup-table-program",
"solana-config-program",
"solana-sdk",
"spl-token",
"spl-token-2022",
"spl-token-group-interface",
"spl-token-metadata-interface",
"thiserror",
"zstd",
]
[[package]]
name = "solana-address-lookup-table-program"
version = "1.16.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ccb31f7f14d5876acd9ec38f5bf6097bfb4b350141d81c7ff2bf684db3ca815"
dependencies = [
"bincode",
"bytemuck",
"log",
"num-derive 0.3.3",
"num-traits",
"rustc_version",
"serde",
"solana-frozen-abi",
"solana-frozen-abi-macro",
"solana-program",
"solana-program-runtime",
"solana-sdk",
"thiserror",
]
[[package]]
name = "solana-config-program"
version = "1.16.17"
version = "1.17.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94dc0f4463daf1c6155f20eac948ea4ced705e5f5520546aef4e11e746a6d95d"
checksum = "8de23cd0dd8673f4590e90bfa47ff19eb629f4b7dc15a3fb173a62d932801d07"
dependencies = [
"bincode",
"chrono",
@ -2839,11 +2839,11 @@ dependencies = [
[[package]]
name = "solana-frozen-abi"
version = "1.16.17"
version = "1.17.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d266bf0311bb403d31206aa2904b8741f57c7f5e27580b6810ad5e22fc7c3282"
checksum = "4090f2ac64149ce1fbabd5277f41e278edc1f38121927fe8f6355e67ead3e199"
dependencies = [
"ahash 0.8.6",
"ahash 0.8.4",
"blake3",
"block-buffer 0.10.4",
"bs58",
@ -2852,13 +2852,10 @@ dependencies = [
"cc",
"either",
"generic-array",
"getrandom 0.1.16",
"im",
"lazy_static",
"log",
"memmap2",
"once_cell",
"rand_core 0.6.4",
"rustc_version",
"serde",
"serde_bytes",
@ -2872,9 +2869,9 @@ dependencies = [
[[package]]
name = "solana-frozen-abi-macro"
version = "1.16.17"
version = "1.17.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6dfe18c5155015dcb494c6de84a03b725fcf90ec2006a047769018b94c2cf0de"
checksum = "765bcdc1ecc31ea5d3d7ddb680ffa6645809c122b4ffdc223b161850e6ba352b"
dependencies = [
"proc-macro2",
"quote",
@ -2884,9 +2881,9 @@ dependencies = [
[[package]]
name = "solana-logger"
version = "1.16.17"
version = "1.17.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f76fe25c2d06dcf621befd1e8d5655143e8a059c7e20fcb71736bc80ed779d6"
checksum = "9c7f3cad088bc5f00569cb5b4c3aaba8d935f8f7cc25c91cc0c55a8a7de2b137"
dependencies = [
"env_logger",
"lazy_static",
@ -2895,9 +2892,9 @@ dependencies = [
[[package]]
name = "solana-measure"
version = "1.16.17"
version = "1.17.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db165b8a7f5d840abef011c78a18ffe63cad9192d676b07d94f469b6b5dc6cf6"
checksum = "2de5041d16120852c0deea047c024e1fad8819e49041491f0cca6c91c243fd5d"
dependencies = [
"log",
"solana-sdk",
@ -2905,9 +2902,9 @@ dependencies = [
[[package]]
name = "solana-metrics"
version = "1.16.17"
version = "1.17.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa01731bb3952904962d49a1ea1205db54e93f3a56f4006d32e02a7c85d60546"
checksum = "2fd6f25f0076b6eb873f7e2a85e53191ac2affe6782131be1a2867d057307e20"
dependencies = [
"crossbeam-channel",
"gethostname",
@ -2915,22 +2912,22 @@ dependencies = [
"log",
"reqwest",
"solana-sdk",
"thiserror",
]
[[package]]
name = "solana-program"
version = "1.16.17"
version = "1.17.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bb16998986492de307eef503ce47e84503d35baa92dc60832b22476948b1c16"
checksum = "c1141d1dffbe68852128f7bbcc3c43a5d2cb715ecffeeb64eb81bb93cbaf80bb"
dependencies = [
"ark-bn254",
"ark-ec",
"ark-ff",
"ark-serialize",
"array-bytes",
"base64 0.21.5",
"bincode",
"bitflags 1.3.2",
"bitflags 2.4.1",
"blake3",
"borsh 0.10.3",
"borsh 0.9.3",
@ -2947,14 +2944,14 @@ dependencies = [
"lazy_static",
"libc",
"libsecp256k1",
"light-poseidon",
"log",
"memoffset",
"num-bigint 0.4.4",
"num-derive 0.3.3",
"num-traits",
"parking_lot",
"rand 0.7.3",
"rand_chacha 0.2.2",
"rand 0.8.5",
"rustc_version",
"rustversion",
"serde",
@ -2974,9 +2971,9 @@ dependencies = [
[[package]]
name = "solana-program-runtime"
version = "1.16.17"
version = "1.17.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "036d6ecf67a3a7c6dc74d4f7fa6ab321e7ce8feccb7c9dff8384a41d0a12345b"
checksum = "942de577a2865cec28fc174575c9bd6cf7af815832af67fe40ca856075550998"
dependencies = [
"base64 0.21.5",
"bincode",
@ -2988,7 +2985,7 @@ dependencies = [
"num-derive 0.3.3",
"num-traits",
"percentage",
"rand 0.7.3",
"rand 0.8.5",
"rustc_version",
"serde",
"solana-frozen-abi",
@ -3002,14 +2999,14 @@ dependencies = [
[[package]]
name = "solana-sdk"
version = "1.16.17"
version = "1.17.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4106cda3d10833ba957dbd25fb841b50aeca7480ccf8f54859294716f54bcd4b"
checksum = "278a95acb99377dd4585599fdbec23d0a6fcb94ec78285283723fdd365fe885e"
dependencies = [
"assert_matches",
"base64 0.21.5",
"bincode",
"bitflags 1.3.2",
"bitflags 2.4.1",
"borsh 0.10.3",
"bs58",
"bytemuck",
@ -3032,8 +3029,9 @@ dependencies = [
"num_enum 0.6.1",
"pbkdf2 0.11.0",
"qstring",
"qualifier_attr",
"rand 0.7.3",
"rand_chacha 0.2.2",
"rand 0.8.5",
"rustc_version",
"rustversion",
"serde",
@ -3055,9 +3053,9 @@ dependencies = [
[[package]]
name = "solana-sdk-macro"
version = "1.16.17"
version = "1.17.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e560806a3859717eb2220b26e2cd68bb757b63affa3e79c3f1d8d853b5ee78f"
checksum = "92dbaf563210f61828800f2a3d8c188fa2afede91920d364982e280318db2eb5"
dependencies = [
"bs58",
"proc-macro2",
@ -3067,10 +3065,16 @@ dependencies = [
]
[[package]]
name = "solana-transaction-status"
version = "1.16.17"
name = "solana-security-txt"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "236dd4e43b8a7402bce250228e04c0c68d9493a3e19c71b377ccc7c4390fd969"
checksum = "468aa43b7edb1f9b7b7b686d5c3aeb6630dc1708e86e31343499dd5c4d775183"
[[package]]
name = "solana-transaction-status"
version = "1.17.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e2031070cba17802f7108b53f6db01b82cdfb0360b0a8b9d51c584f2e9dd9e4"
dependencies = [
"Inflector",
"base64 0.21.5",
@ -3083,7 +3087,6 @@ dependencies = [
"serde_derive",
"serde_json",
"solana-account-decoder",
"solana-address-lookup-table-program",
"solana-sdk",
"spl-associated-token-account",
"spl-memo",
@ -3094,9 +3097,9 @@ dependencies = [
[[package]]
name = "solana-zk-token-sdk"
version = "1.16.17"
version = "1.17.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "278c08e13bc04b6940997602909052524a375154b00cf0bfa934359a3bb7e6f0"
checksum = "ef26fb44734aa940e6648bbbeead677edc68c7e1ec09128e5f16a8924c389a38"
dependencies = [
"aes-gcm-siv",
"base64 0.21.5",
@ -3123,9 +3126,9 @@ dependencies = [
[[package]]
name = "solana_rbpf"
version = "0.6.1"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17d4ba1e58947346e360fabde0697029d36ba83c42f669199b16a8931313cf29"
checksum = "3d457cc2ba742c120492a64b7fa60e22c575e891f6b55039f4d736568fb112a3"
dependencies = [
"byteorder",
"combine",
@ -3148,9 +3151,9 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
[[package]]
name = "spl-associated-token-account"
version = "2.2.0"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "385e31c29981488f2820b2022d8e731aae3b02e6e18e2fd854e4c9a94dc44fc3"
checksum = "992d9c64c2564cc8f63a4b508bf3ebcdf2254b0429b13cd1d31adb6162432a5f"
dependencies = [
"assert_matches",
"borsh 0.10.3",
@ -3246,9 +3249,9 @@ dependencies = [
[[package]]
name = "spl-tlv-account-resolution"
version = "0.4.0"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "062e148d3eab7b165582757453632ffeef490c02c86a48bfdb4988f63eefb3b9"
checksum = "615d381f48ddd2bb3c57c7f7fb207591a2a05054639b18a62e785117dd7a8683"
dependencies = [
"bytemuck",
"solana-program",
@ -3275,9 +3278,9 @@ dependencies = [
[[package]]
name = "spl-token-2022"
version = "0.9.0"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4abf34a65ba420584a0c35f3903f8d727d1f13ababbdc3f714c6b065a686e86"
checksum = "d697fac19fd74ff472dfcc13f0b442dd71403178ce1de7b5d16f83a33561c059"
dependencies = [
"arrayref",
"bytemuck",
@ -3285,16 +3288,31 @@ dependencies = [
"num-traits",
"num_enum 0.7.1",
"solana-program",
"solana-security-txt",
"solana-zk-token-sdk",
"spl-memo",
"spl-pod",
"spl-token",
"spl-token-group-interface",
"spl-token-metadata-interface",
"spl-transfer-hook-interface",
"spl-type-length-value",
"thiserror",
]
[[package]]
name = "spl-token-group-interface"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b889509d49fa74a4a033ca5dae6c2307e9e918122d97e58562f5c4ffa795c75d"
dependencies = [
"bytemuck",
"solana-program",
"spl-discriminator",
"spl-pod",
"spl-program-error",
]
[[package]]
name = "spl-token-metadata-interface"
version = "0.2.0"
@ -3311,9 +3329,9 @@ dependencies = [
[[package]]
name = "spl-transfer-hook-interface"
version = "0.3.0"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "051d31803f873cabe71aec3c1b849f35248beae5d19a347d93a5c9cccc5d5a9b"
checksum = "7aabdb7c471566f6ddcee724beb8618449ea24b399e58d464d6b5bc7db550259"
dependencies = [
"arrayref",
"bytemuck",
@ -3606,7 +3624,6 @@ dependencies = [
"axum",
"base64 0.21.5",
"bytes",
"flate2",
"h2",
"http",
"http-body",
@ -4147,9 +4164,8 @@ dependencies = [
[[package]]
name = "yellowstone-grpc-client"
version = "1.12.0+solana.1.16.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e58204f372a7e82d15d72bdf99334029c4e9cdc15bd2e9a5c33b598d9f1eb8b6"
version = "1.13.0+solana.1.17.15"
source = "git+https://github.com/rpcpool/yellowstone-grpc.git?tag=v1.12.0+solana.1.17.15#c7b72cc8781c2dc48e4a7c94e411f95df495cf2f"
dependencies = [
"bytes",
"futures",
@ -4162,9 +4178,8 @@ dependencies = [
[[package]]
name = "yellowstone-grpc-proto"
version = "1.11.0+solana.1.16.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00d751c6ef3093ec90ab1e16c6a504b5bea99aca6c688c429fed4cc56782f57e"
version = "1.12.0+solana.1.17.15"
source = "git+https://github.com/rpcpool/yellowstone-grpc.git?tag=v1.12.0+solana.1.17.15#c7b72cc8781c2dc48e4a7c94e411f95df495cf2f"
dependencies = [
"anyhow",
"bincode",

View File

@ -1,6 +1,6 @@
[package]
name = "geyser-grpc-connector"
version = "0.7.2+yellowstone.1.11"
version = "0.10.1+yellowstone.1.12"
edition = "2021"
description = "Multiplexing and Reconnection on Yellowstone gRPC Geyser client streaming"
@ -9,13 +9,12 @@ authors = ["GroovieGermanikus <groovie@mango.markets>"]
repository = "https://github.com/blockworks-foundation/geyser-grpc-connector"
[dependencies]
# v1.11.0+solana.1.16.17
yellowstone-grpc-proto = "1.11.0"
# v1.12.0+solana.1.16.17
yellowstone-grpc-client = "1.12.0"
yellowstone-grpc-client = { version = "1.13.0+solana.1.17.15", git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" }
yellowstone-grpc-proto = { version = "1.12.0+solana.1.17.15", git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" }
# required for CommitmentConfig
solana-sdk = "~1.16.17"
solana-sdk = "~1.17.15"
url = "2.5.0"
async-stream = "0.3.5"
@ -33,3 +32,4 @@ bincode = "1.3.3"
[dev-dependencies]
tracing-subscriber = "0.3.16"
solana-logger = "1"

View File

@ -0,0 +1,138 @@
use futures::StreamExt;
use log::info;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::env;
use geyser_grpc_connector::channel_plugger::spawn_broadcast_channel_plug;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task;
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use tokio::time::{sleep, Duration};
use tracing::warn;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
use yellowstone_grpc_proto::prost::Message as _;
pub struct BlockMini {
pub blocksize: usize,
pub slot: Slot,
pub commitment_config: CommitmentConfig,
}
struct BlockMiniExtractor(CommitmentConfig);
impl FromYellowstoneExtractor for BlockMiniExtractor {
type Target = BlockMini;
fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)> {
match update.update_oneof {
Some(UpdateOneof::Block(update_block_message)) => {
let blocksize = update_block_message.encoded_len();
let slot = update_block_message.slot;
let mini = BlockMini {
blocksize,
slot,
commitment_config: self.0,
};
Some((slot, mini))
}
Some(UpdateOneof::BlockMeta(update_blockmeta_message)) => {
let blocksize = update_blockmeta_message.encoded_len();
let slot = update_blockmeta_message.slot;
let mini = BlockMini {
blocksize,
slot,
commitment_config: self.0,
};
Some((slot, mini))
}
_ => None,
}
}
}
#[allow(dead_code)]
enum TestCases {
Basic,
SlowReceiverStartup,
TemporaryLaggingReceiver,
CloseAfterReceiving,
AbortTaskFromOutside,
}
const TEST_CASE: TestCases = TestCases::Basic;
#[tokio::main]
pub async fn main() {
// RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace
tracing_subscriber::fmt::init();
// console_subscriber::init();
let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green");
let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok();
info!(
"Using grpc source on {} ({})",
grpc_addr_green,
grpc_x_token_green.is_some()
);
let timeouts = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(5),
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
};
let green_config =
GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());
info!("Write Block stream..");
let (jh_geyser_task, message_channel) = create_geyser_autoconnection_task(
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
);
let mut message_channel =
spawn_broadcast_channel_plug(tokio::sync::broadcast::channel(8), message_channel);
tokio::spawn(async move {
if let TestCases::SlowReceiverStartup = TEST_CASE {
sleep(Duration::from_secs(5)).await;
}
let mut message_count = 0;
while let Ok(message) = message_channel.recv().await {
if let TestCases::AbortTaskFromOutside = TEST_CASE {
if message_count > 5 {
info!("(testcase) aborting task from outside");
jh_geyser_task.abort();
}
}
match message {
Message::GeyserSubscribeUpdate(subscriber_update) => {
message_count += 1;
info!("got update - {} bytes", subscriber_update.encoded_len());
if let TestCases::CloseAfterReceiving = TEST_CASE {
info!("(testcase) closing stream after receiving");
return;
}
}
Message::Connecting(attempt) => {
warn!("Connection attempt: {}", attempt);
}
}
if let TestCases::TemporaryLaggingReceiver = TEST_CASE {
if message_count % 3 == 1 {
info!("(testcase) lagging a bit");
sleep(Duration::from_millis(1500)).await;
}
}
}
warn!("Stream aborted");
});
// "infinite" sleep
sleep(Duration::from_secs(2000)).await;
}

View File

@ -21,12 +21,11 @@ use solana_sdk::signature::Signature;
use solana_sdk::transaction::TransactionError;
use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock;
use geyser_grpc_connector::grpc_subscription_autoreconnect::{
create_geyser_reconnecting_stream, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig,
};
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig};
use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
@ -130,6 +129,7 @@ pub async fn main() {
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(5),
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
};
let green_config =

View File

@ -5,16 +5,14 @@ use solana_sdk::commitment_config::CommitmentConfig;
use std::env;
use std::pin::pin;
use geyser_grpc_connector::grpc_subscription_autoreconnect::{
create_geyser_reconnecting_stream, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig,
};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use tokio::time::{sleep, Duration};
use tracing::warn;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
use yellowstone_grpc_proto::prost::Message;
use yellowstone_grpc_proto::prost::Message as _;
fn start_example_blockmini_consumer(
multiplex_stream: impl Stream<Item = BlockMini> + Send + 'static,
@ -86,23 +84,69 @@ pub async fn main() {
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(5),
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
};
let green_config =
GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());
let config = GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());
info!("Write Block stream..");
let green_stream = create_geyser_reconnecting_stream(
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
// GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
config.clone(),
GeyserFilter(CommitmentConfig::finalized()).blocks_and_txs(),
);
let multiplex_stream = create_multiplexed_stream(
vec![green_stream],
BlockMiniExtractor(CommitmentConfig::confirmed()),
let blue_stream = create_geyser_reconnecting_stream(
config.clone(),
GeyserFilter(CommitmentConfig::processed()).blocks_and_txs(),
);
start_example_blockmini_consumer(multiplex_stream);
tokio::spawn(async move {
let mut green_stream = pin!(green_stream);
while let Some(message) = green_stream.next().await {
match message {
Message::GeyserSubscribeUpdate(subscriber_update) => {
let mapped = map_block_update(*subscriber_update);
if let Some(slot) = mapped {
info!("got update (green)!!! slot: {}", slot);
}
}
Message::Connecting(attempt) => {
warn!("Connection attempt: {}", attempt);
}
}
}
warn!("Stream aborted");
});
tokio::spawn(async move {
let mut blue_stream = pin!(blue_stream);
while let Some(message) = blue_stream.next().await {
match message {
Message::GeyserSubscribeUpdate(subscriber_update) => {
let mapped = map_block_update(*subscriber_update);
if let Some(slot) = mapped {
info!("got update (blue)!!! slot: {}", slot);
}
}
Message::Connecting(attempt) => {
warn!("Connection attempt: {}", attempt);
}
}
}
warn!("Stream aborted");
});
// "infinite" sleep
sleep(Duration::from_secs(1800)).await;
}
fn map_block_update(update: SubscribeUpdate) -> Option<Slot> {
match update.update_oneof {
Some(UpdateOneof::Block(update_block_message)) => {
let slot = update_block_message.slot;
Some(slot)
}
_ => None,
}
}

View File

@ -1,2 +1,2 @@
[toolchain]
channel = "1.70"
channel = "1.73.0"

171
src/channel_plugger.rs Normal file
View File

@ -0,0 +1,171 @@
use log::{debug, info, warn};
use std::time::Duration;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc::error::SendTimeoutError;
/// usage: see plug_pattern test
pub fn spawn_broadcast_channel_plug<T: Send + 'static>(
downstream_broadcast: (
tokio::sync::broadcast::Sender<T>,
tokio::sync::broadcast::Receiver<T>,
),
upstream: tokio::sync::mpsc::Receiver<T>,
) -> tokio::sync::broadcast::Receiver<T> {
spawn_plugger_mpcs_to_broadcast(upstream, downstream_broadcast.0);
downstream_broadcast.1
}
/// note: backpressure will NOT get propagated to upstream
pub fn spawn_plugger_mpcs_to_broadcast<T: Send + 'static>(
mut upstream: tokio::sync::mpsc::Receiver<T>,
downstream: tokio::sync::broadcast::Sender<T>,
// TODO allow multiple downstreams + fanout
) {
// abort forwarder by closing the sender
let _private_handler = tokio::spawn(async move {
while let Some(value) = upstream.recv().await {
match downstream.send(value) {
Ok(n_subscribers) => {
debug!("forwarded to {} subscribers", n_subscribers);
}
Err(_dropped_msg) => {
// decide to continue if no subscribers
debug!("no subscribers - dropping payload and continue");
}
}
}
debug!("no more messages from producer - shutting down connector");
});
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::time::{sleep, timeout};
#[tokio::test]
async fn plug_pattern() {
let (_jh_task, message_channel) = tokio::sync::mpsc::channel::<u32>(1);
let _broadcast_rx =
spawn_broadcast_channel_plug(tokio::sync::broadcast::channel(8), message_channel);
}
#[tokio::test]
async fn connect_broadcast_to_mpsc() {
solana_logger::setup_with_default("debug");
let (tx1, rx1) = tokio::sync::mpsc::channel::<u64>(1);
let (tx2, rx2) = tokio::sync::broadcast::channel::<u64>(2);
drop(rx2);
let jh_producer = tokio::spawn(async move {
for i in 1..=10 {
info!("producer sending {}", i);
if let Err(SendTimeoutError::Timeout(message)) =
tx1.send_timeout(i, Duration::from_millis(200)).await
{
info!("producer send was blocked");
tx1.send(message).await.unwrap();
}
sleep(Duration::from_millis(500)).await;
}
});
// downstream receiver A connected to broadcast
let mut channel_a = tx2.subscribe();
tokio::spawn(async move {
loop {
match channel_a.recv().await {
Ok(msg) => {
info!("A: {:?} (len={})", msg, channel_a.len());
}
Err(RecvError::Lagged(n_missed)) => {
warn!("channel A lagged {} messages", n_missed);
}
Err(RecvError::Closed) => {
info!("channel A closed (by forwarder)");
break;
}
}
}
});
// downstream receiver B connected to broadcast
let mut channel_b = tx2.subscribe();
tokio::spawn(async move {
loop {
match channel_b.recv().await {
Ok(msg) => {
info!("B: {:?} (len={})", msg, channel_b.len());
// slow receiver
sleep(Duration::from_millis(1000)).await;
}
Err(RecvError::Lagged(n_missed)) => {
warn!("channel B lagged {} messages", n_missed);
}
Err(RecvError::Closed) => {
info!("channel B closed (by forwarder)");
break;
}
}
}
});
// connect them
spawn_plugger_mpcs_to_broadcast(rx1, tx2);
// wait forever
info!("Started tasks .. waiting for producer to finish");
// should take 5 secs
assert!(
timeout(Duration::from_secs(10), jh_producer).await.is_ok(),
"timeout"
);
info!("producer done - wait a bit longer ...");
sleep(Duration::from_secs(3)).await;
info!("done.");
// note how messages pile up for slow receiver B
}
#[tokio::test]
async fn connect_broadcast_to_mpsc_nosubscribers() {
solana_logger::setup_with_default("debug");
let (tx1, rx1) = tokio::sync::mpsc::channel::<u64>(1);
let (tx2, rx2) = tokio::sync::broadcast::channel::<u64>(2);
let jh_producer = tokio::spawn(async move {
for i in 1..=10 {
info!("producer sending {}", i);
if let Err(SendTimeoutError::Timeout(message)) =
tx1.send_timeout(i, Duration::from_millis(200)).await
{
info!("producer send was blocked");
tx1.send(message).await.unwrap();
}
sleep(Duration::from_millis(500)).await;
}
});
// connect them
spawn_plugger_mpcs_to_broadcast(rx1, tx2);
sleep(Duration::from_secs(3)).await;
info!("dropping subscriber");
drop(rx2);
// wait forever
info!("Started tasks .. waiting for producer to finish");
// should take 5 secs
assert!(
timeout(Duration::from_secs(10), jh_producer).await.is_ok(),
"timeout"
);
info!("producer done - wait a bit longer ...");
sleep(Duration::from_secs(3)).await;
info!("done.");
// note how messages pile up for slow receiver B
}
}

View File

@ -1,82 +0,0 @@
// use crate::{
// endpoint_stremers::EndpointStreaming,
// rpc_polling::vote_accounts_and_cluster_info_polling::poll_vote_accounts_and_cluster_info,
// };
use anyhow::{bail, Context};
use futures::StreamExt;
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::HashMap;
use tokio::sync::broadcast::Sender;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::prelude::{
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks,
SubscribeUpdateBlock,
};
pub fn create_block_processing_task(
grpc_addr: String,
grpc_x_token: Option<String>,
block_sx: Sender<SubscribeUpdateBlock>,
commitment_level: CommitmentLevel,
) -> tokio::task::JoinHandle<anyhow::Result<()>> {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);
let _commitment_config = match commitment_level {
CommitmentLevel::Confirmed => CommitmentConfig::confirmed(),
CommitmentLevel::Finalized => CommitmentConfig::finalized(),
CommitmentLevel::Processed => CommitmentConfig::processed(),
};
tokio::spawn(async move {
// connect to grpc
let mut client = GeyserGrpcClient::connect(grpc_addr, grpc_x_token, None)?;
let mut stream = client
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
blocks_subs,
Default::default(),
Some(commitment_level),
Default::default(),
None,
)
.await?;
while let Some(message) = stream.next().await {
let message = message?;
let Some(update) = message.update_oneof else {
continue;
};
match update {
UpdateOneof::Block(block) => {
// let block = map_produced_block(block, commitment_config);
block_sx
.send(block)
.context("Grpc failed to send a block")?;
}
UpdateOneof::Ping(_) => {
log::trace!("GRPC Ping");
}
u => {
bail!("Unexpected update: {u:?}");
}
};
}
bail!("geyser slot stream ended");
})
}

View File

@ -1,85 +1,14 @@
use crate::{Attempt, GrpcSourceConfig, Message};
use async_stream::stream;
use futures::{Stream, StreamExt};
use log::{debug, info, log, trace, warn, Level};
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult};
use yellowstone_grpc_proto::geyser::{
CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeUpdate,
};
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta;
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
use yellowstone_grpc_proto::tonic::Status;
#[derive(Clone, Debug)]
pub struct GrpcConnectionTimeouts {
pub connect_timeout: Duration,
pub request_timeout: Duration,
pub subscribe_timeout: Duration,
}
#[derive(Clone)]
pub struct GrpcSourceConfig {
pub grpc_addr: String,
pub grpc_x_token: Option<String>,
tls_config: Option<ClientTlsConfig>,
timeouts: Option<GrpcConnectionTimeouts>,
}
impl Display for GrpcSourceConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"grpc_addr {}",
crate::obfuscate::url_obfuscate_api_token(&self.grpc_addr)
)
}
}
impl Debug for GrpcSourceConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(&self, f)
}
}
impl GrpcSourceConfig {
/// Create a grpc source without tls and timeouts
pub fn new_simple(grpc_addr: String) -> Self {
Self {
grpc_addr,
grpc_x_token: None,
tls_config: None,
timeouts: None,
}
}
pub fn new(
grpc_addr: String,
grpc_x_token: Option<String>,
tls_config: Option<ClientTlsConfig>,
timeouts: GrpcConnectionTimeouts,
) -> Self {
Self {
grpc_addr,
grpc_x_token,
tls_config,
timeouts: Some(timeouts),
}
}
}
type Attempt = u32;
// wraps payload and status messages
pub enum Message {
GeyserSubscribeUpdate(Box<SubscribeUpdate>),
// connect (attempt=1) or reconnect(attempt=2..)
Connecting(Attempt),
}
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
NotConnected(Attempt),
Connecting(Attempt, JoinHandle<GeyserGrpcClientResult<S>>),
@ -87,74 +16,6 @@ enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
WaitReconnect(Attempt),
}
#[derive(Clone)]
pub struct GeyserFilter(pub CommitmentConfig);
impl GeyserFilter {
pub fn blocks_and_txs(&self) -> SubscribeRequest {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);
SubscribeRequest {
slots: HashMap::new(),
accounts: Default::default(),
transactions: HashMap::new(),
entry: Default::default(),
blocks: blocks_subs,
blocks_meta: HashMap::new(),
commitment: Some(map_commitment_level(self.0) as i32),
accounts_data_slice: Default::default(),
ping: None,
}
}
pub fn blocks_meta(&self) -> SubscribeRequest {
let mut blocksmeta_subs = HashMap::new();
blocksmeta_subs.insert("client".to_string(), SubscribeRequestFilterBlocksMeta {});
SubscribeRequest {
slots: HashMap::new(),
accounts: Default::default(),
transactions: HashMap::new(),
entry: Default::default(),
blocks: HashMap::new(),
blocks_meta: blocksmeta_subs,
commitment: Some(map_commitment_level(self.0) as i32),
accounts_data_slice: Default::default(),
ping: None,
}
}
}
fn map_commitment_level(commitment_config: CommitmentConfig) -> CommitmentLevel {
// solana_sdk -> yellowstone
match commitment_config.commitment {
solana_sdk::commitment_config::CommitmentLevel::Processed => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Processed
}
solana_sdk::commitment_config::CommitmentLevel::Confirmed => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Confirmed
}
solana_sdk::commitment_config::CommitmentLevel::Finalized => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Finalized
}
_ => {
panic!(
"unsupported commitment level {}",
commitment_config.commitment
)
}
}
}
// Take geyser filter, connect to Geyser and return a generic stream of SubscribeUpdate
// note: stream never terminates
pub fn create_geyser_reconnecting_stream(
@ -216,40 +77,45 @@ pub fn create_geyser_reconnecting_stream(
Ok(Ok(subscribed_stream)) => (ConnectionState::Ready(attempt, subscribed_stream), Message::Connecting(attempt)),
Ok(Err(geyser_error)) => {
// ATM we consider all errors recoverable
warn!("! subscribe failed on {} - retrying: {:?}", grpc_source, geyser_error);
warn!("subscribe failed on {} - retrying: {:?}", grpc_source, geyser_error);
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
},
Err(geyser_grpc_task_error) => {
panic!("! task aborted - should not happen :{geyser_grpc_task_error}");
panic!("task aborted - should not happen :{geyser_grpc_task_error}");
}
}
}
ConnectionState::Ready(attempt, mut geyser_stream) => {
match geyser_stream.next().await {
Some(Ok(update_message)) => {
let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout);
match timeout(receive_timeout.unwrap_or(Duration::MAX), geyser_stream.next()).await {
Ok(Some(Ok(update_message))) => {
trace!("> recv update message from {}", grpc_source);
(ConnectionState::Ready(attempt, geyser_stream), Message::GeyserSubscribeUpdate(Box::new(update_message)))
}
Some(Err(tonic_status)) => {
Ok(Some(Err(tonic_status))) => {
// ATM we consider all errors recoverable
warn!("! error on {} - retrying: {:?}", grpc_source, tonic_status);
warn!("error on {} - retrying: {:?}", grpc_source, tonic_status);
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
}
None => {
Ok(None) => {
// should not arrive here, Mean the stream close.
warn!("geyser stream closed on {} - retrying", grpc_source);
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
}
Err(_elapsed) => {
// timeout
warn!("geyser stream timeout on {} - retrying", grpc_source);
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
}
}
}
ConnectionState::WaitReconnect(attempt) => {
let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0);
info!("! waiting {} seconds, then reconnect to {}", backoff_secs, grpc_source);
info!("waiting {} seconds, then reconnect to {}", backoff_secs, grpc_source);
sleep(Duration::from_secs_f32(backoff_secs)).await;
(ConnectionState::NotConnected(attempt), Message::Connecting(attempt))
}
@ -267,6 +133,7 @@ pub fn create_geyser_reconnecting_stream(
#[cfg(test)]
mod tests {
use super::*;
use crate::GrpcConnectionTimeouts;
#[tokio::test]
async fn test_debug_no_secrets() {
@ -274,6 +141,7 @@ mod tests {
connect_timeout: Duration::from_secs(1),
request_timeout: Duration::from_secs(2),
subscribe_timeout: Duration::from_secs(3),
receive_timeout: Duration::from_secs(3),
};
assert_eq!(
format!(
@ -295,6 +163,7 @@ mod tests {
connect_timeout: Duration::from_secs(1),
request_timeout: Duration::from_secs(2),
subscribe_timeout: Duration::from_secs(3),
receive_timeout: Duration::from_secs(3),
};
assert_eq!(
format!(

View File

@ -0,0 +1,345 @@
use crate::{GrpcSourceConfig, Message};
use futures::{Stream, StreamExt};
use log::{debug, error, info, log, trace, warn, Level};
use std::time::Duration;
use tokio::sync::mpsc::error::SendTimeoutError;
use tokio::sync::mpsc::Receiver;
use tokio::task::AbortHandle;
use tokio::time::{sleep, timeout, Instant};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError};
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
use yellowstone_grpc_proto::tonic::service::Interceptor;
use yellowstone_grpc_proto::tonic::Status;
type Attempt = u32;
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>, F: Interceptor> {
NotConnected(Attempt),
Connected(Attempt, GeyserGrpcClient<F>),
Ready(Attempt, S),
// error states
RecoverableConnectionError(Attempt),
// non-recoverable error
FatalError(Attempt, FatalErrorReason),
WaitReconnect(Attempt),
}
enum FatalErrorReason {
DownstreamChannelClosed,
ConfigurationError,
NetworkError,
SubscribeError,
}
/// connect to grpc source performing autoconect if required,
/// returns mpsc channel; task will abort on fatal error
///
/// implementation hints:
/// * no panic/unwrap
/// * do not use "?"
/// * do not "return" unless you really want to abort the task
pub fn create_geyser_autoconnection_task(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
) -> (AbortHandle, Receiver<Message>) {
// read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/
let (sender, receiver_channel) = tokio::sync::mpsc::channel::<Message>(1);
let jh_geyser_task = tokio::spawn(async move {
let mut state = ConnectionState::NotConnected(0);
let mut messages_forwarded = 0;
loop {
state = match state {
ConnectionState::NotConnected(mut attempt) => {
attempt += 1;
let addr = grpc_source.grpc_addr.clone();
let token = grpc_source.grpc_x_token.clone();
let config = grpc_source.tls_config.clone();
let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout);
let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout);
log!(
if attempt > 1 {
Level::Warn
} else {
Level::Debug
},
"Connecting attempt #{} to {}",
attempt,
addr
);
let connect_result = GeyserGrpcClient::connect_with_timeout(
addr,
token,
config,
connect_timeout,
request_timeout,
false,
)
.await;
match connect_result {
Ok(client) => ConnectionState::Connected(attempt, client),
Err(GeyserGrpcClientError::InvalidUri(_)) => ConnectionState::FatalError(
attempt,
FatalErrorReason::ConfigurationError,
),
Err(GeyserGrpcClientError::MetadataValueError(_)) => {
ConnectionState::FatalError(
attempt,
FatalErrorReason::ConfigurationError,
)
}
Err(GeyserGrpcClientError::InvalidXTokenLength(_)) => {
ConnectionState::FatalError(
attempt,
FatalErrorReason::ConfigurationError,
)
}
Err(GeyserGrpcClientError::TonicError(tonic_error)) => {
warn!(
"connect failed on {} - aborting: {:?}",
grpc_source, tonic_error
);
ConnectionState::FatalError(attempt, FatalErrorReason::NetworkError)
}
Err(GeyserGrpcClientError::TonicStatus(tonic_status)) => {
warn!(
"connect failed on {} - retrying: {:?}",
grpc_source, tonic_status
);
ConnectionState::RecoverableConnectionError(attempt)
}
Err(GeyserGrpcClientError::SubscribeSendError(send_error)) => {
warn!(
"connect failed with send error on {} - retrying: {:?}",
grpc_source, send_error
);
ConnectionState::RecoverableConnectionError(attempt)
}
}
}
ConnectionState::Connected(attempt, mut client) => {
let subscribe_timeout =
grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout);
let subscribe_filter = subscribe_filter.clone();
debug!("Subscribe with filter {:?}", subscribe_filter);
let subscribe_result_timeout = timeout(
subscribe_timeout.unwrap_or(Duration::MAX),
client.subscribe_once2(subscribe_filter),
)
.await;
match subscribe_result_timeout {
Ok(subscribe_result) => {
match subscribe_result {
Ok(geyser_stream) => ConnectionState::Ready(attempt, geyser_stream),
Err(GeyserGrpcClientError::TonicError(_)) => {
warn!("subscribe failed on {} - retrying", grpc_source);
ConnectionState::RecoverableConnectionError(attempt)
}
Err(GeyserGrpcClientError::TonicStatus(_)) => {
warn!("subscribe failed on {} - retrying", grpc_source);
ConnectionState::RecoverableConnectionError(attempt)
}
// non-recoverable
Err(unrecoverable_error) => {
error!(
"subscribe to {} failed with unrecoverable error: {}",
grpc_source, unrecoverable_error
);
ConnectionState::FatalError(
attempt,
FatalErrorReason::SubscribeError,
)
}
}
}
Err(_elapsed) => {
warn!(
"subscribe failed with timeout on {} - retrying",
grpc_source
);
ConnectionState::RecoverableConnectionError(attempt)
}
}
}
ConnectionState::RecoverableConnectionError(attempt) => {
let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0);
info!(
"waiting {} seconds, then reconnect to {}",
backoff_secs, grpc_source
);
sleep(Duration::from_secs_f32(backoff_secs)).await;
ConnectionState::NotConnected(attempt)
}
ConnectionState::FatalError(_attempt, reason) => match reason {
FatalErrorReason::DownstreamChannelClosed => {
warn!("downstream closed - aborting");
return;
}
FatalErrorReason::ConfigurationError => {
warn!("fatal configuration error - aborting");
return;
}
FatalErrorReason::NetworkError => {
warn!("fatal network error - aborting");
return;
}
FatalErrorReason::SubscribeError => {
warn!("fatal grpc subscribe error - aborting");
return;
}
},
ConnectionState::WaitReconnect(attempt) => {
let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0);
info!(
"waiting {} seconds, then reconnect to {}",
backoff_secs, grpc_source
);
sleep(Duration::from_secs_f32(backoff_secs)).await;
ConnectionState::NotConnected(attempt)
}
ConnectionState::Ready(attempt, mut geyser_stream) => {
let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout);
'recv_loop: loop {
match timeout(
receive_timeout.unwrap_or(Duration::MAX),
geyser_stream.next(),
)
.await
{
Ok(Some(Ok(update_message))) => {
trace!("> recv update message from {}", grpc_source);
// note: first send never blocks as the mpsc channel has capacity 1
let warning_threshold = if messages_forwarded == 1 {
Duration::from_millis(3000)
} else {
Duration::from_millis(500)
};
let started_at = Instant::now();
match sender
.send_timeout(
Message::GeyserSubscribeUpdate(Box::new(update_message)),
warning_threshold,
)
.await
{
Ok(()) => {
messages_forwarded += 1;
if messages_forwarded == 1 {
// note: first send never blocks - do not print time as this is a lie
trace!("queued first update message");
} else {
trace!(
"queued update message {} in {:.02}ms",
messages_forwarded,
started_at.elapsed().as_secs_f32() * 1000.0
);
}
continue 'recv_loop;
}
Err(SendTimeoutError::Timeout(the_message)) => {
warn!("downstream receiver did not pick put message for {}ms - keep waiting", warning_threshold.as_millis());
match sender.send(the_message).await {
Ok(()) => {
messages_forwarded += 1;
trace!(
"queued delayed update message {} in {:.02}ms",
messages_forwarded,
started_at.elapsed().as_secs_f32() * 1000.0
);
}
Err(_send_error) => {
warn!("downstream receiver closed, message is lost - aborting");
break 'recv_loop ConnectionState::FatalError(
attempt,
FatalErrorReason::DownstreamChannelClosed,
);
}
}
}
Err(SendTimeoutError::Closed(_)) => {
warn!("downstream receiver closed - aborting");
break 'recv_loop ConnectionState::FatalError(
attempt,
FatalErrorReason::DownstreamChannelClosed,
);
}
}
}
Ok(Some(Err(tonic_status))) => {
// all tonic errors are recoverable
warn!("error on {} - retrying: {:?}", grpc_source, tonic_status);
break 'recv_loop ConnectionState::WaitReconnect(attempt);
}
Ok(None) => {
warn!("geyser stream closed on {} - retrying", grpc_source);
break 'recv_loop ConnectionState::WaitReconnect(attempt);
}
Err(_elapsed) => {
warn!("timeout on {} - retrying", grpc_source);
break 'recv_loop ConnectionState::WaitReconnect(attempt);
}
}
} // -- END receive loop
}
} // -- END match
} // -- endless state loop
});
(jh_geyser_task.abort_handle(), receiver_channel)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::GrpcConnectionTimeouts;
#[tokio::test]
async fn test_debug_no_secrets() {
let timeout_config = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(1),
request_timeout: Duration::from_secs(2),
subscribe_timeout: Duration::from_secs(3),
receive_timeout: Duration::from_secs(3),
};
assert_eq!(
format!(
"{:?}",
GrpcSourceConfig::new(
"http://localhost:1234".to_string(),
Some("my-secret".to_string()),
None,
timeout_config
)
),
"grpc_addr http://localhost:1234"
);
}
#[tokio::test]
async fn test_display_no_secrets() {
let timeout_config = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(1),
request_timeout: Duration::from_secs(2),
subscribe_timeout: Duration::from_secs(3),
receive_timeout: Duration::from_secs(3),
};
assert_eq!(
format!(
"{}",
GrpcSourceConfig::new(
"http://localhost:1234".to_string(),
Some("my-secret".to_string()),
None,
timeout_config
)
),
"grpc_addr http://localhost:1234"
);
}
}

View File

@ -1,5 +1,5 @@
use crate::grpc_subscription_autoreconnect::Message;
use crate::grpc_subscription_autoreconnect::Message::GeyserSubscribeUpdate;
use crate::Message;
use crate::Message::GeyserSubscribeUpdate;
use async_stream::stream;
use futures::{Stream, StreamExt};
use log::{info, warn};

View File

@ -1,4 +1,173 @@
pub mod grpc_subscription;
pub mod grpc_subscription_autoreconnect;
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::time::Duration;
use yellowstone_grpc_proto::geyser::{
CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks,
SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, SubscribeUpdate,
};
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
pub mod channel_plugger;
pub mod grpc_subscription_autoreconnect_streams;
pub mod grpc_subscription_autoreconnect_tasks;
pub mod grpcmultiplex_fastestwins;
mod obfuscate;
type Attempt = u32;
// wraps payload and status messages
// clone is required by broacast channel
#[derive(Clone)]
pub enum Message {
GeyserSubscribeUpdate(Box<SubscribeUpdate>),
// connect (attempt=1) or reconnect(attempt=2..)
Connecting(Attempt),
}
#[derive(Clone, Debug)]
pub struct GrpcConnectionTimeouts {
pub connect_timeout: Duration,
pub request_timeout: Duration,
pub subscribe_timeout: Duration,
pub receive_timeout: Duration,
}
#[derive(Clone)]
pub struct GrpcSourceConfig {
pub grpc_addr: String,
pub grpc_x_token: Option<String>,
tls_config: Option<ClientTlsConfig>,
timeouts: Option<GrpcConnectionTimeouts>,
}
impl Display for GrpcSourceConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"grpc_addr {}",
crate::obfuscate::url_obfuscate_api_token(&self.grpc_addr)
)
}
}
impl Debug for GrpcSourceConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(&self, f)
}
}
impl GrpcSourceConfig {
/// Create a grpc source without tls and timeouts
pub fn new_simple(grpc_addr: String) -> Self {
Self {
grpc_addr,
grpc_x_token: None,
tls_config: None,
timeouts: None,
}
}
pub fn new(
grpc_addr: String,
grpc_x_token: Option<String>,
tls_config: Option<ClientTlsConfig>,
timeouts: GrpcConnectionTimeouts,
) -> Self {
Self {
grpc_addr,
grpc_x_token,
tls_config,
timeouts: Some(timeouts),
}
}
}
#[derive(Clone)]
pub struct GeyserFilter(pub CommitmentConfig);
impl GeyserFilter {
pub fn blocks_and_txs(&self) -> SubscribeRequest {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);
SubscribeRequest {
slots: HashMap::new(),
accounts: Default::default(),
transactions: HashMap::new(),
entry: Default::default(),
blocks: blocks_subs,
blocks_meta: HashMap::new(),
commitment: Some(map_commitment_level(self.0) as i32),
accounts_data_slice: Default::default(),
ping: None,
}
}
pub fn blocks_meta(&self) -> SubscribeRequest {
let mut blocksmeta_subs = HashMap::new();
blocksmeta_subs.insert("client".to_string(), SubscribeRequestFilterBlocksMeta {});
SubscribeRequest {
slots: HashMap::new(),
accounts: Default::default(),
transactions: HashMap::new(),
entry: Default::default(),
blocks: HashMap::new(),
blocks_meta: blocksmeta_subs,
commitment: Some(map_commitment_level(self.0) as i32),
accounts_data_slice: Default::default(),
ping: None,
}
}
pub fn slots(&self) -> SubscribeRequest {
let mut slots_subs = HashMap::new();
slots_subs.insert(
"client".to_string(),
SubscribeRequestFilterSlots {
filter_by_commitment: Some(true),
},
);
SubscribeRequest {
slots: slots_subs,
accounts: Default::default(),
transactions: HashMap::new(),
entry: Default::default(),
blocks: HashMap::new(),
blocks_meta: HashMap::new(),
commitment: Some(map_commitment_level(self.0) as i32),
accounts_data_slice: Default::default(),
ping: None,
}
}
}
fn map_commitment_level(commitment_config: CommitmentConfig) -> CommitmentLevel {
// solana_sdk -> yellowstone
match commitment_config.commitment {
solana_sdk::commitment_config::CommitmentLevel::Processed => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Processed
}
solana_sdk::commitment_config::CommitmentLevel::Confirmed => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Confirmed
}
solana_sdk::commitment_config::CommitmentLevel::Finalized => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Finalized
}
_ => {
panic!(
"unsupported commitment level {}",
commitment_config.commitment
)
}
}
}