fix(net): Clean up licensing, closure `move`, log typos, tracing spans (#6995)
* Remove a redundant outbound connector timeout * Fix panics in inbound connection handshaker * Refactor to simplify FuturesUnordered types * Make licensing notes consistent * Delete redundant `move` in closures * Fix a log typo * Add some missing tracing spans
This commit is contained in:
parent
40d697a66c
commit
ad7af3e2d8
|
@ -7,7 +7,11 @@ description = "Networking code for Zebra"
|
||||||
#
|
#
|
||||||
# This licence is deliberately different to the rest of Zebra.
|
# This licence is deliberately different to the rest of Zebra.
|
||||||
#
|
#
|
||||||
# zebra-network/src/peer_set/set.rs was modified from a 2019 version of:
|
# Some code in:
|
||||||
|
# zebra-network/src/peer_set/set.rs
|
||||||
|
# zebra-network/src/peer_set/unready_service.rs
|
||||||
|
# zebra-network/src/peer_set/initialize.rs
|
||||||
|
# was modified from a 2019 version of:
|
||||||
# https://github.com/tower-rs/tower/tree/master/tower/src/balance/p2c/service.rs
|
# https://github.com/tower-rs/tower/tree/master/tower/src/balance/p2c/service.rs
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
repository = "https://github.com/ZcashFoundation/zebra"
|
repository = "https://github.com/ZcashFoundation/zebra"
|
||||||
|
|
|
@ -13,6 +13,7 @@ use indexmap::IndexSet;
|
||||||
use serde::{de, Deserialize, Deserializer};
|
use serde::{de, Deserialize, Deserializer};
|
||||||
use tempfile::NamedTempFile;
|
use tempfile::NamedTempFile;
|
||||||
use tokio::{fs, io::AsyncWriteExt};
|
use tokio::{fs, io::AsyncWriteExt};
|
||||||
|
use tracing::Span;
|
||||||
|
|
||||||
use zebra_chain::parameters::Network;
|
use zebra_chain::parameters::Network;
|
||||||
|
|
||||||
|
@ -493,12 +494,15 @@ impl Config {
|
||||||
|
|
||||||
// Create the temporary file.
|
// Create the temporary file.
|
||||||
// Do blocking filesystem operations on a dedicated thread.
|
// Do blocking filesystem operations on a dedicated thread.
|
||||||
|
let span = Span::current();
|
||||||
let tmp_peer_cache_file = tokio::task::spawn_blocking(move || {
|
let tmp_peer_cache_file = tokio::task::spawn_blocking(move || {
|
||||||
// Put the temporary file in the same directory as the permanent file,
|
span.in_scope(move || {
|
||||||
// so atomic filesystem operations are possible.
|
// Put the temporary file in the same directory as the permanent file,
|
||||||
tempfile::Builder::new()
|
// so atomic filesystem operations are possible.
|
||||||
.prefix(&tmp_peer_cache_prefix)
|
tempfile::Builder::new()
|
||||||
.tempfile_in(peer_cache_dir)
|
.prefix(&tmp_peer_cache_prefix)
|
||||||
|
.tempfile_in(peer_cache_dir)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.expect("unexpected panic creating temporary peer cache file")?;
|
.expect("unexpected panic creating temporary peer cache file")?;
|
||||||
|
@ -514,31 +518,34 @@ impl Config {
|
||||||
|
|
||||||
// Atomically replace the current cache with the temporary cache.
|
// Atomically replace the current cache with the temporary cache.
|
||||||
// Do blocking filesystem operations on a dedicated thread.
|
// Do blocking filesystem operations on a dedicated thread.
|
||||||
|
let span = Span::current();
|
||||||
tokio::task::spawn_blocking(move || {
|
tokio::task::spawn_blocking(move || {
|
||||||
let result = tmp_peer_cache_file.persist(&peer_cache_file);
|
span.in_scope(move || {
|
||||||
|
let result = tmp_peer_cache_file.persist(&peer_cache_file);
|
||||||
|
|
||||||
// Drops the temp file if needed
|
// Drops the temp file if needed
|
||||||
match result {
|
match result {
|
||||||
Ok(_temp_file) => {
|
Ok(_temp_file) => {
|
||||||
info!(
|
info!(
|
||||||
cached_ip_count = ?peer_list.len(),
|
cached_ip_count = ?peer_list.len(),
|
||||||
?peer_cache_file,
|
?peer_cache_file,
|
||||||
"updated cached peer IP addresses"
|
"updated cached peer IP addresses"
|
||||||
);
|
|
||||||
|
|
||||||
for ip in &peer_list {
|
|
||||||
metrics::counter!(
|
|
||||||
"zcash.net.peers.cache",
|
|
||||||
1,
|
|
||||||
"cache" => peer_cache_file.display().to_string(),
|
|
||||||
"remote_ip" => ip.to_string()
|
|
||||||
);
|
);
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
for ip in &peer_list {
|
||||||
|
metrics::counter!(
|
||||||
|
"zcash.net.peers.cache",
|
||||||
|
1,
|
||||||
|
"cache" => peer_cache_file.display().to_string(),
|
||||||
|
"remote_ip" => ip.to_string()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Err(error) => Err(error.error),
|
||||||
}
|
}
|
||||||
Err(error) => Err(error.error),
|
})
|
||||||
}
|
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.expect("unexpected panic making temporary peer cache file permanent")
|
.expect("unexpected panic making temporary peer cache file permanent")
|
||||||
|
|
|
@ -15,6 +15,7 @@ use crate::{
|
||||||
};
|
};
|
||||||
|
|
||||||
/// An ongoing task that regularly caches the current `address_book` to disk, based on `config`.
|
/// An ongoing task that regularly caches the current `address_book` to disk, based on `config`.
|
||||||
|
#[instrument(skip(config, address_book))]
|
||||||
pub async fn peer_cache_updater(
|
pub async fn peer_cache_updater(
|
||||||
config: Config,
|
config: Config,
|
||||||
address_book: Arc<Mutex<AddressBook>>,
|
address_book: Arc<Mutex<AddressBook>>,
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
//! A peer set whose size is dynamically determined by resource constraints.
|
//! A peer set whose size is dynamically determined by resource constraints.
|
||||||
|
//!
|
||||||
// Portions of this submodule were adapted from tower-balance,
|
//! The [`PeerSet`] implementation is adapted from the one in [tower::Balance][tower-balance].
|
||||||
// which is (c) 2019 Tower Contributors (MIT licensed).
|
//!
|
||||||
|
//! [tower-balance]: https://github.com/tower-rs/tower/tree/master/tower/src/balance
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::{BTreeMap, HashSet},
|
collections::{BTreeMap, HashSet},
|
||||||
|
@ -614,7 +615,7 @@ where
|
||||||
peerset_tx.clone(),
|
peerset_tx.clone(),
|
||||||
)
|
)
|
||||||
.await?
|
.await?
|
||||||
.map(move |res| match res {
|
.map(|res| match res {
|
||||||
Ok(()) => (),
|
Ok(()) => (),
|
||||||
Err(e @ JoinError { .. }) => {
|
Err(e @ JoinError { .. }) => {
|
||||||
if e.is_panic() {
|
if e.is_panic() {
|
||||||
|
@ -632,12 +633,12 @@ where
|
||||||
HANDSHAKE_TIMEOUT + Duration::from_millis(500),
|
HANDSHAKE_TIMEOUT + Duration::from_millis(500),
|
||||||
handshake_task,
|
handshake_task,
|
||||||
)
|
)
|
||||||
.map(move |res| match res {
|
.map(|res| match res {
|
||||||
Ok(()) => (),
|
Ok(()) => (),
|
||||||
Err(_e @ Elapsed { .. }) => {
|
Err(_e @ Elapsed { .. }) => {
|
||||||
info!(
|
info!(
|
||||||
"timeout in spawned accept_inbound_handshake() task: \
|
"timeout in spawned accept_inbound_handshake() task: \
|
||||||
inner task should have timeout out already"
|
inner task should have timed out already"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -677,6 +678,9 @@ where
|
||||||
///
|
///
|
||||||
/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
|
/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
|
||||||
/// the [`peer::Client`] result over `peerset_tx`.
|
/// the [`peer::Client`] result over `peerset_tx`.
|
||||||
|
//
|
||||||
|
// TODO: when we support inbound proxies, distinguish between proxied listeners and
|
||||||
|
// direct listeners in the span generated by this instrument macro
|
||||||
#[instrument(skip(handshaker, tcp_stream, connection_tracker, peerset_tx))]
|
#[instrument(skip(handshaker, tcp_stream, connection_tracker, peerset_tx))]
|
||||||
async fn accept_inbound_handshake<S>(
|
async fn accept_inbound_handshake<S>(
|
||||||
addr: PeerSocketAddr,
|
addr: PeerSocketAddr,
|
||||||
|
@ -701,8 +705,6 @@ where
|
||||||
//
|
//
|
||||||
// This await is okay because the handshaker's `poll_ready` method always returns Ready.
|
// This await is okay because the handshaker's `poll_ready` method always returns Ready.
|
||||||
handshaker.ready().await?;
|
handshaker.ready().await?;
|
||||||
// TODO: distinguish between proxied listeners and direct listeners
|
|
||||||
let handshaker_span = info_span!("listen_handshaker", peer = ?connected_addr);
|
|
||||||
|
|
||||||
// Construct a handshake future but do not drive it yet....
|
// Construct a handshake future but do not drive it yet....
|
||||||
let handshake = handshaker.call(HandshakeRequest {
|
let handshake = handshaker.call(HandshakeRequest {
|
||||||
|
@ -724,7 +726,7 @@ where
|
||||||
debug!(?handshake_result, "error handshaking with inbound peer");
|
debug!(?handshake_result, "error handshaking with inbound peer");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
.instrument(handshaker_span),
|
.in_current_span(),
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(handshake_task)
|
Ok(handshake_task)
|
||||||
|
@ -924,8 +926,8 @@ where
|
||||||
|
|
||||||
Ok(DemandCrawlFinished)
|
Ok(DemandCrawlFinished)
|
||||||
}
|
}
|
||||||
})
|
}.in_current_span())
|
||||||
.map(move |res| match res {
|
.map(|res| match res {
|
||||||
Ok(crawler_action) => crawler_action,
|
Ok(crawler_action) => crawler_action,
|
||||||
Err(e @ JoinError {..}) => {
|
Err(e @ JoinError {..}) => {
|
||||||
if e.is_panic() {
|
if e.is_panic() {
|
||||||
|
@ -936,8 +938,7 @@ where
|
||||||
// Just fake it
|
// Just fake it
|
||||||
Ok(HandshakeFinished)
|
Ok(HandshakeFinished)
|
||||||
}
|
}
|
||||||
})
|
});
|
||||||
.in_current_span();
|
|
||||||
|
|
||||||
handshakes.push(Box::pin(handshake_or_crawl_handle));
|
handshakes.push(Box::pin(handshake_or_crawl_handle));
|
||||||
}
|
}
|
||||||
|
@ -954,7 +955,7 @@ where
|
||||||
crawl(candidates, demand_tx).await?;
|
crawl(candidates, demand_tx).await?;
|
||||||
|
|
||||||
Ok(TimerCrawlFinished)
|
Ok(TimerCrawlFinished)
|
||||||
})
|
}.in_current_span())
|
||||||
.map(move |res| match res {
|
.map(move |res| match res {
|
||||||
Ok(crawler_action) => crawler_action,
|
Ok(crawler_action) => crawler_action,
|
||||||
Err(e @ JoinError {..}) => {
|
Err(e @ JoinError {..}) => {
|
||||||
|
@ -966,8 +967,7 @@ where
|
||||||
// Just fake it
|
// Just fake it
|
||||||
Ok(TimerCrawlFinished)
|
Ok(TimerCrawlFinished)
|
||||||
}
|
}
|
||||||
})
|
});
|
||||||
.in_current_span();
|
|
||||||
|
|
||||||
handshakes.push(Box::pin(crawl_handle));
|
handshakes.push(Box::pin(crawl_handle));
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@
|
||||||
//! # Implementation
|
//! # Implementation
|
||||||
//!
|
//!
|
||||||
//! The [`PeerSet`] implementation is adapted from the one in [tower::Balance][tower-balance].
|
//! The [`PeerSet`] implementation is adapted from the one in [tower::Balance][tower-balance].
|
||||||
|
//!
|
||||||
//! As described in Tower's documentation, it:
|
//! As described in Tower's documentation, it:
|
||||||
//!
|
//!
|
||||||
//! > Distributes requests across inner services using the [Power of Two Choices][p2c].
|
//! > Distributes requests across inner services using the [Power of Two Choices][p2c].
|
||||||
|
|
|
@ -1,6 +1,9 @@
|
||||||
/// Services that are busy or newly created.
|
//! Services that are busy or newly created.
|
||||||
///
|
//!
|
||||||
/// Adapted from tower-balance.
|
//! The [`UnreadyService`] implementation is adapted from the one in [tower::Balance][tower-balance].
|
||||||
|
//!
|
||||||
|
//! [tower-balance]: https://github.com/tower-rs/tower/tree/master/tower/src/balance
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
future::Future,
|
future::Future,
|
||||||
marker::PhantomData,
|
marker::PhantomData,
|
||||||
|
|
Loading…
Reference in New Issue