Add compression bool to create_geyser_autoconnection_task_with_mpsc

This commit is contained in:
godmodegalactus 2024-04-23 10:19:24 +02:00
parent 950559b5aa
commit 05f110c895
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
8 changed files with 141 additions and 110 deletions

186
Cargo.lock generated
View File

@ -306,18 +306,18 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
name = "async-trait"
version = "0.1.79"
version = "0.1.80"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a507401cad91ec6a857ed5513a2073c82a9b9048762b885bb98655b306964681"
checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -640,7 +640,7 @@ checksum = "4da9a32f3fed317401fa3c862968128267c3106685286e15d5aaa3d7389c2f60"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -657,12 +657,13 @@ checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9"
[[package]]
name = "cc"
version = "1.0.92"
version = "1.0.95"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2678b2e3449475e95b0aa6f9b506a28e61b3dc8996592b983695e8ebb58a8b41"
checksum = "d32a725bc159af97c3e629873bb9f88fb8cf8a4867175f76dc987815ea07c83b"
dependencies = [
"jobserver",
"libc",
"once_cell",
]
[[package]]
@ -673,9 +674,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.37"
version = "0.4.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a0d04d43504c61aa6c7531f1871dd0d418d91130162063b789da00fd7057a5e"
checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401"
dependencies = [
"android-tzdata",
"iana-time-zone",
@ -683,7 +684,7 @@ dependencies = [
"num-traits",
"serde",
"wasm-bindgen",
"windows-targets 0.52.4",
"windows-targets 0.52.5",
]
[[package]]
@ -899,7 +900,7 @@ dependencies = [
"proc-macro2",
"quote",
"strsim",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -910,7 +911,7 @@ checksum = "a668eda54683121533a393014d8692171709ff57a7d61f187b6e782719f8933f"
dependencies = [
"darling_core",
"quote",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -1006,9 +1007,9 @@ dependencies = [
[[package]]
name = "either"
version = "1.10.0"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a"
checksum = "a47c1c47d2f5964e29c61246e81db715514cd532db6b5116a25ea3c03d6780a2"
[[package]]
name = "encoding_rs"
@ -1030,13 +1031,13 @@ dependencies = [
[[package]]
name = "enum-iterator-derive"
version = "1.3.0"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03cdc46ec28bd728e67540c528013c6a10eb69a02eb31078a1bda695438cbfb8"
checksum = "c19cbb53d33b57ac4df1f0af6b92c38c107cded663c4aea9fae1189dcfc17cf5"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -1167,7 +1168,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -1596,9 +1597,9 @@ checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
[[package]]
name = "jobserver"
version = "0.1.28"
version = "0.1.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab46a6e9526ddef3ae7f787c06f0f2600639ba80ea3eade3d8e670a2230f51d6"
checksum = "d2b099aaa34a9751c5bf0878add70444e1ed2dd73f347be99003d4577277de6e"
dependencies = [
"libc",
]
@ -1874,7 +1875,7 @@ checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -1955,7 +1956,7 @@ dependencies = [
"proc-macro-crate 1.3.1",
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -1967,7 +1968,7 @@ dependencies = [
"proc-macro-crate 3.1.0",
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -2092,7 +2093,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -2139,12 +2140,12 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "prettyplease"
version = "0.2.17"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d3928fb5db768cb86f891ff014f0144589297e3c6a1aba6ed7cecfdace270c7"
checksum = "5ac2cf0f2e4f42b49f5ffd07dae8d746508ef7526c13940e5f524012ae6c6550"
dependencies = [
"proc-macro2",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -2177,9 +2178,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.79"
version = "1.0.81"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e835ff2298f5721608eb1a980ecaee1aef2c132bf95ecc026a11b7bf3c01c02e"
checksum = "3d1597b0c024618f09a9c3b8655b7e430397a36d23fdafec26d6965e9eec3eba"
dependencies = [
"unicode-ident",
]
@ -2211,7 +2212,7 @@ dependencies = [
"prost",
"prost-types",
"regex",
"syn 2.0.58",
"syn 2.0.60",
"tempfile",
]
@ -2225,7 +2226,7 @@ dependencies = [
"itertools 0.12.1",
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -2263,7 +2264,7 @@ checksum = "9e2e25ee72f5b24d773cae88422baddefff7714f97aab68d96fe2b6fc4a28fb2"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -2494,9 +2495,9 @@ dependencies = [
[[package]]
name = "rustix"
version = "0.38.32"
version = "0.38.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65e04861e65f21776e67888bfbea442b3642beaa0138fdb1dd7a84a52dffdb89"
checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f"
dependencies = [
"bitflags 2.5.0",
"errno",
@ -2507,9 +2508,9 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.21.10"
version = "0.21.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba"
checksum = "7fecbfb7b1444f477b345853b1fce097a2c6fb637b2bfb87e6bc5db0f043fae4"
dependencies = [
"log",
"ring",
@ -2592,7 +2593,7 @@ checksum = "1db149f81d46d2deba7cd3c50772474707729550221e69588478ebf9ada425ae"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -2636,9 +2637,9 @@ checksum = "92d43fe69e652f3df9bdc2b85b2854a0825b86e4fb76bc44d945137d053639ca"
[[package]]
name = "serde"
version = "1.0.197"
version = "1.0.198"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2"
checksum = "9846a40c979031340571da2545a4e5b7c4163bdae79b301d5f86d03979451fcc"
dependencies = [
"serde_derive",
]
@ -2654,20 +2655,20 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.197"
version = "1.0.198"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b"
checksum = "e88edab869b01783ba905e7d0153f9fc1a6505a96e4ad3018011eedb838566d9"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
name = "serde_json"
version = "1.0.115"
version = "1.0.116"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12dc5c46daa8e9fdf4f5e71b6cf9a53f2487da0e86e55808e2d35539666497dd"
checksum = "3e17db7126d17feb94eb3fad46bf1a96b034e8aacbc2e775fe81505f8b0b2813"
dependencies = [
"itoa",
"ryu",
@ -2705,7 +2706,7 @@ dependencies = [
"darling",
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -2882,7 +2883,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustc_version",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -3067,7 +3068,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -3190,7 +3191,7 @@ checksum = "07fd7858fc4ff8fb0e34090e41d7eb06a823e1057945c26d480bfc21d2338a93"
dependencies = [
"quote",
"spl-discriminator-syn",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -3202,7 +3203,7 @@ dependencies = [
"proc-macro2",
"quote",
"sha2 0.10.8",
"syn 2.0.58",
"syn 2.0.60",
"thiserror",
]
@ -3250,7 +3251,7 @@ dependencies = [
"proc-macro2",
"quote",
"sha2 0.10.8",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -3387,9 +3388,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.58"
version = "2.0.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44cfb93f38070beee36b3fef7d4f5a16f27751d94b187b666a5cc5e9b0d30687"
checksum = "909518bc7b1c9b779f1bbf07f2929d35af9f0f37e47c6e9ef7f9dddc1e1821f3"
dependencies = [
"proc-macro2",
"quote",
@ -3446,22 +3447,22 @@ dependencies = [
[[package]]
name = "thiserror"
version = "1.0.58"
version = "1.0.59"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03468839009160513471e86a034bb2c5c0e4baae3b43f79ffc55c4a5427b3297"
checksum = "f0126ad08bff79f29fc3ae6a55cc72352056dfff61e3ff8bb7129476d44b23aa"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.58"
version = "1.0.59"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7"
checksum = "d1cd413b5d558b4c5bf3680e324a6fa5014e7b7c067a51e69dbdf47eb7148b66"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -3543,7 +3544,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -3660,7 +3661,7 @@ dependencies = [
"proc-macro2",
"prost-build",
"quote",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -3727,7 +3728,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -3904,7 +3905,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.60",
"wasm-bindgen-shared",
]
@ -3938,7 +3939,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.60",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@ -4002,7 +4003,7 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
dependencies = [
"windows-targets 0.52.4",
"windows-targets 0.52.5",
]
[[package]]
@ -4020,7 +4021,7 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
dependencies = [
"windows-targets 0.52.4",
"windows-targets 0.52.5",
]
[[package]]
@ -4040,17 +4041,18 @@ dependencies = [
[[package]]
name = "windows-targets"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7dd37b7e5ab9018759f893a1952c9420d060016fc19a472b4bb20d1bdd694d1b"
checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb"
dependencies = [
"windows_aarch64_gnullvm 0.52.4",
"windows_aarch64_msvc 0.52.4",
"windows_i686_gnu 0.52.4",
"windows_i686_msvc 0.52.4",
"windows_x86_64_gnu 0.52.4",
"windows_x86_64_gnullvm 0.52.4",
"windows_x86_64_msvc 0.52.4",
"windows_aarch64_gnullvm 0.52.5",
"windows_aarch64_msvc 0.52.5",
"windows_i686_gnu 0.52.5",
"windows_i686_gnullvm",
"windows_i686_msvc 0.52.5",
"windows_x86_64_gnu 0.52.5",
"windows_x86_64_gnullvm 0.52.5",
"windows_x86_64_msvc 0.52.5",
]
[[package]]
@ -4061,9 +4063,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bcf46cf4c365c6f2d1cc93ce535f2c8b244591df96ceee75d8e83deb70a9cac9"
checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263"
[[package]]
name = "windows_aarch64_msvc"
@ -4073,9 +4075,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da9f259dd3bcf6990b55bffd094c4f7235817ba4ceebde8e6d11cd0c5633b675"
checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6"
[[package]]
name = "windows_i686_gnu"
@ -4085,9 +4087,15 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e"
[[package]]
name = "windows_i686_gnu"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b474d8268f99e0995f25b9f095bc7434632601028cf86590aea5c8a5cb7801d3"
checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670"
[[package]]
name = "windows_i686_gnullvm"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9"
[[package]]
name = "windows_i686_msvc"
@ -4097,9 +4105,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406"
[[package]]
name = "windows_i686_msvc"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1515e9a29e5bed743cb4415a9ecf5dfca648ce85ee42e15873c3cd8610ff8e02"
checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf"
[[package]]
name = "windows_x86_64_gnu"
@ -4109,9 +4117,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5eee091590e89cc02ad514ffe3ead9eb6b660aedca2183455434b93546371a03"
checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9"
[[package]]
name = "windows_x86_64_gnullvm"
@ -4121,9 +4129,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77ca79f2451b49fa9e2af39f0747fe999fcda4f5e241b2898624dca97a1f2177"
checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596"
[[package]]
name = "windows_x86_64_msvc"
@ -4133,9 +4141,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.4"
version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8"
checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0"
[[package]]
name = "winnow"
@ -4203,7 +4211,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]
@ -4223,7 +4231,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
"syn 2.0.60",
]
[[package]]

View File

@ -94,6 +94,7 @@ pub async fn main() {
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
exit_notify,
false,
);
let mut message_channel =
spawn_broadcast_channel_plug(tokio::sync::broadcast::channel(8), message_channel);

View File

@ -143,14 +143,17 @@ pub async fn main() {
let green_stream = create_geyser_reconnecting_stream(
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
false,
);
let blue_stream = create_geyser_reconnecting_stream(
blue_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
false,
);
let toxiproxy_stream = create_geyser_reconnecting_stream(
toxiproxy_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
false,
);
let multiplex_stream = create_multiplexed_stream(
vec![green_stream, blue_stream, toxiproxy_stream],
@ -164,14 +167,17 @@ pub async fn main() {
let green_stream = create_geyser_reconnecting_stream(
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
false,
);
let blue_stream = create_geyser_reconnecting_stream(
blue_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
false,
);
let toxiproxy_stream = create_geyser_reconnecting_stream(
toxiproxy_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
false,
);
let multiplex_stream = create_multiplexed_stream(
vec![green_stream, blue_stream, toxiproxy_stream],

View File

@ -133,18 +133,21 @@ pub async fn main() {
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
autoconnect_tx.clone(),
exit_notify.resubscribe(),
false,
);
let _blue_stream_ah = create_geyser_autoconnection_task_with_mpsc(
blue_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
autoconnect_tx.clone(),
exit_notify.resubscribe(),
false,
);
let _toxiproxy_stream_ah = create_geyser_autoconnection_task_with_mpsc(
toxiproxy_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
autoconnect_tx.clone(),
exit_notify,
false,
);
start_example_blockmeta_consumer(blockmeta_rx);

View File

@ -96,11 +96,13 @@ pub async fn main() {
let green_stream = create_geyser_reconnecting_stream(
config.clone(),
GeyserFilter(CommitmentConfig::processed()).accounts(),
false,
);
let blue_stream = create_geyser_reconnecting_stream(
config.clone(),
GeyserFilter(CommitmentConfig::processed()).blocks_and_txs(),
false,
);
tokio::spawn(async move {

View File

@ -22,6 +22,7 @@ enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
pub fn create_geyser_reconnecting_stream(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
compressed: bool,
) -> impl Stream<Item = Message> {
let mut state = ConnectionState::NotConnected(1);
@ -47,14 +48,15 @@ pub fn create_geyser_reconnecting_stream(
async move {
let connect_result = connect_with_timeout_with_buffers(
addr,
token,
config,
connect_timeout,
request_timeout,
GeyserGrpcClientBufferConfig::optimize_for_subscription(&subscribe_filter),
)
.await;
addr,
token,
config,
connect_timeout,
request_timeout,
GeyserGrpcClientBufferConfig::optimize_for_subscription(&subscribe_filter),
compressed,
)
.await;
let mut client = connect_result?;

View File

@ -39,6 +39,7 @@ pub fn create_geyser_autoconnection_task(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
exit_notify: broadcast::Receiver<()>,
compressed: bool,
) -> (JoinHandle<()>, Receiver<Message>) {
let (sender, receiver_channel) = tokio::sync::mpsc::channel::<Message>(1);
@ -47,6 +48,7 @@ pub fn create_geyser_autoconnection_task(
subscribe_filter,
sender,
exit_notify,
compressed
);
(join_handle, receiver_channel)
@ -60,6 +62,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
subscribe_filter: SubscribeRequest,
mpsc_downstream: tokio::sync::mpsc::Sender<Message>,
mut exit_notify: broadcast::Receiver<()>,
compressed: bool,
) -> JoinHandle<()> {
// read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/
@ -131,14 +134,16 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
}
};
let fut_connector = connect_with_timeout_with_buffers(
addr,
token,
config,
connect_timeout,
request_timeout,
buffer_config,
);
let fut_connector =
connect_with_timeout_with_buffers(
addr,
token,
config,
connect_timeout,
request_timeout,
buffer_config,
compressed,
);
match await_or_exit(fut_connector, exit_notify.recv()).await {
MaybeExit::Continue(connection_result) => connection_handler(connection_result),

View File

@ -55,6 +55,7 @@ pub async fn connect_with_timeout_with_buffers<E, T>(
connect_timeout: Option<Duration>,
request_timeout: Option<Duration>,
buffer_config: GeyserGrpcClientBufferConfig,
compression: bool,
) -> GeyserGrpcClientResult<GeyserGrpcClient<impl Interceptor>>
where
E: Into<Bytes>,
@ -85,12 +86,15 @@ where
let interceptor = InterceptorXToken { x_token };
let channel = endpoint.connect_lazy();
let mut geyser_client = GeyserClient::with_interceptor(channel.clone(), interceptor.clone())
.max_decoding_message_size(GeyserGrpcClient::max_decoding_message_size());
if compression {
geyser_client = geyser_client.accept_compressed(CompressionEncoding::Gzip).send_compressed(CompressionEncoding::Gzip);
}
let client = GeyserGrpcClient::new(
HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
GeyserClient::with_interceptor(channel, interceptor)
.max_decoding_message_size(GeyserGrpcClient::max_decoding_message_size())
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip),
HealthClient::with_interceptor(channel, interceptor),
geyser_client
);
Ok(client)
}