fix(mempool): Stop ignoring some transaction broadcasts (#6230)

* Combine transaction IDs that arrive close together rather than overwriting them

* Reduce the size of transaction ID gossip logs

* Limit the maximum number of times we wait for new changes before sending

* Make logs even shorter

* Expand correctness comment

* Remove trailing space

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
This commit is contained in:
teor 2023-03-02 14:56:24 +10:00 committed by GitHub
parent 639cc766eb
commit 60ebefc988
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 42 additions and 16 deletions

View File

@ -203,9 +203,9 @@ impl fmt::Display for Request {
Request::Ping(_) => "Ping".to_string(),
Request::BlocksByHash(hashes) => {
format!("BlocksByHash {{ hashes: {} }}", hashes.len())
format!("BlocksByHash({})", hashes.len())
}
Request::TransactionsById(ids) => format!("TransactionsById {{ ids: {} }}", ids.len()),
Request::TransactionsById(ids) => format!("TransactionsById({})", ids.len()),
Request::FindBlocks { known_blocks, stop } => format!(
"FindBlocks {{ known_blocks: {}, stop: {} }}",
@ -220,7 +220,7 @@ impl fmt::Display for Request {
Request::PushTransaction(_) => "PushTransaction".to_string(),
Request::AdvertiseTransactionIds(ids) => {
format!("AdvertiseTransactionIds {{ ids: {} }}", ids.len())
format!("AdvertiseTransactionIds({})", ids.len())
}
Request::AdvertiseBlock(_) => "AdvertiseBlock".to_string(),

View File

@ -450,6 +450,9 @@ impl Service<Request> for Mempool {
if !send_to_peers_ids.is_empty() {
tracing::trace!(?send_to_peers_ids, "sending new transactions to peers");
// TODO:
// - if the transaction gossip task is slow, we can overwrite unsent IDs here
// - does this happen often enough to be worth a fix?
self.transaction_sender.send(send_to_peers_ids)?;
}
}

View File

@ -3,18 +3,18 @@
//! This module is just a function [`gossip_mempool_transaction_id`] that waits for mempool
//! insertion events received in a channel and broadcasts the transactions to peers.
use tower::{timeout::Timeout, Service, ServiceExt};
use zebra_network as zn;
use tokio::sync::watch;
use zebra_chain::transaction::UnminedTxId;
use std::collections::HashSet;
use crate::BoxError;
use tokio::sync::watch;
use tower::{timeout::Timeout, Service, ServiceExt};
use crate::components::sync::TIPS_RESPONSE_TIMEOUT;
use zebra_chain::transaction::UnminedTxId;
use zebra_network as zn;
use crate::{components::sync::TIPS_RESPONSE_TIMEOUT, BoxError};
/// The maximum number of times we will delay sending because there is a new change.
pub const MAX_CHANGES_BEFORE_SEND: usize = 10;
/// Runs continuously, gossiping new [`UnminedTxId`] to peers.
///
@ -37,14 +37,37 @@ where
let mut broadcast_network = Timeout::new(broadcast_network, TIPS_RESPONSE_TIMEOUT);
loop {
// once we get new data in the channel, broadcast to peers
receiver.changed().await?;
let mut combined_changes = 1;
// once we get new data in the channel, broadcast to peers,
// the mempool automatically combines some transactions that arrive close together
receiver.changed().await?;
let mut txs = receiver.borrow().clone();
tokio::task::yield_now().await;
// also combine transactions that arrived shortly after this one
while receiver.has_changed()? && combined_changes < MAX_CHANGES_BEFORE_SEND {
// Correctness
// - set the has_changed() flag to false using borrow_and_update()
// - clone() so we don't hold the watch channel lock while modifying txs
let extra_txs = receiver.borrow_and_update().clone();
txs.extend(extra_txs.iter());
combined_changes += 1;
tokio::task::yield_now().await;
}
let txs = receiver.borrow().clone();
let txs_len = txs.len();
let request = zn::Request::AdvertiseTransactionIds(txs);
info!(?request, "sending mempool transaction broadcast");
// TODO: rate-limit this info level log?
info!(%request, changes = %combined_changes, "sending mempool transaction broadcast");
debug!(
?request,
changes = ?combined_changes,
"full list of mempool transactions in broadcast"
);
// broadcast requests don't return errors, and we'd just want to ignore them anyway
let _ = broadcast_network.ready().await?.call(request).await;