feat(net): return peer metadata from `connect_isolated` functions (#4870)

* Move version into a ConnectionInfo struct

* Add negotiated version to ConnectionInfo

Part of this change was generated using:
```
fastmod --fixed-strings ".version(" ".remote_version(" zebra-network
```

* Add the peer address to ConnectionInfo, add ConnectionInfo to Connection

* Return a Client instance from connect_isolated_* functions

This allows library users to access client ConnectionInfo.

* Add and improve debug formatting

* Add peer services and user agent to ConnectionInfo

* Export the Client type, and fix up a zebrad test

* Export types used by the public API

* Split VersionMessage into its own struct

* Use VersionMessage in ConnectionInfo

* Add a public API test for ConnectionInfo

* Wrap ConnectionInfo in an Arc

* Fix some doc links
This commit is contained in:
teor 2022-09-15 01:00:25 +10:00 committed by GitHub
parent 00eee8652e
commit 806dd0f24c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 450 additions and 186 deletions

View File

@ -4,15 +4,12 @@ use std::{future::Future, net::SocketAddr};
use futures::future::TryFutureExt;
use tokio::io::{AsyncRead, AsyncWrite};
use tower::{
util::{BoxService, Oneshot},
Service, ServiceExt,
};
use tower::{util::Oneshot, Service};
use zebra_chain::{chain_tip::NoChainTip, parameters::Network};
use crate::{
peer::{self, ConnectedAddr, HandshakeRequest},
peer::{self, Client, ConnectedAddr, HandshakeRequest},
peer_set::ActiveConnectionCounter,
BoxError, Config, Request, Response,
};
@ -51,7 +48,7 @@ pub fn connect_isolated<PeerTransport>(
network: Network,
data_stream: PeerTransport,
user_agent: String,
) -> impl Future<Output = Result<BoxService<Request, Response, BoxError>, BoxError>>
) -> impl Future<Output = Result<Client, BoxError>>
where
PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
@ -79,7 +76,7 @@ pub fn connect_isolated_with_inbound<PeerTransport, InboundService>(
data_stream: PeerTransport,
user_agent: String,
inbound_service: InboundService,
) -> impl Future<Output = Result<BoxService<Request, Response, BoxError>, BoxError>>
) -> impl Future<Output = Result<Client, BoxError>>
where
PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
InboundService:
@ -111,7 +108,6 @@ where
connection_tracker,
},
)
.map_ok(|client| BoxService::new(client.map_err(Into::into)))
}
/// Creates a direct TCP Zcash peer connection to `addr`.
@ -129,7 +125,7 @@ pub fn connect_isolated_tcp_direct(
network: Network,
addr: SocketAddr,
user_agent: String,
) -> impl Future<Output = Result<BoxService<Request, Response, BoxError>, BoxError>> {
) -> impl Future<Output = Result<Client, BoxError>> {
let nil_inbound_service =
tower::service_fn(|_req| async move { Ok::<Response, BoxError>(Response::Nil) });
@ -150,7 +146,7 @@ pub fn connect_isolated_tcp_direct_with_inbound<InboundService>(
addr: SocketAddr,
user_agent: String,
inbound_service: InboundService,
) -> impl Future<Output = Result<BoxService<Request, Response, BoxError>, BoxError>>
) -> impl Future<Output = Result<Client, BoxError>>
where
InboundService:
Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,

View File

@ -9,6 +9,7 @@ use crate::{
constants::CURRENT_NETWORK_PROTOCOL_VERSION,
protocol::external::{AddrInVersion, Codec, Message},
types::PeerServices,
VersionMessage,
};
use super::super::*;
@ -127,7 +128,7 @@ async fn check_version_message<PeerTransport>(
PeerTransport: AsyncRead + Unpin,
{
// We don't need to send any bytes to get a version message.
if let Message::Version {
if let Message::Version(VersionMessage {
version,
services,
timestamp,
@ -137,7 +138,7 @@ async fn check_version_message<PeerTransport>(
user_agent,
start_height,
relay,
} = inbound_stream
}) = inbound_stream
.next()
.await
.expect("stream item")

View File

@ -4,11 +4,14 @@ use std::sync::{Arc, Mutex};
use arti_client::{DataStream, TorAddr, TorClient, TorClientConfig};
use tor_rtcompat::tokio::TokioRuntimeHandle;
use tower::{util::BoxService, Service};
use tower::Service;
use zebra_chain::parameters::Network;
use crate::{connect_isolated, connect_isolated_with_inbound, BoxError, Request, Response};
use crate::{
connect_isolated, connect_isolated_with_inbound, peer::Client as ZebraClient, BoxError,
Request, Response,
};
#[cfg(test)]
mod tests;
@ -44,7 +47,7 @@ pub async fn connect_isolated_tor(
network: Network,
hostname: String,
user_agent: String,
) -> Result<BoxService<Request, Response, BoxError>, BoxError> {
) -> Result<ZebraClient, BoxError> {
let tor_stream = new_tor_stream(hostname).await?;
// Calling connect_isolated_tor_with_inbound causes lifetime issues.
@ -68,7 +71,7 @@ pub async fn connect_isolated_tor_with_inbound<InboundService>(
hostname: String,
user_agent: String,
inbound_service: InboundService,
) -> Result<BoxService<Request, Response, BoxError>, BoxError>
) -> Result<ZebraClient, BoxError>
where
InboundService:
Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,

View File

@ -168,15 +168,24 @@ pub use crate::{
config::Config,
isolated::{connect_isolated, connect_isolated_tcp_direct},
meta_addr::PeerAddrState,
peer::{HandshakeError, PeerError, SharedPeerError},
peer::{Client, ConnectedAddr, ConnectionInfo, HandshakeError, PeerError, SharedPeerError},
peer_set::init,
policies::RetryLimit,
protocol::internal::{InventoryResponse, Request, Response},
protocol::{
external::{Version, VersionMessage},
internal::{InventoryResponse, Request, Response},
},
};
/// Types used in the definition of [`Request`] and [`Response`] messages.
/// Types used in the definition of [`Request`], [`Response`], and [`VersionMessage`].
pub mod types {
pub use crate::{meta_addr::MetaAddr, protocol::types::PeerServices};
pub use crate::{
meta_addr::MetaAddr,
protocol::{
external::{AddrInVersion, Nonce},
types::PeerServices,
},
};
#[cfg(any(test, feature = "proptest-impl"))]
pub use crate::protocol::external::InventoryHash;

View File

@ -25,7 +25,7 @@ pub use client::Client;
pub use connection::Connection;
pub use connector::{Connector, OutboundConnectorRequest};
pub use error::{ErrorSlot, HandshakeError, PeerError, SharedPeerError};
pub use handshake::{ConnectedAddr, Handshake, HandshakeRequest};
pub use handshake::{ConnectedAddr, ConnectionInfo, Handshake, HandshakeRequest};
pub use load_tracked_client::LoadTrackedClient;
pub use minimum_peer_version::MinimumPeerVersion;
pub use priority::{AttributePreference, PeerPreference};

View File

@ -6,6 +6,7 @@ use std::{
iter,
net::SocketAddr,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
@ -19,10 +20,13 @@ use tokio::{sync::broadcast, task::JoinHandle};
use tower::Service;
use crate::{
peer::error::{AlreadyErrored, ErrorSlot, PeerError, SharedPeerError},
peer::{
error::{AlreadyErrored, ErrorSlot, PeerError, SharedPeerError},
ConnectionInfo,
},
peer_set::InventoryChange,
protocol::{
external::{types::Version, InventoryHash},
external::InventoryHash,
internal::{Request, Response},
},
BoxError,
@ -33,6 +37,9 @@ pub mod tests;
/// The "client" duplex half of a peer connection.
pub struct Client {
/// The metadata for the connected peer `service`.
pub connection_info: Arc<ConnectionInfo>,
/// Used to shut down the corresponding heartbeat.
/// This is always Some except when we take it on drop.
pub(crate) shutdown_tx: Option<oneshot::Sender<CancelHeartbeatTask>>,
@ -44,17 +51,11 @@ pub struct Client {
/// so that the peer set can route retries to other clients.
pub(crate) inv_collector: broadcast::Sender<InventoryChange>,
/// The peer address for registering missing inventory.
pub(crate) transient_addr: Option<SocketAddr>,
/// A slot for an error shared between the Connection and the Client that uses it.
///
/// `None` unless the connection or client have errored.
pub(crate) error_slot: ErrorSlot,
/// The peer connection's protocol version.
pub(crate) version: Version,
/// A handle to the task responsible for connecting to the peer.
pub(crate) connection_task: JoinHandle<()>,
@ -84,6 +85,8 @@ pub(crate) struct ClientRequest {
pub inv_collector: Option<broadcast::Sender<InventoryChange>>,
/// The peer address for registering missing inventory.
///
/// TODO: replace this with `ConnectedAddr`?
pub transient_addr: Option<SocketAddr>,
/// The tracing context for the request, so that work the connection task does
@ -170,7 +173,10 @@ impl std::fmt::Debug for Client {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// skip the channels, they don't tell us anything useful
f.debug_struct("Client")
.field("connection_info", &self.connection_info)
.field("error_slot", &self.error_slot)
.field("connection_task", &self.connection_task)
.field("heartbeat_task", &self.heartbeat_task)
.finish()
}
}
@ -594,7 +600,7 @@ impl Service<Request> for Client {
request,
tx,
inv_collector: Some(self.inv_collector.clone()),
transient_addr: self.transient_addr,
transient_addr: self.connection_info.connected_addr.get_transient_addr(),
span,
}) {
Err(e) => {

View File

@ -3,8 +3,13 @@
#![cfg_attr(feature = "proptest-impl", allow(dead_code))]
use std::time::Duration;
use std::{
net::{Ipv4Addr, SocketAddrV4},
sync::Arc,
time::Duration,
};
use chrono::Utc;
use futures::{
channel::{mpsc, oneshot},
future::{self, AbortHandle, Future, FutureExt},
@ -14,11 +19,20 @@ use tokio::{
task::JoinHandle,
};
use zebra_chain::block::Height;
use crate::{
peer::{error::SharedPeerError, CancelHeartbeatTask, Client, ClientRequest, ErrorSlot},
constants,
peer::{
error::SharedPeerError, CancelHeartbeatTask, Client, ClientRequest, ConnectionInfo,
ErrorSlot,
},
peer_set::InventoryChange,
protocol::external::types::Version,
BoxError,
protocol::{
external::{types::Version, AddrInVersion},
types::{Nonce, PeerServices},
},
BoxError, VersionMessage,
};
#[cfg(test)]
@ -34,7 +48,7 @@ pub struct ClientTestHarness {
#[allow(dead_code)]
inv_receiver: Option<broadcast::Receiver<InventoryChange>>,
error_slot: ErrorSlot,
version: Version,
remote_version: Version,
connection_aborter: AbortHandle,
heartbeat_aborter: AbortHandle,
}
@ -50,9 +64,9 @@ impl ClientTestHarness {
}
}
/// Gets the peer protocol version associated to the [`Client`].
pub fn version(&self) -> Version {
self.version
/// Gets the remote peer protocol version reported by the [`Client`].
pub fn remote_version(&self) -> Version {
self.remote_version
}
/// Returns true if the [`Client`] instance still wants connection heartbeats to be sent.
@ -278,20 +292,46 @@ where
let (inv_sender, inv_receiver) = broadcast::channel(5);
let error_slot = ErrorSlot::default();
let version = self.version.unwrap_or(Version(0));
let remote_version = self.version.unwrap_or(Version(0));
let (connection_task, connection_aborter) =
Self::spawn_background_task_or_fallback(self.connection_task);
let (heartbeat_task, heartbeat_aborter) =
Self::spawn_background_task_or_fallback_with_result(self.heartbeat_task);
let negotiated_version =
std::cmp::min(remote_version, constants::CURRENT_NETWORK_PROTOCOL_VERSION);
let remote = VersionMessage {
version: remote_version,
services: PeerServices::default(),
timestamp: Utc::now(),
address_recv: AddrInVersion::new(
SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1),
PeerServices::default(),
),
address_from: AddrInVersion::new(
SocketAddrV4::new(Ipv4Addr::LOCALHOST, 2),
PeerServices::default(),
),
nonce: Nonce::default(),
user_agent: "client test harness".to_string(),
start_height: Height(0),
relay: true,
};
let connection_info = Arc::new(ConnectionInfo {
connected_addr: crate::peer::ConnectedAddr::Isolated,
remote,
negotiated_version,
});
let client = Client {
connection_info,
shutdown_tx: Some(shutdown_sender),
server_tx: client_request_sender,
inv_collector: inv_sender,
transient_addr: None,
error_slot: error_slot.clone(),
version,
connection_task,
heartbeat_task,
};
@ -301,7 +341,7 @@ where
shutdown_receiver: Some(shutdown_receiver),
inv_receiver: Some(inv_receiver),
error_slot,
version,
remote_version,
connection_aborter,
heartbeat_aborter,
};

View File

@ -29,7 +29,7 @@ use crate::{
meta_addr::MetaAddr,
peer::{
connection::peer_tx::PeerTx, error::AlreadyErrored, ClientRequest, ClientRequestReceiver,
ConnectedAddr, ErrorSlot, InProgressClientRequest, MustUseClientResponseSender, PeerError,
ConnectionInfo, ErrorSlot, InProgressClientRequest, MustUseClientResponseSender, PeerError,
SharedPeerError,
},
peer_set::ConnectionTracker,
@ -448,6 +448,12 @@ impl From<Request> for InboundMessage {
/// The channels, services, and associated state for a peer connection.
pub struct Connection<S, Tx> {
/// The metadata for the connected peer `service`.
///
/// This field is used for debugging.
#[allow(dead_code)]
pub connection_info: Arc<ConnectionInfo>,
/// The state of this connection's current request or response.
pub(super) state: State,
@ -505,6 +511,21 @@ pub struct Connection<S, Tx> {
pub(super) last_metrics_state: Option<Cow<'static, str>>,
}
impl<S, Tx> fmt::Debug for Connection<S, Tx> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// skip the channels, they don't tell us anything useful
f.debug_struct(std::any::type_name::<Connection<S, Tx>>())
.field("connection_info", &self.connection_info)
.field("state", &self.state)
.field("request_timer", &self.request_timer)
.field("cached_addrs", &self.cached_addrs.len())
.field("error_slot", &self.error_slot)
.field("metrics_label", &self.metrics_label)
.field("last_metrics_state", &self.last_metrics_state)
.finish()
}
}
impl<S, Tx> Connection<S, Tx> {
/// Return a new connection from its channels, services, and shared state.
pub(crate) fn new(
@ -513,9 +534,12 @@ impl<S, Tx> Connection<S, Tx> {
error_slot: ErrorSlot,
peer_tx: Tx,
connection_tracker: ConnectionTracker,
connected_addr: ConnectedAddr,
connection_info: Arc<ConnectionInfo>,
) -> Self {
let metrics_label = connection_info.connected_addr.get_transient_addr_label();
Connection {
connection_info,
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
@ -524,7 +548,7 @@ impl<S, Tx> Connection<S, Tx> {
error_slot,
peer_tx: peer_tx.into(),
connection_tracker,
metrics_label: connected_addr.get_transient_addr_label(),
metrics_label,
last_metrics_state: None,
}
}

View File

@ -1,17 +1,26 @@
//! Tests for peer connections
use std::io;
use std::{
io,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
sync::Arc,
};
use chrono::Utc;
use futures::{channel::mpsc, sink::SinkMapErr, SinkExt};
use zebra_chain::serialization::SerializationError;
use zebra_chain::{block::Height, serialization::SerializationError};
use zebra_test::mock_service::MockService;
use crate::{
peer::{ClientRequest, ConnectedAddr, Connection, ErrorSlot},
constants::CURRENT_NETWORK_PROTOCOL_VERSION,
peer::{ClientRequest, ConnectedAddr, Connection, ConnectionInfo, ErrorSlot},
peer_set::ActiveConnectionCounter,
protocol::external::Message,
Request, Response,
protocol::{
external::{AddrInVersion, Message},
types::{Nonce, PeerServices},
},
Request, Response, VersionMessage,
};
mod prop;
@ -45,13 +54,35 @@ fn new_test_connection<A>() -> (
};
let peer_tx = peer_tx.sink_map_err(error_converter);
let fake_addr: SocketAddr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 4).into();
let fake_version = CURRENT_NETWORK_PROTOCOL_VERSION;
let fake_services = PeerServices::default();
let remote = VersionMessage {
version: fake_version,
services: fake_services,
timestamp: Utc::now(),
address_recv: AddrInVersion::new(fake_addr, fake_services),
address_from: AddrInVersion::new(fake_addr, fake_services),
nonce: Nonce::default(),
user_agent: "connection test".to_string(),
start_height: Height(0),
relay: true,
};
let connection_info = ConnectionInfo {
connected_addr: ConnectedAddr::Isolated,
remote,
negotiated_version: fake_version,
};
let connection = Connection::new(
mock_inbound_service.clone(),
client_rx,
shared_error_slot.clone(),
peer_tx,
ActiveConnectionCounter::new_counter().track_connection(),
ConnectedAddr::Isolated,
Arc::new(connection_info),
);
(

View File

@ -45,7 +45,7 @@ use crate::{
internal::{Request, Response},
},
types::MetaAddr,
BoxError, Config,
BoxError, Config, VersionMessage,
};
/// A [`Service`] that handshakes with a remote peer and constructs a
@ -76,6 +76,25 @@ where
parent_span: Span,
}
impl<S, C> fmt::Debug for Handshake<S, C>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send,
C: ChainTip + Clone + Send + 'static,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// skip the channels, they don't tell us anything useful
f.debug_struct(std::any::type_name::<Handshake<S, C>>())
.field("config", &self.config)
.field("user_agent", &self.user_agent)
.field("our_services", &self.our_services)
.field("relay", &self.relay)
.field("minimum_peer_version", &self.minimum_peer_version)
.field("parent_span", &self.parent_span)
.finish()
}
}
impl<S, C> Clone for Handshake<S, C>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
@ -98,6 +117,26 @@ where
}
}
/// The metadata for a peer connection.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ConnectionInfo {
/// The connected peer address, if known.
/// This address might not be valid for outbound connections.
///
/// Peers can be connected via a transient inbound or proxy address,
/// which will appear as the connected address to the OS and Zebra.
pub connected_addr: ConnectedAddr,
/// The network protocol [`VersionMessage`](crate::VersionMessage) sent by the remote peer.
pub remote: VersionMessage,
/// The network protocol version negotiated with the remote peer.
///
/// Derived from `remote.version` and the
/// [current `zebra_network` protocol version](constants::CURRENT_NETWORK_PROTOCOL_VERSION).
pub negotiated_version: Version,
}
/// The peer address that we are handshaking with.
///
/// Typically, we can rely on outbound addresses, but inbound addresses don't
@ -108,7 +147,10 @@ pub enum ConnectedAddr {
///
/// In an honest network, a Zcash peer is listening on this exact address
/// and port.
OutboundDirect { addr: SocketAddr },
OutboundDirect {
/// The connected outbound remote address and port.
addr: SocketAddr,
},
/// The address we received from the OS, when a remote peer directly
/// connected to our Zcash listener port.
@ -117,7 +159,10 @@ pub enum ConnectedAddr {
/// if its outbound address is the same as its listener address. But the port
/// is an ephemeral outbound TCP port, not a listener port.
InboundDirect {
/// The connected inbound remote address.
maybe_ip: IpAddr,
/// The connected inbound transient remote port.
transient_port: u16,
},
@ -127,7 +172,10 @@ pub enum ConnectedAddr {
/// outbound address and port can be used as an identifier for the duration
/// of this connection.
OutboundProxy {
/// The remote address and port of the proxy.
proxy_addr: SocketAddr,
/// The local address and transient port we used to connect to the proxy.
transient_local_addr: SocketAddr,
},
@ -136,7 +184,10 @@ pub enum ConnectedAddr {
///
/// The proxy's ephemeral outbound address can be used as an identifier for
/// the duration of this connection.
InboundProxy { transient_addr: SocketAddr },
InboundProxy {
/// The local address and transient port we used to connect to the proxy.
transient_addr: SocketAddr,
},
/// An isolated connection, where we deliberately don't have any connection metadata.
Isolated,
@ -208,6 +259,8 @@ impl ConnectedAddr {
/// This address must not depend on the canonical address from the `Version`
/// message. Otherwise, malicious peers could interfere with other peers
/// `AddressBook` state.
///
/// TODO: remove the `get_` from these methods (Rust style avoids `get` prefixes)
pub fn get_address_book_addr(&self) -> Option<SocketAddr> {
match self {
OutboundDirect { addr } => Some(*addr),
@ -512,6 +565,8 @@ where
///
/// We split `Handshake` into its components before calling this function,
/// to avoid infectious `Sync` bounds on the returned future.
///
/// Returns the [`VersionMessage`](crate::VersionMessage) sent by the remote peer.
#[allow(clippy::too_many_arguments)]
pub async fn negotiate_version<PeerTransport>(
peer_conn: &mut Framed<PeerTransport, Codec>,
@ -522,7 +577,7 @@ pub async fn negotiate_version<PeerTransport>(
our_services: PeerServices,
relay: bool,
mut minimum_peer_version: MinimumPeerVersion<impl ChainTip>,
) -> Result<(Version, PeerServices, SocketAddr), HandshakeError>
) -> Result<VersionMessage, HandshakeError>
where
PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
@ -570,7 +625,7 @@ where
}
};
let our_version = Message::Version {
let our_version = VersionMessage {
version: constants::CURRENT_NETWORK_PROTOCOL_VERSION,
services: our_services,
timestamp,
@ -581,7 +636,8 @@ where
user_agent: user_agent.clone(),
start_height: minimum_peer_version.chain_tip_height(),
relay,
};
}
.into();
debug!(?our_version, "sending initial version message");
peer_conn.send(our_version).await?;
@ -592,11 +648,11 @@ where
.ok_or(HandshakeError::ConnectionClosed)??;
// Wait for next message if the one we got is not Version
loop {
let remote: VersionMessage = loop {
match remote_msg {
Message::Version { .. } => {
debug!(?remote_msg, "got version message from remote peer");
break;
Message::Version(version_message) => {
debug!(?version_message, "got version message from remote peer");
break version_message;
}
_ => {
remote_msg = peer_conn
@ -606,34 +662,18 @@ where
debug!(?remote_msg, "ignoring non-version message from remote peer");
}
}
};
let remote_address_services = remote.address_from.untrusted_services();
if remote_address_services != remote.services {
info!(
?remote.services,
?remote_address_services,
?remote.user_agent,
"peer with inconsistent version services and version address services",
);
}
// If we got a Version message, destructure its fields into the local scope.
let (remote_nonce, remote_services, remote_version, remote_canonical_addr, user_agent) =
if let Message::Version {
version,
services,
address_from,
nonce,
user_agent,
..
} = remote_msg
{
let canonical_addr = address_from.addr();
let address_services = address_from.untrusted_services();
if address_services != services {
info!(
?services,
?address_services,
"peer with inconsistent version services and version address services"
);
}
(nonce, services, version, canonical_addr, user_agent)
} else {
Err(HandshakeError::UnexpectedMessage(Box::new(remote_msg)))?
};
// Check for nonce reuse, indicating self-connection
//
// # Correctness
@ -643,7 +683,7 @@ where
// released.
let nonce_reuse = {
let mut locked_nonces = nonces.lock().await;
let nonce_reuse = locked_nonces.contains(&remote_nonce);
let nonce_reuse = locked_nonces.contains(&remote.nonce);
// Regardless of whether we observed nonce reuse, clean up the nonce set.
locked_nonces.remove(&local_nonce);
nonce_reuse
@ -655,12 +695,13 @@ where
// SECURITY: Reject connections to peers on old versions, because they might not know about all
// network upgrades and could lead to chain forks or slower block propagation.
let min_version = minimum_peer_version.current();
if remote_version < min_version {
if remote.version < min_version {
debug!(
remote_ip = ?their_addr,
?remote_version,
?remote.version,
?min_version,
"disconnecting from peer with obsolete network protocol version"
?remote.user_agent,
"disconnecting from peer with obsolete network protocol version",
);
// the value is the number of rejected handshakes, by peer IP and protocol version
@ -668,29 +709,30 @@ where
"zcash.net.peers.obsolete",
1,
"remote_ip" => their_addr.to_string(),
"remote_version" => remote_version.to_string(),
"remote_version" => remote.version.to_string(),
"min_version" => min_version.to_string(),
"user_agent" => user_agent,
"user_agent" => remote.user_agent.clone(),
);
// the value is the remote version of the most recent rejected handshake from each peer
metrics::gauge!(
"zcash.net.peers.version.obsolete",
remote_version.0 as f64,
remote.version.0 as f64,
"remote_ip" => their_addr.to_string(),
);
// Disconnect if peer is using an obsolete version.
Err(HandshakeError::ObsoleteVersion(remote_version))?;
Err(HandshakeError::ObsoleteVersion(remote.version))?;
} else {
let negotiated_version = min(constants::CURRENT_NETWORK_PROTOCOL_VERSION, remote_version);
let negotiated_version = min(constants::CURRENT_NETWORK_PROTOCOL_VERSION, remote.version);
debug!(
remote_ip = ?their_addr,
?remote_version,
?remote.version,
?negotiated_version,
?min_version,
"negotiated network protocol version with peer"
?remote.user_agent,
"negotiated network protocol version with peer",
);
// the value is the number of connected handshakes, by peer IP and protocol version
@ -698,16 +740,16 @@ where
"zcash.net.peers.connected",
1,
"remote_ip" => their_addr.to_string(),
"remote_version" => remote_version.to_string(),
"remote_version" => remote.version.to_string(),
"negotiated_version" => negotiated_version.to_string(),
"min_version" => min_version.to_string(),
"user_agent" => user_agent,
"user_agent" => remote.user_agent.clone(),
);
// the value is the remote version of the most recent connected handshake from each peer
metrics::gauge!(
"zcash.net.peers.version.connected",
remote_version.0 as f64,
remote.version.0 as f64,
"remote_ip" => their_addr.to_string(),
);
}
@ -736,7 +778,7 @@ where
}
}
Ok((remote_version, remote_services, remote_canonical_addr))
Ok(remote)
}
/// A handshake request.
@ -813,8 +855,7 @@ where
.finish(),
);
// Wrap the entire initial connection setup in a timeout.
let (remote_version, remote_services, remote_canonical_addr) = negotiate_version(
let remote = negotiate_version(
&mut peer_conn,
&connected_addr,
config,
@ -826,6 +867,9 @@ where
)
.await?;
let remote_canonical_addr = remote.address_from.addr();
let remote_services = remote.services;
// If we've learned potential peer addresses from an inbound
// connection or handshake, add those addresses to our address book.
//
@ -853,7 +897,14 @@ where
// Set the connection's version to the minimum of the received version or our own.
let negotiated_version =
std::cmp::min(remote_version, constants::CURRENT_NETWORK_PROTOCOL_VERSION);
std::cmp::min(remote.version, constants::CURRENT_NETWORK_PROTOCOL_VERSION);
// Limit containing struct size, and avoid multiple duplicates of 300+ bytes of data.
let connection_info = Arc::new(ConnectionInfo {
connected_addr,
remote,
negotiated_version,
});
// Reconfigure the codec to use the negotiated version.
//
@ -970,7 +1021,7 @@ where
error_slot.clone(),
peer_tx,
connection_tracker,
connected_addr,
connection_info.clone(),
);
let connection_task = tokio::spawn(
@ -993,12 +1044,11 @@ where
);
let client = Client {
connection_info,
shutdown_tx: Some(shutdown_tx),
server_tx,
inv_collector,
transient_addr: connected_addr.get_transient_addr(),
error_slot,
version: remote_version,
connection_task,
heartbeat_task,
};

View File

@ -1,7 +1,10 @@
//! A peer connection service wrapper type to handle load tracking and provide access to the
//! reported protocol version.
use std::task::{Context, Poll};
use std::{
sync::Arc,
task::{Context, Poll},
};
use tower::{
load::{Load, PeakEwma},
@ -10,7 +13,7 @@ use tower::{
use crate::{
constants::{EWMA_DECAY_TIME_NANOS, EWMA_DEFAULT_RTT},
peer::Client,
peer::{Client, ConnectionInfo},
protocol::external::types::Version,
};
@ -18,14 +21,17 @@ use crate::{
///
/// It also keeps track of the peer's reported protocol version.
pub struct LoadTrackedClient {
/// A service representing a connected peer, wrapped in a load tracker.
service: PeakEwma<Client>,
version: Version,
/// The metadata for the connected peer `service`.
connection_info: Arc<ConnectionInfo>,
}
/// Create a new [`LoadTrackedClient`] wrapping the provided `client` service.
impl From<Client> for LoadTrackedClient {
fn from(client: Client) -> Self {
let version = client.version;
let connection_info = client.connection_info.clone();
let service = PeakEwma::new(
client,
@ -34,14 +40,17 @@ impl From<Client> for LoadTrackedClient {
tower::load::CompleteOnResponse::default(),
);
LoadTrackedClient { service, version }
LoadTrackedClient {
service,
connection_info,
}
}
}
impl LoadTrackedClient {
/// Retrieve the peer's reported protocol version.
pub fn version(&self) -> Version {
self.version
pub fn remote_version(&self) -> Version {
self.connection_info.remote.version
}
}

View File

@ -1,5 +1,7 @@
//! Watches for chain tip height updates to determine the minimum supported peer protocol version.
use std::fmt;
use zebra_chain::{block::Height, chain_tip::ChainTip, parameters::Network};
use crate::protocol::external::types::Version;
@ -16,6 +18,17 @@ pub struct MinimumPeerVersion<C> {
has_changed: bool,
}
impl<C> fmt::Debug for MinimumPeerVersion<C> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// skip the chain tip to avoid locking issues
f.debug_struct(std::any::type_name::<MinimumPeerVersion<C>>())
.field("network", &self.network)
.field("current_minimum", &self.current_minimum)
.field("has_changed", &self.has_changed)
.finish()
}
}
impl<C> MinimumPeerVersion<C>
where
C: ChainTip,

View File

@ -439,7 +439,7 @@ where
let cancel = self.cancel_handles.remove(&key);
assert!(cancel.is_some(), "missing cancel handle");
if svc.version() >= self.minimum_peer_version.current() {
if svc.remote_version() >= self.minimum_peer_version.current() {
self.ready_services.insert(key, svc);
}
}
@ -509,7 +509,7 @@ where
let preselected_p2c_peer = &mut self.preselected_p2c_peer;
self.ready_services.retain(|address, peer| {
if peer.version() >= minimum_version {
if peer.remote_version() >= minimum_version {
true
} else {
if *preselected_p2c_peer == Some(*address) {
@ -562,7 +562,7 @@ where
/// If the service is for a connection to an outdated peer, the request is cancelled and the
/// service is dropped.
fn push_unready(&mut self, key: D::Key, svc: D::Service) {
let peer_version = svc.version();
let peer_version = svc.remote_version();
let (tx, rx) = oneshot::channel();
self.unready_services.push(UnreadyService {

View File

@ -299,7 +299,7 @@ where
let poll_result = peer_set.ready().now_or_never();
let all_peers_are_outdated = harnesses
.iter()
.all(|harness| harness.version() < minimum_version);
.all(|harness| harness.remote_version() < minimum_version);
if all_peers_are_outdated {
prop_assert!(matches!(poll_result, None));
@ -309,7 +309,7 @@ where
let mut number_of_connected_peers = 0;
for harness in harnesses {
let is_outdated = harness.version() < minimum_version;
let is_outdated = harness.remote_version() < minimum_version;
let is_connected = harness.wants_connection_heartbeats();
prop_assert!(

View File

@ -19,5 +19,7 @@ mod tests;
pub use addr::{canonical_socket_addr, AddrInVersion};
pub use codec::Codec;
pub use inv::InventoryHash;
pub use message::Message;
pub use message::{Message, VersionMessage};
pub use types::{Nonce, Version};
pub use zebra_chain::serialization::MAX_PROTOCOL_MESSAGE_LEN;

View File

@ -26,7 +26,7 @@ use crate::constants;
use super::{
addr::{AddrInVersion, AddrV1, AddrV2},
message::{Message, RejectReason},
message::{Message, RejectReason, VersionMessage},
types::*,
};
@ -195,7 +195,7 @@ impl Codec {
/// contain a checksum of the message body.
fn write_body<W: Write>(&self, msg: &Message, mut writer: W) -> Result<(), Error> {
match msg {
Message::Version {
Message::Version(VersionMessage {
version,
services,
timestamp,
@ -205,7 +205,7 @@ impl Codec {
user_agent,
start_height,
relay,
} => {
}) => {
writer.write_u32::<LittleEndian>(version.0)?;
writer.write_u64::<LittleEndian>(services.bits())?;
// # Security
@ -465,7 +465,7 @@ impl Decoder for Codec {
impl Codec {
fn read_version<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
Ok(Message::Version {
Ok(VersionMessage {
version: Version(reader.read_u32::<LittleEndian>()?),
// Use from_bits_truncate to discard unknown service bits.
services: PeerServices::from_bits_truncate(reader.read_u64::<LittleEndian>()?),
@ -485,7 +485,8 @@ impl Codec {
1 => true,
_ => return Err(Error::Parse("non-bool value supplied in relay field")),
},
})
}
.into())
}
fn read_verack<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
@ -723,9 +724,7 @@ impl Codec {
}
}
// TODO:
// - move these unit tests to a separate file
// - add exterior integration tests + proptest
// TODO: move these tests to their own module
#[cfg(test)]
mod tests {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
@ -740,7 +739,8 @@ mod tests {
static ref VERSION_TEST_VECTOR: Message = {
let services = PeerServices::NODE_NETWORK;
let timestamp = Utc.timestamp(1_568_000_000, 0);
Message::Version {
VersionMessage {
version: crate::constants::CURRENT_NETWORK_PROTOCOL_VERSION,
services,
timestamp,
@ -757,6 +757,7 @@ mod tests {
start_height: block::Height(540_000),
relay: true,
}
.into()
};
}

View File

@ -9,7 +9,7 @@ use zebra_chain::{
transaction::UnminedTx,
};
use crate::meta_addr::MetaAddr;
use crate::{meta_addr::MetaAddr, BoxError};
use super::{addr::AddrInVersion, inv::InventoryHash, types::*};
@ -45,54 +45,7 @@ pub enum Message {
/// is distinct from a simple version number.
///
/// [Bitcoin reference](https://en.bitcoin.it/wiki/Protocol_documentation#version)
Version {
/// The network version number supported by the sender.
version: Version,
/// The network services advertised by the sender.
services: PeerServices,
/// The time when the version message was sent.
///
/// This is a 64-bit field. Zebra rejects out-of-range times as invalid.
#[cfg_attr(
any(test, feature = "proptest-impl"),
proptest(strategy = "datetime_full()")
)]
timestamp: DateTime<Utc>,
/// The network address of the node receiving this message, and its
/// advertised network services.
///
/// Q: how does the handshake know the remote peer's services already?
address_recv: AddrInVersion,
/// The network address of the node sending this message, and its
/// advertised network services.
address_from: AddrInVersion,
/// Node random nonce, randomly generated every time a version
/// packet is sent. This nonce is used to detect connections
/// to self.
nonce: Nonce,
/// The Zcash user agent advertised by the sender.
user_agent: String,
/// The last block received by the emitting node.
start_height: block::Height,
/// Whether the remote peer should announce relayed
/// transactions or not, see [BIP 0037].
///
/// Zebra does not implement the bloom filters in [BIP 0037].
/// Instead, it only relays:
/// - newly verified best chain block hashes and mempool transaction IDs,
/// - after it reaches the chain tip.
///
/// [BIP 0037]: https://github.com/bitcoin/bips/blob/master/bip-0037.mediawiki
relay: bool,
},
Version(VersionMessage),
/// A `verack` message.
///
@ -340,6 +293,69 @@ pub enum Message {
FilterClear,
}
/// A `version` message.
///
/// Note that although this is called `version` in Bitcoin, its role is really
/// analogous to a `ClientHello` message in TLS, used to begin a handshake, and
/// is distinct from a simple version number.
///
/// This struct provides a type that is guaranteed to be a `version` message,
/// and allows [`Message::Version`](Message) fields to be accessed directly.
///
/// [Bitcoin reference](https://en.bitcoin.it/wiki/Protocol_documentation#version)
#[derive(Clone, Eq, PartialEq, Debug)]
#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))]
pub struct VersionMessage {
/// The network version number supported by the sender.
pub version: Version,
/// The network services advertised by the sender.
pub services: PeerServices,
/// The time when the version message was sent.
///
/// This is a 64-bit field. Zebra rejects out-of-range times as invalid.
///
/// TODO: replace with a custom DateTime64 type (#2171)
#[cfg_attr(
any(test, feature = "proptest-impl"),
proptest(strategy = "datetime_full()")
)]
pub timestamp: DateTime<Utc>,
/// The network address of the node receiving this message, and its
/// advertised network services.
///
/// Q: how does the handshake know the remote peer's services already?
pub address_recv: AddrInVersion,
/// The network address of the node sending this message, and its
/// advertised network services.
pub address_from: AddrInVersion,
/// Node random nonce, randomly generated every time a version
/// packet is sent. This nonce is used to detect connections
/// to self.
pub nonce: Nonce,
/// The Zcash user agent advertised by the sender.
pub user_agent: String,
/// The last block received by the emitting node.
pub start_height: block::Height,
/// Whether the remote peer should announce relayed
/// transactions or not, see [BIP 0037].
///
/// Zebra does not implement the bloom filters in [BIP 0037].
/// Instead, it only relays:
/// - newly verified best chain block hashes and mempool transaction IDs,
/// - after it reaches the chain tip.
///
/// [BIP 0037]: https://github.com/bitcoin/bips/blob/master/bip-0037.mediawiki
pub relay: bool,
}
/// The maximum size of the rejection message.
///
/// This is equivalent to `COMMAND_SIZE` in zcashd.
@ -350,6 +366,27 @@ const MAX_REJECT_MESSAGE_LENGTH: usize = 12;
/// This is equivalent to `MAX_REJECT_MESSAGE_LENGTH` in zcashd.
const MAX_REJECT_REASON_LENGTH: usize = 111;
impl From<VersionMessage> for Message {
fn from(version_message: VersionMessage) -> Self {
Message::Version(version_message)
}
}
impl TryFrom<Message> for VersionMessage {
type Error = BoxError;
fn try_from(message: Message) -> Result<Self, Self::Error> {
match message {
Message::Version(version_message) => Ok(version_message),
_ => Err(format!(
"{} message is not a version message: {message:?}",
message.command()
)
.into()),
}
}
}
// TODO: add tests for Error conversion and Reject message serialization (#4633)
// (Zebra does not currently send reject messages, and it ignores received reject messages.)
impl<E> From<E> for Message
@ -408,13 +445,13 @@ pub enum RejectReason {
impl fmt::Display for Message {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str(&match self {
Message::Version {
Message::Version(VersionMessage {
version,
address_recv,
address_from,
user_agent,
..
} => format!(
}) => format!(
"version {{ network: {}, recv: {},_from: {}, user_agent: {:?} }}",
version,
address_recv.addr(),
@ -481,7 +518,7 @@ impl Message {
/// Returns the Zcash protocol message command as a string.
pub fn command(&self) -> &'static str {
match self {
Message::Version { .. } => "version",
Message::Version(_) => "version",
Message::Verack => "verack",
Message::Ping(_) => "ping",
Message::Pong(_) => "pong",

View File

@ -0,0 +1,45 @@
//! Acceptance tests for zebra-network APIs.
use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
sync::Arc,
};
use chrono::Utc;
use zebra_chain::block::Height;
use zebra_network::{
types::{AddrInVersion, Nonce, PeerServices},
ConnectedAddr, ConnectionInfo, Version, VersionMessage,
};
/// Test that the types used in [`ConnectionInfo`] are public,
/// by compiling code that explicitly uses those types.
#[test]
fn connection_info_types_are_public() {
let fake_addr: SocketAddr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3).into();
let fake_version = Version(3);
let fake_services = PeerServices::default();
// Each struct field must have its type explicitly listed here
let connected_addr: ConnectedAddr = ConnectedAddr::OutboundDirect { addr: fake_addr };
let negotiated_version: Version = fake_version;
let remote = VersionMessage {
version: fake_version,
services: fake_services,
timestamp: Utc::now(),
address_recv: AddrInVersion::new(fake_addr, fake_services),
address_from: AddrInVersion::new(fake_addr, fake_services),
nonce: Nonce::default(),
user_agent: "public API compile test".to_string(),
start_height: Height(0),
relay: true,
};
let _connection_info = Arc::new(ConnectionInfo {
connected_addr,
remote,
negotiated_version,
});
}

View File

@ -607,10 +607,7 @@ async fn setup(
) -> (
// real services
// connected peer which responds with isolated_peer_response
Buffer<
BoxService<zebra_network::Request, zebra_network::Response, BoxError>,
zebra_network::Request,
>,
Buffer<zebra_network::Client, zebra_network::Request>,
// inbound service
BoxCloneService<zebra_network::Request, zebra_network::Response, BoxError>,
// outbound peer set (only has the connected peer)