Security: stop gossiping temporary inbound remote addresses to peers

- stop putting inbound addresses in the address book
- drop address book entries that can't be used for outbound connections
  - distinguish between temporary inbound and permanent outbound peer
    addresses
  - also create variants to handle proxy connections
    (but don't use them yet)
  - avoid tracking connection state for isolated connections
- document security constraints for the address book and peer set
This commit is contained in:
teor 2021-05-07 10:50:04 +10:00
parent fde8f1e4ca
commit a8a0d6450c
9 changed files with 491 additions and 134 deletions

View File

@ -1,4 +1,4 @@
//! The addressbook manages information about what peers exist, when they were
//! The `AddressBook` manages information about what peers exist, when they were
//! seen, and what services they provide.
use std::{
@ -13,8 +13,39 @@ use tracing::Span;
use crate::{constants, types::MetaAddr, PeerAddrState};
/// A database of peers, their advertised services, and information on when they
/// were last seen.
/// A database of peer listener addresses, their advertised services, and
/// information on when they were last seen.
///
/// # Security
///
/// Address book state must be based on outbound connections to peers.
///
/// If the address book is updated incorrectly:
/// - malicious peers can interfere with other peers' `AddressBook` state,
/// or
/// - Zebra can advertise unreachable addresses to its own peers.
///
/// ## Adding Addresses
///
/// The address book should only contain Zcash listener port addresses from peers
/// on the configured network. These addresses can come from:
/// - DNS seeders
/// - addresses gossiped by other peers
/// - the canonical address (`Version.address_from`) provided by each peer,
/// particularly peers on inbound connections.
///
/// The remote addresses of inbound connections must not be added to the address
/// book, because they contain ephemeral outbound ports, not listener ports.
///
/// Isolated connections must not add addresses or update the address book.
///
/// ## Updating Address State
///
/// Updates to address state must be based on outbound connections to peers.
///
/// Updates must not be based on:
/// - the remote addresses of inbound connections, or
/// - the canonical address of any connection.
#[derive(Clone, Debug)]
pub struct AddressBook {
/// Each known peer address has a matching `MetaAddr`
@ -33,8 +64,11 @@ pub struct AddressMetrics {
/// The number of addresses in the `Responded` state.
responded: usize,
/// The number of addresses in the `NeverAttempted` state.
never_attempted: usize,
/// The number of addresses in the `NeverAttemptedGossiped` state.
never_attempted_gossiped: usize,
/// The number of addresses in the `NeverAttemptedAlternate` state.
never_attempted_alternate: usize,
/// The number of addresses in the `Failed` state.
failed: usize,
@ -93,9 +127,10 @@ impl AddressBook {
/// Add `new` to the address book, updating the previous entry if `new` is
/// more recent or discarding `new` if it is stale.
///
/// ## Note
/// # Correctness
///
/// All changes should go through `update` or `take`, to ensure accurate metrics.
/// All new addresses should go through `update`, so that the address book
/// only contains valid outbound addresses.
pub fn update(&mut self, new: MetaAddr) {
let _guard = self.span.enter();
trace!(
@ -104,6 +139,14 @@ impl AddressBook {
recent_peers = self.recently_live_peers().count(),
);
// Drop any unspecified or client addresses.
//
// Communication with these addresses can be monitored via Zebra's
// metrics. (The address book is for valid peer addresses.)
if !new.is_valid_for_outbound() {
return;
}
if let Some(prev) = self.get_by_addr(new.addr) {
if prev.get_last_seen() > new.get_last_seen() {
return;
@ -117,9 +160,10 @@ impl AddressBook {
/// Removes the entry with `addr`, returning it if it exists
///
/// ## Note
/// # Note
///
/// All changes should go through `update` or `take`, to ensure accurate metrics.
/// All address removals should go through `take`, so that the address
/// book metrics are accurate.
fn take(&mut self, removed_addr: SocketAddr) -> Option<MetaAddr> {
let _guard = self.span.enter();
trace!(
@ -254,7 +298,12 @@ impl AddressBook {
/// Returns metrics for the addresses in this address book.
pub fn address_metrics(&self) -> AddressMetrics {
let responded = self.state_peers(PeerAddrState::Responded).count();
let never_attempted = self.state_peers(PeerAddrState::NeverAttempted).count();
let never_attempted_gossiped = self
.state_peers(PeerAddrState::NeverAttemptedGossiped)
.count();
let never_attempted_alternate = self
.state_peers(PeerAddrState::NeverAttemptedAlternate)
.count();
let failed = self.state_peers(PeerAddrState::Failed).count();
let attempt_pending = self.state_peers(PeerAddrState::AttemptPending).count();
@ -265,7 +314,8 @@ impl AddressBook {
AddressMetrics {
responded,
never_attempted,
never_attempted_gossiped,
never_attempted_alternate,
failed,
attempt_pending,
recently_live,
@ -281,7 +331,11 @@ impl AddressBook {
// TODO: rename to address_book.[state_name]
metrics::gauge!("candidate_set.responded", m.responded as f64);
metrics::gauge!("candidate_set.gossiped", m.never_attempted as f64);
metrics::gauge!("candidate_set.gossiped", m.never_attempted_gossiped as f64);
metrics::gauge!(
"candidate_set.alternate",
m.never_attempted_alternate as f64
);
metrics::gauge!("candidate_set.failed", m.failed as f64);
metrics::gauge!("candidate_set.pending", m.attempt_pending as f64);
@ -327,7 +381,12 @@ impl AddressBook {
self.last_address_log = Some(Instant::now());
// if all peers have failed
if m.responded + m.attempt_pending + m.never_attempted == 0 {
if m.responded
+ m.attempt_pending
+ m.never_attempted_gossiped
+ m.never_attempted_alternate
== 0
{
warn!(
address_metrics = ?m,
"all peer addresses have failed. Hint: check your network connection"

View File

@ -15,6 +15,7 @@ use tower::{
};
use crate::{peer, BoxError, Config, Request, Response};
use peer::ConnectedAddr;
/// Use the provided TCP connection to create a Zcash connection completely
/// isolated from all other node state.
@ -57,13 +58,11 @@ pub fn connect_isolated(
.finish()
.expect("provided mandatory builder parameters");
// We can't get the remote addr from conn, because it might be a tcp
// connection through a socks proxy, not directly to the remote. But it
// doesn't seem like zcashd cares if we give a bogus one, and Zebra doesn't
// touch it at all.
let remote_addr = "0.0.0.0:8233".parse().unwrap();
// Don't send any metadata about the connection
let connected_addr = ConnectedAddr::new_isolated();
Oneshot::new(handshake, (conn, remote_addr)).map_ok(|client| BoxService::new(Wrapper(client)))
Oneshot::new(handshake, (conn, connected_addr))
.map_ok(|client| BoxService::new(Wrapper(client)))
}
// This can be deleted when a new version of Tower with map_err is released.

View File

@ -44,7 +44,13 @@ pub enum PeerAddrState {
/// The peer's address has just been fetched from a DNS seeder, or via peer
/// gossip, but we haven't attempted to connect to it yet.
NeverAttempted,
NeverAttemptedGossiped,
/// The peer's address has just been received as part of a `Version` message,
/// so we might already be connected to this peer.
///
/// Alternate addresses are attempted after gossiped addresses.
NeverAttemptedAlternate,
/// The peer's TCP connection failed, or the peer sent us an unexpected
/// Zcash protocol message, so we failed the connection.
@ -54,9 +60,11 @@ pub enum PeerAddrState {
AttemptPending,
}
// non-test code should explicitly specify the peer address state
#[cfg(test)]
impl Default for PeerAddrState {
fn default() -> Self {
NeverAttempted
NeverAttemptedGossiped
}
}
@ -66,19 +74,23 @@ impl Ord for PeerAddrState {
///
/// See [`CandidateSet`] and [`MetaAddr::cmp`] for more details.
fn cmp(&self, other: &Self) -> Ordering {
use Ordering::*;
match (self, other) {
(Responded, Responded)
| (NeverAttempted, NeverAttempted)
| (Failed, Failed)
| (AttemptPending, AttemptPending) => Ordering::Equal,
| (NeverAttemptedGossiped, NeverAttemptedGossiped)
| (NeverAttemptedAlternate, NeverAttemptedAlternate)
| (AttemptPending, AttemptPending) => Equal,
// We reconnect to `Responded` peers that have stopped sending messages,
// then `NeverAttemptedGossiped` peers, then `NeverAttemptedAlternate` peers,
// then `Failed` peers
(Responded, _) => Ordering::Less,
(_, Responded) => Ordering::Greater,
(NeverAttempted, _) => Ordering::Less,
(_, NeverAttempted) => Ordering::Greater,
(Failed, _) => Ordering::Less,
(_, Failed) => Ordering::Greater,
(Responded, _) => Less,
(_, Responded) => Greater,
(NeverAttemptedGossiped, _) => Less,
(_, NeverAttemptedGossiped) => Greater,
(NeverAttemptedAlternate, _) => Less,
(_, NeverAttemptedAlternate) => Greater,
(Failed, _) => Less,
(_, Failed) => Greater,
// AttemptPending is covered by the other cases
}
}
@ -124,8 +136,8 @@ pub struct MetaAddr {
}
impl MetaAddr {
/// Create a new `MetaAddr` from the deserialized fields in an `Addr`
/// message.
/// Create a new `MetaAddr` from the deserialized fields in a gossiped
/// peer `Addr` message.
pub fn new_gossiped(
addr: &SocketAddr,
services: &PeerServices,
@ -136,11 +148,19 @@ impl MetaAddr {
services: *services,
last_seen: *last_seen,
// the state is Zebra-specific, it isn't part of the Zcash network protocol
last_connection_state: NeverAttempted,
last_connection_state: NeverAttemptedGossiped,
}
}
/// Create a new `MetaAddr` for a peer that has just `Responded`.
///
/// # Security
///
/// This address must be the remote address from an outbound connection.
/// Otherwise:
/// - malicious peers could interfere with other peers' `AddressBook` state,
/// or
/// - Zebra could advertise unreachable addresses to its own peers.
pub fn new_responded(addr: &SocketAddr, services: &PeerServices) -> MetaAddr {
MetaAddr {
addr: *addr,
@ -160,6 +180,17 @@ impl MetaAddr {
}
}
/// Create a new `MetaAddr` for a peer's alternate address, received via a
/// `Version` message.
pub fn new_alternate(addr: &SocketAddr, services: &PeerServices) -> MetaAddr {
MetaAddr {
addr: *addr,
services: *services,
last_seen: Utc::now(),
last_connection_state: NeverAttemptedAlternate,
}
}
/// Create a new `MetaAddr` for a peer that has just had an error.
pub fn new_errored(addr: &SocketAddr, services: &PeerServices) -> MetaAddr {
MetaAddr {
@ -195,6 +226,13 @@ impl MetaAddr {
self.last_seen
}
/// Returns `true` if this address can be used for outbound connections.
///
/// An address is usable when the peer advertises `NODE_NETWORK`, its IP is
/// not the unspecified address, and its port is non-zero.
pub fn is_valid_for_outbound(&self) -> bool {
    let is_network_node = self.services.contains(PeerServices::NODE_NETWORK);
    let has_specified_ip = !self.addr.ip().is_unspecified();
    let has_nonzero_port = self.addr.port() != 0;

    is_network_node && has_specified_ip && has_nonzero_port
}
/// Return a sanitized version of this `MetaAddr`, for sending to a remote peer.
pub fn sanitize(&self) -> MetaAddr {
let interval = crate::constants::TIMESTAMP_TRUNCATION_SECONDS;
@ -207,7 +245,7 @@ impl MetaAddr {
services: self.services,
last_seen,
// the state isn't sent to the remote peer, but sanitize it anyway
last_connection_state: Default::default(),
last_connection_state: NeverAttemptedGossiped,
}
}
}
@ -222,6 +260,7 @@ impl Ord for MetaAddr {
/// See [`CandidateSet`] for more details.
fn cmp(&self, other: &Self) -> Ordering {
use std::net::IpAddr::{V4, V6};
use Ordering::*;
let oldest_first = self.get_last_seen().cmp(&other.get_last_seen());
let newest_first = oldest_first.reverse();
@ -229,22 +268,23 @@ impl Ord for MetaAddr {
let connection_state = self.last_connection_state.cmp(&other.last_connection_state);
let reconnection_time = match self.last_connection_state {
Responded => oldest_first,
NeverAttempted => newest_first,
NeverAttemptedGossiped => newest_first,
NeverAttemptedAlternate => newest_first,
Failed => oldest_first,
AttemptPending => oldest_first,
};
let ip_numeric = match (self.addr.ip(), other.addr.ip()) {
(V4(a), V4(b)) => a.octets().cmp(&b.octets()),
(V6(a), V6(b)) => a.octets().cmp(&b.octets()),
(V4(_), V6(_)) => Ordering::Less,
(V6(_), V4(_)) => Ordering::Greater,
(V4(_), V6(_)) => Less,
(V6(_), V4(_)) => Greater,
};
connection_state
.then(reconnection_time)
// The remainder is meaningless as an ordering, but required so that we
// have a total order on `MetaAddr` values: self and other must compare
// as Ordering::Equal iff they are equal.
// as Equal iff they are equal.
.then(ip_numeric)
.then(self.addr.port().cmp(&other.addr.port()))
.then(self.services.bits().cmp(&other.services.bits()))

View File

@ -21,4 +21,4 @@ pub use client::Client;
pub use connection::Connection;
pub use connector::Connector;
pub use error::{HandshakeError, PeerError, SharedPeerError};
pub use handshake::Handshake;
pub use handshake::{ConnectedAddr, Handshake, HandshakeRequest};

View File

@ -11,7 +11,7 @@ use tower::{discover::Change, Service, ServiceExt};
use crate::{BoxError, Request, Response};
use super::{Client, Handshake};
use super::{Client, ConnectedAddr, Handshake};
/// A wrapper around [`peer::Handshake`] that opens a TCP connection before
/// forwarding to the inner handshake service. Writing this as its own
@ -53,7 +53,8 @@ where
async move {
let stream = TcpStream::connect(addr).await?;
hs.ready_and().await?;
let client = hs.call((stream, addr)).await?;
let connected_addr = ConnectedAddr::new_outbound_direct(addr);
let client = hs.call((stream, connected_addr)).await?;
Ok(Change::Insert(addr, client))
}
.boxed()

View File

@ -1,7 +1,8 @@
use std::{
collections::HashSet,
fmt,
future::Future,
net::SocketAddr,
net::{IpAddr, Ipv4Addr, SocketAddr},
pin::Pin,
sync::Arc,
task::{Context, Poll},
@ -12,6 +13,7 @@ use futures::{
channel::{mpsc, oneshot},
future, FutureExt, SinkExt, StreamExt,
};
use lazy_static::lazy_static;
use tokio::{net::TcpStream, sync::broadcast, task::JoinError, time::timeout};
use tokio_util::codec::Framed;
use tower::Service;
@ -53,6 +55,191 @@ pub struct Handshake<S> {
parent_span: Span,
}
/// The peer address that we are handshaking with.
///
/// Typically, we can rely on outbound addresses, but inbound addresses don't
/// give us enough information to reconnect to that peer.
///
/// Every field is a plain address value, so this type implements `Eq` and
/// `Hash` as well as `PartialEq`, and can be used as a set or map key.
#[derive(Copy, Clone, PartialEq, Eq, Hash)]
pub enum ConnectedAddr {
    /// The address we used to make a direct outbound connection.
    ///
    /// In an honest network, a Zcash peer is listening on this exact address
    /// and port.
    OutboundDirect {
        /// The remote address we connected to.
        addr: SocketAddr,
    },

    /// The address we received from the OS, when a remote peer directly
    /// connected to our Zcash listener port.
    ///
    /// In an honest network, a Zcash peer might be listening on this address,
    /// if its outbound address is the same as its listener address. But the port
    /// is an ephemeral outbound TCP port, not a listener port.
    InboundDirect {
        /// The remote IP address, which might also be the peer's listener IP.
        maybe_ip: IpAddr,
        /// The remote peer's ephemeral outbound TCP port.
        transient_port: u16,
    },

    /// The proxy address we used to make an outbound connection.
    ///
    /// The proxy address can be used by many connections, but our own ephemeral
    /// outbound address and port can be used as an identifier for the duration
    /// of this connection.
    OutboundProxy {
        /// The address of the proxy we connected through.
        proxy_addr: SocketAddr,
        /// Our own ephemeral local address for this connection.
        transient_local_addr: SocketAddr,
    },

    /// The address we received from the OS, when a remote peer connected via an
    /// inbound proxy.
    ///
    /// The proxy's ephemeral outbound address can be used as an identifier for
    /// the duration of this connection.
    InboundProxy {
        /// The proxy's ephemeral outbound address.
        transient_addr: SocketAddr,
    },

    /// An isolated connection, where we deliberately don't attach any metadata
    /// to the connection.
    Isolated,
    //
    // TODO: handle Tor onion addresses
}
lazy_static! {
/// An unspecified IPv4 socket address: `Ipv4Addr::UNSPECIFIED` with port `0`.
///
/// `negotiate_version` sends this as the `address_recv` placeholder when the
/// connection has no transient address.
pub static ref UNSPECIFIED_IPV4_ADDR: SocketAddr =
(Ipv4Addr::UNSPECIFIED, 0).into();
}
use ConnectedAddr::*;
impl ConnectedAddr {
/// Returns a new outbound directly connected addr.
pub fn new_outbound_direct(addr: SocketAddr) -> ConnectedAddr {
OutboundDirect { addr }
}
/// Returns a new inbound directly connected addr.
pub fn new_inbound_direct(addr: SocketAddr) -> ConnectedAddr {
InboundDirect {
maybe_ip: addr.ip(),
transient_port: addr.port(),
}
}
/// Returns a new outbound connected addr via `proxy`.
///
/// `local_addr` is the ephemeral local address of the connection.
#[allow(unused)]
pub fn new_outbound_proxy(proxy: SocketAddr, local_addr: SocketAddr) -> ConnectedAddr {
OutboundProxy {
proxy_addr: proxy,
transient_local_addr: local_addr,
}
}
/// Returns a new inbound connected addr from `proxy`.
//
// TODO: distinguish between direct listeners and proxy listeners in the
// rest of zebra-network
#[allow(unused)]
pub fn new_inbound_proxy(proxy: SocketAddr) -> ConnectedAddr {
InboundProxy {
transient_addr: proxy,
}
}
/// Returns a new isolated connected addr, with no metadata.
pub fn new_isolated() -> ConnectedAddr {
Isolated
}
/// Returns a `SocketAddr` that can be used to track this connection in the
/// `AddressBook`.
///
/// `None` for inbound connections, proxy connections, and isolated
/// connections.
///
/// # Correctness
///
/// This address can be used for reconnection attempts, or as a permanent
/// identifier.
///
/// # Security
///
/// This address must not depend on the canonical address from the `Version`
/// message. Otherwise, malicious peers could interfere with other peers
/// `AddressBook` state.
pub fn get_address_book_addr(&self) -> Option<SocketAddr> {
match self {
OutboundDirect { addr } => Some(*addr),
// TODO: consider using the canonical address of the peer to track
// outbound proxy connections
InboundDirect { .. } | OutboundProxy { .. } | InboundProxy { .. } | Isolated => None,
}
}
/// Returns a `SocketAddr` that can be used to temporarily identify a
/// connection.
///
/// Isolated connections must not change Zebra's peer set or address book
/// state, so they do not have an identifier.
///
/// # Correctness
///
/// The returned address is only valid while the original connection is
/// open. It must not be used in the `AddressBook`, for outbound connection
/// attempts, or as a permanent identifier.
///
/// # Security
///
/// This address must not depend on the canonical address from the `Version`
/// message. Otherwise, malicious peers could interfere with other peers'
/// `PeerSet` state.
pub fn get_transient_addr(&self) -> Option<SocketAddr> {
match self {
OutboundDirect { addr } => Some(*addr),
InboundDirect {
maybe_ip,
transient_port,
} => Some(SocketAddr::new(*maybe_ip, *transient_port)),
OutboundProxy {
transient_local_addr,
..
} => Some(*transient_local_addr),
InboundProxy { transient_addr } => Some(*transient_addr),
Isolated => None,
}
}
/// Returns the metrics label for this connection's address.
pub fn get_transient_addr_label(&self) -> String {
self.get_transient_addr()
.map_or_else(|| "isolated".to_string(), |addr| addr.to_string())
}
/// Returns a short label for the kind of connection.
pub fn get_short_kind_label(&self) -> &'static str {
match self {
OutboundDirect { .. } => "Out",
InboundDirect { .. } => "In",
OutboundProxy { .. } => "ProxOut",
InboundProxy { .. } => "ProxIn",
Isolated => "Isol",
}
}
}
impl fmt::Debug for ConnectedAddr {
    /// Formats the address as its short kind label wrapping the transient
    /// address label, for example `Out("1.2.3.4:8233")`.
    ///
    /// Isolated connections have no address, so only the bare kind label is
    /// written for them.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let kind_label = self.get_short_kind_label();

        if let Isolated = self {
            return f.write_str(kind_label);
        }

        let addr_label = self.get_transient_addr_label();
        f.debug_tuple(kind_label).field(&addr_label).finish()
    }
}
/// A builder for `Handshake`.
pub struct Builder<S> {
config: Option<Config>,
inbound_service: Option<S>,
@ -81,6 +268,9 @@ where
}
/// Provide a channel for registering inventory advertisements. Optional.
///
/// This channel takes transient remote addresses, which the `PeerSet` uses
/// to look up peers that have specific inventory.
pub fn with_inventory_collector(
mut self,
inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>,
@ -91,7 +281,8 @@ where
/// Provide a hook for timestamp collection. Optional.
///
/// If this is unset, timestamps will not be collected.
/// This channel takes `MetaAddr`s, permanent addresses which can be used to
/// make outbound connections to peers.
pub fn with_timestamp_collector(mut self, timestamp_collector: mpsc::Sender<MetaAddr>) -> Self {
self.timestamp_collector = Some(timestamp_collector);
self
@ -181,19 +372,19 @@ where
}
/// Negotiate the Zcash network protocol version with the remote peer
/// at `addr`, using the connection `peer_conn`.
/// at `connected_addr`, using the connection `peer_conn`.
///
/// We split `Handshake` into its components before calling this function,
/// to avoid infectious `Sync` bounds on the returned future.
pub async fn negotiate_version(
peer_conn: &mut Framed<TcpStream, Codec>,
addr: &SocketAddr,
connected_addr: &ConnectedAddr,
config: Config,
nonces: Arc<futures::lock::Mutex<HashSet<Nonce>>>,
user_agent: String,
our_services: PeerServices,
relay: bool,
) -> Result<(Version, PeerServices), HandshakeError> {
) -> Result<(Version, PeerServices, SocketAddr), HandshakeError> {
// Create a random nonce for this connection
let local_nonce = Nonce::default();
// # Correctness
@ -227,7 +418,12 @@ pub async fn negotiate_version(
version: constants::CURRENT_VERSION,
services: our_services,
timestamp,
address_recv: (PeerServices::NODE_NETWORK, *addr),
address_recv: (
PeerServices::NODE_NETWORK,
connected_addr
.get_transient_addr()
.unwrap_or_else(|| *UNSPECIFIED_IPV4_ADDR),
),
// TODO: detect external address (#1893)
address_from: (our_services, config.listen_addr),
nonce: local_nonce,
@ -248,17 +444,28 @@ pub async fn negotiate_version(
// Check that we got a Version and destructure its fields into the local scope.
debug!(?remote_msg, "got message from remote peer");
let (remote_nonce, remote_services, remote_version) = if let Message::Version {
nonce,
services,
version,
..
} = remote_msg
{
(nonce, services, version)
} else {
Err(HandshakeError::UnexpectedMessage(Box::new(remote_msg)))?
};
let (remote_nonce, remote_services, remote_version, remote_canonical_addr) =
if let Message::Version {
version,
services,
address_from,
nonce,
..
} = remote_msg
{
let (address_services, canonical_addr) = address_from;
if address_services != services {
info!(
?services,
?address_services,
"peer with inconsistent version services and version address services"
);
}
(nonce, services, version, canonical_addr)
} else {
Err(HandshakeError::UnexpectedMessage(Box::new(remote_msg)))?
};
// Check for nonce reuse, indicating self-connection
//
@ -317,10 +524,12 @@ pub async fn negotiate_version(
Err(HandshakeError::ObsoleteVersion(remote_version))?;
}
Ok((remote_version, remote_services))
Ok((remote_version, remote_services, remote_canonical_addr))
}
impl<S> Service<(TcpStream, SocketAddr)> for Handshake<S>
/// The request type for the `Handshake` service: the TCP stream to perform
/// the handshake on, and the `ConnectedAddr` describing how that stream is
/// connected to the peer.
pub type HandshakeRequest = (TcpStream, ConnectedAddr);
impl<S> Service<HandshakeRequest> for Handshake<S>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send,
@ -334,14 +543,15 @@ where
Poll::Ready(Ok(()))
}
fn call(&mut self, req: (TcpStream, SocketAddr)) -> Self::Future {
let (tcp_stream, addr) = req;
fn call(&mut self, req: HandshakeRequest) -> Self::Future {
let (tcp_stream, connected_addr) = req;
let connector_span = span!(Level::INFO, "connector", ?addr);
let negotiator_span = span!(Level::INFO, "negotiator", peer = ?connected_addr);
// set the peer connection span's parent to the global span, as it
// should exist independently of its creation source (inbound
// connection, crawler, initial peer, ...)
let connection_span = span!(parent: &self.parent_span, Level::INFO, "peer", ?addr);
let connection_span =
span!(parent: &self.parent_span, Level::INFO, "", peer = ?connected_addr);
// Clone these upfront, so they can be moved into the future.
let nonces = self.nonces.clone();
@ -354,7 +564,10 @@ where
let relay = self.relay;
let fut = async move {
debug!(?addr, "negotiating protocol version with remote peer");
debug!(
addr = ?connected_addr,
"negotiating protocol version with remote peer"
);
// CORRECTNESS
//
@ -364,16 +577,16 @@ where
tcp_stream,
Codec::builder()
.for_network(config.network)
.with_metrics_label(addr.ip().to_string())
.with_metrics_addr_label(connected_addr.get_transient_addr_label())
.finish(),
);
// Wrap the entire initial connection setup in a timeout.
let (remote_version, remote_services) = timeout(
let (remote_version, remote_services, _remote_canonical_addr) = timeout(
constants::HANDSHAKE_TIMEOUT,
negotiate_version(
&mut peer_conn,
&addr,
&connected_addr,
config,
nonces,
user_agent,
@ -418,7 +631,7 @@ where
"zcash.net.out.messages",
1,
"command" => msg.to_string(),
"addr" => addr.to_string(),
"addr" => connected_addr.get_transient_addr_label(),
);
// We need to use future::ready rather than an async block here,
// because we need the sink to be Unpin, and the With<Fut, ...>
@ -445,24 +658,30 @@ where
"zcash.net.in.messages",
1,
"command" => msg.to_string(),
"addr" => addr.to_string(),
"addr" => connected_addr.get_transient_addr_label(),
);
// the collector doesn't depend on network activity,
// so this await should not hang
let _ = inbound_ts_collector
.send(MetaAddr::new_responded(&addr, &remote_services))
.await;
if let Some(book_addr) = connected_addr.get_address_book_addr() {
// the collector doesn't depend on network activity,
// so this await should not hang
let _ = inbound_ts_collector
.send(MetaAddr::new_responded(&book_addr, &remote_services))
.await;
}
}
Err(err) => {
metrics::counter!(
"zebra.net.in.errors",
1,
"error" => err.to_string(),
"addr" => addr.to_string(),
"addr" => connected_addr.get_transient_addr_label(),
);
let _ = inbound_ts_collector
.send(MetaAddr::new_errored(&addr, &remote_services))
.await;
if let Some(book_addr) = connected_addr.get_address_book_addr() {
let _ = inbound_ts_collector
.send(MetaAddr::new_errored(&book_addr, &remote_services))
.await;
}
}
}
msg
@ -472,7 +691,9 @@ where
let inv_collector = inv_collector.clone();
let span = debug_span!("inventory_filter");
async move {
if let Ok(Message::Inv(hashes)) = &msg {
if let (Ok(Message::Inv(hashes)), Some(transient_addr)) =
(&msg, connected_addr.get_transient_addr())
{
// We ignore inventory messages with more than one
// block, because they are most likely replies to a
// query, rather than a newly gossiped block.
@ -487,13 +708,15 @@ where
// merged inv messages into separate inv messages. (#1799)
match hashes.as_slice() {
[hash @ InventoryHash::Block(_)] => {
let _ = inv_collector.send((*hash, addr));
let _ = inv_collector.send((*hash, transient_addr));
}
[hashes @ ..] => {
for hash in hashes {
if matches!(hash, InventoryHash::Tx(_)) {
debug!(?hash, "registering Tx inventory hash");
let _ = inv_collector.send((*hash, addr));
// The peer set and inv collector use the peer's remote
// address as an identifier
let _ = inv_collector.send((*hash, transient_addr));
} else {
trace!(?hash, "ignoring non Tx inventory hash")
}
@ -562,10 +785,12 @@ where
Either::Left(_)
) {
tracing::trace!("shutting down due to Client shut down");
// awaiting a local task won't hang
let _ = timestamp_collector
.send(MetaAddr::new_shutdown(&addr, &remote_services))
.await;
if let Some(book_addr) = connected_addr.get_address_book_addr() {
// awaiting a local task won't hang
let _ = timestamp_collector
.send(MetaAddr::new_shutdown(&book_addr, &remote_services))
.await;
}
return;
}
@ -579,7 +804,7 @@ where
if heartbeat_timeout(
heartbeat,
&mut timestamp_collector,
&addr,
&connected_addr,
&remote_services,
)
.await
@ -597,7 +822,7 @@ where
};
// Spawn a new task to drive this handshake.
tokio::spawn(fut.instrument(connector_span))
tokio::spawn(fut.instrument(negotiator_span))
.map(|x: Result<Result<Client, HandshakeError>, JoinError>| Ok(x??))
.boxed()
}
@ -652,7 +877,7 @@ async fn send_one_heartbeat(server_tx: &mut mpsc::Sender<ClientRequest>) -> Resu
async fn heartbeat_timeout<F, T>(
fut: F,
timestamp_collector: &mut mpsc::Sender<MetaAddr>,
addr: &SocketAddr,
connected_addr: &ConnectedAddr,
remote_services: &PeerServices,
) -> Result<T, BoxError>
where
@ -660,21 +885,33 @@ where
{
let t = match timeout(constants::HEARTBEAT_INTERVAL, fut).await {
Ok(inner_result) => {
handle_heartbeat_error(inner_result, timestamp_collector, addr, remote_services).await?
handle_heartbeat_error(
inner_result,
timestamp_collector,
connected_addr,
remote_services,
)
.await?
}
Err(elapsed) => {
handle_heartbeat_error(Err(elapsed), timestamp_collector, addr, remote_services).await?
handle_heartbeat_error(
Err(elapsed),
timestamp_collector,
connected_addr,
remote_services,
)
.await?
}
};
Ok(t)
}
/// If `result.is_err()`, mark `addr` as failed using `timestamp_collector`.
/// If `result.is_err()`, mark `connected_addr` as failed using `timestamp_collector`.
async fn handle_heartbeat_error<T, E>(
result: Result<T, E>,
timestamp_collector: &mut mpsc::Sender<MetaAddr>,
addr: &SocketAddr,
connected_addr: &ConnectedAddr,
remote_services: &PeerServices,
) -> Result<T, E>
where
@ -685,10 +922,11 @@ where
Err(err) => {
tracing::debug!(?err, "heartbeat error, shutting down");
let _ = timestamp_collector
.send(MetaAddr::new_errored(&addr, &remote_services))
.await;
if let Some(book_addr) = connected_addr.get_address_book_addr() {
let _ = timestamp_collector
.send(MetaAddr::new_errored(&book_addr, &remote_services))
.await;
}
Err(err)
}
}

View File

@ -12,11 +12,7 @@ use futures::{
stream::{FuturesUnordered, StreamExt},
TryFutureExt,
};
use tokio::{
net::{TcpListener, TcpStream},
sync::broadcast,
time::Instant,
};
use tokio::{net::TcpListener, sync::broadcast, time::Instant};
use tower::{
buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover,
util::BoxService, Service, ServiceExt,
@ -75,7 +71,7 @@ where
// handshakes. These use the same handshake service internally to detect
// self-connection attempts. Both are decorated with a tower TimeoutLayer to
// enforce timeouts as specified in the Config.
let (listener, connector) = {
let (listen_handshaker, outbound_connector) = {
use tower::timeout::TimeoutLayer;
let hs_timeout = TimeoutLayer::new(constants::HANDSHAKE_TIMEOUT);
use crate::protocol::external::types::PeerServices;
@ -136,18 +132,19 @@ where
}
let listen_guard = tokio::spawn(
listen(config.listen_addr, listener, peerset_tx.clone()).instrument(Span::current()),
listen(config.listen_addr, listen_handshaker, peerset_tx.clone())
.instrument(Span::current()),
);
// 2. Initial peers, specified in the config.
let initial_peers_fut = {
let config = config.clone();
let connector = connector.clone();
let outbound_connector = outbound_connector.clone();
let peerset_tx = peerset_tx.clone();
async move {
let initial_peers = config.initial_peers().await;
// Connect the tx end to the 3 peer sources:
add_initial_peers(initial_peers, connector, peerset_tx).await
add_initial_peers(initial_peers, outbound_connector, peerset_tx).await
}
.boxed()
};
@ -175,7 +172,7 @@ where
demand_tx,
demand_rx,
candidates,
connector,
outbound_connector,
peerset_tx,
)
.instrument(Span::current()),
@ -190,10 +187,10 @@ where
/// Use the provided `handshaker` to connect to `initial_peers`, then send
/// the results over `tx`.
#[instrument(skip(initial_peers, connector, tx))]
#[instrument(skip(initial_peers, outbound_connector, tx))]
async fn add_initial_peers<S>(
initial_peers: std::collections::HashSet<SocketAddr>,
connector: S,
outbound_connector: S,
mut tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxError>
where
@ -209,7 +206,7 @@ where
// single `CallAll` to completion, and handshakes have a short timeout.
use tower::util::CallAllUnordered;
let addr_stream = futures::stream::iter(initial_peers.into_iter());
let mut handshakes = CallAllUnordered::new(connector, addr_stream);
let mut handshakes = CallAllUnordered::new(outbound_connector, addr_stream);
while let Some(handshake_result) = handshakes.next().await {
// this is verbose, but it's better than just hanging with no output
@ -222,8 +219,11 @@ where
Ok(())
}
/// Bind to `addr`, listen for peers using `handshaker`, then send the
/// results over `tx`.
/// Listens for peer connections on `addr`, then sets up each connection as a
/// Zcash peer.
///
/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
/// the `Client` result over `tx`.
#[instrument(skip(tx, handshaker))]
async fn listen<S>(
addr: SocketAddr,
@ -231,7 +231,7 @@ async fn listen<S>(
tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxError>
where
S: Service<(TcpStream, SocketAddr), Response = peer::Client, Error = BoxError> + Clone,
S: Service<peer::HandshakeRequest, Response = peer::Client, Error = BoxError> + Clone,
S::Future: Send + 'static,
{
info!("Trying to open Zcash protocol endpoint at {}...", addr);
@ -253,8 +253,10 @@ where
if let Ok((tcp_stream, addr)) = listener.accept().await {
debug!(?addr, "got incoming connection");
handshaker.ready_and().await?;
// TODO: distinguish between proxied listeners and direct listeners
let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr);
// Construct a handshake future but do not drive it yet....
let handshake = handshaker.call((tcp_stream, addr));
let handshake = handshaker.call((tcp_stream, connected_addr));
// ... instead, spawn a new task to handle this connection
let mut tx2 = tx.clone();
tokio::spawn(async move {
@ -292,7 +294,7 @@ enum CrawlerAction {
///
/// Crawl for new peers every `crawl_new_peer_interval`, and whenever there is
/// demand, but no new peers in `candidates`. After crawling, try to connect to
/// one new peer using `connector`.
/// one new peer using `outbound_connector`.
///
/// If a handshake fails, restore the unused demand signal by sending it to
/// `demand_tx`.
@ -300,13 +302,13 @@ enum CrawlerAction {
/// The crawler terminates when `candidates.update()` or `success_tx` returns a
/// permanent internal error. Transient errors and individual peer errors should
/// be handled within the crawler.
#[instrument(skip(demand_tx, demand_rx, candidates, connector, success_tx))]
#[instrument(skip(demand_tx, demand_rx, candidates, outbound_connector, success_tx))]
async fn crawl_and_dial<C, S>(
crawl_new_peer_interval: std::time::Duration,
mut demand_tx: mpsc::Sender<()>,
mut demand_rx: mpsc::Receiver<()>,
mut candidates: CandidateSet<S>,
connector: C,
outbound_connector: C,
mut success_tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxError>
where
@ -380,10 +382,12 @@ where
// spawn each handshake into an independent task, so it can make
// progress independently of the crawls
let hs_join =
tokio::spawn(dial(candidate, connector.clone())).map(move |res| match res {
Ok(crawler_action) => crawler_action,
Err(e) => {
panic!("panic during handshaking with {:?}: {:?} ", candidate, e);
tokio::spawn(dial(candidate, outbound_connector.clone())).map(move |res| {
match res {
Ok(crawler_action) => crawler_action,
Err(e) => {
panic!("panic during handshaking with {:?}: {:?} ", candidate, e);
}
}
});
handshakes.push(Box::pin(hs_join));
@ -431,12 +435,12 @@ where
}
}
/// Try to connect to `candidate` using `connector`.
/// Try to connect to `candidate` using `outbound_connector`.
///
/// Returns a `HandshakeConnected` action on success, and a
/// `HandshakeFailed` action on error.
#[instrument(skip(connector,))]
async fn dial<C>(candidate: MetaAddr, mut connector: C) -> CrawlerAction
#[instrument(skip(outbound_connector,))]
async fn dial<C>(candidate: MetaAddr, mut outbound_connector: C) -> CrawlerAction
where
C: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxError>
+ Clone
@ -453,10 +457,13 @@ where
debug!(?candidate.addr, "attempting outbound connection in response to demand");
// the outbound connector is always ready, so this can't hang
let connector = connector.ready_and().await.expect("connector never errors");
let outbound_connector = outbound_connector
.ready_and()
.await
.expect("outbound connector never errors");
// the handshake has timeouts, so it shouldn't hang
connector
outbound_connector
.call(candidate.addr)
.map_err(|e| (candidate, e))
.map(Into::into)

View File

@ -40,6 +40,17 @@ use super::{
/// A [`tower::Service`] that abstractly represents "the rest of the network".
///
/// # Security
///
/// The `Discover::Key` must be the transient remote address of each peer. This
/// address may only be valid for the duration of a single connection. (For
/// example, inbound connections have an ephemeral remote port, and proxy
/// connections have an ephemeral local or proxy port.)
///
/// Otherwise, malicious peers could interfere with other peers' `PeerSet` state.
///
/// # Implementation
///
/// This implementation is adapted from the one in `tower-balance`, and as
/// described in that crate's documentation, it
///

View File

@ -45,8 +45,8 @@ pub struct Builder {
version: Version,
/// The maximum allowable message length.
max_len: usize,
/// An optional label to use for reporting metrics.
metrics_label: Option<String>,
/// An optional address label, to use for reporting metrics.
metrics_addr_label: Option<String>,
}
impl Codec {
@ -56,7 +56,7 @@ impl Codec {
network: Network::Mainnet,
version: constants::CURRENT_VERSION,
max_len: MAX_PROTOCOL_MESSAGE_LEN,
metrics_label: None,
metrics_addr_label: None,
}
}
@ -95,9 +95,9 @@ impl Builder {
self
}
/// Configure the codec for the given peer address.
pub fn with_metrics_label(mut self, metrics_label: String) -> Self {
self.metrics_label = Some(metrics_label);
/// Configure the codec with a label corresponding to the peer address.
pub fn with_metrics_addr_label(mut self, metrics_addr_label: String) -> Self {
self.metrics_addr_label = Some(metrics_addr_label);
self
}
}
@ -116,8 +116,10 @@ impl Encoder<Message> for Codec {
return Err(Parse("body length exceeded maximum size"));
}
if let Some(label) = self.builder.metrics_label.clone() {
metrics::counter!("zcash.net.out.bytes.total", (body_length + HEADER_LEN) as u64, "addr" => label);
if let Some(addr_label) = self.builder.metrics_addr_label.clone() {
metrics::counter!("zcash.net.out.bytes.total",
(body_length + HEADER_LEN) as u64,
"addr" => addr_label);
}
use Message::*;
@ -370,7 +372,7 @@ impl Decoder for Codec {
return Err(Parse("body length exceeded maximum size"));
}
if let Some(label) = self.builder.metrics_label.clone() {
if let Some(label) = self.builder.metrics_addr_label.clone() {
metrics::counter!("zcash.net.in.bytes.total", (body_len + HEADER_LEN) as u64, "addr" => label);
}