Merge pull request #41 from blockworks-foundation/postgres_and_prometheus

postgres and prometheus and unbounded tx channels
This commit is contained in:
galactus 2023-01-31 12:13:05 +01:00 committed by GitHub
commit 4e060b5996
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1004 additions and 271 deletions

3
.gitignore vendored
View File

@ -1,3 +1,6 @@
target
node_modules
bench/metrics.csv
*.pem
*.pks
.env

493
Cargo.lock generated
View File

@ -205,9 +205,9 @@ dependencies = [
[[package]]
name = "async-trait"
version = "0.1.61"
version = "0.1.63"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "705339e0e4a9690e2908d2b3d049d85682cf19fbd5782494498fbf7003a6a282"
checksum = "eff18d764974428cf3a9328e23fc5c986f5fbed46e6cd4cdf42544df5d297ec1"
dependencies = [
"proc-macro2 1.0.50",
"quote 1.0.23",
@ -269,7 +269,7 @@ name = "bench"
version = "0.1.0"
dependencies = [
"anyhow",
"clap 4.1.1",
"clap 4.1.4",
"csv",
"dirs",
"log",
@ -456,18 +456,18 @@ dependencies = [
[[package]]
name = "bytemuck"
version = "1.12.3"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aaa3a8d9a1ca92e282c96a32d6511b695d7d994d1d102ba85d279f9b2756947f"
checksum = "c041d3eab048880cb0b86b256447da3f18859a163c3b8d8893f4e6368abe6393"
dependencies = [
"bytemuck_derive",
]
[[package]]
name = "bytemuck_derive"
version = "1.3.0"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fe233b960f12f8007e3db2d136e3cb1c291bfd7396e384ee76025fc1a3932b4"
checksum = "1aca418a974d83d40a0c1f0c5cba6ff4bc28d8df099109ca459a2118d40b6322"
dependencies = [
"proc-macro2 1.0.50",
"quote 1.0.23",
@ -498,9 +498,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.0.78"
version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a20104e2335ce8a659d6dd92a51a767a0c062599c73b343fd152cb401e828c3d"
checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f"
dependencies = [
"jobserver",
]
@ -579,9 +579,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.1.1"
version = "4.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ec7a4128863c188deefe750ac1d1dfe66c236909f845af04beed823638dc1b2"
checksum = "f13b9c79b5d1dd500d20ef541215a6423c75829ef43117e1b4d17fd8af0b5d76"
dependencies = [
"bitflags",
"clap_derive",
@ -1045,6 +1045,12 @@ dependencies = [
"syn 0.15.44",
]
[[package]]
name = "doc-comment"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
[[package]]
name = "eager"
version = "0.1.0"
@ -1088,9 +1094,9 @@ dependencies = [
[[package]]
name = "either"
version = "1.8.0"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797"
checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91"
[[package]]
name = "encode_unicode"
@ -1179,6 +1185,12 @@ version = "2.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "fallible-iterator"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
[[package]]
name = "fastrand"
version = "1.8.0"
@ -1210,6 +1222,21 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared",
]
[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]]
name = "form_urlencoded"
version = "1.1.0"
@ -1220,10 +1247,16 @@ dependencies = [
]
[[package]]
name = "futures"
version = "0.3.25"
name = "ftoa"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0"
checksum = "ca45aac12b6c561b6289bc68957cb1db3dccf870e1951d590202de5e24f1dd35"
[[package]]
name = "futures"
version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13e2792b0ff0340399d58445b88fd9770e3489eff258a4cbc1523418f12abf84"
dependencies = [
"futures-channel",
"futures-core",
@ -1236,9 +1269,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.25"
version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed"
checksum = "2e5317663a9089767a1ec00a487df42e0ca174b61b4483213ac24448e4664df5"
dependencies = [
"futures-core",
"futures-sink",
@ -1246,15 +1279,15 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.25"
version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac"
checksum = "ec90ff4d0fe1f57d600049061dc6bb68ed03c7d2fbd697274c41805dcb3f8608"
[[package]]
name = "futures-executor"
version = "0.3.25"
version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7acc85df6714c176ab5edf386123fafe217be88c0840ec11f199441134a074e2"
checksum = "e8de0a35a6ab97ec8869e32a2473f4b1324459e14c29275d14b10cb1fd19b50e"
dependencies = [
"futures-core",
"futures-task",
@ -1263,9 +1296,9 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.25"
version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb"
checksum = "bfb8371b6fb2aeb2d280374607aeabfc99d95c72edfe51692e42d3d7f0d08531"
[[package]]
name = "futures-lite"
@ -1284,9 +1317,9 @@ dependencies = [
[[package]]
name = "futures-macro"
version = "0.3.25"
version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d"
checksum = "95a73af87da33b5acf53acfebdc339fe592ecf5357ac7c0a7734ab9d8c876a70"
dependencies = [
"proc-macro2 1.0.50",
"quote 1.0.23",
@ -1295,15 +1328,15 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.25"
version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9"
checksum = "f310820bb3e8cfd46c80db4d7fb8353e15dfff853a127158425f31e0be6c8364"
[[package]]
name = "futures-task"
version = "0.3.25"
version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea"
checksum = "dcf79a1bf610b10f42aea489289c5a2c478a786509693b80cd39c44ccd936366"
[[package]]
name = "futures-timer"
@ -1317,9 +1350,9 @@ dependencies = [
[[package]]
name = "futures-util"
version = "0.3.25"
version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6"
checksum = "9c1d6de3acfef38d2be4b1f543f553131788603495be83da675e180c8d6b7bd1"
dependencies = [
"futures-channel",
"futures-core",
@ -1682,6 +1715,7 @@ checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399"
dependencies = [
"autocfg",
"hashbrown 0.12.3",
"serde",
]
[[package]]
@ -1893,7 +1927,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baa6da1e4199c10d7b1d0a6e5e8bd8e55f351163b6f4b3cbb044672a69bd4c1c"
dependencies = [
"heck",
"proc-macro-crate 1.2.1",
"proc-macro-crate 1.3.0",
"proc-macro2 1.0.50",
"quote 1.0.23",
"syn 1.0.107",
@ -2068,21 +2102,25 @@ dependencies = [
"bincode",
"bs58",
"bytes",
"clap 4.1.1",
"clap 4.1.4",
"const_env",
"dashmap",
"futures",
"jsonrpsee",
"log",
"native-tls",
"postgres-native-tls",
"procinfo",
"serde",
"serde_json",
"serde_prometheus",
"solana-client",
"solana-sdk",
"solana-transaction-status",
"solana-version",
"thiserror",
"tokio",
"tokio-postgres",
"tracing-subscriber",
]
@ -2105,6 +2143,15 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "md-5"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca"
dependencies = [
"digest 0.10.6",
]
[[package]]
name = "memchr"
version = "2.5.0"
@ -2183,6 +2230,24 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "native-tls"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e"
dependencies = [
"lazy_static",
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework",
"security-framework-sys",
"tempfile",
]
[[package]]
name = "nix"
version = "0.24.3"
@ -2211,6 +2276,15 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "nom8"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae01545c9c7fc4486ab7debaf2aad7003ac19431791868fb2e8066df97fad2f8"
dependencies = [
"memchr",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
@ -2332,20 +2406,20 @@ dependencies = [
[[package]]
name = "num_enum"
version = "0.5.7"
version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf5395665662ef45796a4ff5486c5d41d29e0c09640af4c5f17fd94ee2c119c9"
checksum = "8d829733185c1ca374f17e52b762f24f535ec625d2cc1f070e34c8a9068f341b"
dependencies = [
"num_enum_derive",
]
[[package]]
name = "num_enum_derive"
version = "0.5.7"
version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b0498641e53dd6ac1a4f22547548caa6864cc4933784319cd1775271c5a46ce"
checksum = "2be1598bf1c313dcdd12092e3f1920f463462525a21b7b4e11b4168353d0123e"
dependencies = [
"proc-macro-crate 1.2.1",
"proc-macro-crate 1.3.0",
"proc-macro2 1.0.50",
"quote 1.0.23",
"syn 1.0.107",
@ -2378,12 +2452,51 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "openssl"
version = "0.10.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b102428fd03bc5edf97f62620f7298614c45cedf287c271e7ed450bbaf83f2e1"
dependencies = [
"bitflags",
"cfg-if",
"foreign-types",
"libc",
"once_cell",
"openssl-macros",
"openssl-sys",
]
[[package]]
name = "openssl-macros"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c"
dependencies = [
"proc-macro2 1.0.50",
"quote 1.0.23",
"syn 1.0.107",
]
[[package]]
name = "openssl-probe"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.80"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23bbbf7854cd45b83958ebe919f0e8e516793727652e27fda10a8384cfc790b7"
dependencies = [
"autocfg",
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "os_str_bytes"
version = "6.4.1"
@ -2467,6 +2580,24 @@ dependencies = [
"num",
]
[[package]]
name = "phf"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "928c6535de93548188ef63bb7c4036bd415cd8f36ad25af44b9789b2ee72a48c"
dependencies = [
"phf_shared",
]
[[package]]
name = "phf_shared"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1fb5f6f826b772a8d4c0394209441e7d37cbbb967ae9c7e0e8134365c9ee676"
dependencies = [
"siphasher",
]
[[package]]
name = "pin-project"
version = "1.0.12"
@ -2528,6 +2659,48 @@ dependencies = [
"universal-hash",
]
[[package]]
name = "postgres-native-tls"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d442770e2b1e244bb5eb03b31c79b65bb2568f413b899eaba850fa945a65954"
dependencies = [
"futures",
"native-tls",
"tokio",
"tokio-native-tls",
"tokio-postgres",
]
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "878c6cbf956e03af9aa8204b407b9cbf47c072164800aa918c516cd4b056c50c"
dependencies = [
"base64 0.13.1",
"byteorder",
"bytes",
"fallible-iterator",
"hmac 0.12.1",
"md-5",
"memchr",
"rand 0.8.5",
"sha2 0.10.6",
"stringprep",
]
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73d946ec7d256b04dfadc4e6a3292324e6f417124750fc5c0950f981b703a0f1"
dependencies = [
"bytes",
"fallible-iterator",
"postgres-protocol",
]
[[package]]
name = "ppv-lite86"
version = "0.2.17"
@ -2545,13 +2718,12 @@ dependencies = [
[[package]]
name = "proc-macro-crate"
version = "1.2.1"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eda0fc3b0fb7c975631757e14d9049da17374063edb6ebbcbc54d880d4fe94e9"
checksum = "66618389e4ec1c7afe67d51a9bf34ff9236480f8d51e7489b7d5ab0303c13f34"
dependencies = [
"once_cell",
"thiserror",
"toml",
"toml_edit",
]
[[package]]
@ -2780,9 +2952,9 @@ dependencies = [
[[package]]
name = "rayon-core"
version = "1.10.1"
version = "1.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cac410af5d00ab6884528b4ab69d1e8e146e8d471201800fa1b4524126de6ad3"
checksum = "356a0625f1954f730c0201cdab48611198dc6ce21f4acff55089b5a78e6e835b"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
@ -2856,12 +3028,12 @@ dependencies = [
[[package]]
name = "reqwest"
version = "0.11.13"
version = "0.11.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68cc60575865c7831548863cc02356512e3f1dc2f3f82cb837d7fc4cc8f3c97c"
checksum = "21eed90ec8570952d53b772ecf8f206aa1ec9a3d76b2521c56c42973f2d91ee9"
dependencies = [
"async-compression",
"base64 0.13.1",
"base64 0.21.0",
"bytes",
"encoding_rs",
"futures-core",
@ -2957,9 +3129,9 @@ dependencies = [
[[package]]
name = "rustix"
version = "0.36.6"
version = "0.36.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4feacf7db682c6c329c4ede12649cd36ecab0f3be5b7d74e6a20304725db4549"
checksum = "d4fdebc4b395b7fbb9ab11e462e20ed9051e7b16e42d24042c776eca0ac81b03"
dependencies = [
"bitflags",
"errno",
@ -3144,6 +3316,21 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_prometheus"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25fcd6131bac47a32328d1ba1ee15a27f8d91ab2e5920dba71dbe93d2648f6b1"
dependencies = [
"ftoa",
"indexmap",
"itoa 0.4.8",
"lazy_static",
"regex",
"serde",
"snafu",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
@ -3268,6 +3455,12 @@ version = "1.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c"
[[package]]
name = "siphasher"
version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
[[package]]
name = "sized-chunks"
version = "0.6.5"
@ -3293,6 +3486,27 @@ version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
[[package]]
name = "snafu"
version = "0.6.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eab12d3c261b2308b0d80c26fffb58d17eba81a4be97890101f416b478c79ca7"
dependencies = [
"doc-comment",
"snafu-derive",
]
[[package]]
name = "snafu-derive"
version = "0.6.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1508efa03c362e23817f96cde18abed596a25219a8b2c66e8db33c03543d315b"
dependencies = [
"proc-macro2 1.0.50",
"quote 1.0.23",
"syn 1.0.107",
]
[[package]]
name = "socket2"
version = "0.4.7"
@ -3321,9 +3535,9 @@ dependencies = [
[[package]]
name = "solana-account-decoder"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87b4533fe4abfd4c540ece335ad767cc91e93a5263069e2e59225be555c5c839"
checksum = "b04c1316932017ae5f947e83d77cc0356c4a395130a480cdc17ffb0570a0c115"
dependencies = [
"Inflector",
"base64 0.13.1",
@ -3346,9 +3560,9 @@ dependencies = [
[[package]]
name = "solana-address-lookup-table-program"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ceb299cd9df79f4c1abda6f140813f1e451a9a8810d18b84ff9dc3b81f1593c6"
checksum = "5be490ed850c99286a4e4ba169ce20695336fe666c56bd823bfd8db689d23a58"
dependencies = [
"bincode",
"bytemuck",
@ -3367,9 +3581,9 @@ dependencies = [
[[package]]
name = "solana-clap-utils"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dde26cacc87164747988cf1cef8e701155188a8d51ed45a7d1be268bc49c41c2"
checksum = "36228e03e14bc7d7707189b66f625981993f1a000b0b192d5b42367349901d91"
dependencies = [
"chrono",
"clap 2.34.0",
@ -3385,9 +3599,9 @@ dependencies = [
[[package]]
name = "solana-cli-config"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aad3cc2faa1721149f1af05b8f10daed33c68b7523bcc82bc7f2835f2ed51746"
checksum = "6c43b08f24fd605eaeaafe0e834dc9b209137ac253bc874d32a5bdd791cbd318"
dependencies = [
"dirs-next",
"lazy_static",
@ -3401,9 +3615,9 @@ dependencies = [
[[package]]
name = "solana-client"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2d59b69ee79e5b32f41b381d0e54d81e3a89e3ebe35335a586241dc000e8374"
checksum = "a3e270b1afd0b360c2aec42ae302ae7980ebb226017275b32a6156ab2ccbdad9"
dependencies = [
"async-mutex",
"async-trait",
@ -3455,9 +3669,9 @@ dependencies = [
[[package]]
name = "solana-config-program"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b80b3dd0cd746511a4bbf9a64a8ddb2b528800437928dd90a1a16ef0e1b95be3"
checksum = "fb275d80a482134f0f0c5439b0c40ba3f04bef70dbc46c0e47f6107f6ae482a8"
dependencies = [
"bincode",
"chrono",
@ -3469,9 +3683,9 @@ dependencies = [
[[package]]
name = "solana-faucet"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e0634db95537eeb77d78f402ea70b513b8bc12ed926b379eca53a1ba5038bbc"
checksum = "b3ef95ad1f87b8c011d0e4d85a46f4a703e9dd7e722459659b395ed70d6ba924"
dependencies = [
"bincode",
"byteorder",
@ -3493,9 +3707,9 @@ dependencies = [
[[package]]
name = "solana-frozen-abi"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c39813ee5b249cb8ccb325d3639323eb3616e7bb9a2b1502936d7ea20530097"
checksum = "f44a019070a6cec4d3ad8605c5caa65bdaa13f00b5f1849340f44ffea63b625b"
dependencies = [
"ahash",
"blake3",
@ -3527,9 +3741,9 @@ dependencies = [
[[package]]
name = "solana-frozen-abi-macro"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dad43ac27c4b8d7a3ce0e2cb8642a7e3b8ea5e3c29ecea38045a8518519adccf"
checksum = "be23cc7a382f54dfe1348edb94610e5cc146b8eb21563cdd04062a403c75ba62"
dependencies = [
"proc-macro2 1.0.50",
"quote 1.0.23",
@ -3539,9 +3753,9 @@ dependencies = [
[[package]]
name = "solana-logger"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13a18f8d7490f712a4340998fca2b0d35afcdef671320a0e51f40b537363d592"
checksum = "447d16a70a1b5383736ef44801050c0e1affd022303b22ed899352f958c2de4b"
dependencies = [
"env_logger",
"lazy_static",
@ -3550,9 +3764,9 @@ dependencies = [
[[package]]
name = "solana-measure"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e365647d451d2b124d9705e92fcfc6e90790ae317495ff20043b6812eb8c41"
checksum = "2400d2534a19f7605c5059060edea0499600a223f1a1f6a4b172666c04946a77"
dependencies = [
"log",
"solana-sdk",
@ -3560,9 +3774,9 @@ dependencies = [
[[package]]
name = "solana-metrics"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c4630a427e772ad5a4a64ca43f0d80848af19a1057084c9611a1e71bf027fce"
checksum = "68aaa3d683945dc3b6ca38923ef952ca1f96a27b61f898a1ddf9f4cd79f2df92"
dependencies = [
"crossbeam-channel",
"gethostname",
@ -3574,9 +3788,9 @@ dependencies = [
[[package]]
name = "solana-net-utils"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "740fb87bea9d7b9eee070244441c9079b44fa223224fb1d6bd23da1b8ec0f2b3"
checksum = "d6d7093739e143d5e2edf3e81e523d47228adb802b847d66f4ab819be7ad6dc8"
dependencies = [
"bincode",
"clap 3.2.23",
@ -3596,9 +3810,9 @@ dependencies = [
[[package]]
name = "solana-perf"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "079105b92b89a0e0b3f238f1c2c40ebffd2171d632236c20f59abac79a8aa978"
checksum = "cbc742f8d53f0a6e6f3a27ed11c1d0764b5486813c721d625c56094fcd14e984"
dependencies = [
"ahash",
"bincode",
@ -3623,9 +3837,9 @@ dependencies = [
[[package]]
name = "solana-program"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0dafff676128fe508ab83147b6fb19534fc33f43ec14789da1f1867e9ea06887"
checksum = "d0937481f080f5dd495fae456c94718a7bacf30fb5fdabb02dcb8a9622e446d5"
dependencies = [
"base64 0.13.1",
"bincode",
@ -3672,9 +3886,9 @@ dependencies = [
[[package]]
name = "solana-program-runtime"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17865dc487a5f38e8f64a8ff3ff14e92a8a71be87ca6ee958ad07f5d1fa4cdf4"
checksum = "b4d12047608bac77fca000e18f7a2df3c7fa90656d7c7d387b1cd7faf18b238c"
dependencies = [
"base64 0.13.1",
"bincode",
@ -3699,9 +3913,9 @@ dependencies = [
[[package]]
name = "solana-rayon-threadlimit"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f82e4deecbe820847c88f091f9b721fad46276575fcfdf177bbc2743731dc25b"
checksum = "b6eca67181e0381532db4bc69a625b1f96a047be461ff9050c451add0165424f"
dependencies = [
"lazy_static",
"num_cpus",
@ -3709,9 +3923,9 @@ dependencies = [
[[package]]
name = "solana-remote-wallet"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09863751b4d9ca46297f0662293561c4a3846abc9cab7b691b091ea9ae3c9340"
checksum = "9b83d035ee90035ebcb07ec73672fdc0272e5b98899846dd29fcb31f856ac78c"
dependencies = [
"console",
"dialoguer",
@ -3728,9 +3942,9 @@ dependencies = [
[[package]]
name = "solana-sdk"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c702cc57432bc16eab54ad7b5668c2a3cdc72b0f820175972b4857e26ac4f49"
checksum = "390e7481c56dda2ceab2652beeda30a533e9667b34861a2eb4eec92fa1d826d7"
dependencies = [
"assert_matches",
"base64 0.13.1",
@ -3779,9 +3993,9 @@ dependencies = [
[[package]]
name = "solana-sdk-macro"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f89a14a8f1e7708fe19ee3140125e9d8279945ead74cb09e65c94dd5cf0640c3"
checksum = "33d0acbad862093ea123f3a27364336dcb0c8373522cd6810496a34e932c56c1"
dependencies = [
"bs58",
"proc-macro2 1.0.50",
@ -3792,9 +4006,9 @@ dependencies = [
[[package]]
name = "solana-streamer"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7116f13d20003e99f3f8fb5b1b20cb638a8c797ec8fe40dc4840a284bab1e53c"
checksum = "853b0187fdf233c13e8b7ba76e61d0c7cb49ca92c5fdb3b7568ad5ca30e2cf88"
dependencies = [
"crossbeam-channel",
"futures-util",
@ -3821,9 +4035,9 @@ dependencies = [
[[package]]
name = "solana-transaction-status"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff48b27221d728dd907400711aa42d07d5fe78c6bf9e35f850c78e89ee800e97"
checksum = "3c5bbdaed99403e4a17763bee60c1e0e3418524503c72b514ebff62efbcc9d33"
dependencies = [
"Inflector",
"base64 0.13.1",
@ -3850,9 +4064,9 @@ dependencies = [
[[package]]
name = "solana-version"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18f9c97e7d62d3e0ef04426cd7731689f5c675e0b4540fa5dca172ab261b57a3"
checksum = "5a46c9ecb15ccd5388511cec0c5bfb989589425f8286ce432ff64b55dc7bf61e"
dependencies = [
"log",
"rustc_version 0.4.0",
@ -3866,9 +4080,9 @@ dependencies = [
[[package]]
name = "solana-vote-program"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "694e6ecff6764540b555224308fa3cbccb59135e6029761febdba49d8197faaf"
checksum = "81ab9ff8928282cb42871a370435dd4713f700854801afb476cf63066f1337db"
dependencies = [
"bincode",
"log",
@ -3887,9 +4101,9 @@ dependencies = [
[[package]]
name = "solana-zk-token-sdk"
version = "1.14.12"
version = "1.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32395c4561673f7b4aa1f3a5b5a654eaa363041f67d92f5d680de72293ef7d1b"
checksum = "cebca4083e982ae01583d1a590c4d679e6f648a4761364ddfb43026d2c433142"
dependencies = [
"aes-gcm-siv",
"arrayref",
@ -3990,6 +4204,16 @@ dependencies = [
"thiserror",
]
[[package]]
name = "stringprep"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]]
name = "strsim"
version = "0.8.0"
@ -4183,9 +4407,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.24.2"
version = "1.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "597a12a59981d9e3c38d216785b0c37399f6e415e8d0712047620f189371b0bb"
checksum = "c8e00990ebabbe4c14c08aca901caed183ecd5c09562a12c824bb53d3c3fd3af"
dependencies = [
"autocfg",
"bytes",
@ -4212,6 +4436,40 @@ dependencies = [
"syn 1.0.107",
]
[[package]]
name = "tokio-native-tls"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b"
dependencies = [
"native-tls",
"tokio",
]
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29a12c1b3e0704ae7dfc25562629798b29c72e6b1d0a681b6f29ab4ae5e7f7bf"
dependencies = [
"async-trait",
"byteorder",
"bytes",
"fallible-iterator",
"futures-channel",
"futures-util",
"log",
"parking_lot",
"percent-encoding",
"phf",
"pin-project-lite",
"postgres-protocol",
"postgres-types",
"socket2",
"tokio",
"tokio-util",
]
[[package]]
name = "tokio-rustls"
version = "0.23.4"
@ -4267,13 +4525,30 @@ dependencies = [
[[package]]
name = "toml"
version = "0.5.10"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1333c76748e868a4d9d1017b5ab53171dfd095f70c712fdb4653a406547f598f"
checksum = "f4f7f0dd8d50a853a531c426359045b1998f04219d88799810762cd4ad314234"
dependencies = [
"serde",
]
[[package]]
name = "toml_datetime"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4553f467ac8e3d374bc9a177a26801e5d0f9b211aa1673fb137a403afd1c9cf5"
[[package]]
name = "toml_edit"
version = "0.18.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56c59d8dd7d0dcbc6428bf7aa2f0e823e26e43b3c9aca15bbc9475d23e5fa12b"
dependencies = [
"indexmap",
"nom8",
"toml_datetime",
]
[[package]]
name = "tower"
version = "0.4.13"
@ -4392,9 +4667,9 @@ checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba"
[[package]]
name = "unicode-bidi"
version = "0.3.8"
version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992"
checksum = "d54675592c1dbefd78cbd98db9bacd89886e1ca50692a0692baefffdeb92dd58"
[[package]]
name = "unicode-ident"
@ -4478,6 +4753,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "vec_map"
version = "0.8.2"

View File

@ -13,24 +13,28 @@ members = [
bench = { path = "./bench" }
[dependencies]
solana-client = "1.14.12"
solana-sdk = "1.14.12"
solana-transaction-status = "1.14.12"
solana-version = "1.14.12"
solana-client = "1.14.13"
solana-sdk = "1.14.13"
solana-transaction-status = "1.14.13"
solana-version = "1.14.13"
serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.91"
tokio = { version = "1.24.2", features = ["full"]}
tokio = { version = "1.25.0", features = ["full"]}
bincode = "1.3.3"
bs58 = "0.4.0"
base64 = "0.21.0"
thiserror = "1.0.38"
futures = "0.3.25"
futures = "0.3.26"
bytes = "1.3.0"
anyhow = "1.0.68"
log = "0.4.17"
clap = { version = "4.1.1", features = ["derive"] }
clap = { version = "4.1.4", features = ["derive"] }
dashmap = "5.4.0"
const_env = "0.1.2"
jsonrpsee = { version = "0.16.2", features = ["macros", "full"] }
tracing-subscriber = "0.3.16"
procinfo = "0.4.2"
tokio-postgres = "0.7.7"
native-tls = "0.2.11"
postgres-native-tls = "0.5.0"
serde_prometheus = "0.1.6"

View File

@ -4,14 +4,14 @@ version = "0.1.0"
edition = "2021"
[dependencies]
solana-client = "1.14.12"
solana-sdk = "1.14.12"
solana-client = "1.14.13"
solana-sdk = "1.14.13"
log = "0.4.17"
anyhow = "1.0.68"
serde = "1.0.152"
serde_json = "1.0.91"
csv = "1.1.6"
clap = { version = "4.1.1", features = ["derive"] }
tokio = { version = "1.24.2", features = ["full", "fs"]}
clap = { version = "4.1.4", features = ["derive"] }
tokio = { version = "1.25.0", features = ["full", "fs"]}
tracing-subscriber = "0.3.16"
dirs = "4.0.0"

View File

@ -1,3 +0,0 @@
total_time_elapsed_sec,txs_sent,time_to_send_txs,txs_confirmed,txs_un_confirmed,tps
8.96211512,20000,6.054988221,14321,5679,1597.9486770975554
8.96211512,20000,6.054988221,14321,5679,1597.9486770975554
1 total_time_elapsed_sec txs_sent time_to_send_txs txs_confirmed txs_un_confirmed tps
2 8.96211512 20000 6.054988221 14321 5679 1597.9486770975554
3 8.96211512 20000 6.054988221 14321 5679 1597.9486770975554

View File

@ -26,6 +26,6 @@ kill_timeout = 5
soft_limit = 1024
type = "connections"
# [metrics]
# path = "/metrics"
# port = 9091
[metrics]
path = "/metrics"
port = 9091

22
migrations/create.sql Normal file
View File

@ -0,0 +1,22 @@
CREATE TABLE lite_rpc.Txs (
id SERIAL NOT NULL PRIMARY KEY,
signature CHAR(88) NOT NULL,
recent_slot BIGINT NOT NULL,
forwarded_slot BIGINT NOT NULL,
processed_slot BIGINT,
cu_consumed BIGINT,
cu_requested BIGINT,
quic_response SMALLINT
);
CREATE TABLE lite_rpc.Blocks (
slot BIGINT NOT NULL PRIMARY KEY,
leader_id BIGINT NOT NULL,
parent_slot BIGINT NOT NULL
);
CREATE TABLE lite_rpc.AccountAddrs (
id SERIAL PRIMARY KEY,
addr VARCHAR(45) NOT NULL
);

3
migrations/rm.sql Normal file
View File

@ -0,0 +1,3 @@
DROP TABLE lite_rpc.Txs;
DROP TABLE lite_rpc.Blocks;
DROP TABLE lite_rpc.AccountAddrs;

View File

@ -3,7 +3,10 @@ use crate::{
encoding::BinaryEncoding,
rpc::LiteRpcServer,
tpu_manager::TpuManager,
workers::{BlockListener, Cleaner, Metrics, MetricsCapture, TxSender},
workers::{
BlockInformation, BlockListener, Cleaner, MetricsCapture, Postgres, PrometheusSync,
TxSender, WireTransaction,
},
};
use std::{ops::Deref, str::FromStr, sync::Arc, time::Duration};
@ -26,17 +29,21 @@ use solana_sdk::{
transaction::VersionedTransaction,
};
use solana_transaction_status::TransactionStatus;
use tokio::{net::ToSocketAddrs, task::JoinHandle};
use tokio::{
net::ToSocketAddrs,
sync::mpsc::{self, UnboundedSender},
task::JoinHandle,
};
/// A bridge between clients and tpu
#[derive(Clone)]
pub struct LiteBridge {
pub rpc_client: Arc<RpcClient>,
pub tpu_manager: Arc<TpuManager>,
// None if LiteBridge is not executed
pub tx_send: Option<UnboundedSender<(String, WireTransaction, u64)>>,
pub tx_sender: TxSender,
pub finalized_block_listenser: BlockListener,
pub confirmed_block_listenser: BlockListener,
pub metrics_capture: MetricsCapture,
pub finalized_block_listener: BlockListener,
pub confirmed_block_listener: BlockListener,
}
impl LiteBridge {
@ -50,7 +57,7 @@ impl LiteBridge {
let tx_sender = TxSender::new(tpu_manager.clone());
let finalized_block_listenser = BlockListener::new(
let finalized_block_listener = BlockListener::new(
pub_sub_client.clone(),
rpc_client.clone(),
tx_sender.clone(),
@ -58,7 +65,7 @@ impl LiteBridge {
)
.await?;
let confirmed_block_listenser = BlockListener::new(
let confirmed_block_listener = BlockListener::new(
pub_sub_client,
rpc_client.clone(),
tx_sender.clone(),
@ -66,87 +73,120 @@ impl LiteBridge {
)
.await?;
let metrics_capture = MetricsCapture::new(tx_sender.clone());
Ok(Self {
rpc_client,
tpu_manager,
tx_send: None,
tx_sender,
finalized_block_listenser,
confirmed_block_listenser,
metrics_capture,
finalized_block_listener,
confirmed_block_listener,
})
}
pub fn get_block_listner(&self, commitment_config: CommitmentConfig) -> BlockListener {
if let CommitmentLevel::Finalized = commitment_config.commitment {
self.finalized_block_listenser.clone()
self.finalized_block_listener.clone()
} else {
self.confirmed_block_listenser.clone()
self.confirmed_block_listener.clone()
}
}
/// List for `JsonRpc` requests
pub async fn start_services<T: ToSocketAddrs + std::fmt::Debug + 'static + Send + Clone>(
self,
mut self,
http_addr: T,
ws_addr: T,
tx_batch_size: usize,
tx_send_interval: Duration,
clean_interval: Duration,
) -> anyhow::Result<[JoinHandle<anyhow::Result<()>>; 7]> {
let finalized_block_listenser = self.finalized_block_listenser.clone().listen();
postgres_config: Option<String>,
) -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> {
let (postgres, postgres_send) = if let Some(postgres_config) = postgres_config {
let (postgres_send, postgres_recv) = mpsc::unbounded_channel();
let (postgres_connection, postgres) = Postgres::new(postgres_config).await?;
let confirmed_block_listenser = self.confirmed_block_listenser.clone().listen();
let postgres = postgres.start(postgres_recv);
let tx_sender = self
.tx_sender
(Some((postgres, postgres_connection)), Some(postgres_send))
} else {
(None, None)
};
let (tx_send, tx_recv) = mpsc::unbounded_channel();
self.tx_send = Some(tx_send);
let tx_sender = self.tx_sender.clone().execute(
tx_recv,
tx_batch_size,
tx_send_interval,
postgres_send.clone(),
);
let metrics_capture = MetricsCapture::new(self.tx_sender.clone());
let prometheus_sync = PrometheusSync::new(metrics_capture.clone()).sync();
let finalized_block_listener = self
.finalized_block_listener
.clone()
.execute(tx_batch_size, tx_send_interval);
.listen(postgres_send.clone());
let ws_server_handle = ServerBuilder::default()
.ws_only()
.build(ws_addr.clone())
.await?
.start(self.clone().into_rpc())?;
let http_server_handle = ServerBuilder::default()
.http_only()
.build(http_addr.clone())
.await?
.start(self.clone().into_rpc())?;
let ws_server = tokio::spawn(async move {
info!("Websocket Server started at {ws_addr:?}");
ws_server_handle.stopped().await;
bail!("Websocket server stopped");
});
let http_server = tokio::spawn(async move {
info!("HTTP Server started at {http_addr:?}");
http_server_handle.stopped().await;
bail!("HTTP server stopped");
});
let metrics_capture = self.metrics_capture.capture();
let confirmed_block_listener = self.confirmed_block_listener.clone().listen(None);
let cleaner = Cleaner::new(
self.tx_sender.clone(),
[
self.finalized_block_listenser.clone(),
self.confirmed_block_listenser.clone(),
self.finalized_block_listener.clone(),
self.confirmed_block_listener.clone(),
],
)
.start(clean_interval);
Ok([
let rpc = self.into_rpc();
let (ws_server, http_server) = {
let ws_server_handle = ServerBuilder::default()
.ws_only()
.build(ws_addr.clone())
.await?
.start(rpc.clone())?;
let http_server_handle = ServerBuilder::default()
.http_only()
.build(http_addr.clone())
.await?
.start(rpc)?;
let ws_server = tokio::spawn(async move {
info!("Websocket Server started at {ws_addr:?}");
ws_server_handle.stopped().await;
bail!("Websocket server stopped");
});
let http_server = tokio::spawn(async move {
info!("HTTP Server started at {http_addr:?}");
http_server_handle.stopped().await;
bail!("HTTP server stopped");
});
(ws_server, http_server)
};
let mut services = vec![
ws_server,
http_server,
tx_sender,
finalized_block_listenser,
confirmed_block_listenser,
metrics_capture,
finalized_block_listener,
confirmed_block_listener,
metrics_capture.capture(postgres_send),
prometheus_sync,
cleaner,
])
];
if let Some((postgres, connection)) = postgres {
services.push(connection);
services.push(postgres);
}
Ok(services)
}
}
@ -177,8 +217,19 @@ impl LiteRpcServer for LiteBridge {
};
let sig = tx.get_signature();
let Some(BlockInformation { slot, .. }) = self
.confirmed_block_listener
.get_block_info(&tx.get_recent_blockhash().to_string())
.await else {
log::warn!("block");
return Err(jsonrpsee::core::Error::Custom("Blockhash not found in confirmed block store".to_string()));
};
self.tx_sender.enqnueue_tx(sig.to_string(), raw_tx).await;
self.tx_send
.as_ref()
.expect("Lite Bridge Not Executed")
.send((sig.to_string(), raw_tx, slot))
.unwrap();
Ok(BinaryEncoding::Base58.encode(sig))
}
@ -194,8 +245,8 @@ impl LiteRpcServer for LiteBridge {
};
let block_listner = self.get_block_listner(commitment_config);
let (blockhash, last_valid_block_height) = block_listner.get_latest_blockhash().await;
let slot = block_listner.get_slot().await;
let (blockhash, BlockInformation { slot, block_height }) =
block_listner.get_latest_block_info().await;
Ok(RpcResponse {
context: RpcResponseContext {
@ -204,7 +255,7 @@ impl LiteRpcServer for LiteBridge {
},
value: RpcBlockhash {
blockhash,
last_valid_block_height,
last_valid_block_height: block_height,
},
})
}
@ -237,7 +288,7 @@ impl LiteRpcServer for LiteBridge {
}
};
let slot = block_listner.get_slot().await;
let slot = block_listner.get_latest_block_info().await.1.slot;
Ok(RpcResponse {
context: RpcResponseContext {
@ -265,7 +316,12 @@ impl LiteRpcServer for LiteBridge {
Ok(RpcResponse {
context: RpcResponseContext {
slot: self.finalized_block_listenser.get_slot().await,
slot: self
.finalized_block_listener
.get_latest_block_info()
.await
.1
.slot,
api_version: None,
},
value: sig_statuses,
@ -311,10 +367,6 @@ impl LiteRpcServer for LiteBridge {
Ok(airdrop_sig)
}
async fn get_metrics(&self) -> crate::rpc::Result<Metrics> {
return Ok(self.metrics_capture.get_metrics().await);
}
fn signature_subscribe(
&self,
mut sink: SubscriptionSink,

View File

@ -27,4 +27,7 @@ pub struct Args {
/// interval between clean
#[arg(short = 'c', long, default_value_t = DEFAULT_CLEAN_INTERVAL_MS)]
pub clean_interval_ms: u64,
/// addr to postgres
#[arg(short = 'p', long)]
pub postgres_config: Option<String>,
}

View File

@ -17,6 +17,7 @@ pub async fn main() -> anyhow::Result<()> {
tx_batch_interval_ms,
clean_interval_ms,
fanout_size,
postgres_config,
} = Args::parse();
let tx_batch_interval_ms = Duration::from_millis(tx_batch_interval_ms);
@ -31,6 +32,7 @@ pub async fn main() -> anyhow::Result<()> {
tx_batch_size,
tx_batch_interval_ms,
clean_interval_ms,
postgres_config,
)
.await?;

View File

@ -6,10 +6,7 @@ use solana_client::rpc_response::{Response as RpcResponse, RpcBlockhash, RpcVers
use solana_sdk::commitment_config::CommitmentConfig;
use solana_transaction_status::TransactionStatus;
use crate::{
configs::{IsBlockHashValidConfig, SendTransactionConfig},
workers::Metrics,
};
use crate::configs::{IsBlockHashValidConfig, SendTransactionConfig};
pub type Result<T> = std::result::Result<T, jsonrpsee::core::Error>;
@ -53,9 +50,6 @@ pub trait LiteRpc {
config: Option<RpcRequestAirdropConfig>,
) -> Result<String>;
#[method(name = "getMetrics")]
async fn get_metrics(&self) -> Result<Metrics>;
#[subscription(name = "signatureSubscribe" => "signatureNotification", unsubscribe="signatureUnsubscribe", item=RpcResponse<serde_json::Value>)]
fn signature_subscribe(&self, signature: String, commitment_config: CommitmentConfig);
}

View File

@ -8,7 +8,7 @@ use solana_client::{
nonblocking::{rpc_client::RpcClient, tpu_client::TpuClient},
tpu_client::TpuClientConfig,
};
use tokio::sync::RwLock;
use tokio::sync::{RwLock, RwLockReadGuard};
#[derive(Clone)]
pub struct TpuManager {
@ -83,4 +83,8 @@ impl TpuManager {
}
}
}
pub async fn get_tpu_client(&self) -> RwLockReadGuard<TpuClient> {
self.tpu_client.read().await
}
}

View File

@ -14,11 +14,17 @@ use solana_client::{
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_transaction_status::{
TransactionConfirmationStatus, TransactionStatus, UiTransactionStatusMeta,
option_serializer::OptionSerializer, TransactionConfirmationStatus, TransactionStatus,
UiConfirmedBlock, UiTransactionStatusMeta,
};
use tokio::{
sync::{mpsc::Sender, RwLock},
task::JoinHandle,
};
use tokio::{sync::RwLock, task::JoinHandle};
use super::TxSender;
use crate::workers::{PostgresBlock, PostgresMsg, PostgresUpdateTx};
use super::{PostgresMpscSend, TxProps, TxSender};
/// Background worker which listen's to new blocks
/// and keeps a track of confirmed txs
@ -27,16 +33,22 @@ pub struct BlockListener {
pub_sub_client: Arc<PubsubClient>,
commitment_config: CommitmentConfig,
tx_sender: TxSender,
latest_block_info: Arc<RwLock<BlockInformation>>,
block_store: Arc<DashMap<String, BlockInformation>>,
latest_block_hash: Arc<RwLock<String>>,
pub signature_subscribers: Arc<DashMap<String, SubscriptionSink>>,
}
struct BlockInformation {
#[derive(Clone)]
pub struct BlockInformation {
pub slot: u64,
pub blockhash: String,
pub block_height: u64,
}
pub struct BlockListnerNotificatons {
pub block: Sender<UiConfirmedBlock>,
pub tx: Sender<TxProps>,
}
impl BlockListener {
pub async fn new(
pub_sub_client: Arc<PubsubClient>,
@ -48,14 +60,20 @@ impl BlockListener {
.get_latest_blockhash_with_commitment(commitment_config)
.await?;
let latest_block_hash = latest_block_hash.to_string();
let slot = rpc_client
.get_slot_with_commitment(commitment_config)
.await?;
Ok(Self {
pub_sub_client,
tx_sender,
latest_block_info: Arc::new(RwLock::new(BlockInformation {
slot: rpc_client.get_slot().await?,
blockhash: latest_block_hash.to_string(),
block_height,
})),
latest_block_hash: Arc::new(RwLock::new(latest_block_hash.clone())),
block_store: Arc::new({
let map = DashMap::new();
map.insert(latest_block_hash, BlockInformation { slot, block_height });
map
}),
commitment_config,
signature_subscribers: Default::default(),
})
@ -71,14 +89,29 @@ impl BlockListener {
num_of_sigs_commited
}
pub async fn get_slot(&self) -> u64 {
self.latest_block_info.read().await.slot
pub async fn get_latest_block_info(&self) -> (String, BlockInformation) {
let blockhash = &*self.latest_block_hash.read().await;
(
blockhash.to_owned(),
self.block_store
.get(blockhash)
.expect("Latest Block Not in Map")
.value()
.to_owned(),
)
}
pub async fn get_latest_blockhash(&self) -> (String, u64) {
let block = self.latest_block_info.read().await;
pub async fn get_block_info(&self, blockhash: &str) -> Option<BlockInformation> {
let Some(info) = self.block_store.get(blockhash) else {
return None;
};
(block.blockhash.clone(), block.block_height)
Some(info.value().to_owned())
}
pub async fn get_latest_blockhash(&self) -> String {
self.latest_block_hash.read().await.to_owned()
}
pub fn signature_subscribe(&self, signature: String, sink: SubscriptionSink) {
@ -89,10 +122,8 @@ impl BlockListener {
self.signature_subscribers.remove(&signature);
}
pub fn listen(self) -> JoinHandle<anyhow::Result<()>> {
pub fn listen(self, postgres: Option<PostgresMpscSend>) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
info!("Subscribing to blocks");
let commitment = self.commitment_config.commitment;
let comfirmation_status = match commitment {
@ -100,6 +131,8 @@ impl BlockListener {
_ => TransactionConfirmationStatus::Confirmed,
};
info!("Subscribing to {commitment:?} blocks");
let (mut recv, _) = self
.pub_sub_client
.block_subscribe(
@ -136,14 +169,24 @@ impl BlockListener {
continue;
};
*self.latest_block_info.write().await = BlockInformation {
slot,
blockhash,
block_height,
};
let parent_slot = block.parent_slot;
*self.latest_block_hash.write().await = blockhash.clone();
self.block_store
.insert(blockhash, BlockInformation { slot, block_height });
if let Some(postgres) = &postgres {
postgres
.send(PostgresMsg::PostgresBlock(PostgresBlock {
slot: slot as i64,
leader_id: 0, //FIX:
parent_slot: parent_slot as i64,
}))
.expect("Error sending block to postgres service");
}
for tx in transactions {
let Some(UiTransactionStatusMeta { err, status, .. }) = tx.meta else {
let Some(UiTransactionStatusMeta { err, status, compute_units_consumed ,.. }) = tx.meta else {
info!("tx with no meta");
continue;
};
@ -158,7 +201,7 @@ impl BlockListener {
if let Some(mut tx_status) = self.tx_sender.txs_sent.get_mut(&sig) {
tx_status.value_mut().status = Some(TransactionStatus {
slot,
confirmations: None, //TODO: talk about this
confirmations: None,
status,
err: err.clone(),
confirmation_status: Some(comfirmation_status.clone()),
@ -167,7 +210,6 @@ impl BlockListener {
// subscribers
if let Some((_sig, mut sink)) = self.signature_subscribers.remove(&sig) {
// info!("notification {}", sig);
// none if transaction succeeded
sink.send(&RpcResponse {
context: RpcResponseContext {
@ -177,6 +219,25 @@ impl BlockListener {
value: serde_json::json!({ "err": err }),
})?;
}
let cu_consumed = match compute_units_consumed {
OptionSerializer::Some(cu_consumed) => Some(cu_consumed as i64),
_ => None,
};
// write to postgres
if let Some(postgres) = &postgres {
postgres
.send(PostgresMsg::PostgresUpdateTx(
PostgresUpdateTx {
processed_slot: slot as i64,
cu_consumed,
cu_requested: None, //TODO: cu requested
},
sig,
))
.unwrap();
}
}
}

View File

@ -4,7 +4,7 @@ use log::{info, warn};
use solana_transaction_status::TransactionConfirmationStatus;
use tokio::{sync::RwLock, task::JoinHandle};
use super::TxSender;
use super::{PostgresMpscSend, TxSender};
use serde::{Deserialize, Serialize};
/// Background worker which captures metrics
@ -37,7 +37,7 @@ impl MetricsCapture {
self.metrics.read().await.to_owned()
}
pub fn capture(self) -> JoinHandle<anyhow::Result<()>> {
pub fn capture(self, postgres: Option<PostgresMpscSend>) -> JoinHandle<anyhow::Result<()>> {
let mut one_second = tokio::time::interval(std::time::Duration::from_secs(1));
tokio::spawn(async move {
@ -84,6 +84,10 @@ impl MetricsCapture {
None
}
};
if let Some(_postgres) = &postgres {
// postgres.send_metrics(metrics.clone()).await?;
}
}
})
}

View File

@ -1,9 +1,13 @@
mod block_listenser;
mod cleaner;
mod metrics_capture;
mod postgres;
mod prometheus;
mod tx_sender;
pub use block_listenser::*;
pub use cleaner::*;
pub use metrics_capture::*;
pub use postgres::*;
pub use prometheus::*;
pub use tx_sender::*;

212
src/workers/postgres.rs Normal file
View File

@ -0,0 +1,212 @@
use std::sync::Arc;
use anyhow::{bail, Context, Ok};
use log::{info, warn};
use postgres_native_tls::MakeTlsConnector;
use tokio::{
fs,
sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
RwLock,
},
task::JoinHandle,
};
use tokio_postgres::Client;
use native_tls::{Certificate, Identity, TlsConnector};
pub struct Postgres {
client: Arc<RwLock<Client>>,
}
#[derive(Debug)]
pub struct PostgresTx {
pub signature: String,
pub recent_slot: i64,
pub forwarded_slot: i64,
pub processed_slot: Option<i64>,
pub cu_consumed: Option<i64>,
pub cu_requested: Option<i64>,
pub quic_response: i16,
}
#[derive(Debug)]
pub struct PostgresUpdateTx {
pub processed_slot: i64,
pub cu_consumed: Option<i64>,
pub cu_requested: Option<i64>,
}
#[derive(Debug)]
pub struct PostgresBlock {
pub slot: i64,
pub leader_id: i64,
pub parent_slot: i64,
}
#[derive(Debug)]
pub struct PostgreAccountAddr {
pub id: u32,
pub addr: String,
}
#[derive(Debug)]
pub enum PostgresMsg {
PostgresTx(PostgresTx),
PostgresBlock(PostgresBlock),
PostgreAccountAddr(PostgreAccountAddr),
PostgresUpdateTx(PostgresUpdateTx, String),
}
pub type PostgresMpscRecv = UnboundedReceiver<PostgresMsg>;
pub type PostgresMpscSend = UnboundedSender<PostgresMsg>;
impl Postgres {
/// # Return
/// (connection join handle, Self)
///
/// returned join handle is required to be polled
pub async fn new(
porstgres_config: String,
) -> anyhow::Result<(JoinHandle<anyhow::Result<()>>, Self)> {
let ca_pem = fs::read("ca.pem").await?;
// let ca_pem = BinaryEncoding::Base64
// .decode(ca_pem_b64)
// .context("ca pem decode")?;
let client_pks = fs::read("client.pks").await?;
// let client_pks = BinaryEncoding::Base64.decode(client_pks_b64).context("client pks decode")?;
let connector = TlsConnector::builder()
.add_root_certificate(Certificate::from_pem(&ca_pem)?)
.identity(Identity::from_pkcs12(&client_pks, "p").context("Identity")?)
.danger_accept_invalid_hostnames(true)
.danger_accept_invalid_certs(true)
.build()?;
info!("making tls config");
let connector = MakeTlsConnector::new(connector);
let (client, connection) =
tokio_postgres::connect(&porstgres_config, connector.clone()).await?;
let client = Arc::new(RwLock::new(client));
let connection = {
let client = client.clone();
#[allow(unreachable_code)]
tokio::spawn(async move {
let mut connection = connection;
loop {
if let Err(err) = connection.await {
warn!("Connection to postgres broke {err:?}")
};
let f = tokio_postgres::connect(&porstgres_config, connector.clone()).await?;
*client.write().await = f.0;
connection = f.1;
}
bail!("Potsgres revival loop failed")
})
};
Ok((connection, Self { client }))
}
pub async fn send_block(&self, block: PostgresBlock) -> anyhow::Result<()> {
let PostgresBlock {
slot,
leader_id,
parent_slot,
} = block;
self.client
.read()
.await
.execute(
r#"
INSERT INTO lite_rpc.Blocks
(slot, leader_id, parent_slot)
VALUES
($1, $2, $3)
"#,
&[&slot, &leader_id, &parent_slot],
)
.await?;
Ok(())
}
pub async fn send_tx(&self, tx: PostgresTx) -> anyhow::Result<()> {
let PostgresTx {
signature,
recent_slot,
forwarded_slot,
processed_slot,
cu_consumed,
cu_requested,
quic_response,
} = tx;
self.client.read().await.execute(
r#"
INSERT INTO lite_rpc.Txs
(signature, recent_slot, forwarded_slot, processed_slot, cu_consumed, cu_requested, quic_response)
VALUES
($1, $2, $3, $4, $5, $6, $7)
"#,
&[&signature, &recent_slot, &forwarded_slot, &processed_slot, &cu_consumed, &cu_requested, &quic_response],
).await?;
Ok(())
}
pub async fn update_tx(&self, tx: PostgresUpdateTx, signature: &str) -> anyhow::Result<()> {
let PostgresUpdateTx {
processed_slot,
cu_consumed,
cu_requested,
} = tx;
self.client
.read()
.await
.execute(
r#"
UPDATE lite_rpc.Txs
SET processed_slot = $1, cu_consumed = $2, cu_requested = $3
WHERE signature = $4
"#,
&[&processed_slot, &cu_consumed, &cu_requested, &signature],
)
.await?;
Ok(())
}
pub fn start(self, mut recv: PostgresMpscRecv) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
info!("Writing to postgres");
while let Some(msg) = recv.recv().await {
let Err(err) = (
match msg {
PostgresMsg::PostgresTx(tx) => self.send_tx(tx).await,
PostgresMsg::PostgresUpdateTx(tx, sig) => self.update_tx(tx, &sig).await,
PostgresMsg::PostgresBlock(block) => self.send_block(block).await,
PostgresMsg::PostgreAccountAddr(_) => todo!(),
} ) else {
continue;
};
warn!("Error writing to postgres {err}");
}
bail!("Postgres channel closed")
})
}
}

59
src/workers/prometheus.rs Normal file
View File

@ -0,0 +1,59 @@
use std::collections::HashMap;
use tokio::{
io::AsyncWriteExt,
net::{TcpListener, TcpStream},
task::JoinHandle,
};
use super::MetricsCapture;
#[derive(Clone)]
pub struct PrometheusSync {
metrics_capture: MetricsCapture,
}
impl PrometheusSync {
pub fn new(metrics_capture: MetricsCapture) -> Self {
Self { metrics_capture }
}
fn create_response(payload: &str) -> String {
format!(
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
payload.len(),
payload
)
}
async fn handle_stream(&self, stream: &mut TcpStream) -> anyhow::Result<()> {
let metrics = self.metrics_capture.get_metrics().await;
let metrics = serde_prometheus::to_string(&metrics, Some("literpc"), HashMap::new())?;
let response = Self::create_response(&metrics);
stream.writable().await?;
stream.write_all(response.as_bytes()).await?;
stream.flush().await?;
Ok(())
}
pub fn sync(self) -> JoinHandle<anyhow::Result<()>> {
#[allow(unreachable_code)]
tokio::spawn(async move {
let listener = TcpListener::bind("[::]:9091").await?;
loop {
let Ok((mut stream, _addr)) = listener.accept().await else {
continue;
};
let _ = self.handle_stream(&mut stream).await;
}
Ok(())
})
}
}

View File

@ -3,13 +3,22 @@ use std::{
time::{Duration, Instant},
};
use anyhow::bail;
use dashmap::DashMap;
use log::{info, warn};
use solana_transaction_status::TransactionStatus;
use tokio::{sync::RwLock, task::JoinHandle};
use tokio::{
sync::mpsc::{error::TryRecvError, UnboundedReceiver},
task::JoinHandle,
};
use crate::tpu_manager::TpuManager;
use crate::{
tpu_manager::TpuManager,
workers::{PostgresMsg, PostgresTx},
};
use super::PostgresMpscSend;
pub type WireTransaction = Vec<u8>;
@ -18,8 +27,6 @@ pub type WireTransaction = Vec<u8>;
pub struct TxSender {
/// Tx(s) forwarded to tpu
pub txs_sent: Arc<DashMap<String, TxProps>>,
/// Transactions queue for retrying
enqueued_txs: Arc<RwLock<Vec<(String, WireTransaction)>>>,
/// TpuClient to call the tpu port
tpu_manager: Arc<TpuManager>,
}
@ -27,6 +34,7 @@ pub struct TxSender {
/// Transaction Properties
pub struct TxProps {
pub status: Option<TransactionStatus>,
/// Time at which transaction was forwarded
pub sent_at: Instant,
}
@ -42,53 +50,54 @@ impl Default for TxProps {
impl TxSender {
pub fn new(tpu_manager: Arc<TpuManager>) -> Self {
Self {
enqueued_txs: Default::default(),
tpu_manager,
txs_sent: Default::default(),
}
}
/// en-queue transaction if it doesn't already exist
pub async fn enqnueue_tx(&self, sig: String, raw_tx: WireTransaction) {
self.enqueued_txs.write().await.push((sig, raw_tx));
}
/// retry enqued_tx(s)
pub async fn forward_txs(&self, tx_batch_size: usize) {
if self.enqueued_txs.read().await.is_empty() {
async fn forward_txs(
&self,
sigs_and_slots: Vec<(String, u64)>,
txs: Vec<WireTransaction>,
postgres: Option<PostgresMpscSend>,
) {
assert_eq!(sigs_and_slots.len(), txs.len());
if sigs_and_slots.is_empty() {
return;
}
let mut enqueued_txs = Vec::new();
std::mem::swap(&mut enqueued_txs, &mut *self.enqueued_txs.write().await);
let mut tx_remaining = enqueued_txs.len();
let mut enqueued_txs = enqueued_txs.into_iter();
let tpu_client = self.tpu_manager.clone();
let txs_sent = self.txs_sent.clone();
tokio::spawn(async move {
while tx_remaining != 0 {
let mut batch = Vec::with_capacity(tx_batch_size);
let mut sigs = Vec::with_capacity(tx_batch_size);
for (batched, (sig, tx)) in enqueued_txs.by_ref().enumerate() {
batch.push(tx);
sigs.push(sig);
tx_remaining -= 1;
if batched == tx_batch_size {
break;
let quic_response = match tpu_client.try_send_wire_transaction_batch(txs).await {
Ok(_) => {
for (sig, _) in &sigs_and_slots {
txs_sent.insert(sig.to_owned(), TxProps::default());
}
1
}
Err(err) => {
warn!("{err}");
0
}
};
match tpu_client.try_send_wire_transaction_batch(batch).await {
Ok(_) => {
for sig in sigs {
txs_sent.insert(sig, TxProps::default());
}
}
Err(err) => {
warn!("{err}");
}
if let Some(postgres) = postgres {
for (sig, recent_slot) in sigs_and_slots {
postgres
.send(PostgresMsg::PostgresTx(PostgresTx {
signature: sig.clone(),
recent_slot: recent_slot as i64,
forwarded_slot: 0, // FIX: figure this out
processed_slot: None, // FIX: figure this out
cu_consumed: None, // FIX: figure this out
cu_requested: None, // FIX: figure this out
quic_response,
}))
.expect("Error writing to postgres service");
}
}
});
@ -97,12 +106,11 @@ impl TxSender {
/// retry and confirm transactions every 2ms (avg time to confirm tx)
pub fn execute(
self,
mut recv: UnboundedReceiver<(String, WireTransaction, u64)>,
tx_batch_size: usize,
tx_send_interval: Duration,
postgres_send: Option<PostgresMpscSend>,
) -> JoinHandle<anyhow::Result<()>> {
let mut interval = tokio::time::interval(tx_send_interval);
#[allow(unreachable_code)]
tokio::spawn(async move {
info!(
"Batching tx(s) with batch size of {tx_batch_size} every {}ms",
@ -110,12 +118,27 @@ impl TxSender {
);
loop {
interval.tick().await;
self.forward_txs(tx_batch_size).await;
}
let prev_inst = tokio::time::Instant::now();
// to give the correct type to JoinHandle
Ok(())
let mut sigs_and_slots = Vec::with_capacity(tx_batch_size);
let mut txs = Vec::with_capacity(tx_batch_size);
while (prev_inst.elapsed() < tx_send_interval) || txs.len() == tx_batch_size {
match recv.try_recv() {
Ok((sig, tx, slot)) => {
sigs_and_slots.push((sig, slot));
txs.push(tx);
}
Err(TryRecvError::Disconnected) => {
bail!("Channel Disconnected");
}
_ => {}
}
}
self.forward_txs(sigs_and_slots, txs, postgres_send.clone())
.await;
}
})
}
}

View File

@ -13,6 +13,7 @@ use solana_client::nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcCli
use solana_sdk::commitment_config::CommitmentConfig;
use solana_transaction_status::TransactionConfirmationStatus;
use tokio::sync::mpsc;
#[tokio::test]
async fn send_and_confirm_txs() {
@ -43,11 +44,15 @@ async fn send_and_confirm_txs() {
.await
.unwrap();
let (tx_send, tx_recv) = mpsc::unbounded_channel();
let services = try_join_all(vec![
block_listener.clone().listen(),
block_listener.clone().listen(None),
tx_sender.clone().execute(
tx_recv,
DEFAULT_TX_BATCH_SIZE,
Duration::from_millis(DEFAULT_TX_BATCH_INTERVAL_MS),
None,
),
]);
@ -61,9 +66,9 @@ async fn send_and_confirm_txs() {
let tx = BinaryEncoding::Base58.encode(bincode::serialize(&tx).unwrap());
let sig = sig.to_string();
tx_sender
.enqnueue_tx(sig.clone(), tx.as_bytes().to_vec())
.await;
tx_send
.send((sig.clone(), tx.as_bytes().to_vec(), 0))
.unwrap();
for _ in 0..2 {
let tx_status = tx_sender.txs_sent.get(&sig).unwrap();