Add diagnostics for peer set hangs (#3203)

* Use a named CancelHeartbeatTask unit struct for the channel type

* Prefer cancel handles in selects, if both are ready (see the sketch below)

* Fix message metrics to just show the command name

* Add metrics for internal requests and responses

* Add internal requests and responses to the messages dashboard

* Add a canceled metric, and peer addresses to request and response metrics

* Add a canceled messages graph

* Add connection state metrics for currently open connections

* Fix the connection state graph with new metrics

* Always send an error before dropping pending responses

* Move error detail logging into `fail_with`

* Delete an unused timer future

* Make error strings in metrics less verbose

* Downgrade some error logs to info

* Remove a redundant expect

* Avoid unnecessary allocations for connection state metrics

* Fix missed updates to mempool and block gossip metrics

Authored by teor on 2021-12-15 07:11:03 +10:00, committed via GitHub
parent 062b98ce61
commit 1835ec2c8d
14 changed files with 1862 additions and 531 deletions
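
The "Prefer cancel handles in selects, if both are ready" change relies on `tokio::select!`'s `biased;` mode, which polls branches in source order instead of picking a ready branch at random. A minimal, self-contained sketch of that pattern; the channel and function names here are illustrative, not taken from the Zebra code:

use tokio::sync::oneshot;

// Stand-in for the download/verify future selected against in the real code.
async fn do_work() -> u32 {
    42
}

#[tokio::main]
async fn main() {
    let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();

    // Simulate a cancel handle that is already ready when the task polls it.
    let _ = cancel_tx.send(());

    // `biased;` makes select! poll branches in source order, so a ready
    // cancel handle always wins over ready work, instead of the winner
    // being picked at random.
    tokio::select! {
        biased;
        _ = &mut cancel_rx => {
            println!("task cancelled prior to completion");
        }
        result = do_work() => {
            println!("work finished: {}", result);
        }
    }
}

The block gossip, mempool, and sync download tasks at the end of this diff apply this `biased;` ordering so that cancellation always wins.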

File diff suppressed because it is too large.

View File

@ -16,7 +16,7 @@
"gnetId": null,
"graphTooltip": 0,
"id": 6,
"iteration": 1629935221644,
"iteration": 1639360549666,
"links": [],
"panels": [
{
@ -33,11 +33,332 @@
"fillGradient": 0,
"gridPos": {
"h": 9,
"w": 12,
"w": 24,
"x": 0,
"y": 0
},
"hiddenSeries": false,
"id": 12,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.7",
"pointradius": 2,
"points": false,
"renderer": "flot",
"repeat": "job",
"scopedVars": {
"job": {
"selected": true,
"text": "zebrad-mainnet",
"value": "zebrad-mainnet"
}
},
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "sum by (command) (zebra_net_in_requests{job=\"$job\"})",
"interval": "",
"legendFormat": "Req::{{command}}",
"refId": "A"
},
{
"exemplar": true,
"expr": "sum by (command) (zebra_net_out_responses{job=\"$job\"})",
"hide": false,
"interval": "",
"legendFormat": "Rsp::{{command}}",
"refId": "B"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "inbound requests & responses - $job",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"$$hashKey": "object:65",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"$$hashKey": "object:66",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": null,
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 9,
"w": 24,
"x": 0,
"y": 9
},
"hiddenSeries": false,
"id": 13,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.7",
"pointradius": 2,
"points": false,
"renderer": "flot",
"repeat": "job",
"scopedVars": {
"job": {
"selected": true,
"text": "zebrad-mainnet",
"value": "zebrad-mainnet"
}
},
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "sum by (command) (zebra_net_out_requests{job=\"$job\"})",
"interval": "",
"legendFormat": "Req::{{command}}",
"refId": "A"
},
{
"exemplar": true,
"expr": "sum by (command) (zebra_net_in_responses{job=\"$job\"})",
"hide": false,
"interval": "",
"legendFormat": "Rsp::{{command}}",
"refId": "B"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "outbound requests & responses - $job",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"$$hashKey": "object:65",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"$$hashKey": "object:66",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": null,
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 9,
"w": 24,
"x": 0,
"y": 18
},
"hiddenSeries": false,
"id": 14,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.7",
"pointradius": 2,
"points": false,
"renderer": "flot",
"repeat": "job",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "sum by (command) (zebra_net_out_requests_canceled{job=\"$job\"})",
"interval": "",
"legendFormat": "Req::{{command}}",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "canceled outbound requests - $job",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"$$hashKey": "object:65",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"$$hashKey": "object:66",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": null,
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 9,
"w": 24,
"x": 0,
"y": 27
},
"hiddenSeries": false,
"id": 2,
"legend": {
"avg": false,
@ -62,7 +383,7 @@
"repeat": "job",
"scopedVars": {
"job": {
"selected": false,
"selected": true,
"text": "zebrad-mainnet",
"value": "zebrad-mainnet"
}
@ -137,114 +458,9 @@
"fillGradient": 0,
"gridPos": {
"h": 9,
"w": 12,
"x": 12,
"y": 0
},
"hiddenSeries": false,
"id": 12,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.7",
"pointradius": 2,
"points": false,
"renderer": "flot",
"repeatIteration": 1629935221644,
"repeatPanelId": 2,
"scopedVars": {
"job": {
"selected": false,
"text": "zebrad-testnet",
"value": "zebrad-testnet"
}
},
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "sum by (command) (zcash_net_in_messages{job=\"$job\"})",
"interval": "",
"legendFormat": "{{command}}",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "inbound message types - $job",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"$$hashKey": "object:65",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"$$hashKey": "object:66",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": null,
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 9,
"w": 12,
"w": 24,
"x": 0,
"y": 9
"y": 36
},
"hiddenSeries": false,
"id": 4,
@ -271,7 +487,7 @@
"repeat": "job",
"scopedVars": {
"job": {
"selected": false,
"selected": true,
"text": "zebrad-mainnet",
"value": "zebrad-mainnet"
}
@ -346,114 +562,9 @@
"fillGradient": 0,
"gridPos": {
"h": 9,
"w": 12,
"x": 12,
"y": 9
},
"hiddenSeries": false,
"id": 13,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.7",
"pointradius": 2,
"points": false,
"renderer": "flot",
"repeatIteration": 1629935221644,
"repeatPanelId": 4,
"scopedVars": {
"job": {
"selected": false,
"text": "zebrad-testnet",
"value": "zebrad-testnet"
}
},
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "sum by (command) (zcash_net_out_messages{job=\"$job\"})",
"interval": "",
"legendFormat": "{{command}}",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "outbound message types - $job",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"$$hashKey": "object:65",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"$$hashKey": "object:66",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": null,
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 9,
"w": 12,
"w": 24,
"x": 0,
"y": 18
"y": 45
},
"hiddenSeries": false,
"id": 7,
@ -480,7 +591,7 @@
"repeat": "job",
"scopedVars": {
"job": {
"selected": false,
"selected": true,
"text": "zebrad-mainnet",
"value": "zebrad-mainnet"
}
@ -555,114 +666,9 @@
"fillGradient": 0,
"gridPos": {
"h": 9,
"w": 12,
"x": 12,
"y": 18
},
"hiddenSeries": false,
"id": 14,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.7",
"pointradius": 2,
"points": false,
"renderer": "flot",
"repeatIteration": 1629935221644,
"repeatPanelId": 7,
"scopedVars": {
"job": {
"selected": false,
"text": "zebrad-testnet",
"value": "zebrad-testnet"
}
},
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "sum by (addr) (zcash_net_in_messages{job=\"$job\"})",
"interval": "",
"legendFormat": "{{command}}",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "inbound message peers - $job",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"$$hashKey": "object:65",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"$$hashKey": "object:66",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": null,
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 9,
"w": 12,
"w": 24,
"x": 0,
"y": 27
"y": 54
},
"hiddenSeries": false,
"id": 11,
@ -689,7 +695,7 @@
"repeat": "job",
"scopedVars": {
"job": {
"selected": false,
"selected": true,
"text": "zebrad-mainnet",
"value": "zebrad-mainnet"
}
@ -749,111 +755,6 @@
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": null,
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 9,
"w": 12,
"x": 12,
"y": 27
},
"hiddenSeries": false,
"id": 15,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.7",
"pointradius": 2,
"points": false,
"renderer": "flot",
"repeatIteration": 1629935221644,
"repeatPanelId": 11,
"scopedVars": {
"job": {
"selected": false,
"text": "zebrad-testnet",
"value": "zebrad-testnet"
}
},
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "sum by (addr) (zcash_net_out_messages{job=\"$job\"})",
"interval": "",
"legendFormat": "{{command}}",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "outbound message peers - $job",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"$$hashKey": "object:65",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"$$hashKey": "object:66",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"schemaVersion": 27,
@ -866,10 +767,10 @@
"current": {
"selected": false,
"text": [
"All"
"zebrad-mainnet"
],
"value": [
"$__all"
"zebrad-mainnet"
]
},
"datasource": null,
@ -899,12 +800,12 @@
]
},
"time": {
"from": "now-6h",
"from": "now-5m",
"to": "now"
},
"timepicker": {},
"timezone": "",
"title": "network messages",
"uid": "YQ3yxiVnk",
"version": 6
"version": 9
}
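
The panels above query Prometheus series such as `zebra_net_in_requests`, `zebra_net_out_responses`, and `zebra_net_out_requests_canceled`. These correspond to the dotted counter names added in this commit (for example `zebra.net.in.requests` in `connection.rs`); the Prometheus exporter replaces the dots with underscores when exposing the series. A minimal sketch of the emitting side, with hard-coded label values standing in for `request.command()` and the peer address:

fn main() {
    // Without a metrics recorder installed this call is a no-op, but it shows
    // the naming: the `metrics` crate records `zebra.net.out.requests`, and a
    // Prometheus exporter exposes it as `zebra_net_out_requests`, the series
    // name queried by the dashboard panels above.
    metrics::counter!(
        "zebra.net.out.requests",
        1,
        // Hard-coded label values for illustration; the real code uses
        // `request.command()` and the peer address in `metrics_label`.
        "command" => "BlocksByHash",
        "addr" => "198.51.100.7:8233",
    );
}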

View File

@ -18,7 +18,7 @@ mod minimum_peer_version;
#[cfg(not(test))]
use client::ClientRequest;
#[cfg(test)]
pub(crate) use client::ClientRequest;
pub(crate) use client::{CancelHeartbeatTask, ClientRequest};
use client::{ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender};

View File

@ -22,7 +22,7 @@ use super::{ErrorSlot, PeerError, SharedPeerError};
pub struct Client {
/// Used to shut down the corresponding heartbeat.
/// This is always Some except when we take it on drop.
pub(crate) shutdown_tx: Option<oneshot::Sender<()>>,
pub(crate) shutdown_tx: Option<oneshot::Sender<CancelHeartbeatTask>>,
/// Used to send [`Request`]s to the remote peer.
pub(crate) server_tx: mpsc::Sender<ClientRequest>,
@ -36,6 +36,13 @@ pub struct Client {
pub(crate) version: Version,
}
/// A signal sent by the [`Client`] half of a peer connection,
/// to cancel a [`Client`]'s heartbeat task.
///
/// When it receives this signal, the heartbeat task exits.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct CancelHeartbeatTask;
/// A message from the `peer::Client` to the `peer::Server`.
#[derive(Debug)]
pub(crate) struct ClientRequest {
@ -294,6 +301,6 @@ impl Drop for Client {
.shutdown_tx
.take()
.expect("must not drop twice")
.send(());
.send(CancelHeartbeatTask);
}
}
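
The heartbeat task itself is not part of this diff, so the following is only a sketch of how the receiving half of `shutdown_tx` can use the new `CancelHeartbeatTask` signal; the interval, names, and `main` driver are illustrative assumptions rather than Zebra's actual heartbeat code:

use std::time::Duration;
use futures::channel::oneshot;

// Stand-in for the signal type added in this commit, so the sketch compiles
// on its own.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
struct CancelHeartbeatTask;

// A sketch of a heartbeat loop that exits when the `Client` half sends
// `CancelHeartbeatTask` (or is dropped).
async fn heartbeat_task(mut shutdown_rx: oneshot::Receiver<CancelHeartbeatTask>) {
    let mut interval = tokio::time::interval(Duration::from_secs(60));
    loop {
        tokio::select! {
            biased;
            _ = &mut shutdown_rx => {
                // A `CancelHeartbeatTask` arrived, or the sender was dropped:
                // either way, stop sending heartbeats.
                return;
            }
            _ = interval.tick() => {
                // Send a `ping` to the peer here (omitted in this sketch).
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (shutdown_tx, shutdown_rx) = oneshot::channel();
    let task = tokio::spawn(heartbeat_task(shutdown_rx));

    // What `Client::drop` does in the diff: send the cancel signal.
    let _ = shutdown_tx.send(CancelHeartbeatTask);
    task.await.expect("heartbeat task exits cleanly");
}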

View File

@ -7,7 +7,7 @@
//! And it's unclear if these assumptions match the `zcashd` implementation.
//! It should be refactored into a cleaner set of request/response pairs (#1515).
use std::{collections::HashSet, fmt, pin::Pin, sync::Arc};
use std::{borrow::Cow, collections::HashSet, fmt, pin::Pin, sync::Arc};
use futures::{
future::{self, Either},
@ -27,8 +27,8 @@ use zebra_chain::{
use crate::{
constants,
peer::{
ClientRequestReceiver, ErrorSlot, InProgressClientRequest, MustUseOneshotSender, PeerError,
SharedPeerError,
error::AlreadyErrored, ClientRequestReceiver, ErrorSlot, InProgressClientRequest,
MustUseOneshotSender, PeerError, SharedPeerError,
},
peer_set::ConnectionTracker,
protocol::{
@ -92,6 +92,25 @@ impl fmt::Display for Handler {
}
impl Handler {
/// Returns the Zebra internal handler type as a string.
pub fn command(&self) -> Cow<'static, str> {
match self {
Handler::Finished(Ok(response)) => format!("Finished({})", response.command()).into(),
Handler::Finished(Err(error)) => format!("Finished({})", error.kind()).into(),
Handler::Ping(_) => "Ping".into(),
Handler::Peers => "Peers".into(),
Handler::FindBlocks { .. } => "FindBlocks".into(),
Handler::FindHeaders { .. } => "FindHeaders".into(),
Handler::BlocksByHash { .. } => "BlocksByHash".into(),
Handler::TransactionsById { .. } => "TransactionsById".into(),
Handler::MempoolTransactionIds => "MempoolTransactionIds".into(),
}
}
/// Try to handle `msg` as a response to a client request, possibly consuming
/// it in the process.
///
@ -165,7 +184,7 @@ impl Handler {
if !transactions.is_empty() {
// if our peers start sending mixed solicited and unsolicited transactions,
// we should update this code to handle those responses
error!("unexpected transaction from peer: transaction responses should be sent in a continuous batch, followed by notfound. Using partial received transactions as the peer response");
info!("unexpected transaction from peer: transaction responses should be sent in a continuous batch, followed by notfound. Using partial received transactions as the peer response");
// TODO: does the caller need a list of missing transactions? (#1515)
Handler::Finished(Ok(Response::Transactions(transactions)))
} else {
@ -198,11 +217,11 @@ impl Handler {
if missing_transaction_ids != pending_ids {
trace!(?missing_invs, ?missing_transaction_ids, ?pending_ids);
// if these errors are noisy, we should replace them with debugs
error!("unexpected notfound message from peer: all remaining transaction hashes should be listed in the notfound. Using partial received transactions as the peer response");
info!("unexpected notfound message from peer: all remaining transaction hashes should be listed in the notfound. Using partial received transactions as the peer response");
}
if missing_transaction_ids.len() != missing_invs.len() {
trace!(?missing_invs, ?missing_transaction_ids, ?pending_ids);
error!("unexpected notfound message from peer: notfound contains duplicate hashes or non-transaction hashes. Using partial received transactions as the peer response");
info!("unexpected notfound message from peer: notfound contains duplicate hashes or non-transaction hashes. Using partial received transactions as the peer response");
}
if !transactions.is_empty() {
@ -297,11 +316,11 @@ impl Handler {
if missing_blocks != pending_hashes {
trace!(?items, ?missing_blocks, ?pending_hashes);
// if these errors are noisy, we should replace them with debugs
error!("unexpected notfound message from peer: all remaining block hashes should be listed in the notfound. Using partial received blocks as the peer response");
info!("unexpected notfound message from peer: all remaining block hashes should be listed in the notfound. Using partial received blocks as the peer response");
}
if missing_blocks.len() != items.len() {
trace!(?items, ?missing_blocks, ?pending_hashes);
error!("unexpected notfound message from peer: notfound contains duplicate hashes or non-block hashes. Using partial received blocks as the peer response");
info!("unexpected notfound message from peer: notfound contains duplicate hashes or non-block hashes. Using partial received blocks as the peer response");
}
if !blocks.is_empty() {
@ -375,6 +394,19 @@ impl fmt::Display for State {
}
}
impl State {
/// Returns the Zebra internal state as a string.
pub fn command(&self) -> Cow<'static, str> {
match self {
State::AwaitingRequest => "AwaitingRequest".into(),
State::AwaitingResponse { handler, .. } => {
format!("AwaitingResponse({})", handler.command()).into()
}
State::Failed => "Failed".into(),
}
}
}
/// The state associated with a peer connection.
pub struct Connection<S, Tx> {
/// The state of this connection's current request or response.
@ -414,6 +446,12 @@ pub struct Connection<S, Tx> {
/// Eventually, Zebra could stop making connections entirely.
#[allow(dead_code)]
pub(super) connection_tracker: ConnectionTracker,
/// The metrics label for this peer. Usually the remote IP and port.
pub(super) metrics_label: String,
/// The state for this peer, when the metrics were last updated.
pub(super) last_metrics_state: Option<Cow<'static, str>>,
}
impl<S, Tx> Connection<S, Tx>
@ -447,6 +485,8 @@ where
// If there is a pending request, we wait only on an incoming peer message, and
// check whether it can be interpreted as a response to the pending request.
loop {
self.update_state_metrics(None);
match self.state {
State::AwaitingRequest => {
trace!("awaiting client request or peer message");
@ -498,6 +538,7 @@ where
.request_timer
.as_mut()
.expect("timeout must be set while awaiting response");
// CORRECTNESS
//
// Currently, select prefers the first future if multiple
@ -513,6 +554,8 @@ where
Either::Right((None, _)) => self.fail_with(PeerError::ConnectionClosed),
Either::Right((Some(Err(e)), _)) => self.fail_with(e),
Either::Right((Some(Ok(peer_msg)), _cancel)) => {
self.update_state_metrics(format!("Out::Rsp::{}", peer_msg.command()));
// Try to process the message using the handler.
// This extremely awkward construction avoids
// keeping a live reference to handler across the
@ -531,9 +574,14 @@ where
),
};
self.update_state_metrics(None);
// Check whether the handler is finished
// processing messages and update the state.
self.state = match self.state {
//
// Replace the state with a temporary value,
// so we can take ownership of the response sender.
self.state = match std::mem::replace(&mut self.state, State::Failed) {
State::AwaitingResponse {
handler: Handler::Finished(response),
tx,
@ -541,6 +589,13 @@ where
} => {
if let Ok(response) = response.as_ref() {
debug!(%response, "finished receiving peer response to Zebra request");
// Add a metric for inbound responses to outbound requests.
metrics::counter!(
"zebra.net.in.responses",
1,
"command" => response.command(),
"addr" => self.metrics_label.clone(),
);
} else {
debug!(error = ?response, "error in peer response to Zebra request");
}
@ -568,6 +623,8 @@ where
),
};
self.update_state_metrics(None);
// If the message was not consumed, check whether it
// should be handled as a request.
if let Some(msg) = request_msg {
@ -579,12 +636,18 @@ where
Either::Left((Either::Right(_), _peer_fut)) => {
trace!(parent: &span, "client request timed out");
let e = PeerError::ClientRequestTimeout;
self.state = match self.state {
// Replace the state with a temporary value,
// so we can take ownership of the response sender.
self.state = match std::mem::replace(&mut self.state, State::Failed) {
// Special case: ping timeouts fail the connection.
State::AwaitingResponse {
handler: Handler::Ping(_),
tx,
..
} => {
let e = SharedPeerError::from(e);
let _ = tx.send(Err(e.clone()));
self.fail_with(e);
State::Failed
}
@ -666,12 +729,20 @@ where
//
// See the original bug #1510 and PR #1531, and the later bug #1599
// and PR #1600.
assert!(
self.error_slot.try_update_error(e).is_ok(),
"calling fail_with on already-failed connection state: failed connections must stop processing pending requests and responses, then close the connection. state: {:?} client receiver: {:?}",
self.state,
self.client_rx
);
let error_result = self.error_slot.try_update_error(e.clone());
if let Err(AlreadyErrored { original_error }) = error_result {
panic!(
"multiple failures for connection: \n\
failed connections should stop processing pending requests and responses, \n\
then close the connection. \n\
state: {:?} \n\
client receiver: {:?} \n\
original error: {:?} \n\
new error: {:?}",
self.state, self.client_rx, original_error, e,
);
}
// We want to close the client channel and set State::Failed so
// that we can flush any pending client requests. However, we may have
@ -679,6 +750,8 @@ where
// we need to deal with it first if it exists.
self.client_rx.close();
let old_state = std::mem::replace(&mut self.state, State::Failed);
self.update_state_metrics(None);
if let State::AwaitingResponse { tx, .. } = old_state {
// # Correctness
//
@ -706,11 +779,29 @@ where
if tx.is_canceled() {
metrics::counter!("peer.canceled", 1);
debug!(state = %self.state, %request, "ignoring canceled request");
metrics::counter!(
"zebra.net.out.requests.canceled",
1,
"command" => request.command(),
"addr" => self.metrics_label.clone(),
);
self.update_state_metrics(format!("Out::Req::Canceled::{}", request.command()));
return;
}
debug!(state = %self.state, %request, "sending request from Zebra to peer");
// Add a metric for outbound requests.
metrics::counter!(
"zebra.net.out.requests",
1,
"command" => request.command(),
"addr" => self.metrics_label.clone(),
);
self.update_state_metrics(format!("Out::Req::{}", request.command()));
// These matches return a Result with (new_state, Option<Sender>) or an (error, Sender)
let new_state_result = match (&self.state, request) {
(Failed, request) => panic!(
@ -867,7 +958,9 @@ where
// send a response before dropping tx.
let _ = tx.send(Ok(Response::Nil));
self.state = AwaitingRequest;
self.request_timer = Some(Box::pin(sleep(constants::REQUEST_TIMEOUT)));
// We only need a timer when we're waiting for a response.
// (And we don't want to accidentally re-use old timers.)
self.request_timer = None;
}
Ok((new_state @ AwaitingResponse { .. }, None)) => {
self.state = new_state;
@ -900,6 +993,8 @@ where
trace!(?msg);
debug!(state = %self.state, %msg, "received peer request to Zebra");
self.update_state_metrics(format!("In::Msg::{}", msg.command()));
let req = match msg {
Message::Ping(nonce) => {
trace!(?nonce, "responding to heartbeat");
@ -1040,6 +1135,15 @@ where
trace!(?req);
use tower::{load_shed::error::Overloaded, ServiceExt};
// Add a metric for inbound requests
metrics::counter!(
"zebra.net.in.requests",
1,
"command" => req.command(),
"addr" => self.metrics_label.clone(),
);
self.update_state_metrics(format!("In::Req::{}", req.command()));
if self.svc.ready().await.is_err() {
// Treat all service readiness errors as Overloaded
// TODO: treat `TryRecvError::Closed` in `Inbound::poll_ready` as a fatal error (#1655)
@ -1050,7 +1154,7 @@ where
let rsp = match self.svc.call(req.clone()).await {
Err(e) => {
if e.is::<Overloaded>() {
tracing::warn!("inbound service is overloaded, closing connection");
tracing::info!("inbound service is overloaded, closing connection");
metrics::counter!("pool.closed.loadshed", 1);
self.fail_with(PeerError::Overloaded);
} else {
@ -1058,16 +1162,25 @@ where
// them to disconnect, and we might be using them to sync blocks.
// For similar reasons, we don't want to fail_with() here - we
// only close the connection if the peer is doing something wrong.
error!(%e,
connection_state = ?self.state,
client_receiver = ?self.client_rx,
"error processing peer request");
info!(%e,
connection_state = ?self.state,
client_receiver = ?self.client_rx,
"error processing peer request");
}
return;
}
Ok(rsp) => rsp,
};
// Add a metric for outbound responses to inbound requests
metrics::counter!(
"zebra.net.out.responses",
1,
"command" => rsp.command(),
"addr" => self.metrics_label.clone(),
);
self.update_state_metrics(format!("In::Rsp::{}", rsp.command()));
match rsp.clone() {
Response::Nil => { /* generic success, do nothing */ }
Response::Peers(addrs) => {
@ -1122,6 +1235,62 @@ where
}
}
impl<S, Tx> Connection<S, Tx> {
/// Update the connection state metrics for this connection,
/// using `extra_state_info` as additional state information.
fn update_state_metrics(&mut self, extra_state_info: impl Into<Option<String>>) {
let current_metrics_state = if let Some(extra_state_info) = extra_state_info.into() {
format!("{}::{}", self.state.command(), extra_state_info).into()
} else {
self.state.command()
};
if self.last_metrics_state.as_ref() == Some(&current_metrics_state) {
return;
}
self.erase_state_metrics();
// Set the new state
metrics::increment_gauge!(
"zebra.net.connection.state",
1.0,
"command" => current_metrics_state.clone(),
"addr" => self.metrics_label.clone(),
);
self.last_metrics_state = Some(current_metrics_state);
}
/// Erase the connection state metrics for this connection.
fn erase_state_metrics(&mut self) {
if let Some(last_metrics_state) = self.last_metrics_state.take() {
metrics::gauge!(
"zebra.net.connection.state",
0.0,
"command" => last_metrics_state,
"addr" => self.metrics_label.clone(),
);
}
}
}
impl<S, Tx> Drop for Connection<S, Tx> {
fn drop(&mut self) {
if let State::AwaitingResponse { tx, .. } =
std::mem::replace(&mut self.state, State::Failed)
{
if let Some(error) = self.error_slot.try_get_error() {
let _ = tx.send(Err(error));
} else {
let _ = tx.send(Err(PeerError::ConnectionDropped.into()));
}
}
self.erase_state_metrics();
}
}
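
This `Drop` impl is the "always send an error before dropping pending responses" change: if the response sender were simply dropped, the waiting `Client` would only see a generic oneshot cancellation, whereas sending an error first hands it a specific peer error. A small standalone sketch of the difference, using a plain `futures` oneshot and a stand-in error type:

use futures::channel::oneshot;
use futures::executor::block_on;

#[derive(Debug)]
struct StandInError(&'static str);

fn main() {
    // Dropping the sender without sending anything: the receiver only
    // learns that the channel was cancelled, not why.
    let (tx, rx) = oneshot::channel::<Result<(), StandInError>>();
    drop(tx);
    assert!(matches!(block_on(rx), Err(oneshot::Canceled)));

    // Sending an error first, as the `Drop` impl above does with the error
    // slot or `PeerError::ConnectionDropped`: the receiver gets a specific
    // error value instead of a bare cancellation.
    let (tx, rx) = oneshot::channel::<Result<(), StandInError>>();
    let _ = tx.send(Err(StandInError("connection dropped")));
    assert!(matches!(block_on(rx), Ok(Err(StandInError(_)))));
}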
/// Map a list of inventory hashes to the corresponding unmined transaction IDs.
/// Non-transaction inventory hashes are skipped.
///

View File

@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{borrow::Cow, sync::Arc};
use thiserror::Error;
@ -29,6 +29,10 @@ pub enum PeerError {
#[error("Peer closed connection")]
ConnectionClosed,
/// Zebra dropped the [`Connection`].
#[error("Internal connection dropped")]
ConnectionDropped,
/// The remote peer did not respond to a [`peer::Client`] request in time.
#[error("Client request timed out")]
ClientRequestTimeout,
@ -52,6 +56,22 @@ pub enum PeerError {
NotFound(Vec<InventoryHash>),
}
impl PeerError {
/// Returns the Zebra internal error kind as a string.
pub fn kind(&self) -> Cow<'static, str> {
match self {
PeerError::ConnectionClosed => "ConnectionClosed".into(),
PeerError::ConnectionDropped => "ConnectionDropped".into(),
PeerError::ClientRequestTimeout => "ClientRequestTimeout".into(),
// TODO: add error kinds or summaries to `SerializationError`
PeerError::Serialization(inner) => format!("Serialization({})", inner).into(),
PeerError::DuplicateHandshake => "DuplicateHandshake".into(),
PeerError::Overloaded => "Overloaded".into(),
PeerError::NotFound(_) => "NotFound".into(),
}
}
}
/// A shared error slot for peer errors.
///
/// # Correctness
@ -101,8 +121,7 @@ impl ErrorSlot {
let mut guard = self.0.lock().expect("error mutex should be unpoisoned");
if let Some(original_error) = guard.clone() {
error!(?original_error, new_error = ?e, "peer connection already errored");
Err(AlreadyErrored)
Err(AlreadyErrored { original_error })
} else {
*guard = Some(e);
Ok(())
@ -110,8 +129,12 @@ impl ErrorSlot {
}
}
/// The `ErrorSlot` already contains an error.
pub struct AlreadyErrored;
/// Error used when the `ErrorSlot` already contains an error.
#[derive(Clone, Debug)]
pub struct AlreadyErrored {
/// The original error in the error slot.
pub original_error: SharedPeerError,
}
/// An error during a handshake with a remote peer.
#[derive(Error, Debug)]

View File

@ -809,7 +809,7 @@ where
metrics::counter!(
"zcash.net.out.messages",
1,
"command" => msg.to_string(),
"command" => msg.command(),
"addr" => connected_addr.get_transient_addr_label(),
);
// We need to use future::ready rather than an async block here,
@ -840,7 +840,7 @@ where
metrics::counter!(
"zcash.net.in.messages",
1,
"command" => msg.to_string(),
"command" => msg.command(),
"addr" => connected_addr.get_transient_addr_label(),
);
@ -924,6 +924,8 @@ where
error_slot: slot,
peer_tx,
connection_tracker,
metrics_label: connected_addr.get_transient_addr_label(),
last_metrics_state: None,
};
tokio::spawn(

View File

@ -21,7 +21,10 @@ use zebra_chain::{
use super::MorePeers;
use crate::{
peer::{Client, ClientRequest, ErrorSlot, LoadTrackedClient, MinimumPeerVersion},
peer::{
CancelHeartbeatTask, Client, ClientRequest, ErrorSlot, LoadTrackedClient,
MinimumPeerVersion,
},
peer_set::PeerSet,
protocol::external::{types::Version, InventoryHash},
AddressBook, Config,
@ -38,7 +41,7 @@ const MAX_PEERS: usize = 20;
/// A handle to a mocked [`Client`] instance.
struct MockedClientHandle {
_request_receiver: mpsc::Receiver<ClientRequest>,
shutdown_receiver: oneshot::Receiver<()>,
shutdown_receiver: oneshot::Receiver<CancelHeartbeatTask>,
version: Version,
}
@ -74,7 +77,7 @@ impl MockedClientHandle {
pub fn is_connected(&mut self) -> bool {
match self.shutdown_receiver.try_recv() {
Ok(None) => true,
Ok(Some(())) | Err(oneshot::Canceled) => false,
Ok(Some(CancelHeartbeatTask)) | Err(oneshot::Canceled) => false,
}
}
}
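
`is_connected` above relies on the `futures` oneshot `try_recv` semantics: `Ok(None)` means the sender is still alive but has not sent anything, while `Ok(Some(_))` and `Err(Canceled)` both mean the client has shut the heartbeat down. A small standalone sketch of those three outcomes (not Zebra code):

use futures::channel::oneshot;

fn main() {
    // Sender alive, nothing sent yet: Ok(None).
    let (tx, mut rx) = oneshot::channel::<()>();
    assert!(matches!(rx.try_recv(), Ok(None)));

    // Value sent: Ok(Some(_)).
    let _ = tx.send(());
    assert!(matches!(rx.try_recv(), Ok(Some(()))));

    // Sender dropped without sending: Err(Canceled).
    let (tx, mut rx) = oneshot::channel::<()>();
    drop(tx);
    assert!(matches!(rx.try_recv(), Err(oneshot::Canceled)));
}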

View File

@ -426,3 +426,30 @@ impl fmt::Display for Message {
})
}
}
impl Message {
/// Returns the Zcash protocol message command as a string.
pub fn command(&self) -> &'static str {
match self {
Message::Version { .. } => "version",
Message::Verack => "verack",
Message::Ping(_) => "ping",
Message::Pong(_) => "pong",
Message::Reject { .. } => "reject",
Message::GetAddr => "getaddr",
Message::Addr(_) => "addr",
Message::GetBlocks { .. } => "getblocks",
Message::Inv(_) => "inv",
Message::GetHeaders { .. } => "getheaders",
Message::Headers(_) => "headers",
Message::GetData(_) => "getdata",
Message::Block(_) => "block",
Message::Tx(_) => "tx",
Message::NotFound(_) => "notfound",
Message::Mempool => "mempool",
Message::FilterLoad { .. } => "filterload",
Message::FilterAdd { .. } => "filteradd",
Message::FilterClear => "filterclear",
}
}
}

View File

@ -228,3 +228,25 @@ impl fmt::Display for Request {
})
}
}
impl Request {
/// Returns the Zebra internal request type as a string.
pub fn command(&self) -> &'static str {
match self {
Request::Peers => "Peers",
Request::Ping(_) => "Ping",
Request::BlocksByHash(_) => "BlocksByHash",
Request::TransactionsById(_) => "TransactionsById",
Request::FindBlocks { .. } => "FindBlocks",
Request::FindHeaders { .. } => "FindHeaders",
Request::PushTransaction(_) => "PushTransaction",
Request::AdvertiseTransactionIds(_) => "AdvertiseTransactionIds",
Request::AdvertiseBlock(_) => "AdvertiseBlock",
Request::MempoolTransactionIds => "MempoolTransactionIds",
}
}
}

View File

@ -92,3 +92,22 @@ impl fmt::Display for Response {
})
}
}
impl Response {
/// Returns the Zebra internal response type as a string.
pub fn command(&self) -> &'static str {
match self {
Response::Nil => "Nil",
Response::Peers(_) => "Peers",
Response::Blocks(_) => "Blocks",
Response::BlockHashes(_) => "BlockHashes",
Response::BlockHeaders { .. } => "BlockHeaders",
Response::Transactions(_) => "Transactions",
Response::TransactionIds(_) => "TransactionIds",
}
}
}

View File

@ -173,6 +173,8 @@ where
?MAX_INBOUND_CONCURRENCY,
"block hash already queued for inbound download: ignored block"
);
metrics::gauge!("gossip.queued.block.count", self.pending.len() as _);
return DownloadAction::AlreadyQueued;
}
@ -183,6 +185,8 @@ where
?MAX_INBOUND_CONCURRENCY,
"too many blocks queued for inbound download: ignored block"
);
metrics::gauge!("gossip.queued.block.count", self.pending.len() as _);
return DownloadAction::FullQueue;
}
@ -229,9 +233,9 @@ where
.in_current_span();
let task = tokio::spawn(async move {
// TODO: if the verifier and cancel are both ready, which should we
// prefer? (Currently, select! chooses one at random.)
// Prefer the cancel handle if both are ready.
tokio::select! {
biased;
_ = &mut cancel_rx => {
tracing::trace!("task cancelled prior to completion");
metrics::counter!("gossip.cancelled.count", 1);

View File

@ -263,6 +263,11 @@ where
?MAX_INBOUND_CONCURRENCY,
"transaction id already queued for inbound download: ignored transaction"
);
metrics::gauge!(
"mempool.currently.queued.transactions",
self.pending.len() as _
);
return Err(MempoolError::AlreadyQueued);
}
@ -273,6 +278,11 @@ where
?MAX_INBOUND_CONCURRENCY,
"too many transactions queued for inbound download: ignored transaction"
);
metrics::gauge!(
"mempool.currently.queued.transactions",
self.pending.len() as _
);
return Err(MempoolError::FullQueue);
}
@ -358,9 +368,9 @@ where
.in_current_span();
let task = tokio::spawn(async move {
// TODO: if the verifier and cancel are both ready, which should we
// prefer? (Currently, select! chooses one at random.)
// Prefer the cancel handle if both are ready.
tokio::select! {
biased;
_ = &mut cancel_rx => {
tracing::trace!("task cancelled prior to completion");
metrics::counter!("mempool.cancelled.verify.tasks.total", 1);

View File

@ -179,9 +179,9 @@ where
let mut verifier = self.verifier.clone();
let task = tokio::spawn(
async move {
// TODO: if the verifier and cancel are both ready, which should
// we prefer? (Currently, select! chooses one at random.)
// Prefer the cancel handle if both are ready.
let rsp = tokio::select! {
biased;
_ = &mut cancel_rx => {
tracing::trace!("task cancelled prior to download completion");
metrics::counter!("sync.cancelled.download.count", 1);
@ -205,9 +205,9 @@ where
.await
.map_err(BlockDownloadVerifyError::VerifierError)?
.call(block);
// TODO: if the verifier and cancel are both ready, which should
// we prefer? (Currently, select! chooses one at random.)
// Prefer the cancel handle if both are ready.
let verification = tokio::select! {
biased;
_ = &mut cancel_rx => {
tracing::trace!("task cancelled prior to verification");
metrics::counter!("sync.cancelled.verify.count", 1);