Cache unsolicited address messages, and provide them to Zebra when requested (#3294)

This commit is contained in:
teor 2022-01-06 06:55:59 +10:00 committed by GitHub
parent 469fa6b917
commit 144c532de4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 103 additions and 42 deletions

View File

@ -26,6 +26,7 @@ use zebra_chain::{
use crate::{
constants,
meta_addr::MetaAddr,
peer::{
error::AlreadyErrored, ClientRequestReceiver, ErrorSlot, InProgressClientRequest,
MustUseOneshotSender, PeerError, SharedPeerError,
@ -420,6 +421,15 @@ pub struct Connection<S, Tx> {
/// other state handling.
pub(super) request_timer: Option<Pin<Box<Sleep>>>,
/// A cached copy of the last unsolicited `addr` or `addrv2` message from this peer.
///
/// When Zebra requests peers, the cache is consumed and returned as a synthetic response.
/// This works around `zcashd`'s address response rate-limit.
///
/// Multi-peer `addr` or `addrv2` messages replace single-peer messages in the cache.
/// (`zcashd` also gossips its own address at regular intervals.)
pub(super) cached_addrs: Vec<MetaAddr>,
/// The `inbound` service, used to answer requests from this connection's peer.
pub(super) svc: S,
@ -548,6 +558,52 @@ where
}
}
}
// Check whether the handler is finished before waiting for a response message,
// because the response might be `Nil` or synthetic.
State::AwaitingResponse {
handler: Handler::Finished(_),
ref span,
..
} => {
// We have to get rid of the span reference so we can tamper with the state.
let span = span.clone();
trace!(
parent: &span,
"returning completed response to client request"
);
// Replace the state with a temporary value,
// so we can take ownership of the response sender.
let tmp_state = std::mem::replace(&mut self.state, State::Failed);
if let State::AwaitingResponse {
handler: Handler::Finished(response),
tx,
..
} = tmp_state
{
if let Ok(response) = response.as_ref() {
debug!(%response, "finished receiving peer response to Zebra request");
// Add a metric for inbound responses to outbound requests.
metrics::counter!(
"zebra.net.in.responses",
1,
"command" => response.command(),
"addr" => self.metrics_label.clone(),
);
} else {
debug!(error = ?response, "error in peer response to Zebra request");
}
let _ = tx.send(response.map_err(Into::into));
} else {
unreachable!("already checked for AwaitingResponse");
}
self.state = State::AwaitingRequest;
}
// We're awaiting a response to a client request,
// so wait on either a peer message, or on a request cancellation.
State::AwaitingResponse {
@ -600,45 +656,6 @@ where
self.update_state_metrics(None);
// Check whether the handler is finished processing messages,
// and update the state.
// (Some messages can indicate that a response has finished,
// even if the message wasn't consumed as a response or a request.)
//
// Replace the state with a temporary value,
// so we can take ownership of the response sender.
self.state = match std::mem::replace(&mut self.state, State::Failed) {
State::AwaitingResponse {
handler: Handler::Finished(response),
tx,
..
} => {
if let Ok(response) = response.as_ref() {
debug!(%response, "finished receiving peer response to Zebra request");
// Add a metric for inbound responses to outbound requests.
metrics::counter!(
"zebra.net.in.responses",
1,
"command" => response.command(),
"addr" => self.metrics_label.clone(),
);
} else {
debug!(error = ?response, "error in peer response to Zebra request");
}
let _ = tx.send(response.map_err(Into::into));
State::AwaitingRequest
}
pending @ State::AwaitingResponse { .. } =>
pending
,
_ => unreachable!(
"unexpected failed connection state while AwaitingResponse: client_receiver: {:?}",
self.client_rx
),
};
self.update_state_metrics(None);
// If the message was not consumed as a response,
// check whether it can be handled as a request.
let unused_msg = if let Some(request_msg) = request_msg {
@ -695,6 +712,7 @@ where
}
}
}
// This connection has failed: stop the event loop, and complete the future.
State::Failed => break,
}
@ -723,7 +741,7 @@ where
self.shutdown(error);
}
/// Handle an incoming client request, possibly generating outgoing messages to the
/// Handle an internal client request, possibly generating outgoing messages to the
/// remote peer.
///
/// NOTE: the caller should use .instrument(msg.span) to instrument the function.
@ -772,6 +790,25 @@ where
pending,
self.client_rx
),
// Consume the cached addresses from the peer,
// to work-around a `zcashd` response rate-limit
(AwaitingRequest, Peers) if !self.cached_addrs.is_empty() => {
let cached_addrs = std::mem::take(&mut self.cached_addrs);
debug!(
addrs = cached_addrs.len(),
"responding to Peers request using cached addresses",
);
Ok((
AwaitingResponse {
handler: Handler::Finished(Ok(Response::Peers(cached_addrs))),
tx,
span,
},
None,
))}
,
(AwaitingRequest, Peers) => match self.peer_tx.send(Message::GetAddr).await {
Ok(()) => Ok((
AwaitingResponse {
@ -783,6 +820,7 @@ where
)),
Err(e) => Err((e, tx)),
},
(AwaitingRequest, Ping(nonce)) => match self.peer_tx.send(Message::Ping(nonce)).await {
Ok(()) => Ok((
AwaitingResponse {
@ -1012,8 +1050,23 @@ where
}
// Zebra crawls the network proactively, to prevent
// peers from inserting data into our address book.
Message::Addr(_) => {
debug!(%msg, "ignoring unsolicited addr message");
Message::Addr(ref addrs) => {
// Workaround `zcashd`'s `getaddr` response rate-limit
if addrs.len() > 1 {
// Always refresh the cache with multi-addr messages.
debug!(%msg, "caching unsolicited multi-addr message");
self.cached_addrs = addrs.clone();
} else if addrs.len() == 1 && self.cached_addrs.len() <= 1 {
// Only refresh a cached single addr message with another single addr.
// (`zcashd` regularly advertises its own address.)
debug!(%msg, "caching unsolicited single addr message");
self.cached_addrs = addrs.clone();
} else {
debug!(
%msg,
"ignoring unsolicited single addr message: already cached a multi-addr message"
);
}
None
}
Message::Tx(ref transaction) => Some(Request::PushTransaction(transaction.clone())),

View File

@ -43,6 +43,7 @@ async fn connection_run_loop_ok() {
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
@ -103,6 +104,7 @@ async fn connection_run_loop_future_drop() {
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
@ -152,6 +154,7 @@ async fn connection_run_loop_client_close() {
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
@ -208,6 +211,7 @@ async fn connection_run_loop_client_drop() {
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
@ -263,6 +267,7 @@ async fn connection_run_loop_inbound_close() {
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
@ -319,6 +324,7 @@ async fn connection_run_loop_inbound_drop() {
let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
@ -379,6 +385,7 @@ async fn connection_run_loop_failed() {
let connection = Connection {
state: State::Failed,
request_timer: None,
cached_addrs: Vec::new(),
svc: unused_inbound_service,
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),

View File

@ -915,6 +915,7 @@ where
let server = Connection {
state: connection::State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: inbound_service,
client_rx: server_rx.into(),
error_slot: error_slot.clone(),