T1. Fix isolated connection bugs, improve tests, upgrade dependencies (#3302)

* Make handshakes generic over AsyncRead + AsyncWrite

* Simplify connect_isolated using ServiceExt::map_err and BoxError

* Move isolated network tests to their own module

* Improve isolated TCP connection tests

* Add an in-memory connection test that uses AsyncReadWrite

* Support connect_isolated on testnet

* Add a wrapper function for isolated TCP connections to an IP address

* Run test tasks for a while, and clean up after them

* Upgrade Zebra dependencies to be compatible with arti, but don't add arti yet

* Fix deny.toml

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: Conrado Gouvea <conrado@zfnd.org>
This commit is contained in:
teor 2022-01-15 05:34:59 +10:00 committed by GitHub
parent ee9f081dd5
commit 6c787dd188
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 658 additions and 375 deletions

332
Cargo.lock generated
View File

@ -38,9 +38,9 @@ checksum = "74f5722bc48763cb9d81d8427ca05b6aa2842f6632cf8e4c0a29eef9baececcc"
dependencies = [
"darling",
"ident_case",
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.60",
"proc-macro2 1.0.34",
"quote 1.0.10",
"syn 1.0.83",
"synstructure",
]
@ -86,7 +86,7 @@ version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43bb833f0bf979d8475d38fbf09ed3b8a55e1885fe93ad3f93239fc6a4f17b98"
dependencies = [
"getrandom 0.2.0",
"getrandom 0.2.3",
"once_cell",
"version_check 0.9.2",
]
@ -167,9 +167,9 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3548b8efc9f8e8a5a0a2808c5bd8451a9031b9e5b879a79590304ae928b0a70"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.60",
"proc-macro2 1.0.34",
"quote 1.0.10",
"syn 1.0.83",
]
[[package]]
@ -291,8 +291,8 @@ dependencies = [
"lazy_static",
"lazycell",
"peeking_take_while",
"proc-macro2 1.0.24",
"quote 1.0.7",
"proc-macro2 1.0.34",
"quote 1.0.10",
"regex",
"rustc-hash",
"shlex 0.1.1",
@ -313,8 +313,8 @@ dependencies = [
"lazycell",
"log",
"peeking_take_while",
"proc-macro2 1.0.24",
"quote 1.0.7",
"proc-macro2 1.0.34",
"quote 1.0.10",
"regex",
"rustc-hash",
"shlex 1.0.0",
@ -467,12 +467,6 @@ version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38"
[[package]]
name = "bytes"
version = "1.1.0"
@ -871,7 +865,7 @@ checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1"
dependencies = [
"bstr",
"csv-core",
"itoa 0.4.6",
"itoa 0.4.8",
"ryu",
"serde",
]
@ -917,10 +911,10 @@ checksum = "f0c960ae2da4de88a91b2d920c2a7233b400bc33cb28453a2987822d8392519b"
dependencies = [
"fnv",
"ident_case",
"proc-macro2 1.0.24",
"quote 1.0.7",
"proc-macro2 1.0.34",
"quote 1.0.10",
"strsim 0.9.3",
"syn 1.0.60",
"syn 1.0.83",
]
[[package]]
@ -930,8 +924,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b5a2f4ac4969822c62224815d069952656cadc7084fdca9751e6d959189b72"
dependencies = [
"darling_core",
"quote 1.0.7",
"syn 1.0.60",
"quote 1.0.10",
"syn 1.0.83",
]
[[package]]
@ -998,9 +992,9 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "278ef1934318d524612205f69df005eea30ec10edf7913e500b5a527fce55bc0"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.60",
"proc-macro2 1.0.34",
"quote 1.0.10",
"syn 1.0.83",
]
[[package]]
@ -1054,9 +1048,9 @@ version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e94aa31f7c0dc764f57896dc615ddd76fc13b0d5dca7eb6cc5e018a5a09ec06"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.60",
"proc-macro2 1.0.34",
"quote 1.0.10",
"syn 1.0.83",
]
[[package]]
@ -1223,9 +1217,9 @@ version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6dbd947adfffb0efc70599b3ddcf7b5597bb5fa9e245eb99f62b3a5f7bb8bd3c"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.60",
"proc-macro2 1.0.34",
"quote 1.0.10",
"syn 1.0.83",
]
[[package]]
@ -1303,13 +1297,13 @@ dependencies = [
[[package]]
name = "getrandom"
version = "0.2.0"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee8025cf36f917e6a52cce185b7c7177689b838b7ec138364e50cc2277a56cf4"
checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753"
dependencies = [
"cfg-if 0.1.10",
"cfg-if 1.0.0",
"libc",
"wasi 0.9.0+wasi-snapshot-preview1",
"wasi 0.10.0+wasi-snapshot-preview1",
]
[[package]]
@ -1319,9 +1313,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24b328c01a4d71d2d8173daa93562a73ab0fe85616876f02500f53d82948c504"
dependencies = [
"proc-macro-error",
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.60",
"proc-macro2 1.0.34",
"quote 1.0.10",
"syn 1.0.83",
]
[[package]]
@ -1376,9 +1370,9 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90454ce4de40b7ca6a8968b5ef367bdab48413962588d0d2b1638d60090c35d7"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.60",
"proc-macro2 1.0.34",
"quote 1.0.10",
"syn 1.0.83",
]
[[package]]
@ -1387,7 +1381,7 @@ version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f072413d126e57991455e0a922b31e4c8ba7c2ffbebf6b78b4f8521397d65cd"
dependencies = [
"bytes 1.1.0",
"bytes",
"fnv",
"futures-core",
"futures-sink",
@ -1490,13 +1484,13 @@ dependencies = [
[[package]]
name = "http"
version = "0.2.1"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28d569972648b2c512421b5f2a405ad6ac9666547189d0c5477a3f200f3e02f9"
checksum = "1323096b05d41827dadeaee54c9981958c0f94e670bc94ed80037d1a7b8b186b"
dependencies = [
"bytes 0.5.6",
"bytes",
"fnv",
"itoa 0.4.6",
"itoa 0.4.8",
]
[[package]]
@ -1505,7 +1499,7 @@ version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6"
dependencies = [
"bytes 1.1.0",
"bytes",
"http",
"pin-project-lite",
]
@ -1534,7 +1528,7 @@ version = "0.14.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7ec3e62bdc98a2f0393a5048e4c30ef659440ea6e0e572965103e72bd836f55"
dependencies = [
"bytes 1.1.0",
"bytes",
"futures-channel",
"futures-core",
"futures-util",
@ -1543,7 +1537,7 @@ dependencies = [
"http-body",
"httparse",
"httpdate",
"itoa 0.4.6",
"itoa 0.4.8",
"pin-project-lite",
"socket2 0.4.2",
"tokio",
@ -1554,17 +1548,15 @@ dependencies = [
[[package]]
name = "hyper-rustls"
version = "0.22.1"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f9f7a97316d44c0af9b0301e65010573a853a9fc97046d7331d7f6bc0fd5a64"
checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac"
dependencies = [
"futures-util",
"http",
"hyper",
"log",
"rustls",
"rustls 0.20.2",
"tokio",
"tokio-rustls",
"webpki",
]
[[package]]
@ -1660,9 +1652,9 @@ dependencies = [
[[package]]
name = "itoa"
version = "0.4.6"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc6f3ad7b9d11a0c00842ff8de1b60ee58661048eb8049ed33c73594f359d7e6"
checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
[[package]]
name = "itoa"
@ -1841,9 +1833,9 @@ checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
[[package]]
name = "memchr"
version = "2.3.4"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525"
checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a"
[[package]]
name = "memoffset"
@ -1906,10 +1898,10 @@ checksum = "caa72e4a3d157986dd2565c82ecbddcc23941513669a3766b938f6b72eb87f3f"
dependencies = [
"lazy_static",
"proc-macro-hack",
"proc-macro2 1.0.24",
"quote 1.0.7",
"proc-macro2 1.0.34",
"quote 1.0.10",
"regex",
"syn 1.0.60",
"syn 1.0.83",
]
[[package]]
@ -1959,8 +1951,8 @@ checksum = "d5f7db7a675c4b46b8842105b9371d6151e95fbbecd9b0e54dc2ea814397d2cc"
dependencies = [
"lazy_static",
"log",
"rustls",
"webpki",
"rustls 0.19.1",
"webpki 0.21.2",
"webpki-roots 0.18.0",
]
@ -1998,7 +1990,7 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43"
dependencies = [
"smallvec 1.5.0",
"smallvec 1.7.0",
]
[[package]]
@ -2060,7 +2052,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bafe4179722c2894288ee77a9f044f02811c86af699344c498b0840c698a2465"
dependencies = [
"arrayvec 0.4.12",
"itoa 0.4.6",
"itoa 0.4.8",
]
[[package]]
@ -2213,7 +2205,7 @@ dependencies = [
"instant",
"libc",
"redox_syscall 0.1.57",
"smallvec 1.5.0",
"smallvec 1.7.0",
"winapi",
]
@ -2289,9 +2281,9 @@ version = "0.4.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3be26700300be6d9d23264c73211d8190e755b6b5ca7a1b28230025511b52a5e"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.60",
"proc-macro2 1.0.34",
"quote 1.0.10",
"syn 1.0.83",
]
[[package]]
@ -2300,9 +2292,9 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48c950132583b500556b1efd71d45b319029f2b71518d979fcc208e16b42426f"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.60",
"proc-macro2 1.0.34",
"quote 1.0.10",
"syn 1.0.83",
]
[[package]]
@ -2375,9 +2367,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.60",
"proc-macro2 1.0.34",
"quote 1.0.10",
"syn 1.0.83",
"version_check 0.9.2",
]
@ -2387,8 +2379,8 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"proc-macro2 1.0.34",
"quote 1.0.10",
"version_check 0.9.2",
]
@ -2409,9 +2401,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.24"
version = "1.0.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e0704ee1a7e00d7bb417d0770ea303c1bccbabf0ef1667dae92b5967f5f8a71"
checksum = "2f84e92c0f7c9d58328b85a78557813e4bd845130db68d7184635344399423b1"
dependencies = [
"unicode-xid 0.2.1",
]
@ -2496,9 +2488,9 @@ version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "608c156fd8e97febc07dc9c2e2c80bf74cfc6ef26893eae3daf8bc2bc94a4b7f"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.60",
"proc-macro2 1.0.34",
"quote 1.0.10",
"syn 1.0.83",
]
[[package]]
@ -2512,11 +2504,11 @@ dependencies = [
[[package]]
name = "quote"
version = "1.0.7"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37"
checksum = "38bc8cc6a5f2e3655e0899c1b848643b2562f853f114bfec7be120678e3ace05"
dependencies = [
"proc-macro2 1.0.24",
"proc-macro2 1.0.34",
]
[[package]]
@ -2595,7 +2587,7 @@ version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7"
dependencies = [
"getrandom 0.2.0",
"getrandom 0.2.3",
]
[[package]]
@ -2712,7 +2704,7 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "528532f3d801c87aec9def2add9ca802fe569e44a544afe633765267840abe64"
dependencies = [
"getrandom 0.2.0",
"getrandom 0.2.3",
"redox_syscall 0.2.10",
]
@ -2754,12 +2746,12 @@ dependencies = [
[[package]]
name = "reqwest"
version = "0.11.6"
version = "0.11.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66d2927ca2f685faf0fc620ac4834690d29e7abb153add10f5812eef20b5e280"
checksum = "7c4e0a76dc12a116108933f6301b95e83634e0c47b0afbed6abbaa0601e99258"
dependencies = [
"base64 0.13.0",
"bytes 1.1.0",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
@ -2774,7 +2766,8 @@ dependencies = [
"mime",
"percent-encoding",
"pin-project-lite",
"rustls",
"rustls 0.20.2",
"rustls-pemfile",
"serde",
"serde_json",
"serde_urlencoded",
@ -2784,7 +2777,7 @@ dependencies = [
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"webpki-roots 0.21.1",
"webpki-roots 0.22.1",
"winreg",
]
@ -2881,8 +2874,29 @@ dependencies = [
"base64 0.13.0",
"log",
"ring",
"sct",
"webpki",
"sct 0.6.0",
"webpki 0.21.2",
]
[[package]]
name = "rustls"
version = "0.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d37e5e2290f3e040b594b1a9e04377c2c671f1a1cfd9bfdef82106ac1c113f84"
dependencies = [
"log",
"ring",
"sct 0.7.0",
"webpki 0.22.0",
]
[[package]]
name = "rustls-pemfile"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9"
dependencies = [
"base64 0.13.0",
]
[[package]]
@ -2940,6 +2954,16 @@ dependencies = [
"untrusted",
]
[[package]]
name = "sct"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "secp256k1"
version = "0.21.2"
@ -3107,9 +3131,9 @@ version = "1.0.133"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed201699328568d8d08208fdd080e3ff594e6c422e438b6705905da01005d537"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.60",
"proc-macro2 1.0.34",
"quote 1.0.10",
"syn 1.0.83",
]
[[package]]
@ -3130,7 +3154,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edfa57a7f8d9c1d260a549e7224100f6c43d43f9103e06dd8b4095a9b2b43ce9"
dependencies = [
"form_urlencoded",
"itoa 0.4.6",
"itoa 0.4.8",
"ryu",
"serde",
]
@ -3212,9 +3236,9 @@ dependencies = [
[[package]]
name = "smallvec"
version = "1.5.0"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7acad6f34eb9e8a259d3283d1e8c1d34d7415943d4895f65cc73813c7396fc85"
checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309"
[[package]]
name = "socket2"
@ -3255,9 +3279,9 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bdfb59103e43a0f99a346b57860d50f2138a7008d08acd964e9ac0fef3ae9a5"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.60",
"proc-macro2 1.0.34",
"quote 1.0.10",
"syn 1.0.83",
]
[[package]]
@ -3315,9 +3339,9 @@ checksum = "dcb5ae327f9cc13b68763b5749770cb9e048a99bd9dfdfa58d0cf05d5f64afe0"
dependencies = [
"heck",
"proc-macro-error",
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.60",
"proc-macro2 1.0.34",
"quote 1.0.10",
"syn 1.0.83",
]
[[package]]
@ -3339,12 +3363,12 @@ dependencies = [
[[package]]
name = "syn"
version = "1.0.60"
version = "1.0.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c700597eca8a5a762beb35753ef6b94df201c81cca676604f547495a0d7f0081"
checksum = "23a1dfb999630e338648c83e91c59a4e9fb7620f520c3194b6b89e276f2f1959"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"proc-macro2 1.0.34",
"quote 1.0.10",
"unicode-xid 0.2.1",
]
@ -3354,9 +3378,9 @@ version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b834f2d66f734cb897113e34aaff2f1ab4719ca946f9a7358dba8f8064148701"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.60",
"proc-macro2 1.0.34",
"quote 1.0.10",
"syn 1.0.83",
"unicode-xid 0.2.1",
]
@ -3413,9 +3437,9 @@ version = "1.0.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.60",
"proc-macro2 1.0.34",
"quote 1.0.10",
"syn 1.0.83",
]
[[package]]
@ -3469,7 +3493,7 @@ version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbbf1c778ec206785635ce8ad57fe52b3009ae9e0c9f574a728f3049d3e55838"
dependencies = [
"bytes 1.1.0",
"bytes",
"libc",
"memchr",
"mio",
@ -3489,20 +3513,20 @@ version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.60",
"proc-macro2 1.0.34",
"quote 1.0.10",
"syn 1.0.83",
]
[[package]]
name = "tokio-rustls"
version = "0.22.0"
version = "0.23.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6"
checksum = "a27d5f2b839802bd8267fa19b0530f5a08b9c08cd417976be2a65d130fe1c11b"
dependencies = [
"rustls",
"rustls 0.20.2",
"tokio",
"webpki",
"webpki 0.22.0",
]
[[package]]
@ -3524,7 +3548,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3"
dependencies = [
"async-stream",
"bytes 1.1.0",
"bytes",
"futures-core",
"tokio",
"tokio-stream",
@ -3536,7 +3560,7 @@ version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0"
dependencies = [
"bytes 1.1.0",
"bytes",
"futures-core",
"futures-sink",
"log",
@ -3648,9 +3672,9 @@ version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4f480b8f81512e825f337ad51e94c1eb5d3bbdf2b363dcd01e2b19a9ffe3f8e"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.60",
"proc-macro2 1.0.34",
"quote 1.0.10",
"syn 1.0.83",
]
[[package]]
@ -3755,7 +3779,7 @@ dependencies = [
"serde",
"serde_json",
"sharded-slab",
"smallvec 1.5.0",
"smallvec 1.7.0",
"thread_local",
"tracing",
"tracing-core",
@ -3771,9 +3795,9 @@ checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "typenum"
version = "1.12.0"
version = "1.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "373c8a200f9e67a0c95e62a4f52fbf80c23b4381c05a17845531982fa99e6b33"
checksum = "b63708a265f51345575b27fe43f9500ad611579e764c79edbc2037b1121959ec"
[[package]]
name = "uint"
@ -3978,9 +4002,9 @@ dependencies = [
"bumpalo",
"lazy_static",
"log",
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.60",
"proc-macro2 1.0.34",
"quote 1.0.10",
"syn 1.0.83",
"wasm-bindgen-shared",
]
@ -4002,7 +4026,7 @@ version = "0.2.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a6ac8995ead1f084a8dea1e65f194d0973800c7f571f6edd70adf06ecf77084"
dependencies = [
"quote 1.0.7",
"quote 1.0.10",
"wasm-bindgen-macro-support",
]
@ -4012,9 +4036,9 @@ version = "0.2.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5a48c72f299d80557c7c62e37e7225369ecc0c963964059509fbafe917c7549"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.60",
"proc-macro2 1.0.34",
"quote 1.0.10",
"syn 1.0.83",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@ -4045,22 +4069,32 @@ dependencies = [
"untrusted",
]
[[package]]
name = "webpki"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "webpki-roots"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91cd5736df7f12a964a5067a12c62fa38e1bd8080aff1f80bc29be7c80d19ab4"
dependencies = [
"webpki",
"webpki 0.21.2",
]
[[package]]
name = "webpki-roots"
version = "0.21.1"
version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940"
checksum = "c475786c6f47219345717a043a37ec04cb4bc185e28853adcc4fa0a947eba630"
dependencies = [
"webpki",
"webpki 0.22.0",
]
[[package]]
@ -4346,7 +4380,7 @@ version = "1.0.0-beta.3"
dependencies = [
"bitflags",
"byteorder",
"bytes 1.1.0",
"bytes",
"chrono",
"futures",
"hex",
@ -4522,8 +4556,8 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3f369ddb18862aba61aa49bf31e74d29f0f162dec753063200e1dc084345d16"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.60",
"proc-macro2 1.0.34",
"quote 1.0.10",
"syn 1.0.83",
"synstructure",
]

View File

@ -52,9 +52,6 @@ skip-tree = [
# ticket #2998: hdrhistogram dependencies
{ name = "hdrhistogram", version = "=6.3.4" },
# ticket #2999: http dependencies
{ name = "bytes", version = "=0.5.6" },
# ticket #3061: reqwest and minreq dependencies
{ name = "webpki-roots", version = "=0.18.0" },
@ -70,6 +67,7 @@ skip-tree = [
# wait for lots of crates in the cryptographic ecosystem to upgrade
{ name = "rand", version = "=0.7.3" },
{ name = "rustls", version = "=0.19.1" },
# wait for lots of crates in the tokio ecosystem to upgrade
{ name = "socket2", version = "=0.3.16" },

View File

@ -1,19 +1,15 @@
//! Code for creating isolated connections to specific peers.
//! Creating isolated connections to specific peers.
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use std::{future::Future, net::SocketAddr};
use futures::future::{FutureExt, TryFutureExt};
use tokio::net::TcpStream;
use futures::future::TryFutureExt;
use tokio::io::{AsyncRead, AsyncWrite};
use tower::{
util::{BoxService, Oneshot},
Service,
ServiceExt,
};
use zebra_chain::chain_tip::NoChainTip;
use zebra_chain::{chain_tip::NoChainTip, parameters::Network};
use crate::{
peer::{self, ConnectedAddr, HandshakeRequest},
@ -21,42 +17,50 @@ use crate::{
BoxError, Config, Request, Response,
};
/// Use the provided TCP connection to create a Zcash connection completely
/// isolated from all other node state.
#[cfg(test)]
mod tests;
/// Creates a Zcash peer connection using the provided data stream.
/// This connection is completely isolated from all other node state.
///
/// The connection pool returned by `init` should be used for all requests that
/// The connection pool returned by [`init`](zebra_network::init)
/// should be used for all requests that
/// don't require isolated state or use of an existing TCP connection. However,
/// this low-level API is useful for custom network crawlers or Tor connections.
///
/// In addition to being completely isolated from all other node state, this
/// method also aims to be minimally distinguishable from other clients.
///
/// SECURITY TODO: check if the timestamp field can be zeroed, to remove another distinguisher (#3300)
///
/// Note that this method does not implement any timeout behavior, so callers may
/// want to layer it with a timeout as appropriate for their application.
///
/// # Inputs
///
/// - `conn`: an existing TCP connection to use. Passing an existing TCP
/// connection allows this method to be used with clearnet or Tor transports.
/// - `network`: the Zcash [`Network`] used for this connection.
///
/// - `data_stream`: an existing data stream. This can be a non-anonymised TCP connection,
/// or a Tor client [`DataStream`].
///
/// - `user_agent`: a valid BIP14 user-agent, e.g., the empty string.
///
/// # Bug
///
/// `connect_isolated` only works on `Mainnet`, see #1687.
pub fn connect_isolated(
conn: TcpStream,
pub fn connect_isolated<AsyncReadWrite>(
network: Network,
data_stream: AsyncReadWrite,
user_agent: String,
) -> impl Future<
Output = Result<
BoxService<Request, Response, Box<dyn std::error::Error + Send + Sync + 'static>>,
Box<dyn std::error::Error + Send + Sync + 'static>,
>,
> {
) -> impl Future<Output = Result<BoxService<Request, Response, BoxError>, BoxError>>
where
AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let config = Config {
network,
..Config::default()
};
let handshake = peer::Handshake::builder()
.with_config(Config::default())
.with_config(config)
.with_inbound_service(tower::service_fn(|_req| async move {
Ok::<Response, Box<dyn std::error::Error + Send + Sync + 'static>>(Response::Nil)
Ok::<Response, BoxError>(Response::Nil)
}))
.with_user_agent(user_agent)
.with_latest_chain_tip(NoChainTip)
@ -70,88 +74,29 @@ pub fn connect_isolated(
Oneshot::new(
handshake,
HandshakeRequest {
tcp_stream: conn,
data_stream,
connected_addr,
connection_tracker,
},
)
.map_ok(|client| BoxService::new(Wrapper(client)))
.map_ok(|client| BoxService::new(client.map_err(Into::into)))
}
// This can be deleted when a new version of Tower with map_err is released.
struct Wrapper(peer::Client);
impl Service<Request> for Wrapper {
type Response = Response;
type Error = BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx).map_err(Into::into)
}
fn call(&mut self, req: Request) -> Self::Future {
self.0.call(req).map_err(Into::into).boxed()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn connect_isolated_sends_minimally_distinguished_version_message() {
use std::net::SocketAddr;
use futures::stream::StreamExt;
use tokio_util::codec::Framed;
use crate::{
protocol::external::{AddrInVersion, Codec, Message},
types::PeerServices,
};
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let listen_addr = listener.local_addr().unwrap();
let fixed_isolated_addr: SocketAddr = "0.0.0.0:8233".parse().unwrap();
let conn = tokio::net::TcpStream::connect(listen_addr).await.unwrap();
tokio::spawn(connect_isolated(conn, "".to_string()));
let (conn, _) = listener.accept().await.unwrap();
let mut stream = Framed::new(conn, Codec::builder().finish());
if let Message::Version {
services,
timestamp,
address_from,
user_agent,
start_height,
relay,
..
} = stream
.next()
.await
.expect("stream item")
.expect("item is Ok(msg)")
{
// Check that the version message sent by connect_isolated
// has the fields specified in the Stolon RFC.
assert_eq!(services, PeerServices::empty());
assert_eq!(timestamp.timestamp() % (5 * 60), 0);
assert_eq!(
address_from,
AddrInVersion::new(fixed_isolated_addr, PeerServices::empty()),
);
assert_eq!(user_agent, "");
assert_eq!(start_height.0, 0);
assert!(!relay);
} else {
panic!("handshake did not send version message");
}
}
/// Creates a direct TCP Zcash peer connection to `addr`.
/// This connection is completely isolated from all other node state.
///
/// See [`connect_isolated`] for details.
///
/// # Privacy
///
/// Transactions sent over this connection can be linked to the sending and receiving IP address
/// by passive internet observers.
pub fn connect_isolated_tcp_direct(
network: Network,
addr: SocketAddr,
user_agent: String,
) -> impl Future<Output = Result<BoxService<Request, Response, BoxError>, BoxError>> {
tokio::net::TcpStream::connect(addr)
.err_into()
.and_then(move |tcp_stream| connect_isolated(network, tcp_stream, user_agent))
}

View File

@ -0,0 +1,3 @@
//! Tests for isolated Zebra connections.
mod vectors;

View File

@ -0,0 +1,220 @@
//! Fixed test vectors for isolated Zebra connections.
use std::{net::SocketAddr, task::Poll, time::Duration};
use futures::stream::StreamExt;
use tokio_util::codec::Framed;
use crate::{
constants::CURRENT_NETWORK_PROTOCOL_VERSION,
protocol::external::{AddrInVersion, Codec, Message},
types::PeerServices,
};
use super::super::*;
use Network::*;
/// Test that `connect_isolated` sends a version message with minimal distinguishing features,
/// when sent over TCP.
#[tokio::test]
async fn connect_isolated_sends_anonymised_version_message_tcp() {
zebra_test::init();
if zebra_test::net::zebra_skip_network_tests() {
return;
}
connect_isolated_sends_anonymised_version_message_tcp_net(Mainnet).await;
connect_isolated_sends_anonymised_version_message_tcp_net(Testnet).await;
}
async fn connect_isolated_sends_anonymised_version_message_tcp_net(network: Network) {
// These tests might fail on machines with no configured IPv4 addresses.
// (Localhost should be enough.)
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let listen_addr = listener.local_addr().unwrap();
// Connection errors are detected using the JoinHandle.
// (They might also make the test hang.)
let mut outbound_join_handle = tokio::spawn(connect_isolated_tcp_direct(
network,
listen_addr,
"".to_string(),
));
let (inbound_conn, _) = listener.accept().await.unwrap();
let mut inbound_stream =
Framed::new(inbound_conn, Codec::builder().for_network(network).finish());
// We don't need to send any bytes to get a version message.
if let Message::Version {
version,
services,
timestamp,
address_recv,
address_from,
nonce: _,
user_agent,
start_height,
relay,
} = inbound_stream
.next()
.await
.expect("stream item")
.expect("item is Ok(msg)")
{
// Check that the version message sent by connect_isolated
// anonymises all the fields that it possibly can.
//
// The version field needs to be accurate, because it controls protocol features.
// The nonce must be randomised for security.
//
// SECURITY TODO: check if the timestamp field can be zeroed, to remove another distinguisher (#3300)
let mut fixed_isolated_addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
fixed_isolated_addr.set_port(network.default_port());
// Required fields should be accurate and match most other peers.
// (We can't test nonce randomness here.)
assert_eq!(version, CURRENT_NETWORK_PROTOCOL_VERSION);
assert_eq!(timestamp.timestamp() % (5 * 60), 0);
// Other fields should be empty or zeroed.
assert_eq!(services, PeerServices::empty());
assert_eq!(
address_recv,
// Since we're connecting to the peer, we expect it to have the node flag.
//
// SECURITY TODO: should this just be zeroed anyway? (#3300)
AddrInVersion::new(fixed_isolated_addr, PeerServices::NODE_NETWORK),
);
assert_eq!(
address_from,
AddrInVersion::new(fixed_isolated_addr, PeerServices::empty()),
);
assert_eq!(user_agent, "");
assert_eq!(start_height.0, 0);
assert!(!relay);
} else {
panic!("handshake did not send version message");
}
// Let the spawned task run for a short time.
tokio::time::sleep(Duration::from_secs(1)).await;
// Make sure that the isolated connection did not:
// - panic, or
// - return a service.
//
// This test doesn't send a version message on `inbound_conn`,
// so providing a service is incorrect behaviour.
//
// A timeout error would be acceptable,
// but a TCP connection error indicates a potential test setup issue.
// So we fail on them both, because we expect this test to complete before the timeout.
let outbound_result = futures::poll!(&mut outbound_join_handle);
assert!(matches!(outbound_result, Poll::Pending));
outbound_join_handle.abort();
}
/// Test that `connect_isolated` sends a version message with minimal distinguishing features,
/// when sent in-memory.
///
/// This test also:
/// - checks `AsyncReadWrite` support, and
/// - runs even if network tests are disabled.
#[tokio::test]
async fn connect_isolated_sends_anonymised_version_message_mem() {
zebra_test::init();
connect_isolated_sends_anonymised_version_message_mem_net(Mainnet).await;
connect_isolated_sends_anonymised_version_message_mem_net(Testnet).await;
}
async fn connect_isolated_sends_anonymised_version_message_mem_net(network: Network) {
// We expect version messages to be ~100 bytes
let (inbound_stream, outbound_stream) = tokio::io::duplex(1024);
let mut outbound_join_handle =
tokio::spawn(connect_isolated(network, outbound_stream, "".to_string()));
let mut inbound_stream = Framed::new(
inbound_stream,
Codec::builder().for_network(network).finish(),
);
// We don't need to send any bytes to get a version message.
if let Message::Version {
version,
services,
timestamp,
address_recv,
address_from,
nonce: _,
user_agent,
start_height,
relay,
} = inbound_stream
.next()
.await
.expect("stream item")
.expect("item is Ok(msg)")
{
// Check that the version message sent by connect_isolated
// anonymises all the fields that it possibly can.
//
// The version field needs to be accurate, because it controls protocol features.
// The nonce must be randomised for security.
//
// SECURITY TODO: check if the timestamp field can be zeroed, to remove another distinguisher (#3300)
let mut fixed_isolated_addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
fixed_isolated_addr.set_port(network.default_port());
// Required fields should be accurate and match most other peers.
// (We can't test nonce randomness here.)
assert_eq!(version, CURRENT_NETWORK_PROTOCOL_VERSION);
assert_eq!(timestamp.timestamp() % (5 * 60), 0);
// Other fields should be empty or zeroed.
assert_eq!(services, PeerServices::empty());
assert_eq!(
address_recv,
// Since we're connecting to the peer, we expect it to have the node flag.
//
// SECURITY TODO: should this just be zeroed anyway? (#3300)
AddrInVersion::new(fixed_isolated_addr, PeerServices::NODE_NETWORK),
);
assert_eq!(
address_from,
AddrInVersion::new(fixed_isolated_addr, PeerServices::empty()),
);
assert_eq!(user_agent, "");
assert_eq!(start_height.0, 0);
assert!(!relay);
} else {
panic!("handshake did not send version message");
}
// Let the spawned task run for a short time.
tokio::time::sleep(Duration::from_secs(1)).await;
// Make sure that the isolated connection did not:
// - panic, or
// - return a service.
//
// This test doesn't send a version message on `inbound_conn`,
// so providing a service is incorrect behaviour.
// (But a timeout error would be acceptable.)
let outbound_result = futures::poll!(&mut outbound_join_handle);
assert!(matches!(
outbound_result,
Poll::Pending | Poll::Ready(Ok(Err(_)))
));
outbound_join_handle.abort();
}

View File

@ -70,7 +70,7 @@ mod protocol;
pub use crate::{
address_book::AddressBook,
config::Config,
isolated::connect_isolated,
isolated::{connect_isolated, connect_isolated_tcp_direct},
meta_addr::PeerAddrState,
peer::{HandshakeError, PeerError, SharedPeerError},
peer_set::init,

View File

@ -21,11 +21,21 @@ use crate::{
/// A wrapper around [`peer::Handshake`] that opens a TCP connection before
/// forwarding to the inner handshake service. Writing this as its own
/// [`tower::Service`] lets us apply unified timeout policies, etc.
pub struct Connector<S, C = NoChainTip> {
handshaker: Handshake<S, C>,
pub struct Connector<S, C = NoChainTip>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send,
C: ChainTip + Clone + Send + 'static,
{
handshaker: Handshake<S, TcpStream, C>,
}
impl<S: Clone, C: Clone> Clone for Connector<S, C> {
impl<S, C> Clone for Connector<S, C>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send,
C: ChainTip + Clone + Send + 'static,
{
fn clone(&self) -> Self {
Connector {
handshaker: self.handshaker.clone(),
@ -33,8 +43,13 @@ impl<S: Clone, C: Clone> Clone for Connector<S, C> {
}
}
impl<S, C> Connector<S, C> {
pub fn new(handshaker: Handshake<S, C>) -> Self {
impl<S, C> Connector<S, C>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send,
C: ChainTip + Clone + Send + 'static,
{
pub fn new(handshaker: Handshake<S, TcpStream, C>) -> Self {
Connector { handshaker }
}
}
@ -77,11 +92,11 @@ where
let connector_span = info_span!("connector", peer = ?connected_addr);
async move {
let stream = TcpStream::connect(addr).await?;
let tcp_stream = TcpStream::connect(addr).await?;
hs.ready().await?;
let client = hs
.call(HandshakeRequest {
tcp_stream: stream,
.call(HandshakeRequest::<TcpStream> {
data_stream: tcp_stream,
connected_addr,
connection_tracker,
})

View File

@ -3,6 +3,7 @@ use std::{
collections::HashSet,
fmt,
future::Future,
marker::PhantomData,
net::{IpAddr, Ipv4Addr, SocketAddr},
pin::Pin,
sync::Arc,
@ -12,7 +13,7 @@ use std::{
use chrono::{TimeZone, Utc};
use futures::{channel::oneshot, future, pin_mut, FutureExt, SinkExt, StreamExt};
use tokio::{
net::TcpStream,
io::{AsyncRead, AsyncWrite},
sync::broadcast,
task::JoinError,
time::{timeout, Instant},
@ -53,18 +54,51 @@ use crate::{
/// To avoid hangs, each handshake (or its connector) should be:
/// - launched in a separate task, and
/// - wrapped in a timeout.
#[derive(Clone)]
pub struct Handshake<S, C = NoChainTip> {
pub struct Handshake<S, AsyncReadWrite, C = NoChainTip>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send,
C: ChainTip + Clone + Send + 'static,
AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
config: Config,
inbound_service: S,
address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>,
nonces: Arc<futures::lock::Mutex<HashSet<Nonce>>>,
user_agent: String,
our_services: PeerServices,
relay: bool,
parent_span: Span,
inbound_service: S,
address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>,
minimum_peer_version: MinimumPeerVersion<C>,
nonces: Arc<futures::lock::Mutex<HashSet<Nonce>>>,
parent_span: Span,
_phantom_data: PhantomData<AsyncReadWrite>,
}
impl<S, AsyncReadWrite, C> Clone for Handshake<S, AsyncReadWrite, C>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send,
C: ChainTip + Clone + Send + 'static,
AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
user_agent: self.user_agent.clone(),
our_services: self.our_services,
relay: self.relay,
inbound_service: self.inbound_service.clone(),
address_book_updater: self.address_book_updater.clone(),
inv_collector: self.inv_collector.clone(),
minimum_peer_version: self.minimum_peer_version.clone(),
nonces: self.nonces.clone(),
parent_span: self.parent_span.clone(),
_phantom_data: self._phantom_data,
}
}
}
/// The peer address that we are handshaking with.
@ -306,22 +340,32 @@ impl fmt::Debug for ConnectedAddr {
}
/// A builder for `Handshake`.
pub struct Builder<S, C = NoChainTip> {
config: Option<Config>,
inbound_service: Option<S>,
address_book_updater: Option<tokio::sync::mpsc::Sender<MetaAddrChange>>,
our_services: Option<PeerServices>,
user_agent: Option<String>,
relay: Option<bool>,
inv_collector: Option<broadcast::Sender<(InventoryHash, SocketAddr)>>,
latest_chain_tip: C,
}
impl<S, C> Builder<S, C>
pub struct Builder<S, AsyncReadWrite, C = NoChainTip>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send,
C: ChainTip,
C: ChainTip + Clone + Send + 'static,
AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
config: Option<Config>,
our_services: Option<PeerServices>,
user_agent: Option<String>,
relay: Option<bool>,
inbound_service: Option<S>,
address_book_updater: Option<tokio::sync::mpsc::Sender<MetaAddrChange>>,
inv_collector: Option<broadcast::Sender<(InventoryHash, SocketAddr)>>,
latest_chain_tip: C,
_phantom_data: PhantomData<AsyncReadWrite>,
}
impl<S, AsyncReadWrite, C> Builder<S, AsyncReadWrite, C>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send,
C: ChainTip + Clone + Send + 'static,
AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
/// Provide a config. Mandatory.
pub fn with_config(mut self, config: Config) -> Self {
@ -381,9 +425,16 @@ where
/// constant over network upgrade activations.
///
/// Use [`NoChainTip`] to explicitly provide no chain tip.
pub fn with_latest_chain_tip<NewC>(self, latest_chain_tip: NewC) -> Builder<S, NewC> {
pub fn with_latest_chain_tip<NewC>(
self,
latest_chain_tip: NewC,
) -> Builder<S, AsyncReadWrite, NewC>
where
NewC: ChainTip + Clone + Send + 'static,
{
Builder {
latest_chain_tip,
// TODO: Until Rust RFC 2528 reaches stable, we can't do `..self`
config: self.config,
inbound_service: self.inbound_service,
@ -392,6 +443,7 @@ where
user_agent: self.user_agent,
relay: self.relay,
inv_collector: self.inv_collector,
_phantom_data: self._phantom_data,
}
}
@ -406,7 +458,7 @@ where
/// Consume this builder and produce a [`Handshake`].
///
/// Returns an error only if any mandatory field was unset.
pub fn finish(self) -> Result<Handshake<S, C>, &'static str> {
pub fn finish(self) -> Result<Handshake<S, AsyncReadWrite, C>, &'static str> {
let config = self.config.ok_or("did not specify config")?;
let inbound_service = self
.inbound_service
@ -430,38 +482,41 @@ where
Ok(Handshake {
config,
inbound_service,
inv_collector,
address_book_updater,
nonces,
user_agent,
our_services,
relay,
parent_span: Span::current(),
inbound_service,
address_book_updater,
inv_collector,
minimum_peer_version,
nonces,
parent_span: Span::current(),
_phantom_data: self._phantom_data,
})
}
}
impl<S> Handshake<S, NoChainTip>
impl<S, AsyncReadWrite> Handshake<S, AsyncReadWrite, NoChainTip>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send,
AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
/// Create a builder that configures a [`Handshake`] service.
pub fn builder() -> Builder<S, NoChainTip> {
pub fn builder() -> Builder<S, AsyncReadWrite, NoChainTip> {
// We don't derive `Default` because the derive inserts a `where S:
// Default` bound even though `Option<S>` implements `Default` even if
// `S` does not.
Builder {
config: None,
our_services: None,
user_agent: None,
relay: None,
inbound_service: None,
address_book_updater: None,
user_agent: None,
our_services: None,
relay: None,
inv_collector: None,
latest_chain_tip: NoChainTip,
_phantom_data: PhantomData::default(),
}
}
}
@ -472,8 +527,8 @@ where
/// We split `Handshake` into its components before calling this function,
/// to avoid infectious `Sync` bounds on the returned future.
#[allow(clippy::too_many_arguments)]
pub async fn negotiate_version(
peer_conn: &mut Framed<TcpStream, Codec>,
pub async fn negotiate_version<AsyncReadWrite>(
peer_conn: &mut Framed<AsyncReadWrite, Codec>,
connected_addr: &ConnectedAddr,
config: Config,
nonces: Arc<futures::lock::Mutex<HashSet<Nonce>>>,
@ -481,7 +536,10 @@ pub async fn negotiate_version(
our_services: PeerServices,
relay: bool,
mut minimum_peer_version: MinimumPeerVersion<impl ChainTip>,
) -> Result<(Version, PeerServices, SocketAddr), HandshakeError> {
) -> Result<(Version, PeerServices, SocketAddr), HandshakeError>
where
AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
// Create a random nonce for this connection
let local_nonce = Nonce::default();
// # Correctness
@ -671,9 +729,12 @@ pub async fn negotiate_version(
/// A handshake request.
/// Contains the information needed to handshake with the peer.
pub struct HandshakeRequest {
/// The TCP connection to the peer.
pub tcp_stream: TcpStream,
pub struct HandshakeRequest<AsyncReadWrite>
where
AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
/// The tokio [`TcpStream`] or Tor [`DataStream`] to the peer.
pub data_stream: AsyncReadWrite,
/// The address of the peer, and other related information.
pub connected_addr: ConnectedAddr,
@ -684,11 +745,13 @@ pub struct HandshakeRequest {
pub connection_tracker: ConnectionTracker,
}
impl<S, C> Service<HandshakeRequest> for Handshake<S, C>
impl<S, AsyncReadWrite, C> Service<HandshakeRequest<AsyncReadWrite>>
for Handshake<S, AsyncReadWrite, C>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send,
C: ChainTip + Clone + Send + 'static,
AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Response = Client;
type Error = BoxError;
@ -699,9 +762,9 @@ where
Poll::Ready(Ok(()))
}
fn call(&mut self, req: HandshakeRequest) -> Self::Future {
fn call(&mut self, req: HandshakeRequest<AsyncReadWrite>) -> Self::Future {
let HandshakeRequest {
tcp_stream,
data_stream,
connected_addr,
connection_tracker,
} = req;
@ -735,7 +798,7 @@ where
// As a defence-in-depth against hangs, every send() or next() on peer_conn
// should be wrapped in a timeout.
let mut peer_conn = Framed::new(
tcp_stream,
data_stream,
Codec::builder()
.for_network(config.network)
.with_metrics_addr_label(connected_addr.get_transient_addr_label())

View File

@ -13,7 +13,7 @@ use futures::{
};
use rand::seq::SliceRandom;
use tokio::{
net::TcpListener,
net::{TcpListener, TcpStream},
sync::broadcast,
time::{sleep, Instant},
};
@ -482,7 +482,8 @@ async fn accept_inbound_connections<S>(
peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
) -> Result<(), BoxError>
where
S: Service<peer::HandshakeRequest, Response = peer::Client, Error = BoxError> + Clone,
S: Service<peer::HandshakeRequest<TcpStream>, Response = peer::Client, Error = BoxError>
+ Clone,
S::Future: Send + 'static,
{
let mut active_inbound_connections = ActiveConnectionCounter::new_counter();
@ -534,7 +535,7 @@ where
// Construct a handshake future but do not drive it yet....
let handshake = handshaker.call(HandshakeRequest {
tcp_stream,
data_stream: tcp_stream,
connected_addr,
connection_tracker,
});

View File

@ -725,24 +725,25 @@ async fn listener_peer_limit_one_handshake_ok_then_drop() {
return;
}
let success_disconnect_inbound_handshaker = service_fn(|req: HandshakeRequest| async move {
let HandshakeRequest {
tcp_stream,
connected_addr: _,
connection_tracker,
} = req;
let success_disconnect_inbound_handshaker =
service_fn(|req: HandshakeRequest<TcpStream>| async move {
let HandshakeRequest {
data_stream: tcp_stream,
connected_addr: _,
connection_tracker,
} = req;
let (fake_client, _harness) = ClientTestHarness::build().finish();
let (fake_client, _harness) = ClientTestHarness::build().finish();
// Actually close the connection.
std::mem::drop(connection_tracker);
std::mem::drop(tcp_stream);
// Actually close the connection.
std::mem::drop(connection_tracker);
std::mem::drop(tcp_stream);
// Give the crawler time to get the message.
tokio::task::yield_now().await;
// Give the crawler time to get the message.
tokio::task::yield_now().await;
Ok(fake_client)
});
Ok(fake_client)
});
let (config, mut peerset_rx) =
spawn_inbound_listener_with_peer_limit(1, success_disconnect_inbound_handshaker).await;
@ -791,25 +792,26 @@ async fn listener_peer_limit_one_handshake_ok_stay_open() {
let (peer_tracker_tx, mut peer_tracker_rx) = mpsc::unbounded();
let success_stay_open_inbound_handshaker = service_fn(move |req: HandshakeRequest| {
let peer_tracker_tx = peer_tracker_tx.clone();
async move {
let HandshakeRequest {
tcp_stream,
connected_addr: _,
connection_tracker,
} = req;
let success_stay_open_inbound_handshaker =
service_fn(move |req: HandshakeRequest<TcpStream>| {
let peer_tracker_tx = peer_tracker_tx.clone();
async move {
let HandshakeRequest {
data_stream: tcp_stream,
connected_addr: _,
connection_tracker,
} = req;
let (fake_client, _harness) = ClientTestHarness::build().finish();
let (fake_client, _harness) = ClientTestHarness::build().finish();
// Make the connection staying open.
peer_tracker_tx
.unbounded_send((tcp_stream, connection_tracker))
.expect("unexpected error sending to unbounded channel");
// Make the connection staying open.
peer_tracker_tx
.unbounded_send((tcp_stream, connection_tracker))
.expect("unexpected error sending to unbounded channel");
Ok(fake_client)
}
});
Ok(fake_client)
}
});
let (config, mut peerset_rx) =
spawn_inbound_listener_with_peer_limit(1, success_stay_open_inbound_handshaker).await;
@ -913,24 +915,25 @@ async fn listener_peer_limit_default_handshake_ok_then_drop() {
return;
}
let success_disconnect_inbound_handshaker = service_fn(|req: HandshakeRequest| async move {
let HandshakeRequest {
tcp_stream,
connected_addr: _,
connection_tracker,
} = req;
let success_disconnect_inbound_handshaker =
service_fn(|req: HandshakeRequest<TcpStream>| async move {
let HandshakeRequest {
data_stream: tcp_stream,
connected_addr: _,
connection_tracker,
} = req;
let (fake_client, _harness) = ClientTestHarness::build().finish();
let (fake_client, _harness) = ClientTestHarness::build().finish();
// Actually close the connection.
std::mem::drop(connection_tracker);
std::mem::drop(tcp_stream);
// Actually close the connection.
std::mem::drop(connection_tracker);
std::mem::drop(tcp_stream);
// Give the crawler time to get the message.
tokio::task::yield_now().await;
// Give the crawler time to get the message.
tokio::task::yield_now().await;
Ok(fake_client)
});
Ok(fake_client)
});
let (config, mut peerset_rx) =
spawn_inbound_listener_with_peer_limit(None, success_disconnect_inbound_handshaker).await;
@ -979,25 +982,26 @@ async fn listener_peer_limit_default_handshake_ok_stay_open() {
let (peer_tracker_tx, mut peer_tracker_rx) = mpsc::unbounded();
let success_stay_open_inbound_handshaker = service_fn(move |req: HandshakeRequest| {
let peer_tracker_tx = peer_tracker_tx.clone();
async move {
let HandshakeRequest {
tcp_stream,
connected_addr: _,
connection_tracker,
} = req;
let success_stay_open_inbound_handshaker =
service_fn(move |req: HandshakeRequest<TcpStream>| {
let peer_tracker_tx = peer_tracker_tx.clone();
async move {
let HandshakeRequest {
data_stream: tcp_stream,
connected_addr: _,
connection_tracker,
} = req;
let (fake_client, _harness) = ClientTestHarness::build().finish();
let (fake_client, _harness) = ClientTestHarness::build().finish();
// Make the connection staying open.
peer_tracker_tx
.unbounded_send((tcp_stream, connection_tracker))
.expect("unexpected error sending to unbounded channel");
// Make the connection staying open.
peer_tracker_tx
.unbounded_send((tcp_stream, connection_tracker))
.expect("unexpected error sending to unbounded channel");
Ok(fake_client)
}
});
Ok(fake_client)
}
});
let (config, mut peerset_rx) =
spawn_inbound_listener_with_peer_limit(None, success_stay_open_inbound_handshaker).await;
@ -1353,7 +1357,7 @@ async fn spawn_inbound_listener_with_peer_limit<S>(
listen_handshaker: S,
) -> (Config, mpsc::Receiver<DiscoveredPeer>)
where
S: Service<peer::HandshakeRequest, Response = peer::Client, Error = BoxError>
S: Service<peer::HandshakeRequest<TcpStream>, Response = peer::Client, Error = BoxError>
+ Clone
+ Send
+ 'static,