[wormhole-attester] Add a healthcheck on the metrics port (#429)

* wormhole attester: Add a healthcheck on the metrics port

* pyth2wormhole healthcheck: apply review advice

- Move metrics/healthcheck counter updates to be next to each other
- Change "0-sized window disables healthcheck" into an explicit config value
- Move healthcheck updates past the atomic counter updates
Stanisław Drozd 2022-12-15 18:29:23 +01:00 committed by GitHub
parent 51754457a6
commit d517f11af0
8 changed files with 292 additions and 101 deletions

View File

@@ -45,11 +45,35 @@ spec:
            - name: P2W_EXIT_ON_ERROR
              value: "true"
          tty: true
-         readinessProbe:
-           tcpSocket:
-             port: 2000
-           periodSeconds: 1
-           failureThreshold: 300
+         # Probes, in order of appearance https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/
+         #
+         # Startup probe - delays other probes until it gets its first success
+         startupProbe:
+           httpGet:
+             path: /healthcheck
+             port: 3000
+           failureThreshold: 100 # up to 100 * 10 seconds to report initial healthy status
+           periodSeconds: 10
+         # Readiness probe - used to tell load balancers to
+         # start/stop sending traffic to the container, *without*
+         # restarting it. The attester does not accept any traffic as
+         # part of its workflow, which means this isn't very useful.
+         # readinessProbe:
+         #   httpGet:
+         #     path: /healthcheck
+         #     port: 3000
+         #   failureThreshold: 1
+         #   periodSeconds: 10
+         #
+         # Liveness probe - decides restarts for misbehaving
+         # containers
+         livenessProbe:
+           httpGet:
+             path: /healthcheck
+             port: 3000
+           failureThreshold: 1 # If the attester healthcheck fails once, the container is restarted
+           periodSeconds: 10
          ports:
            - containerPort: 4343
              name: p2w-attest

View File

@@ -2710,7 +2710,7 @@ dependencies = [
[[package]]
name = "pyth2wormhole-client"
version = "1.2.0"
version = "1.3.0"
dependencies = [
"borsh",
"clap 3.1.18",

View File

@@ -1,6 +1,6 @@
[package]
name = "pyth2wormhole-client"
version = "1.2.0"
version = "1.3.0"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

View File

@@ -35,6 +35,14 @@ pub struct AttestationConfig {
#[serde(default = "default_max_msg_accounts")]
pub max_msg_accounts: u64,
+    /// How many consecutive attestation failures cause the service to
+    /// report as unhealthy.
+    #[serde(default = "default_healthcheck_window_size")]
+    pub healthcheck_window_size: u64,
+    #[serde(default = "default_enable_healthcheck")]
+    pub enable_healthcheck: bool,
/// Optionally, we take a mapping account to add remaining symbols from a Pyth deployments.
/// These symbols are processed under `default_attestation_conditions`.
#[serde(
@@ -85,7 +93,7 @@ impl AttestationConfig {
name_to_symbols
.entry(name.clone())
-                .or_insert(vec![])
+                .or_default()
.push(symbol);
}
}
@@ -289,6 +297,14 @@ pub const fn default_min_msg_reuse_interval_ms() -> u64 {
10_000 // 10s
}
+pub const fn default_healthcheck_window_size() -> u64 {
+    100
+}
+
+pub const fn default_enable_healthcheck() -> bool {
+    true
+}
pub const fn default_mapping_reload_interval_mins() -> u64 {
15
}
@@ -473,6 +489,8 @@ mod tests {
let cfg = AttestationConfig {
min_msg_reuse_interval_ms: 1000,
max_msg_accounts: 100_000,
+            enable_healthcheck: true,
+            healthcheck_window_size: 100,
min_rpc_interval_ms: 2123,
mapping_addr: None,
mapping_reload_interval_mins: 42,
@@ -555,6 +573,8 @@ mod tests {
let cfg = AttestationConfig {
min_msg_reuse_interval_ms: 1000,
max_msg_accounts: 100_000,
+            healthcheck_window_size: 100,
+            enable_healthcheck: true,
min_rpc_interval_ms: 2123,
mapping_addr: None,
mapping_reload_interval_mins: 42,
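
Both new fields are wired through serde field defaults, so configuration files written before this change keep deserializing: a missing key yields `enable_healthcheck = true` and a 100-attestation window. A minimal, self-contained sketch of that behavior (a hypothetical two-field mirror of the real `AttestationConfig`, not the actual struct; assumes the `serde` and `serde_yaml` crates):

```rust
// Hypothetical mirror of only the two new fields, for illustration.
use serde::Deserialize;

const fn default_healthcheck_window_size() -> u64 {
    100
}
const fn default_enable_healthcheck() -> bool {
    true
}

#[derive(Debug, Deserialize)]
struct HealthcheckCfg {
    #[serde(default = "default_healthcheck_window_size")]
    healthcheck_window_size: u64,
    #[serde(default = "default_enable_healthcheck")]
    enable_healthcheck: bool,
}

fn main() {
    // Keys omitted entirely: the serde defaults kick in.
    let cfg: HealthcheckCfg = serde_yaml::from_str("{}").unwrap();
    assert_eq!(cfg.healthcheck_window_size, 100);
    assert!(cfg.enable_healthcheck);

    // Explicit keys override the defaults, e.g. a tighter window.
    let cfg: HealthcheckCfg = serde_yaml::from_str("healthcheck_window_size: 25").unwrap();
    assert_eq!(cfg.healthcheck_window_size, 25);
    assert!(cfg.enable_healthcheck);
}
```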

View File

@@ -0,0 +1,52 @@
+use {
+    std::{
+        collections::VecDeque,
+        sync::Arc,
+    },
+    tokio::sync::Mutex,
+};
+
+lazy_static::lazy_static! {
+    pub static ref HEALTHCHECK_STATE: Arc<Mutex<HealthCheckState>> = Arc::new(Mutex::new(HealthCheckState::new(1, false)));
+}
+
+/// Helper structure for deciding service health
+pub struct HealthCheckState {
+    /// Whether to report the healthy/unhealthy status
+    pub enable: bool,
+    /// Sliding window over the last `max_window_size` attestation results, newest first (true = ok, false = error)
+    pub window: VecDeque<bool>,
+    /// Window size
+    pub max_window_size: usize,
+}
+
+impl HealthCheckState {
+    pub fn new(max_window_size: usize, enable: bool) -> Self {
+        Self {
+            enable,
+            window: VecDeque::with_capacity(max_window_size),
+            max_window_size,
+        }
+    }
+
+    /// Check service health, return None if not enough data is present
+    pub fn is_healthy(&self) -> Option<bool> {
+        if self.window.len() >= self.max_window_size && self.enable {
+            // Healthy if at least one result in the window is ok;
+            // unhealthy only when all of them are errors.
+            Some(self.window.iter().any(|entry| *entry))
+        } else {
+            // The window isn't big enough yet or the healthcheck is disabled
+            None
+        }
+    }
+
+    /// Rotate the window
+    pub fn add_result(&mut self, res: bool) {
+        self.window.push_front(res);
+        // Trim the window back to size if needed. truncate() deletes from
+        // the back and has no effect if the new size is greater than the
+        // current size.
+        self.window.truncate(self.max_window_size);
+    }
+}
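
A quick illustration (editor's sketch, not part of the commit) of how the window drives `is_healthy()`: no verdict until the window fills, healthy while at least one of the last N results is ok, unhealthy once all N are errors. Assumes `HealthCheckState` as defined above is in scope:

```rust
fn main() {
    let mut hc = HealthCheckState::new(3, true);
    assert_eq!(hc.is_healthy(), None); // window not full yet

    hc.add_result(false);
    hc.add_result(false);
    assert_eq!(hc.is_healthy(), None); // only 2 of 3 results so far

    hc.add_result(true);
    assert_eq!(hc.is_healthy(), Some(true)); // one success in the window is enough

    // Three consecutive failures push the success out of the window.
    hc.add_result(false);
    hc.add_result(false);
    hc.add_result(false);
    assert_eq!(hc.is_healthy(), Some(false));
}
```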

View File

@@ -1,5 +1,6 @@
pub mod attestation_cfg;
pub mod batch_state;
+pub mod healthcheck;
pub mod message;
pub mod util;
@@ -10,6 +11,10 @@ pub use {
P2WSymbol,
},
batch_state::BatchState,
+    healthcheck::{
+        HealthCheckState,
+        HEALTHCHECK_STATE,
+    },
message::P2WMessageQueue,
pyth2wormhole::Pyth2WormholeConfig,
util::{

View File

@@ -1,3 +1,5 @@
+pub mod cli;
use {
clap::Parser,
cli::{
@@ -44,6 +46,7 @@ use {
P2WMessageQueue,
P2WSymbol,
RLMutex,
+    HEALTHCHECK_STATE,
},
sha3::{
Digest,
@@ -82,20 +85,20 @@
},
};
-pub mod cli;
pub const SEQNO_PREFIX: &str = "Program log: Sequence: ";
lazy_static! {
    static ref ATTESTATIONS_OK_CNT: IntCounter =
-        register_int_counter!("attestations_ok", "Number of successful attestations").unwrap();
+        register_int_counter!("attestations_ok", "Number of successful attestations")
+            .expect("FATAL: Could not instantiate ATTESTATIONS_OK_CNT");
    static ref ATTESTATIONS_ERR_CNT: IntCounter =
-        register_int_counter!("attestations_err", "Number of failed attestations").unwrap();
+        register_int_counter!("attestations_err", "Number of failed attestations")
+            .expect("FATAL: Could not instantiate ATTESTATIONS_ERR_CNT");
    static ref LAST_SEQNO_GAUGE: IntGauge = register_int_gauge!(
        "last_seqno",
        "Latest sequence number produced by this attester"
    )
-    .unwrap();
+    .expect("FATAL: Could not instantiate LAST_SEQNO_GAUGE");
}
#[tokio::main(flavor = "multi_thread")]
@@ -291,6 +294,21 @@ async fn handle_attest_daemon_mode(
attestation_cfg: AttestationConfig,
metrics_bind_addr: SocketAddr,
) -> Result<(), ErrBox> {
+    // Update healthcheck window size from config
+    if attestation_cfg.enable_healthcheck {
+        if attestation_cfg.healthcheck_window_size == 0 {
+            return Err(format!(
+                "{} must be above 0",
+                stringify!(attestation_cfg.healthcheck_window_size)
+            )
+            .into());
+        }
+        let mut hc = HEALTHCHECK_STATE.lock().await;
+        hc.max_window_size = attestation_cfg.healthcheck_window_size as usize;
+    } else {
+        warn!("WARNING: Healthcheck is disabled");
+    }
tokio::spawn(start_metrics_server(metrics_bind_addr));
info!("Started serving metrics on {}", metrics_bind_addr);
@@ -611,19 +629,6 @@ async fn attestation_sched_job(args: AttestationSchedJobArgs) -> Result<(), ErrB
batch_no, batch_count, batch.group_name
);
-            let job = attestation_job(AttestationJobArgs {
-                rlmtx: rpc_cfg.clone(),
-                batch_no,
-                batch_count,
-                group_name: batch.group_name.clone(),
-                p2w_addr,
-                config: config.clone(),
-                payer: Keypair::from_bytes(&payer.to_bytes()).unwrap(), // Keypair has no clone
-                symbols: batch.symbols.to_vec(),
-                max_jobs_sema: sema.clone(),
-                message_q_mtx: message_q_mtx.clone(),
-            });
// park this routine until a resend condition is met
loop {
if let Some(reason) = batch
@@ -649,27 +654,27 @@ async fn attestation_sched_job(args: AttestationSchedJobArgs) -> Result<(), ErrB
);
}
-            // This short-lived permit prevents scheduling
-            // excess attestation jobs (which could eventually
-            // eat all memory). It is freed as soon as we
-            // leave this code block.
-            let _permit4sched = sema.acquire().await?;
-            let batch_no4err_msg = batch_no;
-            let batch_count4err_msg = batch_count;
-            let group_name4err_msg = batch.group_name.clone();
-            // We never get to error reporting in daemon mode, attach a map_err
-            let job_with_err_msg = job.map_err(move |e| {
-                warn!(
-                    "Batch {}/{}, group {:?} ERR: {:#?}",
-                    batch_no4err_msg, batch_count4err_msg, group_name4err_msg, e
-                );
-                e
-            });
+            let job = attestation_job(AttestationJobArgs {
+                rlmtx: rpc_cfg.clone(),
+                batch_no,
+                batch_count,
+                group_name: batch.group_name.clone(),
+                p2w_addr,
+                config: config.clone(),
+                payer: Keypair::from_bytes(&payer.to_bytes()).unwrap(), // Keypair has no clone
+                symbols: batch.symbols.to_vec(),
+                max_jobs_sema: sema.clone(),
+                message_q_mtx: message_q_mtx.clone(),
+            });
+            // This short-lived permit prevents scheduling excess
+            // attestation jobs hanging on the max jobs semaphore (which could
+            // eventually eat all memory). It is freed as soon as we leave
+            // this code block.
+            let _permit4sched = sema.acquire().await?;
             // Spawn the job in background
-            let _detached_job: JoinHandle<_> = tokio::spawn(job_with_err_msg);
+            let _detached_job: JoinHandle<_> = tokio::spawn(job);
batch.last_job_finished_at = Instant::now();
}
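
The `_permit4sched` pattern above separates two concerns: a short-lived permit that throttles how fast jobs are scheduled, and a per-job permit (acquired inside `attestation_job`) that bounds how many run at once. An editor's standalone sketch of the idea (assumed names and a toy limit of 2; not code from this commit):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // Toy stand-in for the max-jobs semaphore; at most 2 jobs in flight.
    let sema = Arc::new(Semaphore::new(2));

    for job_no in 0..4 {
        // Short-lived scheduling permit: held only while spawning,
        // dropped at the end of each loop iteration (like _permit4sched).
        let _permit4sched = sema.acquire().await.unwrap();

        let sema4job = sema.clone();
        tokio::spawn(async move {
            // Per-job permit held for the job's whole lifetime
            // (like _permit inside attestation_job).
            let _permit = sema4job.acquire_owned().await.unwrap();
            println!("job {} running", job_no);
        });
    }
    // Note: a real program would join the spawned tasks before exiting.
}
```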
@@ -767,67 +772,103 @@ async fn attestation_job(args: AttestationJobArgs) -> Result<(), ErrBoxSend> {
max_jobs_sema,
message_q_mtx,
} = args;
+    let batch_no4err_msg = batch_no;
+    let batch_count4err_msg = batch_count;
+    let group_name4err_msg = group_name.clone();
-    // Will be dropped after attestation is complete
-    let _permit = max_jobs_sema.acquire().await?;
+    // The following async block is just wrapping the job in a log
+    // statement and err counter increase in case the job fails. It is
+    // done by using the or_else() future method. No other actions are
+    // performed and the error is propagated up the stack.
+    //
+    // This is necessary to learn about errors in jobs started with
+    // tokio::spawn() because in this package spawned futures are
+    // never explicitly awaited on.
+    //
+    // Previously, the or_else() existed in attestation_sched_job()
+    // which schedules this future. It was moved here for readability,
+    // after introduction of Prometheus metrics and the healthcheck,
+    // which helped keep metrics updates closer together.
+    let job_with_err_msg = (async move {
+        // Will be dropped after attestation is complete
+        let _permit = max_jobs_sema.acquire().await?;
-    debug!(
-        "Batch {}/{}, group {:?}: Starting attestation job",
-        batch_no, batch_count, group_name
-    );
-    let rpc = lock_and_make_rpc(&rlmtx).await; // Reuse the same lock for the blockhash/tx/get_transaction
-    let latest_blockhash = rpc
-        .get_latest_blockhash()
-        .map_err(|e| -> ErrBoxSend { e.into() })
-        .await?;
+        debug!(
+            "Batch {}/{}, group {:?}: Starting attestation job",
+            batch_no, batch_count, group_name
+        );
+        let rpc = lock_and_make_rpc(&rlmtx).await; // Reuse the same lock for the blockhash/tx/get_transaction
+        let latest_blockhash = rpc
+            .get_latest_blockhash()
+            .map_err(|e| -> ErrBoxSend { e.into() })
+            .await?;
-    let wh_msg_id = message_q_mtx.lock().await.get_account()?.id;
+        let wh_msg_id = message_q_mtx.lock().await.get_account()?.id;
-    let tx_res: Result<_, ErrBoxSend> = gen_attest_tx(
-        p2w_addr,
-        &config,
-        &payer,
-        wh_msg_id,
-        symbols.as_slice(),
-        latest_blockhash,
-    );
-    let sig = rpc
-        .send_and_confirm_transaction(&tx_res?)
-        .map_err(|e| -> ErrBoxSend { e.into() })
-        .await?;
-    let tx_data = rpc
-        .get_transaction_with_config(
-            &sig,
-            RpcTransactionConfig {
-                encoding: Some(UiTransactionEncoding::Json),
-                commitment: Some(rpc.commitment()),
-                max_supported_transaction_version: None,
-            },
-        )
-        .await?;
-    let seqno = tx_data
-        .transaction
-        .meta
-        .and_then(|meta| meta.log_messages)
-        .and_then(|logs| {
-            let mut seqno = None;
-            for log in logs {
-                if log.starts_with(SEQNO_PREFIX) {
-                    seqno = Some(log.replace(SEQNO_PREFIX, ""));
-                    break;
-                }
-            }
-            seqno
-        })
-        .ok_or_else(|| -> ErrBoxSend { "No seqno in program logs".to_string().into() })?;
-    info!(
-        "Batch {}/{}, group {:?} OK. Sequence: {}",
-        batch_no, batch_count, group_name, seqno
-    );
-    ATTESTATIONS_OK_CNT.inc();
-    LAST_SEQNO_GAUGE.set(seqno.parse::<i64>()?);
-    Result::<(), ErrBoxSend>::Ok(())
+        let tx_res: Result<_, ErrBoxSend> = gen_attest_tx(
+            p2w_addr,
+            &config,
+            &payer,
+            wh_msg_id,
+            symbols.as_slice(),
+            latest_blockhash,
+        );
+        let sig = rpc
+            .send_and_confirm_transaction(&tx_res?)
+            .map_err(|e| -> ErrBoxSend { e.into() })
+            .await?;
+        let tx_data = rpc
+            .get_transaction_with_config(
+                &sig,
+                RpcTransactionConfig {
+                    encoding: Some(UiTransactionEncoding::Json),
+                    commitment: Some(rpc.commitment()),
+                    max_supported_transaction_version: None,
+                },
+            )
+            .await?;
+        let seqno = tx_data
+            .transaction
+            .meta
+            .and_then(|meta| meta.log_messages)
+            .and_then(|logs| {
+                let mut seqno = None;
+                for log in logs {
+                    if log.starts_with(SEQNO_PREFIX) {
+                        seqno = Some(log.replace(SEQNO_PREFIX, ""));
+                        break;
+                    }
+                }
+                seqno
+            })
+            .ok_or_else(|| -> ErrBoxSend { "No seqno in program logs".to_string().into() })?;
+        info!(
+            "Batch {}/{}, group {:?} OK. Sequence: {}",
+            batch_no, batch_count, group_name, seqno
+        );
+        ATTESTATIONS_OK_CNT.inc();
+        LAST_SEQNO_GAUGE.set(seqno.parse::<i64>()?);
+        HEALTHCHECK_STATE.lock().await.add_result(true); // Report this job as successful to healthcheck
+        Result::<(), ErrBoxSend>::Ok(())
+    })
+    .or_else(move |e| async move {
+        // log any errors coming from the job
+        warn!(
+            "Batch {}/{}, group {:?} ERR: {:#?}",
+            batch_no4err_msg, batch_count4err_msg, group_name4err_msg, e
+        );
+        // Bump counters
+        ATTESTATIONS_ERR_CNT.inc();
+        HEALTHCHECK_STATE.lock().await.add_result(false); // Report this job as failed to healthcheck
+        Err(e)
+    });
+    job_with_err_msg.await
}
fn init_logging() {

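The long comment inside attestation_job() explains why the error handling lives on the future itself: spawned futures are never awaited, so nothing downstream would ever see an Err. An editor's standalone sketch of the same or_else() pattern (made-up job and error strings; assumes the futures and tokio crates; the eprintln! line stands in for warn!(), ATTESTATIONS_ERR_CNT and the healthcheck update):

```rust
use futures::TryFutureExt; // provides or_else() for futures

// Made-up stand-in for attestation_job().
async fn fallible_job(batch_no: usize) -> Result<(), String> {
    if batch_no % 2 == 0 {
        Err(format!("batch {} failed", batch_no))
    } else {
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let batch_no = 2;

    // Attach logging/metrics to the future itself, because the
    // JoinHandle returned by tokio::spawn() is never awaited.
    let job = fallible_job(batch_no).or_else(move |e| async move {
        eprintln!("batch {} ERR: {}", batch_no, e); // stand-in for warn!()
        Err(e) // error is still propagated up the stack
    });

    let handle = tokio::spawn(job);
    let _ = handle.await; // only awaited here so the demo can observe the log
}
```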
View File

@@ -1,4 +1,5 @@
use {
+    crate::HEALTHCHECK_STATE,
http::status::StatusCode,
log::{
error,
@@ -123,10 +124,58 @@ async fn metrics_handler() -> Result<impl Reply, Rejection> {
}
}
+/// Shares healthcheck result via HTTP status codes. The idea is to
+/// get a yes/no health answer using a plain HTTP request. Note: Curl
+/// does not treat 3xx statuses as errors by default.
+async fn healthcheck_handler() -> Result<impl Reply, Rejection> {
+    let hc_state = HEALTHCHECK_STATE.lock().await;
+    match hc_state.is_healthy() {
+        // Healthy - 200 OK
+        Some(true) => {
+            let ok_count = hc_state
+                .window
+                .iter()
+                .fold(0usize, |acc, val| if *val { acc + 1 } else { acc });
+            let msg = format!(
+                "healthy, {} of {} last attestations OK",
+                ok_count, hc_state.max_window_size
+            );
+            Ok(reply::with_status(msg, StatusCode::OK))
+        }
+        // Unhealthy - 503 Service Unavailable
+        Some(false) => {
+            let msg = format!(
+                "unhealthy, all of {} latest attestations returned error",
+                hc_state.max_window_size
+            );
+            Ok(reply::with_status(msg, StatusCode::SERVICE_UNAVAILABLE))
+        }
+        // No data - 307 Temporary Redirect
+        None => {
+            let msg = if hc_state.enable {
+                format!(
+                    "Not enough data in window, {} of {} min attempts made",
+                    hc_state.window.len(),
+                    hc_state.max_window_size
+                )
+            } else {
+                "Healthcheck disabled (enable_healthcheck is false)".to_string()
+            };
+            Ok(reply::with_status(msg, StatusCode::TEMPORARY_REDIRECT))
+        }
+    }
+}
+/// Serves Prometheus metrics and the result of the healthcheck
pub async fn start_metrics_server(addr: impl Into<SocketAddr> + 'static) {
let metrics_route = warp::path("metrics") // The metrics subpage is standardized to always be /metrics
let metrics_route = warp::path("metrics") // The Prometheus metrics subpage is standardized to always be /metrics
.and(warp::path::end())
.and_then(metrics_handler);
let healthcheck_route = warp::path("healthcheck")
.and(warp::path::end())
.and_then(healthcheck_handler);
-    warp::serve(metrics_route).bind(addr).await;
+    warp::serve(metrics_route.or(healthcheck_route))
+        .bind(addr)
+        .await;
}
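
For completeness, an editor's sketch of a client consuming the status-code contract above (assumes the reqwest and tokio crates, and that the attester serves on localhost:3000 as in the Kubernetes manifest): 200 means healthy, 503 unhealthy, 307 no verdict yet.

```rust
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Don't follow redirects, so the 307 "no data yet" status is seen
    // directly instead of being retried as a redirect.
    let client = reqwest::Client::builder()
        .redirect(reqwest::redirect::Policy::none())
        .build()?;

    let resp = client
        .get("http://localhost:3000/healthcheck")
        .send()
        .await?;

    match resp.status().as_u16() {
        200 => println!("healthy: {}", resp.text().await?),
        503 => println!("unhealthy: {}", resp.text().await?),
        307 => println!("no verdict yet: {}", resp.text().await?),
        other => println!("unexpected status: {}", other),
    }
    Ok(())
}
```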