chaindata standalone

This commit is contained in:
GroovieGermanikus 2024-12-17 16:08:19 +01:00
parent ace725ba8d
commit 24d5d30062
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
12 changed files with 1146 additions and 15 deletions

315
Cargo.lock generated
View File

@ -797,12 +797,13 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "cc"
version = "1.0.91"
version = "1.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd97381a8cc6493395a5afc4c691c1084b3768db713b73aa215217aa245d153"
checksum = "9157bbaa6b165880c27a4293a474c91cdcf265cc68cc829bf10be0964a391caf"
dependencies = [
"jobserver",
"libc",
"shlex",
]
[[package]]
@ -817,6 +818,42 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chaindata_standalone"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"base64 0.21.7",
"bincode",
"clap 3.2.25",
"csv",
"futures 0.3.30",
"geyser-grpc-connector",
"itertools 0.10.5",
"jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonrpc-core-client",
"jsonrpc-derive",
"jsonrpc-pubsub 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static",
"log 0.4.21",
"lz4",
"mango-feeds-connector",
"prometheus",
"serde",
"serde_derive",
"serde_json",
"solana-account-decoder",
"solana-client",
"solana-rpc-client-api",
"solana-sdk",
"tokio",
"tokio-stream",
"tracing",
"tracing-subscriber",
"yellowstone-grpc-proto 1.13.0",
]
[[package]]
name = "chrono"
version = "0.4.37"
@ -1145,6 +1182,27 @@ dependencies = [
"subtle",
]
[[package]]
name = "csv"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf"
dependencies = [
"csv-core",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "csv-core"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70"
dependencies = [
"memchr",
]
[[package]]
name = "ctr"
version = "0.8.0"
@ -1698,6 +1756,30 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "geyser-grpc-connector"
version = "0.10.6+yellowstone.1.13"
source = "git+https://github.com/blockworks-foundation/geyser-grpc-connector.git?tag=v0.10.6+yellowstone.1.13+solana.1.17.28#20f29c1e21e1682f124c0a3386cd4a85b1854f01"
dependencies = [
"anyhow",
"async-stream",
"base64 0.21.7",
"bincode",
"csv",
"derive_more",
"futures 0.3.30",
"itertools 0.10.5",
"log 0.4.21",
"merge-streams",
"solana-sdk",
"tokio",
"tonic-health",
"tracing",
"url 2.5.0",
"yellowstone-grpc-client 1.14.0",
"yellowstone-grpc-proto 1.13.0",
]
[[package]]
name = "gimli"
version = "0.28.1"
@ -1990,6 +2072,19 @@ dependencies = [
"tokio-io-timeout",
]
[[package]]
name = "hyper-tls"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes 1.6.0",
"hyper 0.14.28",
"native-tls",
"tokio",
"tokio-native-tls",
]
[[package]]
name = "iana-time-zone"
version = "0.1.60"
@ -2139,9 +2234,9 @@ checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
[[package]]
name = "jobserver"
version = "0.1.28"
version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab46a6e9526ddef3ae7f787c06f0f2600639ba80ea3eade3d8e670a2230f51d6"
checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0"
dependencies = [
"libc",
]
@ -2164,6 +2259,7 @@ dependencies = [
"flate2 0.2.20",
"futures 0.3.30",
"hyper 0.14.28",
"hyper-tls",
"jsonrpc-core 18.0.0 (git+https://github.com/ckamm/jsonrpc.git?branch=ckamm/http-with-gzip-default-v18.0.0)",
"jsonrpc-pubsub 18.0.0 (git+https://github.com/ckamm/jsonrpc.git?branch=ckamm/http-with-gzip-default-v18.0.0)",
"log 0.4.21",
@ -2280,9 +2376,9 @@ checksum = "a91d884b6667cd606bb5a69aa0c99ba811a115fc68915e7056ec08a46e93199a"
[[package]]
name = "lazy_static"
version = "1.4.0"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]]
name = "libc"
@ -2390,6 +2486,25 @@ version = "0.4.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
[[package]]
name = "lz4"
version = "1.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d1febb2b4a79ddd1980eede06a8f7902197960aa0383ffcfdd62fe723036725"
dependencies = [
"lz4-sys",
]
[[package]]
name = "lz4-sys"
version = "1.11.1+lz4-1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "mango-feeds-connector"
version = "0.4.7"
@ -2399,6 +2514,7 @@ dependencies = [
"async-trait",
"clap 3.2.25",
"criterion",
"csv",
"futures 0.3.30",
"itertools 0.10.5",
"jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2419,9 +2535,19 @@ dependencies = [
"tokio",
"tokio-stream",
"tracing",
"tracing-subscriber",
"warp",
"yellowstone-grpc-client",
"yellowstone-grpc-proto",
"yellowstone-grpc-client 1.15.0+solana.1.17",
"yellowstone-grpc-proto 1.14.0+solana.1.17",
]
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata 0.1.10",
]
[[package]]
@ -2475,6 +2601,16 @@ dependencies = [
"autocfg 1.2.0",
]
[[package]]
name = "merge-streams"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f84f6452969abd246e7ac1fe4fe75906c76e8ec88d898df9aef37e0f3b6a7c2"
dependencies = [
"futures-core",
"pin-project",
]
[[package]]
name = "merlin"
version = "3.0.0"
@ -2655,6 +2791,16 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi 0.3.9",
]
[[package]]
name = "num"
version = "0.2.1"
@ -2914,6 +3060,12 @@ version = "6.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking_lot"
version = "0.9.0"
@ -3236,6 +3388,21 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "prometheus"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1"
dependencies = [
"cfg-if 1.0.0",
"fnv",
"lazy_static",
"memchr",
"parking_lot 0.12.1",
"protobuf",
"thiserror",
]
[[package]]
name = "prost"
version = "0.12.4"
@ -3289,6 +3456,12 @@ dependencies = [
"prost",
]
[[package]]
name = "protobuf"
version = "2.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
[[package]]
name = "protobuf-src"
version = "1.1.0+21.5"
@ -3634,8 +3807,17 @@ checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata",
"regex-syntax",
"regex-automata 0.4.6",
"regex-syntax 0.8.3",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax 0.6.29",
]
[[package]]
@ -3646,9 +3828,15 @@ checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
"regex-syntax 0.8.3",
]
[[package]]
name = "regex-syntax"
version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.8.3"
@ -4124,12 +4312,27 @@ dependencies = [
"keccak",
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
[[package]]
name = "shell-words"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde"
[[package]]
name = "shlex"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-hook-registry"
version = "1.4.1"
@ -5287,6 +5490,16 @@ dependencies = [
"syn 2.0.58",
]
[[package]]
name = "thread_local"
version = "1.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c"
dependencies = [
"cfg-if 1.0.0",
"once_cell",
]
[[package]]
name = "time"
version = "0.1.45"
@ -5445,6 +5658,16 @@ dependencies = [
"syn 2.0.58",
]
[[package]]
name = "tokio-native-tls"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
dependencies = [
"native-tls",
"tokio",
]
[[package]]
name = "tokio-reactor"
version = "0.1.12"
@ -5717,6 +5940,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log 0.4.21",
"once_cell",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"sharded-slab",
"smallvec 1.13.2",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]
[[package]]
@ -5903,6 +6156,12 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "valuable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcpkg"
version = "0.2.15"
@ -6381,6 +6640,21 @@ dependencies = [
"time 0.3.34",
]
[[package]]
name = "yellowstone-grpc-client"
version = "1.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f4f6d836d214cb8789002d039412da354049e9ffe983c643ec492c4d934698f"
dependencies = [
"bytes 1.6.0",
"futures 0.3.30",
"http 0.2.12",
"thiserror",
"tonic",
"tonic-health",
"yellowstone-grpc-proto 1.13.0",
]
[[package]]
name = "yellowstone-grpc-client"
version = "1.15.0+solana.1.17"
@ -6391,7 +6665,24 @@ dependencies = [
"thiserror",
"tonic",
"tonic-health",
"yellowstone-grpc-proto",
"yellowstone-grpc-proto 1.14.0+solana.1.17",
]
[[package]]
name = "yellowstone-grpc-proto"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c00b66d40d85c405f14b420d7674e98c70d06f6b673f36c9e0285f81b9b797d"
dependencies = [
"anyhow",
"bincode",
"prost",
"protobuf-src",
"solana-account-decoder",
"solana-sdk",
"solana-transaction-status",
"tonic",
"tonic-build",
]
[[package]]

View File

@ -1,6 +1,7 @@
[workspace]
members = [
"connector",
"chaindata_standalone"
]
resolver = "2"
@ -52,5 +53,3 @@ rustls = "0.20.8"
warp = "0.3"

1
chaindata_standalone/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
benchinput/

View File

@ -0,0 +1,56 @@
[package]
name = "chaindata_standalone"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "replay_slot_account_stream"
path = "src/replay_slot_account_stream.rs"
[dependencies]
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tokio = { workspace = true }
tokio-stream = { workspace = true }
csv = "1.3.0"
log = { workspace = true }
anyhow = { workspace = true }
itertools = { workspace = true }
futures = { workspace = true }
lazy_static = "1.5.0"
serde = { workspace = true }
serde_json = { workspace = true }
serde_derive = { workspace = true }
lz4 = "1.24.0"
base64 = "0.21.7"
prometheus = "0.13.4"
bincode = "1.3.3"
async-trait = { workspace = true }
mango-feeds-connector = { path = "../connector" }
# note: this version does not relax the solana version
geyser-grpc-connector = { tag = "v0.10.6+yellowstone.1.13+solana.1.17.28", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" }
yellowstone-grpc-proto = "1.13.0"
#yellowstone-grpc-client = "1.14.0"
jsonrpc-core = "18.0.0"
jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http", "tls"] }
jsonrpc-derive = "18.0.0"
jsonrpc-pubsub = "18.0.0"
solana-sdk = { workspace = true }
solana-client = { workspace = true }
solana-rpc-client-api = "1.17"
solana-account-decoder = "1.17"
clap = { version = "3.2.25", features = ["derive"] }
[lints.clippy]
needless_return = "allow"
enum_glob_use = "deny"

View File

@ -0,0 +1,39 @@
```
2024-08-12T08:58:14.644200Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195675784
2024-08-12T08:58:14.739358Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676308
2024-08-12T08:58:14.774673Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676381
2024-08-12T08:58:14.899019Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676458
2024-08-12T08:58:14.899313Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676472
2024-08-12T08:58:14.914404Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676532
2024-08-12T08:58:14.946106Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676565
2024-08-12T08:58:14.946583Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676579
2024-08-12T08:58:14.971928Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676593
2024-08-12T08:58:14.986530Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676627
2024-08-12T08:58:15.005577Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676657
2024-08-12T08:58:15.864777Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195675784
2024-08-12T08:58:15.864815Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195675784
2024-08-12T08:58:15.864856Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398
2024-08-12T08:58:17.875143Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676308
2024-08-12T08:58:17.875227Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676308
2024-08-12T08:58:17.875421Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398
2024-08-12T08:58:17.875454Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676381
2024-08-12T08:58:17.875516Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676381
2024-08-12T08:58:17.875610Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398
2024-08-12T08:58:18.880978Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676458
2024-08-12T08:58:18.881039Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676458
2024-08-12T08:58:18.881218Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398
2024-08-12T08:58:18.881235Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676472
2024-08-12T08:58:18.881297Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676472
2024-08-12T08:58:18.881378Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398
2024-08-12T08:58:19.887019Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676532
2024-08-12T08:58:19.887155Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676532
2024-08-12T08:58:19.887320Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398
2024-08-12T08:58:19.887609Z TRACE chaindata_standalone::router_impl: [account_write_receiver->chain_data] account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676565
2024-08-12T08:58:20.892641Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676565
2024-08-12T08:58:20.892897Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676565
2024-08-12T08:58:20.893127Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676579
2024-08-12T08:58:20.893197Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676579
```

View File

@ -0,0 +1,41 @@
#![allow(dead_code)]
use serde_derive::{Deserialize, Serialize};
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
pub fn account_write_from(
pubkey: Pubkey,
slot: u64,
write_version: u64,
account: Account,
) -> AccountWrite {
AccountWrite {
pubkey,
slot,
write_version,
lamports: account.lamports,
owner: account.owner,
executable: account.executable,
rent_epoch: account.rent_epoch,
data: account.data,
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AccountWrite {
pub pubkey: Pubkey,
pub slot: u64,
pub write_version: u64,
pub lamports: u64,
pub owner: Pubkey,
pub executable: bool,
pub rent_epoch: u64,
pub data: Vec<u8>,
// is_selected < removed
}
#[derive(Debug)]
pub enum AccountOrSnapshotUpdate {
AccountUpdate(AccountWrite),
SnapshotUpdate(Vec<AccountWrite>),
}

View File

@ -0,0 +1,376 @@
#![allow(dead_code)]
use std::collections::HashSet;
use std::{str::FromStr, sync::Arc, time::Duration};
use crate::account_write::{account_write_from, AccountWrite};
use crate::solana_rpc_minimal::rpc_accounts_scan::RpcAccountsScanClient;
use base64::Engine;
use jsonrpc_core_client::transports::http;
use serde::{Deserialize, Serialize};
use serde_json::json;
use solana_account_decoder::{UiAccount, UiAccountEncoding};
use solana_client::{
nonblocking::rpc_client::RpcClient,
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_response::OptionalContext,
};
use solana_rpc_client_api::filter::{Memcmp, RpcFilterType};
use solana_sdk::account::Account;
use solana_sdk::{
account::{Account as SolanaAccount, AccountSharedData, ReadableAccount},
commitment_config::CommitmentConfig,
pubkey::Pubkey,
};
use tracing::info;
pub struct CustomSnapshotProgramAccounts {
pub slot: u64,
pub program_id: Option<Pubkey>,
pub accounts: Vec<AccountWrite>,
pub missing_accounts: Vec<Pubkey>,
}
#[derive(Clone, PartialEq, Debug)]
pub enum FeedMetadata {
InvalidAccount(Pubkey),
SnapshotStart(Option<Pubkey>),
SnapshotEnd(Option<Pubkey>),
}
// called on startup to get the required accounts, few calls with some 100 thousand accounts
#[tracing::instrument(skip_all, level = "trace")]
pub async fn get_snapshot_gta(
rpc_http_url: &str,
owner_id: &Pubkey,
) -> anyhow::Result<CustomSnapshotProgramAccounts> {
let rpc_client = Arc::new(RpcClient::new_with_timeout_and_commitment(
rpc_http_url.to_string(),
Duration::from_secs(60 * 20),
CommitmentConfig::confirmed(),
));
let config = RpcProgramAccountsConfig {
filters: Some(vec![
RpcFilterType::DataSize(165),
RpcFilterType::Memcmp(Memcmp::new_base58_encoded(
32,
owner_id.to_bytes().as_slice(),
)),
]),
account_config: Default::default(),
with_context: Some(true),
};
// println!("{}", serde_json::to_string(&config)?);
let token_program = Pubkey::from_str("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA").unwrap();
let result =
get_compressed_program_account_rpc(&rpc_client, &HashSet::from([token_program]), config)
.await?;
Ok(CustomSnapshotProgramAccounts {
slot: result.0,
accounts: result.1,
program_id: Some(token_program),
missing_accounts: vec![],
})
}
// called on startup to get the required accounts, few calls with some 100 thousand accounts
#[tracing::instrument(skip_all, level = "trace")]
pub async fn get_snapshot_gpa(
rpc_http_url: &str,
program_id: &Pubkey,
use_compression: bool,
) -> anyhow::Result<CustomSnapshotProgramAccounts> {
let result = if use_compression {
get_compressed_program_account(rpc_http_url, &[*program_id].into_iter().collect()).await?
} else {
get_uncompressed_program_account(rpc_http_url, program_id.to_string()).await?
};
Ok(CustomSnapshotProgramAccounts {
slot: result.0,
accounts: result.1,
program_id: Some(*program_id),
missing_accounts: vec![],
})
}
// called on startup to get the required accounts, few calls with some 100 thousand accounts
#[tracing::instrument(skip_all, level = "trace")]
pub async fn get_snapshot_gma(
rpc_http_url: &str,
keys: &[Pubkey],
) -> anyhow::Result<CustomSnapshotProgramAccounts> {
let keys = keys.iter().map(|x| x.to_string()).collect::<Vec<String>>();
feeds_get_snapshot_gma(rpc_http_url, keys).await
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct RpcKeyedCompressedAccount {
pub p: String,
pub a: String,
}
// called on startup to get the required accounts, few calls with some 100 thousand accounts
#[tracing::instrument(skip_all, level = "trace")]
pub async fn get_compressed_program_account(
rpc_url: &str,
filters: &HashSet<Pubkey>,
) -> anyhow::Result<(u64, Vec<AccountWrite>)> {
// setting larget timeout because gPA can take a lot of time
let rpc_client = Arc::new(RpcClient::new_with_timeout_and_commitment(
rpc_url.to_string(),
Duration::from_secs(60 * 20),
CommitmentConfig::finalized(),
));
let config = RpcProgramAccountsConfig {
filters: None,
with_context: Some(true),
..Default::default()
};
get_compressed_program_account_rpc(&rpc_client, filters, config).await
}
// called on startup to get the required accounts, few calls with some 100 thousand accounts
#[tracing::instrument(skip_all, level = "trace")]
pub async fn get_compressed_program_account_rpc(
rpc_client: &RpcClient,
filters: &HashSet<Pubkey>,
config: RpcProgramAccountsConfig,
) -> anyhow::Result<(u64, Vec<AccountWrite>)> {
let config = RpcProgramAccountsConfig {
with_context: Some(true),
..config
};
let mut snap_result = vec![];
let mut min_slot = u64::MAX;
// use getGPA compressed if available
for program_id in filters.iter() {
info!("gPA for {}", program_id);
let result = rpc_client
.send::<OptionalContext<Vec<RpcKeyedCompressedAccount>>>(
solana_client::rpc_request::RpcRequest::Custom {
method: "getProgramAccountsCompressed",
},
json!([program_id.to_string(), config]),
)
.await;
// failed to get over compressed program accounts
match result {
Ok(OptionalContext::Context(response)) => {
info!("Received compressed data for {}", program_id);
let updated_slot = response.context.slot;
min_slot = updated_slot.min(min_slot);
for key_account in response.value {
let base64_decoded =
base64::engine::general_purpose::STANDARD.decode(&key_account.a)?;
// decompress all the account information
let uncompressed = lz4::block::decompress(&base64_decoded, None)?;
let shared_data = bincode::deserialize::<AccountSharedData>(&uncompressed)?;
let account = SolanaAccount {
lamports: shared_data.lamports(),
data: shared_data.data().to_vec(),
owner: *shared_data.owner(),
executable: shared_data.executable(),
rent_epoch: shared_data.rent_epoch(),
};
snap_result.push(AccountWrite {
pubkey: Pubkey::from_str(&key_account.p).unwrap(),
lamports: account.lamports,
data: account.data,
owner: account.owner,
executable: account.executable,
rent_epoch: account.rent_epoch,
slot: updated_slot,
write_version: 0,
// is_selected: false,
});
}
info!(
"Decompressed snapshot for {} with {} accounts",
program_id,
snap_result.len()
);
}
Err(e) => {
anyhow::bail!(
"failed to get program {} account snapshot: {}",
program_id,
e
)
}
_ => {
anyhow::bail!(
"failed to get program {} account snapshot (unknown reason)",
program_id
)
}
}
}
Ok((min_slot, snap_result))
}
// called on startup to get the required accounts, few calls with some 100 thousand accounts
#[tracing::instrument(skip_all, level = "trace")]
pub async fn get_uncompressed_program_account(
rpc_url: &str,
program_id: String,
) -> anyhow::Result<(u64, Vec<AccountWrite>)> {
let result = feeds_get_snapshot_gpa(rpc_url, program_id).await?;
Ok(result)
}
pub async fn feeds_get_snapshot_gpa(
rpc_http_url: &str,
program_id: String,
) -> anyhow::Result<(u64, Vec<AccountWrite>)> {
let rpc_client = http::connect::<RpcAccountsScanClient>(rpc_http_url)
.await
.map_err_anyhow()?;
let account_info_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::finalized()),
data_slice: None,
min_context_slot: None,
};
let program_accounts_config = RpcProgramAccountsConfig {
filters: None,
with_context: Some(true),
account_config: account_info_config.clone(),
};
tracing::log::debug!("requesting gpa snapshot {}", program_id);
let account_snapshot = rpc_client
.get_program_accounts(program_id.clone(), Some(program_accounts_config.clone()))
.await
.map_err_anyhow()?;
tracing::log::debug!("gpa snapshot received {}", program_id);
match account_snapshot {
OptionalContext::Context(snapshot) => {
let snapshot_slot = snapshot.context.slot;
let mut accounts = vec![];
for acc in snapshot.value {
let (key, account) = (acc.pubkey, acc.account);
let pubkey = Pubkey::from_str(key.as_str()).unwrap();
let account: Account = account.decode().unwrap();
accounts.push(account_write_from(pubkey, snapshot_slot, 0, account));
}
Ok((snapshot_slot, accounts))
}
OptionalContext::NoContext(_) => anyhow::bail!("bad snapshot format"),
}
}
async fn feeds_get_snapshot_gma(
rpc_http_url: &str,
ids: Vec<String>,
) -> anyhow::Result<CustomSnapshotProgramAccounts> {
let rpc_client = http::connect::<RpcAccountsScanClient>(rpc_http_url)
.await
.map_err_anyhow()?;
let account_info_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::finalized()),
data_slice: None,
min_context_slot: None,
};
tracing::log::debug!("requesting gma snapshot {:?}", ids);
let account_snapshot_response = rpc_client
.get_multiple_accounts(ids.clone(), Some(account_info_config))
.await
.map_err_anyhow()?;
tracing::log::debug!("gma snapshot received {:?}", ids);
let first_full_shot = account_snapshot_response.context.slot;
let acc: Vec<(String, Option<UiAccount>)> = ids
.iter()
.zip(account_snapshot_response.value)
.map(|x| (x.0.clone(), x.1))
.collect();
let mut accounts = vec![];
let mut missing_accounts = vec![];
for (key, account) in acc {
let pubkey = Pubkey::from_str(key.as_str()).unwrap();
if let Some(account) = account {
let account: Account = account.decode().unwrap();
accounts.push(account_write_from(pubkey, first_full_shot, 0, account));
} else {
missing_accounts.push(pubkey);
}
}
Ok(CustomSnapshotProgramAccounts {
slot: first_full_shot,
accounts,
missing_accounts,
program_id: None,
})
}
/// Fetch multiple account using one request per chunk of `max_chunk_size` accounts
///
/// WARNING: some accounts requested may be missing from the result
pub async fn fetch_multiple_accounts(
rpc: &RpcClient,
all_keys: &[Pubkey],
max_chunk_size: usize,
) -> anyhow::Result<Vec<(Pubkey, Account)>> {
let config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
..RpcAccountInfoConfig::default()
};
let mut raw_results = vec![];
for keys in all_keys.chunks(max_chunk_size) {
let account_info_config = config.clone();
let keys: Vec<Pubkey> = keys.to_vec();
let req_res = rpc
.get_multiple_accounts_with_config(&keys, account_info_config)
.await?;
let r: Vec<(Pubkey, Option<Account>)> = keys.into_iter().zip(req_res.value).collect();
raw_results.push(r);
}
let result = raw_results
.into_iter()
.flatten()
.filter_map(|(pubkey, account_opt)| account_opt.map(|acc| (pubkey, acc)))
.collect::<Vec<_>>();
Ok(result)
}
trait AnyhowWrap {
type Value;
fn map_err_anyhow(self) -> anyhow::Result<Self::Value>;
}
impl<T, E: std::fmt::Debug> AnyhowWrap for Result<T, E> {
type Value = T;
fn map_err_anyhow(self) -> anyhow::Result<Self::Value> {
self.map_err(|err| anyhow::anyhow!("{:?}", err))
}
}

View File

@ -0,0 +1,19 @@
use prometheus::{register_int_counter, IntCounter};
lazy_static::lazy_static! {
pub static ref SNAPSHOT_ACCOUNTS_CNT: IntCounter =
register_int_counter!("SNAPSHOT_ACCOUNTS_CNT", "Counter").unwrap();
pub static ref GRPC_PLUMBING_ACCOUNT_MESSAGE_CNT: IntCounter =
register_int_counter!("GRPC_PLUMBING_ACCOUNT_MESSAGE_CNT", "Counter").unwrap();
pub static ref GRPC_PLUMBING_SLOT_MESSAGE_CNT: IntCounter =
register_int_counter!("GRPC_PLUMBING_SLOT_MESSAGE_CNT", "Counter").unwrap();
pub static ref CHAINDATA_ACCOUNT_WRITE_IN: IntCounter =
register_int_counter!("CHAINDATA_ACCOUNT_WRITE_IN", "Counter").unwrap();
pub static ref CHAINDATA_SLOT_UPDATE_IN: IntCounter =
register_int_counter!("CHAINDATA_SLOT_UPDATE_IN", "Counter").unwrap();
pub static ref CHAINDATA_UPDATE_ACCOUNT: IntCounter =
register_int_counter!("CHAINDATA_UPDATE_ACCOUNT", "Counter").unwrap();
pub static ref CHAINDATA_SNAP_UPDATE_ACCOUNT: IntCounter =
register_int_counter!("CHAINDATA_SNAP_UPDATE_ACCOUNT", "Counter").unwrap();
pub static ref ACCOUNT_UPDATE_SENDER: IntCounter =
register_int_counter!("ACCOUNT_UPDATE_SENDER", "Counter").unwrap();
}

View File

@ -0,0 +1,43 @@
use log::{debug, info, trace};
use prometheus::core::{Collector, Metric};
use prometheus::IntCounter;
use std::f64::NAN;
use std::time::Duration;
use tokio::time::{interval, sleep, Instant, Interval};
pub fn start_metrics_dumper(prom_counter: &IntCounter) {
let metric = prom_counter.clone();
let name = prom_counter
.desc()
.get(0)
.map(|x| x.fq_name.clone())
.unwrap_or("noname".to_string());
tokio::spawn(async move {
let mut interval = interval(Duration::from_millis(3000));
let mut last_observed_at = Instant::now();
let mut last_observed_value: u64 = u64::MIN;
loop {
let value = metric.get();
trace!("counter <{}> value: {}", name, value);
if last_observed_value != u64::MIN {
let elapsed = last_observed_at.elapsed().as_secs_f64();
let delta = value - last_observed_value;
debug!(
"counter <{}> (value={}) with throughput {:.1}/s",
name,
value,
delta as f64 / elapsed
);
}
last_observed_value = value;
last_observed_at = Instant::now();
interval.tick().await;
}
});
}

View File

@ -0,0 +1,141 @@
use itertools::Itertools;
use log::{info, trace};
use mango_feeds_connector::chain_data::{AccountData, ChainData, SlotData, SlotStatus};
use solana_sdk::account::AccountSharedData;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentLevel;
use solana_sdk::pubkey::Pubkey;
use std::fs::File;
use std::io;
use std::io::BufRead;
use std::str::FromStr;
use std::time::Instant;
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::EnvFilter;
// #[derive(Parser, Debug, Clone)]
// #[clap()]
// struct Cli {
// #[clap(short, long)]
// replay_file: Option<String>,
// }
const RAYDIUM_AMM_PUBKEY: &str = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8";
pub fn main() {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.with_span_events(FmtSpan::CLOSE)
.init();
// let Cli { replay_file } = Cli::parse();
let mut chain_data = ChainData::new();
let mut slot_cnt = 0;
let mut account_cnt = 0;
let started_at = Instant::now();
// read line
// let buffer: BufReader<_> = match replay_file {
// None => {
// info!("use data from inline csv");
// // 500k
// let data = include_bytes!("dump-slot-acccounts-fsn4-mixed-500k.csv");
// io::BufReader::new(data.as_ref())
// }
// Some(slot_stream_dump_file) => {
// info!("use replay_file: {}", slot_stream_dump_file);
// let file = File::open(slot_stream_dump_file).unwrap();
// io::BufReader::new(file)
// }
// };
let buffer = {
info!("use data from inline csv");
// 500k
let slot_stream_dump_file =
"chaindata_standalone/benchinput/dump-slot-acccounts-fsn4-mixed-500k.csv";
let file = File::open(slot_stream_dump_file).unwrap();
io::BufReader::new(file)
};
for line in buffer.lines().map_while(Result::ok) {
// update_slot.slot, update_slot.parent.unwrap_or(0), short_status, since_epoch_ms
// slot, account_pk, write_version, data_len, since_epoch_ms
let rows = line.split(',').collect_vec();
if rows[0] == "MIXSLOT" {
let slot: u64 = rows[1].parse().unwrap();
let parent: Option<u64> =
rows[2]
.parse()
.ok()
.and_then(|v| if v == 0 { None } else { Some(v) });
let commitment_level = match rows[3].to_string().as_str() {
"P" => CommitmentLevel::Processed,
"C" => CommitmentLevel::Confirmed,
"F" => CommitmentLevel::Finalized,
_ => panic!("invalid commitment level"),
};
let since_epoch_ms: u64 = rows[4].trim().parse().unwrap();
let slot_status = match commitment_level {
CommitmentLevel::Processed => SlotStatus::Processed,
CommitmentLevel::Confirmed => SlotStatus::Confirmed,
CommitmentLevel::Finalized => SlotStatus::Rooted,
_ => panic!("invalid commitment level"),
};
const INIT_CHAIN: Slot = 0;
trace!(
"MIXSLOT slot: {}, parent: {:?}, status: {:?}, since_epoch_ms: {}",
slot,
parent,
slot_status,
since_epoch_ms
);
let slot_data = SlotData {
slot,
parent,
status: slot_status,
chain: INIT_CHAIN,
};
slot_cnt += 1;
chain_data.update_slot(slot_data);
} else if rows[0] == "MIXACCOUNT" {
let slot: u64 = rows[1].parse().unwrap();
let account_pk: String = rows[2].parse().unwrap();
let account_pk = Pubkey::from_str(&account_pk).unwrap();
let write_version: u64 = rows[3].parse().unwrap();
let data_len: u64 = rows[4].parse().unwrap();
let since_epoch_ms: u64 = rows[5].trim().parse().unwrap();
trace!("MIXACCOUNT slot: {}, account_pk: {}, write_version: {}, data_len: {}, since_epoch_ms: {}", slot, account_pk, write_version, data_len, since_epoch_ms);
let account_data = AccountData {
slot,
write_version,
account: AccountSharedData::new(
slot,
data_len as usize,
&Pubkey::from_str(RAYDIUM_AMM_PUBKEY).unwrap(),
),
};
account_cnt += 1;
chain_data.update_account(account_pk, account_data);
}
if (slot_cnt + account_cnt) % 100_000 == 0 {
info!(
"progress .. slot_cnt: {}, account_cnt: {}, elapsed: {:.02}s",
slot_cnt,
account_cnt,
started_at.elapsed().as_secs_f64()
);
}
}
info!(
"slot_cnt: {}, account_cnt: {}, elapsed: {:.02}s",
slot_cnt,
account_cnt,
started_at.elapsed().as_secs_f64()
);
}

View File

@ -0,0 +1,122 @@
// cloned from mango-feeds
pub mod rpc_accounts_scan {
use jsonrpc_core::Result;
use jsonrpc_derive::rpc;
use solana_account_decoder::UiAccount;
use solana_rpc_client_api::config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
use solana_rpc_client_api::response::{
OptionalContext, Response as RpcResponse, RpcKeyedAccount,
};
/// this definition is derived from solana-rpc/rpc.rs
/// we want to avoid the heavy dependency to solana-rpc
/// the crate solana-rpc-client provides some client methods but do not expose the ```Context```we need
///
#[rpc]
pub trait RpcAccountsScan {
type Metadata;
#[rpc(meta, name = "getProgramAccounts")]
fn get_program_accounts(
&self,
meta: Self::Metadata,
program_id_str: String,
config: Option<RpcProgramAccountsConfig>,
) -> Result<OptionalContext<Vec<RpcKeyedAccount>>>;
#[rpc(meta, name = "getMultipleAccounts")]
fn get_multiple_accounts(
&self,
meta: Self::Metadata,
pubkey_strs: Vec<String>,
config: Option<RpcAccountInfoConfig>,
) -> Result<RpcResponse<Vec<Option<UiAccount>>>>;
}
}
pub mod rpc_pubsub {
use jsonrpc_core::Result;
use jsonrpc_derive::rpc;
use jsonrpc_pubsub::typed::Subscriber;
use jsonrpc_pubsub::SubscriptionId as PubSubSubscriptionId;
use solana_account_decoder::UiAccount;
use solana_rpc_client_api::config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
use solana_rpc_client_api::response::{Response as RpcResponse, RpcKeyedAccount, SlotUpdate};
use std::sync::Arc;
#[rpc]
pub trait RpcSolPubSub {
type Metadata;
#[pubsub(
subscription = "accountNotification",
subscribe,
name = "accountSubscribe"
)]
fn account_subscribe(
&self,
meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<UiAccount>>,
pubkey_str: String,
config: Option<RpcAccountInfoConfig>,
);
#[pubsub(
subscription = "accountNotification",
unsubscribe,
name = "accountUnsubscribe"
)]
fn account_unsubscribe(
&self,
meta: Option<Self::Metadata>,
id: PubSubSubscriptionId,
) -> Result<bool>;
#[pubsub(
subscription = "programNotification",
subscribe,
name = "programSubscribe"
)]
fn program_subscribe(
&self,
meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<RpcKeyedAccount>>,
pubkey_str: String,
config: Option<RpcProgramAccountsConfig>,
);
#[pubsub(
subscription = "programNotification",
unsubscribe,
name = "programUnsubscribe"
)]
fn program_unsubscribe(
&self,
meta: Option<Self::Metadata>,
id: PubSubSubscriptionId,
) -> Result<bool>;
#[pubsub(
subscription = "slotsUpdatesNotification",
subscribe,
name = "slotsUpdatesSubscribe"
)]
fn slots_updates_subscribe(
&self,
meta: Self::Metadata,
subscriber: Subscriber<Arc<SlotUpdate>>,
);
#[pubsub(
subscription = "slotsUpdatesNotification",
unsubscribe,
name = "slotsUpdatesUnsubscribe"
)]
fn slots_updates_unsubscribe(
&self,
meta: Option<Self::Metadata>,
id: PubSubSubscriptionId,
) -> Result<bool>;
}
}

View File

@ -16,6 +16,9 @@ harness = false
default = []
[dependencies]
tracing = "0.1"
tracing-subscriber = "0.3"
jsonrpc-core = { workspace = true }
jsonrpc-core-client = { workspace = true }
jsonrpc-derive = "18.0.0"
@ -37,7 +40,6 @@ serde = { workspace = true }
serde_derive = { workspace = true }
log = { workspace = true }
tracing = "0.1.40"
anyhow = { workspace = true }
smallvec = "1.13.2"
@ -53,6 +55,7 @@ warp = { workspace = true }
# 1.9.0+solana.1.16.1
yellowstone-grpc-client = { workspace = true }
yellowstone-grpc-proto = { workspace = true }
csv = "1.3.0"
[dev-dependencies]
clap = { workspace = true }