feat(hermes): add metrics

Co-authored-by: Reisen <Reisen@users.noreply.github.com>
This commit is contained in:
Ali Behjati 2023-10-17 18:52:49 +02:00
parent 727f9ec33d
commit 9da64103db
14 changed files with 529 additions and 1802 deletions

6
hermes/.envrc.sample Normal file
View File

@ -0,0 +1,6 @@
export PYTHNET_HTTP_ADDR=http://pythnet-http-rpc/
export PYTHNET_WS_ADDR=ws://pythnet-ws-rpc/
export RPC_LISTEN_ADDR=0.0.0.0:7575
export BENCHMARKS_ENDPOINT=https://benchmarks.pyth.network
export WORMHOLE_SPY_RPC_ADDR=http://spy-or-beacon-rpc
export RUST_LOG=info

1826
hermes/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
[package]
name = "hermes"
version = "0.3.3"
version = "0.4.0"
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
edition = "2021"
@ -27,7 +27,7 @@ libc = { version = "0.2.140" }
log = { version = "0.4.17" }
mock_instant = { version = "0.3.1", features = ["sync"] }
nonzero_ext = { version = "0.3.0" }
prometheus-client = { version = "0.21.1" }
prometheus-client = { version = "0.21.2" }
prost = { version = "0.12.1" }
pyth-sdk = { version = "0.8.0" }
pythnet-sdk = { path = "../pythnet/pythnet_sdk/", version = "2.0.0", features = ["strum"] }
@ -49,21 +49,6 @@ utoipa = { version = "3.4.0", features = ["axum_extras"] }
utoipa-swagger-ui = { version = "3.1.4", features = ["axum"] }
wormhole-sdk = { git = "https://github.com/wormhole-foundation/wormhole", tag = "v2.17.1" }
# Setup LibP2P. Unfortunately the dependencies required by libp2p are shared
# with the dependencies required by many Solana components. This means that we
# would have to use the same version of libp2p as solana. Luckily we don't need
# to worry about this until we want to hoist the libp2p portion of Hermes into
# Rust land.
libp2p = { version = "0.42.2", features = [
"gossipsub",
"identify",
"mplex",
"noise",
"secp256k1",
"websocket",
"yamux",
]}
# We are bound to this Solana version in order to match pyth-oracle.
solana-client = { version = "=1.13.3" }
solana-sdk = { version = "=1.13.3" }

View File

@ -36,6 +36,7 @@ use {
},
borsh::BorshDeserialize,
byteorder::BigEndian,
prometheus_client::registry::Registry,
pyth_sdk::{
Price,
PriceFeed,
@ -61,6 +62,7 @@ use {
wormhole_sdk::Vaa,
};
pub mod metrics;
pub mod wormhole_merkle;
#[derive(Clone, PartialEq, Debug)]
@ -100,7 +102,7 @@ impl AggregationEvent {
}
}
#[derive(Clone, PartialEq, Debug)]
#[derive(Clone, Debug)]
pub struct AggregateState {
/// The latest completed slot. This is used to check whether a completed state is new or out of
/// order.
@ -112,14 +114,18 @@ pub struct AggregateState {
/// The latest observed slot among different Aggregate updates. This is used for the health
/// probes.
pub latest_observed_slot: Option<Slot>,
/// Metrics
pub metrics: metrics::Metrics,
}
impl AggregateState {
pub fn new() -> Self {
pub fn new(metrics_registry: &mut Registry) -> Self {
Self {
latest_completed_slot: None,
latest_completed_update_at: None,
latest_observed_slot: None,
metrics: metrics::Metrics::new(metrics_registry),
}
}
}
@ -192,6 +198,13 @@ pub async fn store_update(state: &State, update: Update) -> Result<()> {
)
.await?;
state
.aggregate_state
.write()
.await
.metrics
.observe(proof.slot, metrics::Event::Vaa);
proof.slot
}
}
@ -203,6 +216,13 @@ pub async fn store_update(state: &State, update: Update) -> Result<()> {
state
.store_accumulator_messages(accumulator_messages)
.await?;
state
.aggregate_state
.write()
.await
.metrics
.observe(slot, metrics::Event::AccumulatorMessages);
slot
}
};
@ -269,6 +289,10 @@ pub async fn store_update(state: &State, update: Update) -> Result<()> {
.latest_completed_update_at
.replace(Instant::now());
aggregate_state
.metrics
.observe(slot, metrics::Event::CompletedUpdate);
Ok(())
}

View File

@ -0,0 +1,123 @@
use {
super::Slot,
prometheus_client::{
encoding::{
EncodeLabelSet,
EncodeLabelValue,
},
metrics::{
counter::Counter,
family::Family,
histogram::Histogram,
},
registry::Registry,
},
std::collections::{
BTreeMap,
HashMap,
},
tokio::time::Instant,
};
const MAX_SLOT_OBSERVATIONS: usize = 1000;
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
pub enum SlotOrder {
New,
OutOfOrder,
}
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
pub enum Event {
Vaa,
AccumulatorMessages,
CompletedUpdate,
}
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct ObservedSlotLabels {
pub order: SlotOrder,
pub event: Event,
}
#[derive(Clone, Debug)]
pub struct Metrics {
observed_slot: Family<ObservedSlotLabels, Counter>,
observed_slot_latency: Family<ObservedSlotLabels, Histogram>,
first_observed_time_of_slot: BTreeMap<Slot, Instant>,
newest_observed_slot: HashMap<Event, Slot>,
}
impl Metrics {
pub fn new(metrics_registry: &mut Registry) -> Self {
let new = Self {
observed_slot: Family::default(),
observed_slot_latency: Family::new_with_constructor(|| {
Histogram::new(
[
0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 1.0, 1.3, 1.7, 2.0, 3.0, 5.0, 10.0, 20.0,
]
.into_iter(),
)
}),
first_observed_time_of_slot: BTreeMap::new(),
newest_observed_slot: HashMap::new(),
};
{
let observed_slot = new.observed_slot.clone();
let observed_slot_latency = new.observed_slot_latency.clone();
metrics_registry.register(
"aggregate_observed_slot",
"Total number of observed slots",
observed_slot,
);
metrics_registry.register(
"aggregate_observed_slot_latency_seconds",
"Latency of observed slots in seconds",
observed_slot_latency,
);
}
new
}
/// Observe a slot and event. An event at a slot should be observed only once.
pub fn observe(&mut self, slot: Slot, event: Event) {
let order = if self
.newest_observed_slot
.get(&event)
.map_or(true, |&observed_slot| slot > observed_slot)
{
self.newest_observed_slot.insert(event.clone(), slot);
SlotOrder::New
} else {
SlotOrder::OutOfOrder
};
let labels = ObservedSlotLabels { order, event };
self.observed_slot.get_or_create(&labels).inc();
if let Some(start) = self.first_observed_time_of_slot.get(&slot) {
let latency = start.elapsed().as_secs_f64();
self.observed_slot_latency
.get_or_create(&labels)
.observe(latency);
} else {
self.first_observed_time_of_slot
.insert(slot, Instant::now());
self.observed_slot_latency
.get_or_create(&labels)
.observe(0.0);
}
// Clear out old slots
while self.first_observed_time_of_slot.len() > MAX_SLOT_OBSERVATIONS {
let oldest_slot = *self.first_observed_time_of_slot.keys().next().unwrap();
self.first_observed_time_of_slot.remove(&oldest_slot);
}
}
}

View File

@ -8,6 +8,7 @@ use {
anyhow::Result,
axum::{
extract::Extension,
middleware::from_fn_with_state,
routing::get,
Router,
},
@ -26,14 +27,16 @@ use {
utoipa_swagger_ui::SwaggerUi,
};
mod metrics_middleware;
mod rest;
mod types;
mod ws;
#[derive(Clone)]
pub struct ApiState {
pub state: Arc<State>,
pub ws: Arc<ws::WsState>,
pub state: Arc<State>,
pub ws: Arc<ws::WsState>,
pub metrics: Arc<metrics_middleware::Metrics>,
}
impl ApiState {
@ -43,8 +46,13 @@ impl ApiState {
requester_ip_header_name: String,
) -> Self {
Self {
metrics: Arc::new(metrics_middleware::Metrics::new(state.clone())),
ws: Arc::new(ws::WsState::new(
ws_whitelist,
requester_ip_header_name,
state.clone(),
)),
state,
ws: Arc::new(ws::WsState::new(ws_whitelist, requester_ip_header_name)),
}
}
}
@ -59,7 +67,7 @@ pub async fn run(
state: Arc<State>,
mut update_rx: Receiver<AggregationEvent>,
) -> Result<()> {
tracing::info!(endpoint = %opts.rpc.addr, "Starting RPC Server.");
tracing::info!(endpoint = %opts.rpc.listen_addr, "Starting RPC Server.");
#[derive(OpenApi)]
#[openapi(
@ -101,15 +109,20 @@ pub async fn run(
let app = app
.merge(SwaggerUi::new("/docs").url("/docs/openapi.json", ApiDoc::openapi()))
.route("/", get(rest::index))
.route("/live", get(rest::live))
.route("/ready", get(rest::ready))
.route("/ws", get(ws::ws_route_handler))
.route("/api/latest_price_feeds", get(rest::latest_price_feeds))
.route("/api/latest_vaas", get(rest::latest_vaas))
.route("/api/get_price_feed", get(rest::get_price_feed))
.route("/api/get_vaa", get(rest::get_vaa))
.route("/api/get_vaa_ccip", get(rest::get_vaa_ccip))
.route("/api/latest_price_feeds", get(rest::latest_price_feeds))
.route("/api/latest_vaas", get(rest::latest_vaas))
.route("/api/price_feed_ids", get(rest::price_feed_ids))
.route("/live", get(rest::live))
.route("/metrics", get(rest::metrics))
.route("/ready", get(rest::ready))
.route("/ws", get(ws::ws_route_handler))
.route_layer(from_fn_with_state(
state.clone(),
metrics_middleware::track_metrics,
))
.with_state(state.clone())
// Permissive CORS layer to allow all origins
.layer(CorsLayer::permissive())
@ -139,7 +152,7 @@ pub async fn run(
// Binds the axum's server to the configured address and port. This is a blocking call and will
// not return until the server is shutdown.
axum::Server::try_bind(&opts.rpc.addr)?
axum::Server::try_bind(&opts.rpc.listen_addr)?
.serve(app.into_make_service())
.with_graceful_shutdown(async {
// Ignore Ctrl+C errors, either way we need to shut down. The main Ctrl+C handler

View File

@ -0,0 +1,105 @@
use {
super::ApiState,
crate::state::State as AppState,
axum::{
extract::{
MatchedPath,
State,
},
http::Request,
middleware::Next,
response::IntoResponse,
},
prometheus_client::{
encoding::EncodeLabelSet,
metrics::{
counter::Counter,
family::Family,
histogram::Histogram,
},
},
std::sync::Arc,
tokio::time::Instant,
};
pub struct Metrics {
pub requests: Family<Labels, Counter>,
pub latencies: Family<Labels, Histogram>,
}
impl Metrics {
pub fn new(state: Arc<AppState>) -> Self {
let new = Self {
requests: Family::default(),
latencies: Family::new_with_constructor(|| {
Histogram::new(
[
0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0,
]
.into_iter(),
)
}),
};
{
let requests = new.requests.clone();
let latencies = new.latencies.clone();
tokio::spawn(async move {
let mut metrics_registry = state.metrics_registry.write().await;
metrics_registry.register("api_requests", "Total number of API requests", requests);
metrics_registry.register(
"api_request_latency_seconds",
"API request latency in seconds",
latencies,
);
});
}
new
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash, EncodeLabelSet)]
pub struct Labels {
pub method: String,
pub path: String,
pub status: u16,
}
pub async fn track_metrics<B>(
State(api_state): State<ApiState>,
req: Request<B>,
next: Next<B>,
) -> impl IntoResponse {
let start = Instant::now();
let path = if let Some(matched_path) = req.extensions().get::<MatchedPath>() {
matched_path.as_str().to_owned()
} else {
req.uri().path().to_owned()
};
let method = req.method().clone();
let response = next.run(req).await;
let latency = start.elapsed().as_secs_f64();
let status = response.status().as_u16();
let labels = Labels {
method: method.to_string(),
path,
status,
};
api_state.metrics.requests.get_or_create(&labels).inc();
api_state
.metrics
.latencies
.get_or_create(&labels)
.observe(latency);
response
}

View File

@ -17,6 +17,7 @@ mod index;
mod latest_price_feeds;
mod latest_vaas;
mod live;
mod metrics;
mod price_feed_ids;
mod ready;
@ -28,6 +29,7 @@ pub use {
latest_price_feeds::*,
latest_vaas::*,
live::*,
metrics::*,
price_feed_ids::*,
ready::*,
};

View File

@ -0,0 +1,20 @@
//! Exposing prometheus metrics via HTTP in openmetrics format.
use {
axum::{
extract::State,
response::IntoResponse,
},
prometheus_client::encoding::text::encode,
};
pub async fn metrics(State(state): State<crate::api::ApiState>) -> impl IntoResponse {
let registry = state.state.metrics_registry.read().await;
let mut buffer = String::new();
// Should not fail if the metrics are valid and there is memory available
// to write to the buffer.
encode(&mut buffer, &registry).unwrap();
buffer
}

View File

@ -43,6 +43,16 @@ use {
},
ipnet::IpNet,
nonzero_ext::nonzero,
prometheus_client::{
encoding::{
EncodeLabelSet,
EncodeLabelValue,
},
metrics::{
counter::Counter,
family::Family,
},
},
pyth_sdk::PriceIdentifier,
serde::{
Deserialize,
@ -79,16 +89,65 @@ pub struct PriceFeedClientConfig {
allow_out_of_order: bool,
}
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
pub enum Interaction {
NewConnection,
CloseConnection,
ClientHeartbeat,
PriceUpdate,
ClientMessage,
RateLimit,
}
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
pub enum Status {
Success,
Error,
}
#[derive(Clone, Debug, PartialEq, Eq, Hash, EncodeLabelSet)]
pub struct Labels {
pub interaction: Interaction,
pub status: Status,
}
pub struct Metrics {
pub interactions: Family<Labels, Counter>,
}
impl Metrics {
pub fn new(state: Arc<State>) -> Self {
let new = Self {
interactions: Family::default(),
};
{
let interactions = new.interactions.clone();
tokio::spawn(async move {
state.metrics_registry.write().await.register(
"ws_interactions",
"Total number of websocket interactions",
interactions,
);
});
}
new
}
}
pub struct WsState {
pub subscriber_counter: AtomicUsize,
pub subscribers: DashMap<SubscriberId, mpsc::Sender<AggregationEvent>>,
pub bytes_limit_whitelist: Vec<IpNet>,
pub rate_limiter: DefaultKeyedRateLimiter<IpAddr>,
pub requester_ip_header_name: String,
pub metrics: Metrics,
}
impl WsState {
pub fn new(whitelist: Vec<IpNet>, requester_ip_header_name: String) -> Self {
pub fn new(whitelist: Vec<IpNet>, requester_ip_header_name: String, state: Arc<State>) -> Self {
Self {
subscriber_counter: AtomicUsize::new(0),
subscribers: DashMap::new(),
@ -97,6 +156,7 @@ impl WsState {
))),
bytes_limit_whitelist: whitelist,
requester_ip_header_name,
metrics: Metrics::new(state.clone()),
}
}
}
@ -161,7 +221,16 @@ async fn websocket_handler(
) {
let ws_state = state.ws.clone();
let id = ws_state.subscriber_counter.fetch_add(1, Ordering::SeqCst);
tracing::debug!(id, ?subscriber_ip, "New Websocket Connection");
ws_state
.metrics
.interactions
.get_or_create(&Labels {
interaction: Interaction::NewConnection,
status: Status::Success,
})
.inc();
let (notify_sender, notify_receiver) = mpsc::channel(NOTIFICATIONS_CHAN_LEN);
let (sender, receiver) = stream.split();
@ -247,6 +316,15 @@ impl Subscriber {
},
_ = self.ping_interval.tick() => {
if !self.responded_to_ping {
self.ws_state
.metrics
.interactions
.get_or_create(&Labels {
interaction: Interaction::ClientHeartbeat,
status: Status::Error,
})
.inc();
return Err(anyhow!("Subscriber did not respond to ping. Closing connection."));
}
self.responded_to_ping = false;
@ -318,6 +396,16 @@ impl Subscriber {
ip = %ip_addr,
"Rate limit exceeded. Closing connection.",
);
self.ws_state
.metrics
.interactions
.get_or_create(&Labels {
interaction: Interaction::RateLimit,
status: Status::Error,
})
.inc();
self.sender
.send(
serde_json::to_string(&ServerResponseMessage::Err {
@ -335,6 +423,15 @@ impl Subscriber {
// `sender.feed` buffers a message to the client but does not flush it, so we can send
// multiple messages and flush them all at once.
self.sender.feed(message.into()).await?;
self.ws_state
.metrics
.interactions
.get_or_create(&Labels {
interaction: Interaction::PriceUpdate,
status: Status::Success,
})
.inc();
}
self.sender.flush().await?;
@ -350,6 +447,14 @@ impl Subscriber {
// to subscribers list will be closed and it will eventually get
// removed.
tracing::trace!(id = self.id, "Subscriber Closed Connection.");
self.ws_state
.metrics
.interactions
.get_or_create(&Labels {
interaction: Interaction::CloseConnection,
status: Status::Success,
})
.inc();
// Send the close message to gracefully shut down the connection
// Otherwise the client might get an abnormal Websocket closure
@ -365,6 +470,16 @@ impl Subscriber {
return Ok(());
}
Message::Pong(_) => {
// This metric can be used to monitor the number of active connections
self.ws_state
.metrics
.interactions
.get_or_create(&Labels {
interaction: Interaction::ClientHeartbeat,
status: Status::Success,
})
.inc();
self.responded_to_ping = true;
return Ok(());
}
@ -372,6 +487,14 @@ impl Subscriber {
match maybe_client_message {
Err(e) => {
self.ws_state
.metrics
.interactions
.get_or_create(&Labels {
interaction: Interaction::ClientMessage,
status: Status::Error,
})
.inc();
self.sender
.send(
serde_json::to_string(&ServerMessage::Response(
@ -437,6 +560,15 @@ impl Subscriber {
}
}
self.ws_state
.metrics
.interactions
.get_or_create(&Labels {
interaction: Interaction::ClientMessage,
status: Status::Success,
})
.inc();
self.sender
.send(
serde_json::to_string(&ServerMessage::Response(ServerResponseMessage::Success))?

View File

@ -6,11 +6,11 @@ use clap::Args;
pub struct Options {
/// Address of a PythNet compatible websocket RPC endpoint.
#[arg(long = "pythnet-ws-addr")]
#[arg(env = "PYTHNET_WS_ENDPOINT")]
pub ws_endpoint: String,
#[arg(env = "PYTHNET_WS_ADDR")]
pub ws_addr: String,
/// Addres of a PythNet compatible HTP RPC endpoint.
#[arg(long = "pythnet-http-addr")]
#[arg(env = "PYTHNET_HTTP_ENDPOINT")]
pub http_endpoint: String,
#[arg(env = "PYTHNET_HTTP_ADDR")]
pub http_addr: String,
}

View File

@ -4,7 +4,7 @@ use {
std::net::SocketAddr,
};
const DEFAULT_RPC_ADDR: &str = "127.0.0.1:33999";
const DEFAULT_RPC_LISTEN_ADDR: &str = "127.0.0.1:33999";
const DEFAULT_RPC_REQUESTER_IP_HEADER_NAME: &str = "X-Forwarded-For";
#[derive(Args, Clone, Debug)]
@ -13,9 +13,9 @@ const DEFAULT_RPC_REQUESTER_IP_HEADER_NAME: &str = "X-Forwarded-For";
pub struct Options {
/// Address and port the RPC server will bind to.
#[arg(long = "rpc-listen-addr")]
#[arg(default_value = DEFAULT_RPC_ADDR)]
#[arg(env = "RPC_ADDR")]
pub addr: SocketAddr,
#[arg(default_value = DEFAULT_RPC_LISTEN_ADDR)]
#[arg(env = "RPC_LISTEN_ADDR")]
pub listen_addr: SocketAddr,
/// Whitelisted websocket ip network addresses (separated by comma).
#[arg(long = "rpc-ws-whitelist")]

View File

@ -255,21 +255,18 @@ async fn fetch_existing_guardian_sets(
#[tracing::instrument(skip(opts, state))]
pub async fn spawn(opts: RunOptions, state: Arc<State>) -> Result<()> {
tracing::info!(
endpoint = opts.pythnet.ws_endpoint,
"Started Pythnet Listener."
);
tracing::info!(endpoint = opts.pythnet.ws_addr, "Started Pythnet Listener.");
fetch_existing_guardian_sets(
state.clone(),
opts.pythnet.http_endpoint.clone(),
opts.pythnet.http_addr.clone(),
opts.wormhole.contract_addr,
)
.await?;
let task_listener = {
let store = state.clone();
let pythnet_ws_endpoint = opts.pythnet.ws_endpoint.clone();
let pythnet_ws_endpoint = opts.pythnet.ws_addr.clone();
tokio::spawn(async move {
while !crate::SHOULD_EXIT.load(Ordering::Acquire) {
let current_time = Instant::now();
@ -289,7 +286,7 @@ pub async fn spawn(opts: RunOptions, state: Arc<State>) -> Result<()> {
let task_guadian_watcher = {
let store = state.clone();
let pythnet_http_endpoint = opts.pythnet.http_endpoint.clone();
let pythnet_http_endpoint = opts.pythnet.http_addr.clone();
tokio::spawn(async move {
while !crate::SHOULD_EXIT.load(Ordering::Acquire) {
// Poll for new guardian sets every 60 seconds. We use a short wait time so we can

View File

@ -9,6 +9,7 @@ use {
},
network::wormhole::GuardianSet,
},
prometheus_client::registry::Registry,
reqwest::Url,
std::{
collections::{
@ -46,6 +47,9 @@ pub struct State {
/// Benchmarks endpoint
pub benchmarks_endpoint: Option<Url>,
/// Metrics registry
pub metrics_registry: RwLock<Registry>,
}
impl State {
@ -54,13 +58,15 @@ impl State {
cache_size: u64,
benchmarks_endpoint: Option<Url>,
) -> Arc<Self> {
let mut metrics_registry = Registry::default();
Arc::new(Self {
cache: Cache::new(cache_size),
observed_vaa_seqs: RwLock::new(Default::default()),
guardian_set: RwLock::new(Default::default()),
api_update_tx: update_tx,
aggregate_state: RwLock::new(AggregateState::new()),
aggregate_state: RwLock::new(AggregateState::new(&mut metrics_registry)),
benchmarks_endpoint,
metrics_registry: RwLock::new(metrics_registry),
})
}
}