1. Fix some address crawler timing issues (#3293)

* Stop holding completed messages until the next inbound message

* Add more info to network message block download debug logs

* Simplify address metrics logs

* Try handling inbound messages as responses, then try as a new request

* Improve address book logging

* Fix a race between the first heartbeat and getaddr requests

* Temporarily reduce the getaddr fanout to 1

* Update metrics when exiting the Connection run loop

* Downgrade some debug logs to trace
This commit is contained in:
teor 2022-01-05 09:43:30 +10:00 committed by GitHub
parent 9b127168eb
commit 469fa6b917
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 207 additions and 105 deletions

View File

@ -279,6 +279,7 @@ impl AddressBook {
?previous,
total_peers = self.by_addr.len(),
recent_peers = self.recently_live_peers(chrono_now).count(),
"calculated updated address book entry",
);
if let Some(updated) = updated {
@ -303,6 +304,15 @@ impl AddressBook {
self.by_addr.insert(updated.addr, updated);
debug!(
?change,
?updated,
?previous,
total_peers = self.by_addr.len(),
recent_peers = self.recently_live_peers(chrono_now).count(),
"updated address book entry",
);
// Security: Limit the number of peers in the address book.
//
// We only delete outdated peers when we have too many peers.
@ -317,6 +327,14 @@ impl AddressBook {
.expect("just checked there is at least one peer");
self.by_addr.remove(&surplus_peer.addr);
debug!(
surplus = ?surplus_peer,
?updated,
total_peers = self.by_addr.len(),
recent_peers = self.recently_live_peers(chrono_now).count(),
"removed surplus address book entry",
);
}
assert!(self.len() <= self.addr_limit);

View File

@ -49,14 +49,17 @@ impl AddressBookUpdater {
// based on the maximum number of inbound and outbound peers.
let (worker_tx, mut worker_rx) = mpsc::channel(config.peerset_total_connection_limit());
let address_book =
AddressBook::new(local_listener, span!(Level::TRACE, "address book updater"));
let address_book = AddressBook::new(local_listener, span!(Level::TRACE, "address book"));
let address_metrics = address_book.address_metrics_watcher();
let address_book = Arc::new(std::sync::Mutex::new(address_book));
let worker_address_book = address_book.clone();
let worker = move || {
info!("starting the address book updater");
while let Some(event) = worker_rx.blocking_recv() {
trace!(?event, "got address book change");
// # Correctness
//
// Briefly hold the address book threaded mutex, to update the
@ -67,7 +70,9 @@ impl AddressBookUpdater {
.update(event);
}
Err(AllAddressBookUpdaterSendersClosed.into())
let error = Err(AllAddressBookUpdaterSendersClosed.into());
info!(?error, "stopping address book updater");
error
};
// Correctness: spawn address book accesses on a blocking thread,

View File

@ -133,14 +133,19 @@ pub const PEER_GET_ADDR_TIMEOUT: Duration = Duration::from_secs(8);
/// The number of GetAddr requests sent when crawling for new peers.
///
/// ## SECURITY
/// # Security
///
/// The fanout should be greater than 2, so that Zebra avoids getting a majority
/// of its initial address book entries from a single peer.
///
/// Zebra regularly crawls for new peers, initiating a new crawl every
/// [`crawl_new_peer_interval`](crate::config::Config.crawl_new_peer_interval).
pub const GET_ADDR_FANOUT: usize = 3;
///
/// TODO: Restore the fanout to 3, once fanouts are limited to the number of ready peers (#2214)
///
/// In #3110, we changed the fanout to 1, to make sure we actually use cached address responses.
/// With a fanout of 3, we were dropping a lot of responses, because the overall crawl timed out.
pub const GET_ADDR_FANOUT: usize = 1;
/// The maximum number of addresses allowed in an `addr` or `addrv2` message.
///

View File

@ -525,7 +525,14 @@ where
}
Either::Left((Some(Err(e)), _)) => self.fail_with(e),
Either::Left((Some(Ok(msg)), _)) => {
self.handle_message_as_request(msg).await
let unhandled_msg = self.handle_message_as_request(msg).await;
if let Some(unhandled_msg) = unhandled_msg {
debug!(
%unhandled_msg,
"ignoring unhandled request while awaiting a request"
);
}
}
Either::Right((None, _)) => {
trace!("client_rx closed, ending connection");
@ -593,32 +600,19 @@ where
self.update_state_metrics(None);
// # Correctness
// Check whether the handler is finished processing messages,
// and update the state.
// (Some messages can indicate that a response has finished,
// even if the message wasn't consumed as a response or a request.)
//
// Handle any unsolicited messages first, to clear the queue.
// Then check for responses to our request messages.
//
// This significantly reduces our message failure rate.
// (Otherwise, every unsolicited message can disrupt our pending request.)
// If the message was not consumed, check whether it
// should be handled as a request.
if let Some(msg) = request_msg {
// do NOT instrument with the request span, this is
// independent work
self.handle_message_as_request(msg).await;
} else {
// Otherwise, check whether the handler is finished
// processing messages and update the state.
//
// Replace the state with a temporary value,
// so we can take ownership of the response sender.
self.state = match std::mem::replace(&mut self.state, State::Failed) {
State::AwaitingResponse {
handler: Handler::Finished(response),
tx,
..
} => {
// Replace the state with a temporary value,
// so we can take ownership of the response sender.
self.state = match std::mem::replace(&mut self.state, State::Failed) {
State::AwaitingResponse {
handler: Handler::Finished(response),
tx,
..
} => {
if let Ok(response) = response.as_ref() {
debug!(%response, "finished receiving peer response to Zebra request");
// Add a metric for inbound responses to outbound requests.
@ -631,27 +625,36 @@ where
} else {
debug!(error = ?response, "error in peer response to Zebra request");
}
let _ = tx.send(response.map_err(Into::into));
State::AwaitingRequest
}
pending @ State::AwaitingResponse { .. } => {
// Drop the new request message from the remote peer,
// because we can't process multiple requests at the same time.
debug!(
new_request = %request_msg
.as_ref()
.map(|m| m.to_string())
.unwrap_or_else(|| "None".into()),
awaiting_response = %pending,
"ignoring new request while awaiting a response"
);
pending
},
_ => unreachable!(
"unexpected failed connection state while AwaitingResponse: client_receiver: {:?}",
self.client_rx
),
};
let _ = tx.send(response.map_err(Into::into));
State::AwaitingRequest
}
pending @ State::AwaitingResponse { .. } =>
pending
,
_ => unreachable!(
"unexpected failed connection state while AwaitingResponse: client_receiver: {:?}",
self.client_rx
),
};
self.update_state_metrics(None);
// If the message was not consumed as a response,
// check whether it can be handled as a request.
let unused_msg = if let Some(request_msg) = request_msg {
// do NOT instrument with the request span, this is
// independent work
self.handle_message_as_request(request_msg).await
} else {
None
};
if let Some(unused_msg) = unused_msg {
debug!(
%unused_msg,
%self.state,
"ignoring peer message: not a response or a request",
);
}
}
Either::Left((Either::Right(_), _peer_fut)) => {
@ -697,10 +700,13 @@ where
}
}
let error = self.error_slot.try_get_error();
assert!(
self.error_slot.try_get_error().is_some(),
error.is_some(),
"closing connections must call fail_with() or shutdown() to set the error slot"
);
self.update_state_metrics(error.expect("checked is_some").to_string());
}
/// Fail this connection.
@ -937,12 +943,15 @@ where
};
}
/// Handle `msg` as a request from a peer to this Zebra instance.
///
/// If the message is not handled, it is returned.
// This function has its own span, because we're creating a new work
// context (namely, the work of processing the inbound msg as a request)
#[instrument(name = "msg_as_req", skip(self, msg), fields(%msg))]
async fn handle_message_as_request(&mut self, msg: Message) {
#[instrument(name = "msg_as_req", skip(self, msg), fields(msg = msg.command()))]
async fn handle_message_as_request(&mut self, msg: Message) -> Option<Message> {
trace!(?msg);
debug!(state = %self.state, %msg, "received peer request to Zebra");
debug!(state = %self.state, %msg, "received inbound peer message");
self.update_state_metrics(format!("In::Msg::{}", msg.command()));
@ -952,40 +961,40 @@ where
if let Err(e) = self.peer_tx.send(Message::Pong(nonce)).await {
self.fail_with(e);
}
return;
None
}
// These messages shouldn't be sent outside of a handshake.
Message::Version { .. } => {
self.fail_with(PeerError::DuplicateHandshake);
return;
None
}
Message::Verack { .. } => {
self.fail_with(PeerError::DuplicateHandshake);
return;
None
}
// These messages should already be handled as a response if they
// could be a response, so if we see them here, they were either
// sent unsolicited, or they were sent in response to a canceled request
// that we've already forgotten about.
Message::Reject { .. } => {
tracing::debug!("got reject message unsolicited or from canceled request");
return;
debug!(%msg, "got reject message unsolicited or from canceled request");
None
}
Message::NotFound { .. } => {
tracing::debug!("got notfound message unsolicited or from canceled request");
return;
debug!(%msg, "got notfound message unsolicited or from canceled request");
None
}
Message::Pong(_) => {
tracing::debug!("got pong message unsolicited or from canceled request");
return;
debug!(%msg, "got pong message unsolicited or from canceled request");
None
}
Message::Block(_) => {
tracing::debug!("got block message unsolicited or from canceled request");
return;
debug!(%msg, "got block message unsolicited or from canceled request");
None
}
Message::Headers(_) => {
tracing::debug!("got headers message unsolicited or from canceled request");
return;
debug!(%msg, "got headers message unsolicited or from canceled request");
None
}
// These messages should never be sent by peers.
Message::FilterLoad { .. }
@ -998,45 +1007,45 @@ where
//
// Since we can't verify their source, Zebra needs to ignore unexpected messages,
// because closing the connection could cause a denial of service or eclipse attack.
debug!("got BIP111 message without advertising NODE_BLOOM");
return;
debug!(%msg, "got BIP111 message without advertising NODE_BLOOM");
None
}
// Zebra crawls the network proactively, to prevent
// peers from inserting data into our address book.
Message::Addr(_) => {
trace!("ignoring unsolicited addr message");
return;
debug!(%msg, "ignoring unsolicited addr message");
None
}
Message::Tx(transaction) => Request::PushTransaction(transaction),
Message::Inv(items) => match &items[..] {
Message::Tx(ref transaction) => Some(Request::PushTransaction(transaction.clone())),
Message::Inv(ref items) => match &items[..] {
// We don't expect to be advertised multiple blocks at a time,
// so we ignore any advertisements of multiple blocks.
[InventoryHash::Block(hash)] => Request::AdvertiseBlock(*hash),
[InventoryHash::Block(hash)] => Some(Request::AdvertiseBlock(*hash)),
// Some peers advertise invs with mixed item types.
// But we're just interested in the transaction invs.
//
// TODO: split mixed invs into multiple requests,
// but skip runs of multiple blocks.
tx_ids if tx_ids.iter().any(|item| item.unmined_tx_id().is_some()) => {
Request::AdvertiseTransactionIds(transaction_ids(&items).collect())
}
tx_ids if tx_ids.iter().any(|item| item.unmined_tx_id().is_some()) => Some(
Request::AdvertiseTransactionIds(transaction_ids(items).collect()),
),
// Log detailed messages for ignored inv advertisement messages.
[] => {
debug!("ignoring empty inv");
return;
debug!(%msg, "ignoring empty inv");
None
}
[InventoryHash::Block(_), InventoryHash::Block(_), ..] => {
debug!("ignoring inv with multiple blocks");
return;
debug!(%msg, "ignoring inv with multiple blocks");
None
}
_ => {
debug!("ignoring inv with no transactions");
return;
debug!(%msg, "ignoring inv with no transactions");
None
}
},
Message::GetData(items) => match &items[..] {
Message::GetData(ref items) => match &items[..] {
// Some peers advertise invs with mixed item types.
// So we suspect they might do the same with getdata.
//
@ -1050,31 +1059,47 @@ where
.iter()
.any(|item| matches!(item, InventoryHash::Block(_))) =>
{
Request::BlocksByHash(block_hashes(&items).collect())
Some(Request::BlocksByHash(block_hashes(items).collect()))
}
tx_ids if tx_ids.iter().any(|item| item.unmined_tx_id().is_some()) => {
Request::TransactionsById(transaction_ids(&items).collect())
Some(Request::TransactionsById(transaction_ids(items).collect()))
}
// Log detailed messages for ignored getdata request messages.
[] => {
debug!("ignoring empty getdata");
return;
debug!(%msg, "ignoring empty getdata");
None
}
_ => {
debug!("ignoring getdata with no blocks or transactions");
return;
debug!(%msg, "ignoring getdata with no blocks or transactions");
None
}
},
Message::GetAddr => Request::Peers,
Message::GetBlocks { known_blocks, stop } => Request::FindBlocks { known_blocks, stop },
Message::GetHeaders { known_blocks, stop } => {
Request::FindHeaders { known_blocks, stop }
}
Message::Mempool => Request::MempoolTransactionIds,
Message::GetAddr => Some(Request::Peers),
Message::GetBlocks {
ref known_blocks,
stop,
} => Some(Request::FindBlocks {
known_blocks: known_blocks.clone(),
stop,
}),
Message::GetHeaders {
ref known_blocks,
stop,
} => Some(Request::FindHeaders {
known_blocks: known_blocks.clone(),
stop,
}),
Message::Mempool => Some(Request::MempoolTransactionIds),
};
self.drive_peer_request(req).await
if let Some(req) = req {
self.drive_peer_request(req).await;
None
} else {
// return the unused message
Some(msg)
}
}
/// Given a `req` originating from the peer, drive it to completion and send

View File

@ -11,7 +11,12 @@ use std::{
use chrono::{TimeZone, Utc};
use futures::{channel::oneshot, future, FutureExt, SinkExt, StreamExt};
use tokio::{net::TcpStream, sync::broadcast, task::JoinError, time::timeout};
use tokio::{
net::TcpStream,
sync::broadcast,
task::JoinError,
time::{timeout, Instant},
};
use tokio_stream::wrappers::IntervalStream;
use tokio_util::codec::Framed;
use tower::Service;
@ -978,8 +983,16 @@ async fn send_periodic_heartbeats(
) {
use futures::future::Either;
let mut interval_stream =
IntervalStream::new(tokio::time::interval(constants::HEARTBEAT_INTERVAL));
// Don't send the first heartbeat immediately - we've just completed the handshake!
let mut interval = tokio::time::interval_at(
Instant::now() + constants::HEARTBEAT_INTERVAL,
constants::HEARTBEAT_INTERVAL,
);
// If the heartbeat is delayed, also delay all future heartbeats.
// (Shorter heartbeat intervals just add load, without any benefit.)
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let mut interval_stream = IntervalStream::new(interval);
loop {
let shutdown_rx_ref = Pin::new(&mut shutdown_rx);

View File

@ -650,6 +650,12 @@ where
// - use the `select!` macro for all actions, because the `select` function
// is biased towards the first ready future
info!(
crawl_new_peer_interval = ?config.crawl_new_peer_interval,
outbound_connections = ?active_outbound_connections.update_count(),
"starting the peer address crawler",
);
let mut handshakes = FuturesUnordered::new();
// <FuturesUnordered as Stream> returns None when empty.
// Keeping an unresolved future in the pool means the stream

View File

@ -777,7 +777,7 @@ where
self.last_peer_log = Some(Instant::now());
let address_metrics = self.address_metrics.borrow();
let address_metrics = *self.address_metrics.borrow();
if unready_services_len == 0 {
warn!(
?address_metrics,
@ -804,7 +804,7 @@ where
// Security: make sure we haven't exceeded the connection limit
if num_peers > self.peerset_total_connection_limit {
let address_metrics = self.address_metrics.borrow();
let address_metrics = *self.address_metrics.borrow();
panic!(
"unexpectedly exceeded configured peer set connection limit: \n\
peers: {:?}, ready: {:?}, unready: {:?}, \n\

View File

@ -388,8 +388,10 @@ impl fmt::Display for Message {
user_agent,
),
Message::Verack => "verack".to_string(),
Message::Ping(_) => "ping".to_string(),
Message::Pong(_) => "pong".to_string(),
Message::Reject {
message,
reason,
@ -401,25 +403,39 @@ impl fmt::Display for Message {
reason,
if data.is_some() { "Some" } else { "None" },
),
Message::GetAddr => "getaddr".to_string(),
Message::Addr(addrs) => format!("addr {{ addrs: {} }}", addrs.len()),
Message::GetBlocks { known_blocks, stop } => format!(
"getblocks {{ known_blocks: {}, stop: {} }}",
known_blocks.len(),
if stop.is_some() { "Some" } else { "None" },
),
Message::Inv(invs) => format!("inv {{ invs: {} }}", invs.len()),
Message::GetHeaders { known_blocks, stop } => format!(
"getheaders {{ known_blocks: {}, stop: {} }}",
known_blocks.len(),
if stop.is_some() { "Some" } else { "None" },
),
Message::Headers(headers) => format!("headers {{ headers: {} }}", headers.len()),
Message::GetData(invs) => format!("getdata {{ invs: {} }}", invs.len()),
Message::Block(_) => "block".to_string(),
Message::Block(block) => format!(
"block {{ height: {}, hash: {} }}",
block
.coinbase_height()
.as_ref()
.map(|h| h.0.to_string())
.unwrap_or_else(|| "None".into()),
block.hash(),
),
Message::Tx(_) => "tx".to_string(),
Message::NotFound(invs) => format!("notfound {{ invs: {} }}", invs.len()),
Message::Mempool => "mempool".to_string(),
Message::FilterLoad { .. } => "filterload".to_string(),
Message::FilterAdd { .. } => "filteradd".to_string(),
Message::FilterClear => "filterclear".to_string(),

View File

@ -79,7 +79,21 @@ impl fmt::Display for Response {
Response::Peers(peers) => format!("Peers {{ peers: {} }}", peers.len()),
// Display heights for single-block responses (which Zebra requests and expects)
Response::Blocks(blocks) if blocks.len() == 1 => {
let block = blocks.first().expect("len is 1");
format!(
"Block {{ height: {}, hash: {} }}",
block
.coinbase_height()
.as_ref()
.map(|h| h.0.to_string())
.unwrap_or_else(|| "None".into()),
block.hash(),
)
}
Response::Blocks(blocks) => format!("Blocks {{ blocks: {} }}", blocks.len()),
Response::BlockHashes(hashes) => format!("BlockHashes {{ hashes: {} }}", hashes.len()),
Response::BlockHeaders(headers) => {
format!("BlockHeaders {{ headers: {} }}", headers.len())