refactor(hermes): pass run args as struct, add docstrings

In some cases arguments are passed and renamed (see `api_addr -> rpc_addr`) or
are unnecesarily converted (see `api_addr.to_string()` -> `api_addr.parse()`.
In the future, we are likely to add many more arguments to Hermes as well, so
this commit moves them into a separate struct which is forwarded throughout the
application instead.

The struct's are cloned, but only happens during launch of a hermes service
component so the cost doesn't matter.
This commit is contained in:
Reisen 2023-08-09 09:27:42 +00:00 committed by Reisen
parent 1a00598334
commit b74df4ff17
5 changed files with 132 additions and 127 deletions

View File

@ -1,6 +1,9 @@
use {
self::ws::notify_updates,
crate::store::Store,
crate::{
config::RunOptions,
store::Store,
},
anyhow::Result,
axum::{
extract::Extension,
@ -41,23 +44,34 @@ impl State {
///
/// Currently this is based on Axum due to the simplicity and strong ecosystem support for the
/// packages they are based on (tokio & hyper).
pub async fn run(store: Arc<Store>, mut update_rx: Receiver<()>, rpc_addr: String) -> Result<()> {
pub async fn run(opts: RunOptions, store: Arc<Store>, mut update_rx: Receiver<()>) -> Result<()> {
log::info!("Starting RPC server on {}", opts.api_addr);
#[derive(OpenApi)]
#[openapi(
paths(
rest::latest_price_feeds,
rest::latest_vaas,
rest::get_price_feed,
rest::get_vaa,
rest::get_vaa_ccip,
rest::price_feed_ids,
),
components(
schemas(types::RpcPriceFeedMetadata, types::RpcPriceFeed, types::RpcPrice, types::RpcPriceIdentifier, types::PriceIdInput, rest::GetVaaResponse, rest::GetVaaCcipResponse, rest::GetVaaCcipInput)
),
tags(
(name = "hermes", description = "Pyth Real-Time Pricing API")
)
paths(
rest::get_price_feed,
rest::get_vaa,
rest::get_vaa_ccip,
rest::latest_price_feeds,
rest::latest_vaas,
rest::price_feed_ids,
),
components(
schemas(
rest::GetVaaCcipInput,
rest::GetVaaCcipResponse,
rest::GetVaaResponse,
types::PriceIdInput,
types::RpcPrice,
types::RpcPriceFeed,
types::RpcPriceFeedMetadata,
types::RpcPriceIdentifier,
)
),
tags(
(name = "hermes", description = "Pyth Real-Time Pricing API")
)
)]
struct ApiDoc;
@ -103,7 +117,7 @@ pub async fn run(store: Arc<Store>, mut update_rx: Receiver<()>, rpc_addr: Strin
// Binds the axum's server to the configured address and port. This is a blocking call and will
// not return until the server is shutdown.
axum::Server::try_bind(&rpc_addr.parse()?)?
axum::Server::try_bind(&opts.api_addr)?
.serve(app.into_make_service())
.with_graceful_shutdown(async {
signal::ctrl_c()

View File

@ -5,52 +5,60 @@ use {
structopt::StructOpt,
};
/// StructOpt definitions that provides the following arguments and commands:
///
/// Some of these arguments are not currently used, but are included for future use to guide the
/// structure of the application.
const DEFAULT_NETWORK_ID: &str = "/wormhole/mainnet/2";
const DEFAULT_WORMHOLE_BOOTSTRAP_ADDRS: &str = "/dns4/wormhole-mainnet-v2-bootstrap.certus.one/udp/8999/quic/p2p/12D3KooWQp644DK27fd3d4Km3jr7gHiuJJ5ZGmy8hH4py7fP4FP7";
const DEFAULT_WORMHOLE_LISTEN_ADDRS: &str = "/ip4/0.0.0.0/udp/30910/quic,/ip6/::/udp/30910/quic";
const DEFAULT_API_ADDR: &str = "127.0.0.1:33999";
/// `Options` is a structup definition to provide clean command-line args for Hermes.
#[derive(StructOpt, Debug)]
#[structopt(name = "hermes", about = "Hermes")]
pub enum Options {
Run {
#[structopt(long, env = "PYTHNET_WS_ENDPOINT")]
pythnet_ws_endpoint: String,
#[structopt(long, env = "PYTHNET_HTTP_ENDPOINT")]
pythnet_http_endpoint: String,
/// Network ID for Wormhole
#[structopt(
long,
default_value = "/wormhole/mainnet/2",
env = "WORMHOLE_NETWORK_ID"
)]
wh_network_id: String,
/// Multiaddresses for Wormhole bootstrap peers (separated by comma).
#[structopt(
long,
use_delimiter = true,
default_value = "/dns4/wormhole-mainnet-v2-bootstrap.certus.one/udp/8999/quic/p2p/12D3KooWQp644DK27fd3d4Km3jr7gHiuJJ5ZGmy8hH4py7fP4FP7",
env = "WORMHOLE_BOOTSTRAP_ADDRS"
)]
wh_bootstrap_addrs: Vec<Multiaddr>,
/// Multiaddresses to bind Wormhole P2P to (separated by comma)
#[structopt(
long,
use_delimiter = true,
default_value = "/ip4/0.0.0.0/udp/30910/quic,/ip6/::/udp/30910/quic",
env = "WORMHOLE_LISTEN_ADDRS"
)]
wh_listen_addrs: Vec<Multiaddr>,
/// The address to bind the API server to.
#[structopt(long, default_value = "127.0.0.1:33999")]
api_addr: SocketAddr,
/// Address of the Wormhole contract on the target PythNet cluster.
#[structopt(long, default_value = "H3fxXJ86ADW2PNuDDmZJg6mzTtPxkYCpNuQUTgmJ7AjU")]
wh_contract_addr: Pubkey,
},
/// Run the hermes service.
Run(RunOptions),
}
#[derive(Clone, Debug, StructOpt)]
pub struct RunOptions {
/// The address to bind the API server to.
#[structopt(long)]
#[structopt(default_value = DEFAULT_API_ADDR)]
#[structopt(env = "API_ADDR")]
pub api_addr: SocketAddr,
/// Address of a PythNet compatible websocket RPC endpoint.
#[structopt(long)]
#[structopt(env = "PYTHNET_WS_ENDPOINT")]
pub pythnet_ws_endpoint: String,
/// Addres of a PythNet compatible HTP RPC endpoint.
#[structopt(long)]
#[structopt(env = "PYTHNET_HTTP_ENDPOINT")]
pub pythnet_http_endpoint: String,
/// Multiaddresses for Wormhole bootstrap peers (separated by comma).
#[structopt(long)]
#[structopt(use_delimiter = true)]
#[structopt(default_value = DEFAULT_WORMHOLE_BOOTSTRAP_ADDRS)]
#[structopt(env = "WORMHOLE_BOOTSTRAP_ADDRS")]
pub wh_bootstrap_addrs: Vec<Multiaddr>,
/// Address of the Wormhole contract on the target PythNet cluster.
#[structopt(long)]
#[structopt(default_value = "H3fxXJ86ADW2PNuDDmZJg6mzTtPxkYCpNuQUTgmJ7AjU")]
#[structopt(env = "WORMHOLE_CONTRACT_ADDR")]
pub wh_contract_addr: Pubkey,
/// Multiaddresses to bind Wormhole P2P to (separated by comma)
#[structopt(long)]
#[structopt(use_delimiter = true)]
#[structopt(default_value = DEFAULT_WORMHOLE_LISTEN_ADDRS)]
#[structopt(env = "WORMHOLE_LISTEN_ADDRS")]
pub wh_listen_addrs: Vec<Multiaddr>,
/// Network ID for Wormhole
#[structopt(long)]
#[structopt(default_value = DEFAULT_NETWORK_ID)]
#[structopt(env = "WORMHOLE_NETWORK_ID")]
pub wh_network_id: String,
}

View File

@ -22,44 +22,18 @@ async fn init() -> Result<()> {
// Parse the command line arguments with StructOpt, will exit automatically on `--help` or
// with invalid arguments.
match config::Options::from_args() {
config::Options::Run {
pythnet_ws_endpoint,
pythnet_http_endpoint,
wh_network_id,
wh_bootstrap_addrs,
wh_listen_addrs,
wh_contract_addr,
api_addr,
} => {
// A channel to emit state updates to api
config::Options::Run(opts) => {
log::info!("Starting hermes service...");
// The update channel is used to send store update notifications to the public API.
let (update_tx, update_rx) = tokio::sync::mpsc::channel(1000);
log::info!("Running Hermes...");
// Initialize a cache store with a 1000 element circular buffer.
let store = Store::new(update_tx, 1000);
// Spawn the P2P layer.
log::info!("Starting P2P server on {:?}", wh_listen_addrs);
network::p2p::spawn(
store.clone(),
wh_network_id.to_string(),
wh_bootstrap_addrs,
wh_listen_addrs,
)
.await?;
// Spawn the Pythnet listener
log::info!("Starting Pythnet listener using {}", pythnet_ws_endpoint);
network::pythnet::spawn(
store.clone(),
pythnet_ws_endpoint,
pythnet_http_endpoint,
wh_contract_addr,
)
.await?;
// Run the RPC server and wait for it to shutdown gracefully.
log::info!("Starting RPC server on {}", api_addr);
api::run(store.clone(), update_rx, api_addr.to_string()).await?;
network::p2p::spawn(opts.clone(), store.clone()).await?;
network::pythnet::spawn(opts.clone(), store.clone()).await?;
api::run(opts.clone(), store.clone(), update_rx).await?;
}
}

View File

@ -10,9 +10,12 @@
//! their infrastructure.
use {
crate::store::{
types::Update,
Store,
crate::{
config::RunOptions,
store::{
types::Update,
Store,
},
},
anyhow::Result,
libp2p::Multiaddr,
@ -122,13 +125,17 @@ pub fn bootstrap(
}
// Spawn's the P2P layer as a separate thread via Go.
pub async fn spawn(
store: Arc<Store>,
network_id: String,
wh_bootstrap_addrs: Vec<Multiaddr>,
wh_listen_addrs: Vec<Multiaddr>,
) -> Result<()> {
std::thread::spawn(|| bootstrap(network_id, wh_bootstrap_addrs, wh_listen_addrs).unwrap());
pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
log::info!("Starting P2P server on {:?}", opts.wh_listen_addrs);
std::thread::spawn(|| {
bootstrap(
opts.wh_network_id,
opts.wh_bootstrap_addrs,
opts.wh_listen_addrs,
)
.unwrap()
});
tokio::spawn(async move {
// Listen in the background for new VAA's from the p2p layer

View File

@ -3,17 +3,20 @@
//! storage.
use {
crate::store::{
types::{
AccumulatorMessages,
Update,
crate::{
config::RunOptions,
store::{
types::{
AccumulatorMessages,
Update,
},
wormhole::{
BridgeData,
GuardianSet,
GuardianSetData,
},
Store,
},
wormhole::{
BridgeData,
GuardianSet,
GuardianSetData,
},
Store,
},
anyhow::{
anyhow,
@ -248,23 +251,22 @@ async fn fetch_existing_guardian_sets(
Ok(())
}
pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
log::info!(
"Starting Pythnet listener using {}",
opts.pythnet_ws_endpoint
);
pub async fn spawn(
store: Arc<Store>,
pythnet_ws_endpoint: String,
pythnet_http_endpoint: String,
wormhole_contract_addr: Pubkey,
) -> Result<()> {
fetch_existing_guardian_sets(
store.clone(),
pythnet_http_endpoint.clone(),
wormhole_contract_addr,
opts.pythnet_http_endpoint.clone(),
opts.wh_contract_addr,
)
.await?;
{
let store = store.clone();
let pythnet_ws_endpoint = pythnet_ws_endpoint.clone();
let pythnet_ws_endpoint = opts.pythnet_ws_endpoint.clone();
tokio::spawn(async move {
loop {
let current_time = Instant::now();
@ -285,7 +287,7 @@ pub async fn spawn(
{
let store = store.clone();
let pythnet_http_endpoint = pythnet_http_endpoint.clone();
let pythnet_http_endpoint = opts.pythnet_http_endpoint.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
@ -293,7 +295,7 @@ pub async fn spawn(
match fetch_existing_guardian_sets(
store.clone(),
pythnet_http_endpoint.clone(),
wormhole_contract_addr,
opts.wh_contract_addr,
)
.await
{