Integrates groovies stuff

This commit is contained in:
mango-dee 2024-11-25 12:33:56 +08:00
commit 214886a27d
22 changed files with 2607 additions and 193 deletions

326
Cargo.lock generated
View File

@ -114,6 +114,55 @@ dependencies = [
"libc",
]
[[package]]
name = "anstream"
version = "0.6.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"is_terminal_polyfill",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9"
[[package]]
name = "anstyle-parse"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c"
dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2109dbce0e72be3ec00bed26e6a7479ca384ad226efdd66db8fa2e3a38c83125"
dependencies = [
"anstyle",
"windows-sys 0.59.0",
]
[[package]]
name = "anyhow"
version = "1.0.93"
@ -263,9 +312,9 @@ checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
[[package]]
name = "async-compression"
version = "0.4.17"
version = "0.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0cb8f1d480b0ea3783ab015936d2a55c87e219676f0c0b7dec61494043f21857"
checksum = "df895a515f70646414f4b45c0b79082783b80552b373a68283012928df56f522"
dependencies = [
"brotli",
"flate2",
@ -294,7 +343,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -305,7 +354,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -361,7 +410,7 @@ dependencies = [
"pin-project-lite",
"rustversion",
"serde",
"sync_wrapper 1.0.1",
"sync_wrapper 1.0.2",
"tower 0.5.1",
"tower-layer",
"tower-service",
@ -382,7 +431,7 @@ dependencies = [
"mime",
"pin-project-lite",
"rustversion",
"sync_wrapper 1.0.1",
"sync_wrapper 1.0.2",
"tower-layer",
"tower-service",
]
@ -526,7 +575,7 @@ dependencies = [
"proc-macro-crate 3.2.0",
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -614,7 +663,7 @@ checksum = "bcfcc3cd946cb52f0bbfdbbcfa2f4e24f75ebb6c0e1002f7c25904fada18b9ec"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -677,6 +726,52 @@ dependencies = [
"inout",
]
[[package]]
name = "clap"
version = "4.5.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb3b4b9e5a7c7514dfa52869339ee98b3156b0bfb4e8a77c4ff4babb64b1604f"
dependencies = [
"clap_builder",
"clap_derive",
]
[[package]]
name = "clap_builder"
version = "4.5.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b17a95aa67cc7b5ebd32aa5370189aa0d79069ef1c64ce893bd30fb24bff20ec"
dependencies = [
"anstream",
"anstyle",
"clap_lex",
"strsim",
]
[[package]]
name = "clap_derive"
version = "4.5.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ac6a0c7b1a9e9a5186361f67dfa1b88213572f427fb9ab038efb2bd8c582dab"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.89",
]
[[package]]
name = "clap_lex"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afb84c814227b90d6895e01398aee0d8033c00e7466aca416fb6a8e0eb19d8a7"
[[package]]
name = "colorchoice"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
[[package]]
name = "combine"
version = "3.8.1"
@ -732,6 +827,16 @@ dependencies = [
"libc",
]
[[package]]
name = "core-foundation"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b55271e5c8c478ad3f38ad24ef34923091e0548492a266d19b3c0b4d82574c63"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.7"
@ -740,9 +845,9 @@ checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "cpufeatures"
version = "0.2.15"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ca741a962e1b0bff6d724a1a0958b686406e853bb14061f218562e1896f95e6"
checksum = "16b80225097f2e5ae4e7179dd2266824648f3e2f49d9134d584b76389d31c4c3"
dependencies = [
"libc",
]
@ -862,7 +967,7 @@ dependencies = [
"proc-macro2",
"quote",
"strsim",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -873,7 +978,20 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806"
dependencies = [
"darling_core",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
name = "dashmap"
version = "5.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
dependencies = [
"cfg-if",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
@ -903,7 +1021,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustc_version",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -934,7 +1052,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -1010,7 +1128,7 @@ checksum = "a1ab991c1362ac86c61ab6f556cff143daa22e5a15e4e189df818b2fd19fe65b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -1141,7 +1259,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -1229,15 +1347,22 @@ dependencies = [
"async-stream",
"base64 0.21.7",
"bincode",
"clap",
"csv",
"dashmap",
"derive_more",
"futures",
"itertools 0.10.5",
"log",
"lz4_flex",
"merge-streams",
"regex",
"solana-account-decoder",
"solana-logger",
"solana-sdk",
"tokio",
"tonic",
"tonic-health",
"tracing",
"tracing-subscriber",
"url",
@ -1326,9 +1451,15 @@ dependencies = [
[[package]]
name = "hashbrown"
version = "0.15.1"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a9bfc1af68b1726ea47d3d5109de126281def866b33970e10fbab11b5dafab3"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
[[package]]
name = "hashbrown"
version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289"
[[package]]
name = "heck"
@ -1684,7 +1815,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -1731,7 +1862,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da"
dependencies = [
"equivalent",
"hashbrown 0.15.1",
"hashbrown 0.15.2",
]
[[package]]
@ -1749,6 +1880,12 @@ version = "2.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708"
[[package]]
name = "is_terminal_polyfill"
version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
[[package]]
name = "itertools"
version = "0.10.5"
@ -1778,9 +1915,9 @@ dependencies = [
[[package]]
name = "itoa"
version = "1.0.11"
version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
checksum = "540654e97a3f4470a492cd30ff187bc95d89557a903a2bbf112e2fae98104ef2"
[[package]]
name = "jobserver"
@ -1877,9 +2014,9 @@ checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"
[[package]]
name = "litemap"
version = "0.7.3"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "643cb0b8d4fcc284004d5fd0d67ccf61dfffadb7f75e1e71bc420f4688a3a704"
checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104"
[[package]]
name = "lock_api"
@ -1897,6 +2034,15 @@ version = "0.4.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "lz4_flex"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5"
dependencies = [
"twox-hash",
]
[[package]]
name = "matchit"
version = "0.7.3"
@ -2045,7 +2191,7 @@ checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -2107,7 +2253,7 @@ dependencies = [
"proc-macro-crate 3.2.0",
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -2223,7 +2369,7 @@ checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -2278,7 +2424,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033"
dependencies = [
"proc-macro2",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -2301,9 +2447,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.89"
version = "1.0.92"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e"
checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0"
dependencies = [
"unicode-ident",
]
@ -2335,7 +2481,7 @@ dependencies = [
"prost",
"prost-types",
"regex",
"syn 2.0.87",
"syn 2.0.89",
"tempfile",
]
@ -2349,7 +2495,7 @@ dependencies = [
"itertools 0.13.0",
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -2597,9 +2743,9 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.23.17"
version = "0.23.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f1a745511c54ba6d4465e8d5dfbd81b45791756de28d4981af70d6dca128f1e"
checksum = "9c9cc1d47e243d655ace55ed38201c19ae02c148ae56412ab8750e8f0166ab7f"
dependencies = [
"log",
"once_cell",
@ -2612,12 +2758,11 @@ dependencies = [
[[package]]
name = "rustls-native-certs"
version = "0.8.0"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a"
checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3"
dependencies = [
"openssl-probe",
"rustls-pemfile 2.2.0",
"rustls-pki-types",
"schannel",
"security-framework",
@ -2712,7 +2857,7 @@ checksum = "1db149f81d46d2deba7cd3c50772474707729550221e69588478ebf9ada425ae"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -2727,12 +2872,12 @@ dependencies = [
[[package]]
name = "security-framework"
version = "2.11.1"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02"
checksum = "e1415a607e92bec364ea2cf9264646dcce0f91e6d65281bd6f2819cca3bf39c8"
dependencies = [
"bitflags 2.6.0",
"core-foundation",
"core-foundation 0.10.0",
"core-foundation-sys",
"libc",
"security-framework-sys",
@ -2780,7 +2925,7 @@ checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -2826,7 +2971,7 @@ dependencies = [
"darling",
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -2991,9 +3136,9 @@ dependencies = [
[[package]]
name = "solana-logger"
version = "2.1.1"
version = "2.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a77702845aef1509b822b9334c9538a2401dfd28570f01ba0e1da8da99c0886d"
checksum = "2b82e7089918666300a7399a1193b53a823f907f85a99f54a75b250f1996c196"
dependencies = [
"env_logger",
"lazy_static",
@ -3159,7 +3304,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -3311,7 +3456,7 @@ checksum = "d9e8418ea6269dcfb01c712f0444d2c75542c04448b480e87de59d2865edc750"
dependencies = [
"quote",
"spl-discriminator-syn",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -3323,7 +3468,7 @@ dependencies = [
"proc-macro2",
"quote",
"sha2 0.10.8",
"syn 2.0.87",
"syn 2.0.89",
"thiserror",
]
@ -3372,7 +3517,7 @@ dependencies = [
"proc-macro2",
"quote",
"sha2 0.10.8",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -3490,6 +3635,12 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "strsim"
version = "0.11.1"
@ -3515,9 +3666,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.87"
version = "2.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d"
checksum = "44d46482f1c1c87acd84dea20c1bf5ebff4c757009ed6bf19cfd36fb10e92c4e"
dependencies = [
"proc-macro2",
"quote",
@ -3532,9 +3683,9 @@ checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "sync_wrapper"
version = "1.0.1"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394"
checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263"
[[package]]
name = "synstructure"
@ -3544,7 +3695,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -3554,7 +3705,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7"
dependencies = [
"bitflags 1.3.2",
"core-foundation",
"core-foundation 0.9.4",
"system-configuration-sys",
]
@ -3607,7 +3758,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -3669,7 +3820,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -3688,7 +3839,7 @@ version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4"
dependencies = [
"rustls 0.23.17",
"rustls 0.23.18",
"rustls-pki-types",
"tokio",
]
@ -3754,6 +3905,7 @@ dependencies = [
"axum",
"base64 0.22.1",
"bytes",
"flate2",
"h2 0.4.7",
"http 1.1.0",
"http-body 1.0.1",
@ -3787,7 +3939,7 @@ dependencies = [
"prost-build",
"prost-types",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -3868,7 +4020,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -3912,6 +4064,16 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "twox-hash"
version = "1.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
dependencies = [
"cfg-if",
"static_assertions",
]
[[package]]
name = "typenum"
version = "1.17.0"
@ -3920,9 +4082,9 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
[[package]]
name = "unicode-ident"
version = "1.0.13"
version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe"
checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83"
[[package]]
name = "universal-hash"
@ -3961,9 +4123,9 @@ dependencies = [
[[package]]
name = "url"
version = "2.5.3"
version = "2.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d157f1b96d14500ffdc1f10ba712e780825526c03d9a49b4d0324b0d9113ada"
checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60"
dependencies = [
"form_urlencoded",
"idna",
@ -3982,6 +4144,12 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
[[package]]
name = "utf8parse"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "valuable"
version = "0.1.0"
@ -4043,7 +4211,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
"wasm-bindgen-shared",
]
@ -4077,7 +4245,7 @@ checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@ -4354,9 +4522,9 @@ dependencies = [
[[package]]
name = "yoke"
version = "0.7.4"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c5b1314b079b0930c31e3af543d8ee1757b1951ae1e1565ec704403a7240ca5"
checksum = "120e6aef9aa629e3d4f52dc8cc43a015c7724194c97dfaf45180d2daf2b77f40"
dependencies = [
"serde",
"stable_deref_trait",
@ -4366,13 +4534,13 @@ dependencies = [
[[package]]
name = "yoke-derive"
version = "0.7.4"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95"
checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
"synstructure",
]
@ -4394,27 +4562,27 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
name = "zerofrom"
version = "0.1.4"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91ec111ce797d0e0784a1116d0ddcdbea84322cd79e5d5ad173daeba4f93ab55"
checksum = "cff3ee08c995dee1859d998dea82f7374f2826091dd9cd47def953cae446cd2e"
dependencies = [
"zerofrom-derive",
]
[[package]]
name = "zerofrom-derive"
version = "0.1.4"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5"
checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
"synstructure",
]
@ -4435,7 +4603,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]
@ -4457,7 +4625,7 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"syn 2.0.89",
]
[[package]]

View File

@ -12,13 +12,12 @@ repository = "https://github.com/blockworks-foundation/geyser-grpc-connector"
yellowstone-grpc-client = { version = "2.0.0", git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v2.0.0+solana.2.0.16" }
yellowstone-grpc-proto = { version = "2.0.0", git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v2.0.0+solana.2.0.16" }
# required for CommitmentConfig
solana-sdk = "2.0.16"
url = "2.5.0"
async-stream = "0.3.5"
tokio = { version = "1.28" , features = ["rt"] }
tokio = { version = "1.28" , features = ["rt", "rt-multi-thread"] }
futures = "0.3.28"
merge-streams = "0.1.2"
anyhow = "1.0.70"
@ -32,10 +31,20 @@ bincode = "1.3.3"
csv = "1.3.0"
#[dev-dependencies]
dashmap = "5.5.3"
tonic = { version= "0.12.3", features=["gzip"] }
tonic-health = "0.12.3"
regex = "1.10.4"
clap = { version = "4.2", features = ["derive"] }
lz4_flex = "0.11.3"
[dev-dependencies]
tracing-subscriber = "0.3.16"
solana-logger = "2.0.16"
solana-account-decoder = "2.0.16"
[patch.crates-io.curve25519-dalek]
git = "https://github.com/anza-xyz/curve25519-dalek.git"
rev = "b500cdc2a920cd5bff9e2dd974d7b97349d61464"
rev = "b500cdc2a920cd5bff9e2dd974d7b97349d61464"

0
accounts-testnet.csv Normal file
View File

View File

@ -0,0 +1,40 @@
use itertools::Itertools;
use std::fs::File;
use std::io;
use std::io::BufRead;
use std::path::PathBuf;
/// Reads a `ledger-debug-accounts.txt` dump (fixed 9-line records, see the
/// sample at the bottom of this file) and prints one `pubkey;owner;data_len`
/// line per account to stdout.
///
/// Stops at the first record whose pubkey line is empty (trailing blank block).
pub fn main() {
    // NOTE(review): hardcoded developer-local path — parameterize (e.g. via
    // clap, already a dependency) before wider use.
    let accounts_meta_file =
        PathBuf::from("/Users/stefan/mango/projects/geyser-misc/ledger-debug-accounts.txt");

    let file = File::open(accounts_meta_file).expect("file must exist");
    let reader = io::BufReader::new(file);

    // Each account record spans exactly 9 lines.
    for record in &reader.lines().chunks(9) {
        let record = record.collect_vec();

        // line 0: "<pubkey>:" — strip the trailing colon
        let account_pk = record[0].as_ref().expect("readable pubkey line").replace(':', "");
        if account_pk.is_empty() {
            // blank record marks the end of the dump
            break;
        }

        // line 2: "owner: '<pubkey>'" — take the text between the single quotes
        let owner_line = record[2].as_ref().expect("readable owner line");
        let ltick = owner_line.find('\'').expect("opening tick in owner line");
        let rtick = owner_line.rfind('\'').expect("closing tick in owner line");
        let owner_pk = &owner_line[ltick + 1..rtick];

        // line 6: " data_len: <n>"
        let data_len = record[6]
            .as_ref()
            .expect("readable data_len line")
            .replace(" data_len: ", "");

        println!("{};{};{}", account_pk, owner_pk, data_len);
    }
}
/*
16FMCmgLzCNNz6eTwGanbyN2ZxvTBSLuQ6DZhgeMshg:
balance: 0.00095352 SOL
owner: 'Feature111111111111111111111111111111111111'
executable: false
slot: 0
rent_epoch: 0
data_len: 9
data: 'AQAAAAAAAAAA'
encoding: "base64"
*/

View File

@ -0,0 +1,483 @@
use futures::{Stream, StreamExt};
use itertools::Itertools;
use log::{debug, info};
use solana_account_decoder::parse_token::spl_token_ids;
use solana_sdk::clock::{Slot, UnixTimestamp};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::hash::{hash, Hash};
use solana_sdk::pubkey::Pubkey;
use std::cmp::min;
use std::collections::{HashMap, VecDeque};
use std::pin::pin;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use std::{env, iter};
use tokio::sync::mpsc::Receiver;
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};
use geyser_grpc_connector::{
histogram_percentiles, AtomicSlot, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig,
Message,
};
use tokio::time::{sleep, Duration};
use tracing::{trace, warn};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{
SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocksMeta,
SubscribeRequestFilterSlots, SubscribeUpdate,
};
use yellowstone_grpc_proto::prost::Message as _;
mod debouncer;
/// Entry point: connects to a single Geyser gRPC source ("green"), subscribes
/// to updates for ALL accounts, and spawns background tasks that track slot
/// progression and per-slot account-update statistics. Sleeps for 30 minutes,
/// then exits.
///
/// Required env vars: `GRPC_ADDR` (plus `GRPC_SLOT1_ADDR`/`GRPC_SLOT2_ADDR`
/// read by `start_tracking_slots`); optional: `GRPC_X_TOKEN`.
#[tokio::main]
pub async fn main() {
    // Suggested log filter:
    // RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace
    tracing_subscriber::fmt::init();
    // console_subscriber::init();

    let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green");
    let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok();

    info!(
        "Using grpc source on {} ({})",
        grpc_addr_green,
        grpc_x_token_green.is_some()
    );

    // Generous 25s timeouts for the heavy all-accounts stream.
    let timeouts = GrpcConnectionTimeouts {
        connect_timeout: Duration::from_secs(25),
        request_timeout: Duration::from_secs(25),
        subscribe_timeout: Duration::from_secs(25),
        receive_timeout: Duration::from_secs(25),
    };

    let config = GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());

    info!("Write Block stream..");

    // Channel from the autoconnection task to the account consumer below.
    let (autoconnect_tx, geyser_messages_rx) = tokio::sync::mpsc::channel(10);
    // `_exit` must stay alive for the duration of main: dropping the sender
    // closes the broadcast channel the tasks use as their exit signal.
    let (_exit, exit_notify) = tokio::sync::broadcast::channel(1);

    // let _accounts_task = create_geyser_autoconnection_task_with_mpsc(
    //     config.clone(),
    //     GeyserFilter(CommitmentConfig::processed()).accounts(),
    //     autoconnect_tx.clone(),
    //     exit_notify.resubscribe(),
    // );
    //
    // let _blocksmeta_task = create_geyser_autoconnection_task_with_mpsc(
    //     config.clone(),
    //     GeyserFilter(CommitmentConfig::processed()).blocks_meta(),
    //     autoconnect_tx.clone(),
    //     exit_notify.resubscribe(),
    // );

    // Subscribe to updates for ALL accounts (high-volume stream).
    let _all_accounts = create_geyser_autoconnection_task_with_mpsc(
        config.clone(),
        all_accounts(),
        autoconnect_tx.clone(),
        exit_notify.resubscribe(),
    );

    // let _token_accounts_task = create_geyser_autoconnection_task_with_mpsc(
    //     config.clone(),
    //     token_accounts(),
    //     autoconnect_tx.clone(),
    //     exit_notify.resubscribe(),
    // );

    // Shared slot high-water mark: written by the slot tracker, read by the
    // account consumer to compute lag.
    let current_processed_slot = AtomicSlot::default();
    start_tracking_slots(current_processed_slot.clone());
    start_tracking_account_consumer(geyser_messages_rx, current_processed_slot.clone());

    // "infinite" sleep
    sleep(Duration::from_secs(1800)).await;
}
/// Spawns a background task that multiplexes `processed`-commitment slot
/// updates from two independent gRPC sources and stores the most recently
/// received slot number into `current_processed_slot`.
///
/// Required env vars: `GRPC_SLOT1_ADDR`, `GRPC_SLOT2_ADDR`; optional tokens:
/// `GRPC_SLOT1_X_TOKEN`, `GRPC_SLOT2_X_TOKEN`.
///
/// note: processed might return a slot that might end up on a fork
fn start_tracking_slots(current_processed_slot: AtomicSlot) {
    let grpc_slot_source1 = env::var("GRPC_SLOT1_ADDR").expect("need grpc url for slot source1");
    let grpc_x_token_source1 = env::var("GRPC_SLOT1_X_TOKEN").ok();
    let grpc_slot_source2 = env::var("GRPC_SLOT2_ADDR").expect("need grpc url for slot source2");
    let grpc_x_token_source2 = env::var("GRPC_SLOT2_X_TOKEN").ok();

    info!(
        "Using grpc sources for slot: {}, {}",
        grpc_slot_source1, grpc_slot_source2
    );

    // Slot notifications should arrive quickly; use tight 5s timeouts.
    let timeouts = GrpcConnectionTimeouts {
        connect_timeout: Duration::from_secs(5),
        request_timeout: Duration::from_secs(5),
        subscribe_timeout: Duration::from_secs(5),
        receive_timeout: Duration::from_secs(5),
    };

    let config1 = GrpcSourceConfig::new(
        grpc_slot_source1,
        grpc_x_token_source1,
        None,
        timeouts.clone(),
    );
    let config2 = GrpcSourceConfig::new(
        grpc_slot_source2,
        grpc_x_token_source2,
        None,
        timeouts.clone(),
    );

    tokio::spawn(async move {
        debug!("start tracking slots..");

        // Both sources feed the same channel; whichever delivers first wins.
        let (multiplex_tx, mut multiplex_rx) = tokio::sync::mpsc::channel(10);
        // TODO expose
        let (_exit, exit_notify) = tokio::sync::broadcast::channel(1);

        let _blocksmeta_task1 = create_geyser_autoconnection_task_with_mpsc(
            config1.clone(),
            GeyserFilter(CommitmentConfig::processed()).slots(),
            multiplex_tx.clone(),
            exit_notify.resubscribe(),
        );
        let _blocksmeta_task2 = create_geyser_autoconnection_task_with_mpsc(
            config2.clone(),
            GeyserFilter(CommitmentConfig::processed()).slots(),
            multiplex_tx.clone(),
            exit_notify.resubscribe(),
        );

        loop {
            match multiplex_rx.recv().await {
                Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
                    Some(UpdateOneof::Slot(update)) => {
                        // Store unconditionally. A monotonic "highest slot wins"
                        // guard is deliberately NOT used: the mock source restarts
                        // its slot numbering on restart, which would freeze a
                        // high-water mark forever.
                        // if slot > tip {
                        //     tip = slot;
                        //     current_processed_slot.store(slot, Ordering::Relaxed);
                        // }
                        current_processed_slot.store(update.slot, Ordering::Relaxed);
                    }
                    None => {}
                    _ => {}
                },
                None => {
                    log::warn!("multiplexer channel closed - aborting");
                    return;
                }
                Some(Message::Connecting(_)) => {}
            }
        }
    });
}
/// Spawns a background consumer that drains account updates from
/// `geyser_messages_rx` and, on every slot change, logs per-slot statistics:
/// bytes of account data transferred, number of update messages, a
/// per-account update-count histogram, and the wallclock spread between the
/// first and last update of the slot. For one hardcoded "selected" account it
/// additionally computes an XOR/LZ4 delta against the previous data snapshot
/// (see `delta_compress`).
///
/// `current_processed_slot` is only read here (the slot tracker writes it) to
/// report how far account updates lag behind the latest processed slot.
///
/// note: this keeps track of lot of data and might blow up memory
fn start_tracking_account_consumer(
    mut geyser_messages_rx: Receiver<Message>,
    current_processed_slot: Arc<AtomicU64>,
) {
    tokio::spawn(async move {
        // total account-data bytes seen, per slot
        let mut bytes_per_slot = HashMap::<Slot, usize>::new();
        // number of account update messages, per slot
        let mut updates_per_slot = HashMap::<Slot, usize>::new();
        // receive timestamps of each update, keyed by (slot, account)
        let mut wallclock_updates_per_slot_account =
            HashMap::<(Slot, Pubkey), Vec<SystemTime>>::new();
        // slot written by account update
        let mut current_slot: Slot = 0;
        // hashes of large (>100000-byte) account payloads, per account
        let mut account_hashes = HashMap::<Pubkey, Vec<Hash>>::new();
        // seconds since epoch
        // NOTE(review): never written in this function — the block-meta task that
        // would populate it is commented out in main(); the block-time log below
        // therefore never fires. Confirm whether this is intentional.
        let mut block_time_per_slot = HashMap::<Slot, UnixTimestamp>::new();
        // wall clock time of block completion (i.e. processed) reported by the block meta stream
        let mut block_completion_notification_time_per_slot = HashMap::<Slot, SystemTime>::new();
        // rate-limits the lag debug line below to at most one per 50ms
        let debouncer = debouncer::Debouncer::new(Duration::from_millis(50));

        // Phoenix 4DoNfFBfF7UokCC2FQzriy7yHK6DY6NVdYpuekQ5pRgg
        // CzK26LWpoU9UjSrZkVu97oZj63abJrNv1zp9Hy2zZdy5
        // 6ojSigXF7nDPyhFRgmn3V9ywhYseKF9J32ZrranMGVSX
        // FV8EEHjJvDUD8Kkp1DcomTatZBA81Z6C5AhmvyUwvEAh
        // choose an account for which the diff should be calculated
        let selected_account_pk =
            Pubkey::from_str("4DoNfFBfF7UokCC2FQzriy7yHK6DY6NVdYpuekQ5pRgg").unwrap();
        // previous data snapshot of the selected account, for delta_compress
        let mut last_account_data: Option<Vec<u8>> = None;

        loop {
            match geyser_messages_rx.recv().await {
                Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
                    Some(UpdateOneof::Account(update)) => {
                        // NOTE(review): started_at and account_owner_pk are unused —
                        // leftovers from removed instrumentation?
                        let started_at = Instant::now();
                        let now = SystemTime::now();
                        let account_info = update.account.unwrap();
                        let account_pk = Pubkey::try_from(account_info.pubkey).unwrap();
                        let account_owner_pk = Pubkey::try_from(account_info.owner).unwrap();
                        // note: slot is referencing the block that is just built while the slot number reported from BlockMeta/Slot uses the slot after the block is built
                        let slot = update.slot;
                        let account_receive_time = get_epoch_sec();

                        // record content hashes of large accounts only
                        if account_info.data.len() > 100000 {
                            let hash = hash(&account_info.data);
                            // info!("got account update!!! {} - {:?} - {} bytes - {} - {}lamps",
                            //     slot, account_pk, account_info.data.len(), hash, account_info.lamports);
                            account_hashes
                                .entry(account_pk)
                                .and_modify(|entry| entry.push(hash))
                                .or_insert(vec![hash]);
                        }

                        // if account_hashes.len() > 100 {
                        //     for (pubkey, hashes) in &account_hashes {
                        //         info!("account hashes for {:?}", pubkey);
                        //         for hash in hashes {
                        //             info!("- hash: {}", hash);
                        //         }
                        //     }
                        // }

                        // delta-compression experiment for the selected account
                        if account_pk == selected_account_pk {
                            info!(
                                "got account update!!! {} - {:?} - {} bytes - {}",
                                slot,
                                account_pk,
                                account_info.data.len(),
                                account_info.lamports
                            );

                            if let Some(prev_data) = last_account_data {
                                let hash1 = hash(&prev_data);
                                let hash2 = hash(&account_info.data);
                                info!("diff: {} {}", hash1, hash2);
                                delta_compress(&prev_data, &account_info.data);
                            }

                            last_account_data = Some(account_info.data.clone());
                        }

                        // accumulate per-slot statistics
                        bytes_per_slot
                            .entry(slot)
                            .and_modify(|entry| *entry += account_info.data.len())
                            .or_insert(account_info.data.len());
                        updates_per_slot
                            .entry(slot)
                            .and_modify(|entry| *entry += 1)
                            .or_insert(1);
                        wallclock_updates_per_slot_account
                            .entry((slot, account_pk))
                            .and_modify(|entry| entry.push(now))
                            .or_insert(vec![now]);

                        // slot transition: dump statistics for the slot that just ended
                        if current_slot != slot && current_slot != 0 {
                            info!("New Slot: {}", slot);
                            info!(
                                "Slot: {} - account data transferred: {:.2} MiB",
                                slot,
                                *bytes_per_slot.get(&current_slot).unwrap() as f64
                                    / 1024.0
                                    / 1024.0
                            );
                            info!(
                                "Slot: {} - num of update messages: {}",
                                slot,
                                updates_per_slot.get(&current_slot).unwrap()
                            );

                            // updates-per-account distribution for the finished slot
                            let per_account_updates = wallclock_updates_per_slot_account
                                .iter()
                                .filter(|((slot, _pubkey), _)| slot == &current_slot)
                                .map(|((_slot, _pubkey), updates)| updates.len() as f64)
                                .sorted_by(|a, b| a.partial_cmp(b).unwrap())
                                .collect_vec();
                            let per_account_updates_histogram =
                                histogram_percentiles::calculate_percentiles(&per_account_updates);
                            info!(
                                "Per-account updates histogram: {}",
                                per_account_updates_histogram
                            );

                            if let Some(actual_block_time) = block_time_per_slot.get(&current_slot)
                            {
                                info!(
                                    "Block time for slot {}: delta {} seconds",
                                    current_slot,
                                    account_receive_time - *actual_block_time
                                );
                            }

                            // wallclock spread between first and last update of the slot
                            let wallclock_minmax = wallclock_updates_per_slot_account
                                .iter()
                                .filter(|((slot, _pubkey), _)| slot == &current_slot)
                                .flat_map(|((_slot, _pubkey), updates)| updates)
                                .minmax();
                            if let Some((min, max)) = wallclock_minmax.into_option() {
                                info!("Wallclock timestamp between first and last account update received for slot {}: {:.2}s",
                                    current_slot,
                                    max.duration_since(*min).unwrap().as_secs_f64()
                                );
                            }
                        } // -- slot changed

                        current_slot = slot;

                        // report how far this update lags behind the tracked processed slot
                        let latest_slot = current_processed_slot.load(Ordering::Relaxed);
                        if latest_slot != 0 {
                            // the perfect is value "-1"
                            let delta = (latest_slot as i64) - (slot as i64);
                            if debouncer.can_fire() {
                                let is_lagging = delta > -1;
                                let is_lagging_a_lot = delta - 20 > -1;
                                let info_text = if is_lagging {
                                    if is_lagging_a_lot {
                                        "A LOT"
                                    } else {
                                        "a bit"
                                    }
                                } else {
                                    "good"
                                };
                                // Account info for upcoming slot {} was {} behind current processed slot
                                debug!(
                                    "Account update slot {}, delta: {} - {}",
                                    slot, delta, info_text
                                );
                            }
                        }
                    }
                    None => {}
                    _ => {}
                },
                None => {
                    log::warn!("multiplexer channel closed - aborting");
                    return;
                }
                Some(Message::Connecting(_)) => {}
            }
        }
    });
}
/// Log how well an account-data update compresses: once as a plain LZ4 of the
/// new data, and once as an LZ4 of the XOR delta against the previous snapshot.
///
/// Diagnostic only — all results are emitted via `info!`, nothing is returned.
/// Parameters are generalized from `&Vec<u8>` to `&[u8]`; existing call sites
/// passing `&Vec<u8>` still compile via deref coercion.
fn delta_compress(prev_data: &[u8], data: &[u8]) {
    // XOR over the common prefix only; trailing bytes of the longer buffer are
    // not part of the delta but are still covered by the whole-data compression.
    let xor_region = min(prev_data.len(), data.len());
    let mut xor_diff = vec![0u8; xor_region];
    // OR-accumulator over all diff bytes: stays 0 iff the common prefix is identical.
    let mut diff_bits = 0u8;
    for i in 0..xor_region {
        xor_diff[i] = prev_data[i] ^ data[i];
        diff_bits |= xor_diff[i];
    }
    // Identical content and identical length -> nothing to report.
    if diff_bits == 0 && prev_data.len() == data.len() {
        info!("no difference in data");
        return;
    }
    let count_non_zero = xor_diff.iter().filter(|&x| *x != 0).count();
    info!(
        "count_non_zero={} xor_region={}",
        count_non_zero, xor_region
    );
    let compressed_xor = lz4_flex::compress_prepend_size(&xor_diff);
    info!(
        "compressed size of xor: {} (was {})",
        compressed_xor.len(),
        xor_diff.len()
    );
    let compressed_data = lz4_flex::compress_prepend_size(data);
    info!(
        "compressed size of data: {} (was {})",
        compressed_data.len(),
        data.len()
    );
}
fn get_epoch_sec() -> UnixTimestamp {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as UnixTimestamp
}
/// Subscription covering every account owned by any of the SPL token programs
/// (no per-account filter, no data filters).
pub fn token_accounts() -> SubscribeRequest {
    let token_program_owners = spl_token_ids()
        .iter()
        .map(|program_id| program_id.to_string())
        .collect();
    let accounts_filter = SubscribeRequestFilterAccounts {
        account: vec![],
        owner: token_program_owners,
        filters: vec![],
    };
    SubscribeRequest {
        accounts: HashMap::from([("client".to_string(), accounts_filter)]),
        ..Default::default()
    }
}
/// Subscription covering every account update, slot updates (filtered by
/// commitment), and block meta notifications.
pub fn all_accounts_and_blocksmeta() -> SubscribeRequest {
    let accounts_filter = SubscribeRequestFilterAccounts {
        account: vec![],
        owner: vec![],
        filters: vec![],
    };
    let slots_filter = SubscribeRequestFilterSlots {
        filter_by_commitment: Some(true),
    };
    SubscribeRequest {
        slots: HashMap::from([("client".to_string(), slots_filter)]),
        accounts: HashMap::from([("client".to_string(), accounts_filter)]),
        blocks_meta: HashMap::from([(
            "client".to_string(),
            SubscribeRequestFilterBlocksMeta {},
        )]),
        ..Default::default()
    }
}
/// Subscription covering every account update with no filters at all.
pub fn all_accounts() -> SubscribeRequest {
    let catch_all_filter = SubscribeRequestFilterAccounts {
        account: vec![],
        owner: vec![],
        filters: vec![],
    };
    SubscribeRequest {
        accounts: HashMap::from([("client".to_string(), catch_all_filter)]),
        ..Default::default()
    }
}

34
examples/debouncer.rs Normal file
View File

@ -0,0 +1,34 @@
use std::sync::atomic::{AtomicI64, Ordering};
use std::time::{Duration, Instant};
/// Lock-free rate limiter: `can_fire` returns true at most once per cooldown
/// window, measured on a monotonic clock starting at construction time.
#[derive(Debug)]
pub struct Debouncer {
    started_at: Instant,
    cooldown_ms: i64,
    last: AtomicI64,
}

impl Debouncer {
    /// Create a debouncer with the given cooldown window.
    pub fn new(cooldown: Duration) -> Self {
        Debouncer {
            started_at: Instant::now(),
            cooldown_ms: cooldown.as_millis() as i64,
            last: AtomicI64::new(0),
        }
    }

    /// Atomically claim the current window. Returns true iff more than
    /// `cooldown_ms` has passed since the last successful claim.
    pub fn can_fire(&self) -> bool {
        let elapsed_ms = self.started_at.elapsed().as_millis() as i64;
        // fetch_update retries on contention, so exactly one caller wins a window.
        self.last
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |previous_ms| {
                let cooled_down = elapsed_ms - previous_ms > self.cooldown_ms;
                if cooled_down {
                    Some(elapsed_ms)
                } else {
                    None
                }
            })
            .is_ok()
    }
}

View File

@ -17,6 +17,7 @@ use std::time::SystemTime;
use base64::Engine;
use csv::ReaderBuilder;
use itertools::Itertools;
/// This file mocks the core model of the RPC server.
use solana_sdk::compute_budget;
use solana_sdk::compute_budget::ComputeBudgetInstruction;
@ -28,9 +29,12 @@ use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature;
use solana_sdk::transaction::TransactionError;
use tokio::sync::broadcast;
use tokio::sync::mpsc::Receiver;
use yellowstone_grpc_proto::geyser::{
SubscribeRequest, SubscribeRequestFilterSlots, SubscribeUpdateSlot,
SubscribeRequest, SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions,
SubscribeUpdateSlot,
};
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
@ -189,12 +193,15 @@ pub async fn main() {
let green_config =
GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());
let (_, exit_notify) = broadcast::channel(1);
// mix of (all) slots and processed accounts
let (autoconnect_tx, slots_accounts_rx) = tokio::sync::mpsc::channel(10);
let _green_stream_ah = create_geyser_autoconnection_task_with_mpsc(
green_config.clone(),
all_slots_and_processed_accounts_together(),
autoconnect_tx.clone(),
exit_notify.resubscribe(),
);
let (only_processed_accounts_tx, only_processed_accounts_rx) = tokio::sync::mpsc::channel(10);
@ -202,6 +209,7 @@ pub async fn main() {
green_config.clone(),
accounts_at_level(CommitmentLevel::Processed),
only_processed_accounts_tx.clone(),
exit_notify.resubscribe(),
);
let (only_confirmed_accounts_tx, only_confirmed_accounts_rx) = tokio::sync::mpsc::channel(10);
@ -209,6 +217,7 @@ pub async fn main() {
green_config.clone(),
accounts_at_level(CommitmentLevel::Confirmed),
only_confirmed_accounts_tx.clone(),
exit_notify.resubscribe(),
);
let (only_finalized_accounts_tx, only_finalized_accounts_rx) = tokio::sync::mpsc::channel(10);
@ -216,6 +225,7 @@ pub async fn main() {
green_config.clone(),
accounts_at_level(CommitmentLevel::Finalized),
only_finalized_accounts_tx.clone(),
exit_notify.resubscribe(),
);
start_all_slots_and_processed_accounts_consumer(slots_accounts_rx);

View File

@ -0,0 +1,112 @@
use log::{info, warn};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use std::collections::HashMap;
use std::env;
use std::str::FromStr;
use std::time::SystemTime;
use base64::Engine;
use csv::ReaderBuilder;
use itertools::Itertools;
use solana_sdk::borsh0_10::try_from_slice_unchecked;
/// This file mocks the core model of the RPC server.
use solana_sdk::compute_budget;
use solana_sdk::compute_budget::ComputeBudgetInstruction;
use solana_sdk::hash::Hash;
use solana_sdk::instruction::CompiledInstruction;
use solana_sdk::message::v0::MessageAddressTableLookup;
use solana_sdk::message::{v0, MessageHeader, VersionedMessage};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature;
use solana_sdk::transaction::TransactionError;
use tokio::sync::broadcast;
use tokio::sync::mpsc::Receiver;
use yellowstone_grpc_proto::geyser::{
SubscribeRequest, SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions,
SubscribeUpdateSlot,
};
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
use geyser_grpc_connector::{
map_commitment_level, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message,
};
use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterAccounts;
/// Entry point: connects to one Geyser gRPC source and logs the signature of
/// every non-vote, non-failed transaction matching the `jupyter_trades()`
/// filter (Jupiter aggregator program account).
#[tokio::main]
pub async fn main() {
    tracing_subscriber::fmt::init();
    // Endpoint is mandatory; the x-token is optional (e.g. plaintext endpoints).
    let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green");
    let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok();
    // The sender half is kept alive in `_foo` so the exit channel never closes;
    // no shutdown signal is ever sent in this example.
    let (_foo, exit_notify) = broadcast::channel(1);
    info!(
        "Using gRPC source {} ({})",
        grpc_addr_green,
        grpc_x_token_green.is_some()
    );
    let timeouts = GrpcConnectionTimeouts {
        connect_timeout: Duration::from_secs(5),
        request_timeout: Duration::from_secs(5),
        subscribe_timeout: Duration::from_secs(5),
        receive_timeout: Duration::from_secs(5),
    };
    let green_config =
        GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());
    // Auto-reconnecting subscription task feeding updates into an mpsc channel.
    let (autoconnect_tx, mut transactions_rx) = tokio::sync::mpsc::channel(10);
    let _tx_source_ah = create_geyser_autoconnection_task_with_mpsc(
        green_config.clone(),
        jupyter_trades(),
        autoconnect_tx.clone(),
        exit_notify,
    );
    // Consume updates forever; this loop has no exit condition.
    loop {
        let message = transactions_rx.recv().await;
        match message {
            Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
                Some(UpdateOneof::Transaction(update)) => {
                    let tx = update.transaction.unwrap();
                    let sig = Signature::try_from(tx.signature.as_slice()).unwrap();
                    info!("tx {}", sig);
                }
                _ => {} // FIXME: non-transaction update kinds are silently dropped
            },
            // FIXME: `None` (channel closed) falls into this arm too, so a closed
            // channel would make this loop spin without ever terminating.
            _ => {}
        }
    }
}
/// Transaction subscription at confirmed commitment for transactions that
/// include the Jupiter aggregator program account, excluding votes and
/// failed transactions.
fn jupyter_trades() -> SubscribeRequest {
    let jupiter_filter = SubscribeRequestFilterTransactions {
        vote: Some(false),
        failed: Some(false),
        signature: None,
        account_include: vec!["JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4".to_string()],
        account_exclude: vec![],
        account_required: vec![],
    };
    SubscribeRequest {
        transactions: HashMap::from([("client".to_string(), jupiter_filter)]),
        ping: None,
        commitment: Some(map_commitment_level(CommitmentConfig {
            commitment: CommitmentLevel::Confirmed,
        }) as i32),
        ..Default::default()
    }
}

View File

@ -0,0 +1,160 @@
use clap::Parser;
use regex::Regex;
use std::collections::HashMap;
use std::fs::File;
use std::io;
use std::io::BufRead;
use std::path::{Path, PathBuf};
use std::time::Duration;
pub fn parse_log_entry_subscriber(log_entry: &str) -> (u64, u64) {
let re = Regex::new(r".*got account update: write_version=(?P<write_version>\d+);timestamp_us=(?P<timestamp_us>\d+);slot=(?P<slot>\d+)").unwrap();
let caps = re.captures(log_entry).unwrap();
// let mut result = HashMap::new();
// result.insert("write_version".to_string(), caps["write_version"].to_string());
// result.insert("timestamp_us".to_string(), caps["timestamp_us"].to_string());
// result.insert("slot".to_string(), caps["slot"].to_string());
let write_version: u64 = caps["write_version"].parse().unwrap();
let timestamp_us: u64 = caps["timestamp_us"].parse().unwrap();
(write_version, timestamp_us)
}
pub fn parse_log_entry_source(log_entry: &str) -> (u64, u64) {
let re = Regex::new(r".*account update: write_version=(?P<write_version>\d+);timestamp_us=(?P<timestamp_us>\d+);slot=(?P<slot>\d+)").unwrap();
let caps = re.captures(log_entry).unwrap();
// let mut result = HashMap::new();
// result.insert("write_version".to_string(), caps["write_version"].to_string());
// result.insert("timestamp_us".to_string(), caps["timestamp_us"].to_string());
// result.insert("slot".to_string(), caps["slot"].to_string());
let write_version: u64 = caps["write_version"].parse().unwrap();
let timestamp_us: u64 = caps["timestamp_us"].parse().unwrap();
(write_version, timestamp_us)
}
/// Parse the first 1000 lines of the subscriber log into a
/// `write_version -> timestamp_us` map (later duplicates overwrite earlier ones).
fn read_subscriber_log(log_file: PathBuf) -> HashMap<u64, u64> {
    let file = File::open(log_file).expect("file must exist");
    io::BufReader::new(file)
        .lines()
        .take(1000)
        .map(|line| {
            let line = line.expect("must be parsable");
            parse_log_entry_subscriber(&line)
        })
        .collect()
}
/// Parse the first 1000 lines of the source (validator) log into a
/// `write_version -> timestamp_us` map (later duplicates overwrite earlier ones).
fn read_source_log(log_file: PathBuf) -> HashMap<u64, u64> {
    let file = File::open(log_file).expect("file must exist");
    io::BufReader::new(file)
        .lines()
        .take(1000)
        .map(|line| {
            let line = line.expect("must be parsable");
            parse_log_entry_source(&line)
        })
        .collect()
}
// cat macbook.log |cut -b 111- | tr -d 'a-z_=' > macbook.log.csv
// cat solana-validator-macbook.log | cut -b 96- | tr -d 'a-z_='
/// Load `write_version;timestamp_us` pairs from a semicolon-delimited,
/// headerless CSV file into a map (later duplicates overwrite earlier ones).
fn read_from_csv(csv_file: PathBuf) -> HashMap<u64, u64> {
    let reader = csv::ReaderBuilder::new()
        .delimiter(b';')
        .has_headers(false)
        .from_path(csv_file)
        .unwrap();
    let mut pairs: HashMap<u64, u64> = HashMap::new();
    for record in reader.into_deserialize() {
        let record: Vec<String> = record.unwrap();
        let write_version = record[0].parse::<u64>().unwrap();
        let timestamp_us = record[1].parse::<u64>().unwrap();
        pairs.insert(write_version, timestamp_us);
    }
    pairs
}
fn read_subscriber_log_csv(csv_file: PathBuf) -> HashMap<u64, u64> {
csv::ReaderBuilder::new()
.delimiter(b';')
.has_headers(false)
.from_path(csv_file)
.unwrap()
.into_deserialize()
.map(|record| {
let record: Vec<String> = record.unwrap();
let write_version = record[0].parse::<u64>().unwrap();
let timestamp_us = record[1].parse::<u64>().unwrap();
(write_version, timestamp_us)
})
.collect::<HashMap<u64, u64>>()
}
// CLI arguments for the latency-comparison tool. Plain `//` comments are used
// here on purpose: `///` doc comments would be picked up by clap as help text.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct Args {
    // Path to the CSV (write_version;timestamp_us) derived from the source/validator log.
    #[arg(long)]
    pub csv_file_source: String,
    // Path to the CSV (write_version;timestamp_us) derived from the subscriber log.
    #[arg(long)]
    pub csv_file_subscriber: String,
}
pub fn main() {
let Args {
csv_file_source,
csv_file_subscriber,
} = Args::parse();
println!("Reading source log ...");
let source_timestamps = read_from_csv(PathBuf::from(csv_file_source));
println!("Reading subscriber log ...");
let subscriber_timestamps = read_from_csv(PathBuf::from(csv_file_subscriber));
for (write_version, timestamp_us) in subscriber_timestamps.into_iter() {
if let Some(source_timestamp) = source_timestamps.get(&write_version) {
let diff = (timestamp_us as i128) - (*source_timestamp as i128);
println!(
"write_version: {}, subscriber: {}, source: {}, diff: {:.1}ms",
write_version,
timestamp_us,
source_timestamp,
diff as f64 / 1000.0
);
}
}
}
// NOTE(review): dev-only predecessor of `main` with hardcoded local paths
// (the `__` suffix keeps it from being an entry point). Candidate for removal;
// kept byte-identical here for reference.
pub fn main__() {
    println!("Reading subscriber log ...");
    let subscriber_timestamps = read_subscriber_log(PathBuf::from(
        "/Users/stefan/mango/projects/geyser-misc/accounts-stream-performance/macbook.log",
    ));
    println!("Reading source log ...");
    let source_timestamps = read_source_log(PathBuf::from("/Users/stefan/mango/projects/geyser-misc/accounts-stream-performance/solana-validator-macbook.log"));
    println!("Comparing ...");
    // Print the raw microsecond delta for every write_version present in both logs.
    for (write_version, timestamp_us) in subscriber_timestamps.into_iter() {
        // println!("write_version: {}, subscriber: {}", write_version, timestamp_us);
        if let Some(source_timestamp) = source_timestamps.get(&write_version) {
            let diff = (timestamp_us as i128) - (*source_timestamp as i128);
            println!(
                "write_version: {}, subscriber: {}, source: {}, diff: {}",
                write_version, timestamp_us, source_timestamp, diff
            );
        }
    }
}

View File

@ -0,0 +1,180 @@
use clap::Parser;
use regex::Regex;
use std::collections::HashMap;
use std::fs::File;
use std::io;
use std::io::BufRead;
use std::path::PathBuf;
// CLI arguments for the single-log timing-analysis tool. Plain `//` comments
// on purpose: `///` doc comments would be picked up by clap as help text.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct Args {
    // Path to the combined validator/geyser log file to analyze.
    #[arg(long)]
    pub log_file: String,
}
/// Entry point: scans one log file for three checkpoint markers per account
/// write (geyser interface -> buffer channel -> gRPC send) and prints the
/// per-write_version latency between consecutive checkpoints in microseconds.
///
/// NOTE(review): the log file is read three times, once per marker; a single
/// pass dispatching on the marker string would avoid the re-reads.
pub fn main() {
    let Args { log_file } = Args::parse();
    println!("Reading log file: {}", log_file);
    let log_file = PathBuf::from(log_file);
    const LIMIT_LINES: usize = 10000000;
    let mut timetag_sending_to_buffer: HashMap<u64, u64> = HashMap::new();
    let mut timetag_before_sending_grpc: HashMap<u64, u64> = HashMap::new();
    // contains only matches from previous sets
    let mut timetag_geyser: HashMap<u64, u64> = HashMap::new();
    // Pass 1/3: collect "sending to buffer channel" checkpoints.
    let mut count_sending_to_buffer_channel = 0;
    {
        let file = File::open(&log_file).expect("file must exist");
        let reader = io::BufReader::new(file);
        for line in reader.lines().take(LIMIT_LINES) {
            let line = line.expect("must be parsable");
            // println!("-> buffer channel");
            if let Some((write_version, timestamp_us)) =
                parse_log_entry_sending_to_buffer_channel(line)
            {
                count_sending_to_buffer_channel += 1;
                timetag_sending_to_buffer.insert(write_version, timestamp_us);
            }
        }
    }
    // Pass 2/3: collect "before sending to grpc" checkpoints.
    let mut count_sending_grpc = 0;
    {
        let file = File::open(&log_file).expect("file must exist");
        let reader = io::BufReader::new(file);
        for line in reader.lines().take(LIMIT_LINES) {
            let line = line.expect("must be parsable");
            // println!("-> when sending to grpc");
            if let Some((write_version, timestamp_us)) = parse_log_entry_before_sending_grpc(line) {
                count_sending_grpc += 1;
                timetag_before_sending_grpc.insert(write_version, timestamp_us);
            }
        }
    }
    // Pass 3/3: collect geyser-interface checkpoints, keeping only write_versions
    // already seen in BOTH earlier sets (so the final loop's unwraps cannot miss).
    // THIS is by far the largest set
    let mut count_at_geyser = 0;
    {
        let file = File::open(&log_file).expect("file must exist");
        let reader = io::BufReader::new(file);
        for line in reader.lines().take(LIMIT_LINES) {
            let line = line.expect("must be parsable");
            // println!("-> at geyser interface");
            if let Some((write_version, timestamp_us)) = parse_log_entry_at_geyser_interface(line) {
                count_at_geyser += 1;
                if timetag_sending_to_buffer.contains_key(&write_version)
                    && timetag_before_sending_grpc.contains_key(&write_version)
                {
                    timetag_geyser.insert(write_version, timestamp_us);
                }
            }
        }
    }
    println!("Count at geyser interface: {}", count_at_geyser);
    println!(
        "Count sending to buffer channel: {}",
        count_sending_to_buffer_channel
    );
    println!("Count sending to grpc: {}", count_sending_grpc);
    // NOTE(review): the u64 subtractions assume timestamps are ordered
    // geyser <= buffer <= grpc; an out-of-order pair would underflow (panic in
    // debug builds, wrap in release) — TODO confirm the log guarantees this.
    for (write_version, geyser_timestamp_us) in timetag_geyser {
        let timestamp_sending_to_buffer = timetag_sending_to_buffer.get(&write_version).unwrap();
        let timestamp_before_sending_grpc =
            timetag_before_sending_grpc.get(&write_version).unwrap();
        let delta1 = timestamp_sending_to_buffer - geyser_timestamp_us;
        let delta2 = timestamp_before_sending_grpc - timestamp_sending_to_buffer;
        println!(
            "Write Version: {}, geyser - {}us - buffer - {}us - grpc",
            write_version, delta1, delta2
        );
    }
}
/// Extract `(write_version, timestamp_us)` from an "account update inspect
/// from geyser" log line.
///
/// Expected payload after the first ": " separator:
/// `write_version=<u64>;timestamp_us=<u64>;slot=<u64>`.
///
/// Returns `None` for lines without the marker AND for malformed lines
/// (the previous implementation indexed/unwrapped and panicked on the latter).
fn parse_log_entry_at_geyser_interface(log_line: String) -> Option<(u64, u64)> {
    if !log_line.contains("account update inspect from geyser") {
        return None;
    }
    // Everything after the first ": " is the `key=value;...` payload.
    let data = log_line.split(": ").nth(1)?;
    let mut fields = data.split(';');
    // For each field, the value follows the '=' separator.
    let write_version: u64 = fields.next()?.split('=').nth(1)?.parse().ok()?;
    let timestamp_us: u64 = fields.next()?.split('=').nth(1)?.parse().ok()?;
    // The slot field is validated but not returned.
    let _slot: u64 = fields.next()?.split('=').nth(1)?.parse().ok()?;
    Some((write_version, timestamp_us))
}
/// Extract `(write_version, timestamp_us)` from a "sending to buffer channel"
/// log line.
///
/// Expected payload after the first ": " separator:
/// `write_version=<u64>;timestamp_us=<u64>;slot=<u64>`.
///
/// Returns `None` for lines without the marker AND for malformed lines
/// (the previous implementation indexed/unwrapped and panicked on the latter).
fn parse_log_entry_sending_to_buffer_channel(log_line: String) -> Option<(u64, u64)> {
    if !log_line.contains("sending to buffer channel") {
        return None;
    }
    // Everything after the first ": " is the `key=value;...` payload.
    let data = log_line.split(": ").nth(1)?;
    let mut fields = data.split(';');
    // For each field, the value follows the '=' separator.
    let write_version: u64 = fields.next()?.split('=').nth(1)?.parse().ok()?;
    let timestamp_us: u64 = fields.next()?.split('=').nth(1)?.parse().ok()?;
    // The slot field is validated but not returned.
    let _slot: u64 = fields.next()?.split('=').nth(1)?.parse().ok()?;
    Some((write_version, timestamp_us))
}
/// Extract `(write_version, timestamp_us)` from a "before sending to grpc"
/// log line.
///
/// Expected payload after the first ": " separator (the old comment said
/// "third part" but the code always used the second part — behavior kept):
/// `write_version=<u64>;timestamp_us=<u64>;slot=<u64>`.
///
/// Returns `None` for lines without the marker AND for malformed lines
/// (the previous implementation indexed/unwrapped and panicked on the latter).
fn parse_log_entry_before_sending_grpc(log_line: String) -> Option<(u64, u64)> {
    if !log_line.contains("before sending to grpc") {
        return None;
    }
    // Everything after the first ": " is the `key=value;...` payload.
    let data = log_line.split(": ").nth(1)?;
    let mut fields = data.split(';');
    // For each field, the value follows the '=' separator.
    let write_version: u64 = fields.next()?.split('=').nth(1)?.parse().ok()?;
    let timestamp_us: u64 = fields.next()?.split('=').nth(1)?.parse().ok()?;
    // The slot field is validated but not returned.
    let _slot: u64 = fields.next()?.split('=').nth(1)?.parse().ok()?;
    Some((write_version, timestamp_us))
}

View File

@ -2,6 +2,7 @@ use log::info;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::env;
use tokio::sync::broadcast;
use geyser_grpc_connector::channel_plugger::spawn_broadcast_channel_plug;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task;
@ -87,9 +88,12 @@ pub async fn main() {
info!("Write Block stream..");
let (_, exit_notify) = broadcast::channel(1);
let (jh_geyser_task, message_channel) = create_geyser_autoconnection_task(
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
exit_notify,
);
let mut message_channel =
spawn_broadcast_channel_plug(tokio::sync::broadcast::channel(8), message_channel);

View File

@ -7,7 +7,6 @@ use std::pin::pin;
use base64::Engine;
use itertools::Itertools;
use solana_sdk::borsh0_10::try_from_slice_unchecked;
use solana_sdk::compute_budget::ComputeBudgetInstruction;
use solana_sdk::hash::Hash;
use solana_sdk::instruction::CompiledInstruction;
@ -30,6 +29,8 @@ use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
pub mod debouncer;
fn start_example_block_consumer(
multiplex_stream: impl Stream<Item = ProducedBlock> + Send + 'static,
) {

View File

@ -1,9 +1,7 @@
use futures::Stream;
use log::{info, warn};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::env;
use std::pin::pin;
use base64::Engine;
use itertools::Itertools;
@ -22,12 +20,8 @@ use solana_sdk::transaction::TransactionError;
use tokio::sync::mpsc::Receiver;
use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{
create_geyser_autoconnection_task, create_geyser_autoconnection_task_with_mpsc,
};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
@ -124,6 +118,7 @@ pub async fn main() {
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
};
let (_, exit_notify) = tokio::sync::broadcast::channel(1);
let green_config =
GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());
@ -137,16 +132,19 @@ pub async fn main() {
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
autoconnect_tx.clone(),
exit_notify.resubscribe(),
);
let _blue_stream_ah = create_geyser_autoconnection_task_with_mpsc(
blue_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
autoconnect_tx.clone(),
exit_notify.resubscribe(),
);
let _toxiproxy_stream_ah = create_geyser_autoconnection_task_with_mpsc(
toxiproxy_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
autoconnect_tx.clone(),
exit_notify,
);
start_example_blockmeta_consumer(blockmeta_rx);

View File

@ -85,10 +85,10 @@ pub async fn main() {
);
let timeouts = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(5),
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
connect_timeout: Duration::from_secs(25),
request_timeout: Duration::from_secs(25),
subscribe_timeout: Duration::from_secs(25),
receive_timeout: Duration::from_secs(25),
};
let config = GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());

View File

@ -0,0 +1,337 @@
use dashmap::DashMap;
use futures::{Stream, StreamExt};
use log::{debug, info, trace};
use solana_account_decoder::parse_token::UiAccountState::Initialized;
use solana_account_decoder::parse_token::{
parse_token, spl_token_ids, TokenAccountType, UiTokenAccount,
};
use solana_sdk::clock::{Clock, Slot};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::sysvar::clock;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::env;
use std::pin::pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
use geyser_grpc_connector::{
map_commitment_level, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message,
};
use tokio::time::{sleep, Duration};
use tracing::field::debug;
use tracing::warn;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{
SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots, SubscribeUpdate,
};
use yellowstone_grpc_proto::prost::Message as _;
const ENABLE_TIMESTAMP_TAGGING: bool = false;
/// Entry point: opens two subscriptions against the same gRPC source
/// (`token_accounts()` at processed and `token_accounts_finalized()` at
/// confirmed commitment), feeds both into one channel, maintains an
/// owner x mint -> token-account cache, and periodically logs cache size and
/// per-slot statistics. Runs for at most 1800 seconds.
#[tokio::main]
pub async fn main() {
    // RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace
    tracing_subscriber::fmt::init();
    // console_subscriber::init();
    let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green");
    let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok();
    info!(
        "Using grpc source on {} ({})",
        grpc_addr_green,
        grpc_x_token_green.is_some()
    );
    let timeouts = GrpcConnectionTimeouts {
        connect_timeout: Duration::from_secs(25),
        request_timeout: Duration::from_secs(25),
        subscribe_timeout: Duration::from_secs(25),
        receive_timeout: Duration::from_secs(25),
    };
    let config = GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());
    info!("Write Block stream..");
    let (exit_signal, exit_notify) = tokio::sync::broadcast::channel(1);
    // Both subscription tasks share one mpsc sender, so `accounts_rx` receives
    // an interleaved stream from both subscriptions.
    let (autoconnect_tx, mut accounts_rx) = tokio::sync::mpsc::channel(1000);
    let _jh_green = create_geyser_autoconnection_task_with_mpsc(
        config.clone(),
        token_accounts(),
        autoconnect_tx.clone(),
        exit_signal.subscribe(),
    );
    let _jh_blue = create_geyser_autoconnection_task_with_mpsc(
        config.clone(),
        token_accounts_finalized(),
        autoconnect_tx.clone(),
        exit_signal.subscribe(),
    );
    // owner x mint -> amount
    let token_account_by_ownermint: Arc<DashMap<Pubkey, DashMap<Pubkey, UiTokenAccount>>> =
        Arc::new(DashMap::with_capacity(10000));
    let token_account_by_ownermint_read = token_account_by_ownermint.clone();
    let token_account_by_ownermint = token_account_by_ownermint.clone();
    // Consumer task: processes slot and account updates and fills the cache.
    tokio::spawn(async move {
        let mut bytes_per_slot: HashMap<Slot, u64> = HashMap::new();
        let mut updates_per_slot: HashMap<Slot, u64> = HashMap::new();
        let mut changing_slot = 0;
        let mut current_slot = 0;
        // write_version -> first time that write was seen (duplicates come from
        // the second subscription delivering the same write).
        let mut account_write_first_timestamp: HashMap<u64, Instant> = HashMap::new();
        while let Some(message) = accounts_rx.recv().await {
            match message {
                Message::GeyserSubscribeUpdate(subscriber_update) => {
                    match subscriber_update.update_oneof {
                        Some(UpdateOneof::Slot(update)) => {
                            current_slot = update.slot;
                        }
                        Some(UpdateOneof::Account(update)) => {
                            let slot = update.slot as Slot;
                            let account = update.account.unwrap();
                            let account_pk = Pubkey::try_from(account.pubkey).unwrap();
                            let size = account.data.len() as u64;
                            info!(
                                "got account update: {} - {:?} - {} bytes",
                                update.slot,
                                account_pk,
                                account.data.len()
                            );
                            // The clock sysvar is explicitly subscribed (see
                            // token_accounts()); decode and dump it when it arrives.
                            if clock::id() == account_pk {
                                let clock: Clock = bincode::deserialize(&account.data).unwrap();
                                info!("clock: {:#?}", clock);
                            }
                            info!("got account write: {}", account.write_version);
                            match account_write_first_timestamp.entry(account.write_version) {
                                Entry::Occupied(o) => {
                                    let first_timestamp = o.get();
                                    info!("got second account update for same write version with delta of {:?}", first_timestamp.elapsed());
                                }
                                Entry::Vacant(v) => {
                                    v.insert(Instant::now());
                                }
                            }
                            trace!(
                                "got account update: {} - {:?} - {} bytes",
                                update.slot,
                                account_pk,
                                account.data.len()
                            );
                            // Optional latency tagging (compile-time switch above).
                            if ENABLE_TIMESTAMP_TAGGING {
                                let since_the_epoch = std::time::SystemTime::now()
                                    .duration_since(std::time::SystemTime::UNIX_EPOCH)
                                    .expect("Time went backwards");
                                info!(
                                    "got account update: write_version={};timestamp_us={};slot={}",
                                    account.write_version,
                                    since_the_epoch.as_micros(),
                                    update.slot
                                );
                            }
                            match parse_token(&account.data, Some(6)) {
                                Ok(TokenAccountType::Account(account_ui)) => {
                                    // UiTokenAccount {
                                    // mint: "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",
                                    // owner: "9un5wqE3q4oCjyrDkwsdD48KteCJitQX5978Vh7KKxHo",
                                    // token_amount: UiTokenAmount {
                                    // ui_amount: Some(8229995.070667),
                                    // decimals: 6, amount: "8229995070667",
                                    // ui_amount_string: "8229995.070667"
                                    // },
                                    // delegate: None,
                                    // state: Initialized,
                                    // is_native: false,
                                    // rent_exempt_reserve: None,
                                    // delegated_amount: None,
                                    // close_authority: None,
                                    // extensions: []
                                    // }
                                    // all different states are covered
                                    // is_native: both true+false are sent
                                    assert_eq!(account.executable, false);
                                    assert_eq!(account.rent_epoch, u64::MAX);
                                    let owner = Pubkey::from_str(&account_ui.owner).unwrap();
                                    let mint = Pubkey::from_str(&account_ui.mint).unwrap();
                                    // 6 decimals as requested
                                    let amount = &account_ui.token_amount.amount;
                                    // groovie wallet
                                    if account_ui.owner.starts_with("66fEFnKy") {
                                        info!(
                                            "update balance for mint {} of owner {}: {}",
                                            mint, owner, amount
                                        );
                                    }
                                    // if pubkey.starts_with(b"JUP") {
                                    // info!("update balance for mint {} of owner {}: {}", mint, owner, amount);
                                    // }
                                    token_account_by_ownermint
                                        .entry(owner)
                                        .or_insert_with(DashMap::new)
                                        .insert(mint, account_ui);
                                    bytes_per_slot
                                        .entry(slot)
                                        .and_modify(|total| *total += size)
                                        .or_insert(size);
                                    updates_per_slot
                                        .entry(slot)
                                        .and_modify(|total| *total += 1)
                                        .or_insert(1);
                                    // How far this account update lags behind the
                                    // latest slot notification.
                                    let delta = (slot as i64) - (current_slot as i64);
                                    if delta > 1 {
                                        debug!("delta: {}", (slot as i64) - (current_slot as i64));
                                    }
                                    // Slot boundary: emit the stats of the slot we just left.
                                    if slot != changing_slot && changing_slot != 0 {
                                        let total_bytes =
                                            bytes_per_slot.get(&changing_slot).unwrap();
                                        let updates_count =
                                            updates_per_slot.get(&changing_slot).unwrap();
                                        info!(
                                            "Slot {} - Total bytes: {}, {} updates",
                                            slot, total_bytes, updates_count
                                        );
                                    }
                                    changing_slot = slot;
                                }
                                Ok(TokenAccountType::Mint(mint)) => {
                                    // not interesting
                                }
                                Ok(TokenAccountType::Multisig(_)) => {}
                                Err(parse_error) => {
                                    trace!(
                                        "Could not parse account {} - {}",
                                        account_pk,
                                        parse_error
                                    );
                                }
                            }
                            let bytes: [u8; 32] = account_pk.to_bytes();
                        }
                        _ => {}
                    }
                }
                Message::Connecting(attempt) => {
                    warn!("Connection attempt: {}", attempt);
                }
            }
        }
        warn!("Stream aborted");
    });
    // Reporter task: every 1.5s, count all entries in the owner x mint cache.
    tokio::spawn(async move {
        loop {
            let mut total = 0;
            for accounts_by_mint in token_account_by_ownermint_read.iter() {
                for token_account_mint in accounts_by_mint.iter() {
                    total += 1;
                    let (owner, mint, account) = (
                        accounts_by_mint.key(),
                        token_account_mint.key(),
                        token_account_mint.value(),
                    );
                    // debug!("{} - {} - {}", owner, mint, account.token_amount.ui_amount_string);
                }
            }
            info!("Total owner x mint entries in cache map: {}", total);
            sleep(Duration::from_millis(1500)).await;
        }
    });
    // "infinite" sleep
    sleep(Duration::from_secs(1800)).await;
}
/// Subscription at processed commitment for the clock sysvar account plus
/// commitment-filtered slot updates.
pub fn token_accounts() -> SubscribeRequest {
    let accounts_filter = SubscribeRequestFilterAccounts {
        // Only the clock sysvar; owner-based (SPL token program) filtering is
        // currently disabled in this variant.
        account: vec![clock::id().to_string()],
        owner: vec![],
        filters: vec![],
    };
    let slots_filter = SubscribeRequestFilterSlots {
        filter_by_commitment: Some(true),
    };
    SubscribeRequest {
        slots: HashMap::from([("client".to_string(), slots_filter)]),
        accounts: HashMap::from([("client".to_string(), accounts_filter)]),
        transactions: HashMap::new(),
        entry: Default::default(),
        blocks: Default::default(),
        blocks_meta: HashMap::new(),
        commitment: Some(map_commitment_level(CommitmentConfig::processed()).into()),
        accounts_data_slice: Default::default(),
        ping: None,
    }
}
// find out if finalized makes a difference wrt accounts
/// Subscription for one pinned token account plus every account owned by an
/// SPL token program id, with commitment-filtered slots.
///
/// NOTE(review): despite the name, this subscribes at *confirmed* commitment
/// (see the `commitment` field below) — confirm that is intended.
pub fn token_accounts_finalized() -> SubscribeRequest {
    let account_filter = SubscribeRequestFilterAccounts {
        account: vec!["4DoNfFBfF7UokCC2FQzriy7yHK6DY6NVdYpuekQ5pRgg".to_string()],
        owner: spl_token_ids()
            .iter()
            .map(|pubkey| pubkey.to_string())
            .collect(),
        filters: vec![],
    };
    let slot_filter = SubscribeRequestFilterSlots {
        filter_by_commitment: Some(true),
    };

    let mut accounts_subs = HashMap::new();
    accounts_subs.insert("client".to_string(), account_filter);
    let mut slots_subs = HashMap::new();
    slots_subs.insert("client".to_string(), slot_filter);

    SubscribeRequest {
        slots: slots_subs,
        accounts: accounts_subs,
        transactions: HashMap::new(),
        entry: Default::default(),
        blocks: Default::default(),
        blocks_meta: HashMap::new(),
        commitment: Some(map_commitment_level(CommitmentConfig::confirmed()).into()),
        accounts_data_slice: Default::default(),
        ping: None,
    }
}

View File

@ -0,0 +1,157 @@
use dashmap::DashMap;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use log::{debug, info, trace};
use solana_account_decoder::parse_token::UiAccountState::Initialized;
use solana_account_decoder::parse_token::{
parse_token, spl_token_ids, TokenAccountType, UiTokenAccount,
};
use solana_sdk::clock::{Slot, UnixTimestamp};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::program_utils::limited_deserialize;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::vote::instruction::VoteInstruction;
use solana_sdk::vote::state::Vote;
use std::collections::{HashMap, HashSet};
use std::env;
use std::pin::pin;
use std::str::FromStr;
use std::sync::Arc;
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
use geyser_grpc_connector::histogram_percentiles::calculate_percentiles;
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use tokio::time::{sleep, Duration};
use tracing::field::debug;
use tracing::warn;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{
SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots,
SubscribeRequestFilterTransactions, SubscribeUpdate,
};
use yellowstone_grpc_proto::prost::Message as _;
const ENABLE_TIMESTAMP_TAGGING: bool = false;
/// Example binary: subscribes to vote transactions and tracks, per voted-on
/// slot, the set of validator-reported timestamps, periodically printing the
/// spread as a percentile histogram.
#[tokio::main]
pub async fn main() {
    // RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace
    tracing_subscriber::fmt::init();
    // console_subscriber::init();

    let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green");
    let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok();

    info!(
        "Using grpc source on {} ({})",
        grpc_addr_green,
        grpc_x_token_green.is_some()
    );

    let timeouts = GrpcConnectionTimeouts {
        connect_timeout: Duration::from_secs(25),
        request_timeout: Duration::from_secs(25),
        subscribe_timeout: Duration::from_secs(25),
        receive_timeout: Duration::from_secs(25),
    };

    let config = GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());

    info!("Write Block stream..");

    let green_stream = create_geyser_reconnecting_stream(config.clone(), transaction_filter());

    tokio::spawn(async move {
        // vote timestamps observed, keyed by the slot that was voted on
        let mut vote_times_by_slot: HashMap<Slot, HashSet<UnixTimestamp>> = HashMap::new();

        let mut green_stream = pin!(green_stream);
        while let Some(message) = green_stream.next().await {
            match message {
                Message::GeyserSubscribeUpdate(subscriber_update) => {
                    match subscriber_update.update_oneof {
                        Some(UpdateOneof::Transaction(update)) => {
                            // example code: unwraps are acceptable here; a vote
                            // tx always carries transaction + message payloads
                            let message = update
                                .transaction
                                .unwrap()
                                .transaction
                                .unwrap()
                                .message
                                .unwrap();
                            let slot = update.slot;
                            // https://docs.solanalabs.com/implemented-proposals/validator-timestamp-oracle
                            for ci in message.instructions {
                                let vote_instruction =
                                    limited_deserialize::<VoteInstruction>(&ci.data).unwrap();
                                let last_voted_slot = vote_instruction.last_voted_slot().unwrap();
                                info!(
                                    "vote_instruction: {:?}",
                                    vote_instruction.timestamp().unwrap()
                                );
                                vote_times_by_slot
                                    .entry(last_voted_slot)
                                    .or_default()
                                    .insert(vote_instruction.timestamp().unwrap());
                            }

                            // hack to look at reasonable settled slot
                            // print_spread(&vote_times_by_slot, slot);
                            // checked_sub guards the (debug-mode panic) underflow
                            // that `slot - 10` had for slots < 10
                            if let Some(settled_slot) = slot.checked_sub(10) {
                                if vote_times_by_slot.contains_key(&settled_slot) {
                                    print_spread(&vote_times_by_slot, settled_slot);
                                }
                            }
                        }
                        _ => {}
                    }
                }
                Message::Connecting(attempt) => {
                    warn!("Connection attempt: {}", attempt);
                }
            }
        }
        warn!("Stream aborted");
    });

    // "infinite" sleep
    sleep(Duration::from_secs(1800)).await;
}
/// Logs a percentile histogram of the vote-timestamp spread recorded for `slot`.
///
/// The spread is each timestamp minus the earliest one for that slot.
///
/// # Panics
/// Panics if `slot` has no entry in `vote_times_by_slot`; callers check
/// `contains_key` first, and an existing entry is never empty.
fn print_spread(vote_times_by_slot: &HashMap<Slot, HashSet<UnixTimestamp>>, slot: Slot) {
    // renamed from `slots`/`min_slot`: the set holds *timestamps*, not slots
    let timestamps = vote_times_by_slot
        .get(&slot)
        .expect("slot must have recorded vote timestamps");
    let earliest = timestamps.iter().min().expect("timestamp set is non-empty");
    let offsets = timestamps
        .iter()
        .sorted()
        .map(|ts| (*ts - earliest) as f64)
        .collect_vec();
    let histo = calculate_percentiles(&offsets);
    info!("slot: {} histo: {}", slot, histo);
}
/// Subscription filter for successful vote transactions only (no slot,
/// account or block streams; commitment left to the server default).
pub fn transaction_filter() -> SubscribeRequest {
    // fixed misspelled local `trnasactions_subs`
    let mut transactions_subs = HashMap::new();
    transactions_subs.insert(
        "client".to_string(),
        SubscribeRequestFilterTransactions {
            vote: Some(true),
            failed: Some(false),
            signature: None,
            // TODO
            account_include: vec![],
            account_exclude: vec![],
            account_required: vec![],
        },
    );

    SubscribeRequest {
        slots: HashMap::new(),
        accounts: HashMap::new(),
        transactions: transactions_subs,
        entry: Default::default(),
        blocks: Default::default(),
        blocks_meta: HashMap::new(),
        commitment: None,
        accounts_data_slice: Default::default(),
        ping: None,
    }
}

View File

@ -0,0 +1,208 @@
//
// ```
// ssh eclipse-rpc -Nv
// ```
//
use futures::{Stream, StreamExt};
use itertools::Itertools;
use log::{debug, info};
use solana_account_decoder::parse_token::spl_token_ids;
use solana_sdk::clock::{Slot, UnixTimestamp};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::hash::{hash, Hash};
use solana_sdk::pubkey::Pubkey;
use std::cmp::min;
use std::collections::{HashMap, VecDeque};
use std::pin::pin;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use std::{env, iter};
use tokio::sync::mpsc::Receiver;
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use tokio::time::{sleep, Duration};
use tracing::{trace, warn};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{
SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocksMeta,
SubscribeRequestFilterSlots, SubscribeUpdate,
};
use yellowstone_grpc_proto::prost::Message as _;
type AtomicSlot = Arc<AtomicU64>;
/// Example binary: streams every account update via the autoconnection task
/// and logs each one from a consumer task.
#[tokio::main]
pub async fn main() {
    // RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace
    tracing_subscriber::fmt::init();
    // console_subscriber::init();

    let grpc_addr = env::var("GRPC_ADDR").expect("need grpc url");
    let grpc_x_token = env::var("GRPC_X_TOKEN").ok();
    info!(
        "Using grpc source on {} ({})",
        grpc_addr,
        grpc_x_token.is_some()
    );

    // one shared timeout applied to connect/request/subscribe/receive
    let timeout = Duration::from_secs(25);
    let timeouts = GrpcConnectionTimeouts {
        connect_timeout: timeout,
        request_timeout: timeout,
        subscribe_timeout: timeout,
        receive_timeout: timeout,
    };
    let config = GrpcSourceConfig::new(grpc_addr, grpc_x_token, None, timeouts.clone());

    let (autoconnect_tx, geyser_messages_rx) = tokio::sync::mpsc::channel(10);
    let (_exit_tx, exit_rx) = tokio::sync::broadcast::channel::<()>(1);

    let _all_accounts = create_geyser_autoconnection_task_with_mpsc(
        config.clone(),
        all_accounts(),
        autoconnect_tx.clone(),
        exit_rx.resubscribe(),
    );

    let current_processed_slot = AtomicSlot::default();
    start_tracking_account_consumer(geyser_messages_rx, current_processed_slot.clone());

    // "infinite" sleep
    sleep(Duration::from_secs(1800)).await;
}
// note: this keeps track of lot of data and might blow up memory
/// Spawns a task that drains `geyser_messages_rx` and logs every account
/// update; returns immediately. The task exits when the channel closes.
///
/// NOTE(review): `current_processed_slot` is received but never read or
/// written inside the loop — presumably reserved for future slot tracking;
/// confirm before relying on it.
fn start_tracking_account_consumer(
    mut geyser_messages_rx: Receiver<Message>,
    current_processed_slot: Arc<AtomicU64>,
) {
    tokio::spawn(async move {
        loop {
            match geyser_messages_rx.recv().await {
                Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
                    Some(UpdateOneof::Account(update)) => {
                        // started_at/now are captured but unused below —
                        // presumably for latency measurement; confirm
                        let started_at = Instant::now();
                        let now = SystemTime::now();
                        let account_info = update.account.unwrap();
                        let account_pk = Pubkey::try_from(account_info.pubkey).unwrap();
                        let account_owner_pk = Pubkey::try_from(account_info.owner).unwrap();
                        // note: slot is referencing the block that is just built while the slot number reported from BlockMeta/Slot uses the slot after the block is built
                        let slot = update.slot;
                        let account_receive_time = get_epoch_sec();
                        info!(
                            "Account update: slot: {}, account_pk: {}, account_owner_pk: {}, account_receive_time: {}",
                            slot, account_pk, account_owner_pk, account_receive_time
                        );
                    }
                    // other update kinds are intentionally ignored
                    None => {}
                    _ => {}
                },
                None => {
                    // upstream sender dropped: stop the consumer task
                    log::warn!("multiplexer channel closed - aborting");
                    return;
                }
                Some(Message::Connecting(_)) => {}
            }
        }
    });
}
/// Current wall-clock time as whole seconds since the Unix epoch.
fn get_epoch_sec() -> UnixTimestamp {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_secs() as UnixTimestamp
}
/// Subscription covering every account owned by any SPL token program id.
pub fn token_accounts() -> SubscribeRequest {
    let spl_owners: Vec<String> = spl_token_ids()
        .iter()
        .map(|pubkey| pubkey.to_string())
        .collect();
    let filter = SubscribeRequestFilterAccounts {
        account: vec![],
        // vec!["4DoNfFBfF7UokCC2FQzriy7yHK6DY6NVdYpuekQ5pRgg".to_string()],
        owner: spl_owners,
        filters: vec![],
    };
    let accounts_subs = HashMap::from([("client".to_string(), filter)]);
    SubscribeRequest {
        accounts: accounts_subs,
        ..Default::default()
    }
}
/// Subscription for all accounts, commitment-filtered slots, and block meta.
pub fn all_accounts_and_blocksmeta() -> SubscribeRequest {
    let accounts_filter = SubscribeRequestFilterAccounts {
        account: vec![],
        owner: vec![],
        filters: vec![],
    };
    let slots_filter = SubscribeRequestFilterSlots {
        filter_by_commitment: Some(true),
    };

    let accounts_subs = HashMap::from([("client".to_string(), accounts_filter)]);
    let slots_subs = HashMap::from([("client".to_string(), slots_filter)]);
    let blocks_meta_subs =
        HashMap::from([("client".to_string(), SubscribeRequestFilterBlocksMeta {})]);

    SubscribeRequest {
        slots: slots_subs,
        accounts: accounts_subs,
        blocks_meta: blocks_meta_subs,
        ..Default::default()
    }
}
/// Subscription for every account update (no account/owner/filter restriction).
pub fn all_accounts() -> SubscribeRequest {
    let filter = SubscribeRequestFilterAccounts {
        account: vec![],
        owner: vec![],
        filters: vec![],
    };
    let accounts_subs = HashMap::from([("client".to_string(), filter)]);
    SubscribeRequest {
        accounts: accounts_subs,
        ..Default::default()
    }
}
/// Subscription for slot updates without commitment filtering.
pub fn slots() -> SubscribeRequest {
    let filter = SubscribeRequestFilterSlots {
        filter_by_commitment: None,
    };
    let slots_subs = HashMap::from([("client".to_string(), filter)]);
    SubscribeRequest {
        slots: slots_subs,
        ..Default::default()
    }
}

View File

@ -1,15 +1,19 @@
use crate::{Attempt, GrpcSourceConfig, Message};
use std::time::Duration;
use async_stream::stream;
use futures::{Stream, StreamExt};
use log::{debug, info, log, trace, warn, Level};
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};
use yellowstone_grpc_client::{GeyserGrpcBuilder, GeyserGrpcClient, GeyserGrpcClientResult};
use yellowstone_grpc_client::GeyserGrpcClientResult;
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
use yellowstone_grpc_proto::tonic::Status;
use crate::yellowstone_grpc_util::{
connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig,
};
use crate::{Attempt, GrpcSourceConfig, Message};
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
NotConnected(Attempt),
Connecting(Attempt, JoinHandle<GeyserGrpcClientResult<S>>),
@ -46,14 +50,17 @@ pub fn create_geyser_reconnecting_stream(
log!(if attempt > 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt, addr);
async move {
let mut builder = GeyserGrpcClient::build_from_shared(addr).unwrap()
.x_token(token).unwrap()
.connect_timeout(connect_timeout.unwrap_or(Duration::from_secs(10)))
.timeout(request_timeout.unwrap_or(Duration::from_secs(10)))
.tls_config(config.unwrap_or(ClientTlsConfig::new())).unwrap();
let mut client = builder.connect().await.unwrap();
let connect_result = connect_with_timeout_with_buffers(
addr,
token,
config,
connect_timeout,
request_timeout,
GeyserGrpcClientBufferConfig::optimize_for_subscription(&subscribe_filter),
)
.await;
let mut client = connect_result.unwrap(); // FIXME how to handle this?
debug!("Subscribe with filter {:?}", subscribe_filter);
let subscribe_result = timeout(subscribe_timeout.unwrap_or(Duration::MAX),
@ -130,9 +137,10 @@ pub fn create_geyser_reconnecting_stream(
#[cfg(test)]
mod tests {
use super::*;
use crate::GrpcConnectionTimeouts;
use super::*;
#[tokio::test]
async fn test_debug_no_secrets() {
let timeout_config = GrpcConnectionTimeouts {
@ -148,7 +156,7 @@ mod tests {
"http://localhost:1234".to_string(),
Some("my-secret".to_string()),
None,
timeout_config
timeout_config,
)
),
"grpc_addr http://localhost:1234"
@ -170,7 +178,7 @@ mod tests {
"http://localhost:1234".to_string(),
Some("my-secret".to_string()),
None,
timeout_config
timeout_config,
)
),
"grpc_addr http://localhost:1234"

View File

@ -1,17 +1,25 @@
use crate::{Attempt, GrpcSourceConfig, Message};
use std::env;
use std::future::Future;
use std::time::Duration;
use futures::{Stream, StreamExt};
use log::{debug, error, info, log, trace, warn, Level};
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc::error::SendTimeoutError;
use tokio::sync::mpsc::Receiver;
use tokio::task::AbortHandle;
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout, Instant};
use yellowstone_grpc_client::{GeyserGrpcBuilderError, GeyserGrpcClient, GeyserGrpcClientError};
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
use yellowstone_grpc_proto::tonic::service::Interceptor;
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
use yellowstone_grpc_proto::tonic::Status;
use crate::yellowstone_grpc_util::{
connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig,
};
use crate::{Attempt, GrpcSourceConfig, Message};
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>, F: Interceptor> {
NotConnected(Attempt),
// connected but not subscribed
@ -22,6 +30,8 @@ enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>, F: Inter
// non-recoverable error
FatalError(Attempt, FatalErrorReason),
WaitReconnect(Attempt),
// exit signal received
GracefulShutdown,
}
enum FatalErrorReason {
@ -34,13 +44,18 @@ enum FatalErrorReason {
pub fn create_geyser_autoconnection_task(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
) -> (AbortHandle, Receiver<Message>) {
exit_notify: broadcast::Receiver<()>,
) -> (JoinHandle<()>, Receiver<Message>) {
let (sender, receiver_channel) = tokio::sync::mpsc::channel::<Message>(1);
let abort_handle =
create_geyser_autoconnection_task_with_mpsc(grpc_source, subscribe_filter, sender);
let join_handle = create_geyser_autoconnection_task_with_mpsc(
grpc_source,
subscribe_filter,
sender,
exit_notify,
);
(abort_handle, receiver_channel)
(join_handle, receiver_channel)
}
/// connect to grpc source performing autoconnect if required,
@ -50,7 +65,8 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
mpsc_downstream: tokio::sync::mpsc::Sender<Message>,
) -> AbortHandle {
mut exit_notify: broadcast::Receiver<()>,
) -> JoinHandle<()> {
// read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/
// task will be aborted when downstream receiver gets dropped
@ -58,7 +74,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
let mut state = ConnectionState::NotConnected(1);
let mut messages_forwarded = 0;
loop {
'main_loop: loop {
state = match state {
ConnectionState::NotConnected(attempt) => {
let addr = grpc_source.grpc_addr.clone();
@ -77,18 +93,11 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
addr
);
let mut builder = GeyserGrpcClient::build_from_shared(addr)
.unwrap()
.x_token(token)
.unwrap()
.connect_timeout(connect_timeout.unwrap_or(Duration::from_secs(10)))
.timeout(request_timeout.unwrap_or(Duration::from_secs(10)))
.tls_config(config.unwrap_or(ClientTlsConfig::new()))
.unwrap();
// let buffer_config = yellowstone_grpc_util::GeyserGrpcClientBufferConfig::optimize_for_subscription(&subscribe_filter);
let buffer_config = buffer_config_from_env();
debug!("Using Grpc Buffer config {:?}", buffer_config);
let connect_result = builder.connect().await;
match connect_result {
let connection_handler = |connect_result| match connect_result {
Ok(client) => ConnectionState::Connecting(attempt, client),
Err(GeyserGrpcBuilderError::MetadataValueError(_)) => {
ConnectionState::FatalError(
@ -116,6 +125,22 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
);
ConnectionState::RecoverableConnectionError(attempt + 1)
}
};
let fut_connector = connect_with_timeout_with_buffers(
addr,
token,
config,
connect_timeout,
request_timeout,
buffer_config,
);
match await_or_exit(fut_connector, exit_notify.recv()).await {
MaybeExit::Continue(connection_result) => {
connection_handler(connection_result)
}
MaybeExit::Exit => ConnectionState::GracefulShutdown,
}
}
ConnectionState::Connecting(attempt, mut client) => {
@ -124,51 +149,58 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
let subscribe_filter = subscribe_filter.clone();
debug!("Subscribe with filter {:?}", subscribe_filter);
let subscribe_result_timeout = timeout(
subscribe_timeout.unwrap_or(Duration::MAX),
client.subscribe_once(subscribe_filter),
)
.await;
match subscribe_result_timeout {
Ok(subscribe_result) => {
match subscribe_result {
Ok(geyser_stream) => {
if attempt > 1 {
debug!(
"subscribed to {} after {} failed attempts",
let subscribe_handler =
|subscribe_result_timeout| match subscribe_result_timeout {
Ok(subscribe_result) => {
match subscribe_result {
Ok(geyser_stream) => {
if attempt > 1 {
debug!(
"subscribed to {} after {} failed attempts",
grpc_source, attempt
);
}
ConnectionState::Ready(geyser_stream)
}
Err(GeyserGrpcClientError::TonicStatus(_)) => {
warn!(
"subscribe failed on {} after {} attempts - retrying",
grpc_source, attempt
);
ConnectionState::RecoverableConnectionError(attempt + 1)
}
// non-recoverable
Err(unrecoverable_error) => {
error!(
"subscribe to {} failed with unrecoverable error: {}",
grpc_source, unrecoverable_error
);
ConnectionState::FatalError(
attempt + 1,
FatalErrorReason::SubscribeError,
)
}
ConnectionState::Ready(geyser_stream)
}
Err(GeyserGrpcClientError::TonicStatus(_)) => {
warn!(
"subscribe failed on {} after {} attempts - retrying",
grpc_source, attempt
);
ConnectionState::RecoverableConnectionError(attempt + 1)
}
// non-recoverable
Err(unrecoverable_error) => {
error!(
"subscribe to {} failed with unrecoverable error: {}",
grpc_source, unrecoverable_error
);
ConnectionState::FatalError(
attempt + 1,
FatalErrorReason::SubscribeError,
)
}
}
Err(_elapsed) => {
warn!(
"subscribe failed with timeout on {} - retrying",
grpc_source
);
ConnectionState::RecoverableConnectionError(attempt + 1)
}
};
let fut_subscribe = timeout(
subscribe_timeout.unwrap_or(Duration::MAX),
client.subscribe_once(subscribe_filter),
);
match await_or_exit(fut_subscribe, exit_notify.recv()).await {
MaybeExit::Continue(subscribe_result_timeout) => {
subscribe_handler(subscribe_result_timeout)
}
Err(_elapsed) => {
warn!(
"subscribe failed with timeout on {} - retrying",
grpc_source
);
ConnectionState::RecoverableConnectionError(attempt + 1)
}
MaybeExit::Exit => ConnectionState::GracefulShutdown,
}
}
ConnectionState::RecoverableConnectionError(attempt) => {
@ -177,12 +209,18 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
"waiting {} seconds, then reconnect to {}",
backoff_secs, grpc_source
);
sleep(Duration::from_secs_f32(backoff_secs)).await;
ConnectionState::NotConnected(attempt)
let fut_sleep = sleep(Duration::from_secs_f32(backoff_secs));
match await_or_exit(fut_sleep, exit_notify.recv()).await {
MaybeExit::Continue(()) => ConnectionState::NotConnected(attempt),
MaybeExit::Exit => ConnectionState::GracefulShutdown,
}
}
ConnectionState::FatalError(_attempt, reason) => match reason {
FatalErrorReason::DownstreamChannelClosed => {
warn!("downstream closed - aborting");
// TODO break 'main_loop instead of returning
return;
}
FatalErrorReason::ConfigurationError => {
@ -204,18 +242,29 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
"waiting {} seconds, then reconnect to {}",
backoff_secs, grpc_source
);
sleep(Duration::from_secs_f32(backoff_secs)).await;
ConnectionState::NotConnected(attempt)
let fut_sleep = sleep(Duration::from_secs_f32(backoff_secs));
match await_or_exit(fut_sleep, exit_notify.recv()).await {
MaybeExit::Continue(()) => ConnectionState::NotConnected(attempt),
MaybeExit::Exit => ConnectionState::GracefulShutdown,
}
}
ConnectionState::Ready(mut geyser_stream) => {
let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout);
'recv_loop: loop {
match timeout(
let fut_stream = timeout(
receive_timeout.unwrap_or(Duration::MAX),
geyser_stream.next(),
)
.await
{
);
let MaybeExit::Continue(geyser_stream_res) =
await_or_exit(fut_stream, exit_notify.recv()).await
else {
break 'recv_loop ConnectionState::GracefulShutdown;
};
match geyser_stream_res {
Ok(Some(Ok(update_message))) => {
trace!("> recv update message from {}", grpc_source);
// note: first send never blocks as the mpsc channel has capacity 1
@ -225,13 +274,19 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
Duration::from_millis(500)
};
let started_at = Instant::now();
match mpsc_downstream
.send_timeout(
Message::GeyserSubscribeUpdate(Box::new(update_message)),
warning_threshold,
)
.await
{
let fut_send = mpsc_downstream.send_timeout(
Message::GeyserSubscribeUpdate(Box::new(update_message)),
warning_threshold,
);
let MaybeExit::Continue(mpsc_downstream_result) =
await_or_exit(fut_send, exit_notify.recv()).await
else {
break 'recv_loop ConnectionState::GracefulShutdown;
};
match mpsc_downstream_result {
Ok(()) => {
messages_forwarded += 1;
if messages_forwarded == 1 {
@ -249,7 +304,15 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
Err(SendTimeoutError::Timeout(the_message)) => {
warn!("downstream receiver did not pick up message for {}ms - keep waiting", warning_threshold.as_millis());
match mpsc_downstream.send(the_message).await {
let fut_send = mpsc_downstream.send(the_message);
let MaybeExit::Continue(mpsc_downstream_result) =
await_or_exit(fut_send, exit_notify.recv()).await
else {
break 'recv_loop ConnectionState::GracefulShutdown;
};
match mpsc_downstream_result {
Ok(()) => {
messages_forwarded += 1;
trace!(
@ -289,21 +352,87 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
warn!("timeout on {} - retrying", grpc_source);
break 'recv_loop ConnectionState::WaitReconnect(1);
}
} // -- END match
}; // -- END match
} // -- END receive loop
}
ConnectionState::GracefulShutdown => {
debug!("shutting down {} gracefully on exit signal", grpc_source);
break 'main_loop;
}
} // -- END match
} // -- endless state loop
} // -- state loop; break ONLY on graceful shutdown
});
jh_geyser_task.abort_handle()
jh_geyser_task
}
/// Builds a buffer config from the `BUFFER_SIZE`, `CONN_WINDOW` and
/// `STREAM_WINDOW` environment variables (all in bytes), falling back to
/// [`GeyserGrpcClientBufferConfig::default`] when any of them is unset.
///
/// # Panics
/// Panics when a variable is set but does not parse as an integer.
fn buffer_config_from_env() -> GeyserGrpcClientBufferConfig {
    // read each variable exactly once (previous version read them twice)
    let (Ok(buffer_size), Ok(conn_window), Ok(stream_window)) = (
        env::var("BUFFER_SIZE"),
        env::var("CONN_WINDOW"),
        env::var("STREAM_WINDOW"),
    ) else {
        warn!("BUFFER_SIZE, CONN_WINDOW, STREAM_WINDOW not set; using default buffer config");
        return GeyserGrpcClientBufferConfig::default();
    };

    let buffer_size = buffer_size
        .parse::<usize>()
        .expect("BUFFER_SIZE must be integer(bytes)");
    let conn_window = conn_window
        .parse::<u32>()
        .expect("CONN_WINDOW must be integer(bytes)");
    let stream_window = stream_window
        .parse::<u32>()
        .expect("STREAM_WINDOW must be integer(bytes)");

    // conn_window should be larger than stream_window
    GeyserGrpcClientBufferConfig {
        buffer_size: Some(buffer_size),
        conn_window: Some(conn_window),
        stream_window: Some(stream_window),
    }
}
/// Outcome of racing a future against the exit broadcast (see `await_or_exit`).
enum MaybeExit<T> {
    /// The future completed first; carries its output.
    Continue(T),
    /// The exit channel resolved first (signal, lag, or close).
    Exit,
}
/// Awaits `future`, but returns [`MaybeExit::Exit`] early if `exit_notify`
/// resolves first.
///
/// Any resolution of the exit channel — a delivered signal, `Lagged`, or
/// `Closed` — is treated as a shutdown request; only the log level differs.
/// Note: `tokio::select!` polls its branches in random order by default, so
/// when both are ready either branch may win.
async fn await_or_exit<F, E, T>(future: F, exit_notify: E) -> MaybeExit<F::Output>
where
    F: Future,
    E: Future<Output = Result<T, RecvError>>,
{
    tokio::select! {
        res = future => {
            MaybeExit::Continue(res)
        },
        res = exit_notify => {
            match res {
                Ok(_) => {
                    debug!("exit on signal");
                }
                Err(RecvError::Lagged(_)) => {
                    warn!("exit on signal (lag)");
                }
                Err(RecvError::Closed) => {
                    warn!("exit on signal (channel close)");
                }
            }
            MaybeExit::Exit
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::GrpcConnectionTimeouts;
use super::*;
#[tokio::test]
async fn test_debug_no_secrets() {
let timeout_config = GrpcConnectionTimeouts {
@ -319,7 +448,7 @@ mod tests {
"http://localhost:1234".to_string(),
Some("my-secret".to_string()),
None,
timeout_config
timeout_config,
)
),
"grpc_addr http://localhost:1234"
@ -341,7 +470,7 @@ mod tests {
"http://localhost:1234".to_string(),
Some("my-secret".to_string()),
None,
timeout_config
timeout_config,
)
),
"grpc_addr http://localhost:1234"

View File

@ -0,0 +1,244 @@
use itertools::Itertools;
use std::fmt::Display;
use std::iter::zip;
/// A weighted sample: a sort key (`priority`) with an associated weight
/// (`value`, e.g. compute units consumed — see the `From` impl below).
// derives were previously commented out; all four compile for two f64 fields,
// and a public type should at least be Debug
#[derive(Clone, Copy, Debug, Default)]
pub struct Point {
    pub priority: f64,
    pub value: f64,
}

impl From<(f64, f64)> for Point {
    /// Builds a `Point` from a `(priority, cu_consumed)` pair.
    fn from((priority, cu_consumed): (f64, f64)) -> Self {
        Point {
            priority,
            value: cu_consumed,
        }
    }
}
/// One histogram sample: the `value` observed at `percentile` (0.0..=100.0,
/// as produced by `calculate_cummulative`).
// note: Eq/Hash (from the previously commented-out derive list) cannot be
// derived because the fields are floating point
#[derive(Clone, Debug, PartialEq)]
pub struct HistValue {
    pub percentile: f32,
    pub value: f64,
}
/// `quantile` function is the same as the median if q=50, the same as the minimum if q=0 and the same as the maximum if q=100.
///
/// Samples `input` (which must be sorted ascending) at every 5th percentile
/// from p0 through p100, inclusive.
///
/// # Panics
/// Panics when `input` is not monotonically non-decreasing.
pub fn calculate_percentiles(input: &[f64]) -> Percentiles {
    if input.is_empty() {
        // note: percentile for empty array is undefined
        return Percentiles {
            v: vec![],
            p: vec![],
        };
    }
    assert!(
        input.windows(2).all(|pair| pair[0] <= pair[1]),
        "array of values must be sorted"
    );

    const P_STEP: usize = 5;
    let mut values = Vec::with_capacity(100 / P_STEP + 1);
    let mut fractions = Vec::with_capacity(100 / P_STEP + 1);
    for pct in (0..=100).step_by(P_STEP) {
        // nearest-rank style index, clamped so p100 maps to the last element
        let idx = (input.len() * pct / 100).min(input.len() - 1);
        values.push(input[idx]);
        fractions.push(pct as f32 / 100.0);
    }

    Percentiles {
        v: values,
        p: fractions,
    }
}
/// Cumulative distribution: for each 5% step of the total `value` mass,
/// reports the `priority` at which that share of the mass is reached.
///
/// `values` must be sorted ascending by `priority`; panics otherwise.
pub fn calculate_cummulative(values: &[Point]) -> PercentilesCummulative {
    if values.is_empty() {
        // note: percentile for empty array is undefined
        return PercentilesCummulative {
            bucket_values: vec![],
            percentiles: vec![],
        };
    }
    let is_monotonic = values.windows(2).all(|w| w[0].priority <= w[1].priority);
    assert!(is_monotonic, "array of values must be sorted");
    // total mass; `agg` walks the running prefix sum, seeded with the first element
    let value_sum: f64 = values.iter().map(|x| x.value).sum();
    let mut agg: f64 = values[0].value;
    let mut index = 0;
    let p_step = 5;
    let percentiles = (0..=100).step_by(p_step).map(|p| p as f64).collect_vec();
    let dist = percentiles
        .iter()
        .map(|percentile| {
            // advance until the prefix sum covers `percentile` of the total.
            // NOTE(review): correctness at p100 relies on `agg` reaching
            // `value_sum` exactly at the last element; floating-point rounding
            // could in theory step `index` out of bounds — confirm inputs keep
            // this safe.
            while agg < (value_sum * *percentile) / 100.0 {
                index += 1;
                agg += values[index].value;
            }
            let priority = values[index].priority;
            HistValue {
                percentile: *percentile as f32,
                value: priority,
            }
        })
        .collect_vec();
    // split the HistValue pairs into the two parallel output vectors,
    // rescaling percentiles from 0..=100 to 0.0..=1.0
    PercentilesCummulative {
        bucket_values: dist.iter().map(|hv| hv.value).collect_vec(),
        percentiles: dist.iter().map(|hv| hv.percentile / 100.0).collect_vec(),
    }
}
/// Plain percentile samples: `v[i]` is the value observed at percentile `p[i]`.
// public result type: give it the standard derives so callers can log/compare
#[derive(Debug, Clone, PartialEq)]
pub struct Percentiles {
    // value
    pub v: Vec<f64>,
    // percentile in range 0.0..1.0
    pub p: Vec<f32>,
}
impl Display for Percentiles {
    /// Renders as space-separated `p{percentile}=>{value}` pairs, e.g. `p50=>42 `.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        for (i, value) in self.v.iter().enumerate() {
            write!(f, "p{}=>{} ", self.p[i] * 100.0, value)?;
        }
        Ok(())
    }
}
#[allow(dead_code)]
impl Percentiles {
    /// Value stored for an exactly-matching percentile (0.0..1.0 scale), if any.
    fn get_bucket_value(&self, percentile: f32) -> Option<f64> {
        zip(&self.p, &self.v)
            .find_map(|(&p, &v)| if p == percentile { Some(v) } else { None })
    }
}
/// Cumulative-distribution samples: `bucket_values[i]` is the priority at
/// which the share of total value given by `percentiles[i]` (0.0..1.0) is
/// reached (see `calculate_cummulative`).
// public result type: give it the standard derives so callers can log/compare
#[derive(Debug, Clone, PartialEq)]
pub struct PercentilesCummulative {
    pub bucket_values: Vec<f64>,
    pub percentiles: Vec<f32>,
}
#[allow(dead_code)]
impl PercentilesCummulative {
    /// Bucket value stored for an exactly-matching percentile (0.0..1.0 scale), if any.
    fn get_bucket_value(&self, percentile: f32) -> Option<f64> {
        zip(&self.percentiles, &self.bucket_values)
            .find_map(|(&p, &cu)| if p == percentile { Some(cu) } else { None })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // plain percentiles over a small unsorted-then-sorted list
    #[test]
    fn test_calculate_percentiles() {
        let mut values = vec![2.0, 4.0, 5.0, 3.0, 1.0];
        values.sort_by_key(|&x| (x * 100.0) as i64);
        let percentiles = calculate_percentiles(&values).v;
        assert_eq!(percentiles[0], 1.0);
        assert_eq!(percentiles[10], 3.0);
        assert_eq!(percentiles[15], 4.0);
        assert_eq!(percentiles[18], 5.0);
        assert_eq!(percentiles[20], 5.0);
    }

    #[test]
    fn test_calculate_percentiles_by_cu() {
        // total of 20000 CU were consumed
        let values = vec![Point::from((100.0, 10000.0)), Point::from((200.0, 10000.0))];
        let PercentilesCummulative {
            bucket_values: by_cu,
            percentiles: by_cu_percentiles,
            ..
        } = calculate_cummulative(&values);
        assert_eq!(by_cu_percentiles[10], 0.5);
        assert_eq!(by_cu[10], 100.0); // need more than 100 to beat 50% of the CU
        assert_eq!(by_cu[11], 200.0); // need more than 200 to beat 55% of the CU
        assert_eq!(by_cu[20], 200.0); // need more than 200 to beat 100% of the CU
    }

    #[test]
    fn test_empty_array() {
        let values = vec![];
        let percentiles = calculate_percentiles(&values).v;
        // note: this is controversial (percentile of an empty set is undefined)
        assert!(percentiles.is_empty());
    }

    #[test]
    fn test_zeros() {
        let values = vec![Point::from((0.0, 0.0)), Point::from((0.0, 0.0))];
        let percentiles = calculate_cummulative(&values).bucket_values;
        assert_eq!(percentiles[0], 0.0);
    }

    // example taken from statisticshowto.com
    #[test]
    fn test_statisticshowto() {
        let values = vec![30.0, 33.0, 43.0, 53.0, 56.0, 67.0, 68.0, 72.0];
        let percentiles = calculate_percentiles(&values);
        assert_eq!(percentiles.v[5], 43.0);
        assert_eq!(percentiles.p[5], 0.25);
        assert_eq!(percentiles.get_bucket_value(0.25), Some(43.0));

        let values = vec![
            Point::from((30.0, 1.0)),
            Point::from((33.0, 2.0)),
            Point::from((43.0, 3.0)),
            Point::from((53.0, 4.0)),
            Point::from((56.0, 5.0)),
            Point::from((67.0, 6.0)),
            Point::from((68.0, 7.0)),
            Point::from((72.0, 8.0)),
        ];
        let percentiles = calculate_cummulative(&values);
        assert_eq!(percentiles.percentiles[20], 1.0);
        assert_eq!(percentiles.bucket_values[20], 72.0);
    }

    #[test]
    fn test_simple_non_integer_index() {
        // measured values: 3 5 5 6 7 7 8 10 10
        // in this case the result is therefore 5.
        let values = vec![3.0, 5.0, 5.0, 6.0, 7.0, 7.0, 8.0, 10.0, 10.0];
        let percentiles = calculate_percentiles(&values);
        assert_eq!(percentiles.p[4], 0.20);
        assert_eq!(percentiles.v[5], 5.0);

        let values = vec![
            Point::from((3.0, 1.0)),
            Point::from((5.0, 2.0)),
            Point::from((5.0, 3.0)),
            Point::from((6.0, 4.0)),
            Point::from((7.0, 5.0)),
            Point::from((7.0, 6.0)),
            Point::from((8.0, 7.0)),
            Point::from((10.0, 8.0)),
            Point::from((10.0, 9.0)),
        ];
        let percentiles = calculate_cummulative(&values);
        assert_eq!(percentiles.percentiles[19], 0.95);
        assert_eq!(percentiles.percentiles[20], 1.0);
        assert_eq!(percentiles.bucket_values[19], 10.0);
        assert_eq!(percentiles.bucket_values[20], 10.0);
    }

    #[test]
    fn test_large_list() {
        let values = (0..1000).map(|i| i as f64).collect_vec();
        let percentiles = calculate_percentiles(&values);
        assert_eq!(percentiles.v[19], 950.0);
        assert_eq!(percentiles.p[19], 0.95);
    }
}

View File

@ -1,5 +1,7 @@
use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
use solana_sdk::commitment_config::CommitmentConfig;
@ -10,11 +12,19 @@ use yellowstone_grpc_proto::geyser::{
};
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
pub use yellowstone_grpc_client::{
GeyserGrpcClient, GeyserGrpcClientError, GeyserGrpcClientResult,
};
pub mod channel_plugger;
pub mod grpc_subscription_autoreconnect_streams;
pub mod grpc_subscription_autoreconnect_tasks;
pub mod grpcmultiplex_fastestwins;
pub mod histogram_percentiles;
mod obfuscate;
pub mod yellowstone_grpc_util;
/// Shared, atomically updated slot counter (cheap to clone across tasks).
pub type AtomicSlot = Arc<AtomicU64>;
// 1-based attempt counter
type Attempt = u32;

View File

@ -0,0 +1,122 @@
use std::time::Duration;
use tonic::codec::CompressionEncoding;
use tonic::metadata::errors::InvalidMetadataValue;
use tonic::metadata::AsciiMetadataValue;
use tonic::service::Interceptor;
use tonic::transport::ClientTlsConfig;
use tonic_health::pb::health_client::HealthClient;
use yellowstone_grpc_client::{GeyserGrpcBuilderResult, GeyserGrpcClient, InterceptorXToken};
use yellowstone_grpc_proto::geyser::geyser_client::GeyserClient;
use yellowstone_grpc_proto::geyser::SubscribeRequest;
use yellowstone_grpc_proto::prost::bytes::Bytes;
/// Connect to a Geyser gRPC endpoint using the default buffer configuration.
///
/// Thin convenience wrapper around [`connect_with_timeout_with_buffers`];
/// see that function for the semantics of the individual parameters.
pub async fn connect_with_timeout<E, T>(
    endpoint: E,
    x_token: Option<T>,
    tls_config: Option<ClientTlsConfig>,
    connect_timeout: Option<Duration>,
    request_timeout: Option<Duration>,
) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor>>
where
    E: Into<Bytes>,
    T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>,
{
    let buffer_config = GeyserGrpcClientBufferConfig::default();
    connect_with_timeout_with_buffers(
        endpoint,
        x_token,
        tls_config,
        connect_timeout,
        request_timeout,
        buffer_config,
    )
    .await
}
// see https://github.com/hyperium/tonic/blob/v0.10.2/tonic/src/transport/channel/mod.rs
const DEFAULT_BUFFER_SIZE: usize = 1024;
// see https://github.com/hyperium/hyper/blob/v0.14.28/src/proto/h2/client.rs#L45
const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb
const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb
/// Tuning knobs for the tonic transport buffers of a Geyser gRPC connection.
///
/// Each field is handed to the corresponding `tonic::transport::Endpoint`
/// builder method; `None` leaves that setting at the transport default.
#[derive(Debug, Clone)]
pub struct GeyserGrpcClientBufferConfig {
    // fed to `Endpoint::buffer_size` (channel buffer, in items)
    pub buffer_size: Option<usize>,
    // fed to `Endpoint::initial_connection_window_size` (bytes)
    pub conn_window: Option<u32>,
    // fed to `Endpoint::initial_stream_window_size` (bytes)
    pub stream_window: Option<u32>,
}
// Defaults: 1 KiB channel buffer, 5 MiB connection window, 2 MiB stream window.
impl Default for GeyserGrpcClientBufferConfig {
    fn default() -> Self {
        Self {
            buffer_size: Some(DEFAULT_BUFFER_SIZE),
            conn_window: Some(DEFAULT_CONN_WINDOW),
            stream_window: Some(DEFAULT_STREAM_WINDOW),
        }
    }
}
impl GeyserGrpcClientBufferConfig {
    /// Pick buffer sizes suited to the given subscription request.
    ///
    /// Subscriptions that include blocks get a larger channel buffer and
    /// stream window; all other subscriptions use the defaults.
    pub fn optimize_for_subscription(filter: &SubscribeRequest) -> GeyserGrpcClientBufferConfig {
        if filter.blocks.is_empty() {
            return GeyserGrpcClientBufferConfig::default();
        }
        GeyserGrpcClientBufferConfig {
            buffer_size: Some(65536),     // 64kb (default: 1k)
            conn_window: Some(5242880),   // 5mb (=default)
            stream_window: Some(4194304), // 4mb (default: 2m)
        }
    }
}
/// Lazily connect to a Geyser gRPC endpoint with explicit transport buffers.
///
/// Builds a `tonic` endpoint with the buffer/window sizes from
/// `buffer_config`, optional TLS, and optional timeouts, then wraps the
/// resulting channel in a [`GeyserGrpcClient`] that sends `x-token` metadata
/// via [`InterceptorXToken`] and accepts gzip-compressed responses.
///
/// # Errors
/// Returns an error for an invalid endpoint URI, a bad TLS config, or an
/// `x_token` that is not a valid ASCII metadata value. The connection itself
/// is established lazily (`connect_lazy`), so network failures surface on
/// first use, not here.
pub async fn connect_with_timeout_with_buffers<E, T>(
    endpoint: E,
    x_token: Option<T>,
    tls_config: Option<ClientTlsConfig>,
    connect_timeout: Option<Duration>,
    request_timeout: Option<Duration>,
    buffer_config: GeyserGrpcClientBufferConfig,
) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor>>
where
    E: Into<Bytes>,
    T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>,
{
    // see https://github.com/blockworks-foundation/geyser-grpc-connector/issues/10
    let mut endpoint = tonic::transport::Endpoint::from_shared(endpoint)?
        .tcp_nodelay(true)
        .http2_adaptive_window(true)
        .buffer_size(buffer_config.buffer_size)
        .initial_connection_window_size(buffer_config.conn_window)
        .initial_stream_window_size(buffer_config.stream_window);
    if let Some(tls_config) = tls_config {
        endpoint = endpoint.tls_config(tls_config)?;
    }
    if let Some(connect_timeout) = connect_timeout {
        // BUGFIX: was `endpoint.timeout(connect_timeout)`, which sets the
        // per-request timeout and was then silently overwritten by the
        // `request_timeout` branch below. `connect_timeout` is the setter
        // that actually bounds connection establishment.
        endpoint = endpoint.connect_timeout(connect_timeout);
    }
    if let Some(request_timeout) = request_timeout {
        endpoint = endpoint.timeout(request_timeout);
    }
    let x_token: Option<AsciiMetadataValue> = match x_token {
        Some(x_token) => Some(x_token.try_into()?),
        None => None,
    };
    let interceptor = InterceptorXToken {
        x_token,
        x_request_snapshot: false,
    };
    // connect_lazy: no I/O yet; the channel dials on first RPC.
    let channel = endpoint.connect_lazy();
    let client = GeyserGrpcClient::new(
        HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
        GeyserClient::with_interceptor(channel, interceptor)
            .accept_compressed(CompressionEncoding::Gzip)
            .max_decoding_message_size(usize::MAX),
    );
    Ok(client)
}