diff --git a/Cargo.lock b/Cargo.lock index 9a21965..b0d45e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -797,12 +797,13 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.0.91" +version = "1.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fd97381a8cc6493395a5afc4c691c1084b3768db713b73aa215217aa245d153" +checksum = "9157bbaa6b165880c27a4293a474c91cdcf265cc68cc829bf10be0964a391caf" dependencies = [ "jobserver", "libc", + "shlex", ] [[package]] @@ -817,6 +818,42 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chaindata_standalone" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.21.7", + "bincode", + "clap 3.2.25", + "csv", + "futures 0.3.30", + "geyser-grpc-connector", + "itertools 0.10.5", + "jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "jsonrpc-core-client", + "jsonrpc-derive", + "jsonrpc-pubsub 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static", + "log 0.4.21", + "lz4", + "mango-feeds-connector", + "prometheus", + "serde", + "serde_derive", + "serde_json", + "solana-account-decoder", + "solana-client", + "solana-rpc-client-api", + "solana-sdk", + "tokio", + "tokio-stream", + "tracing", + "tracing-subscriber", + "yellowstone-grpc-proto 1.13.0", +] + [[package]] name = "chrono" version = "0.4.37" @@ -1145,6 +1182,27 @@ dependencies = [ "subtle", ] +[[package]] +name = "csv" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" +dependencies = [ + "memchr", +] + [[package]] name = "ctr" version = "0.8.0" @@ -1698,6 +1756,30 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "geyser-grpc-connector" +version = "0.10.6+yellowstone.1.13" +source = "git+https://github.com/blockworks-foundation/geyser-grpc-connector.git?tag=v0.10.6+yellowstone.1.13+solana.1.17.28#20f29c1e21e1682f124c0a3386cd4a85b1854f01" +dependencies = [ + "anyhow", + "async-stream", + "base64 0.21.7", + "bincode", + "csv", + "derive_more", + "futures 0.3.30", + "itertools 0.10.5", + "log 0.4.21", + "merge-streams", + "solana-sdk", + "tokio", + "tonic-health", + "tracing", + "url 2.5.0", + "yellowstone-grpc-client 1.14.0", + "yellowstone-grpc-proto 1.13.0", +] + [[package]] name = "gimli" version = "0.28.1" @@ -1990,6 +2072,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes 1.6.0", + "hyper 0.14.28", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -2139,9 +2234,9 @@ checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" [[package]] name = "jobserver" -version = "0.1.28" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab46a6e9526ddef3ae7f787c06f0f2600639ba80ea3eade3d8e670a2230f51d6" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" dependencies = [ "libc", ] @@ -2164,6 +2259,7 @@ dependencies = [ "flate2 0.2.20", "futures 0.3.30", "hyper 0.14.28", + "hyper-tls", "jsonrpc-core 18.0.0 (git+https://github.com/ckamm/jsonrpc.git?branch=ckamm/http-with-gzip-default-v18.0.0)", "jsonrpc-pubsub 18.0.0 (git+https://github.com/ckamm/jsonrpc.git?branch=ckamm/http-with-gzip-default-v18.0.0)", "log 0.4.21", @@ -2280,9 +2376,9 @@ checksum = "a91d884b6667cd606bb5a69aa0c99ba811a115fc68915e7056ec08a46e93199a" [[package]] name = "lazy_static" -version = "1.4.0" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" @@ -2390,6 +2486,25 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "lz4" +version = "1.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d1febb2b4a79ddd1980eede06a8f7902197960aa0383ffcfdd62fe723036725" +dependencies = [ + "lz4-sys", +] + +[[package]] +name = "lz4-sys" +version = "1.11.1+lz4-1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "mango-feeds-connector" version = "0.4.7" @@ -2399,6 +2514,7 @@ dependencies = [ "async-trait", "clap 3.2.25", "criterion", + "csv", "futures 0.3.30", "itertools 0.10.5", "jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2419,9 +2535,19 @@ dependencies = [ "tokio", "tokio-stream", "tracing", + "tracing-subscriber", "warp", - "yellowstone-grpc-client", - "yellowstone-grpc-proto", + "yellowstone-grpc-client 1.15.0+solana.1.17", + "yellowstone-grpc-proto 1.14.0+solana.1.17", +] + +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", ] [[package]] @@ -2475,6 +2601,16 @@ dependencies = [ "autocfg 1.2.0", ] +[[package]] +name = "merge-streams" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f84f6452969abd246e7ac1fe4fe75906c76e8ec88d898df9aef37e0f3b6a7c2" +dependencies = [ + "futures-core", + "pin-project", +] + [[package]] name = "merlin" version = "3.0.0" @@ -2655,6 +2791,16 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi 0.3.9", +] + [[package]] name = "num" version = "0.2.1" @@ -2914,6 +3060,12 @@ version = "6.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking_lot" version = "0.9.0" @@ -3236,6 +3388,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +dependencies = [ + "cfg-if 1.0.0", + "fnv", + "lazy_static", + "memchr", + "parking_lot 0.12.1", + "protobuf", + "thiserror", +] + [[package]] name = "prost" version = "0.12.4" @@ -3289,6 +3456,12 @@ dependencies = [ "prost", ] +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + [[package]] name = "protobuf-src" version = "1.1.0+21.5" @@ -3634,8 +3807,17 @@ checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.6", + "regex-syntax 0.8.3", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -3646,9 +3828,15 @@ checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.3", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.3" @@ -4124,12 +4312,27 @@ dependencies = [ "keccak", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shell-words" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde" +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -5287,6 +5490,16 @@ dependencies = [ "syn 2.0.58", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if 1.0.0", + "once_cell", +] + [[package]] name = "time" version = "0.1.45" @@ -5445,6 +5658,16 @@ dependencies = [ "syn 2.0.58", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-reactor" version = "0.1.12" @@ -5717,6 +5940,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log 0.4.21", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec 1.13.2", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -5903,6 +6156,12 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vcpkg" version = "0.2.15" @@ -6381,6 +6640,21 @@ dependencies = [ "time 0.3.34", ] +[[package]] +name = "yellowstone-grpc-client" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f4f6d836d214cb8789002d039412da354049e9ffe983c643ec492c4d934698f" +dependencies = [ + "bytes 1.6.0", + "futures 0.3.30", + "http 0.2.12", + "thiserror", + "tonic", + "tonic-health", + "yellowstone-grpc-proto 1.13.0", +] + [[package]] name = "yellowstone-grpc-client" version = "1.15.0+solana.1.17" @@ -6391,7 +6665,24 @@ dependencies = [ "thiserror", "tonic", "tonic-health", - "yellowstone-grpc-proto", + "yellowstone-grpc-proto 1.14.0+solana.1.17", +] + +[[package]] +name = "yellowstone-grpc-proto" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c00b66d40d85c405f14b420d7674e98c70d06f6b673f36c9e0285f81b9b797d" +dependencies = [ + "anyhow", + "bincode", + "prost", + "protobuf-src", + "solana-account-decoder", + "solana-sdk", + "solana-transaction-status", + "tonic", + "tonic-build", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 662533f..c19bc87 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "connector", + "chaindata_standalone" ] resolver = "2" @@ -52,5 +53,3 @@ rustls = "0.20.8" warp = "0.3" - - diff --git a/chaindata_standalone/.gitignore b/chaindata_standalone/.gitignore new file mode 100644 index 0000000..f76246f --- /dev/null +++ b/chaindata_standalone/.gitignore @@ -0,0 +1 @@ +benchinput/ diff --git a/chaindata_standalone/Cargo.toml b/chaindata_standalone/Cargo.toml new file mode 100644 index 0000000..5e82244 --- /dev/null +++ b/chaindata_standalone/Cargo.toml @@ -0,0 +1,56 @@ +[package] +name = "chaindata_standalone" +version = "0.1.0" +edition = "2021" + +[[bin]] +name = "replay_slot_account_stream" +path = "src/replay_slot_account_stream.rs" + +[dependencies] +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } + +tokio = { workspace = true } +tokio-stream = { workspace = true } + +csv = "1.3.0" +log = { workspace = true } +anyhow = { workspace = true } + +itertools = { workspace = true } +futures = { workspace = true } + +lazy_static = "1.5.0" +serde = { workspace = true } +serde_json = { workspace = true } +serde_derive = { workspace = true } + +lz4 = "1.24.0" +base64 = "0.21.7" + +prometheus = "0.13.4" + +bincode = "1.3.3" +async-trait = { workspace = true } + +mango-feeds-connector = { path = "../connector" } +# note: this version does not relax the solana version +geyser-grpc-connector = { tag = "v0.10.6+yellowstone.1.13+solana.1.17.28", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" } +yellowstone-grpc-proto = "1.13.0" +#yellowstone-grpc-client = "1.14.0" + +jsonrpc-core = "18.0.0" +jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http", "tls"] } +jsonrpc-derive = "18.0.0" +jsonrpc-pubsub = "18.0.0" + +solana-sdk = { workspace = true } +solana-client = { workspace = true } +solana-rpc-client-api = "1.17" +solana-account-decoder = "1.17" +clap = { version = "3.2.25", features = ["derive"] } + +[lints.clippy] +needless_return = "allow" +enum_glob_use = "deny" diff --git a/chaindata_standalone/SAMPLEFLOWS.md b/chaindata_standalone/SAMPLEFLOWS.md new file mode 100644 index 0000000..b636811 --- /dev/null +++ b/chaindata_standalone/SAMPLEFLOWS.md @@ -0,0 +1,39 @@ + + + +``` +2024-08-12T08:58:14.644200Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195675784 +2024-08-12T08:58:14.739358Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676308 +2024-08-12T08:58:14.774673Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676381 +2024-08-12T08:58:14.899019Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676458 +2024-08-12T08:58:14.899313Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676472 +2024-08-12T08:58:14.914404Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676532 +2024-08-12T08:58:14.946106Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676565 +2024-08-12T08:58:14.946583Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676579 +2024-08-12T08:58:14.971928Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676593 +2024-08-12T08:58:14.986530Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676627 +2024-08-12T08:58:15.005577Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676657 +2024-08-12T08:58:15.864777Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195675784 +2024-08-12T08:58:15.864815Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195675784 +2024-08-12T08:58:15.864856Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 +2024-08-12T08:58:17.875143Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676308 +2024-08-12T08:58:17.875227Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676308 +2024-08-12T08:58:17.875421Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 +2024-08-12T08:58:17.875454Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676381 +2024-08-12T08:58:17.875516Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676381 +2024-08-12T08:58:17.875610Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 +2024-08-12T08:58:18.880978Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676458 +2024-08-12T08:58:18.881039Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676458 +2024-08-12T08:58:18.881218Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 +2024-08-12T08:58:18.881235Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676472 +2024-08-12T08:58:18.881297Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676472 +2024-08-12T08:58:18.881378Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 +2024-08-12T08:58:19.887019Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676532 +2024-08-12T08:58:19.887155Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676532 +2024-08-12T08:58:19.887320Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 +2024-08-12T08:58:19.887609Z TRACE chaindata_standalone::router_impl: [account_write_receiver->chain_data] account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676565 +2024-08-12T08:58:20.892641Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676565 +2024-08-12T08:58:20.892897Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676565 +2024-08-12T08:58:20.893127Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676579 +2024-08-12T08:58:20.893197Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676579 +``` \ No newline at end of file diff --git a/chaindata_standalone/src/account_write.rs b/chaindata_standalone/src/account_write.rs new file mode 100644 index 0000000..9295a5a --- /dev/null +++ b/chaindata_standalone/src/account_write.rs @@ -0,0 +1,41 @@ +#![allow(dead_code)] +use serde_derive::{Deserialize, Serialize}; +use solana_sdk::account::Account; +use solana_sdk::pubkey::Pubkey; + +pub fn account_write_from( + pubkey: Pubkey, + slot: u64, + write_version: u64, + account: Account, +) -> AccountWrite { + AccountWrite { + pubkey, + slot, + write_version, + lamports: account.lamports, + owner: account.owner, + executable: account.executable, + rent_epoch: account.rent_epoch, + data: account.data, + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct AccountWrite { + pub pubkey: Pubkey, + pub slot: u64, + pub write_version: u64, + pub lamports: u64, + pub owner: Pubkey, + pub executable: bool, + pub rent_epoch: u64, + pub data: Vec, + // is_selected < removed +} + +#[derive(Debug)] +pub enum AccountOrSnapshotUpdate { + AccountUpdate(AccountWrite), + SnapshotUpdate(Vec), +} diff --git a/chaindata_standalone/src/get_program_account.rs b/chaindata_standalone/src/get_program_account.rs new file mode 100644 index 0000000..41db1a8 --- /dev/null +++ b/chaindata_standalone/src/get_program_account.rs @@ -0,0 +1,376 @@ +#![allow(dead_code)] + +use std::collections::HashSet; +use std::{str::FromStr, sync::Arc, time::Duration}; + +use crate::account_write::{account_write_from, AccountWrite}; +use crate::solana_rpc_minimal::rpc_accounts_scan::RpcAccountsScanClient; +use base64::Engine; +use jsonrpc_core_client::transports::http; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use solana_account_decoder::{UiAccount, UiAccountEncoding}; +use solana_client::{ + nonblocking::rpc_client::RpcClient, + rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, + rpc_response::OptionalContext, +}; +use solana_rpc_client_api::filter::{Memcmp, RpcFilterType}; +use solana_sdk::account::Account; +use solana_sdk::{ + account::{Account as SolanaAccount, AccountSharedData, ReadableAccount}, + commitment_config::CommitmentConfig, + pubkey::Pubkey, +}; +use tracing::info; + +pub struct CustomSnapshotProgramAccounts { + pub slot: u64, + pub program_id: Option, + pub accounts: Vec, + pub missing_accounts: Vec, +} + +#[derive(Clone, PartialEq, Debug)] +pub enum FeedMetadata { + InvalidAccount(Pubkey), + SnapshotStart(Option), + SnapshotEnd(Option), +} + +// called on startup to get the required accounts, few calls with some 100 thousand accounts +#[tracing::instrument(skip_all, level = "trace")] +pub async fn get_snapshot_gta( + rpc_http_url: &str, + owner_id: &Pubkey, +) -> anyhow::Result { + let rpc_client = Arc::new(RpcClient::new_with_timeout_and_commitment( + rpc_http_url.to_string(), + Duration::from_secs(60 * 20), + CommitmentConfig::confirmed(), + )); + let config = RpcProgramAccountsConfig { + filters: Some(vec![ + RpcFilterType::DataSize(165), + RpcFilterType::Memcmp(Memcmp::new_base58_encoded( + 32, + owner_id.to_bytes().as_slice(), + )), + ]), + account_config: Default::default(), + with_context: Some(true), + }; + + // println!("{}", serde_json::to_string(&config)?); + + let token_program = Pubkey::from_str("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA").unwrap(); + let result = + get_compressed_program_account_rpc(&rpc_client, &HashSet::from([token_program]), config) + .await?; + + Ok(CustomSnapshotProgramAccounts { + slot: result.0, + accounts: result.1, + program_id: Some(token_program), + missing_accounts: vec![], + }) +} + +// called on startup to get the required accounts, few calls with some 100 thousand accounts +#[tracing::instrument(skip_all, level = "trace")] +pub async fn get_snapshot_gpa( + rpc_http_url: &str, + program_id: &Pubkey, + use_compression: bool, +) -> anyhow::Result { + let result = if use_compression { + get_compressed_program_account(rpc_http_url, &[*program_id].into_iter().collect()).await? + } else { + get_uncompressed_program_account(rpc_http_url, program_id.to_string()).await? + }; + + Ok(CustomSnapshotProgramAccounts { + slot: result.0, + accounts: result.1, + program_id: Some(*program_id), + missing_accounts: vec![], + }) +} + +// called on startup to get the required accounts, few calls with some 100 thousand accounts +#[tracing::instrument(skip_all, level = "trace")] +pub async fn get_snapshot_gma( + rpc_http_url: &str, + keys: &[Pubkey], +) -> anyhow::Result { + let keys = keys.iter().map(|x| x.to_string()).collect::>(); + feeds_get_snapshot_gma(rpc_http_url, keys).await +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct RpcKeyedCompressedAccount { + pub p: String, + pub a: String, +} + +// called on startup to get the required accounts, few calls with some 100 thousand accounts +#[tracing::instrument(skip_all, level = "trace")] +pub async fn get_compressed_program_account( + rpc_url: &str, + filters: &HashSet, +) -> anyhow::Result<(u64, Vec)> { + // setting larget timeout because gPA can take a lot of time + let rpc_client = Arc::new(RpcClient::new_with_timeout_and_commitment( + rpc_url.to_string(), + Duration::from_secs(60 * 20), + CommitmentConfig::finalized(), + )); + let config = RpcProgramAccountsConfig { + filters: None, + with_context: Some(true), + ..Default::default() + }; + + get_compressed_program_account_rpc(&rpc_client, filters, config).await +} + +// called on startup to get the required accounts, few calls with some 100 thousand accounts +#[tracing::instrument(skip_all, level = "trace")] +pub async fn get_compressed_program_account_rpc( + rpc_client: &RpcClient, + filters: &HashSet, + config: RpcProgramAccountsConfig, +) -> anyhow::Result<(u64, Vec)> { + let config = RpcProgramAccountsConfig { + with_context: Some(true), + ..config + }; + + let mut snap_result = vec![]; + let mut min_slot = u64::MAX; + + // use getGPA compressed if available + for program_id in filters.iter() { + info!("gPA for {}", program_id); + + let result = rpc_client + .send::>>( + solana_client::rpc_request::RpcRequest::Custom { + method: "getProgramAccountsCompressed", + }, + json!([program_id.to_string(), config]), + ) + .await; + + // failed to get over compressed program accounts + match result { + Ok(OptionalContext::Context(response)) => { + info!("Received compressed data for {}", program_id); + let updated_slot = response.context.slot; + min_slot = updated_slot.min(min_slot); + + for key_account in response.value { + let base64_decoded = + base64::engine::general_purpose::STANDARD.decode(&key_account.a)?; + // decompress all the account information + let uncompressed = lz4::block::decompress(&base64_decoded, None)?; + let shared_data = bincode::deserialize::(&uncompressed)?; + let account = SolanaAccount { + lamports: shared_data.lamports(), + data: shared_data.data().to_vec(), + owner: *shared_data.owner(), + executable: shared_data.executable(), + rent_epoch: shared_data.rent_epoch(), + }; + snap_result.push(AccountWrite { + pubkey: Pubkey::from_str(&key_account.p).unwrap(), + lamports: account.lamports, + data: account.data, + owner: account.owner, + executable: account.executable, + rent_epoch: account.rent_epoch, + slot: updated_slot, + write_version: 0, + // is_selected: false, + }); + } + + info!( + "Decompressed snapshot for {} with {} accounts", + program_id, + snap_result.len() + ); + } + Err(e) => { + anyhow::bail!( + "failed to get program {} account snapshot: {}", + program_id, + e + ) + } + _ => { + anyhow::bail!( + "failed to get program {} account snapshot (unknown reason)", + program_id + ) + } + } + } + + Ok((min_slot, snap_result)) +} + +// called on startup to get the required accounts, few calls with some 100 thousand accounts +#[tracing::instrument(skip_all, level = "trace")] +pub async fn get_uncompressed_program_account( + rpc_url: &str, + program_id: String, +) -> anyhow::Result<(u64, Vec)> { + let result = feeds_get_snapshot_gpa(rpc_url, program_id).await?; + + Ok(result) +} + +pub async fn feeds_get_snapshot_gpa( + rpc_http_url: &str, + program_id: String, +) -> anyhow::Result<(u64, Vec)> { + let rpc_client = http::connect::(rpc_http_url) + .await + .map_err_anyhow()?; + + let account_info_config = RpcAccountInfoConfig { + encoding: Some(UiAccountEncoding::Base64), + commitment: Some(CommitmentConfig::finalized()), + data_slice: None, + min_context_slot: None, + }; + let program_accounts_config = RpcProgramAccountsConfig { + filters: None, + with_context: Some(true), + account_config: account_info_config.clone(), + }; + + tracing::log::debug!("requesting gpa snapshot {}", program_id); + let account_snapshot = rpc_client + .get_program_accounts(program_id.clone(), Some(program_accounts_config.clone())) + .await + .map_err_anyhow()?; + tracing::log::debug!("gpa snapshot received {}", program_id); + + match account_snapshot { + OptionalContext::Context(snapshot) => { + let snapshot_slot = snapshot.context.slot; + let mut accounts = vec![]; + + for acc in snapshot.value { + let (key, account) = (acc.pubkey, acc.account); + let pubkey = Pubkey::from_str(key.as_str()).unwrap(); + let account: Account = account.decode().unwrap(); + accounts.push(account_write_from(pubkey, snapshot_slot, 0, account)); + } + + Ok((snapshot_slot, accounts)) + } + OptionalContext::NoContext(_) => anyhow::bail!("bad snapshot format"), + } +} + +async fn feeds_get_snapshot_gma( + rpc_http_url: &str, + ids: Vec, +) -> anyhow::Result { + let rpc_client = http::connect::(rpc_http_url) + .await + .map_err_anyhow()?; + + let account_info_config = RpcAccountInfoConfig { + encoding: Some(UiAccountEncoding::Base64), + commitment: Some(CommitmentConfig::finalized()), + data_slice: None, + min_context_slot: None, + }; + + tracing::log::debug!("requesting gma snapshot {:?}", ids); + let account_snapshot_response = rpc_client + .get_multiple_accounts(ids.clone(), Some(account_info_config)) + .await + .map_err_anyhow()?; + tracing::log::debug!("gma snapshot received {:?}", ids); + + let first_full_shot = account_snapshot_response.context.slot; + + let acc: Vec<(String, Option)> = ids + .iter() + .zip(account_snapshot_response.value) + .map(|x| (x.0.clone(), x.1)) + .collect(); + + let mut accounts = vec![]; + let mut missing_accounts = vec![]; + + for (key, account) in acc { + let pubkey = Pubkey::from_str(key.as_str()).unwrap(); + if let Some(account) = account { + let account: Account = account.decode().unwrap(); + accounts.push(account_write_from(pubkey, first_full_shot, 0, account)); + } else { + missing_accounts.push(pubkey); + } + } + + Ok(CustomSnapshotProgramAccounts { + slot: first_full_shot, + accounts, + missing_accounts, + program_id: None, + }) +} + +/// Fetch multiple account using one request per chunk of `max_chunk_size` accounts +/// +/// WARNING: some accounts requested may be missing from the result +pub async fn fetch_multiple_accounts( + rpc: &RpcClient, + all_keys: &[Pubkey], + max_chunk_size: usize, +) -> anyhow::Result> { + let config = RpcAccountInfoConfig { + encoding: Some(UiAccountEncoding::Base64), + ..RpcAccountInfoConfig::default() + }; + + let mut raw_results = vec![]; + + for keys in all_keys.chunks(max_chunk_size) { + let account_info_config = config.clone(); + let keys: Vec = keys.to_vec(); + let req_res = rpc + .get_multiple_accounts_with_config(&keys, account_info_config) + .await?; + + let r: Vec<(Pubkey, Option)> = keys.into_iter().zip(req_res.value).collect(); + raw_results.push(r); + } + + let result = raw_results + .into_iter() + .flatten() + .filter_map(|(pubkey, account_opt)| account_opt.map(|acc| (pubkey, acc))) + .collect::>(); + + Ok(result) +} + +trait AnyhowWrap { + type Value; + fn map_err_anyhow(self) -> anyhow::Result; +} + +impl AnyhowWrap for Result { + type Value = T; + fn map_err_anyhow(self) -> anyhow::Result { + self.map_err(|err| anyhow::anyhow!("{:?}", err)) + } +} diff --git a/chaindata_standalone/src/metrics.rs b/chaindata_standalone/src/metrics.rs new file mode 100644 index 0000000..7833e6c --- /dev/null +++ b/chaindata_standalone/src/metrics.rs @@ -0,0 +1,19 @@ +use prometheus::{register_int_counter, IntCounter}; +lazy_static::lazy_static! { + pub static ref SNAPSHOT_ACCOUNTS_CNT: IntCounter = + register_int_counter!("SNAPSHOT_ACCOUNTS_CNT", "Counter").unwrap(); + pub static ref GRPC_PLUMBING_ACCOUNT_MESSAGE_CNT: IntCounter = + register_int_counter!("GRPC_PLUMBING_ACCOUNT_MESSAGE_CNT", "Counter").unwrap(); + pub static ref GRPC_PLUMBING_SLOT_MESSAGE_CNT: IntCounter = + register_int_counter!("GRPC_PLUMBING_SLOT_MESSAGE_CNT", "Counter").unwrap(); + pub static ref CHAINDATA_ACCOUNT_WRITE_IN: IntCounter = + register_int_counter!("CHAINDATA_ACCOUNT_WRITE_IN", "Counter").unwrap(); + pub static ref CHAINDATA_SLOT_UPDATE_IN: IntCounter = + register_int_counter!("CHAINDATA_SLOT_UPDATE_IN", "Counter").unwrap(); + pub static ref CHAINDATA_UPDATE_ACCOUNT: IntCounter = + register_int_counter!("CHAINDATA_UPDATE_ACCOUNT", "Counter").unwrap(); + pub static ref CHAINDATA_SNAP_UPDATE_ACCOUNT: IntCounter = + register_int_counter!("CHAINDATA_SNAP_UPDATE_ACCOUNT", "Counter").unwrap(); + pub static ref ACCOUNT_UPDATE_SENDER: IntCounter = + register_int_counter!("ACCOUNT_UPDATE_SENDER", "Counter").unwrap(); +} diff --git a/chaindata_standalone/src/metrics_dump.rs b/chaindata_standalone/src/metrics_dump.rs new file mode 100644 index 0000000..92deb09 --- /dev/null +++ b/chaindata_standalone/src/metrics_dump.rs @@ -0,0 +1,43 @@ +use log::{debug, info, trace}; +use prometheus::core::{Collector, Metric}; +use prometheus::IntCounter; +use std::f64::NAN; +use std::time::Duration; +use tokio::time::{interval, sleep, Instant, Interval}; + +pub fn start_metrics_dumper(prom_counter: &IntCounter) { + let metric = prom_counter.clone(); + let name = prom_counter + .desc() + .get(0) + .map(|x| x.fq_name.clone()) + .unwrap_or("noname".to_string()); + + tokio::spawn(async move { + let mut interval = interval(Duration::from_millis(3000)); + + let mut last_observed_at = Instant::now(); + let mut last_observed_value: u64 = u64::MIN; + + loop { + let value = metric.get(); + trace!("counter <{}> value: {}", name, value); + + if last_observed_value != u64::MIN { + let elapsed = last_observed_at.elapsed().as_secs_f64(); + let delta = value - last_observed_value; + debug!( + "counter <{}> (value={}) with throughput {:.1}/s", + name, + value, + delta as f64 / elapsed + ); + } + + last_observed_value = value; + last_observed_at = Instant::now(); + + interval.tick().await; + } + }); +} diff --git a/chaindata_standalone/src/replay_slot_account_stream.rs b/chaindata_standalone/src/replay_slot_account_stream.rs new file mode 100644 index 0000000..877ac63 --- /dev/null +++ b/chaindata_standalone/src/replay_slot_account_stream.rs @@ -0,0 +1,141 @@ +use itertools::Itertools; +use log::{info, trace}; +use mango_feeds_connector::chain_data::{AccountData, ChainData, SlotData, SlotStatus}; +use solana_sdk::account::AccountSharedData; +use solana_sdk::clock::Slot; +use solana_sdk::commitment_config::CommitmentLevel; +use solana_sdk::pubkey::Pubkey; +use std::fs::File; +use std::io; +use std::io::BufRead; +use std::str::FromStr; +use std::time::Instant; +use tracing_subscriber::fmt::format::FmtSpan; +use tracing_subscriber::EnvFilter; + +// #[derive(Parser, Debug, Clone)] +// #[clap()] +// struct Cli { +// #[clap(short, long)] +// replay_file: Option, +// } + +const RAYDIUM_AMM_PUBKEY: &str = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"; + +pub fn main() { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .with_span_events(FmtSpan::CLOSE) + .init(); + + // let Cli { replay_file } = Cli::parse(); + + let mut chain_data = ChainData::new(); + let mut slot_cnt = 0; + let mut account_cnt = 0; + let started_at = Instant::now(); + // read line + // let buffer: BufReader<_> = match replay_file { + // None => { + // info!("use data from inline csv"); + // // 500k + // let data = include_bytes!("dump-slot-acccounts-fsn4-mixed-500k.csv"); + // io::BufReader::new(data.as_ref()) + // } + // Some(slot_stream_dump_file) => { + // info!("use replay_file: {}", slot_stream_dump_file); + // let file = File::open(slot_stream_dump_file).unwrap(); + // io::BufReader::new(file) + // } + // }; + + let buffer = { + info!("use data from inline csv"); + // 500k + let slot_stream_dump_file = + "chaindata_standalone/benchinput/dump-slot-acccounts-fsn4-mixed-500k.csv"; + let file = File::open(slot_stream_dump_file).unwrap(); + io::BufReader::new(file) + }; + for line in buffer.lines().map_while(Result::ok) { + // update_slot.slot, update_slot.parent.unwrap_or(0), short_status, since_epoch_ms + // slot, account_pk, write_version, data_len, since_epoch_ms + let rows = line.split(',').collect_vec(); + + if rows[0] == "MIXSLOT" { + let slot: u64 = rows[1].parse().unwrap(); + let parent: Option = + rows[2] + .parse() + .ok() + .and_then(|v| if v == 0 { None } else { Some(v) }); + let commitment_level = match rows[3].to_string().as_str() { + "P" => CommitmentLevel::Processed, + "C" => CommitmentLevel::Confirmed, + "F" => CommitmentLevel::Finalized, + _ => panic!("invalid commitment level"), + }; + let since_epoch_ms: u64 = rows[4].trim().parse().unwrap(); + + let slot_status = match commitment_level { + CommitmentLevel::Processed => SlotStatus::Processed, + CommitmentLevel::Confirmed => SlotStatus::Confirmed, + CommitmentLevel::Finalized => SlotStatus::Rooted, + _ => panic!("invalid commitment level"), + }; + const INIT_CHAIN: Slot = 0; + trace!( + "MIXSLOT slot: {}, parent: {:?}, status: {:?}, since_epoch_ms: {}", + slot, + parent, + slot_status, + since_epoch_ms + ); + let slot_data = SlotData { + slot, + parent, + status: slot_status, + chain: INIT_CHAIN, + }; + slot_cnt += 1; + chain_data.update_slot(slot_data); + } else if rows[0] == "MIXACCOUNT" { + let slot: u64 = rows[1].parse().unwrap(); + let account_pk: String = rows[2].parse().unwrap(); + let account_pk = Pubkey::from_str(&account_pk).unwrap(); + let write_version: u64 = rows[3].parse().unwrap(); + let data_len: u64 = rows[4].parse().unwrap(); + let since_epoch_ms: u64 = rows[5].trim().parse().unwrap(); + trace!("MIXACCOUNT slot: {}, account_pk: {}, write_version: {}, data_len: {}, since_epoch_ms: {}", slot, account_pk, write_version, data_len, since_epoch_ms); + + let account_data = AccountData { + slot, + write_version, + account: AccountSharedData::new( + slot, + data_len as usize, + &Pubkey::from_str(RAYDIUM_AMM_PUBKEY).unwrap(), + ), + }; + + account_cnt += 1; + chain_data.update_account(account_pk, account_data); + } + + if (slot_cnt + account_cnt) % 100_000 == 0 { + info!( + "progress .. slot_cnt: {}, account_cnt: {}, elapsed: {:.02}s", + slot_cnt, + account_cnt, + started_at.elapsed().as_secs_f64() + ); + } + } + + info!( + "slot_cnt: {}, account_cnt: {}, elapsed: {:.02}s", + slot_cnt, + account_cnt, + started_at.elapsed().as_secs_f64() + ); +} diff --git a/chaindata_standalone/src/solana_rpc_minimal.rs b/chaindata_standalone/src/solana_rpc_minimal.rs new file mode 100644 index 0000000..dbbfa3f --- /dev/null +++ b/chaindata_standalone/src/solana_rpc_minimal.rs @@ -0,0 +1,122 @@ +// cloned from mango-feeds + +pub mod rpc_accounts_scan { + use jsonrpc_core::Result; + use jsonrpc_derive::rpc; + use solana_account_decoder::UiAccount; + use solana_rpc_client_api::config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}; + use solana_rpc_client_api::response::{ + OptionalContext, Response as RpcResponse, RpcKeyedAccount, + }; + + /// this definition is derived from solana-rpc/rpc.rs + /// we want to avoid the heavy dependency to solana-rpc + /// the crate solana-rpc-client provides some client methods but do not expose the ```Context```we need + /// + #[rpc] + pub trait RpcAccountsScan { + type Metadata; + + #[rpc(meta, name = "getProgramAccounts")] + fn get_program_accounts( + &self, + meta: Self::Metadata, + program_id_str: String, + config: Option, + ) -> Result>>; + + #[rpc(meta, name = "getMultipleAccounts")] + fn get_multiple_accounts( + &self, + meta: Self::Metadata, + pubkey_strs: Vec, + config: Option, + ) -> Result>>>; + } +} + +pub mod rpc_pubsub { + use jsonrpc_core::Result; + use jsonrpc_derive::rpc; + use jsonrpc_pubsub::typed::Subscriber; + use jsonrpc_pubsub::SubscriptionId as PubSubSubscriptionId; + use solana_account_decoder::UiAccount; + use solana_rpc_client_api::config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}; + use solana_rpc_client_api::response::{Response as RpcResponse, RpcKeyedAccount, SlotUpdate}; + use std::sync::Arc; + + #[rpc] + pub trait RpcSolPubSub { + type Metadata; + + #[pubsub( + subscription = "accountNotification", + subscribe, + name = "accountSubscribe" + )] + fn account_subscribe( + &self, + meta: Self::Metadata, + subscriber: Subscriber>, + pubkey_str: String, + config: Option, + ); + + #[pubsub( + subscription = "accountNotification", + unsubscribe, + name = "accountUnsubscribe" + )] + fn account_unsubscribe( + &self, + meta: Option, + id: PubSubSubscriptionId, + ) -> Result; + + #[pubsub( + subscription = "programNotification", + subscribe, + name = "programSubscribe" + )] + fn program_subscribe( + &self, + meta: Self::Metadata, + subscriber: Subscriber>, + pubkey_str: String, + config: Option, + ); + + #[pubsub( + subscription = "programNotification", + unsubscribe, + name = "programUnsubscribe" + )] + fn program_unsubscribe( + &self, + meta: Option, + id: PubSubSubscriptionId, + ) -> Result; + + #[pubsub( + subscription = "slotsUpdatesNotification", + subscribe, + name = "slotsUpdatesSubscribe" + )] + fn slots_updates_subscribe( + &self, + meta: Self::Metadata, + subscriber: Subscriber>, + ); + + #[pubsub( + subscription = "slotsUpdatesNotification", + unsubscribe, + name = "slotsUpdatesUnsubscribe" + )] + fn slots_updates_unsubscribe( + &self, + meta: Option, + id: PubSubSubscriptionId, + ) -> Result; + } +} diff --git a/connector/Cargo.toml b/connector/Cargo.toml index b335088..91ad11e 100644 --- a/connector/Cargo.toml +++ b/connector/Cargo.toml @@ -16,6 +16,9 @@ harness = false default = [] [dependencies] +tracing = "0.1" +tracing-subscriber = "0.3" + jsonrpc-core = { workspace = true } jsonrpc-core-client = { workspace = true } jsonrpc-derive = "18.0.0" @@ -37,7 +40,6 @@ serde = { workspace = true } serde_derive = { workspace = true } log = { workspace = true } -tracing = "0.1.40" anyhow = { workspace = true } smallvec = "1.13.2" @@ -53,6 +55,7 @@ warp = { workspace = true } # 1.9.0+solana.1.16.1 yellowstone-grpc-client = { workspace = true } yellowstone-grpc-proto = { workspace = true } +csv = "1.3.0" [dev-dependencies] clap = { workspace = true }