refactor(hermes): move metrics endpoint to a separate metrics server

This commit is contained in:
Ali Behjati 2023-10-19 11:21:40 +02:00
parent 29dc1082dd
commit 8075d0704e
8 changed files with 84 additions and 23 deletions

View File

@ -1,6 +1,7 @@
export PYTHNET_HTTP_ADDR=http://pythnet-http-rpc/
export PYTHNET_WS_ADDR=ws://pythnet-ws-rpc/
export RPC_LISTEN_ADDR=0.0.0.0:7575
export METRICS_LISTEN_ADDR=0.0.0.0:7576
export BENCHMARKS_ENDPOINT=https://benchmarks.pyth.network
export WORMHOLE_SPY_RPC_ADDR=http://spy-or-beacon-rpc
export RUST_LOG=info

View File

@ -153,7 +153,6 @@ pub async fn run(opts: RunOptions, state: ApiState) -> Result<()> {
.route("/api/latest_vaas", get(rest::latest_vaas))
.route("/api/price_feed_ids", get(rest::price_feed_ids))
.route("/live", get(rest::live))
.route("/metrics", get(rest::metrics))
.route("/ready", get(rest::ready))
.route("/ws", get(ws::ws_route_handler))
.route_layer(from_fn_with_state(

View File

@ -17,7 +17,6 @@ mod index;
mod latest_price_feeds;
mod latest_vaas;
mod live;
mod metrics;
mod price_feed_ids;
mod ready;
@ -29,7 +28,6 @@ pub use {
latest_price_feeds::*,
latest_vaas::*,
live::*,
metrics::*,
price_feed_ids::*,
ready::*,
};

View File

@ -1,20 +0,0 @@
//! Exposing Prometheus metrics via HTTP in OpenMetrics format.
use {
axum::{
extract::State,
response::IntoResponse,
},
prometheus_client::encoding::text::encode,
};
/// Axum handler that renders the metrics registry held by the API state as text.
///
/// Takes a read lock on the registry, encodes it into a fresh `String` and
/// returns that string as the response body.
///
/// # Panics
/// Panics if the encoder reports an error while writing into the buffer.
pub async fn metrics(State(state): State<crate::api::ApiState>) -> impl IntoResponse {
    let mut output = String::new();
    {
        // Hold the read lock only for the duration of the encoding.
        let registry = state.state.metrics_registry.read().await;
        // Writing into a `String` is only expected to fail if the metrics
        // themselves are invalid or memory is exhausted.
        encode(&mut output, &registry).unwrap();
    }
    output
}

View File

@ -8,6 +8,7 @@ use clap::{
};
mod benchmarks;
mod metrics;
mod pythnet;
mod rpc;
mod wormhole;
@ -44,6 +45,10 @@ pub struct RunOptions {
/// Benchmarks Options
#[command(flatten)]
pub benchmarks: benchmarks::Options,
/// Metrics Options
#[command(flatten)]
pub metrics: metrics::Options,
}
#[derive(Args, Clone, Debug)]

View File

@ -0,0 +1,17 @@
use {
clap::Args,
std::net::SocketAddr,
};
/// Listen address used when `--metrics-server-listen-addr` / `METRICS_LISTEN_ADDR`
/// is not supplied.
const DEFAULT_METRICS_SERVER_LISTEN_ADDR: &str = "127.0.0.1:33888";

// Options for the metrics server. Shown under the "Metrics Options" heading in
// the CLI help output.
#[derive(Args, Clone, Debug)]
#[command(next_help_heading = "Metrics Options")]
#[group(id = "Metrics")]
pub struct Options {
    /// Address and port the metrics server will bind to.
    #[arg(long = "metrics-server-listen-addr")]
    #[arg(default_value = DEFAULT_METRICS_SERVER_LISTEN_ADDR)]
    #[arg(env = "METRICS_LISTEN_ADDR")]
    pub server_listen_addr: SocketAddr,
}

View File

@ -19,6 +19,7 @@ use {
mod aggregate;
mod api;
mod config;
mod metrics_server;
mod network;
mod serde;
mod state;
@ -61,6 +62,7 @@ async fn init() -> Result<()> {
let tasks = join_all([
Box::pin(spawn(network::wormhole::spawn(opts.clone(), store.clone()))),
Box::pin(spawn(network::pythnet::spawn(opts.clone(), store.clone()))),
Box::pin(spawn(metrics_server::run(opts.clone(), store.clone()))),
Box::pin(spawn(api::spawn(opts.clone(), store.clone(), update_rx))),
])
.await;

View File

@ -0,0 +1,59 @@
//! Metrics Server
//!
//! This server serves metrics over /metrics in OpenMetrics format.
use {
crate::{
config::RunOptions,
state::State as AppState,
},
anyhow::Result,
axum::{
extract::State,
response::IntoResponse,
routing::get,
Router,
},
prometheus_client::encoding::text::encode,
std::sync::{
atomic::Ordering,
Arc,
},
};
/// Runs the metrics HTTP server until shutdown is requested.
///
/// Serves `GET /metrics` on `opts.metrics.server_listen_addr`, with `state`
/// installed as the router state for the handler.
///
/// The returned future completes only after the server has shut down, which is
/// triggered once `crate::SHOULD_EXIT` is observed to be set.
///
/// # Errors
/// Returns an error if the listen address cannot be bound or if the server
/// terminates with an error.
#[tracing::instrument(skip(opts, state))]
pub async fn run(opts: RunOptions, state: Arc<AppState>) -> Result<()> {
    tracing::info!(endpoint = %opts.metrics.server_listen_addr, "Starting Metrics Server.");

    // `state` is owned and not used again below, so it is moved into the router
    // instead of being cloned.
    let app = Router::new()
        .route("/metrics", get(metrics))
        .with_state(state);

    // Bind axum's server to the configured address and port. Awaiting the
    // server future does not return until the server has shut down.
    axum::Server::try_bind(&opts.metrics.server_listen_addr)?
        .serve(app.into_make_service())
        .with_graceful_shutdown(async {
            // Poll the global exit flag every 5 seconds; when it is set, this
            // future completes and graceful shutdown begins.
            while !crate::SHOULD_EXIT.load(Ordering::Acquire) {
                tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
            }
            tracing::info!("Shutting down metrics server...");
        })
        .await?;

    Ok(())
}
/// Axum handler for `/metrics`: renders the shared metrics registry as text.
///
/// Takes a read lock on `metrics_registry`, encodes it into a fresh `String`
/// and returns that string as the response body.
///
/// # Panics
/// Panics if the encoder reports an error while writing into the buffer.
pub async fn metrics(State(state): State<Arc<AppState>>) -> impl IntoResponse {
    let mut body = String::new();
    let guard = state.metrics_registry.read().await;
    // Encoding into a `String` is only expected to fail if the metrics are
    // invalid or memory is exhausted.
    encode(&mut body, &guard).unwrap();
    body
}