[hermes] Add health probe (#899)

* [hermes] Add health probe

* Address feedbacks

* Address feedbacks
This commit is contained in:
Ali Behjati 2023-06-19 19:25:29 +02:00 committed by GitHub
parent b596090bd8
commit 26f3fc3653
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 43 additions and 12 deletions

View File

@ -46,6 +46,7 @@ pub async fn run(store: Arc<Store>, mut update_rx: Receiver<()>, rpc_addr: Strin
let app = app
.route("/", get(rest::index))
.route("/live", get(rest::live))
.route("/ready", get(rest::ready))
.route("/ws", get(ws::ws_route_handler))
.route("/api/latest_price_feeds", get(rest::latest_price_feeds))
.route("/api/latest_vaas", get(rest::latest_vaas))

View File

@ -250,10 +250,15 @@ pub async fn get_vaa_ccip(
}))
}
// This function implements the `/live` endpoint. It returns a `200` status code. This endpoint is
// used by the Kubernetes liveness probe.
pub async fn live() -> Result<impl IntoResponse, std::convert::Infallible> {
Ok(())
pub async fn live() -> Response {
(StatusCode::OK, "OK").into_response()
}
pub async fn ready(State(state): State<super::State>) -> Response {
match state.store.is_ready().await {
true => (StatusCode::OK, "OK").into_response(),
false => (StatusCode::SERVICE_UNAVAILABLE, "Service Unavailable").into_response(),
}
}
// This is the index page for the REST service. It will list all the available endpoints.
@ -261,6 +266,7 @@ pub async fn live() -> Result<impl IntoResponse, std::convert::Infallible> {
pub async fn index() -> impl IntoResponse {
Json([
"/live",
"/ready",
"/api/price_feed_ids",
"/api/latest_price_feeds?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&..(&verbose=true)(&binary=true)",
"/api/latest_vaas?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&...",

View File

@ -52,14 +52,19 @@ use {
},
sync::Arc,
time::{
Duration,
SystemTime,
UNIX_EPOCH,
},
},
tokio::sync::{
mpsc::Sender,
RwLock,
tokio::{
sync::{
mpsc::Sender,
RwLock,
},
time::{
Duration,
Instant,
},
},
wormhole_sdk::{
Address,
@ -74,10 +79,11 @@ pub mod types;
pub mod wormhole;
pub struct Store {
pub storage: StorageInstance,
pub observed_vaa_seqs: Cache<u64, bool>,
pub guardian_set: RwLock<BTreeMap<u32, GuardianSet>>,
pub update_tx: Sender<()>,
pub storage: StorageInstance,
pub observed_vaa_seqs: Cache<u64, bool>,
pub guardian_set: RwLock<BTreeMap<u32, GuardianSet>>,
pub update_tx: Sender<()>,
pub last_completed_update_at: RwLock<Option<Instant>>,
}
impl Store {
@ -90,6 +96,7 @@ impl Store {
.build(),
guardian_set: RwLock::new(Default::default()),
update_tx,
last_completed_update_at: RwLock::new(None),
})
}
@ -170,6 +177,11 @@ impl Store {
self.update_tx.send(()).await?;
self.last_completed_update_at
.write()
.await
.replace(Instant::now());
Ok(())
}
@ -258,4 +270,16 @@ impl Store {
.map(|key| PriceIdentifier::new(key.id))
.collect()
}
pub async fn is_ready(&self) -> bool {
const STALENESS_THRESHOLD: Duration = Duration::from_secs(30);
let last_completed_update_at = self.last_completed_update_at.read().await;
match last_completed_update_at.as_ref() {
Some(last_completed_update_at) => {
last_completed_update_at.elapsed() < STALENESS_THRESHOLD
}
None => false,
}
}
}