feature(rpc): Migrate from deprecated `jsonrpc_*` crates to `jsonrpsee` (#9059)

* update methods

* update get block template rpc methods

* update other getblocktemplate files

* upgrade server and middlewares

* upgrade zebrad start command

* remove unused imports

* add a todo for unauthenticated rpc error

* upgrade tests, temporally comment out some of them

* fix the rpc tx queue

* update denies

* fix  links

* clippy

* fir more doc links

* fix queue tests

Co-authored-by: Arya <aryasolhi@gmail.com>

* add suggestions from code review

* fix snapshots

* try `block_on` instead of `now_or_never` in the http middleware

* move import

* Apply suggestions from code review

Co-authored-by: Arya <aryasolhi@gmail.com>

* fix bounds

---------

Co-authored-by: Arya <aryasolhi@gmail.com>
This commit is contained in:
Alfredo Garcia 2024-12-20 23:29:16 -03:00 committed by GitHub
parent 1ecf6551bc
commit 0fe47bbbbb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
32 changed files with 2225 additions and 2730 deletions

View File

@ -78,7 +78,7 @@ version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"cipher",
"cpufeatures",
]
@ -89,7 +89,7 @@ version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"getrandom 0.2.15",
"once_cell",
"version_check",
@ -293,8 +293,8 @@ dependencies = [
"axum-core",
"bytes",
"futures-util",
"http 1.1.0",
"http-body 1.0.1",
"http",
"http-body",
"http-body-util",
"itoa",
"matchit",
@ -319,8 +319,8 @@ dependencies = [
"async-trait",
"bytes",
"futures-util",
"http 1.1.0",
"http-body 1.0.1",
"http",
"http-body",
"http-body-util",
"mime",
"pin-project-lite",
@ -338,7 +338,7 @@ checksum = "26b05800d2e817c8b3b4b54abd461726265fa9789ae34330622f2db9ee696f9d"
dependencies = [
"addr2line",
"cc",
"cfg-if 1.0.0",
"cfg-if",
"libc",
"miniz_oxide 0.7.4",
"object",
@ -573,16 +573,6 @@ dependencies = [
"tinyvec",
]
[[package]]
name = "bstr"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40723b8fb387abc38f4f4a37c09073622e41dd12327033091ef8950659e6dc0c"
dependencies = [
"memchr",
"serde",
]
[[package]]
name = "bumpalo"
version = "3.16.0"
@ -697,12 +687,6 @@ dependencies = [
"nom",
]
[[package]]
name = "cfg-if"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
name = "cfg-if"
version = "1.0.0"
@ -721,7 +705,7 @@ version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3613f74bd2eac03dad61bd53dbe620703d4371614fe0bc3b9f04dd36fe4e818"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"cipher",
"cpufeatures",
]
@ -975,7 +959,7 @@ version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
]
[[package]]
@ -1070,7 +1054,7 @@ version = "4.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"cpufeatures",
"curve25519-dalek-derive",
"digest",
@ -1538,7 +1522,7 @@ version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"libc",
"wasi 0.9.0+wasi-snapshot-preview1",
]
@ -1549,7 +1533,7 @@ version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"js-sys",
"libc",
"wasi 0.11.0+wasi-snapshot-preview1",
@ -1581,19 +1565,6 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "globset"
version = "0.4.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15f1ce686646e7f1e19bf7d5533fe443a45dbfb990e00629110797578b42fb19"
dependencies = [
"aho-corasick",
"bstr",
"log",
"regex-automata 0.4.8",
"regex-syntax 0.8.5",
]
[[package]]
name = "group"
version = "0.13.0"
@ -1617,11 +1588,11 @@ dependencies = [
"fnv",
"futures-core",
"futures-sink",
"http 1.1.0",
"http",
"indexmap 2.7.0",
"slab",
"tokio",
"tokio-util 0.7.13",
"tokio-util",
"tracing",
]
@ -1631,7 +1602,7 @@ version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"crunchy",
]
@ -1788,7 +1759,7 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9c7c7c8ac16c798734b8a24560c1362120597c40d5e1459f09498f8f6c8f2ba"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"libc",
"windows",
]
@ -1801,18 +1772,7 @@ checksum = "f34059280f617a59ee59a0455e93460d67e5c76dec42dd262d38f0f390f437b2"
dependencies = [
"flume",
"indicatif",
"parking_lot 0.12.3",
]
[[package]]
name = "http"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1"
dependencies = [
"bytes",
"fnv",
"itoa",
"parking_lot",
]
[[package]]
@ -1826,17 +1786,6 @@ dependencies = [
"itoa",
]
[[package]]
name = "http-body"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2"
dependencies = [
"bytes",
"http 0.2.12",
"pin-project-lite",
]
[[package]]
name = "http-body"
version = "1.0.1"
@ -1844,7 +1793,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184"
dependencies = [
"bytes",
"http 1.1.0",
"http",
]
[[package]]
@ -1855,8 +1804,8 @@ checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f"
dependencies = [
"bytes",
"futures-util",
"http 1.1.0",
"http-body 1.0.1",
"http",
"http-body",
"pin-project-lite",
]
@ -1894,29 +1843,6 @@ dependencies = [
"serde",
]
[[package]]
name = "hyper"
version = "0.14.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c08302e8fa335b151b788c775ff56e7a03ae64ff85c548ee820fecb70356e85"
dependencies = [
"bytes",
"futures-channel",
"futures-core",
"futures-util",
"http 0.2.12",
"http-body 0.4.6",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"socket2",
"tokio",
"tower-service",
"tracing",
"want",
]
[[package]]
name = "hyper"
version = "1.5.1"
@ -1927,8 +1853,8 @@ dependencies = [
"futures-channel",
"futures-util",
"h2",
"http 1.1.0",
"http-body 1.0.1",
"http",
"http-body",
"httparse",
"httpdate",
"itoa",
@ -1945,8 +1871,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333"
dependencies = [
"futures-util",
"http 1.1.0",
"hyper 1.5.1",
"http",
"hyper",
"hyper-util",
"rustls",
"rustls-pki-types",
@ -1962,7 +1888,7 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793"
dependencies = [
"hyper 1.5.1",
"hyper",
"hyper-util",
"pin-project-lite",
"tokio",
@ -1978,9 +1904,9 @@ dependencies = [
"bytes",
"futures-channel",
"futures-util",
"http 1.1.0",
"http-body 1.0.1",
"hyper 1.5.1",
"http",
"http-body",
"hyper",
"pin-project-lite",
"socket2",
"tokio",
@ -2138,15 +2064,6 @@ dependencies = [
"similar",
]
[[package]]
name = "instant"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222"
dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "ipnet"
version = "2.10.1"
@ -2248,49 +2165,90 @@ dependencies = [
]
[[package]]
name = "jsonrpc-derive"
version = "18.0.0"
name = "jsonrpsee"
version = "0.24.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b939a78fa820cdfcb7ee7484466746a7377760970f6f9c6fe19f9edcc8a38d2"
checksum = "c5c71d8c1a731cc4227c2f698d377e7848ca12c8a48866fc5e6951c43a4db843"
dependencies = [
"proc-macro-crate 0.1.5",
"jsonrpsee-core",
"jsonrpsee-server",
"jsonrpsee-types",
"tokio",
]
[[package]]
name = "jsonrpsee-core"
version = "0.24.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2882f6f8acb9fdaec7cefc4fd607119a9bd709831df7d7672a1d3b644628280"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http",
"http-body",
"http-body-util",
"jsonrpsee-types",
"parking_lot",
"rand 0.8.5",
"rustc-hash 2.0.0",
"serde",
"serde_json",
"thiserror 1.0.69",
"tokio",
"tracing",
]
[[package]]
name = "jsonrpsee-proc-macros"
version = "0.24.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c06c01ae0007548e73412c08e2285ffe5d723195bf268bce67b1b77c3bb2a14d"
dependencies = [
"heck 0.5.0",
"proc-macro-crate",
"proc-macro2",
"quote",
"syn 1.0.109",
"syn 2.0.90",
]
[[package]]
name = "jsonrpc-http-server"
version = "18.0.0"
name = "jsonrpsee-server"
version = "0.24.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1dea6e07251d9ce6a552abfb5d7ad6bc290a4596c8dcc3d795fae2bbdc1f3ff"
checksum = "82ad8ddc14be1d4290cd68046e7d1d37acd408efed6d3ca08aefcc3ad6da069c"
dependencies = [
"futures",
"hyper 0.14.31",
"jsonrpc-core",
"jsonrpc-server-utils",
"log",
"net2",
"parking_lot 0.11.2",
"unicase",
]
[[package]]
name = "jsonrpc-server-utils"
version = "18.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa4fdea130485b572c39a460d50888beb00afb3e35de23ccd7fad8ff19f0e0d4"
dependencies = [
"bytes",
"futures",
"globset",
"jsonrpc-core",
"lazy_static",
"log",
"futures-util",
"http",
"http-body",
"http-body-util",
"hyper",
"hyper-util",
"jsonrpsee-core",
"jsonrpsee-types",
"pin-project",
"route-recognizer",
"serde",
"serde_json",
"soketto",
"thiserror 1.0.69",
"tokio",
"tokio-stream",
"tokio-util 0.6.10",
"unicase",
"tokio-util",
"tower 0.4.13",
"tracing",
]
[[package]]
name = "jsonrpsee-types"
version = "0.24.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a178c60086f24cc35bb82f57c651d0d25d99c4742b4d335de04e97fa1f08a8a1"
dependencies = [
"http",
"serde",
"serde_json",
"thiserror 1.0.69",
]
[[package]]
@ -2355,7 +2313,7 @@ version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"windows-targets 0.52.6",
]
@ -2477,7 +2435,7 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ea1f30cedd69f0a2954655f7188c6a834246d2bcf1e315e2ac40c4b24dc9519"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"rayon",
]
@ -2514,7 +2472,7 @@ checksum = "85b6f8152da6d7892ff1b7a1c0fa3f435e92b5918ad67035c3bb432111d9a29b"
dependencies = [
"base64 0.22.1",
"http-body-util",
"hyper 1.5.1",
"hyper",
"hyper-util",
"indexmap 2.7.0",
"ipnet",
@ -2603,17 +2561,6 @@ dependencies = [
"getrandom 0.2.15",
]
[[package]]
name = "net2"
version = "0.2.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b13b648036a2339d06de780866fbdfda0dde886de7b3af2ddeba8b14f4ee34ac"
dependencies = [
"cfg-if 0.1.10",
"libc",
"winapi",
]
[[package]]
name = "nix"
version = "0.29.0"
@ -2621,7 +2568,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46"
dependencies = [
"bitflags 2.6.0",
"cfg-if 1.0.0",
"cfg-if",
"cfg_aliases",
"libc",
]
@ -2854,23 +2801,12 @@ version = "3.6.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d830939c76d294956402033aee57a6da7b438f2294eb94864c37b0569053a42c"
dependencies = [
"proc-macro-crate 3.2.0",
"proc-macro-crate",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "parking_lot"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core 0.8.6",
]
[[package]]
name = "parking_lot"
version = "0.12.3"
@ -2878,21 +2814,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
dependencies = [
"lock_api",
"parking_lot_core 0.9.10",
]
[[package]]
name = "parking_lot_core"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc"
dependencies = [
"cfg-if 1.0.0",
"instant",
"libc",
"redox_syscall 0.2.16",
"smallvec",
"winapi",
"parking_lot_core",
]
[[package]]
@ -2901,9 +2823,9 @@ version = "0.9.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"libc",
"redox_syscall 0.5.7",
"redox_syscall",
"smallvec",
"windows-targets 0.52.6",
]
@ -3113,15 +3035,6 @@ dependencies = [
"uint 0.9.5",
]
[[package]]
name = "proc-macro-crate"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d6ea3c4595b96363c13943497db34af4460fb474a95c43f4446ad341b8c9785"
dependencies = [
"toml 0.5.11",
]
[[package]]
name = "proc-macro-crate"
version = "3.2.0"
@ -3505,15 +3418,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "redox_syscall"
version = "0.5.7"
@ -3590,10 +3494,10 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
"http 1.1.0",
"http-body 1.0.1",
"http",
"http-body",
"http-body-util",
"hyper 1.5.1",
"hyper",
"hyper-rustls",
"hyper-util",
"ipnet",
@ -3613,7 +3517,7 @@ dependencies = [
"sync_wrapper 1.0.1",
"tokio",
"tokio-rustls",
"tokio-util 0.7.13",
"tokio-util",
"tower-service",
"url",
"wasm-bindgen",
@ -3639,7 +3543,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d"
dependencies = [
"cc",
"cfg-if 1.0.0",
"cfg-if",
"getrandom 0.2.15",
"libc",
"spin",
@ -3686,6 +3590,12 @@ dependencies = [
"serde",
]
[[package]]
name = "route-recognizer"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afab94fb28594581f62d981211a9a4d53cc8130bbcbbb89a0440d9b8e81a7746"
[[package]]
name = "rustc-demangle"
version = "0.1.24"
@ -4076,13 +3986,24 @@ dependencies = [
"version_check",
]
[[package]]
name = "sha1"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]]
name = "sha2"
version = "0.10.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"cpufeatures",
"digest",
]
@ -4169,6 +4090,22 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "soketto"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e859df029d160cb88608f5d7df7fb4753fd20fdfb4de5644f3d8b8440841721"
dependencies = [
"base64 0.22.1",
"bytes",
"futures",
"http",
"httparse",
"log",
"rand 0.8.5",
"sha1",
]
[[package]]
name = "spandoc"
version = "0.2.2"
@ -4325,7 +4262,7 @@ version = "3.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"fastrand",
"once_cell",
"rustix",
@ -4397,7 +4334,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfe075d7053dae61ac5413a34ea7d4913b6e6207844fd726bdd858b37ff72bf5"
dependencies = [
"bitflags 2.6.0",
"cfg-if 1.0.0",
"cfg-if",
"libc",
"log",
"rustversion",
@ -4410,7 +4347,7 @@ version = "1.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"once_cell",
]
@ -4482,7 +4419,7 @@ dependencies = [
"bytes",
"libc",
"mio",
"parking_lot 0.12.3",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
@ -4522,7 +4459,7 @@ dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util 0.7.13",
"tokio-util",
]
[[package]]
@ -4538,20 +4475,6 @@ dependencies = [
"tokio-stream",
]
[[package]]
name = "tokio-util"
version = "0.6.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"log",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.7.13"
@ -4560,6 +4483,7 @@ checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078"
dependencies = [
"bytes",
"futures-core",
"futures-io",
"futures-sink",
"pin-project-lite",
"tokio",
@ -4620,10 +4544,10 @@ dependencies = [
"base64 0.22.1",
"bytes",
"h2",
"http 1.1.0",
"http-body 1.0.1",
"http",
"http-body",
"http-body-util",
"hyper 1.5.1",
"hyper",
"hyper-timeout",
"hyper-util",
"percent-encoding",
@ -4680,7 +4604,7 @@ dependencies = [
"rand 0.8.5",
"slab",
"tokio",
"tokio-util 0.7.13",
"tokio-util",
"tower-layer",
"tower-service",
"tracing",
@ -4714,7 +4638,7 @@ dependencies = [
"tinyvec",
"tokio",
"tokio-test",
"tokio-util 0.7.13",
"tokio-util",
"tower 0.4.13",
"tower-fallback",
"tower-test",
@ -4966,12 +4890,6 @@ version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94"
[[package]]
name = "unicase"
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e51b68083f157f853b6379db119d1c1be0e6e4dec98101079dec41f6f5cf6df"
[[package]]
name = "unicode-bidi"
version = "0.3.17"
@ -5101,7 +5019,7 @@ checksum = "2990d9ea5967266ea0ccf413a4aa5c42a93dbcfda9cb49a97de6931726b12566"
dependencies = [
"anyhow",
"cargo_metadata",
"cfg-if 1.0.0",
"cfg-if",
"git2",
"regex",
"rustc_version",
@ -5228,7 +5146,7 @@ version = "0.2.95"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"once_cell",
"wasm-bindgen-macro",
]
@ -5254,7 +5172,7 @@ version = "0.4.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc7ec4f8827a71586374db3e87abdb5a2bb3a15afed140221307c3ec06b1f63b"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"js-sys",
"wasm-bindgen",
"web-sys",
@ -5962,7 +5880,7 @@ dependencies = [
"thiserror 2.0.6",
"tokio",
"tokio-stream",
"tokio-util 0.7.13",
"tokio-util",
"toml 0.8.19",
"tower 0.4.13",
"tracing",
@ -5994,11 +5912,13 @@ dependencies = [
"color-eyre",
"futures",
"hex",
"http-body-util",
"hyper",
"indexmap 2.7.0",
"insta",
"jsonrpc-core",
"jsonrpc-derive",
"jsonrpc-http-server",
"jsonrpsee",
"jsonrpsee-proc-macros",
"jsonrpsee-types",
"nix",
"proptest",
"prost",
@ -6204,13 +6124,13 @@ dependencies = [
"howudoin",
"http-body-util",
"humantime-serde",
"hyper 1.5.1",
"hyper",
"hyper-util",
"indexmap 2.7.0",
"indicatif",
"inferno",
"insta",
"jsonrpc-core",
"jsonrpsee-types",
"lazy_static",
"log",
"metrics",

View File

@ -78,19 +78,8 @@ skip-tree = [
{ name = "base64", version = "=0.21.7" },
{ name = "sync_wrapper", version = "0.1.2" },
# wait for jsonrpc-http-server to update hyper or for Zebra to replace jsonrpc (#8682)
{ name = "h2", version = "=0.3.26" },
{ name = "http", version = "=0.2.12" },
{ name = "http-body", version = "=0.4.6" },
{ name = "hyper", version = "=0.14.31" },
{ name = "hyper-rustls", version = "=0.24.2" },
{ name = "reqwest", version = "=0.11.27" },
{ name = "rustls", version = "=0.21.12" },
{ name = "rustls-pemfile", version = "=1.0.4" },
{ name = "rustls-webpki", version = "=0.101.7" },
{ name = "tokio-rustls", version = "=0.24.1" },
{ name = "webpki-roots", version = "=0.25.4" },
# wait for abscissa_core to update toml
{ name = "toml", version = "=0.5.11" },
# wait for structopt-derive to update heck
{ name = "heck", version = "=0.3.3" },

View File

@ -59,9 +59,11 @@ chrono = { version = "0.4.39", default-features = false, features = [
] }
futures = "0.3.31"
jsonrpc-core = "18.0.0"
jsonrpc-derive = "18.0.0"
jsonrpc-http-server = "18.0.0"
jsonrpsee = { version = "0.24.7", features = ["server"] }
jsonrpsee-types = "0.24.7"
jsonrpsee-proc-macros = "0.24.7"
hyper = "1.5.0"
http-body-util = "0.1.2"
# zebra-rpc needs the preserve_order feature in serde_json, which is a dependency of jsonrpc-core
serde_json = { version = "1.0.133", features = ["preserve_order"] }

View File

@ -50,24 +50,12 @@ pub struct Config {
/// The number of threads used to process RPC requests and responses.
///
/// Zebra's RPC server has a separate thread pool and a `tokio` executor for each thread.
/// State queries are run concurrently using the shared thread pool controlled by
/// the [`SyncSection.parallel_cpu_threads`](https://docs.rs/zebrad/latest/zebrad/components/sync/struct.Config.html#structfield.parallel_cpu_threads) config.
///
/// If the number of threads is not configured or zero, Zebra uses the number of logical cores.
/// If the number of logical cores can't be detected, Zebra uses one thread.
///
/// Set to `1` to run all RPC queries on a single thread, and detect RPC port conflicts from
/// multiple Zebra or `zcashd` instances.
///
/// For details, see [the `jsonrpc_http_server` documentation](https://docs.rs/jsonrpc-http-server/latest/jsonrpc_http_server/struct.ServerBuilder.html#method.threads).
///
/// ## Warning
///
/// The default config uses multiple threads, which disables RPC port conflict detection.
/// This can allow multiple Zebra instances to share the same RPC port.
///
/// If some of those instances are outdated or failed, RPC queries can be slow or inconsistent.
/// This field is deprecated and could be removed in a future release.
/// We keep it just for backward compatibility but it actually do nothing.
/// It was something configurable when the RPC server was based in the jsonrpc-core crate,
/// not anymore since we migrated to jsonrpsee.
// TODO: Prefix this field name with an underscore so it's clear that it's now unused, and
// use serde(rename) to continue successfully deserializing old configs.
pub parallel_cpu_threads: usize,
/// Test-only option that makes Zebra say it is at the chain tip,

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
//! Constant values used in mining rpcs methods.
use jsonrpc_core::ErrorCode;
use jsonrpsee_types::ErrorCode;
use zebra_chain::{
block,

View File

@ -2,7 +2,8 @@
use std::{collections::HashMap, iter, sync::Arc};
use jsonrpc_core::{Error, ErrorCode, Result};
use jsonrpsee::core::RpcResult as Result;
use jsonrpsee_types::{ErrorCode, ErrorObject};
use tower::{Service, ServiceExt};
use zebra_chain::{
@ -61,25 +62,23 @@ pub fn check_parameters(parameters: &Option<JsonParameters>) -> Result<()> {
mode: GetBlockTemplateRequestMode::Proposal,
data: None,
..
} => Err(Error {
code: ErrorCode::InvalidParams,
message: "\"data\" parameter must be \
provided in \"proposal\" mode"
.to_string(),
data: None,
}),
} => Err(ErrorObject::borrowed(
ErrorCode::InvalidParams.code(),
"\"data\" parameter must be \
provided in \"proposal\" mode",
None,
)),
JsonParameters {
mode: GetBlockTemplateRequestMode::Template,
data: Some(_),
..
} => Err(Error {
code: ErrorCode::InvalidParams,
message: "\"data\" parameter must be \
omitted in \"template\" mode"
.to_string(),
data: None,
}),
} => Err(ErrorObject::borrowed(
ErrorCode::InvalidParams.code(),
"\"data\" parameter must be \
omitted in \"template\" mode",
None,
)),
}
}
@ -131,11 +130,7 @@ where
let block_verifier_router_response = block_verifier_router
.ready()
.await
.map_err(|error| Error {
code: ErrorCode::ServerError(0),
message: error.to_string(),
data: None,
})?
.map_err(|error| ErrorObject::owned(0, error.to_string(), None::<()>))?
.call(zebra_consensus::Request::CheckProposal(Arc::new(block)))
.await;
@ -189,16 +184,14 @@ where
Hint: check your network connection, clock, and time zone settings."
);
return Err(Error {
code: NOT_SYNCED_ERROR_CODE,
message: format!(
"Zebra has not synced to the chain tip, \
return Err(ErrorObject::borrowed(
NOT_SYNCED_ERROR_CODE.code(),
"Zebra has not synced to the chain tip, \
estimated distance: {estimated_distance_to_chain_tip:?}, \
local tip: {local_tip_height:?}. \
Hint: check your network connection, clock, and time zone settings."
),
data: None,
});
Hint: check your network connection, clock, and time zone settings.",
None,
));
}
Ok(())
@ -227,11 +220,7 @@ where
let response = state
.oneshot(request.clone())
.await
.map_err(|error| Error {
code: ErrorCode::ServerError(0),
message: error.to_string(),
data: None,
})?;
.map_err(|error| ErrorObject::owned(0, error.to_string(), None::<()>))?;
let chain_info = match response {
zebra_state::ReadResponse::ChainInfo(chain_info) => chain_info,
@ -261,11 +250,7 @@ where
let response = mempool
.oneshot(mempool::Request::FullTransactions)
.await
.map_err(|error| Error {
code: ErrorCode::ServerError(0),
message: error.to_string(),
data: None,
})?;
.map_err(|error| ErrorObject::owned(0, error.to_string(), None::<()>))?;
// TODO: Order transactions in block templates based on their dependencies

View File

@ -3,7 +3,7 @@
use zebra_chain::parameters::Network;
/// Response to a `getmininginfo` RPC request.
#[derive(Debug, Default, PartialEq, Eq, serde::Serialize)]
#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize)]
pub struct Response {
/// The current tip height.
#[serde(rename = "blocks")]

View File

@ -2,11 +2,11 @@
// Allow doc links to these imports.
#[allow(unused_imports)]
use crate::methods::get_block_template_rpcs::GetBlockTemplateRpc;
use crate::methods::get_block_template_rpcs::GetBlockTemplate;
/// Optional argument `jsonparametersobject` for `submitblock` RPC request
///
/// See notes for [`GetBlockTemplateRpc::submit_block`] method
/// See notes for [`crate::methods::GetBlockTemplateRpcServer::submit_block`] method
#[derive(Clone, Debug, PartialEq, Eq, serde::Deserialize)]
pub struct JsonParameters {
/// The workid for the block template. Currently unused.
@ -28,7 +28,7 @@ pub struct JsonParameters {
/// Response to a `submitblock` RPC request.
///
/// Zebra never returns "duplicate-invalid", because it does not store invalid blocks.
#[derive(Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum ErrorResponse {
/// Block was already committed to the non-finalized or finalized state
@ -44,7 +44,7 @@ pub enum ErrorResponse {
/// Response to a `submitblock` RPC request.
///
/// Zebra never returns "duplicate-invalid", because it does not store invalid blocks.
#[derive(Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(untagged)]
pub enum Response {
/// Block was not successfully submitted, return error

View File

@ -4,7 +4,7 @@ use std::{collections::HashSet, fmt::Debug, sync::Arc};
use futures::{join, FutureExt, TryFutureExt};
use hex::{FromHex, ToHex};
use jsonrpc_core::{Error, ErrorCode};
use jsonrpsee_types::{ErrorCode, ErrorObject};
use proptest::{collection::vec, prelude::*};
use thiserror::Error;
use tokio::sync::oneshot;
@ -28,7 +28,7 @@ use zebra_test::mock_service::MockService;
use crate::methods;
use super::super::{
AddressBalance, AddressStrings, NetworkUpgradeStatus, Rpc, RpcImpl, SentTransactionHash,
AddressBalance, AddressStrings, NetworkUpgradeStatus, RpcImpl, RpcServer, SentTransactionHash,
};
proptest! {
@ -49,7 +49,7 @@ proptest! {
let transaction_hex = hex::encode(&transaction_bytes);
let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex));
let send_task = tokio::spawn(async move { rpc.send_raw_transaction(transaction_hex).await });
let unmined_transaction = UnminedTx::from(transaction);
let expected_request = mempool::Request::Queue(vec![unmined_transaction.into()]);
@ -64,7 +64,7 @@ proptest! {
state.expect_no_requests().await?;
let result = send_task.await?;
let result = send_task.await.expect("send_raw_transaction should not panic");
prop_assert_eq!(result, Ok(hash));
@ -91,7 +91,9 @@ proptest! {
let transaction_bytes = transaction.zcash_serialize_to_vec()?;
let transaction_hex = hex::encode(&transaction_bytes);
let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex.clone()));
let _rpc = rpc.clone();
let _transaction_hex = transaction_hex.clone();
let send_task = tokio::spawn(async move { _rpc.send_raw_transaction(_transaction_hex).await });
let unmined_transaction = UnminedTx::from(transaction);
let expected_request = mempool::Request::Queue(vec![unmined_transaction.clone().into()]);
@ -103,11 +105,11 @@ proptest! {
state.expect_no_requests().await?;
let result = send_task.await?;
let result = send_task.await.expect("send_raw_transaction should not panic");
check_err_code(result, ErrorCode::ServerError(-1))?;
let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex));
let send_task = tokio::spawn(async move { rpc.send_raw_transaction(transaction_hex.clone()).await });
let expected_request = mempool::Request::Queue(vec![unmined_transaction.clone().into()]);
@ -118,7 +120,7 @@ proptest! {
.await?
.respond(Ok::<_, BoxError>(mempool::Response::Queued(vec![Ok(rsp_rx)])));
let result = send_task.await?;
let result = send_task.await.expect("send_raw_transaction should not panic");
check_err_code(result, ErrorCode::ServerError(-25))?;
@ -173,13 +175,13 @@ proptest! {
tokio::time::pause();
runtime.block_on(async move {
let send_task = tokio::spawn(rpc.send_raw_transaction(non_hex_string));
let send_task = rpc.send_raw_transaction(non_hex_string);
// Check that there are no further requests.
mempool.expect_no_requests().await?;
state.expect_no_requests().await?;
check_err_code(send_task.await?, ErrorCode::ServerError(-22))?;
check_err_code(send_task.await, ErrorCode::ServerError(-22))?;
// The queue task should continue without errors or panics
prop_assert!(mempool_tx_queue.now_or_never().is_none());
@ -204,12 +206,12 @@ proptest! {
prop_assume!(Transaction::zcash_deserialize(&*random_bytes).is_err());
runtime.block_on(async move {
let send_task = tokio::spawn(rpc.send_raw_transaction(hex::encode(random_bytes)));
let send_task = rpc.send_raw_transaction(hex::encode(random_bytes));
mempool.expect_no_requests().await?;
state.expect_no_requests().await?;
check_err_code(send_task.await?, ErrorCode::ServerError(-22))?;
check_err_code(send_task.await, ErrorCode::ServerError(-22))?;
// The queue task should continue without errors or panics
prop_assert!(mempool_tx_queue.now_or_never().is_none());
@ -374,8 +376,8 @@ proptest! {
let (response, _) = tokio::join!(response_fut, mock_state_handler);
prop_assert_eq!(
&response.err().unwrap().message,
"no chain tip available yet"
response.err().unwrap().message().to_string(),
"no chain tip available yet".to_string()
);
mempool.expect_no_requests().await?;
@ -603,8 +605,10 @@ proptest! {
let transaction_hash = tx.hash();
let tx_bytes = tx.zcash_serialize_to_vec()?;
let tx_hex = hex::encode(&tx_bytes);
let send_task = tokio::spawn(rpc.send_raw_transaction(tx_hex));
let send_task = {
let rpc = rpc.clone();
tokio::task::spawn(async move { rpc.send_raw_transaction(tx_hex).await })
};
let tx_unmined = UnminedTx::from(tx);
let expected_request = mempool::Request::Queue(vec![tx_unmined.clone().into()]);
@ -678,10 +682,11 @@ proptest! {
runtime.block_on(async move {
let mut transactions_hash_set = HashSet::new();
for tx in txs.clone() {
let rpc_clone = rpc.clone();
// send a transaction
let tx_bytes = tx.zcash_serialize_to_vec()?;
let tx_hex = hex::encode(&tx_bytes);
let send_task = tokio::spawn(rpc.send_raw_transaction(tx_hex));
let send_task = tokio::task::spawn(async move { rpc_clone.send_raw_transaction(tx_hex).await });
let tx_unmined = UnminedTx::from(tx.clone());
let expected_request = mempool::Request::Queue(vec![tx_unmined.clone().into()]);
@ -768,11 +773,22 @@ fn invalid_txid() -> BoxedStrategy<String> {
}
/// Checks that the given RPC response contains the given error code.
fn check_err_code<T>(rsp: Result<T, Error>, error_code: ErrorCode) -> Result<(), TestCaseError> {
prop_assert!(
matches!(&rsp, Err(Error { code, .. }) if *code == error_code),
"the RPC response must match the error code: {error_code:?}"
);
fn check_err_code<T>(
rsp: Result<T, ErrorObject>,
error_code: ErrorCode,
) -> Result<(), TestCaseError> {
match rsp {
Err(e) => {
prop_assert!(
e.code() == error_code.code(),
"the RPC response must match the error code: {:?}",
error_code.code()
);
}
Ok(_) => {
prop_assert!(false, "expected an error response, but got Ok");
}
}
Ok(())
}

View File

@ -7,7 +7,9 @@
use std::{collections::BTreeMap, sync::Arc};
use futures::FutureExt;
use insta::dynamic_redaction;
use jsonrpsee::core::RpcResult as Result;
use tower::buffer::Buffer;
use zebra_chain::{

View File

@ -12,7 +12,7 @@ use std::{
use hex::FromHex;
use insta::Settings;
use jsonrpc_core::Result;
use jsonrpsee::core::RpcResult as Result;
use tower::{buffer::Buffer, Service};
use zebra_chain::{
@ -47,7 +47,7 @@ use crate::methods::{
},
hex_data::HexData,
tests::{snapshot::EXCESSIVE_BLOCK_HEIGHT, utils::fake_history_tree},
GetBlockHash, GetBlockTemplateRpc, GetBlockTemplateRpcImpl,
GetBlockHash, GetBlockTemplateRpcImpl, GetBlockTemplateRpcServer,
};
pub async fn test_responses<State, ReadState>(
@ -488,20 +488,18 @@ pub async fn test_responses<State, ReadState>(
// `z_listunifiedreceivers`
let ua1 = String::from("u1l8xunezsvhq8fgzfl7404m450nwnd76zshscn6nfys7vyz2ywyh4cc5daaq0c7q2su5lqfh23sp7fkf3kt27ve5948mzpfdvckzaect2jtte308mkwlycj2u0eac077wu70vqcetkxf");
let z_list_unified_receivers =
tokio::spawn(get_block_template_rpc.z_list_unified_receivers(ua1))
.await
.expect("unexpected panic in z_list_unified_receivers RPC task")
.expect("unexpected error in z_list_unified_receivers RPC call");
let z_list_unified_receivers = get_block_template_rpc
.z_list_unified_receivers(ua1)
.await
.expect("unexpected error in z_list_unified_receivers RPC call");
snapshot_rpc_z_listunifiedreceivers("ua1", z_list_unified_receivers, &settings);
let ua2 = String::from("u1uf4qsmh037x2jp6k042h9d2w22wfp39y9cqdf8kcg0gqnkma2gf4g80nucnfeyde8ev7a6kf0029gnwqsgadvaye9740gzzpmr67nfkjjvzef7rkwqunqga4u4jges4tgptcju5ysd0");
let z_list_unified_receivers =
tokio::spawn(get_block_template_rpc.z_list_unified_receivers(ua2))
.await
.expect("unexpected panic in z_list_unified_receivers RPC task")
.expect("unexpected error in z_list_unified_receivers RPC call");
let z_list_unified_receivers = get_block_template_rpc
.z_list_unified_receivers(ua2)
.await
.expect("unexpected error in z_list_unified_receivers RPC call");
snapshot_rpc_z_listunifiedreceivers("ua2", z_list_unified_receivers, &settings);
}

View File

@ -3,6 +3,7 @@
use std::ops::RangeInclusive;
use std::sync::Arc;
use futures::FutureExt;
use tower::buffer::Buffer;
use zebra_chain::serialization::ZcashSerialize;
@ -495,7 +496,7 @@ async fn rpc_getblock_missing_error() {
// Make sure Zebra returns the correct error code `-8` for missing blocks
// https://github.com/zcash/lightwalletd/blob/v0.4.16/common/common.go#L287-L290
let block_future = tokio::spawn(rpc.get_block("0".to_string(), Some(0u8)));
let block_future = tokio::spawn(async move { rpc.get_block("0".to_string(), Some(0u8)).await });
// Make the mock service respond with no block
let response_handler = state
@ -503,11 +504,10 @@ async fn rpc_getblock_missing_error() {
.await;
response_handler.respond(zebra_state::ReadResponse::Block(None));
let block_response = block_future.await;
let block_response = block_response
.expect("unexpected panic in spawned request future")
.expect_err("unexpected success from missing block state response");
assert_eq!(block_response.code, ErrorCode::ServerError(-8),);
let block_response = block_future.await.expect("block future should not panic");
let block_response =
block_response.expect_err("unexpected success from missing block state response");
assert_eq!(block_response.code(), ErrorCode::ServerError(-8).code());
// Now check the error string the way `lightwalletd` checks it
assert_eq!(
@ -898,7 +898,7 @@ async fn rpc_getaddresstxids_invalid_arguments() {
.await
.unwrap_err();
assert_eq!(rpc_rsp.code, ErrorCode::ServerError(-5));
assert_eq!(rpc_rsp.code(), ErrorCode::ServerError(-5).code());
mempool.expect_no_requests().await;
@ -918,7 +918,7 @@ async fn rpc_getaddresstxids_invalid_arguments() {
.await
.unwrap_err();
assert_eq!(
error.message,
error.message(),
"start Height(2) must be less than or equal to end Height(1)".to_string()
);
@ -934,7 +934,7 @@ async fn rpc_getaddresstxids_invalid_arguments() {
.await
.unwrap_err();
assert_eq!(
error.message,
error.message(),
"start Height(0) and end Height(1) must both be greater than zero".to_string()
);
@ -950,7 +950,7 @@ async fn rpc_getaddresstxids_invalid_arguments() {
.await
.unwrap_err();
assert_eq!(
error.message,
error.message(),
"start Height(1) and end Height(11) must both be less than or equal to the chain tip Height(10)".to_string()
);
@ -1096,7 +1096,7 @@ async fn rpc_getaddressutxos_invalid_arguments() {
.await
.unwrap_err();
assert_eq!(error.code, ErrorCode::ServerError(-5));
assert_eq!(error.code(), ErrorCode::ServerError(-5).code());
mempool.expect_no_requests().await;
state.expect_no_requests().await;
@ -1253,7 +1253,10 @@ async fn rpc_getblockcount_empty_state() {
assert!(get_block_count.is_err());
// Check the error we got is the correct one
assert_eq!(get_block_count.err().unwrap().message, "No blocks in state");
assert_eq!(
get_block_count.err().unwrap().message(),
"No blocks in state"
);
mempool.expect_no_requests().await;
}
@ -1697,8 +1700,8 @@ async fn rpc_getblocktemplate_mining_address(use_p2pkh: bool) {
.expect_err("needs an error when estimated distance to network chain tip is far");
assert_eq!(
get_block_template_sync_error.code,
ErrorCode::ServerError(-10)
get_block_template_sync_error.code(),
ErrorCode::ServerError(-10).code()
);
mock_sync_status.set_is_close_to_tip(false);
@ -1710,8 +1713,8 @@ async fn rpc_getblocktemplate_mining_address(use_p2pkh: bool) {
.expect_err("needs an error when syncer is not close to tip");
assert_eq!(
get_block_template_sync_error.code,
ErrorCode::ServerError(-10)
get_block_template_sync_error.code(),
ErrorCode::ServerError(-10).code()
);
mock_chain_tip_sender.send_estimated_distance_to_network_chain_tip(Some(200));
@ -1721,8 +1724,8 @@ async fn rpc_getblocktemplate_mining_address(use_p2pkh: bool) {
.expect_err("needs an error when syncer is not close to tip or estimated distance to network chain tip is far");
assert_eq!(
get_block_template_sync_error.code,
ErrorCode::ServerError(-10)
get_block_template_sync_error.code(),
ErrorCode::ServerError(-10).code()
);
let get_block_template_sync_error = get_block_template_rpc
@ -1733,7 +1736,10 @@ async fn rpc_getblocktemplate_mining_address(use_p2pkh: bool) {
.await
.expect_err("needs an error when called in proposal mode without data");
assert_eq!(get_block_template_sync_error.code, ErrorCode::InvalidParams);
assert_eq!(
get_block_template_sync_error.code(),
ErrorCode::InvalidParams.code()
);
let get_block_template_sync_error = get_block_template_rpc
.get_block_template(Some(get_block_template::JsonParameters {
@ -1743,7 +1749,10 @@ async fn rpc_getblocktemplate_mining_address(use_p2pkh: bool) {
.await
.expect_err("needs an error when passing in block data in template mode");
assert_eq!(get_block_template_sync_error.code, ErrorCode::InvalidParams);
assert_eq!(
get_block_template_sync_error.code(),
ErrorCode::InvalidParams.code()
);
// The long poll id is valid, so it returns a state error instead
let get_block_template_sync_error = get_block_template_rpc
@ -1761,8 +1770,8 @@ async fn rpc_getblocktemplate_mining_address(use_p2pkh: bool) {
.expect_err("needs an error when the state is empty");
assert_eq!(
get_block_template_sync_error.code,
ErrorCode::ServerError(-10)
get_block_template_sync_error.code(),
ErrorCode::ServerError(-10).code()
);
// Try getting mempool transactions with a different tip hash

View File

@ -7,12 +7,11 @@
//! See the full list of
//! [Differences between JSON-RPC 1.0 and 2.0.](https://www.simple-is-better.org/rpc/#differences-between-1-0-and-2-0)
use std::{fmt, panic, thread::available_parallelism};
use std::{fmt, panic};
use cookie::Cookie;
use http_request_compatibility::With;
use jsonrpc_core::{Compatibility, MetaIoHandler};
use jsonrpc_http_server::{CloseHandle, ServerBuilder};
use jsonrpsee::server::middleware::rpc::RpcServiceBuilder;
use jsonrpsee::server::{Server, ServerHandle};
use tokio::task::JoinHandle;
use tower::Service;
use tracing::*;
@ -25,15 +24,15 @@ use zebra_node_services::mempool;
use crate::{
config::Config,
methods::{Rpc, RpcImpl},
methods::{RpcImpl, RpcServer as _},
server::{
http_request_compatibility::HttpRequestMiddleware,
http_request_compatibility::HttpRequestMiddlewareLayer,
rpc_call_compatibility::FixRpcResponseMiddleware,
},
};
#[cfg(feature = "getblocktemplate-rpcs")]
use crate::methods::{GetBlockTemplateRpc, GetBlockTemplateRpcImpl};
use crate::methods::{GetBlockTemplateRpcImpl, GetBlockTemplateRpcServer};
pub mod cookie;
pub mod error;
@ -55,8 +54,8 @@ pub struct RpcServer {
/// Zebra's application version, with build metadata.
build_version: String,
/// A handle that shuts down the RPC server.
close_handle: CloseHandle,
/// A server handle used to shuts down the RPC server.
close_handle: ServerHandle,
}
impl fmt::Debug for RpcServer {
@ -68,7 +67,7 @@ impl fmt::Debug for RpcServer {
.field(
"close_handle",
// TODO: when it stabilises, use std::any::type_name_of_val(&self.close_handle)
&"CloseHandle",
&"ServerHandle",
)
.finish()
}
@ -77,6 +76,8 @@ impl fmt::Debug for RpcServer {
/// The message to log when logging the RPC server's listen address
pub const OPENED_RPC_ENDPOINT_MSG: &str = "Opened RPC endpoint at ";
type ServerTask = JoinHandle<Result<(), tower::BoxError>>;
impl RpcServer {
/// Start a new RPC server endpoint using the supplied configs and services.
///
@ -90,7 +91,7 @@ impl RpcServer {
// - put some of the configs or services in their own struct?
// - replace VersionString with semver::Version, and update the tests to provide valid versions
#[allow(clippy::too_many_arguments)]
pub fn spawn<
pub async fn spawn<
VersionString,
UserAgentString,
Mempool,
@ -115,7 +116,7 @@ impl RpcServer {
address_book: AddressBook,
latest_chain_tip: Tip,
network: Network,
) -> (JoinHandle<()>, JoinHandle<()>, Option<Self>)
) -> Result<(ServerTask, JoinHandle<()>), tower::BoxError>
where
VersionString: ToString + Clone + Send + 'static,
UserAgentString: ToString + Clone + Send + 'static,
@ -150,136 +151,79 @@ impl RpcServer {
SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
{
if let Some(listen_addr) = config.listen_addr {
info!("Trying to open RPC endpoint at {}...", listen_addr,);
let listen_addr = config
.listen_addr
.expect("caller should make sure listen_addr is set");
// Create handler compatible with V1 and V2 RPC protocols
let mut io: MetaIoHandler<(), _> =
MetaIoHandler::new(Compatibility::Both, FixRpcResponseMiddleware);
#[cfg(feature = "getblocktemplate-rpcs")]
// Initialize the getblocktemplate rpc method handler
let get_block_template_rpc_impl = GetBlockTemplateRpcImpl::new(
&network,
mining_config.clone(),
mempool.clone(),
state.clone(),
latest_chain_tip.clone(),
block_verifier_router,
sync_status,
address_book,
);
// Initialize the rpc methods with the zebra version
let (rpc_impl, rpc_tx_queue_task_handle) = RpcImpl::new(
build_version.clone(),
user_agent,
network.clone(),
config.debug_force_finished_sync,
#[cfg(feature = "getblocktemplate-rpcs")]
{
// Initialize the getblocktemplate rpc method handler
let get_block_template_rpc_impl = GetBlockTemplateRpcImpl::new(
&network,
mining_config.clone(),
mempool.clone(),
state.clone(),
latest_chain_tip.clone(),
block_verifier_router,
sync_status,
address_book,
);
mining_config.debug_like_zcashd,
#[cfg(not(feature = "getblocktemplate-rpcs"))]
true,
mempool,
state,
latest_chain_tip,
);
io.extend_with(get_block_template_rpc_impl.to_delegate());
}
// Initialize the rpc methods with the zebra version
let (rpc_impl, rpc_tx_queue_task_handle) = RpcImpl::new(
build_version.clone(),
user_agent,
network.clone(),
config.debug_force_finished_sync,
#[cfg(feature = "getblocktemplate-rpcs")]
mining_config.debug_like_zcashd,
#[cfg(not(feature = "getblocktemplate-rpcs"))]
true,
mempool,
state,
latest_chain_tip,
);
io.extend_with(rpc_impl.to_delegate());
// If zero, automatically scale threads to the number of CPU cores
let mut parallel_cpu_threads = config.parallel_cpu_threads;
if parallel_cpu_threads == 0 {
parallel_cpu_threads = available_parallelism().map(usize::from).unwrap_or(1);
}
// The server is a blocking task, which blocks on executor shutdown.
// So we need to start it in a std::thread.
// (Otherwise tokio panics on RPC port conflict, which shuts down the RPC server.)
let span = Span::current();
let start_server = move || {
span.in_scope(|| {
let middleware = if config.enable_cookie_auth {
let cookie = Cookie::default();
cookie::write_to_disk(&cookie, &config.cookie_dir)
.expect("Zebra must be able to write the auth cookie to the disk");
HttpRequestMiddleware::default().with(cookie)
} else {
HttpRequestMiddleware::default()
};
// Use a different tokio executor from the rest of Zebra,
// so that large RPCs and any task handling bugs don't impact Zebra.
let server_instance = ServerBuilder::new(io)
.threads(parallel_cpu_threads)
// TODO: disable this security check if we see errors from lightwalletd
//.allowed_hosts(DomainsValidation::Disabled)
.request_middleware(middleware)
.start_http(&listen_addr)
.expect("Unable to start RPC server");
info!("{OPENED_RPC_ENDPOINT_MSG}{}", server_instance.address());
let close_handle = server_instance.close_handle();
let rpc_server_handle = RpcServer {
config,
network,
build_version: build_version.to_string(),
close_handle,
};
(server_instance, rpc_server_handle)
})
};
// Propagate panics from the std::thread
let (server_instance, rpc_server_handle) = match std::thread::spawn(start_server).join()
{
Ok(rpc_server) => rpc_server,
Err(panic_object) => panic::resume_unwind(panic_object),
};
// The server is a blocking task, which blocks on executor shutdown.
// So we need to wait on it on a std::thread, inside a tokio blocking task.
// (Otherwise tokio panics when we shut down the RPC server.)
let span = Span::current();
let wait_on_server = move || {
span.in_scope(|| {
server_instance.wait();
info!("Stopped RPC endpoint");
})
};
let span = Span::current();
let rpc_server_task_handle = tokio::task::spawn_blocking(move || {
let thread_handle = std::thread::spawn(wait_on_server);
// Propagate panics from the inner std::thread to the outer tokio blocking task
span.in_scope(|| match thread_handle.join() {
Ok(()) => (),
Err(panic_object) => panic::resume_unwind(panic_object),
})
});
(
rpc_server_task_handle,
rpc_tx_queue_task_handle,
Some(rpc_server_handle),
)
let http_middleware_layer = if config.enable_cookie_auth {
let cookie = Cookie::default();
cookie::write_to_disk(&cookie, &config.cookie_dir)
.expect("Zebra must be able to write the auth cookie to the disk");
HttpRequestMiddlewareLayer::new(Some(cookie))
} else {
// There is no RPC port, so the RPC tasks do nothing.
(
tokio::task::spawn(futures::future::pending().in_current_span()),
tokio::task::spawn(futures::future::pending().in_current_span()),
None,
)
}
HttpRequestMiddlewareLayer::new(None)
};
let http_middleware = tower::ServiceBuilder::new().layer(http_middleware_layer);
let rpc_middleware = RpcServiceBuilder::new()
.rpc_logger(1024)
.layer_fn(FixRpcResponseMiddleware::new);
let server_instance = Server::builder()
.http_only()
.set_http_middleware(http_middleware)
.set_rpc_middleware(rpc_middleware)
.build(listen_addr)
.await
.expect("Unable to start RPC server");
let addr = server_instance
.local_addr()
.expect("Unable to get local address");
info!("{OPENED_RPC_ENDPOINT_MSG}{}", addr);
#[cfg(feature = "getblocktemplate-rpcs")]
let mut rpc_module = rpc_impl.into_rpc();
#[cfg(not(feature = "getblocktemplate-rpcs"))]
let rpc_module = rpc_impl.into_rpc();
#[cfg(feature = "getblocktemplate-rpcs")]
rpc_module
.merge(get_block_template_rpc_impl.into_rpc())
.unwrap();
let server_task: JoinHandle<Result<(), tower::BoxError>> = tokio::spawn(async move {
server_instance.start(rpc_module).stopped().await;
Ok(())
});
Ok((server_task, rpc_tx_queue_task_handle))
}
/// Shut down this RPC server, blocking the current thread.
@ -305,7 +249,7 @@ impl RpcServer {
/// Shuts down this RPC server using its `close_handle`.
///
/// See `shutdown_blocking()` for details.
fn shutdown_blocking_inner(close_handle: CloseHandle, config: Config) {
fn shutdown_blocking_inner(close_handle: ServerHandle, config: Config) {
// The server is a blocking task, so it can't run inside a tokio thread.
// See the note at wait_on_server.
let span = Span::current();
@ -321,7 +265,7 @@ impl RpcServer {
}
info!("Stopping RPC server");
close_handle.clone().close();
let _ = close_handle.stop();
debug!("Stopped RPC server");
})
};

View File

@ -1,4 +1,5 @@
//! RPC error codes & their handling.
use jsonrpsee_types::{ErrorCode, ErrorObject, ErrorObjectOwned};
/// Bitcoin RPC error codes
///
@ -51,22 +52,25 @@ pub enum LegacyCode {
ClientInvalidIpOrSubnet = -30,
}
impl From<LegacyCode> for jsonrpc_core::ErrorCode {
impl From<LegacyCode> for ErrorCode {
fn from(code: LegacyCode) -> Self {
Self::ServerError(code as i64)
Self::ServerError(code as i32)
}
}
/// A trait for mapping errors to [`jsonrpc_core::Error`].
pub(crate) trait MapError<T>: Sized {
/// Maps errors to [`jsonrpc_core::Error`] with a specific error code.
fn map_error(
self,
code: impl Into<jsonrpc_core::ErrorCode>,
) -> std::result::Result<T, jsonrpc_core::Error>;
impl From<LegacyCode> for i32 {
fn from(code: LegacyCode) -> Self {
code as i32
}
}
/// Maps errors to [`jsonrpc_core::Error`] with a [`LegacyCode::Misc`] error code.
fn map_misc_error(self) -> std::result::Result<T, jsonrpc_core::Error> {
/// A trait for mapping errors to [`jsonrpsee_types::ErrorObjectOwned`].
pub(crate) trait MapError<T>: Sized {
/// Maps errors to [`jsonrpsee_types::ErrorObjectOwned`] with a specific error code.
fn map_error(self, code: impl Into<ErrorCode>) -> std::result::Result<T, ErrorObjectOwned>;
/// Maps errors to [`jsonrpsee_types::ErrorObjectOwned`] with a [`LegacyCode::Misc`] error code.
fn map_misc_error(self) -> std::result::Result<T, ErrorObjectOwned> {
self.map_error(LegacyCode::Misc)
}
}
@ -77,15 +81,12 @@ pub(crate) trait OkOrError<T>: Sized {
/// message if conversion is to `Err`.
fn ok_or_error(
self,
code: impl Into<jsonrpc_core::ErrorCode>,
code: impl Into<ErrorCode>,
message: impl ToString,
) -> std::result::Result<T, jsonrpc_core::Error>;
) -> std::result::Result<T, ErrorObjectOwned>;
/// Converts the implementing type to `Result<T, jsonrpc_core::Error>`, using a [`LegacyCode::Misc`] error code.
fn ok_or_misc_error(
self,
message: impl ToString,
) -> std::result::Result<T, jsonrpc_core::Error> {
fn ok_or_misc_error(self, message: impl ToString) -> std::result::Result<T, ErrorObjectOwned> {
self.ok_or_error(LegacyCode::Misc, message)
}
}
@ -94,25 +95,21 @@ impl<T, E> MapError<T> for Result<T, E>
where
E: ToString,
{
fn map_error(self, code: impl Into<jsonrpc_core::ErrorCode>) -> Result<T, jsonrpc_core::Error> {
self.map_err(|error| jsonrpc_core::Error {
code: code.into(),
message: error.to_string(),
data: None,
})
fn map_error(self, code: impl Into<ErrorCode>) -> Result<T, ErrorObjectOwned> {
self.map_err(|error| ErrorObject::owned(code.into().code(), error.to_string(), None::<()>))
}
}
impl<T> OkOrError<T> for Option<T> {
fn ok_or_error(
self,
code: impl Into<jsonrpc_core::ErrorCode>,
code: impl Into<ErrorCode>,
message: impl ToString,
) -> Result<T, jsonrpc_core::Error> {
self.ok_or(jsonrpc_core::Error {
code: code.into(),
message: message.to_string(),
data: None,
})
) -> Result<T, ErrorObjectOwned> {
self.ok_or(ErrorObject::owned(
code.into().code(),
message.to_string(),
None::<()>,
))
}
}

View File

@ -2,16 +2,25 @@
//!
//! These fixes are applied at the HTTP level, before the RPC request is parsed.
use base64::{engine::general_purpose::URL_SAFE, Engine as _};
use futures::TryStreamExt;
use jsonrpc_http_server::{
hyper::{body::Bytes, header, Body, Request},
RequestMiddleware, RequestMiddlewareAction,
use std::future::Future;
use std::pin::Pin;
use futures::{future, FutureExt};
use http_body_util::BodyExt;
use hyper::{body::Bytes, header};
use jsonrpsee::{
core::BoxError,
server::{HttpBody, HttpRequest, HttpResponse},
};
use jsonrpsee_types::ErrorObject;
use tower::Service;
use super::cookie::Cookie;
/// HTTP [`RequestMiddleware`] with compatibility workarounds.
use base64::{engine::general_purpose::URL_SAFE, Engine as _};
/// HTTP [`HttpRequestMiddleware`] with compatibility workarounds.
///
/// This middleware makes the following changes to HTTP requests:
///
@ -25,7 +34,7 @@ use super::cookie::Cookie;
/// ### Add missing `content-type` HTTP header
///
/// Some RPC clients don't include a `content-type` HTTP header.
/// But unlike web browsers, [`jsonrpc_http_server`] does not do content sniffing.
/// But unlike web browsers, [`jsonrpsee`] does not do content sniffing.
///
/// If there is no `content-type` header, we assume the content is JSON,
/// and let the parser error if we are incorrect.
@ -42,103 +51,30 @@ use super::cookie::Cookie;
/// Any user-specified data in RPC requests is hex or base58check encoded.
/// We assume lightwalletd validates data encodings before sending it on to Zebra.
/// So any fixes Zebra performs won't change user-specified data.
#[derive(Clone, Debug, Default)]
pub struct HttpRequestMiddleware {
#[derive(Clone, Debug)]
pub struct HttpRequestMiddleware<S> {
service: S,
cookie: Option<Cookie>,
}
/// A trait for updating an object, consuming it and returning the updated version.
pub trait With<T> {
/// Updates `self` with an instance of type `T` and returns the updated version of `self`.
fn with(self, _: T) -> Self;
}
impl With<Cookie> for HttpRequestMiddleware {
fn with(mut self, cookie: Cookie) -> Self {
self.cookie = Some(cookie);
self
impl<S> HttpRequestMiddleware<S> {
/// Create a new `HttpRequestMiddleware` with the given service and cookie.
pub fn new(service: S, cookie: Option<Cookie>) -> Self {
Self { service, cookie }
}
}
impl RequestMiddleware for HttpRequestMiddleware {
fn on_request(&self, mut request: Request<Body>) -> RequestMiddlewareAction {
tracing::trace!(?request, "original HTTP request");
// Check if the request is authenticated
if !self.check_credentials(request.headers_mut()) {
let error = jsonrpc_core::Error {
code: jsonrpc_core::ErrorCode::ServerError(401),
message: "unauthenticated method".to_string(),
data: None,
};
return jsonrpc_http_server::Response {
code: jsonrpc_http_server::hyper::StatusCode::from_u16(401)
.expect("hard-coded status code should be valid"),
content_type: header::HeaderValue::from_static("application/json; charset=utf-8"),
content: serde_json::to_string(&jsonrpc_core::Response::from(error, None))
.expect("hard-coded result should serialize"),
}
.into();
}
// Fix the request headers if needed and we can do so.
HttpRequestMiddleware::insert_or_replace_content_type_header(request.headers_mut());
// Fix the request body
let request = request.map(|body| {
let body = body.map_ok(|data| {
// To simplify data handling, we assume that any search strings won't be split
// across multiple `Bytes` data buffers.
//
// To simplify error handling, Zebra only supports valid UTF-8 requests,
// and uses lossy UTF-8 conversion.
//
// JSON-RPC requires all requests to be valid UTF-8.
// The lower layers should reject invalid requests with lossy changes.
// But if they accept some lossy changes, that's ok,
// because the request was non-standard anyway.
//
// We're not concerned about performance here, so we just clone the Cow<str>
let data = String::from_utf8_lossy(data.as_ref()).to_string();
// Fix up the request.
let data = Self::remove_json_1_fields(data);
Bytes::from(data)
});
Body::wrap_stream(body)
});
tracing::trace!(?request, "modified HTTP request");
RequestMiddlewareAction::Proceed {
// TODO: disable this security check if we see errors from lightwalletd.
should_continue_on_invalid_cors: false,
request,
}
}
}
impl HttpRequestMiddleware {
/// Remove any "jsonrpc: 1.0" fields in `data`, and return the resulting string.
pub fn remove_json_1_fields(data: String) -> String {
// Replace "jsonrpc = 1.0":
// - at the start or middle of a list, and
// - at the end of a list;
// with no spaces (lightwalletd format), and spaces after separators (example format).
//
// TODO: if we see errors from lightwalletd, make this replacement more accurate:
// - use a partial JSON fragment parser
// - combine the whole request into a single buffer, and use a JSON parser
// - use a regular expression
//
// We could also just handle the exact lightwalletd format,
// by replacing `{"jsonrpc":"1.0",` with `{`.
data.replace("\"jsonrpc\":\"1.0\",", "")
.replace("\"jsonrpc\": \"1.0\",", "")
.replace(",\"jsonrpc\":\"1.0\"", "")
.replace(", \"jsonrpc\": \"1.0\"", "")
/// Check if the request is authenticated.
pub fn check_credentials(&self, headers: &header::HeaderMap) -> bool {
self.cookie.as_ref().map_or(true, |internal_cookie| {
headers
.get(header::AUTHORIZATION)
.and_then(|auth_header| auth_header.to_str().ok())
.and_then(|auth_header| auth_header.split_whitespace().nth(1))
.and_then(|encoded| URL_SAFE.decode(encoded).ok())
.and_then(|decoded| String::from_utf8(decoded).ok())
.and_then(|request_cookie| request_cookie.split(':').nth(1).map(String::from))
.map_or(false, |passwd| internal_cookie.authenticate(passwd))
})
}
/// Insert or replace client supplied `content-type` HTTP header to `application/json` in the following cases:
@ -182,17 +118,110 @@ impl HttpRequestMiddleware {
}
}
/// Check if the request is authenticated.
pub fn check_credentials(&self, headers: &header::HeaderMap) -> bool {
self.cookie.as_ref().map_or(true, |internal_cookie| {
headers
.get(header::AUTHORIZATION)
.and_then(|auth_header| auth_header.to_str().ok())
.and_then(|auth_header| auth_header.split_whitespace().nth(1))
.and_then(|encoded| URL_SAFE.decode(encoded).ok())
.and_then(|decoded| String::from_utf8(decoded).ok())
.and_then(|request_cookie| request_cookie.split(':').nth(1).map(String::from))
.map_or(false, |passwd| internal_cookie.authenticate(passwd))
})
/// Remove any "jsonrpc: 1.0" fields in `data`, and return the resulting string.
pub fn remove_json_1_fields(data: String) -> String {
// Replace "jsonrpc = 1.0":
// - at the start or middle of a list, and
// - at the end of a list;
// with no spaces (lightwalletd format), and spaces after separators (example format).
//
// TODO: if we see errors from lightwalletd, make this replacement more accurate:
// - use a partial JSON fragment parser
// - combine the whole request into a single buffer, and use a JSON parser
// - use a regular expression
//
// We could also just handle the exact lightwalletd format,
// by replacing `{"jsonrpc":"1.0",` with `{"jsonrpc":"2.0`.
data.replace("\"jsonrpc\":\"1.0\",", "\"jsonrpc\":\"2.0\",")
.replace("\"jsonrpc\": \"1.0\",", "\"jsonrpc\": \"2.0\",")
.replace(",\"jsonrpc\":\"1.0\"", ",\"jsonrpc\":\"2.0\"")
.replace(", \"jsonrpc\": \"1.0\"", ", \"jsonrpc\": \"2.0\"")
}
}
/// Implement the Layer for HttpRequestMiddleware to allow injecting the cookie
#[derive(Clone)]
pub struct HttpRequestMiddlewareLayer {
cookie: Option<Cookie>,
}
impl HttpRequestMiddlewareLayer {
/// Create a new `HttpRequestMiddlewareLayer` with the given cookie.
pub fn new(cookie: Option<Cookie>) -> Self {
Self { cookie }
}
}
impl<S> tower::Layer<S> for HttpRequestMiddlewareLayer {
type Service = HttpRequestMiddleware<S>;
fn layer(&self, service: S) -> Self::Service {
HttpRequestMiddleware::new(service, self.cookie.clone())
}
}
/// A trait for updating an object, consuming it and returning the updated version.
pub trait With<T> {
/// Updates `self` with an instance of type `T` and returns the updated version of `self`.
fn with(self, _: T) -> Self;
}
impl<S> With<Cookie> for HttpRequestMiddleware<S> {
fn with(mut self, cookie: Cookie) -> Self {
self.cookie = Some(cookie);
self
}
}
impl<S> Service<HttpRequest<HttpBody>> for HttpRequestMiddleware<S>
where
S: Service<HttpRequest, Response = HttpResponse> + std::clone::Clone + Send + 'static,
S::Error: Into<BoxError> + 'static,
S::Future: Send + 'static,
{
type Response = S::Response;
type Error = BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx).map_err(Into::into)
}
fn call(&mut self, mut request: HttpRequest<HttpBody>) -> Self::Future {
// Check if the request is authenticated
if !self.check_credentials(request.headers_mut()) {
let error = ErrorObject::borrowed(401, "unauthenticated method", None);
// TODO: Error object is not being returned to the user but an empty response.
return future::err(BoxError::from(error)).boxed();
}
// Fix the request headers.
Self::insert_or_replace_content_type_header(request.headers_mut());
let mut service = self.service.clone();
let (parts, body) = request.into_parts();
async move {
let bytes = body
.collect()
.await
.expect("Failed to collect body data")
.to_bytes();
let data = String::from_utf8_lossy(bytes.as_ref()).to_string();
// Fix JSON-RPC 1.0 requests.
let data = Self::remove_json_1_fields(data);
let body = HttpBody::from(Bytes::from(data).as_ref().to_vec());
let request = HttpRequest::from_parts(parts, body);
service.call(request).await.map_err(Into::into)
}
.boxed()
}
}

View File

@ -3,116 +3,66 @@
//! These fixes are applied at the JSON-RPC call level,
//! after the RPC request is parsed and split into calls.
use std::future::Future;
use futures::future::{Either, FutureExt};
use jsonrpc_core::{
middleware::Middleware,
types::{Call, Failure, Output, Response},
BoxFuture, Metadata, MethodCall, Notification,
use jsonrpsee::{
server::middleware::rpc::{layer::ResponseFuture, RpcService, RpcServiceT},
MethodResponse,
};
use jsonrpsee_types::ErrorObject;
use crate::server;
/// JSON-RPC [`Middleware`] with compatibility workarounds.
/// JSON-RPC [`FixRpcResponseMiddleware`] with compatibility workarounds.
///
/// This middleware makes the following changes to JSON-RPC calls:
///
/// ## Make RPC framework response codes match `zcashd`
///
/// [`jsonrpc_core`] returns specific error codes while parsing requests:
/// <https://docs.rs/jsonrpc-core/18.0.0/jsonrpc_core/types/error/enum.ErrorCode.html#variants>
/// [`jsonrpsee_types`] returns specific error codes while parsing requests:
/// <https://docs.rs/jsonrpsee-types/latest/jsonrpsee_types/error/enum.ErrorCode.html>
///
/// But these codes are different from `zcashd`, and some RPC clients rely on the exact code.
///
/// ## Read-Only Functionality
///
/// This middleware also logs unrecognized RPC requests.
pub struct FixRpcResponseMiddleware;
impl<M: Metadata> Middleware<M> for FixRpcResponseMiddleware {
type Future = BoxFuture<Option<Response>>;
type CallFuture = BoxFuture<Option<Output>>;
fn on_call<Next, NextFuture>(
&self,
call: Call,
meta: M,
next: Next,
) -> Either<Self::CallFuture, NextFuture>
where
Next: Fn(Call, M) -> NextFuture + Send + Sync,
NextFuture: Future<Output = Option<Output>> + Send + 'static,
{
Either::Left(
next(call.clone(), meta)
.map(|mut output| {
Self::fix_error_codes(&mut output);
output
})
.inspect(|output| Self::log_if_error(output, call))
.boxed(),
)
}
/// Specifically, the [`jsonrpsee_types::error::INVALID_PARAMS_CODE`] is different:
/// <https://docs.rs/jsonrpsee-types/latest/jsonrpsee_types/error/constant.INVALID_PARAMS_CODE.html>
pub struct FixRpcResponseMiddleware {
service: RpcService,
}
impl FixRpcResponseMiddleware {
/// Replaces [`jsonrpc_core::ErrorCode`]s in the [`Output`] with their `zcashd` equivalents.
///
/// ## Replaced Codes
///
/// 1. [`jsonrpc_core::ErrorCode::InvalidParams`] -> [`server::error::LegacyCode::Misc`]
/// Rationale:
/// The `node-stratum-pool` mining pool library expects error code `-1` to detect available RPC methods:
/// <https://github.com/s-nomp/node-stratum-pool/blob/d86ae73f8ff968d9355bb61aac05e0ebef36ccb5/lib/pool.js#L459>
fn fix_error_codes(output: &mut Option<Output>) {
if let Some(Output::Failure(Failure { ref mut error, .. })) = output {
if matches!(error.code, jsonrpc_core::ErrorCode::InvalidParams) {
let original_code = error.code.clone();
error.code = server::error::LegacyCode::Misc.into();
tracing::debug!("Replacing RPC error: {original_code:?} with {error}");
}
}
}
/// Obtain a description string for a received request.
///
/// Prints out only the method name and the received parameters.
fn call_description(call: &Call) -> String {
const MAX_PARAMS_LOG_LENGTH: usize = 100;
match call {
Call::MethodCall(MethodCall { method, params, .. }) => {
let mut params = format!("{params:?}");
if params.len() >= MAX_PARAMS_LOG_LENGTH {
params.truncate(MAX_PARAMS_LOG_LENGTH);
params.push_str("...");
}
format!(r#"method = {method:?}, params = {params}"#)
}
Call::Notification(Notification { method, params, .. }) => {
let mut params = format!("{params:?}");
if params.len() >= MAX_PARAMS_LOG_LENGTH {
params.truncate(MAX_PARAMS_LOG_LENGTH);
params.push_str("...");
}
format!(r#"notification = {method:?}, params = {params}"#)
}
Call::Invalid { .. } => "invalid request".to_owned(),
}
}
/// Check RPC output and log any errors.
//
// TODO: do we want to ignore ErrorCode::ServerError(_), or log it at debug?
fn log_if_error(output: &Option<Output>, call: Call) {
if let Some(Output::Failure(Failure { error, .. })) = output {
let call_description = Self::call_description(&call);
tracing::info!("RPC error: {error} in call: {call_description}");
}
/// Create a new `FixRpcResponseMiddleware` with the given `service`.
pub fn new(service: RpcService) -> Self {
Self { service }
}
}
impl<'a> RpcServiceT<'a> for FixRpcResponseMiddleware {
type Future = ResponseFuture<futures::future::BoxFuture<'a, jsonrpsee::MethodResponse>>;
fn call(&self, request: jsonrpsee::types::Request<'a>) -> Self::Future {
let service = self.service.clone();
ResponseFuture::future(Box::pin(async move {
let response = service.call(request).await;
if response.is_error() {
let original_error_code = response
.as_error_code()
.expect("response should have an error code");
if original_error_code == jsonrpsee_types::ErrorCode::InvalidParams.code() {
let new_error_code = crate::server::error::LegacyCode::Misc.into();
tracing::debug!(
"Replacing RPC error: {original_error_code} with {new_error_code}"
);
let json: serde_json::Value =
serde_json::from_str(response.into_parts().0.as_str())
.expect("response string should be valid json");
let id = json["id"]
.as_str()
.expect("response json should have an id")
.to_string();
return MethodResponse::error(
jsonrpsee_types::Id::Str(id.into()),
ErrorObject::borrowed(new_error_code, "Invalid params", None),
);
}
}
response
}))
}
}

View File

@ -3,12 +3,8 @@
// These tests call functions which can take unit arguments if some features aren't enabled.
#![allow(clippy::unit_arg)]
use std::{
net::{Ipv4Addr, SocketAddrV4},
time::Duration,
};
use std::net::{Ipv4Addr, SocketAddrV4};
use futures::FutureExt;
use tower::buffer::Buffer;
use zebra_chain::{
@ -21,111 +17,71 @@ use zebra_test::mock_service::MockService;
use super::super::*;
/// Test that the JSON-RPC server spawns when configured with a single thread.
#[test]
fn rpc_server_spawn_single_thread() {
rpc_server_spawn(false)
}
/// Test that the JSON-RPC server spawns when configured with multiple threads.
#[test]
#[cfg(not(target_os = "windows"))]
fn rpc_server_spawn_parallel_threads() {
rpc_server_spawn(true)
/// Test that the JSON-RPC server spawns.
#[tokio::test]
async fn rpc_server_spawn_test() {
rpc_server_spawn().await
}
/// Test if the RPC server will spawn on a randomly generated port.
///
/// Set `parallel_cpu_threads` to true to auto-configure based on the number of CPU cores.
#[tracing::instrument]
fn rpc_server_spawn(parallel_cpu_threads: bool) {
async fn rpc_server_spawn() {
let _init_guard = zebra_test::init();
let config = Config {
listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0).into()),
indexer_listen_addr: None,
parallel_cpu_threads: if parallel_cpu_threads { 2 } else { 1 },
parallel_cpu_threads: 0,
debug_force_finished_sync: false,
cookie_dir: Default::default(),
enable_cookie_auth: false,
};
let rt = tokio::runtime::Runtime::new().unwrap();
let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let mut block_verifier_router: MockService<_, _, _, BoxError> =
MockService::build().for_unit_tests();
rt.block_on(async {
let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let mut block_verifier_router: MockService<_, _, _, BoxError> =
MockService::build().for_unit_tests();
info!("spawning RPC server...");
info!("spawning RPC server...");
let _rpc_server_task_handle = RpcServer::spawn(
config,
Default::default(),
"RPC server test",
"RPC server test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Buffer::new(block_verifier_router.clone(), 1),
MockSyncStatus::default(),
MockAddressBookPeers::default(),
NoChainTip,
Mainnet,
);
let (rpc_server_task_handle, rpc_tx_queue_task_handle, _rpc_server) = RpcServer::spawn(
config,
Default::default(),
"RPC server test",
"RPC server test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Buffer::new(block_verifier_router.clone(), 1),
MockSyncStatus::default(),
MockAddressBookPeers::default(),
NoChainTip,
Mainnet,
);
info!("spawned RPC server, checking services...");
info!("spawned RPC server, checking services...");
mempool.expect_no_requests().await;
state.expect_no_requests().await;
block_verifier_router.expect_no_requests().await;
// The server and queue tasks should continue without errors or panics
let rpc_server_task_result = rpc_server_task_handle.now_or_never();
assert!(rpc_server_task_result.is_none());
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
assert!(rpc_tx_queue_task_result.is_none());
});
info!("waiting for RPC server to shut down...");
rt.shutdown_timeout(Duration::from_secs(1));
mempool.expect_no_requests().await;
state.expect_no_requests().await;
block_verifier_router.expect_no_requests().await;
}
/// Test that the JSON-RPC server spawns when configured with a single thread,
/// on an OS-assigned unallocated port.
#[test]
fn rpc_server_spawn_unallocated_port_single_thread() {
rpc_server_spawn_unallocated_port(false, false)
/// Test that the JSON-RPC server spawns on an OS-assigned unallocated port.
#[tokio::test]
async fn rpc_server_spawn_unallocated_port() {
rpc_spawn_unallocated_port(false).await
}
/// Test that the JSON-RPC server spawns and shuts down when configured with a single thread,
/// on an OS-assigned unallocated port.
#[test]
fn rpc_server_spawn_unallocated_port_single_thread_shutdown() {
rpc_server_spawn_unallocated_port(false, true)
}
/// Test that the JSON-RPC server spawns when configured with multiple threads,
/// on an OS-assigned unallocated port.
#[test]
fn rpc_sever_spawn_unallocated_port_parallel_threads() {
rpc_server_spawn_unallocated_port(true, false)
}
/// Test that the JSON-RPC server spawns and shuts down when configured with multiple threads,
/// on an OS-assigned unallocated port.
#[test]
fn rpc_sever_spawn_unallocated_port_parallel_threads_shutdown() {
rpc_server_spawn_unallocated_port(true, true)
/// Test that the JSON-RPC server spawns and shuts down on an OS-assigned unallocated port.
#[tokio::test]
async fn rpc_server_spawn_unallocated_port_shutdown() {
rpc_spawn_unallocated_port(true).await
}
/// Test if the RPC server will spawn on an OS-assigned unallocated port.
///
/// Set `parallel_cpu_threads` to true to auto-configure based on the number of CPU cores,
/// and `do_shutdown` to true to close the server using the close handle.
/// Set `do_shutdown` to true to close the server using the close handle.
#[tracing::instrument]
fn rpc_server_spawn_unallocated_port(parallel_cpu_threads: bool, do_shutdown: bool) {
async fn rpc_spawn_unallocated_port(do_shutdown: bool) {
let _init_guard = zebra_test::init();
let port = zebra_test::net::random_unallocated_port();
@ -134,300 +90,111 @@ fn rpc_server_spawn_unallocated_port(parallel_cpu_threads: bool, do_shutdown: bo
let config = Config {
listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()),
indexer_listen_addr: None,
parallel_cpu_threads: if parallel_cpu_threads { 0 } else { 1 },
parallel_cpu_threads: 0,
debug_force_finished_sync: false,
cookie_dir: Default::default(),
enable_cookie_auth: false,
};
let rt = tokio::runtime::Runtime::new().unwrap();
let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let mut block_verifier_router: MockService<_, _, _, BoxError> =
MockService::build().for_unit_tests();
rt.block_on(async {
let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let mut block_verifier_router: MockService<_, _, _, BoxError> =
MockService::build().for_unit_tests();
info!("spawning RPC server...");
info!("spawning RPC server...");
let rpc_server_task_handle = RpcServer::spawn(
config,
Default::default(),
"RPC server test",
"RPC server test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Buffer::new(block_verifier_router.clone(), 1),
MockSyncStatus::default(),
MockAddressBookPeers::default(),
NoChainTip,
Mainnet,
)
.await
.expect("");
let (rpc_server_task_handle, rpc_tx_queue_task_handle, rpc_server) = RpcServer::spawn(
config,
Default::default(),
"RPC server test",
"RPC server test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Buffer::new(block_verifier_router.clone(), 1),
MockSyncStatus::default(),
MockAddressBookPeers::default(),
NoChainTip,
Mainnet,
);
info!("spawned RPC server, checking services...");
info!("spawned RPC server, checking services...");
mempool.expect_no_requests().await;
state.expect_no_requests().await;
block_verifier_router.expect_no_requests().await;
mempool.expect_no_requests().await;
state.expect_no_requests().await;
block_verifier_router.expect_no_requests().await;
if do_shutdown {
rpc_server
.expect("unexpected missing RpcServer for configured RPC port")
.shutdown()
.await
.expect("unexpected panic during RpcServer shutdown");
// The server and queue tasks should shut down without errors or panics
let rpc_server_task_result = rpc_server_task_handle.await;
assert!(
matches!(rpc_server_task_result, Ok(())),
"unexpected server task panic during shutdown: {rpc_server_task_result:?}"
);
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.await;
assert!(
matches!(rpc_tx_queue_task_result, Ok(())),
"unexpected queue task panic during shutdown: {rpc_tx_queue_task_result:?}"
);
} else {
// The server and queue tasks should continue without errors or panics
let rpc_server_task_result = rpc_server_task_handle.now_or_never();
assert!(rpc_server_task_result.is_none());
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
assert!(rpc_tx_queue_task_result.is_none());
}
});
info!("waiting for RPC server to shut down...");
rt.shutdown_timeout(Duration::from_secs(1));
if do_shutdown {
rpc_server_task_handle.0.abort();
}
}
/// Test if the RPC server will panic correctly when there is a port conflict.
///
/// This test is sometimes unreliable on Windows, and hangs on macOS.
/// We believe this is a CI infrastructure issue, not a platform-specific issue.
#[test]
#[tokio::test]
#[should_panic(expected = "Unable to start RPC server")]
#[cfg(not(any(target_os = "windows", target_os = "macos")))]
fn rpc_server_spawn_port_conflict() {
async fn rpc_server_spawn_port_conflict() {
use std::time::Duration;
let _init_guard = zebra_test::init();
let port = zebra_test::net::random_known_port();
let config = Config {
listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()),
indexer_listen_addr: None,
parallel_cpu_threads: 1,
debug_force_finished_sync: false,
parallel_cpu_threads: 0,
cookie_dir: Default::default(),
enable_cookie_auth: false,
};
let rt = tokio::runtime::Runtime::new().unwrap();
let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let mut block_verifier_router: MockService<_, _, _, BoxError> =
MockService::build().for_unit_tests();
let test_task_handle = rt.spawn(async {
let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let mut block_verifier_router: MockService<_, _, _, BoxError> =
MockService::build().for_unit_tests();
info!("spawning RPC server 1...");
info!("spawning RPC server 1...");
let _rpc_server_1_task_handle = RpcServer::spawn(
config.clone(),
Default::default(),
"RPC server 1 test",
"RPC server 1 test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Buffer::new(block_verifier_router.clone(), 1),
MockSyncStatus::default(),
MockAddressBookPeers::default(),
NoChainTip,
Mainnet,
)
.await;
let (_rpc_server_1_task_handle, _rpc_tx_queue_1_task_handle, _rpc_server) =
RpcServer::spawn(
config.clone(),
Default::default(),
"RPC server 1 test",
"RPC server 1 test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Buffer::new(block_verifier_router.clone(), 1),
MockSyncStatus::default(),
MockAddressBookPeers::default(),
NoChainTip,
Mainnet,
);
tokio::time::sleep(Duration::from_secs(3)).await;
tokio::time::sleep(Duration::from_secs(3)).await;
info!("spawning conflicted RPC server 2...");
info!("spawning conflicted RPC server 2...");
let _rpc_server_2_task_handle = RpcServer::spawn(
config,
Default::default(),
"RPC server 2 conflict test",
"RPC server 2 conflict test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Buffer::new(block_verifier_router.clone(), 1),
MockSyncStatus::default(),
MockAddressBookPeers::default(),
NoChainTip,
Mainnet,
)
.await;
let (rpc_server_2_task_handle, _rpc_tx_queue_2_task_handle, _rpc_server) = RpcServer::spawn(
config,
Default::default(),
"RPC server 2 conflict test",
"RPC server 2 conflict test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Buffer::new(block_verifier_router.clone(), 1),
MockSyncStatus::default(),
MockAddressBookPeers::default(),
NoChainTip,
Mainnet,
);
info!("spawned RPC servers, checking services...");
info!("spawned RPC servers, checking services...");
mempool.expect_no_requests().await;
state.expect_no_requests().await;
block_verifier_router.expect_no_requests().await;
// Because there is a panic inside a multi-threaded executor,
// we can't depend on the exact behaviour of the other tasks,
// particularly across different machines and OSes.
// The second server should panic, so its task handle should return the panic
let rpc_server_2_task_result = rpc_server_2_task_handle.await;
match rpc_server_2_task_result {
Ok(()) => panic!(
"RPC server with conflicting port should exit with an error: \
unexpected Ok result"
),
Err(join_error) => match join_error.try_into_panic() {
Ok(panic_object) => panic::resume_unwind(panic_object),
Err(cancelled_error) => panic!(
"RPC server with conflicting port should exit with an error: \
unexpected JoinError: {cancelled_error:?}"
),
},
}
// Ignore the queue task result
});
// Wait until the spawned task finishes
std::thread::sleep(Duration::from_secs(10));
info!("waiting for RPC server to shut down...");
rt.shutdown_timeout(Duration::from_secs(3));
match test_task_handle.now_or_never() {
Some(Ok(_never)) => unreachable!("test task always panics"),
None => panic!("unexpected test task hang"),
Some(Err(join_error)) => match join_error.try_into_panic() {
Ok(panic_object) => panic::resume_unwind(panic_object),
Err(cancelled_error) => panic!(
"test task should exit with a RPC server panic: \
unexpected non-panic JoinError: {cancelled_error:?}"
),
},
}
}
/// Check if the RPC server detects a port conflict when running parallel threads.
///
/// If this test fails, that's great!
/// We can make parallel the default, and remove the warnings in the config docs.
///
/// This test is sometimes unreliable on Windows, and hangs on macOS.
/// We believe this is a CI infrastructure issue, not a platform-specific issue.
#[test]
#[cfg(not(any(target_os = "windows", target_os = "macos")))]
fn rpc_server_spawn_port_conflict_parallel_auto() {
let _init_guard = zebra_test::init();
let port = zebra_test::net::random_known_port();
let config = Config {
listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()),
indexer_listen_addr: None,
parallel_cpu_threads: 2,
debug_force_finished_sync: false,
cookie_dir: Default::default(),
enable_cookie_auth: false,
};
let rt = tokio::runtime::Runtime::new().unwrap();
let test_task_handle = rt.spawn(async {
let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let mut block_verifier_router: MockService<_, _, _, BoxError> =
MockService::build().for_unit_tests();
info!("spawning parallel RPC server 1...");
let (_rpc_server_1_task_handle, _rpc_tx_queue_1_task_handle, _rpc_server) =
RpcServer::spawn(
config.clone(),
Default::default(),
"RPC server 1 test",
"RPC server 1 test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Buffer::new(block_verifier_router.clone(), 1),
MockSyncStatus::default(),
MockAddressBookPeers::default(),
NoChainTip,
Mainnet,
);
tokio::time::sleep(Duration::from_secs(3)).await;
info!("spawning parallel conflicted RPC server 2...");
let (rpc_server_2_task_handle, _rpc_tx_queue_2_task_handle, _rpc_server) = RpcServer::spawn(
config,
Default::default(),
"RPC server 2 conflict test",
"RPC server 2 conflict test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
Buffer::new(block_verifier_router.clone(), 1),
MockSyncStatus::default(),
MockAddressBookPeers::default(),
NoChainTip,
Mainnet,
);
info!("spawned RPC servers, checking services...");
mempool.expect_no_requests().await;
state.expect_no_requests().await;
block_verifier_router.expect_no_requests().await;
// Because there might be a panic inside a multi-threaded executor,
// we can't depend on the exact behaviour of the other tasks,
// particularly across different machines and OSes.
// The second server doesn't panic, but we'd like it to.
// (See the function docs for details.)
let rpc_server_2_task_result = rpc_server_2_task_handle.await;
match rpc_server_2_task_result {
Ok(()) => info!(
"Parallel RPC server with conflicting port should exit with an error: \
but we're ok with it ignoring the conflict for now"
),
Err(join_error) => match join_error.try_into_panic() {
Ok(panic_object) => panic::resume_unwind(panic_object),
Err(cancelled_error) => info!(
"Parallel RPC server with conflicting port should exit with an error: \
but we're ok with it ignoring the conflict for now: \
unexpected JoinError: {cancelled_error:?}"
),
},
}
// Ignore the queue task result
});
// Wait until the spawned task finishes
std::thread::sleep(Duration::from_secs(10));
info!("waiting for parallel RPC server to shut down...");
rt.shutdown_timeout(Duration::from_secs(3));
match test_task_handle.now_or_never() {
Some(Ok(())) => {
info!("parallel RPC server task successfully exited");
}
None => panic!("unexpected test task hang"),
Some(Err(join_error)) => match join_error.try_into_panic() {
Ok(panic_object) => panic::resume_unwind(panic_object),
Err(cancelled_error) => info!(
"Parallel RPC server with conflicting port should exit with an error: \
but we're ok with it ignoring the conflict for now: \
unexpected JoinError: {cancelled_error:?}"
),
},
}
mempool.expect_no_requests().await;
state.expect_no_requests().await;
block_verifier_router.expect_no_requests().await;
}

View File

@ -382,9 +382,10 @@ impl SyncerRpcMethods for RpcRequestClient {
}
Err(err)
if err
.downcast_ref::<jsonrpc_core::Error>()
.downcast_ref::<jsonrpsee_types::ErrorCode>()
.is_some_and(|err| {
err.code == server::error::LegacyCode::InvalidParameter.into()
let code: i32 = server::error::LegacyCode::InvalidParameter.into();
err.code() == code
}) =>
{
Ok(None)

View File

@ -254,7 +254,7 @@ tonic-build = { version = "0.12.3", optional = true }
abscissa_core = { version = "0.7.0", features = ["testing"] }
hex = "0.4.3"
hex-literal = "0.4.1"
jsonrpc-core = "18.0.0"
jsonrpsee-types = "0.24.7"
once_cell = "1.20.2"
regex = "1.11.0"
insta = { version = "1.41.1", features = ["json"] }

View File

@ -243,20 +243,31 @@ impl StartCmd {
}
// Launch RPC server
info!("spawning RPC server");
let (rpc_task_handle, rpc_tx_queue_task_handle, rpc_server) = RpcServer::spawn(
config.rpc.clone(),
config.mining.clone(),
build_version(),
user_agent(),
mempool.clone(),
read_only_state_service.clone(),
block_verifier_router.clone(),
sync_status.clone(),
address_book.clone(),
latest_chain_tip.clone(),
config.network.network.clone(),
);
let (rpc_task_handle, mut rpc_tx_queue_task_handle) =
if let Some(listen_addr) = config.rpc.listen_addr {
info!("spawning RPC server");
info!("Trying to open RPC endpoint at {}...", listen_addr,);
let rpc_task_handle = RpcServer::spawn(
config.rpc.clone(),
config.mining.clone(),
build_version(),
user_agent(),
mempool.clone(),
read_only_state_service.clone(),
block_verifier_router.clone(),
sync_status.clone(),
address_book.clone(),
latest_chain_tip.clone(),
config.network.network.clone(),
);
rpc_task_handle.await.unwrap()
} else {
warn!("configure an listen_addr to start the RPC server");
(
tokio::spawn(std::future::pending().in_current_span()),
tokio::spawn(std::future::pending().in_current_span()),
)
};
// TODO: Add a shutdown signal and start the server with `serve_with_incoming_shutdown()` if
// any related unit tests sometimes crash with memory errors
@ -399,7 +410,6 @@ impl StartCmd {
// ongoing tasks
pin!(rpc_task_handle);
pin!(indexer_rpc_task_handle);
pin!(rpc_tx_queue_task_handle);
pin!(syncer_task_handle);
pin!(block_gossip_task_handle);
pin!(mempool_crawler_task_handle);
@ -425,17 +435,10 @@ impl StartCmd {
let mut exit_when_task_finishes = true;
let result = select! {
rpc_result = &mut rpc_task_handle => {
rpc_result
rpc_join_result = &mut rpc_task_handle => {
let rpc_server_result = rpc_join_result
.expect("unexpected panic in the rpc task");
info!("rpc task exited");
Ok(())
}
indexer_rpc_join_result = &mut indexer_rpc_task_handle => {
let indexer_rpc_server_result = indexer_rpc_join_result
.expect("unexpected panic in the rpc task");
info!(?indexer_rpc_server_result, "indexer rpc task exited");
info!(?rpc_server_result, "rpc task exited");
Ok(())
}
@ -446,6 +449,13 @@ impl StartCmd {
Ok(())
}
indexer_rpc_join_result = &mut indexer_rpc_task_handle => {
let indexer_rpc_server_result = indexer_rpc_join_result
.expect("unexpected panic in the indexer task");
info!(?indexer_rpc_server_result, "indexer rpc task exited");
Ok(())
}
sync_result = &mut syncer_task_handle => sync_result
.expect("unexpected panic in the syncer task")
.map(|_| info!("syncer task exited")),
@ -536,15 +546,6 @@ impl StartCmd {
state_checkpoint_verify_handle.abort();
old_databases_task_handle.abort();
// Wait until the RPC server shuts down.
// This can take around 150 seconds.
//
// Without this shutdown, Zebra's RPC unit tests sometimes crashed with memory errors.
if let Some(rpc_server) = rpc_server {
info!("waiting for RPC server to shut down");
rpc_server.shutdown_blocking();
}
info!("exiting Zebra: all tasks have been asked to stop, waiting for remaining tasks to finish");
exit_status

View File

@ -35,7 +35,7 @@ use zebra_rpc::{
GetBlockTemplateCapability::*, GetBlockTemplateRequestMode::*,
},
hex_data::HexData,
GetBlockTemplateRpc, GetBlockTemplateRpcImpl,
GetBlockTemplateRpcImpl, GetBlockTemplateRpcServer,
},
};
use zebra_state::WatchReceiver;

View File

@ -3270,7 +3270,7 @@ async fn nu6_funding_streams_and_coinbase_balance() -> Result<()> {
types::submit_block,
},
hex_data::HexData,
GetBlockTemplateRpc, GetBlockTemplateRpcImpl,
GetBlockTemplateRpcImpl, GetBlockTemplateRpcServer,
};
use zebra_test::mock_service::MockService;
let _init_guard = zebra_test::init();

View File

@ -161,9 +161,10 @@ impl MiningRpcMethods for RpcRequestClient {
}
Err(err)
if err
.downcast_ref::<jsonrpc_core::Error>()
.downcast_ref::<jsonrpsee_types::ErrorObject>()
.is_some_and(|err| {
err.code == server::error::LegacyCode::InvalidParameter.into()
let error: i32 = server::error::LegacyCode::InvalidParameter.into();
err.code() == error
}) =>
{
Ok(None)

View File

@ -4,6 +4,7 @@ expression: parsed
---
{
"jsonrpc": "2.0",
"id": 123,
"result": {
"pool": "orchard",
"start_index": 0,
@ -13,6 +14,5 @@ expression: parsed
"end_height": 1707429
}
]
},
"id": 123
}
}

View File

@ -4,6 +4,7 @@ expression: parsed
---
{
"jsonrpc": "2.0",
"id": 123,
"result": {
"pool": "orchard",
"start_index": 338,
@ -13,6 +14,5 @@ expression: parsed
"end_height": 1888929
}
]
},
"id": 123
}
}

View File

@ -4,6 +4,7 @@ expression: parsed
---
{
"jsonrpc": "2.0",
"id": 123,
"result": {
"pool": "orchard",
"start_index": 585,
@ -13,6 +14,5 @@ expression: parsed
"end_height": 2000126
}
]
},
"id": 123
}
}

View File

@ -4,6 +4,7 @@ expression: parsed
---
{
"jsonrpc": "2.0",
"id": 123,
"result": {
"pool": "sapling",
"start_index": 0,
@ -13,6 +14,5 @@ expression: parsed
"end_height": 558822
}
]
},
"id": 123
}
}

View File

@ -4,6 +4,7 @@ expression: parsed
---
{
"jsonrpc": "2.0",
"id": 123,
"result": {
"pool": "sapling",
"start_index": 0,
@ -53,6 +54,5 @@ expression: parsed
"end_height": 1363036
}
]
},
"id": 123
}
}

View File

@ -4,6 +4,7 @@ expression: parsed
---
{
"jsonrpc": "2.0",
"id": 123,
"result": {
"pool": "sapling",
"start_index": 1090,
@ -33,6 +34,5 @@ expression: parsed
"end_height": 2056616
}
]
},
"id": 123
}
}

View File

@ -4,6 +4,7 @@ expression: parsed
---
{
"jsonrpc": "2.0",
"id": 123,
"result": {
"pool": "sapling",
"start_index": 17,
@ -13,6 +14,5 @@ expression: parsed
"end_height": 1703171
}
]
},
"id": 123
}
}