feat(network): Track misbehaving peer connections and ban them past a threshold (#9201)

* feat(network): Add misbehavior tracking for peers in the address book

* - Add a `misbehavior_score` field to `MetaAddr`
- Add a `bans_by_ip` field to `AddressBook`
- Update the `AddressBook::update()` method to:
    - increment misbehavior scores in its entries,
    - add addr ips to bans_by_ip if the score is excessive,
    - remove any addrs at the banned ip
- Avoid responding to `GetAddr` requests with addresses of misbehaving peers (return None from `sanitized()`),
- Avoid new inbound or outbound connections to banned ips

* Drops banned peer connections in peer set's `poll_ready()` method

* Adds rudimentary misbehavior score tracking

* fixes some proptests, moves and removes some TODOs

* fixes lint

* Removes outdated TODO

* Adds stub for acceptance test

* updates call to updated fn

* Stores likely inbound peer connection addresses in the address book and returns their IPs with the default port instead of the transient port when responding to GetAddr requests

* Avoids gossiping peer addrs from inbound connections

* updates test to check that sanitize won't return inbound peer addrs or addrs with non-zero misbehaviour scores.

updated misbehaviour score for potentially unavoidable errors.

* Updates `generate` RPC to support any network where PoW is disabled.

Updates acceptance test to check that zebrad instances disconnect once one of them advertises a block with an invalid PoW

* minor tangential cleanup

* Finishes acceptance test, sends misbehavior updates from syncer, and always updates address book entries if the update is to their misbehaviour score

* skip test on windows (to minimize risk of port conflict)

* Applies suggestions from code review
This commit is contained in:
Arya 2025-02-15 17:02:17 -05:00 committed by GitHub
parent 4613dcd259
commit b4211aa1cf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
39 changed files with 871 additions and 130 deletions

View File

@ -37,6 +37,15 @@ impl Network {
}
}
/// Returns an iterator over the blocks yielded by `block_iter()`, each
/// deserialized into a [`Block`].
///
/// # Panics
///
/// The returned iterator panics when it reaches an entry whose bytes do not
/// deserialize into a [`Block`].
pub fn block_parsed_iter(&self) -> impl Iterator<Item = Block> {
    self.block_iter().map(|(_, serialized_block)| {
        serialized_block.zcash_deserialize_into::<Block>().expect("block is structurally valid")
    })
}
/// Returns iterator over verified unmined transactions in the provided block height range.
pub fn unmined_transactions_in_blocks(
&self,

View File

@ -100,6 +100,17 @@ impl VerifyBlockError {
_ => false,
}
}
/// Returns a suggested misbehaviour score increment for a certain error.
pub fn misbehavior_score(&self) -> u32 {
// TODO: Adjust these values based on zcashd (#9258).
use VerifyBlockError::*;
match self {
Block { source } => source.misbehavior_score(),
Equihash { .. } => 100,
_other => 0,
}
}
}
/// The maximum allowed number of legacy signature check operations in a block.

View File

@ -1037,6 +1037,21 @@ impl VerifyCheckpointError {
_ => false,
}
}
/// Returns a suggested misbehaviour score increment for a certain error.
pub fn misbehavior_score(&self) -> u32 {
// TODO: Adjust these values based on zcashd (#9258).
match self {
VerifyCheckpointError::VerifyBlock(verify_block_error) => {
verify_block_error.misbehavior_score()
}
VerifyCheckpointError::SubsidyError(_)
| VerifyCheckpointError::CoinbaseHeight { .. }
| VerifyCheckpointError::DuplicateTransaction
| VerifyCheckpointError::AmountError(_) => 100,
_other => 0,
}
}
}
/// The CheckpointVerifier service implementation.

View File

@ -67,7 +67,7 @@ impl ParameterCheckpoint for Network {
let (checkpoints_for_network, should_fallback_to_genesis_hash_as_checkpoint) = match self {
Network::Mainnet => (MAINNET_CHECKPOINTS, false),
Network::Testnet(params) if params.is_default_testnet() => (TESTNET_CHECKPOINTS, false),
Network::Testnet(_params) => (TESTNET_CHECKPOINTS, true),
Network::Testnet(_params) => ("", true),
};
// Check that the list starts with the correct genesis block and parses checkpoint list.

View File

@ -284,6 +284,50 @@ impl From<BoxError> for TransactionError {
}
}
impl TransactionError {
    /// Returns the suggested misbehaviour score increment for this error when it
    /// is produced while verifying a mempool transaction.
    ///
    /// The errors listed below are scored 100; every other error is scored 0.
    pub fn mempool_misbehavior_score(&self) -> u32 {
        use TransactionError::*;

        // TODO: Adjust these values based on zcashd (#9258).
        let is_penalized = matches!(
            self,
            ImmatureTransparentCoinbaseSpend { .. }
                | UnshieldedTransparentCoinbaseSpend { .. }
                | CoinbasePosition
                | CoinbaseAfterFirst
                | CoinbaseHasJoinSplit
                | CoinbaseHasSpend
                | CoinbaseHasOutputPreHeartwood
                | CoinbaseHasEnableSpendsOrchard
                | CoinbaseOutputsNotDecryptable
                | CoinbaseInMempool
                | NonCoinbaseHasCoinbaseInput
                | CoinbaseExpiryBlockHeight { .. }
                | IncorrectFee
                | Subsidy(_)
                | WrongVersion
                | NoInputs
                | NoOutputs
                | BadBalance
                | Script(_)
                | SmallOrder
                | Groth16(_)
                | MalformedGroth16(_)
                | Ed25519(_)
                | RedJubjub(_)
                | RedPallas(_)
                | BothVPubsNonZero
                | DisabledAddToSproutPool
                | NotEnoughFlags
                | WrongConsensusBranchId
                | MissingConsensusBranchId
        );

        if is_penalized {
            100
        } else {
            0
        }
    }
}
#[derive(Error, Clone, Debug, PartialEq, Eq)]
#[allow(missing_docs)]
pub enum BlockError {
@ -373,4 +417,18 @@ impl BlockError {
pub fn is_duplicate_request(&self) -> bool {
matches!(self, BlockError::AlreadyInChain(..))
}
/// Returns the suggested misbehaviour score increment for this error.
///
/// Missing-height, maximum-height and difficulty-related errors are scored 100;
/// every other error is scored 0.
pub(crate) fn misbehavior_score(&self) -> u32 {
    use BlockError::*;

    let is_penalized = matches!(
        self,
        MissingHeight(..)
            | MaxHeight(..)
            | InvalidDifficulty(..)
            | TargetDifficultyLimit(..)
            | DifficultyFilter(..)
    );

    if is_penalized {
        100
    } else {
        0
    }
}
}

View File

@ -139,6 +139,15 @@ impl RouterError {
RouterError::Block { source, .. } => source.is_duplicate_request(),
}
}
/// Returns a suggested misbehaviour score increment for a certain error.
pub fn misbehavior_score(&self) -> u32 {
// TODO: Adjust these values based on zcashd (#9258).
match self {
RouterError::Checkpoint { source } => source.misbehavior_score(),
RouterError::Block { source } => source.misbehavior_score(),
}
}
}
impl<S, V> Service<Request> for BlockVerifierRouter<S, V>

View File

@ -10,6 +10,7 @@ use std::{
};
use chrono::Utc;
use indexmap::IndexMap;
use ordered_map::OrderedMap;
use tokio::sync::watch;
use tracing::Span;
@ -80,6 +81,9 @@ pub struct AddressBook {
// TODO: Replace with `by_ip: HashMap<IpAddr, BTreeMap<DateTime32, MetaAddr>>` to support configured `max_connections_per_ip` greater than 1
most_recent_by_ip: Option<HashMap<IpAddr, MetaAddr>>,
/// A list of banned addresses, with the time they were banned.
bans_by_ip: Arc<IndexMap<IpAddr, Instant>>,
/// The local listener address.
local_listener: SocketAddr,
@ -162,6 +166,7 @@ impl AddressBook {
address_metrics_tx,
last_address_log: None,
most_recent_by_ip: should_limit_outbound_conns_per_ip.then(HashMap::new),
bans_by_ip: Default::default(),
};
new_book.update_metrics(instant_now, chrono_now);
@ -409,6 +414,14 @@ impl AddressBook {
/// peers.
#[allow(clippy::unwrap_in_result)]
pub fn update(&mut self, change: MetaAddrChange) -> Option<MetaAddr> {
if self.bans_by_ip.contains_key(&change.addr().ip()) {
tracing::warn!(
?change,
"attempted to add a banned peer addr to address book"
);
return None;
}
let previous = self.get(change.addr());
let _guard = self.span.enter();
@ -428,6 +441,44 @@ impl AddressBook {
);
if let Some(updated) = updated {
if updated.misbehavior() >= constants::MAX_PEER_MISBEHAVIOR_SCORE {
// Ban and skip outbound connections with excessively misbehaving peers.
let banned_ip = updated.addr.ip();
let bans_by_ip = Arc::make_mut(&mut self.bans_by_ip);
bans_by_ip.insert(banned_ip, Instant::now());
if bans_by_ip.len() > constants::MAX_BANNED_IPS {
// Remove the oldest banned IP from the ban list.
bans_by_ip.shift_remove_index(0);
}
self.most_recent_by_ip
.as_mut()
.expect("should be some when should_remove_most_recent_by_ip is true")
.remove(&banned_ip);
let banned_addrs: Vec<_> = self
.by_addr
.descending_keys()
.skip_while(|addr| addr.ip() != banned_ip)
.take_while(|addr| addr.ip() == banned_ip)
.cloned()
.collect();
for addr in banned_addrs {
self.by_addr.remove(&addr);
}
warn!(
?updated,
total_peers = self.by_addr.len(),
recent_peers = self.recently_live_peers(chrono_now).len(),
"banned ip and removed banned peer addresses from address book",
);
return None;
}
// Ignore invalid outbound addresses.
// (Inbound connections can be monitored via Zebra's metrics.)
if !updated.address_is_valid_for_outbound(&self.network) {
@ -634,6 +685,11 @@ impl AddressBook {
.cloned()
}
/// Returns a shared handle to the map of banned IP addresses, keyed by IP and
/// storing the time each IP was banned.
pub fn bans(&self) -> Arc<IndexMap<IpAddr, Instant>> {
    // Only the reference count is bumped here; the map itself is not copied.
    Arc::clone(&self.bans_by_ip)
}
/// Returns the number of entries in this address book.
pub fn len(&self) -> usize {
self.by_addr.len()
@ -805,6 +861,7 @@ impl Clone for AddressBook {
address_metrics_tx,
last_address_log: None,
most_recent_by_ip: self.most_recent_by_ip.clone(),
bans_by_ip: self.bans_by_ip.clone(),
}
}
}

View File

@ -1,7 +1,13 @@
//! The timestamp collector collects liveness information from peers.
use std::{cmp::max, net::SocketAddr, sync::Arc};
use std::{
cmp::max,
net::{IpAddr, SocketAddr},
sync::Arc,
time::Instant,
};
use indexmap::IndexMap;
use thiserror::Error;
use tokio::{
sync::{mpsc, watch},
@ -43,6 +49,7 @@ impl AddressBookUpdater {
local_listener: SocketAddr,
) -> (
Arc<std::sync::Mutex<AddressBook>>,
watch::Receiver<Arc<IndexMap<IpAddr, Instant>>>,
mpsc::Sender<MetaAddrChange>,
watch::Receiver<AddressMetrics>,
JoinHandle<Result<(), BoxError>>,
@ -74,6 +81,13 @@ impl AddressBookUpdater {
};
let worker_address_book = address_book.clone();
let (bans_sender, bans_receiver) = tokio::sync::watch::channel(
worker_address_book
.lock()
.expect("mutex should be unpoisoned")
.bans(),
);
let worker = move || {
info!("starting the address book updater");
@ -84,11 +98,24 @@ impl AddressBookUpdater {
//
// Briefly hold the address book threaded mutex, to update the
// state for a single address.
worker_address_book
let updated = worker_address_book
.lock()
.expect("mutex should be unpoisoned")
.update(event);
// `UpdateMisbehavior` events should only be passed to `update()` here,
// so that this channel is always updated when new addresses are banned.
if updated.is_none() {
let bans = worker_address_book
.lock()
.expect("mutex should be unpoisoned")
.bans();
if bans.contains_key(&event.addr().ip()) {
let _ = bans_sender.send(bans);
}
}
#[cfg(feature = "progress-bar")]
if matches!(howudoin::cancelled(), Some(true)) {
address_bar.close();
@ -136,6 +163,7 @@ impl AddressBookUpdater {
(
address_book,
bans_receiver,
worker_tx,
address_metrics,
address_book_updater_task_handle,

View File

@ -20,6 +20,18 @@ pub enum CacheDir {
CustomPath(PathBuf),
}
impl From<bool> for CacheDir {
fn from(value: bool) -> Self {
CacheDir::IsEnabled(value)
}
}
impl From<PathBuf> for CacheDir {
fn from(value: PathBuf) -> Self {
CacheDir::CustomPath(value)
}
}
impl CacheDir {
/// Returns a `CacheDir` enabled with the default path.
pub fn default_path() -> Self {

View File

@ -383,6 +383,13 @@ pub const MAX_OVERLOAD_DROP_PROBABILITY: f32 = 0.5;
/// The minimum interval between logging peer set status updates.
pub const MIN_PEER_SET_LOG_INTERVAL: Duration = Duration::from_secs(60);
/// The misbehavior score at which a peer is banned.
///
/// This is a threshold on a peer's accumulated misbehavior score, not a count
/// of incidents: the address book bans a peer's IP once the score of its entry
/// is greater than or equal to this value.
pub const MAX_PEER_MISBEHAVIOR_SCORE: u32 = 100;
/// The maximum number of banned IP addresses to be stored in-memory at any time.
///
/// When a new ban pushes the ban list past this limit, the address book drops
/// the entry at the front of the list.
pub const MAX_BANNED_IPS: usize = 20_000;
lazy_static! {
/// The minimum network protocol version accepted by this crate for each network,
/// represented as a network upgrade.

View File

@ -199,11 +199,19 @@ pub struct MetaAddr {
/// See the [`MetaAddr::last_failure`] method for details.
last_failure: Option<Instant>,
/// The misbehavior score for this peer.
#[cfg_attr(any(test, feature = "proptest-impl"), proptest(value = 0))]
misbehavior_score: u32,
/// The outcome of our most recent communication attempt with this peer.
//
// TODO: move the time and services fields into PeerAddrState?
// then some fields could be required in some states
pub(crate) last_connection_state: PeerAddrState,
/// Whether this peer address was added to the address book
/// when the peer made an inbound connection.
is_inbound: bool,
}
/// A change to an existing `MetaAddr`.
@ -260,6 +268,7 @@ pub enum MetaAddrChange {
)]
addr: PeerSocketAddr,
services: PeerServices,
is_inbound: bool,
},
/// Updates an existing `MetaAddr` when a peer responds with a message.
@ -280,6 +289,14 @@ pub enum MetaAddrChange {
addr: PeerSocketAddr,
services: Option<PeerServices>,
},
/// Updates an existing `MetaAddr` when a peer misbehaves such as by advertising
/// semantically invalid blocks or transactions.
#[cfg_attr(any(test, feature = "proptest-impl"), proptest(skip))]
UpdateMisbehavior {
addr: PeerSocketAddr,
score_increment: u32,
},
}
impl MetaAddr {
@ -306,6 +323,8 @@ impl MetaAddr {
last_attempt: None,
last_failure: None,
last_connection_state: NeverAttemptedGossiped,
misbehavior_score: 0,
is_inbound: false,
}
}
@ -338,10 +357,15 @@ impl MetaAddr {
/// - malicious peers could interfere with other peers' [`AddressBook`](crate::AddressBook)
/// state, or
/// - Zebra could advertise unreachable addresses to its own peers.
pub fn new_connected(addr: PeerSocketAddr, services: &PeerServices) -> MetaAddrChange {
pub fn new_connected(
addr: PeerSocketAddr,
services: &PeerServices,
is_inbound: bool,
) -> MetaAddrChange {
UpdateConnected {
addr: canonical_peer_addr(*addr),
services: *services,
is_inbound,
}
}
@ -421,6 +445,11 @@ impl MetaAddr {
self.last_response.or(self.untrusted_last_seen)
}
/// Returns whether the address is from an inbound peer connection.
///
/// Addresses with this flag set are never returned by `sanitize()`, so they
/// are not sent to remote peers.
pub fn is_inbound(&self) -> bool {
self.is_inbound
}
/// Returns the unverified "last seen time" gossiped by the remote peer that
/// sent us this address.
///
@ -623,6 +652,11 @@ impl MetaAddr {
}
}
/// Returns the misbehavior score recorded for the peer at this address.
///
/// Addresses with a non-zero score are never returned by `sanitize()`, so they
/// are not sent to remote peers.
pub fn misbehavior(&self) -> u32 {
self.misbehavior_score
}
/// Return a sanitized version of this `MetaAddr`, for sending to a remote peer.
///
/// Returns `None` if this `MetaAddr` should not be sent to remote peers.
@ -632,6 +666,11 @@ impl MetaAddr {
return None;
}
// Avoid responding to GetAddr requests with addresses of misbehaving peers,
// or with addresses that were added when a peer made an inbound connection.
if self.misbehavior_score != 0 || self.is_inbound {
return None;
}
// Sanitize time
let last_seen = self.last_seen()?;
let remainder = last_seen
@ -655,6 +694,8 @@ impl MetaAddr {
last_attempt: None,
last_failure: None,
last_connection_state: NeverAttemptedGossiped,
misbehavior_score: 0,
is_inbound: false,
})
}
}
@ -679,7 +720,8 @@ impl MetaAddrChange {
| UpdateAttempt { addr }
| UpdateConnected { addr, .. }
| UpdateResponded { addr, .. }
| UpdateFailed { addr, .. } => *addr,
| UpdateFailed { addr, .. }
| UpdateMisbehavior { addr, .. } => *addr,
}
}
@ -695,7 +737,8 @@ impl MetaAddrChange {
| UpdateAttempt { addr }
| UpdateConnected { addr, .. }
| UpdateResponded { addr, .. }
| UpdateFailed { addr, .. } => *addr = new_addr,
| UpdateFailed { addr, .. }
| UpdateMisbehavior { addr, .. } => *addr = new_addr,
}
}
@ -713,6 +756,7 @@ impl MetaAddrChange {
UpdateConnected { services, .. } => Some(*services),
UpdateResponded { .. } => None,
UpdateFailed { services, .. } => *services,
UpdateMisbehavior { .. } => None,
}
}
@ -729,7 +773,8 @@ impl MetaAddrChange {
UpdateAttempt { .. }
| UpdateConnected { .. }
| UpdateResponded { .. }
| UpdateFailed { .. } => None,
| UpdateFailed { .. }
| UpdateMisbehavior { .. } => None,
}
}
@ -760,7 +805,10 @@ impl MetaAddrChange {
// peer address. So the attempt time is a lower bound for the actual
// handshake time.
UpdateAttempt { .. } => Some(now),
UpdateConnected { .. } | UpdateResponded { .. } | UpdateFailed { .. } => None,
UpdateConnected { .. }
| UpdateResponded { .. }
| UpdateFailed { .. }
| UpdateMisbehavior { .. } => None,
}
}
@ -774,7 +822,7 @@ impl MetaAddrChange {
// - the peer will appear to be live for longer, delaying future
// reconnection attempts.
UpdateConnected { .. } | UpdateResponded { .. } => Some(now),
UpdateFailed { .. } => None,
UpdateFailed { .. } | UpdateMisbehavior { .. } => None,
}
}
@ -792,7 +840,7 @@ impl MetaAddrChange {
// states for longer, and
// - the peer will appear to be used for longer, delaying future
// reconnection attempts.
UpdateFailed { .. } => Some(now),
UpdateFailed { .. } | UpdateMisbehavior { .. } => Some(now),
}
}
@ -804,7 +852,7 @@ impl MetaAddrChange {
// local listeners get sanitized, so the state doesn't matter here
NewLocal { .. } => NeverAttemptedGossiped,
UpdateAttempt { .. } => AttemptPending,
UpdateConnected { .. } | UpdateResponded { .. } => Responded,
UpdateConnected { .. } | UpdateResponded { .. } | UpdateMisbehavior { .. } => Responded,
UpdateFailed { .. } => Failed,
}
}
@ -819,6 +867,27 @@ impl MetaAddrChange {
last_attempt: self.last_attempt(instant_now),
last_failure: self.last_failure(instant_now),
last_connection_state: self.peer_addr_state(),
misbehavior_score: self.misbehavior_score(),
is_inbound: self.is_inbound(),
}
}
/// Returns the misbehavior score increment carried by this change.
///
/// Only `UpdateMisbehavior` changes carry an increment; every other change
/// returns 0.
pub fn misbehavior_score(&self) -> u32 {
    if let MetaAddrChange::UpdateMisbehavior { score_increment, .. } = self {
        *score_increment
    } else {
        0
    }
}
/// Returns whether this change was created for a new inbound connection.
///
/// This is `true` only for an `UpdateConnected` change whose `is_inbound` flag
/// is set; every other change returns `false`.
pub fn is_inbound(&self) -> bool {
    matches!(
        self,
        MetaAddrChange::UpdateConnected {
            is_inbound: true,
            ..
        }
    )
}
@ -841,6 +910,8 @@ impl MetaAddrChange {
last_attempt: None,
last_failure: None,
last_connection_state: self.peer_addr_state(),
misbehavior_score: self.misbehavior_score(),
is_inbound: self.is_inbound(),
}
}
@ -902,10 +973,11 @@ impl MetaAddrChange {
let previous_has_been_attempted = !previous.last_connection_state.is_never_attempted();
let change_to_never_attempted = self.peer_addr_state().is_never_attempted();
let is_misbehavior_update = self.misbehavior_score() != 0;
// Invalid changes
if change_to_never_attempted && previous_has_been_attempted {
if change_to_never_attempted && previous_has_been_attempted && !is_misbehavior_update {
// Existing entry has been attempted, change is NeverAttempted
// - ignore the change
//
@ -916,7 +988,7 @@ impl MetaAddrChange {
return None;
}
if change_is_out_of_order && !change_is_concurrent {
if change_is_out_of_order && !change_is_concurrent && !is_misbehavior_update {
// Change is significantly out of order: ignore it.
//
// # Security
@ -926,7 +998,7 @@ impl MetaAddrChange {
return None;
}
if change_is_concurrent && !connection_has_more_progress {
if change_is_concurrent && !connection_has_more_progress && !is_misbehavior_update {
// Change is close together in time, and it would revert the connection to an earlier
// state.
//
@ -992,6 +1064,8 @@ impl MetaAddrChange {
last_attempt: None,
last_failure: None,
last_connection_state: self.peer_addr_state(),
misbehavior_score: previous.misbehavior_score + self.misbehavior_score(),
is_inbound: previous.is_inbound || self.is_inbound(),
})
} else {
// Existing entry and change are both Attempt, Responded, Failed,
@ -1014,6 +1088,8 @@ impl MetaAddrChange {
last_failure: self.last_failure(instant_now).or(previous.last_failure),
// Replace the state with the updated state.
last_connection_state: self.peer_addr_state(),
misbehavior_score: previous.misbehavior_score + self.misbehavior_score(),
is_inbound: previous.is_inbound || self.is_inbound(),
})
}
}

View File

@ -48,6 +48,8 @@ proptest! {
// also check the address, port, and services individually
prop_assert!(!addr.addr.ip().is_unspecified());
prop_assert_ne!(addr.addr.port(), 0);
prop_assert_eq!(addr.misbehavior(), 0);
prop_assert!(!addr.is_inbound());
if let Some(services) = addr.services {
prop_assert!(services.contains(PeerServices::NODE_NETWORK));

View File

@ -36,6 +36,8 @@ fn sanitize_extremes() {
last_attempt: None,
last_failure: None,
last_connection_state: Default::default(),
misbehavior_score: Default::default(),
is_inbound: false,
};
let max_time_entry = MetaAddr {
@ -46,6 +48,8 @@ fn sanitize_extremes() {
last_attempt: None,
last_failure: None,
last_connection_state: Default::default(),
misbehavior_score: Default::default(),
is_inbound: false,
};
if let Some(min_sanitized) = min_time_entry.sanitize(&Mainnet) {

View File

@ -37,7 +37,7 @@ use crate::{
external::{types::Nonce, InventoryHash, Message},
internal::{InventoryResponse, Request, Response},
},
BoxError, MAX_TX_INV_IN_SENT_MESSAGE,
BoxError, PeerSocketAddr, MAX_TX_INV_IN_SENT_MESSAGE,
};
use InventoryResponse::*;
@ -140,6 +140,7 @@ impl Handler {
&mut self,
msg: Message,
cached_addrs: &mut Vec<MetaAddr>,
transient_addr: Option<PeerSocketAddr>,
) -> Option<Message> {
let mut ignored_msg = None;
// TODO: can this be avoided?
@ -215,7 +216,9 @@ impl Handler {
Handler::Finished(Err(PeerError::NotFoundResponse(missing_transaction_ids)))
} else if pending_ids.is_empty() || ignored_msg.is_some() {
// If we got some of what we wanted, let the internal client know.
let available = transactions.into_iter().map(InventoryResponse::Available);
let available = transactions
.into_iter()
.map(|t| InventoryResponse::Available((t, transient_addr)));
let missing = pending_ids.into_iter().map(InventoryResponse::Missing);
Handler::Finished(Ok(Response::Transactions(
@ -263,7 +266,9 @@ impl Handler {
Handler::Finished(Err(PeerError::NotFoundResponse(missing_transaction_ids)))
} else {
// If we got some of what we wanted, let the internal client know.
let available = transactions.into_iter().map(InventoryResponse::Available);
let available = transactions
.into_iter()
.map(|t| InventoryResponse::Available((t, transient_addr)));
let missing = pending_ids.into_iter().map(InventoryResponse::Missing);
Handler::Finished(Ok(Response::Transactions(
@ -324,7 +329,9 @@ impl Handler {
if pending_hashes.is_empty() {
// If we got everything we wanted, let the internal client know.
let available = blocks.into_iter().map(InventoryResponse::Available);
let available = blocks
.into_iter()
.map(|block| InventoryResponse::Available((block, transient_addr)));
Handler::Finished(Ok(Response::Blocks(available.collect())))
} else {
// Keep on waiting for all the blocks we wanted, until we get them or time out.
@ -368,7 +375,9 @@ impl Handler {
Handler::Finished(Err(PeerError::NotFoundResponse(missing_block_hashes)))
} else {
// If we got some of what we wanted, let the internal client know.
let available = blocks.into_iter().map(InventoryResponse::Available);
let available = blocks
.into_iter()
.map(|block| InventoryResponse::Available((block, transient_addr)));
let missing = pending_hashes.into_iter().map(InventoryResponse::Missing);
Handler::Finished(Ok(Response::Blocks(available.chain(missing).collect())))
@ -854,7 +863,7 @@ where
let request_msg = match self.state {
State::AwaitingResponse {
ref mut handler, ..
} => span.in_scope(|| handler.process_message(peer_msg, &mut self.cached_addrs)),
} => span.in_scope(|| handler.process_message(peer_msg, &mut self.cached_addrs, self.connection_info.connected_addr.get_transient_addr())),
_ => unreachable!("unexpected state after AwaitingResponse: {:?}, peer_msg: {:?}, client_receiver: {:?}",
self.state,
peer_msg,
@ -1448,7 +1457,7 @@ where
for transaction in transactions.into_iter() {
match transaction {
Available(transaction) => {
Available((transaction, _)) => {
if let Err(e) = self.peer_tx.send(Message::Tx(transaction)).await {
self.fail_with(e).await;
return;
@ -1472,7 +1481,7 @@ where
for block in blocks.into_iter() {
match block {
Available(block) => {
Available((block, _)) => {
if let Err(e) = self.peer_tx.send(Message::Block(block)).await {
self.fail_with(e).await;
return;

View File

@ -113,7 +113,7 @@ proptest! {
);
let response = response_result.unwrap();
prop_assert_eq!(response, Response::Blocks(vec![Available(second_block.0)]));
prop_assert_eq!(response, Response::Blocks(vec![Available((second_block.0, None))]));
// Check the state after the response
let error = shared_error_slot.try_get_error();

View File

@ -262,10 +262,10 @@ impl ConnectedAddr {
/// TODO: remove the `get_` from these methods (Rust style avoids `get` prefixes)
pub fn get_address_book_addr(&self) -> Option<PeerSocketAddr> {
match self {
OutboundDirect { addr } => Some(*addr),
OutboundDirect { addr } | InboundDirect { addr } => Some(*addr),
// TODO: consider using the canonical address of the peer to track
// outbound proxy connections
InboundDirect { .. } | OutboundProxy { .. } | InboundProxy { .. } | Isolated => None,
OutboundProxy { .. } | InboundProxy { .. } | Isolated => None,
}
}
@ -370,6 +370,11 @@ impl ConnectedAddr {
addrs.into_iter()
}
/// Returns true if the [`ConnectedAddr`] was created for an inbound connection.
///
/// Both `InboundDirect` and `InboundProxy` count as inbound; every other
/// variant returns false.
pub fn is_inbound(&self) -> bool {
matches!(self, InboundDirect { .. } | InboundProxy { .. })
}
}
impl fmt::Debug for ConnectedAddr {
@ -933,7 +938,11 @@ where
// the collector doesn't depend on network activity,
// so this await should not hang
let _ = address_book_updater
.send(MetaAddr::new_connected(book_addr, &remote_services))
.send(MetaAddr::new_connected(
book_addr,
&remote_services,
connected_addr.is_inbound(),
))
.await;
}

View File

@ -5,9 +5,9 @@
//! [tower-balance]: https://github.com/tower-rs/tower/tree/master/tower/src/balance
use std::{
collections::{BTreeMap, HashSet},
collections::{BTreeMap, HashMap, HashSet},
convert::Infallible,
net::SocketAddr,
net::{IpAddr, SocketAddr},
pin::Pin,
sync::Arc,
time::Duration,
@ -19,10 +19,11 @@ use futures::{
stream::{FuturesUnordered, StreamExt},
Future, TryFutureExt,
};
use indexmap::IndexMap;
use rand::seq::SliceRandom;
use tokio::{
net::{TcpListener, TcpStream},
sync::broadcast,
sync::{broadcast, mpsc, watch},
time::{sleep, Instant},
};
use tokio_stream::wrappers::IntervalStream;
@ -34,7 +35,7 @@ use tracing_futures::Instrument;
use zebra_chain::{chain_tip::ChainTip, diagnostic::task::WaitForPanics};
use crate::{
address_book_updater::AddressBookUpdater,
address_book_updater::{AddressBookUpdater, MIN_CHANNEL_SIZE},
constants,
meta_addr::{MetaAddr, MetaAddrChange},
peer::{
@ -101,6 +102,7 @@ pub async fn init<S, C>(
) -> (
Buffer<BoxService<Request, Response, BoxError>, Request>,
Arc<std::sync::Mutex<AddressBook>>,
mpsc::Sender<(PeerSocketAddr, u32)>,
)
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + Sync + 'static,
@ -109,8 +111,58 @@ where
{
let (tcp_listener, listen_addr) = open_listener(&config.clone()).await;
let (address_book, address_book_updater, address_metrics, address_book_updater_guard) =
AddressBookUpdater::spawn(&config, listen_addr);
let (
address_book,
bans_receiver,
address_book_updater,
address_metrics,
address_book_updater_guard,
) = AddressBookUpdater::spawn(&config, listen_addr);
let (misbehavior_tx, mut misbehavior_rx) = mpsc::channel(
// Leave enough room for a misbehaviour update on every peer connection
// before the channel is drained.
config
.peerset_total_connection_limit()
.max(MIN_CHANNEL_SIZE),
);
let misbehaviour_updater = address_book_updater.clone();
tokio::spawn(
async move {
let mut misbehaviors: HashMap<PeerSocketAddr, u32> = HashMap::new();
// Batch misbehaviour updates so peers can't keep the address book mutex locked
// by repeatedly sending invalid blocks or transactions.
let mut flush_timer =
IntervalStream::new(tokio::time::interval(Duration::from_secs(30)));
loop {
tokio::select! {
msg = misbehavior_rx.recv() => match msg {
Some((peer_addr, score_increment)) => *misbehaviors
.entry(peer_addr)
.or_default()
+= score_increment,
None => break,
},
_ = flush_timer.next() => {
for (addr, score_increment) in misbehaviors.drain() {
let _ = misbehaviour_updater
.send(MetaAddrChange::UpdateMisbehavior {
addr,
score_increment
})
.await;
}
},
};
}
tracing::warn!("exiting misbehavior update batch task");
}
.in_current_span(),
);
// Create a broadcast channel for peer inventory advertisements.
// If it reaches capacity, this channel drops older inventory advertisements.
@ -173,6 +225,7 @@ where
demand_tx.clone(),
handle_rx,
inv_receiver,
bans_receiver.clone(),
address_metrics,
MinimumPeerVersion::new(latest_chain_tip, &config.network),
None,
@ -188,6 +241,7 @@ where
constants::MIN_INBOUND_PEER_CONNECTION_INTERVAL,
listen_handshaker,
peerset_tx.clone(),
bans_receiver,
);
let listen_guard = tokio::spawn(listen_fut.in_current_span());
@ -258,7 +312,7 @@ where
])
.unwrap();
(peer_set, address_book)
(peer_set, address_book, misbehavior_tx)
}
/// Use the provided `outbound_connector` to connect to the configured DNS seeder and
@ -550,6 +604,7 @@ async fn accept_inbound_connections<S>(
min_inbound_peer_connection_interval: Duration,
handshaker: S,
peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
bans_receiver: watch::Receiver<Arc<IndexMap<IpAddr, std::time::Instant>>>,
) -> Result<(), BoxError>
where
S: Service<peer::HandshakeRequest<TcpStream>, Response = peer::Client, Error = BoxError>
@ -586,6 +641,12 @@ where
if let Ok((tcp_stream, addr)) = inbound_result {
let addr: PeerSocketAddr = addr.into();
if bans_receiver.borrow().clone().contains_key(&addr.ip()) {
debug!(?addr, "banned inbound connection attempt");
std::mem::drop(tcp_stream);
continue;
}
if active_inbound_connections.update_count()
>= config.peerset_inbound_connection_limit()
|| recent_inbound_connections.is_past_limit_or_add(addr.ip())

View File

@ -1444,7 +1444,7 @@ async fn local_listener_port_with(listen_addr: SocketAddr, network: Network) {
let inbound_service =
service_fn(|_| async { unreachable!("inbound service should never be called") });
let (_peer_service, address_book) = init(
let (_peer_service, address_book, _) = init(
config,
inbound_service,
NoChainTip,
@ -1510,7 +1510,7 @@ where
..default_config
};
let (_peer_service, address_book) = init(
let (_peer_service, address_book, _) = init(
config,
inbound_service,
NoChainTip,
@ -1547,8 +1547,13 @@ where
config.peerset_initial_target_size = peerset_initial_target_size;
}
let (address_book, address_book_updater, _address_metrics, _address_book_updater_guard) =
AddressBookUpdater::spawn(&config, config.listen_addr);
let (
address_book,
_bans_receiver,
address_book_updater,
_address_metrics,
_address_book_updater_guard,
) = AddressBookUpdater::spawn(&config, config.listen_addr);
// Add enough fake peers to go over the limit, even if the limit is zero.
let over_limit_peers = config.peerset_outbound_connection_limit() * 2 + 1;
@ -1678,6 +1683,8 @@ where
let over_limit_connections = config.peerset_inbound_connection_limit() * 2 + 1;
let (peerset_tx, peerset_rx) = mpsc::channel::<DiscoveredPeer>(over_limit_connections);
let (_bans_tx, bans_rx) = tokio::sync::watch::channel(Default::default());
// Start listening for connections.
let listen_fut = accept_inbound_connections(
config.clone(),
@ -1685,6 +1692,7 @@ where
MIN_INBOUND_PEER_CONNECTION_INTERVAL_FOR_TESTS,
listen_handshaker,
peerset_tx.clone(),
bans_rx,
);
let listen_task_handle = tokio::spawn(listen_fut);
@ -1789,8 +1797,13 @@ where
let (peerset_tx, peerset_rx) = mpsc::channel::<DiscoveredPeer>(peer_count + 1);
let (_address_book, address_book_updater, _address_metrics, address_book_updater_guard) =
AddressBookUpdater::spawn(&config, unused_v4);
let (
_address_book,
_bans_receiver,
address_book_updater,
_address_metrics,
address_book_updater_guard,
) = AddressBookUpdater::spawn(&config, unused_v4);
let add_fut = add_initial_peers(config, outbound_connector, peerset_tx, address_book_updater);
let add_task_handle = tokio::spawn(add_fut);

View File

@ -100,6 +100,7 @@ use std::{
marker::PhantomData,
net::IpAddr,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Instant,
};
@ -111,6 +112,7 @@ use futures::{
stream::FuturesUnordered,
task::noop_waker,
};
use indexmap::IndexMap;
use itertools::Itertools;
use num_integer::div_ceil;
use tokio::{
@ -183,6 +185,9 @@ where
/// A channel that asks the peer crawler task to connect to more peers.
demand_signal: mpsc::Sender<MorePeers>,
/// A watch channel receiver with a copy of banned IP addresses.
bans_receiver: watch::Receiver<Arc<IndexMap<IpAddr, std::time::Instant>>>,
// Peer Tracking: Ready Peers
//
/// Connected peers that are ready to receive requests from Zebra,
@ -280,6 +285,7 @@ where
/// and shuts down all the tasks as soon as one task exits;
/// - `inv_stream`: receives inventory changes from peers,
/// allowing the peer set to direct inventory requests;
/// - `bans_receiver`: receives a map of banned IP addresses, whose peer connections should be dropped;
/// - `address_book`: when peer set is busy, it logs address book diagnostics.
/// - `minimum_peer_version`: endpoint to see the minimum peer protocol version in real time.
/// - `max_conns_per_ip`: configured maximum number of peers that can be in the
@ -291,6 +297,7 @@ where
demand_signal: mpsc::Sender<MorePeers>,
handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>,
inv_stream: broadcast::Receiver<InventoryChange>,
bans_receiver: watch::Receiver<Arc<IndexMap<IpAddr, std::time::Instant>>>,
address_metrics: watch::Receiver<AddressMetrics>,
minimum_peer_version: MinimumPeerVersion<C>,
max_conns_per_ip: Option<usize>,
@ -299,6 +306,8 @@ where
// New peers
discover,
demand_signal,
// Banned peers
bans_receiver,
// Ready peers
ready_services: HashMap::new(),
@ -475,6 +484,12 @@ where
Some(Ok((key, svc))) => {
trace!(?key, "service became ready");
if self.bans_receiver.borrow().contains_key(&key.ip()) {
warn!(?key, "service is banned, dropping service");
std::mem::drop(svc);
continue;
}
self.push_ready(true, key, svc);
// Return Ok if at least one peer became ready.
@ -544,7 +559,15 @@ where
match peer_readiness {
// Still ready, add it back to the list.
Ok(()) => self.push_ready(false, key, svc),
Ok(()) => {
if self.bans_receiver.borrow().contains_key(&key.ip()) {
debug!(?key, "service ip is banned, dropping service");
std::mem::drop(svc);
continue;
}
self.push_ready(false, key, svc)
}
// Ready -> Errored
Err(error) => {

View File

@ -211,6 +211,7 @@ where
.unwrap_or_else(|| guard.create_inventory_receiver());
let address_metrics = guard.prepare_address_book(self.address_book);
let (_bans_sender, bans_receiver) = tokio::sync::watch::channel(Default::default());
let peer_set = PeerSet::new(
&config,
@ -218,6 +219,7 @@ where
demand_signal,
handle_rx,
inv_stream,
bans_receiver,
address_metrics,
minimum_peer_version,
max_conns_per_ip,

View File

@ -7,7 +7,7 @@ use zebra_chain::{
transaction::{UnminedTx, UnminedTxId},
};
use crate::{meta_addr::MetaAddr, protocol::internal::InventoryResponse};
use crate::{meta_addr::MetaAddr, protocol::internal::InventoryResponse, PeerSocketAddr};
#[cfg(any(test, feature = "proptest-impl"))]
use proptest_derive::Arbitrary;
@ -67,14 +67,14 @@ pub enum Response {
/// `zcashd` sometimes sends no response, and sometimes sends `notfound`.
//
// TODO: make this into a HashMap<block::Hash, InventoryResponse<Arc<Block>, ()>> - a unique list (#2244)
Blocks(Vec<InventoryResponse<Arc<Block>, block::Hash>>),
Blocks(Vec<InventoryResponse<(Arc<Block>, Option<PeerSocketAddr>), block::Hash>>),
/// A list of found unmined transactions, and missing unmined transaction IDs.
///
/// Each list contains zero or more entries.
//
// TODO: make this into a HashMap<UnminedTxId, InventoryResponse<UnminedTx, ()>> - a unique list (#2244)
Transactions(Vec<InventoryResponse<UnminedTx, UnminedTxId>>),
Transactions(Vec<InventoryResponse<(UnminedTx, Option<PeerSocketAddr>), UnminedTxId>>),
}
impl fmt::Display for Response {
@ -94,7 +94,7 @@ impl fmt::Display for Response {
// Display heights for single-block responses (which Zebra requests and expects)
Response::Blocks(blocks) if blocks.len() == 1 => {
match blocks.first().expect("len is 1") {
Available(block) => format!(
Available((block, _)) => format!(
"Block {{ height: {}, hash: {} }}",
block
.coinbase_height()

View File

@ -1319,10 +1319,10 @@ where
> = self.clone();
let network = self.network.clone();
if !network.is_regtest() {
if !network.disable_pow() {
return Err(ErrorObject::borrowed(
0,
"generate is only supported on regtest",
"generate is only supported on networks where PoW is disabled",
None,
));
}

View File

@ -173,7 +173,7 @@ impl StartCmd {
setup_rx,
));
let (peer_set, address_book) = zebra_network::init(
let (peer_set, address_book, misbehavior_sender) = zebra_network::init(
config.network.clone(),
inbound,
latest_chain_tip.clone(),
@ -200,6 +200,7 @@ impl StartCmd {
block_verifier_router.clone(),
state.clone(),
latest_chain_tip.clone(),
misbehavior_sender.clone(),
);
info!("initializing mempool");
@ -211,6 +212,7 @@ impl StartCmd {
sync_status.clone(),
latest_chain_tip.clone(),
chain_tip_change.clone(),
misbehavior_sender.clone(),
);
let mempool = BoxService::new(mempool);
let mempool = ServiceBuilder::new()
@ -230,6 +232,7 @@ impl StartCmd {
mempool: mempool.clone(),
state: state.clone(),
latest_chain_tip: latest_chain_tip.clone(),
misbehavior_sender,
};
setup_tx
.send(setup_data)

View File

@ -21,15 +21,15 @@ use futures::{
use tokio::sync::oneshot::{self, error::TryRecvError};
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, ServiceExt};
use zebra_network as zn;
use zebra_state as zs;
use zebra_network::{self as zn, PeerSocketAddr};
use zebra_state::{self as zs};
use zebra_chain::{
block::{self, Block},
serialization::ZcashSerialize,
transaction::UnminedTxId,
};
use zebra_consensus::router::RouterError;
use zebra_consensus::{router::RouterError, VerifyBlockError};
use zebra_network::{AddressBook, InventoryResponse};
use zebra_node_services::mempool;
@ -107,6 +107,9 @@ pub struct InboundSetupData {
/// Allows efficient access to the best tip of the blockchain.
pub latest_chain_tip: zs::LatestChainTip,
/// A channel to send misbehavior reports to the [`AddressBook`].
pub misbehavior_sender: tokio::sync::mpsc::Sender<(PeerSocketAddr, u32)>,
}
/// Tracks the internal state of the [`Inbound`] service during setup.
@ -148,6 +151,9 @@ pub enum Setup {
/// A service that manages cached blockchain state.
state: State,
/// A channel to send misbehavior reports to the [`AddressBook`].
misbehavior_sender: tokio::sync::mpsc::Sender<(PeerSocketAddr, u32)>,
},
/// Temporary state used in the inbound service's internal initialization code.
@ -261,6 +267,7 @@ impl Service<zn::Request> for Inbound {
mempool,
state,
latest_chain_tip,
misbehavior_sender,
} = setup_data;
let cached_peer_addr_response = CachedPeerAddrResponse::new(address_book);
@ -279,6 +286,7 @@ impl Service<zn::Request> for Inbound {
block_downloads,
mempool,
state,
misbehavior_sender,
}
}
Err(TryRecvError::Empty) => {
@ -314,13 +322,27 @@ impl Service<zn::Request> for Inbound {
mut block_downloads,
mempool,
state,
misbehavior_sender,
} => {
// # Correctness
//
// Clear the stream but ignore the final Pending return value.
// If we returned Pending here, and there were no waiting block downloads,
// then inbound requests would wait for the next block download, and hang forever.
while let Poll::Ready(Some(_)) = block_downloads.as_mut().poll_next(cx) {}
while let Poll::Ready(Some(result)) = block_downloads.as_mut().poll_next(cx) {
let Err((err, Some(advertiser_addr))) = result else {
continue;
};
let Ok(err) = err.downcast::<VerifyBlockError>() else {
continue;
};
if err.misbehavior_score() != 0 {
let _ =
misbehavior_sender.try_send((advertiser_addr, err.misbehavior_score()));
}
}
result = Ok(());
@ -329,6 +351,7 @@ impl Service<zn::Request> for Inbound {
block_downloads,
mempool,
state,
misbehavior_sender,
}
}
};
@ -362,6 +385,7 @@ impl Service<zn::Request> for Inbound {
block_downloads,
mempool,
state,
misbehavior_sender: _,
} => (cached_peer_addr_response, block_downloads, mempool, state),
_ => {
debug!("ignoring request from remote peer during setup");
@ -398,7 +422,7 @@ impl Service<zn::Request> for Inbound {
let state = state.clone();
async move {
let mut blocks: Vec<InventoryResponse<Arc<Block>, block::Hash>> = Vec::new();
let mut blocks: Vec<InventoryResponse<(Arc<Block>, Option<PeerSocketAddr>), block::Hash>> = Vec::new();
let mut total_size = 0;
// Ignore any block hashes past the response limit.
@ -422,7 +446,7 @@ impl Service<zn::Request> for Inbound {
// return the size from the state using a wrapper type.
total_size += block.zcash_serialized_size();
blocks.push(Available(block))
blocks.push(Available((block, None)))
},
// We don't need to limit the size of the missing block IDs list,
// because it is already limited to the size of the getdata request
@ -473,7 +497,7 @@ impl Service<zn::Request> for Inbound {
total_size += tx.size;
within_limit
}).map(Available);
}).map(|tx| Available((tx, None)));
// The network layer handles splitting this response into multiple `tx`
// messages, and a `notfound` message if needed.

View File

@ -20,7 +20,7 @@ use zebra_chain::{
block::{self, HeightDiff},
chain_tip::ChainTip,
};
use zebra_network as zn;
use zebra_network::{self as zn, PeerSocketAddr};
use zebra_state as zs;
use crate::components::sync::MIN_CONCURRENCY_LIMIT;
@ -107,7 +107,9 @@ where
//
/// A list of pending block download and verify tasks.
#[pin]
pending: FuturesUnordered<JoinHandle<Result<block::Hash, (BoxError, block::Hash)>>>,
pending: FuturesUnordered<
JoinHandle<Result<block::Hash, (BoxError, block::Hash, Option<PeerSocketAddr>)>>,
>,
/// A list of channels that can be used to cancel pending block download and
/// verify tasks.
@ -126,7 +128,7 @@ where
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
{
type Item = Result<block::Hash, BoxError>;
type Item = Result<block::Hash, (BoxError, Option<PeerSocketAddr>)>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
@ -145,9 +147,9 @@ where
this.cancel_handles.remove(&hash);
Poll::Ready(Some(Ok(hash)))
}
Err((e, hash)) => {
Err((e, hash, advertiser_addr)) => {
this.cancel_handles.remove(&hash);
Poll::Ready(Some(Err(e)))
Poll::Ready(Some(Err((e, advertiser_addr))))
}
}
} else {
@ -249,11 +251,13 @@ where
Ok(zs::Response::KnownBlock(Some(_))) => Err("already present".into()),
Ok(_) => unreachable!("wrong response"),
Err(e) => Err(e),
}?;
}
.map_err(|e| (e, None))?;
let block = if let zn::Response::Blocks(blocks) = network
let (block, advertiser_addr) = if let zn::Response::Blocks(blocks) = network
.oneshot(zn::Request::BlocksByHash(std::iter::once(hash).collect()))
.await?
.await
.map_err(|e| (e, None))?
{
assert_eq!(
blocks.len(),
@ -307,15 +311,18 @@ where
})
.unwrap_or(block::Height(0));
let block_height = block.coinbase_height().ok_or_else(|| {
debug!(
?hash,
"gossiped block with no height: dropped downloaded block"
);
metrics::counter!("gossip.no.height.dropped.block.count").increment(1);
let block_height = block
.coinbase_height()
.ok_or_else(|| {
debug!(
?hash,
"gossiped block with no height: dropped downloaded block"
);
metrics::counter!("gossip.no.height.dropped.block.count").increment(1);
BoxError::from("gossiped block with no height")
})?;
BoxError::from("gossiped block with no height")
})
.map_err(|e| (e, None))?;
if block_height > max_lookahead_height {
debug!(
@ -328,7 +335,7 @@ where
);
metrics::counter!("gossip.max.height.limit.dropped.block.count").increment(1);
Err("gossiped block height too far ahead")?;
Err("gossiped block height too far ahead").map_err(|e| (e.into(), None))?;
} else if block_height < min_accepted_height {
debug!(
?hash,
@ -340,13 +347,15 @@ where
);
metrics::counter!("gossip.min.height.limit.dropped.block.count").increment(1);
Err("gossiped block height behind the finalized tip")?;
Err("gossiped block height behind the finalized tip")
.map_err(|e| (e.into(), None))?;
}
verifier
.oneshot(zebra_consensus::Request::Commit(block))
.await
.map(|hash| (hash, block_height))
.map_err(|e| (e, advertiser_addr))
}
.map_ok(|(hash, height)| {
info!(?height, "downloaded and verified gossiped block");
@ -355,7 +364,7 @@ where
})
// Tack the hash onto the error so we can remove the cancel handle
// on failure as well as on success.
.map_err(move |e| (e, hash))
.map_err(move |(e, advertiser_addr)| (e, hash, advertiser_addr))
.in_current_span();
let task = tokio::spawn(async move {
@ -365,7 +374,7 @@ where
_ = &mut cancel_rx => {
trace!("task cancelled prior to completion");
metrics::counter!("gossip.cancelled.count").increment(1);
Err(("canceled".into(), hash))
Err(("canceled".into(), hash, None))
}
verification = fut => verification,
}

View File

@ -106,7 +106,7 @@ async fn mempool_requests_for_transactions() {
response,
added_transactions
.into_iter()
.map(Available)
.map(|tx| Available((tx, None)))
.collect::<Vec<_>>(),
)
}
@ -259,7 +259,10 @@ async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> {
.expect_request(Request::TransactionsById(txs))
.map(|responder| {
let unmined_transaction = UnminedTx::from(test_transaction.clone());
responder.respond(Response::Transactions(vec![Available(unmined_transaction)]))
responder.respond(Response::Transactions(vec![Available((
unmined_transaction,
None,
))]))
});
// Simulate a successful transaction verification
let verification = tx_verifier.expect_request_that(|_| true).map(|responder| {
@ -676,7 +679,7 @@ async fn inbound_block_height_lookahead_limit() -> Result<(), crate::BoxError> {
peer_set
.expect_request(Request::BlocksByHash(iter::once(block_hash).collect()))
.await
.respond(Response::Blocks(vec![Available(block)]));
.respond(Response::Blocks(vec![Available((block, None))]));
// Wait for the chain tip update
if let Err(timeout_error) = timeout(
@ -712,7 +715,7 @@ async fn inbound_block_height_lookahead_limit() -> Result<(), crate::BoxError> {
peer_set
.expect_request(Request::BlocksByHash(iter::once(block_hash).collect()))
.await
.respond(Response::Blocks(vec![Available(block)]));
.respond(Response::Blocks(vec![Available((block, None))]));
let response = state_service
.clone()
@ -808,6 +811,7 @@ async fn caches_getaddr_response() {
.service(Inbound::new(MAX_INBOUND_CONCURRENCY, setup_rx));
let inbound_service = BoxService::new(inbound_service);
let inbound_service = ServiceBuilder::new().buffer(1).service(inbound_service);
let (misbehavior_sender, _misbehavior_rx) = tokio::sync::mpsc::channel(1);
let setup_data = InboundSetupData {
address_book: address_book.clone(),
@ -816,6 +820,7 @@ async fn caches_getaddr_response() {
mempool: buffered_mempool_service.clone(),
state: state_service.clone(),
latest_chain_tip,
misbehavior_sender,
};
let r = setup_tx.send(setup_data);
// We can't expect or unwrap because the returned Result does not implement Debug
@ -963,6 +968,7 @@ async fn setup(
// Don't wait for the chain tip update here, we wait for expect_request(AdvertiseBlock) below,
// which is called by the gossip_best_tip_block_hashes task once the chain tip changes.
let (misbehavior_tx, _misbehavior_rx) = tokio::sync::mpsc::channel(1);
let (mut mempool_service, transaction_receiver) = Mempool::new(
&MempoolConfig::default(),
buffered_peer_set.clone(),
@ -971,6 +977,7 @@ async fn setup(
sync_status.clone(),
latest_chain_tip.clone(),
chain_tip_change.clone(),
misbehavior_tx,
);
// Pretend we're close to tip
@ -1031,6 +1038,7 @@ async fn setup(
let inbound_service = BoxService::new(inbound_service);
let inbound_service = ServiceBuilder::new().buffer(1).service(inbound_service);
let (misbehavior_sender, _misbehavior_rx) = tokio::sync::mpsc::channel(1);
let setup_data = InboundSetupData {
address_book,
block_download_peer_set: buffered_peer_set,
@ -1038,6 +1046,7 @@ async fn setup(
mempool: mempool_service.clone(),
state: state_service.clone(),
latest_chain_tip,
misbehavior_sender,
};
let r = setup_tx.send(setup_data);
// We can't expect or unwrap because the returned Result does not implement Debug

View File

@ -340,7 +340,8 @@ async fn outbound_tx_unrelated_response_notfound() -> Result<(), crate::BoxError
// We respond with an unrelated transaction, so the peer gives up on the request.
let unrelated_response: Transaction =
zebra_test::vectors::DUMMY_TX1.zcash_deserialize_into()?;
let unrelated_response = Response::Transactions(vec![Available(unrelated_response.into())]);
let unrelated_response =
Response::Transactions(vec![Available((unrelated_response.into(), None))]);
let (
// real services
@ -487,8 +488,8 @@ async fn outbound_tx_partial_response_notfound() -> Result<(), crate::BoxError>
let repeated_tx: Transaction = zebra_test::vectors::DUMMY_TX1.zcash_deserialize_into()?;
let repeated_tx: UnminedTx = repeated_tx.into();
let repeated_response = Response::Transactions(vec![
Available(repeated_tx.clone()),
Available(repeated_tx.clone()),
Available((repeated_tx.clone(), None)),
Available((repeated_tx.clone(), None)),
]);
let (
@ -523,6 +524,7 @@ async fn outbound_tx_partial_response_notfound() -> Result<(), crate::BoxError>
let available: Vec<UnminedTx> = tx_response
.iter()
.filter_map(InventoryResponse::available)
.map(|(tx, _)| tx)
.collect();
let missing: Vec<UnminedTxId> = tx_response
.iter()
@ -658,7 +660,7 @@ async fn setup(
..NetworkConfig::default()
};
let (mut peer_set, address_book) = zebra_network::init(
let (mut peer_set, address_book, _) = zebra_network::init(
network_config,
inbound_service.clone(),
latest_chain_tip.clone(),
@ -694,6 +696,7 @@ async fn setup(
.service(BoxService::new(mock_tx_verifier.clone()));
// Mempool
let (misbehavior_tx, _misbehavior_rx) = tokio::sync::mpsc::channel(1);
let mempool_config = MempoolConfig::default();
let (mut mempool_service, transaction_receiver) = Mempool::new(
&mempool_config,
@ -703,6 +706,7 @@ async fn setup(
sync_status.clone(),
latest_chain_tip.clone(),
chain_tip_change.clone(),
misbehavior_tx,
);
// Enable the mempool
@ -715,6 +719,7 @@ async fn setup(
.service(mempool_service);
// Initialize the inbound service
let (misbehavior_sender, _misbehavior_rx) = tokio::sync::mpsc::channel(1);
let setup_data = InboundSetupData {
address_book,
block_download_peer_set: peer_set.clone(),
@ -722,6 +727,7 @@ async fn setup(
mempool: mempool_service.clone(),
state: state_service.clone(),
latest_chain_tip,
misbehavior_sender,
};
let r = setup_tx.send(setup_data);
// We can't expect or unwrap because the returned Result does not implement Debug
@ -862,7 +868,7 @@ mod submitblock_test {
.buffer(10)
.service(BoxService::new(inbound_service));
let (peer_set, _address_book) = zebra_network::init(
let (peer_set, _address_book, _misbehavior_tx) = zebra_network::init(
network_config,
inbound_service.clone(),
latest_chain_tip.clone(),

View File

@ -27,7 +27,7 @@ use std::{
};
use futures::{future::FutureExt, stream::Stream};
use tokio::sync::{broadcast, oneshot};
use tokio::sync::{broadcast, mpsc, oneshot};
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};
use zebra_chain::{
@ -37,7 +37,7 @@ use zebra_chain::{
transaction::UnminedTxId,
};
use zebra_consensus::{error::TransactionError, transaction};
use zebra_network as zn;
use zebra_network::{self as zn, PeerSocketAddr};
use zebra_node_services::mempool::{Gossip, Request, Response};
use zebra_state as zs;
use zebra_state::{ChainTipChange, TipAction};
@ -71,7 +71,8 @@ pub use storage::{
pub use self::tests::UnboxMempoolError;
use downloads::{
Downloads as TxDownloads, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT,
Downloads as TxDownloads, TransactionDownloadVerifyError, TRANSACTION_DOWNLOAD_TIMEOUT,
TRANSACTION_VERIFY_TIMEOUT,
};
type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
@ -239,6 +240,9 @@ pub struct Mempool {
/// Used to broadcast transaction ids to peers.
transaction_sender: broadcast::Sender<HashSet<UnminedTxId>>,
/// Sender for reporting peer addresses that advertised unexpectedly invalid transactions.
misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
// Diagnostics
//
/// Queued transactions pending download or verification transmitter.
@ -263,6 +267,7 @@ pub struct Mempool {
}
impl Mempool {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
config: &Config,
outbound: Outbound,
@ -271,6 +276,7 @@ impl Mempool {
sync_status: SyncStatus,
latest_chain_tip: zs::LatestChainTip,
chain_tip_change: ChainTipChange,
misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
) -> (Self, broadcast::Receiver<HashSet<UnminedTxId>>) {
let (transaction_sender, transaction_receiver) =
tokio::sync::broadcast::channel(gossip::MAX_CHANGES_BEFORE_SEND * 2);
@ -286,6 +292,7 @@ impl Mempool {
state,
tx_verifier,
transaction_sender,
misbehavior_sender,
#[cfg(feature = "progress-bar")]
queued_count_bar: None,
#[cfg(feature = "progress-bar")]
@ -622,6 +629,19 @@ impl Service<Request> for Mempool {
}
}
Ok(Err((tx_id, error))) => {
if let TransactionDownloadVerifyError::Invalid {
error,
advertiser_addr: Some(advertiser_addr),
} = &error
{
if error.mempool_misbehavior_score() != 0 {
let _ = self.misbehavior_sender.try_send((
*advertiser_addr,
error.mempool_misbehavior_score(),
));
}
};
tracing::debug!(?tx_id, ?error, "mempool transaction failed to verify");
metrics::counter!("mempool.failed.verify.tasks.total", "reason" => error.to_string()).increment(1);

View File

@ -50,7 +50,7 @@ use zebra_chain::{
transparent,
};
use zebra_consensus::transaction as tx;
use zebra_network as zn;
use zebra_network::{self as zn, PeerSocketAddr};
use zebra_node_services::mempool::Gossip;
use zebra_state::{self as zs, CloneError};
@ -124,8 +124,11 @@ pub enum TransactionDownloadVerifyError {
#[error("transaction download / verification was cancelled")]
Cancelled,
#[error("transaction did not pass consensus validation: {0}")]
Invalid(#[from] zebra_consensus::error::TransactionError),
#[error("transaction did not pass consensus validation: {error}")]
Invalid {
error: zebra_consensus::error::TransactionError,
advertiser_addr: Option<PeerSocketAddr>,
},
}
/// Represents a [`Stream`] of download and verification tasks.
@ -330,7 +333,7 @@ where
trace!(?txid, ?next_height, "got next height");
let tx = match gossiped_tx {
let (tx, advertiser_addr) = match gossiped_tx {
Gossip::Id(txid) => {
let req = zn::Request::TransactionsById(std::iter::once(txid).collect());
@ -348,7 +351,7 @@ where
_ => unreachable!("wrong response to transaction request"),
};
let tx = tx.available().expect(
let (tx, advertiser_addr) = tx.available().expect(
"unexpected missing tx status: single tx failures should be errors",
);
@ -356,14 +359,14 @@ where
"mempool.downloaded.transactions.total",
"version" => format!("{}",tx.transaction.version()),
).increment(1);
tx
(tx, advertiser_addr)
}
Gossip::Tx(tx) => {
metrics::counter!(
"mempool.pushed.transactions.total",
"version" => format!("{}",tx.transaction.version()),
).increment(1);
tx
(tx, None)
}
};
@ -386,7 +389,7 @@ where
// Hide the transaction data to avoid filling the logs
trace!(?txid, result = ?result.as_ref().map(|_tx| ()), "verified transaction for the mempool");
result.map_err(|e| TransactionDownloadVerifyError::Invalid(e.into()))
result.map_err(|e| TransactionDownloadVerifyError::Invalid { error: e.into(), advertiser_addr } )
}
.map_ok(|(tx, spent_mempool_outpoints, tip_height)| {
metrics::counter!(

View File

@ -647,8 +647,8 @@ impl Storage {
// Consensus verification failed. Reject transaction to avoid
// having to download and verify it again just for it to fail again.
TransactionDownloadVerifyError::Invalid(e) => {
self.reject(tx_id, ExactTipRejectionError::FailedVerification(e).into())
TransactionDownloadVerifyError::Invalid { error, .. } => {
self.reject(tx_id, ExactTipRejectionError::FailedVerification(error).into())
}
}
}

View File

@ -271,6 +271,7 @@ fn setup(
let (mut chain_tip_sender, latest_chain_tip, chain_tip_change) =
ChainTipSender::new(None, network);
let (misbehavior_tx, _misbehavior_rx) = tokio::sync::mpsc::channel(1);
let (mempool, _transaction_receiver) = Mempool::new(
&Config {
tx_cost_limit: 160_000_000,
@ -282,6 +283,7 @@ fn setup(
sync_status,
latest_chain_tip,
chain_tip_change,
misbehavior_tx,
);
// sends a fake chain tip so that the mempool can be enabled

View File

@ -805,7 +805,7 @@ async fn mempool_reverifies_after_tip_change() -> Result<(), Report> {
.expect_request_that(|req| matches!(req, zn::Request::TransactionsById(_)))
.map(|responder| {
responder.respond(zn::Response::Transactions(vec![
zn::InventoryResponse::Available(tx.clone().into()),
zn::InventoryResponse::Available((tx.clone().into(), None)),
]));
})
.await;
@ -864,7 +864,7 @@ async fn mempool_reverifies_after_tip_change() -> Result<(), Report> {
.expect_request_that(|req| matches!(req, zn::Request::TransactionsById(_)))
.map(|responder| {
responder.respond(zn::Response::Transactions(vec![
zn::InventoryResponse::Available(tx.into()),
zn::InventoryResponse::Available((tx.into(), None)),
]));
})
.await;
@ -1069,7 +1069,7 @@ async fn setup(
let tx_verifier = MockService::build().for_unit_tests();
let (sync_status, recent_syncs) = SyncStatus::new();
let (misbehavior_tx, _misbehavior_rx) = tokio::sync::mpsc::channel(1);
let (mempool, mut mempool_transaction_receiver) = Mempool::new(
&mempool::Config {
tx_cost_limit,
@ -1081,6 +1081,7 @@ async fn setup(
sync_status,
latest_chain_tip,
chain_tip_change.clone(),
misbehavior_tx,
);
tokio::spawn(async move { while mempool_transaction_receiver.recv().await.is_ok() {} });

View File

@ -9,7 +9,7 @@ use futures::stream::{FuturesUnordered, StreamExt};
use indexmap::IndexSet;
use serde::{Deserialize, Serialize};
use tokio::{
sync::watch,
sync::{mpsc, watch},
task::JoinError,
time::{sleep, timeout},
};
@ -23,7 +23,7 @@ use zebra_chain::{
chain_tip::ChainTip,
};
use zebra_consensus::ParameterCheckpoint as _;
use zebra_network as zn;
use zebra_network::{self as zn, PeerSocketAddr};
use zebra_state as zs;
use crate::{
@ -380,6 +380,9 @@ where
/// Receiver that is `true` when the downloader is past the lookahead limit.
/// This is based on the downloaded block height and the state tip height.
past_lookahead_limit_receiver: zs::WatchReceiver<bool>,
/// Sender for reporting peer addresses that advertised unexpectedly invalid blocks.
misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
}
/// Polls the network to determine whether further blocks are available and
@ -425,6 +428,7 @@ where
verifier: ZV,
state: ZS,
latest_chain_tip: ZSTip,
misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
) -> (Self, SyncStatus) {
let mut download_concurrency_limit = config.sync.download_concurrency_limit;
let mut checkpoint_verify_concurrency_limit =
@ -513,6 +517,7 @@ where
prospective_tips: HashSet::new(),
recent_syncs,
past_lookahead_limit_receiver,
misbehavior_sender,
};
(new_syncer, sync_status)
@ -1094,10 +1099,23 @@ where
Ok((height, hash)) => {
trace!(?height, ?hash, "verified and committed block to state");
Ok(())
return Ok(());
}
Err(_) => Self::handle_response(response),
}
Err(BlockDownloadVerifyError::Invalid {
ref error,
advertiser_addr: Some(advertiser_addr),
..
}) if error.misbehavior_score() != 0 => {
let _ = self
.misbehavior_sender
.try_send((advertiser_addr, error.misbehavior_score()));
}
Err(_) => {}
};
Self::handle_response(response)
}
/// Handles a response to block hash submission, passing through any extra hashes.

View File

@ -27,7 +27,7 @@ use zebra_chain::{
block::{self, Height, HeightDiff},
chain_tip::ChainTip,
};
use zebra_network as zn;
use zebra_network::{self as zn, PeerSocketAddr};
use zebra_state as zs;
use crate::components::sync::{
@ -125,6 +125,7 @@ pub enum BlockDownloadVerifyError {
error: zebra_consensus::router::RouterError,
height: block::Height,
hash: block::Hash,
advertiser_addr: Option<PeerSocketAddr>,
},
#[error("block validation request failed: {error:?} {height:?} {hash:?}")]
@ -374,7 +375,7 @@ where
rsp = block_req => rsp.map_err(|error| BlockDownloadVerifyError::DownloadFailed { error, hash})?,
};
let block = if let zn::Response::Blocks(blocks) = rsp {
let (block, advertiser_addr) = if let zn::Response::Blocks(blocks) = rsp {
assert_eq!(
blocks.len(),
1,
@ -550,7 +551,7 @@ where
.map(|hash| (block_height, hash))
.map_err(|err| {
match err.downcast::<zebra_consensus::router::RouterError>() {
Ok(error) => BlockDownloadVerifyError::Invalid { error: *error, height: block_height, hash },
Ok(error) => BlockDownloadVerifyError::Invalid { error: *error, height: block_height, hash, advertiser_addr },
Err(error) => BlockDownloadVerifyError::ValidationRequestError { error, height: block_height, hash },
}
})

View File

@ -158,6 +158,7 @@ fn request_genesis_is_rate_limited() {
);
// start the sync
let (misbehavior_tx, _misbehavior_rx) = tokio::sync::mpsc::channel(1);
let (mut chain_sync, _) = ChainSync::new(
&ZebradConfig::default(),
Height(0),
@ -165,6 +166,7 @@ fn request_genesis_is_rate_limited() {
verifier_service,
state_service,
latest_chain_tip,
misbehavior_tx,
);
// run `request_genesis()` with a timeout of 13 seconds

View File

@ -86,7 +86,10 @@ async fn sync_blocks_ok() -> Result<(), crate::BoxError> {
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![Available(block0.clone())]));
.respond(zn::Response::Blocks(vec![Available((
block0.clone(),
None,
))]));
block_verifier_router
.expect_request(zebra_consensus::Request::Commit(block0))
@ -160,11 +163,17 @@ async fn sync_blocks_ok() -> Result<(), crate::BoxError> {
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block1_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![Available(block1.clone())]));
.respond(zn::Response::Blocks(vec![Available((
block1.clone(),
None,
))]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block2_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![Available(block2.clone())]));
.respond(zn::Response::Blocks(vec![Available((
block2.clone(),
None,
))]));
// We can't guarantee the verification request order
let mut remaining_blocks: HashMap<block::Hash, Arc<Block>> =
@ -224,11 +233,17 @@ async fn sync_blocks_ok() -> Result<(), crate::BoxError> {
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block3_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![Available(block3.clone())]));
.respond(zn::Response::Blocks(vec![Available((
block3.clone(),
None,
))]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block4_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![Available(block4.clone())]));
.respond(zn::Response::Blocks(vec![Available((
block4.clone(),
None,
))]));
// We can't guarantee the verification request order
let mut remaining_blocks: HashMap<block::Hash, Arc<Block>> =
@ -313,7 +328,10 @@ async fn sync_blocks_duplicate_hashes_ok() -> Result<(), crate::BoxError> {
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![Available(block0.clone())]));
.respond(zn::Response::Blocks(vec![Available((
block0.clone(),
None,
))]));
block_verifier_router
.expect_request(zebra_consensus::Request::Commit(block0))
@ -389,11 +407,17 @@ async fn sync_blocks_duplicate_hashes_ok() -> Result<(), crate::BoxError> {
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block1_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![Available(block1.clone())]));
.respond(zn::Response::Blocks(vec![Available((
block1.clone(),
None,
))]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block2_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![Available(block2.clone())]));
.respond(zn::Response::Blocks(vec![Available((
block2.clone(),
None,
))]));
// We can't guarantee the verification request order
let mut remaining_blocks: HashMap<block::Hash, Arc<Block>> =
@ -455,11 +479,17 @@ async fn sync_blocks_duplicate_hashes_ok() -> Result<(), crate::BoxError> {
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block3_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![Available(block3.clone())]));
.respond(zn::Response::Blocks(vec![Available((
block3.clone(),
None,
))]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block4_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![Available(block4.clone())]));
.respond(zn::Response::Blocks(vec![Available((
block4.clone(),
None,
))]));
// We can't guarantee the verification request order
let mut remaining_blocks: HashMap<block::Hash, Arc<Block>> =
@ -530,7 +560,10 @@ async fn sync_block_lookahead_drop() -> Result<(), crate::BoxError> {
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![Available(block982k.clone())]));
.respond(zn::Response::Blocks(vec![Available((
block982k.clone(),
None,
))]));
// Block is dropped because it is too far ahead of the tip.
// We expect more requests to the state service, because the syncer keeps on running.
@ -595,7 +628,10 @@ async fn sync_block_too_high_obtain_tips() -> Result<(), crate::BoxError> {
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![Available(block0.clone())]));
.respond(zn::Response::Blocks(vec![Available((
block0.clone(),
None,
))]));
block_verifier_router
.expect_request(zebra_consensus::Request::Commit(block0))
@ -677,15 +713,24 @@ async fn sync_block_too_high_obtain_tips() -> Result<(), crate::BoxError> {
iter::once(block982k_hash).collect(),
))
.await
.respond(zn::Response::Blocks(vec![Available(block982k.clone())]));
.respond(zn::Response::Blocks(vec![Available((
block982k.clone(),
None,
))]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block1_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![Available(block1.clone())]));
.respond(zn::Response::Blocks(vec![Available((
block1.clone(),
None,
))]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block2_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![Available(block2.clone())]));
.respond(zn::Response::Blocks(vec![Available((
block2.clone(),
None,
))]));
// At this point, the following tasks race:
// - The valid chain verifier requests
@ -756,7 +801,10 @@ async fn sync_block_too_high_extend_tips() -> Result<(), crate::BoxError> {
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![Available(block0.clone())]));
.respond(zn::Response::Blocks(vec![Available((
block0.clone(),
None,
))]));
block_verifier_router
.expect_request(zebra_consensus::Request::Commit(block0))
@ -830,11 +878,17 @@ async fn sync_block_too_high_extend_tips() -> Result<(), crate::BoxError> {
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block1_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![Available(block1.clone())]));
.respond(zn::Response::Blocks(vec![Available((
block1.clone(),
None,
))]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block2_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![Available(block2.clone())]));
.respond(zn::Response::Blocks(vec![Available((
block2.clone(),
None,
))]));
// We can't guarantee the verification request order
let mut remaining_blocks: HashMap<block::Hash, Arc<Block>> =
@ -896,17 +950,26 @@ async fn sync_block_too_high_extend_tips() -> Result<(), crate::BoxError> {
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block3_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![Available(block3.clone())]));
.respond(zn::Response::Blocks(vec![Available((
block3.clone(),
None,
))]));
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block4_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![Available(block4.clone())]));
.respond(zn::Response::Blocks(vec![Available((
block4.clone(),
None,
))]));
peer_set
.expect_request(zn::Request::BlocksByHash(
iter::once(block982k_hash).collect(),
))
.await
.respond(zn::Response::Blocks(vec![Available(block982k.clone())]));
.respond(zn::Response::Blocks(vec![Available((
block982k.clone(),
None,
))]));
// At this point, the following tasks race:
// - The valid chain verifier requests
@ -961,6 +1024,7 @@ fn setup() -> (
let (mock_chain_tip, mock_chain_tip_sender) = MockChainTip::new();
let (misbehavior_tx, _misbehavior_rx) = tokio::sync::mpsc::channel(1);
let (chain_sync, sync_status) = ChainSync::new(
&config,
Height(0),
@ -968,6 +1032,7 @@ fn setup() -> (
block_verifier_router.clone(),
state_service.clone(),
mock_chain_tip,
misbehavior_tx,
);
let chain_sync_future = chain_sync.sync();

View File

@ -3715,3 +3715,176 @@ fn check_no_git_refs_in_cargo_lock() {
panic!("Cargo.lock includes git sources")
}
}
/// Check that Zebra will disconnect from misbehaving peers.
///
/// The test launches two `zebrad` children:
///
/// 1. a node configured from a custom Testnet built with PoW disabled, which is later asked to
///    mine blocks through the `generate` RPC, and
/// 2. a node on the default Testnet whose only initial peer is the first node's listen address.
///
/// Once `getpeerinfo` on the second node reports a peer, the genesis block is submitted to both
/// nodes and the first node mines blocks. The test then polls `getpeerinfo` on the second node
/// (mining one more block per poll) until it reports no peers, and finally asserts that the
/// peer list is empty.
///
/// Returns `Ok(())` without running anything if `can_spawn_zebrad_for_test_type` reports that
/// the test cannot be spawned.
///
/// # Errors
///
/// Returns an error if building the config, parsing a listen address, or any RPC call fails.
///
/// # Panics
///
/// Panics if the second node has no peers after the launch delay, or still has peers after the
/// polling loop.
#[tokio::test]
#[cfg(all(feature = "getblocktemplate-rpcs", not(target_os = "windows")))]
async fn disconnects_from_misbehaving_peers() -> Result<()> {
    use std::sync::{atomic::AtomicBool, Arc};

    use common::regtest::MiningRpcMethods;
    use zebra_chain::parameters::testnet::{self, ConfiguredActivationHeights};
    use zebra_rpc::methods::get_block_template_rpcs::types::peer_info::PeerInfo;

    /// Sets the wrapped flag to `true` when dropped.
    ///
    /// The blocking tasks below loop until the flag is set. Setting it from a drop guard means
    /// they are also told to stop when this test exits early through `?` or a failed assertion,
    /// rather than only on the success path. Dropping a tokio runtime waits for outstanding
    /// blocking tasks, so leaving the flag unset on failure can stall the test instead of
    /// reporting the failure.
    struct FinishOnDrop(Arc<AtomicBool>);

    impl Drop for FinishOnDrop {
        fn drop(&mut self) {
            self.0.store(true, std::sync::atomic::Ordering::SeqCst);
        }
    }

    let _init_guard = zebra_test::init();

    // Custom Testnet parameters with early network upgrade activations and PoW disabled.
    let network = testnet::Parameters::build()
        .with_activation_heights(ConfiguredActivationHeights {
            canopy: Some(1),
            nu5: Some(2),
            nu6: Some(3),
            ..Default::default()
        })
        .with_slow_start_interval(Height::MIN)
        .with_disable_pow(true)
        .to_network();

    let test_type = LaunchWithEmptyState {
        launches_lightwalletd: false,
    };
    let test_name = "disconnects_from_misbehaving_peers_test";

    if !common::launch::can_spawn_zebrad_for_test_type(test_name, test_type, false) {
        tracing::warn!("skipping disconnects_from_misbehaving_peers test");
        return Ok(());
    }

    // Get the zebrad config
    let mut config = test_type
        .zebrad_config(test_name, false, None, &network)
        .expect("already checked config")?;

    config.network.cache_dir = false.into();
    config.network.listen_addr = format!("127.0.0.1:{}", random_known_port()).parse()?;

    let rpc_listen_addr = config.rpc.listen_addr.unwrap();
    let rpc_client_1 = RpcRequestClient::new(rpc_listen_addr);

    tracing::info!(
        ?rpc_listen_addr,
        network_listen_addr = ?config.network.listen_addr,
        "starting a zebrad child on incompatible custom Testnet"
    );

    // Shared stop signal for the two blocking tasks that own the zebrad children.
    let is_finished = Arc::new(AtomicBool::new(false));

    // Signal the blocking tasks to stop on every exit path from here on.
    let _finish_guard = FinishOnDrop(Arc::clone(&is_finished));

    {
        let is_finished = Arc::clone(&is_finished);
        let config = config.clone();
        let (zebrad_failure_messages, zebrad_ignore_messages) = test_type.zebrad_failure_messages();

        // NOTE: the `JoinHandle` is dropped, so an `Err` returned by this task is not observed
        // by the test.
        tokio::task::spawn_blocking(move || -> Result<()> {
            let mut zebrad_child = testdir()?
                .with_exact_config(&config)?
                .spawn_child(args!["start"])?
                .bypass_test_capture(true)
                .with_timeout(test_type.zebrad_timeout())
                .with_failure_regex_iter(zebrad_failure_messages, zebrad_ignore_messages);

            // Keep reading the child's stdout until the test signals that it is done.
            while !is_finished.load(std::sync::atomic::Ordering::SeqCst) {
                zebrad_child.wait_for_stdout_line(Some("zebraA1".to_string()));
            }

            Ok(())
        });
    }

    // Reuse the config for the second node: point it at the first node as its only initial
    // peer, switch it to the default Testnet, and give it fresh listen addresses.
    config.network.initial_testnet_peers = [config.network.listen_addr.to_string()].into();
    config.network.network = Network::new_default_testnet();
    config.network.listen_addr = "127.0.0.1:0".parse()?;
    config.rpc.listen_addr = Some(format!("127.0.0.1:{}", random_known_port()).parse()?);

    let rpc_listen_addr = config.rpc.listen_addr.unwrap();
    let rpc_client_2 = RpcRequestClient::new(rpc_listen_addr);

    tracing::info!(
        ?rpc_listen_addr,
        network_listen_addr = ?config.network.listen_addr,
        "starting a zebrad child on the default Testnet"
    );

    {
        let is_finished = Arc::clone(&is_finished);

        // NOTE: as above, the `JoinHandle` is dropped and task errors are not observed.
        tokio::task::spawn_blocking(move || -> Result<()> {
            let (zebrad_failure_messages, zebrad_ignore_messages) =
                test_type.zebrad_failure_messages();
            let mut zebrad_child = testdir()?
                .with_exact_config(&config)?
                .spawn_child(args!["start"])?
                .bypass_test_capture(true)
                .with_timeout(test_type.zebrad_timeout())
                .with_failure_regex_iter(zebrad_failure_messages, zebrad_ignore_messages);

            // Keep reading the child's stdout until the test signals that it is done.
            while !is_finished.load(std::sync::atomic::Ordering::SeqCst) {
                zebrad_child.wait_for_stdout_line(Some("zebraB2".to_string()));
            }

            Ok(())
        });
    }

    tracing::info!("waiting for zebrad nodes to connect");

    // Wait a few seconds for Zebra to start up and make outbound peer connections
    tokio::time::sleep(LAUNCH_DELAY).await;

    tracing::info!("checking for peers");

    // Call `getpeerinfo` to check that the zebrad instances have connected
    let peer_info: Vec<PeerInfo> = rpc_client_2
        .json_result_from_call("getpeerinfo", "[]")
        .await
        .map_err(|err| eyre!(err))?;

    assert!(!peer_info.is_empty(), "should have outbound peer");

    tracing::info!(
        ?peer_info,
        "found peer connection, committing genesis block"
    );

    let genesis_block = network.block_parsed_iter().next().unwrap();
    rpc_client_1.submit_block(genesis_block.clone()).await?;
    rpc_client_2.submit_block(genesis_block).await?;

    // Call the `generate` method to mine blocks in the zebrad instance where PoW is disabled
    tracing::info!("committed genesis block, mining blocks with invalid PoW");
    tokio::time::sleep(Duration::from_secs(2)).await;

    rpc_client_1.call("generate", "[500]").await?;

    tracing::info!("wait for misbehavior messages to flush into address updater channel");

    tokio::time::sleep(Duration::from_secs(30)).await;

    tracing::info!("calling getpeerinfo to confirm Zebra has dropped the peer connection");

    // Call `getpeerinfo` to check that the zebrad instances have disconnected.
    // Each iteration that still sees a peer mines one more block and waits a second, so the
    // loop gives the second node up to 600 polls to drop the connection.
    for i in 0..600 {
        let peer_info: Vec<PeerInfo> = rpc_client_2
            .json_result_from_call("getpeerinfo", "[]")
            .await
            .map_err(|err| eyre!(err))?;

        if peer_info.is_empty() {
            break;
        } else if i % 10 == 0 {
            // Only log every tenth poll to keep the test output readable.
            tracing::info!(?peer_info, "has not yet disconnected from misbehaving peer");
        }

        rpc_client_1.call("generate", "[1]").await?;

        tokio::time::sleep(Duration::from_secs(1)).await;
    }

    // Query once more so the assertion covers the case where the loop ran out of iterations.
    let peer_info: Vec<PeerInfo> = rpc_client_2
        .json_result_from_call("getpeerinfo", "[]")
        .await
        .map_err(|err| eyre!(err))?;

    tracing::info!(?peer_info, "called getpeerinfo");

    assert!(peer_info.is_empty(), "should have no peers");

    // `_finish_guard` sets `is_finished` when it goes out of scope here, which lets the
    // blocking tasks exit their loops.
    Ok(())
}

View File

@ -162,7 +162,7 @@ pub(crate) async fn run() -> Result<()> {
/// or `ProposalResponse` in 'proposal' mode.
async fn try_validate_block_template(client: &RpcRequestClient) -> Result<()> {
let mut response_json_result: GetBlockTemplate = client
.json_result_from_call("getblocktemplate", "[]".to_string())
.json_result_from_call("getblocktemplate", "[]")
.await
.expect("response should be success output with a serialized `GetBlockTemplate`");

View File

@ -40,7 +40,7 @@ pub(crate) async fn run() -> Result<()> {
// call `getpeerinfo` RPC method
let peer_info_result: Vec<PeerInfo> = RpcRequestClient::new(rpc_address)
.json_result_from_call("getpeerinfo", "[]".to_string())
.json_result_from_call("getpeerinfo", "[]")
.await
.map_err(|err| eyre!(err))?;