diff --git a/Cargo.lock b/Cargo.lock index 898c6f9..df3c9b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -76,9 +76,9 @@ dependencies = [ [[package]] name = "ahash" -version = "0.8.6" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91429305e9f0a25f6205c5b8e0d2db09e0708a7a6df0f42212bb56c32c8ac97a" +checksum = "72832d73be48bac96a5d7944568f305d829ed55b0ce3b483647089dfaf6cf704" dependencies = [ "cfg-if", "getrandom 0.2.11", @@ -249,12 +249,6 @@ dependencies = [ "rand 0.8.5", ] -[[package]] -name = "array-bytes" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ad284aeb45c13f2fb4f084de4a420ebf447423bdf9386c0540ce33cb3ef4b8c" - [[package]] name = "arrayref" version = "0.3.7" @@ -444,6 +438,9 @@ name = "bitflags" version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" +dependencies = [ + "serde", +] [[package]] name = "bitmaps" @@ -1239,7 +1236,7 @@ dependencies = [ [[package]] name = "geyser-grpc-connector" -version = "0.7.2+yellowstone.1.11" +version = "0.10.1+yellowstone.1.12" dependencies = [ "anyhow", "async-stream", @@ -1250,6 +1247,7 @@ dependencies = [ "itertools 0.10.5", "log", "merge-streams", + "solana-logger", "solana-sdk", "tokio", "tracing", @@ -1325,7 +1323,7 @@ version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.4", ] [[package]] @@ -1676,6 +1674,18 @@ dependencies = [ "libsecp256k1-core", ] +[[package]] +name = "light-poseidon" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c9a85a9752c549ceb7578064b4ed891179d20acd85f27318573b64d2d7ee7ee" +dependencies = [ + "ark-bn254", + "ark-ff", + "num-bigint 0.4.4", + "thiserror", +] + [[package]] name = "linux-raw-sys" version = "0.4.12" @@ -2241,6 +2251,17 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "qualifier_attr" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e2e25ee72f5b24d773cae88422baddefff7714f97aab68d96fe2b6fc4a28fb2" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.41", +] + [[package]] name = "quote" version = "1.0.33" @@ -2779,9 +2800,9 @@ dependencies = [ [[package]] name = "solana-account-decoder" -version = "1.16.17" +version = "1.17.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "121e55656c2094950f374247e1303dd09517f1ed49c91bf60bf114760b286eb4" +checksum = "22ea4bedfcc8686ae6d01a3d8288f5b9746cd00ec63f0ce9a6415849d35add50" dependencies = [ "Inflector", "base64 0.21.5", @@ -2792,42 +2813,21 @@ dependencies = [ "serde", "serde_derive", "serde_json", - "solana-address-lookup-table-program", "solana-config-program", "solana-sdk", "spl-token", "spl-token-2022", + "spl-token-group-interface", "spl-token-metadata-interface", "thiserror", "zstd", ] -[[package]] -name = "solana-address-lookup-table-program" -version = "1.16.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ccb31f7f14d5876acd9ec38f5bf6097bfb4b350141d81c7ff2bf684db3ca815" -dependencies = [ - "bincode", - "bytemuck", - "log", - "num-derive 0.3.3", - "num-traits", - "rustc_version", - "serde", - "solana-frozen-abi", - "solana-frozen-abi-macro", - "solana-program", - "solana-program-runtime", - "solana-sdk", - "thiserror", -] - [[package]] name = "solana-config-program" -version = "1.16.17" +version = "1.17.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94dc0f4463daf1c6155f20eac948ea4ced705e5f5520546aef4e11e746a6d95d" +checksum = "8de23cd0dd8673f4590e90bfa47ff19eb629f4b7dc15a3fb173a62d932801d07" dependencies = [ "bincode", "chrono", @@ -2839,11 +2839,11 @@ dependencies = [ [[package]] name = "solana-frozen-abi" -version = "1.16.17" +version = "1.17.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d266bf0311bb403d31206aa2904b8741f57c7f5e27580b6810ad5e22fc7c3282" +checksum = "4090f2ac64149ce1fbabd5277f41e278edc1f38121927fe8f6355e67ead3e199" dependencies = [ - "ahash 0.8.6", + "ahash 0.8.4", "blake3", "block-buffer 0.10.4", "bs58", @@ -2852,13 +2852,10 @@ dependencies = [ "cc", "either", "generic-array", - "getrandom 0.1.16", "im", "lazy_static", "log", "memmap2", - "once_cell", - "rand_core 0.6.4", "rustc_version", "serde", "serde_bytes", @@ -2872,9 +2869,9 @@ dependencies = [ [[package]] name = "solana-frozen-abi-macro" -version = "1.16.17" +version = "1.17.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dfe18c5155015dcb494c6de84a03b725fcf90ec2006a047769018b94c2cf0de" +checksum = "765bcdc1ecc31ea5d3d7ddb680ffa6645809c122b4ffdc223b161850e6ba352b" dependencies = [ "proc-macro2", "quote", @@ -2884,9 +2881,9 @@ dependencies = [ [[package]] name = "solana-logger" -version = "1.16.17" +version = "1.17.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f76fe25c2d06dcf621befd1e8d5655143e8a059c7e20fcb71736bc80ed779d6" +checksum = "9c7f3cad088bc5f00569cb5b4c3aaba8d935f8f7cc25c91cc0c55a8a7de2b137" dependencies = [ "env_logger", "lazy_static", @@ -2895,9 +2892,9 @@ dependencies = [ [[package]] name = "solana-measure" -version = "1.16.17" +version = "1.17.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db165b8a7f5d840abef011c78a18ffe63cad9192d676b07d94f469b6b5dc6cf6" +checksum = "2de5041d16120852c0deea047c024e1fad8819e49041491f0cca6c91c243fd5d" dependencies = [ "log", "solana-sdk", @@ -2905,9 +2902,9 @@ dependencies = [ [[package]] name = "solana-metrics" -version = "1.16.17" +version = "1.17.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa01731bb3952904962d49a1ea1205db54e93f3a56f4006d32e02a7c85d60546" +checksum = "2fd6f25f0076b6eb873f7e2a85e53191ac2affe6782131be1a2867d057307e20" dependencies = [ "crossbeam-channel", "gethostname", @@ -2915,22 +2912,22 @@ dependencies = [ "log", "reqwest", "solana-sdk", + "thiserror", ] [[package]] name = "solana-program" -version = "1.16.17" +version = "1.17.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bb16998986492de307eef503ce47e84503d35baa92dc60832b22476948b1c16" +checksum = "c1141d1dffbe68852128f7bbcc3c43a5d2cb715ecffeeb64eb81bb93cbaf80bb" dependencies = [ "ark-bn254", "ark-ec", "ark-ff", "ark-serialize", - "array-bytes", "base64 0.21.5", "bincode", - "bitflags 1.3.2", + "bitflags 2.4.1", "blake3", "borsh 0.10.3", "borsh 0.9.3", @@ -2947,14 +2944,14 @@ dependencies = [ "lazy_static", "libc", "libsecp256k1", + "light-poseidon", "log", "memoffset", "num-bigint 0.4.4", "num-derive 0.3.3", "num-traits", "parking_lot", - "rand 0.7.3", - "rand_chacha 0.2.2", + "rand 0.8.5", "rustc_version", "rustversion", "serde", @@ -2974,9 +2971,9 @@ dependencies = [ [[package]] name = "solana-program-runtime" -version = "1.16.17" +version = "1.17.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "036d6ecf67a3a7c6dc74d4f7fa6ab321e7ce8feccb7c9dff8384a41d0a12345b" +checksum = "942de577a2865cec28fc174575c9bd6cf7af815832af67fe40ca856075550998" dependencies = [ "base64 0.21.5", "bincode", @@ -2988,7 +2985,7 @@ dependencies = [ "num-derive 0.3.3", "num-traits", "percentage", - "rand 0.7.3", + "rand 0.8.5", "rustc_version", "serde", "solana-frozen-abi", @@ -3002,14 +2999,14 @@ dependencies = [ [[package]] name = "solana-sdk" -version = "1.16.17" +version = "1.17.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4106cda3d10833ba957dbd25fb841b50aeca7480ccf8f54859294716f54bcd4b" +checksum = "278a95acb99377dd4585599fdbec23d0a6fcb94ec78285283723fdd365fe885e" dependencies = [ "assert_matches", "base64 0.21.5", "bincode", - "bitflags 1.3.2", + "bitflags 2.4.1", "borsh 0.10.3", "bs58", "bytemuck", @@ -3032,8 +3029,9 @@ dependencies = [ "num_enum 0.6.1", "pbkdf2 0.11.0", "qstring", + "qualifier_attr", "rand 0.7.3", - "rand_chacha 0.2.2", + "rand 0.8.5", "rustc_version", "rustversion", "serde", @@ -3055,9 +3053,9 @@ dependencies = [ [[package]] name = "solana-sdk-macro" -version = "1.16.17" +version = "1.17.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e560806a3859717eb2220b26e2cd68bb757b63affa3e79c3f1d8d853b5ee78f" +checksum = "92dbaf563210f61828800f2a3d8c188fa2afede91920d364982e280318db2eb5" dependencies = [ "bs58", "proc-macro2", @@ -3067,10 +3065,16 @@ dependencies = [ ] [[package]] -name = "solana-transaction-status" -version = "1.16.17" +name = "solana-security-txt" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "236dd4e43b8a7402bce250228e04c0c68d9493a3e19c71b377ccc7c4390fd969" +checksum = "468aa43b7edb1f9b7b7b686d5c3aeb6630dc1708e86e31343499dd5c4d775183" + +[[package]] +name = "solana-transaction-status" +version = "1.17.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e2031070cba17802f7108b53f6db01b82cdfb0360b0a8b9d51c584f2e9dd9e4" dependencies = [ "Inflector", "base64 0.21.5", @@ -3083,7 +3087,6 @@ dependencies = [ "serde_derive", "serde_json", "solana-account-decoder", - "solana-address-lookup-table-program", "solana-sdk", "spl-associated-token-account", "spl-memo", @@ -3094,9 +3097,9 @@ dependencies = [ [[package]] name = "solana-zk-token-sdk" -version = "1.16.17" +version = "1.17.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "278c08e13bc04b6940997602909052524a375154b00cf0bfa934359a3bb7e6f0" +checksum = "ef26fb44734aa940e6648bbbeead677edc68c7e1ec09128e5f16a8924c389a38" dependencies = [ "aes-gcm-siv", "base64 0.21.5", @@ -3123,9 +3126,9 @@ dependencies = [ [[package]] name = "solana_rbpf" -version = "0.6.1" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17d4ba1e58947346e360fabde0697029d36ba83c42f669199b16a8931313cf29" +checksum = "3d457cc2ba742c120492a64b7fa60e22c575e891f6b55039f4d736568fb112a3" dependencies = [ "byteorder", "combine", @@ -3148,9 +3151,9 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" [[package]] name = "spl-associated-token-account" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "385e31c29981488f2820b2022d8e731aae3b02e6e18e2fd854e4c9a94dc44fc3" +checksum = "992d9c64c2564cc8f63a4b508bf3ebcdf2254b0429b13cd1d31adb6162432a5f" dependencies = [ "assert_matches", "borsh 0.10.3", @@ -3246,9 +3249,9 @@ dependencies = [ [[package]] name = "spl-tlv-account-resolution" -version = "0.4.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "062e148d3eab7b165582757453632ffeef490c02c86a48bfdb4988f63eefb3b9" +checksum = "615d381f48ddd2bb3c57c7f7fb207591a2a05054639b18a62e785117dd7a8683" dependencies = [ "bytemuck", "solana-program", @@ -3275,9 +3278,9 @@ dependencies = [ [[package]] name = "spl-token-2022" -version = "0.9.0" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4abf34a65ba420584a0c35f3903f8d727d1f13ababbdc3f714c6b065a686e86" +checksum = "d697fac19fd74ff472dfcc13f0b442dd71403178ce1de7b5d16f83a33561c059" dependencies = [ "arrayref", "bytemuck", @@ -3285,16 +3288,31 @@ dependencies = [ "num-traits", "num_enum 0.7.1", "solana-program", + "solana-security-txt", "solana-zk-token-sdk", "spl-memo", "spl-pod", "spl-token", + "spl-token-group-interface", "spl-token-metadata-interface", "spl-transfer-hook-interface", "spl-type-length-value", "thiserror", ] +[[package]] +name = "spl-token-group-interface" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b889509d49fa74a4a033ca5dae6c2307e9e918122d97e58562f5c4ffa795c75d" +dependencies = [ + "bytemuck", + "solana-program", + "spl-discriminator", + "spl-pod", + "spl-program-error", +] + [[package]] name = "spl-token-metadata-interface" version = "0.2.0" @@ -3311,9 +3329,9 @@ dependencies = [ [[package]] name = "spl-transfer-hook-interface" -version = "0.3.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "051d31803f873cabe71aec3c1b849f35248beae5d19a347d93a5c9cccc5d5a9b" +checksum = "7aabdb7c471566f6ddcee724beb8618449ea24b399e58d464d6b5bc7db550259" dependencies = [ "arrayref", "bytemuck", @@ -3606,7 +3624,6 @@ dependencies = [ "axum", "base64 0.21.5", "bytes", - "flate2", "h2", "http", "http-body", @@ -4147,9 +4164,8 @@ dependencies = [ [[package]] name = "yellowstone-grpc-client" -version = "1.12.0+solana.1.16.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e58204f372a7e82d15d72bdf99334029c4e9cdc15bd2e9a5c33b598d9f1eb8b6" +version = "1.13.0+solana.1.17.15" +source = "git+https://github.com/rpcpool/yellowstone-grpc.git?tag=v1.12.0+solana.1.17.15#c7b72cc8781c2dc48e4a7c94e411f95df495cf2f" dependencies = [ "bytes", "futures", @@ -4162,9 +4178,8 @@ dependencies = [ [[package]] name = "yellowstone-grpc-proto" -version = "1.11.0+solana.1.16.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00d751c6ef3093ec90ab1e16c6a504b5bea99aca6c688c429fed4cc56782f57e" +version = "1.12.0+solana.1.17.15" +source = "git+https://github.com/rpcpool/yellowstone-grpc.git?tag=v1.12.0+solana.1.17.15#c7b72cc8781c2dc48e4a7c94e411f95df495cf2f" dependencies = [ "anyhow", "bincode", diff --git a/Cargo.toml b/Cargo.toml index 9f88a6f..f4a69f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "geyser-grpc-connector" -version = "0.7.2+yellowstone.1.11" +version = "0.10.1+yellowstone.1.12" edition = "2021" description = "Multiplexing and Reconnection on Yellowstone gRPC Geyser client streaming" @@ -9,13 +9,12 @@ authors = ["GroovieGermanikus "] repository = "https://github.com/blockworks-foundation/geyser-grpc-connector" [dependencies] -# v1.11.0+solana.1.16.17 -yellowstone-grpc-proto = "1.11.0" -# v1.12.0+solana.1.16.17 -yellowstone-grpc-client = "1.12.0" +yellowstone-grpc-client = { version = "1.13.0+solana.1.17.15", git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" } +yellowstone-grpc-proto = { version = "1.12.0+solana.1.17.15", git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" } + # required for CommitmentConfig -solana-sdk = "~1.16.17" +solana-sdk = "~1.17.15" url = "2.5.0" async-stream = "0.3.5" @@ -33,3 +32,4 @@ bincode = "1.3.3" [dev-dependencies] tracing-subscriber = "0.3.16" +solana-logger = "1" diff --git a/examples/stream_blocks_autoconnect.rs b/examples/stream_blocks_autoconnect.rs new file mode 100644 index 0000000..3e24cb0 --- /dev/null +++ b/examples/stream_blocks_autoconnect.rs @@ -0,0 +1,138 @@ +use futures::StreamExt; +use log::info; +use solana_sdk::clock::Slot; +use solana_sdk::commitment_config::CommitmentConfig; +use std::env; + +use geyser_grpc_connector::channel_plugger::spawn_broadcast_channel_plug; +use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task; +use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor; +use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message}; +use tokio::time::{sleep, Duration}; +use tracing::warn; +use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; +use yellowstone_grpc_proto::geyser::SubscribeUpdate; +use yellowstone_grpc_proto::prost::Message as _; + +pub struct BlockMini { + pub blocksize: usize, + pub slot: Slot, + pub commitment_config: CommitmentConfig, +} + +struct BlockMiniExtractor(CommitmentConfig); + +impl FromYellowstoneExtractor for BlockMiniExtractor { + type Target = BlockMini; + fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)> { + match update.update_oneof { + Some(UpdateOneof::Block(update_block_message)) => { + let blocksize = update_block_message.encoded_len(); + let slot = update_block_message.slot; + let mini = BlockMini { + blocksize, + slot, + commitment_config: self.0, + }; + Some((slot, mini)) + } + Some(UpdateOneof::BlockMeta(update_blockmeta_message)) => { + let blocksize = update_blockmeta_message.encoded_len(); + let slot = update_blockmeta_message.slot; + let mini = BlockMini { + blocksize, + slot, + commitment_config: self.0, + }; + Some((slot, mini)) + } + _ => None, + } + } +} + +#[allow(dead_code)] +enum TestCases { + Basic, + SlowReceiverStartup, + TemporaryLaggingReceiver, + CloseAfterReceiving, + AbortTaskFromOutside, +} +const TEST_CASE: TestCases = TestCases::Basic; + +#[tokio::main] +pub async fn main() { + // RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace + tracing_subscriber::fmt::init(); + // console_subscriber::init(); + + let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green"); + let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok(); + + info!( + "Using grpc source on {} ({})", + grpc_addr_green, + grpc_x_token_green.is_some() + ); + + let timeouts = GrpcConnectionTimeouts { + connect_timeout: Duration::from_secs(5), + request_timeout: Duration::from_secs(5), + subscribe_timeout: Duration::from_secs(5), + receive_timeout: Duration::from_secs(5), + }; + + let green_config = + GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone()); + + info!("Write Block stream.."); + + let (jh_geyser_task, message_channel) = create_geyser_autoconnection_task( + green_config.clone(), + GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(), + ); + let mut message_channel = + spawn_broadcast_channel_plug(tokio::sync::broadcast::channel(8), message_channel); + + tokio::spawn(async move { + if let TestCases::SlowReceiverStartup = TEST_CASE { + sleep(Duration::from_secs(5)).await; + } + + let mut message_count = 0; + while let Ok(message) = message_channel.recv().await { + if let TestCases::AbortTaskFromOutside = TEST_CASE { + if message_count > 5 { + info!("(testcase) aborting task from outside"); + jh_geyser_task.abort(); + } + } + match message { + Message::GeyserSubscribeUpdate(subscriber_update) => { + message_count += 1; + info!("got update - {} bytes", subscriber_update.encoded_len()); + + if let TestCases::CloseAfterReceiving = TEST_CASE { + info!("(testcase) closing stream after receiving"); + return; + } + } + Message::Connecting(attempt) => { + warn!("Connection attempt: {}", attempt); + } + } + + if let TestCases::TemporaryLaggingReceiver = TEST_CASE { + if message_count % 3 == 1 { + info!("(testcase) lagging a bit"); + sleep(Duration::from_millis(1500)).await; + } + } + } + warn!("Stream aborted"); + }); + + // "infinite" sleep + sleep(Duration::from_secs(2000)).await; +} diff --git a/examples/stream_blocks_mainnet.rs b/examples/stream_blocks_mainnet.rs index 73440ea..e2c2731 100644 --- a/examples/stream_blocks_mainnet.rs +++ b/examples/stream_blocks_mainnet.rs @@ -21,12 +21,11 @@ use solana_sdk::signature::Signature; use solana_sdk::transaction::TransactionError; use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock; -use geyser_grpc_connector::grpc_subscription_autoreconnect::{ - create_geyser_reconnecting_stream, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, -}; +use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream; use geyser_grpc_connector::grpcmultiplex_fastestwins::{ create_multiplexed_stream, FromYellowstoneExtractor, }; +use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig}; use tokio::time::{sleep, Duration}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; @@ -130,6 +129,7 @@ pub async fn main() { connect_timeout: Duration::from_secs(5), request_timeout: Duration::from_secs(5), subscribe_timeout: Duration::from_secs(5), + receive_timeout: Duration::from_secs(5), }; let green_config = diff --git a/examples/stream_blocks_single.rs b/examples/stream_blocks_single.rs index 2d7ee51..4abc002 100644 --- a/examples/stream_blocks_single.rs +++ b/examples/stream_blocks_single.rs @@ -5,16 +5,14 @@ use solana_sdk::commitment_config::CommitmentConfig; use std::env; use std::pin::pin; -use geyser_grpc_connector::grpc_subscription_autoreconnect::{ - create_geyser_reconnecting_stream, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, -}; -use geyser_grpc_connector::grpcmultiplex_fastestwins::{ - create_multiplexed_stream, FromYellowstoneExtractor, -}; +use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream; +use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor; +use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message}; use tokio::time::{sleep, Duration}; +use tracing::warn; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; -use yellowstone_grpc_proto::prost::Message; +use yellowstone_grpc_proto::prost::Message as _; fn start_example_blockmini_consumer( multiplex_stream: impl Stream + Send + 'static, @@ -86,23 +84,69 @@ pub async fn main() { connect_timeout: Duration::from_secs(5), request_timeout: Duration::from_secs(5), subscribe_timeout: Duration::from_secs(5), + receive_timeout: Duration::from_secs(5), }; - let green_config = - GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone()); + let config = GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone()); info!("Write Block stream.."); + let green_stream = create_geyser_reconnecting_stream( - green_config.clone(), - GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(), - // GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(), + config.clone(), + GeyserFilter(CommitmentConfig::finalized()).blocks_and_txs(), ); - let multiplex_stream = create_multiplexed_stream( - vec![green_stream], - BlockMiniExtractor(CommitmentConfig::confirmed()), + + let blue_stream = create_geyser_reconnecting_stream( + config.clone(), + GeyserFilter(CommitmentConfig::processed()).blocks_and_txs(), ); - start_example_blockmini_consumer(multiplex_stream); + + tokio::spawn(async move { + let mut green_stream = pin!(green_stream); + while let Some(message) = green_stream.next().await { + match message { + Message::GeyserSubscribeUpdate(subscriber_update) => { + let mapped = map_block_update(*subscriber_update); + if let Some(slot) = mapped { + info!("got update (green)!!! slot: {}", slot); + } + } + Message::Connecting(attempt) => { + warn!("Connection attempt: {}", attempt); + } + } + } + warn!("Stream aborted"); + }); + + tokio::spawn(async move { + let mut blue_stream = pin!(blue_stream); + while let Some(message) = blue_stream.next().await { + match message { + Message::GeyserSubscribeUpdate(subscriber_update) => { + let mapped = map_block_update(*subscriber_update); + if let Some(slot) = mapped { + info!("got update (blue)!!! slot: {}", slot); + } + } + Message::Connecting(attempt) => { + warn!("Connection attempt: {}", attempt); + } + } + } + warn!("Stream aborted"); + }); // "infinite" sleep sleep(Duration::from_secs(1800)).await; } + +fn map_block_update(update: SubscribeUpdate) -> Option { + match update.update_oneof { + Some(UpdateOneof::Block(update_block_message)) => { + let slot = update_block_message.slot; + Some(slot) + } + _ => None, + } +} diff --git a/rust-toolchain.toml b/rust-toolchain.toml index f400973..8142c30 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "1.70" +channel = "1.73.0" diff --git a/src/channel_plugger.rs b/src/channel_plugger.rs new file mode 100644 index 0000000..9ba1628 --- /dev/null +++ b/src/channel_plugger.rs @@ -0,0 +1,171 @@ +use log::{debug, info, warn}; +use std::time::Duration; +use tokio::sync::broadcast::error::RecvError; +use tokio::sync::mpsc::error::SendTimeoutError; + +/// usage: see plug_pattern test +pub fn spawn_broadcast_channel_plug( + downstream_broadcast: ( + tokio::sync::broadcast::Sender, + tokio::sync::broadcast::Receiver, + ), + upstream: tokio::sync::mpsc::Receiver, +) -> tokio::sync::broadcast::Receiver { + spawn_plugger_mpcs_to_broadcast(upstream, downstream_broadcast.0); + downstream_broadcast.1 +} + +/// note: backpressure will NOT get propagated to upstream +pub fn spawn_plugger_mpcs_to_broadcast( + mut upstream: tokio::sync::mpsc::Receiver, + downstream: tokio::sync::broadcast::Sender, + // TODO allow multiple downstreams + fanout +) { + // abort forwarder by closing the sender + let _private_handler = tokio::spawn(async move { + while let Some(value) = upstream.recv().await { + match downstream.send(value) { + Ok(n_subscribers) => { + debug!("forwarded to {} subscribers", n_subscribers); + } + Err(_dropped_msg) => { + // decide to continue if no subscribers + debug!("no subscribers - dropping payload and continue"); + } + } + } + debug!("no more messages from producer - shutting down connector"); + }); +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::time::{sleep, timeout}; + + #[tokio::test] + async fn plug_pattern() { + let (_jh_task, message_channel) = tokio::sync::mpsc::channel::(1); + let _broadcast_rx = + spawn_broadcast_channel_plug(tokio::sync::broadcast::channel(8), message_channel); + } + + #[tokio::test] + async fn connect_broadcast_to_mpsc() { + solana_logger::setup_with_default("debug"); + + let (tx1, rx1) = tokio::sync::mpsc::channel::(1); + let (tx2, rx2) = tokio::sync::broadcast::channel::(2); + drop(rx2); + + let jh_producer = tokio::spawn(async move { + for i in 1..=10 { + info!("producer sending {}", i); + if let Err(SendTimeoutError::Timeout(message)) = + tx1.send_timeout(i, Duration::from_millis(200)).await + { + info!("producer send was blocked"); + tx1.send(message).await.unwrap(); + } + sleep(Duration::from_millis(500)).await; + } + }); + + // downstream receiver A connected to broadcast + let mut channel_a = tx2.subscribe(); + tokio::spawn(async move { + loop { + match channel_a.recv().await { + Ok(msg) => { + info!("A: {:?} (len={})", msg, channel_a.len()); + } + Err(RecvError::Lagged(n_missed)) => { + warn!("channel A lagged {} messages", n_missed); + } + Err(RecvError::Closed) => { + info!("channel A closed (by forwarder)"); + break; + } + } + } + }); + + // downstream receiver B connected to broadcast + let mut channel_b = tx2.subscribe(); + tokio::spawn(async move { + loop { + match channel_b.recv().await { + Ok(msg) => { + info!("B: {:?} (len={})", msg, channel_b.len()); + // slow receiver + sleep(Duration::from_millis(1000)).await; + } + Err(RecvError::Lagged(n_missed)) => { + warn!("channel B lagged {} messages", n_missed); + } + Err(RecvError::Closed) => { + info!("channel B closed (by forwarder)"); + break; + } + } + } + }); + + // connect them + spawn_plugger_mpcs_to_broadcast(rx1, tx2); + + // wait forever + info!("Started tasks .. waiting for producer to finish"); + // should take 5 secs + assert!( + timeout(Duration::from_secs(10), jh_producer).await.is_ok(), + "timeout" + ); + info!("producer done - wait a bit longer ..."); + sleep(Duration::from_secs(3)).await; + info!("done."); + + // note how messages pile up for slow receiver B + } + + #[tokio::test] + async fn connect_broadcast_to_mpsc_nosubscribers() { + solana_logger::setup_with_default("debug"); + + let (tx1, rx1) = tokio::sync::mpsc::channel::(1); + let (tx2, rx2) = tokio::sync::broadcast::channel::(2); + + let jh_producer = tokio::spawn(async move { + for i in 1..=10 { + info!("producer sending {}", i); + if let Err(SendTimeoutError::Timeout(message)) = + tx1.send_timeout(i, Duration::from_millis(200)).await + { + info!("producer send was blocked"); + tx1.send(message).await.unwrap(); + } + sleep(Duration::from_millis(500)).await; + } + }); + + // connect them + spawn_plugger_mpcs_to_broadcast(rx1, tx2); + + sleep(Duration::from_secs(3)).await; + info!("dropping subscriber"); + drop(rx2); + + // wait forever + info!("Started tasks .. waiting for producer to finish"); + // should take 5 secs + assert!( + timeout(Duration::from_secs(10), jh_producer).await.is_ok(), + "timeout" + ); + info!("producer done - wait a bit longer ..."); + sleep(Duration::from_secs(3)).await; + info!("done."); + + // note how messages pile up for slow receiver B + } +} diff --git a/src/grpc_subscription.rs b/src/grpc_subscription.rs deleted file mode 100644 index 477d29f..0000000 --- a/src/grpc_subscription.rs +++ /dev/null @@ -1,82 +0,0 @@ -// use crate::{ -// endpoint_stremers::EndpointStreaming, -// rpc_polling::vote_accounts_and_cluster_info_polling::poll_vote_accounts_and_cluster_info, -// }; -use anyhow::{bail, Context}; -use futures::StreamExt; - -use solana_sdk::commitment_config::CommitmentConfig; -use std::collections::HashMap; -use tokio::sync::broadcast::Sender; -use yellowstone_grpc_client::GeyserGrpcClient; -use yellowstone_grpc_proto::prelude::{ - subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks, - SubscribeUpdateBlock, -}; - -pub fn create_block_processing_task( - grpc_addr: String, - grpc_x_token: Option, - block_sx: Sender, - commitment_level: CommitmentLevel, -) -> tokio::task::JoinHandle> { - let mut blocks_subs = HashMap::new(); - blocks_subs.insert( - "client".to_string(), - SubscribeRequestFilterBlocks { - account_include: Default::default(), - include_transactions: Some(true), - include_accounts: Some(false), - include_entries: Some(false), - }, - ); - - let _commitment_config = match commitment_level { - CommitmentLevel::Confirmed => CommitmentConfig::confirmed(), - CommitmentLevel::Finalized => CommitmentConfig::finalized(), - CommitmentLevel::Processed => CommitmentConfig::processed(), - }; - - tokio::spawn(async move { - // connect to grpc - let mut client = GeyserGrpcClient::connect(grpc_addr, grpc_x_token, None)?; - let mut stream = client - .subscribe_once( - HashMap::new(), - Default::default(), - HashMap::new(), - Default::default(), - blocks_subs, - Default::default(), - Some(commitment_level), - Default::default(), - None, - ) - .await?; - - while let Some(message) = stream.next().await { - let message = message?; - - let Some(update) = message.update_oneof else { - continue; - }; - - match update { - UpdateOneof::Block(block) => { - // let block = map_produced_block(block, commitment_config); - - block_sx - .send(block) - .context("Grpc failed to send a block")?; - } - UpdateOneof::Ping(_) => { - log::trace!("GRPC Ping"); - } - u => { - bail!("Unexpected update: {u:?}"); - } - }; - } - bail!("geyser slot stream ended"); - }) -} diff --git a/src/grpc_subscription_autoreconnect.rs b/src/grpc_subscription_autoreconnect_streams.rs similarity index 56% rename from src/grpc_subscription_autoreconnect.rs rename to src/grpc_subscription_autoreconnect_streams.rs index a0ef511..e070292 100644 --- a/src/grpc_subscription_autoreconnect.rs +++ b/src/grpc_subscription_autoreconnect_streams.rs @@ -1,85 +1,14 @@ +use crate::{Attempt, GrpcSourceConfig, Message}; use async_stream::stream; use futures::{Stream, StreamExt}; use log::{debug, info, log, trace, warn, Level}; -use solana_sdk::commitment_config::CommitmentConfig; -use std::collections::HashMap; -use std::fmt::{Debug, Display}; use std::time::Duration; use tokio::task::JoinHandle; use tokio::time::{sleep, timeout}; use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult}; -use yellowstone_grpc_proto::geyser::{ - CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeUpdate, -}; -use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta; -use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; +use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate}; use yellowstone_grpc_proto::tonic::Status; -#[derive(Clone, Debug)] -pub struct GrpcConnectionTimeouts { - pub connect_timeout: Duration, - pub request_timeout: Duration, - pub subscribe_timeout: Duration, -} - -#[derive(Clone)] -pub struct GrpcSourceConfig { - pub grpc_addr: String, - pub grpc_x_token: Option, - tls_config: Option, - timeouts: Option, -} - -impl Display for GrpcSourceConfig { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "grpc_addr {}", - crate::obfuscate::url_obfuscate_api_token(&self.grpc_addr) - ) - } -} - -impl Debug for GrpcSourceConfig { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::fmt::Display::fmt(&self, f) - } -} - -impl GrpcSourceConfig { - /// Create a grpc source without tls and timeouts - pub fn new_simple(grpc_addr: String) -> Self { - Self { - grpc_addr, - grpc_x_token: None, - tls_config: None, - timeouts: None, - } - } - pub fn new( - grpc_addr: String, - grpc_x_token: Option, - tls_config: Option, - timeouts: GrpcConnectionTimeouts, - ) -> Self { - Self { - grpc_addr, - grpc_x_token, - tls_config, - timeouts: Some(timeouts), - } - } -} - -type Attempt = u32; - -// wraps payload and status messages -pub enum Message { - GeyserSubscribeUpdate(Box), - // connect (attempt=1) or reconnect(attempt=2..) - Connecting(Attempt), -} - enum ConnectionState>> { NotConnected(Attempt), Connecting(Attempt, JoinHandle>), @@ -87,74 +16,6 @@ enum ConnectionState>> { WaitReconnect(Attempt), } -#[derive(Clone)] -pub struct GeyserFilter(pub CommitmentConfig); - -impl GeyserFilter { - pub fn blocks_and_txs(&self) -> SubscribeRequest { - let mut blocks_subs = HashMap::new(); - blocks_subs.insert( - "client".to_string(), - SubscribeRequestFilterBlocks { - account_include: Default::default(), - include_transactions: Some(true), - include_accounts: Some(false), - include_entries: Some(false), - }, - ); - - SubscribeRequest { - slots: HashMap::new(), - accounts: Default::default(), - transactions: HashMap::new(), - entry: Default::default(), - blocks: blocks_subs, - blocks_meta: HashMap::new(), - commitment: Some(map_commitment_level(self.0) as i32), - accounts_data_slice: Default::default(), - ping: None, - } - } - - pub fn blocks_meta(&self) -> SubscribeRequest { - let mut blocksmeta_subs = HashMap::new(); - blocksmeta_subs.insert("client".to_string(), SubscribeRequestFilterBlocksMeta {}); - - SubscribeRequest { - slots: HashMap::new(), - accounts: Default::default(), - transactions: HashMap::new(), - entry: Default::default(), - blocks: HashMap::new(), - blocks_meta: blocksmeta_subs, - commitment: Some(map_commitment_level(self.0) as i32), - accounts_data_slice: Default::default(), - ping: None, - } - } -} - -fn map_commitment_level(commitment_config: CommitmentConfig) -> CommitmentLevel { - // solana_sdk -> yellowstone - match commitment_config.commitment { - solana_sdk::commitment_config::CommitmentLevel::Processed => { - yellowstone_grpc_proto::prelude::CommitmentLevel::Processed - } - solana_sdk::commitment_config::CommitmentLevel::Confirmed => { - yellowstone_grpc_proto::prelude::CommitmentLevel::Confirmed - } - solana_sdk::commitment_config::CommitmentLevel::Finalized => { - yellowstone_grpc_proto::prelude::CommitmentLevel::Finalized - } - _ => { - panic!( - "unsupported commitment level {}", - commitment_config.commitment - ) - } - } -} - // Take geyser filter, connect to Geyser and return a generic stream of SubscribeUpdate // note: stream never terminates pub fn create_geyser_reconnecting_stream( @@ -216,40 +77,45 @@ pub fn create_geyser_reconnecting_stream( Ok(Ok(subscribed_stream)) => (ConnectionState::Ready(attempt, subscribed_stream), Message::Connecting(attempt)), Ok(Err(geyser_error)) => { // ATM we consider all errors recoverable - warn!("! subscribe failed on {} - retrying: {:?}", grpc_source, geyser_error); + warn!("subscribe failed on {} - retrying: {:?}", grpc_source, geyser_error); (ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt)) }, Err(geyser_grpc_task_error) => { - panic!("! task aborted - should not happen :{geyser_grpc_task_error}"); + panic!("task aborted - should not happen :{geyser_grpc_task_error}"); } } } ConnectionState::Ready(attempt, mut geyser_stream) => { - - match geyser_stream.next().await { - Some(Ok(update_message)) => { + let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout); + match timeout(receive_timeout.unwrap_or(Duration::MAX), geyser_stream.next()).await { + Ok(Some(Ok(update_message))) => { trace!("> recv update message from {}", grpc_source); (ConnectionState::Ready(attempt, geyser_stream), Message::GeyserSubscribeUpdate(Box::new(update_message))) } - Some(Err(tonic_status)) => { + Ok(Some(Err(tonic_status))) => { // ATM we consider all errors recoverable - warn!("! error on {} - retrying: {:?}", grpc_source, tonic_status); + warn!("error on {} - retrying: {:?}", grpc_source, tonic_status); (ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt)) } - None => { + Ok(None) => { // should not arrive here, Mean the stream close. warn!("geyser stream closed on {} - retrying", grpc_source); (ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt)) } + Err(_elapsed) => { + // timeout + warn!("geyser stream timeout on {} - retrying", grpc_source); + (ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt)) + } } } ConnectionState::WaitReconnect(attempt) => { let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0); - info!("! waiting {} seconds, then reconnect to {}", backoff_secs, grpc_source); + info!("waiting {} seconds, then reconnect to {}", backoff_secs, grpc_source); sleep(Duration::from_secs_f32(backoff_secs)).await; (ConnectionState::NotConnected(attempt), Message::Connecting(attempt)) } @@ -267,6 +133,7 @@ pub fn create_geyser_reconnecting_stream( #[cfg(test)] mod tests { use super::*; + use crate::GrpcConnectionTimeouts; #[tokio::test] async fn test_debug_no_secrets() { @@ -274,6 +141,7 @@ mod tests { connect_timeout: Duration::from_secs(1), request_timeout: Duration::from_secs(2), subscribe_timeout: Duration::from_secs(3), + receive_timeout: Duration::from_secs(3), }; assert_eq!( format!( @@ -295,6 +163,7 @@ mod tests { connect_timeout: Duration::from_secs(1), request_timeout: Duration::from_secs(2), subscribe_timeout: Duration::from_secs(3), + receive_timeout: Duration::from_secs(3), }; assert_eq!( format!( diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs new file mode 100644 index 0000000..55f6a4b --- /dev/null +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -0,0 +1,345 @@ +use crate::{GrpcSourceConfig, Message}; +use futures::{Stream, StreamExt}; +use log::{debug, error, info, log, trace, warn, Level}; +use std::time::Duration; +use tokio::sync::mpsc::error::SendTimeoutError; +use tokio::sync::mpsc::Receiver; +use tokio::task::AbortHandle; +use tokio::time::{sleep, timeout, Instant}; +use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError}; +use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate}; +use yellowstone_grpc_proto::tonic::service::Interceptor; +use yellowstone_grpc_proto::tonic::Status; + +type Attempt = u32; + +enum ConnectionState>, F: Interceptor> { + NotConnected(Attempt), + Connected(Attempt, GeyserGrpcClient), + Ready(Attempt, S), + // error states + RecoverableConnectionError(Attempt), + // non-recoverable error + FatalError(Attempt, FatalErrorReason), + WaitReconnect(Attempt), +} + +enum FatalErrorReason { + DownstreamChannelClosed, + ConfigurationError, + NetworkError, + SubscribeError, +} + +/// connect to grpc source performing autoconect if required, +/// returns mpsc channel; task will abort on fatal error +/// +/// implementation hints: +/// * no panic/unwrap +/// * do not use "?" +/// * do not "return" unless you really want to abort the task +pub fn create_geyser_autoconnection_task( + grpc_source: GrpcSourceConfig, + subscribe_filter: SubscribeRequest, +) -> (AbortHandle, Receiver) { + // read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/ + let (sender, receiver_channel) = tokio::sync::mpsc::channel::(1); + + let jh_geyser_task = tokio::spawn(async move { + let mut state = ConnectionState::NotConnected(0); + let mut messages_forwarded = 0; + + loop { + state = match state { + ConnectionState::NotConnected(mut attempt) => { + attempt += 1; + + let addr = grpc_source.grpc_addr.clone(); + let token = grpc_source.grpc_x_token.clone(); + let config = grpc_source.tls_config.clone(); + let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout); + let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout); + log!( + if attempt > 1 { + Level::Warn + } else { + Level::Debug + }, + "Connecting attempt #{} to {}", + attempt, + addr + ); + let connect_result = GeyserGrpcClient::connect_with_timeout( + addr, + token, + config, + connect_timeout, + request_timeout, + false, + ) + .await; + + match connect_result { + Ok(client) => ConnectionState::Connected(attempt, client), + Err(GeyserGrpcClientError::InvalidUri(_)) => ConnectionState::FatalError( + attempt, + FatalErrorReason::ConfigurationError, + ), + Err(GeyserGrpcClientError::MetadataValueError(_)) => { + ConnectionState::FatalError( + attempt, + FatalErrorReason::ConfigurationError, + ) + } + Err(GeyserGrpcClientError::InvalidXTokenLength(_)) => { + ConnectionState::FatalError( + attempt, + FatalErrorReason::ConfigurationError, + ) + } + Err(GeyserGrpcClientError::TonicError(tonic_error)) => { + warn!( + "connect failed on {} - aborting: {:?}", + grpc_source, tonic_error + ); + ConnectionState::FatalError(attempt, FatalErrorReason::NetworkError) + } + Err(GeyserGrpcClientError::TonicStatus(tonic_status)) => { + warn!( + "connect failed on {} - retrying: {:?}", + grpc_source, tonic_status + ); + ConnectionState::RecoverableConnectionError(attempt) + } + Err(GeyserGrpcClientError::SubscribeSendError(send_error)) => { + warn!( + "connect failed with send error on {} - retrying: {:?}", + grpc_source, send_error + ); + ConnectionState::RecoverableConnectionError(attempt) + } + } + } + ConnectionState::Connected(attempt, mut client) => { + let subscribe_timeout = + grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout); + let subscribe_filter = subscribe_filter.clone(); + debug!("Subscribe with filter {:?}", subscribe_filter); + + let subscribe_result_timeout = timeout( + subscribe_timeout.unwrap_or(Duration::MAX), + client.subscribe_once2(subscribe_filter), + ) + .await; + + match subscribe_result_timeout { + Ok(subscribe_result) => { + match subscribe_result { + Ok(geyser_stream) => ConnectionState::Ready(attempt, geyser_stream), + Err(GeyserGrpcClientError::TonicError(_)) => { + warn!("subscribe failed on {} - retrying", grpc_source); + ConnectionState::RecoverableConnectionError(attempt) + } + Err(GeyserGrpcClientError::TonicStatus(_)) => { + warn!("subscribe failed on {} - retrying", grpc_source); + ConnectionState::RecoverableConnectionError(attempt) + } + // non-recoverable + Err(unrecoverable_error) => { + error!( + "subscribe to {} failed with unrecoverable error: {}", + grpc_source, unrecoverable_error + ); + ConnectionState::FatalError( + attempt, + FatalErrorReason::SubscribeError, + ) + } + } + } + Err(_elapsed) => { + warn!( + "subscribe failed with timeout on {} - retrying", + grpc_source + ); + ConnectionState::RecoverableConnectionError(attempt) + } + } + } + ConnectionState::RecoverableConnectionError(attempt) => { + let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0); + info!( + "waiting {} seconds, then reconnect to {}", + backoff_secs, grpc_source + ); + sleep(Duration::from_secs_f32(backoff_secs)).await; + ConnectionState::NotConnected(attempt) + } + ConnectionState::FatalError(_attempt, reason) => match reason { + FatalErrorReason::DownstreamChannelClosed => { + warn!("downstream closed - aborting"); + return; + } + FatalErrorReason::ConfigurationError => { + warn!("fatal configuration error - aborting"); + return; + } + FatalErrorReason::NetworkError => { + warn!("fatal network error - aborting"); + return; + } + FatalErrorReason::SubscribeError => { + warn!("fatal grpc subscribe error - aborting"); + return; + } + }, + ConnectionState::WaitReconnect(attempt) => { + let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0); + info!( + "waiting {} seconds, then reconnect to {}", + backoff_secs, grpc_source + ); + sleep(Duration::from_secs_f32(backoff_secs)).await; + ConnectionState::NotConnected(attempt) + } + ConnectionState::Ready(attempt, mut geyser_stream) => { + let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout); + 'recv_loop: loop { + match timeout( + receive_timeout.unwrap_or(Duration::MAX), + geyser_stream.next(), + ) + .await + { + Ok(Some(Ok(update_message))) => { + trace!("> recv update message from {}", grpc_source); + // note: first send never blocks as the mpsc channel has capacity 1 + let warning_threshold = if messages_forwarded == 1 { + Duration::from_millis(3000) + } else { + Duration::from_millis(500) + }; + let started_at = Instant::now(); + match sender + .send_timeout( + Message::GeyserSubscribeUpdate(Box::new(update_message)), + warning_threshold, + ) + .await + { + Ok(()) => { + messages_forwarded += 1; + if messages_forwarded == 1 { + // note: first send never blocks - do not print time as this is a lie + trace!("queued first update message"); + } else { + trace!( + "queued update message {} in {:.02}ms", + messages_forwarded, + started_at.elapsed().as_secs_f32() * 1000.0 + ); + } + continue 'recv_loop; + } + Err(SendTimeoutError::Timeout(the_message)) => { + warn!("downstream receiver did not pick put message for {}ms - keep waiting", warning_threshold.as_millis()); + + match sender.send(the_message).await { + Ok(()) => { + messages_forwarded += 1; + trace!( + "queued delayed update message {} in {:.02}ms", + messages_forwarded, + started_at.elapsed().as_secs_f32() * 1000.0 + ); + } + Err(_send_error) => { + warn!("downstream receiver closed, message is lost - aborting"); + break 'recv_loop ConnectionState::FatalError( + attempt, + FatalErrorReason::DownstreamChannelClosed, + ); + } + } + } + Err(SendTimeoutError::Closed(_)) => { + warn!("downstream receiver closed - aborting"); + break 'recv_loop ConnectionState::FatalError( + attempt, + FatalErrorReason::DownstreamChannelClosed, + ); + } + } + } + Ok(Some(Err(tonic_status))) => { + // all tonic errors are recoverable + warn!("error on {} - retrying: {:?}", grpc_source, tonic_status); + break 'recv_loop ConnectionState::WaitReconnect(attempt); + } + Ok(None) => { + warn!("geyser stream closed on {} - retrying", grpc_source); + break 'recv_loop ConnectionState::WaitReconnect(attempt); + } + Err(_elapsed) => { + warn!("timeout on {} - retrying", grpc_source); + break 'recv_loop ConnectionState::WaitReconnect(attempt); + } + } + } // -- END receive loop + } + } // -- END match + } // -- endless state loop + }); + + (jh_geyser_task.abort_handle(), receiver_channel) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::GrpcConnectionTimeouts; + + #[tokio::test] + async fn test_debug_no_secrets() { + let timeout_config = GrpcConnectionTimeouts { + connect_timeout: Duration::from_secs(1), + request_timeout: Duration::from_secs(2), + subscribe_timeout: Duration::from_secs(3), + receive_timeout: Duration::from_secs(3), + }; + assert_eq!( + format!( + "{:?}", + GrpcSourceConfig::new( + "http://localhost:1234".to_string(), + Some("my-secret".to_string()), + None, + timeout_config + ) + ), + "grpc_addr http://localhost:1234" + ); + } + + #[tokio::test] + async fn test_display_no_secrets() { + let timeout_config = GrpcConnectionTimeouts { + connect_timeout: Duration::from_secs(1), + request_timeout: Duration::from_secs(2), + subscribe_timeout: Duration::from_secs(3), + receive_timeout: Duration::from_secs(3), + }; + assert_eq!( + format!( + "{}", + GrpcSourceConfig::new( + "http://localhost:1234".to_string(), + Some("my-secret".to_string()), + None, + timeout_config + ) + ), + "grpc_addr http://localhost:1234" + ); + } +} diff --git a/src/grpcmultiplex_fastestwins.rs b/src/grpcmultiplex_fastestwins.rs index 81d58c4..78dbde1 100644 --- a/src/grpcmultiplex_fastestwins.rs +++ b/src/grpcmultiplex_fastestwins.rs @@ -1,5 +1,5 @@ -use crate::grpc_subscription_autoreconnect::Message; -use crate::grpc_subscription_autoreconnect::Message::GeyserSubscribeUpdate; +use crate::Message; +use crate::Message::GeyserSubscribeUpdate; use async_stream::stream; use futures::{Stream, StreamExt}; use log::{info, warn}; diff --git a/src/lib.rs b/src/lib.rs index 91794cd..0cf09c1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,173 @@ -pub mod grpc_subscription; -pub mod grpc_subscription_autoreconnect; +use solana_sdk::commitment_config::CommitmentConfig; +use std::collections::HashMap; +use std::fmt::{Debug, Display}; +use std::time::Duration; +use yellowstone_grpc_proto::geyser::{ + CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, + SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, SubscribeUpdate, +}; +use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; + +pub mod channel_plugger; +pub mod grpc_subscription_autoreconnect_streams; +pub mod grpc_subscription_autoreconnect_tasks; pub mod grpcmultiplex_fastestwins; mod obfuscate; + +type Attempt = u32; + +// wraps payload and status messages +// clone is required by broacast channel +#[derive(Clone)] +pub enum Message { + GeyserSubscribeUpdate(Box), + // connect (attempt=1) or reconnect(attempt=2..) + Connecting(Attempt), +} + +#[derive(Clone, Debug)] +pub struct GrpcConnectionTimeouts { + pub connect_timeout: Duration, + pub request_timeout: Duration, + pub subscribe_timeout: Duration, + pub receive_timeout: Duration, +} + +#[derive(Clone)] +pub struct GrpcSourceConfig { + pub grpc_addr: String, + pub grpc_x_token: Option, + tls_config: Option, + timeouts: Option, +} + +impl Display for GrpcSourceConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "grpc_addr {}", + crate::obfuscate::url_obfuscate_api_token(&self.grpc_addr) + ) + } +} + +impl Debug for GrpcSourceConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(&self, f) + } +} + +impl GrpcSourceConfig { + /// Create a grpc source without tls and timeouts + pub fn new_simple(grpc_addr: String) -> Self { + Self { + grpc_addr, + grpc_x_token: None, + tls_config: None, + timeouts: None, + } + } + pub fn new( + grpc_addr: String, + grpc_x_token: Option, + tls_config: Option, + timeouts: GrpcConnectionTimeouts, + ) -> Self { + Self { + grpc_addr, + grpc_x_token, + tls_config, + timeouts: Some(timeouts), + } + } +} + +#[derive(Clone)] +pub struct GeyserFilter(pub CommitmentConfig); + +impl GeyserFilter { + pub fn blocks_and_txs(&self) -> SubscribeRequest { + let mut blocks_subs = HashMap::new(); + blocks_subs.insert( + "client".to_string(), + SubscribeRequestFilterBlocks { + account_include: Default::default(), + include_transactions: Some(true), + include_accounts: Some(false), + include_entries: Some(false), + }, + ); + + SubscribeRequest { + slots: HashMap::new(), + accounts: Default::default(), + transactions: HashMap::new(), + entry: Default::default(), + blocks: blocks_subs, + blocks_meta: HashMap::new(), + commitment: Some(map_commitment_level(self.0) as i32), + accounts_data_slice: Default::default(), + ping: None, + } + } + + pub fn blocks_meta(&self) -> SubscribeRequest { + let mut blocksmeta_subs = HashMap::new(); + blocksmeta_subs.insert("client".to_string(), SubscribeRequestFilterBlocksMeta {}); + + SubscribeRequest { + slots: HashMap::new(), + accounts: Default::default(), + transactions: HashMap::new(), + entry: Default::default(), + blocks: HashMap::new(), + blocks_meta: blocksmeta_subs, + commitment: Some(map_commitment_level(self.0) as i32), + accounts_data_slice: Default::default(), + ping: None, + } + } + + pub fn slots(&self) -> SubscribeRequest { + let mut slots_subs = HashMap::new(); + slots_subs.insert( + "client".to_string(), + SubscribeRequestFilterSlots { + filter_by_commitment: Some(true), + }, + ); + + SubscribeRequest { + slots: slots_subs, + accounts: Default::default(), + transactions: HashMap::new(), + entry: Default::default(), + blocks: HashMap::new(), + blocks_meta: HashMap::new(), + commitment: Some(map_commitment_level(self.0) as i32), + accounts_data_slice: Default::default(), + ping: None, + } + } +} + +fn map_commitment_level(commitment_config: CommitmentConfig) -> CommitmentLevel { + // solana_sdk -> yellowstone + match commitment_config.commitment { + solana_sdk::commitment_config::CommitmentLevel::Processed => { + yellowstone_grpc_proto::prelude::CommitmentLevel::Processed + } + solana_sdk::commitment_config::CommitmentLevel::Confirmed => { + yellowstone_grpc_proto::prelude::CommitmentLevel::Confirmed + } + solana_sdk::commitment_config::CommitmentLevel::Finalized => { + yellowstone_grpc_proto::prelude::CommitmentLevel::Finalized + } + _ => { + panic!( + "unsupported commitment level {}", + commitment_config.commitment + ) + } + } +}