fix(security): Rate-limit and size-limit peer transaction ID messages (#6625)
* Update MAX_TX_INV_IN_MESSAGE for ZIP-239 WTX IDs * Combine multiple transaction updates into a single gossip & rate-limit gossips * Rate-limit block gossips * Fix mempool_transaction_expiration gossip test timings * Enforce MAX_TX_INV_IN_MESSAGE in the network layer, rather than each service * Fix documentation for `Message::Tx` * Split MAX_INV_IN_RECEIVED_MESSAGE and MAX_TX_INV_IN_SENT_MESSAGE * Fix log message typo * Move some docs to/from another PR --------- Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
This commit is contained in:
parent
b0d9471214
commit
dc5198959e
|
@ -181,7 +181,7 @@ pub use crate::{
|
||||||
peer_set::init,
|
peer_set::init,
|
||||||
policies::RetryLimit,
|
policies::RetryLimit,
|
||||||
protocol::{
|
protocol::{
|
||||||
external::{Version, VersionMessage},
|
external::{Version, VersionMessage, MAX_TX_INV_IN_SENT_MESSAGE},
|
||||||
internal::{InventoryResponse, Request, Response},
|
internal::{InventoryResponse, Request, Response},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
|
@ -37,7 +37,7 @@ use crate::{
|
||||||
external::{types::Nonce, InventoryHash, Message},
|
external::{types::Nonce, InventoryHash, Message},
|
||||||
internal::{InventoryResponse, Request, Response},
|
internal::{InventoryResponse, Request, Response},
|
||||||
},
|
},
|
||||||
BoxError,
|
BoxError, MAX_TX_INV_IN_SENT_MESSAGE,
|
||||||
};
|
};
|
||||||
|
|
||||||
use InventoryResponse::*;
|
use InventoryResponse::*;
|
||||||
|
@ -992,9 +992,31 @@ where
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
(AwaitingRequest, AdvertiseTransactionIds(hashes)) => {
|
(AwaitingRequest, AdvertiseTransactionIds(hashes)) => {
|
||||||
|
let max_tx_inv_in_message: usize = MAX_TX_INV_IN_SENT_MESSAGE
|
||||||
|
.try_into()
|
||||||
|
.expect("constant fits in usize");
|
||||||
|
|
||||||
|
// # Security
|
||||||
|
//
|
||||||
|
// In most cases, we try to split over-sized requests into multiple network-layer
|
||||||
|
// messages. But we are unlikely to reach this limit with the default mempool
|
||||||
|
// config, so a gossip like this could indicate a network amplification attack.
|
||||||
|
//
|
||||||
|
// This limit is particularly important here, because advertisements send the same
|
||||||
|
// message to half our available peers.
|
||||||
|
//
|
||||||
|
// If there are thousands of transactions in the mempool, letting peers know the
|
||||||
|
// exact transactions we have isn't that important, so it's ok to drop arbitrary
|
||||||
|
// transaction hashes from our response.
|
||||||
|
if hashes.len() > max_tx_inv_in_message {
|
||||||
|
debug!(inv_count = ?hashes.len(), ?MAX_TX_INV_IN_SENT_MESSAGE, "unusually large transaction ID gossip");
|
||||||
|
}
|
||||||
|
|
||||||
|
let hashes = hashes.into_iter().take(max_tx_inv_in_message).map(Into::into).collect();
|
||||||
|
|
||||||
self
|
self
|
||||||
.peer_tx
|
.peer_tx
|
||||||
.send(Message::Inv(hashes.iter().map(|h| (*h).into()).collect()))
|
.send(Message::Inv(hashes))
|
||||||
.await
|
.await
|
||||||
.map(|()|
|
.map(|()|
|
||||||
Handler::Finished(Ok(Response::Nil))
|
Handler::Finished(Ok(Response::Nil))
|
||||||
|
@ -1351,11 +1373,30 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Response::TransactionIds(hashes) => {
|
Response::TransactionIds(hashes) => {
|
||||||
if let Err(e) = self
|
let max_tx_inv_in_message: usize = MAX_TX_INV_IN_SENT_MESSAGE
|
||||||
.peer_tx
|
.try_into()
|
||||||
.send(Message::Inv(hashes.into_iter().map(Into::into).collect()))
|
.expect("constant fits in usize");
|
||||||
.await
|
|
||||||
{
|
// # Security
|
||||||
|
//
|
||||||
|
// In most cases, we try to split over-sized responses into multiple network-layer
|
||||||
|
// messages. But we are unlikely to reach this limit with the default mempool
|
||||||
|
// config, so a response like this could indicate a network amplification attack.
|
||||||
|
//
|
||||||
|
// If there are thousands of transactions in the mempool, letting peers know the
|
||||||
|
// exact transactions we have isn't that important, so it's ok to drop arbitrary
|
||||||
|
// transaction hashes from our response.
|
||||||
|
if hashes.len() > max_tx_inv_in_message {
|
||||||
|
debug!(inv_count = ?hashes.len(), ?MAX_TX_INV_IN_SENT_MESSAGE, "unusually large transaction ID response");
|
||||||
|
}
|
||||||
|
|
||||||
|
let hashes = hashes
|
||||||
|
.into_iter()
|
||||||
|
.take(max_tx_inv_in_message)
|
||||||
|
.map(Into::into)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if let Err(e) = self.peer_tx.send(Message::Inv(hashes)).await {
|
||||||
self.fail_with(e)
|
self.fail_with(e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,7 @@ mod tests;
|
||||||
|
|
||||||
pub use addr::{canonical_peer_addr, canonical_socket_addr, AddrInVersion};
|
pub use addr::{canonical_peer_addr, canonical_socket_addr, AddrInVersion};
|
||||||
pub use codec::Codec;
|
pub use codec::Codec;
|
||||||
pub use inv::InventoryHash;
|
pub use inv::{InventoryHash, MAX_TX_INV_IN_SENT_MESSAGE};
|
||||||
pub use message::{Message, VersionMessage};
|
pub use message::{Message, VersionMessage};
|
||||||
pub use types::{Nonce, Version};
|
pub use types::{Nonce, Version};
|
||||||
|
|
||||||
|
|
|
@ -179,12 +179,26 @@ impl ZcashDeserialize for InventoryHash {
|
||||||
/// The minimum serialized size of an [`InventoryHash`].
|
/// The minimum serialized size of an [`InventoryHash`].
|
||||||
pub(crate) const MIN_INV_HASH_SIZE: usize = 36;
|
pub(crate) const MIN_INV_HASH_SIZE: usize = 36;
|
||||||
|
|
||||||
/// The maximum number of transaction inventory items in a network message.
|
/// The maximum number of inventory items in a network message received from a peer.
|
||||||
/// We also use this limit for block inventory, because it is typically much smaller.
|
///
|
||||||
|
/// After [ZIP-239](https://zips.z.cash/zip-0239#deployment), this would allow a message filled
|
||||||
|
/// with `MSG_WTX` entries to be around 3.4 MB, so we also need a separate constant to limit the
|
||||||
|
/// number of `inv` entries that we send.
|
||||||
///
|
///
|
||||||
/// Same as `MAX_INV_SZ` in `zcashd`:
|
/// Same as `MAX_INV_SZ` in `zcashd`:
|
||||||
/// <https://github.com/zcash/zcash/blob/adfc7218435faa1c8985a727f997a795dcffa0c7/src/net.h#L50>
|
/// <https://github.com/zcash/zcash/blob/adfc7218435faa1c8985a727f997a795dcffa0c7/src/net.h#L50>
|
||||||
pub const MAX_TX_INV_IN_MESSAGE: u64 = 50_000;
|
pub const MAX_INV_IN_RECEIVED_MESSAGE: u64 = 50_000;
|
||||||
|
|
||||||
|
/// The maximum number of transaction inventory items in a network message received from a peer.
|
||||||
|
///
|
||||||
|
/// After [ZIP-239](https://zips.z.cash/zip-0239#deployment), this would allow a message filled
|
||||||
|
/// with `MSG_WTX` entries to be around 3.4 MB, so we also need a separate constant to limit the
|
||||||
|
/// number of `inv` entries that we send.
|
||||||
|
///
|
||||||
|
/// This constant is not critical to compatibility: it just needs to be less than or equal to
|
||||||
|
/// `zcashd`'s `MAX_INV_SZ`:
|
||||||
|
/// <https://github.com/zcash/zcash/blob/adfc7218435faa1c8985a727f997a795dcffa0c7/src/net.h#L50>
|
||||||
|
pub const MAX_TX_INV_IN_SENT_MESSAGE: u64 = 25_000;
|
||||||
|
|
||||||
impl TrustedPreallocate for InventoryHash {
|
impl TrustedPreallocate for InventoryHash {
|
||||||
fn max_allocation() -> u64 {
|
fn max_allocation() -> u64 {
|
||||||
|
@ -193,6 +207,6 @@ impl TrustedPreallocate for InventoryHash {
|
||||||
// a single message
|
// a single message
|
||||||
let message_size_limit = ((MAX_PROTOCOL_MESSAGE_LEN - 1) / MIN_INV_HASH_SIZE) as u64;
|
let message_size_limit = ((MAX_PROTOCOL_MESSAGE_LEN - 1) / MIN_INV_HASH_SIZE) as u64;
|
||||||
|
|
||||||
min(message_size_limit, MAX_TX_INV_IN_MESSAGE)
|
min(message_size_limit, MAX_INV_IN_RECEIVED_MESSAGE)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -202,7 +202,11 @@ pub enum Message {
|
||||||
|
|
||||||
/// A `tx` message.
|
/// A `tx` message.
|
||||||
///
|
///
|
||||||
/// This message is used to advertise unmined transactions for the mempool.
|
/// This message can be used to:
|
||||||
|
/// - send unmined transactions in response to `GetData` requests, and
|
||||||
|
/// - advertise unmined transactions for the mempool.
|
||||||
|
///
|
||||||
|
/// Zebra chooses to advertise new transactions using `Inv(hash)` rather than `Tx(transaction)`.
|
||||||
///
|
///
|
||||||
/// [Bitcoin reference](https://en.bitcoin.it/wiki/Protocol_documentation#tx)
|
/// [Bitcoin reference](https://en.bitcoin.it/wiki/Protocol_documentation#tx)
|
||||||
Tx(UnminedTx),
|
Tx(UnminedTx),
|
||||||
|
|
|
@ -16,7 +16,7 @@ use crate::{
|
||||||
meta_addr::MetaAddr,
|
meta_addr::MetaAddr,
|
||||||
protocol::external::{
|
protocol::external::{
|
||||||
addr::{AddrV1, AddrV2, ADDR_V1_SIZE, ADDR_V2_MIN_SIZE},
|
addr::{AddrV1, AddrV2, ADDR_V1_SIZE, ADDR_V2_MIN_SIZE},
|
||||||
inv::{InventoryHash, MAX_TX_INV_IN_MESSAGE},
|
inv::{InventoryHash, MAX_INV_IN_RECEIVED_MESSAGE},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -66,7 +66,7 @@ proptest! {
|
||||||
//
|
//
|
||||||
// Special case: Zcash has a slightly smaller limit for transaction invs,
|
// Special case: Zcash has a slightly smaller limit for transaction invs,
|
||||||
// so we use it for all invs.
|
// so we use it for all invs.
|
||||||
prop_assert!(smallest_disallowed_serialized.len() > min(MAX_PROTOCOL_MESSAGE_LEN, usize::try_from(MAX_TX_INV_IN_MESSAGE).expect("fits in usize")));
|
prop_assert!(smallest_disallowed_serialized.len() > min(MAX_PROTOCOL_MESSAGE_LEN, usize::try_from(MAX_INV_IN_RECEIVED_MESSAGE).expect("fits in usize")));
|
||||||
|
|
||||||
// Create largest_allowed_vec by removing one element from smallest_disallowed_vec without copying (for efficiency)
|
// Create largest_allowed_vec by removing one element from smallest_disallowed_vec without copying (for efficiency)
|
||||||
smallest_disallowed_vec.pop();
|
smallest_disallowed_vec.pop();
|
||||||
|
|
|
@ -148,14 +148,11 @@ pub enum Request {
|
||||||
|
|
||||||
/// Advertise a set of unmined transactions to all peers.
|
/// Advertise a set of unmined transactions to all peers.
|
||||||
///
|
///
|
||||||
/// This is intended to be used in Zebra with a single transaction at a time
|
/// Both Zebra and zcashd sometimes advertise multiple transactions at once.
|
||||||
/// (set of size 1), but multiple transactions are permitted because this is
|
|
||||||
/// how we interpret advertisements from zcashd, which sometimes advertises
|
|
||||||
/// multiple transactions at once.
|
|
||||||
///
|
///
|
||||||
/// This is implemented by sending an `inv` message containing the unmined
|
/// This is implemented by sending an `inv` message containing the unmined
|
||||||
/// transaction ID, allowing the remote peer to choose whether to download
|
/// transaction IDs, allowing the remote peer to choose whether to download
|
||||||
/// it. Remote peers who choose to download the transaction will generate a
|
/// them. Remote peers who choose to download the transaction will generate a
|
||||||
/// [`Request::TransactionsById`] against the "inbound" service passed to
|
/// [`Request::TransactionsById`] against the "inbound" service passed to
|
||||||
/// [`init`](crate::init).
|
/// [`init`](crate::init).
|
||||||
///
|
///
|
||||||
|
|
|
@ -468,6 +468,7 @@ impl Service<zn::Request> for Inbound {
|
||||||
block_downloads.download_and_verify(hash);
|
block_downloads.download_and_verify(hash);
|
||||||
async { Ok(zn::Response::Nil) }.boxed()
|
async { Ok(zn::Response::Nil) }.boxed()
|
||||||
}
|
}
|
||||||
|
// The size of this response is limited by the `Connection` state machine in the network layer
|
||||||
zn::Request::MempoolTransactionIds => {
|
zn::Request::MempoolTransactionIds => {
|
||||||
mempool.clone().oneshot(mempool::Request::TransactionIds).map_ok(|resp| match resp {
|
mempool.clone().oneshot(mempool::Request::TransactionIds).map_ok(|resp| match resp {
|
||||||
mempool::Response::TransactionIds(transaction_ids) if transaction_ids.is_empty() => zn::Response::Nil,
|
mempool::Response::TransactionIds(transaction_ids) if transaction_ids.is_empty() => zn::Response::Nil,
|
||||||
|
|
|
@ -35,7 +35,7 @@ use crate::{
|
||||||
gossip_mempool_transaction_id, unmined_transactions_in_blocks, Config as MempoolConfig,
|
gossip_mempool_transaction_id, unmined_transactions_in_blocks, Config as MempoolConfig,
|
||||||
Mempool, MempoolError, SameEffectsChainRejectionError, UnboxMempoolError,
|
Mempool, MempoolError, SameEffectsChainRejectionError, UnboxMempoolError,
|
||||||
},
|
},
|
||||||
sync::{self, BlockGossipError, SyncStatus},
|
sync::{self, BlockGossipError, SyncStatus, TIPS_RESPONSE_TIMEOUT},
|
||||||
},
|
},
|
||||||
BoxError,
|
BoxError,
|
||||||
};
|
};
|
||||||
|
@ -420,7 +420,8 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
|
||||||
let mut hs = HashSet::new();
|
let mut hs = HashSet::new();
|
||||||
hs.insert(tx1_id);
|
hs.insert(tx1_id);
|
||||||
|
|
||||||
// Transaction and Block IDs are gossipped, in any order
|
// Transaction and Block IDs are gossipped, in any order, after waiting for the gossip delay
|
||||||
|
tokio::time::sleep(TIPS_RESPONSE_TIMEOUT).await;
|
||||||
let possible_requests = &mut [
|
let possible_requests = &mut [
|
||||||
Request::AdvertiseTransactionIds(hs),
|
Request::AdvertiseTransactionIds(hs),
|
||||||
Request::AdvertiseBlock(block_two.hash()),
|
Request::AdvertiseBlock(block_two.hash()),
|
||||||
|
@ -488,7 +489,8 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
// Block is gossiped
|
// Test the block is gossiped, after waiting for the multi-gossip delay
|
||||||
|
tokio::time::sleep(TIPS_RESPONSE_TIMEOUT).await;
|
||||||
peer_set
|
peer_set
|
||||||
.expect_request(Request::AdvertiseBlock(block_three.hash()))
|
.expect_request(Request::AdvertiseBlock(block_three.hash()))
|
||||||
.await
|
.await
|
||||||
|
@ -564,7 +566,9 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
|
||||||
MempoolError::StorageEffectsChain(SameEffectsChainRejectionError::Expired)
|
MempoolError::StorageEffectsChain(SameEffectsChainRejectionError::Expired)
|
||||||
);
|
);
|
||||||
|
|
||||||
// Test transaction 2 is gossiped
|
// Test transaction 2 is gossiped, after waiting for the multi-gossip delay
|
||||||
|
tokio::time::sleep(TIPS_RESPONSE_TIMEOUT).await;
|
||||||
|
|
||||||
let mut hs = HashSet::new();
|
let mut hs = HashSet::new();
|
||||||
hs.insert(tx2_id);
|
hs.insert(tx2_id);
|
||||||
peer_set
|
peer_set
|
||||||
|
@ -583,18 +587,6 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
|
||||||
zebra_test::vectors::BLOCK_MAINNET_6_BYTES
|
zebra_test::vectors::BLOCK_MAINNET_6_BYTES
|
||||||
.zcash_deserialize_into()
|
.zcash_deserialize_into()
|
||||||
.unwrap(),
|
.unwrap(),
|
||||||
zebra_test::vectors::BLOCK_MAINNET_7_BYTES
|
|
||||||
.zcash_deserialize_into()
|
|
||||||
.unwrap(),
|
|
||||||
zebra_test::vectors::BLOCK_MAINNET_8_BYTES
|
|
||||||
.zcash_deserialize_into()
|
|
||||||
.unwrap(),
|
|
||||||
zebra_test::vectors::BLOCK_MAINNET_9_BYTES
|
|
||||||
.zcash_deserialize_into()
|
|
||||||
.unwrap(),
|
|
||||||
zebra_test::vectors::BLOCK_MAINNET_10_BYTES
|
|
||||||
.zcash_deserialize_into()
|
|
||||||
.unwrap(),
|
|
||||||
];
|
];
|
||||||
for block in more_blocks {
|
for block in more_blocks {
|
||||||
state_service
|
state_service
|
||||||
|
@ -605,7 +597,8 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
// Block is gossiped
|
// Test the block is gossiped, after waiting for the multi-gossip delay
|
||||||
|
tokio::time::sleep(TIPS_RESPONSE_TIMEOUT).await;
|
||||||
peer_set
|
peer_set
|
||||||
.expect_request(Request::AdvertiseBlock(block.hash()))
|
.expect_request(Request::AdvertiseBlock(block.hash()))
|
||||||
.await
|
.await
|
||||||
|
|
|
@ -27,7 +27,7 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use futures::{future::FutureExt, stream::Stream};
|
use futures::{future::FutureExt, stream::Stream};
|
||||||
use tokio::sync::watch;
|
use tokio::sync::broadcast;
|
||||||
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};
|
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};
|
||||||
|
|
||||||
use zebra_chain::{
|
use zebra_chain::{
|
||||||
|
@ -233,7 +233,7 @@ pub struct Mempool {
|
||||||
|
|
||||||
/// Sender part of a gossip transactions channel.
|
/// Sender part of a gossip transactions channel.
|
||||||
/// Used to broadcast transaction ids to peers.
|
/// Used to broadcast transaction ids to peers.
|
||||||
transaction_sender: watch::Sender<HashSet<UnminedTxId>>,
|
transaction_sender: broadcast::Sender<HashSet<UnminedTxId>>,
|
||||||
|
|
||||||
// Diagnostics
|
// Diagnostics
|
||||||
//
|
//
|
||||||
|
@ -267,9 +267,9 @@ impl Mempool {
|
||||||
sync_status: SyncStatus,
|
sync_status: SyncStatus,
|
||||||
latest_chain_tip: zs::LatestChainTip,
|
latest_chain_tip: zs::LatestChainTip,
|
||||||
chain_tip_change: ChainTipChange,
|
chain_tip_change: ChainTipChange,
|
||||||
) -> (Self, watch::Receiver<HashSet<UnminedTxId>>) {
|
) -> (Self, broadcast::Receiver<HashSet<UnminedTxId>>) {
|
||||||
let (transaction_sender, transaction_receiver) =
|
let (transaction_sender, transaction_receiver) =
|
||||||
tokio::sync::watch::channel(HashSet::new());
|
tokio::sync::broadcast::channel(gossip::MAX_CHANGES_BEFORE_SEND * 2);
|
||||||
|
|
||||||
let mut service = Mempool {
|
let mut service = Mempool {
|
||||||
config: config.clone(),
|
config: config.clone(),
|
||||||
|
@ -659,9 +659,6 @@ impl Service<Request> for Mempool {
|
||||||
if !send_to_peers_ids.is_empty() {
|
if !send_to_peers_ids.is_empty() {
|
||||||
tracing::trace!(?send_to_peers_ids, "sending new transactions to peers");
|
tracing::trace!(?send_to_peers_ids, "sending new transactions to peers");
|
||||||
|
|
||||||
// TODO:
|
|
||||||
// - if the transaction gossip task is slow, we can overwrite unsent IDs here
|
|
||||||
// - does this happen often enough to be worth a fix?
|
|
||||||
self.transaction_sender.send(send_to_peers_ids)?;
|
self.transaction_sender.send(send_to_peers_ids)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,31 +5,37 @@
|
||||||
|
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
|
||||||
use tokio::sync::watch;
|
use tokio::sync::broadcast::{
|
||||||
|
self,
|
||||||
|
error::{RecvError, TryRecvError},
|
||||||
|
};
|
||||||
use tower::{timeout::Timeout, Service, ServiceExt};
|
use tower::{timeout::Timeout, Service, ServiceExt};
|
||||||
|
|
||||||
use zebra_chain::transaction::UnminedTxId;
|
use zebra_chain::transaction::UnminedTxId;
|
||||||
|
use zebra_network::MAX_TX_INV_IN_SENT_MESSAGE;
|
||||||
|
|
||||||
use zebra_network as zn;
|
use zebra_network as zn;
|
||||||
|
|
||||||
use crate::{components::sync::TIPS_RESPONSE_TIMEOUT, BoxError};
|
use crate::{components::sync::TIPS_RESPONSE_TIMEOUT, BoxError};
|
||||||
|
|
||||||
/// The maximum number of times we will delay sending because there is a new change.
|
/// The maximum number of channel messages we will combine into a single peer broadcast.
|
||||||
pub const MAX_CHANGES_BEFORE_SEND: usize = 10;
|
pub const MAX_CHANGES_BEFORE_SEND: usize = 10;
|
||||||
|
|
||||||
/// Runs continuously, gossiping new [`UnminedTxId`] to peers.
|
/// Runs continuously, gossiping new [`UnminedTxId`] to peers.
|
||||||
///
|
///
|
||||||
/// Broadcasts any [`UnminedTxId`] that gets stored in the mempool to all ready
|
/// Broadcasts any new [`UnminedTxId`]s that get stored in the mempool to multiple ready peers.
|
||||||
/// peers.
|
|
||||||
///
|
|
||||||
/// [`UnminedTxId`]: zebra_chain::transaction::UnminedTxId
|
|
||||||
pub async fn gossip_mempool_transaction_id<ZN>(
|
pub async fn gossip_mempool_transaction_id<ZN>(
|
||||||
mut receiver: watch::Receiver<HashSet<UnminedTxId>>,
|
mut receiver: broadcast::Receiver<HashSet<UnminedTxId>>,
|
||||||
broadcast_network: ZN,
|
broadcast_network: ZN,
|
||||||
) -> Result<(), BoxError>
|
) -> Result<(), BoxError>
|
||||||
where
|
where
|
||||||
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
|
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
|
||||||
ZN::Future: Send,
|
ZN::Future: Send,
|
||||||
{
|
{
|
||||||
|
let max_tx_inv_in_message: usize = MAX_TX_INV_IN_SENT_MESSAGE
|
||||||
|
.try_into()
|
||||||
|
.expect("constant fits in usize");
|
||||||
|
|
||||||
info!("initializing transaction gossip task");
|
info!("initializing transaction gossip task");
|
||||||
|
|
||||||
// use the same timeout as tips requests,
|
// use the same timeout as tips requests,
|
||||||
|
@ -39,29 +45,41 @@ where
|
||||||
loop {
|
loop {
|
||||||
let mut combined_changes = 1;
|
let mut combined_changes = 1;
|
||||||
|
|
||||||
// once we get new data in the channel, broadcast to peers,
|
// once we get new data in the channel, broadcast to peers
|
||||||
// the mempool automatically combines some transactions that arrive close together
|
//
|
||||||
receiver.changed().await?;
|
// the mempool automatically combines some transaction IDs that arrive close together,
|
||||||
let mut txs = receiver.borrow().clone();
|
// and this task also combines the changes that are in the channel before sending
|
||||||
tokio::task::yield_now().await;
|
let mut txs = loop {
|
||||||
|
match receiver.recv().await {
|
||||||
|
Ok(txs) => break txs,
|
||||||
|
Err(RecvError::Lagged(skip_count)) => info!(
|
||||||
|
?skip_count,
|
||||||
|
"dropped transactions before gossiping due to heavy mempool or network load"
|
||||||
|
),
|
||||||
|
Err(closed @ RecvError::Closed) => Err(closed)?,
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// also combine transactions that arrived shortly after this one
|
// also combine transaction IDs that arrived shortly after this one,
|
||||||
while receiver.has_changed()? && combined_changes < MAX_CHANGES_BEFORE_SEND {
|
// but limit the number of changes and the number of transaction IDs
|
||||||
// Correctness
|
// (the network layer handles the actual limits, this just makes sure the loop terminates)
|
||||||
// - set the has_changed() flag to false using borrow_and_update()
|
while combined_changes <= MAX_CHANGES_BEFORE_SEND && txs.len() < max_tx_inv_in_message {
|
||||||
// - clone() so we don't hold the watch channel lock while modifying txs
|
match receiver.try_recv() {
|
||||||
let extra_txs = receiver.borrow_and_update().clone();
|
Ok(extra_txs) => txs.extend(extra_txs.iter()),
|
||||||
txs.extend(extra_txs.iter());
|
Err(TryRecvError::Empty) => break,
|
||||||
|
Err(TryRecvError::Lagged(skip_count)) => info!(
|
||||||
|
?skip_count,
|
||||||
|
"dropped transactions before gossiping due to heavy mempool or network load"
|
||||||
|
),
|
||||||
|
Err(closed @ TryRecvError::Closed) => Err(closed)?,
|
||||||
|
}
|
||||||
|
|
||||||
combined_changes += 1;
|
combined_changes += 1;
|
||||||
|
|
||||||
tokio::task::yield_now().await;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let txs_len = txs.len();
|
let txs_len = txs.len();
|
||||||
let request = zn::Request::AdvertiseTransactionIds(txs);
|
let request = zn::Request::AdvertiseTransactionIds(txs);
|
||||||
|
|
||||||
// TODO: rate-limit this info level log?
|
|
||||||
info!(%request, changes = %combined_changes, "sending mempool transaction broadcast");
|
info!(%request, changes = %combined_changes, "sending mempool transaction broadcast");
|
||||||
debug!(
|
debug!(
|
||||||
?request,
|
?request,
|
||||||
|
@ -73,5 +91,11 @@ where
|
||||||
let _ = broadcast_network.ready().await?.call(request).await;
|
let _ = broadcast_network.ready().await?.call(request).await;
|
||||||
|
|
||||||
metrics::counter!("mempool.gossiped.transactions.total", txs_len as u64);
|
metrics::counter!("mempool.gossiped.transactions.total", txs_len as u64);
|
||||||
|
|
||||||
|
// wait for at least the network timeout between gossips
|
||||||
|
//
|
||||||
|
// in practice, transactions arrive every 1-20 seconds,
|
||||||
|
// so waiting 6 seconds can delay transaction propagation, in order to reduce peer load
|
||||||
|
tokio::time::sleep(TIPS_RESPONSE_TIMEOUT).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -85,5 +85,11 @@ where
|
||||||
.map_err(PeerSetReadiness)?
|
.map_err(PeerSetReadiness)?
|
||||||
.call(request)
|
.call(request)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
// wait for at least the network timeout between gossips
|
||||||
|
//
|
||||||
|
// in practice, we expect blocks to arrive approximately every 75 seconds,
|
||||||
|
// so waiting 6 seconds won't make much difference
|
||||||
|
tokio::time::sleep(TIPS_RESPONSE_TIMEOUT).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue