feat(hermes): add tracing

This commit is contained in:
Reisen 2023-09-02 11:00:17 +00:00 committed by Reisen
parent 7cbdcb562d
commit 860178f057
11 changed files with 325 additions and 124 deletions

78
hermes/Cargo.lock generated
View File

@ -835,9 +835,9 @@ dependencies = [
[[package]] [[package]]
name = "chrono" name = "chrono"
version = "0.4.26" version = "0.4.28"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec837a71355b28f6556dbd569b37b3f363091c0bd4b2e735674521b4c5fd9bc5" checksum = "95ed24df0632f708f5f6d8082675bef2596f7084dee3dd55f632290bf35bfe0f"
dependencies = [ dependencies = [
"android-tzdata", "android-tzdata",
"iana-time-zone", "iana-time-zone",
@ -846,7 +846,7 @@ dependencies = [
"serde", "serde",
"time 0.1.45", "time 0.1.45",
"wasm-bindgen", "wasm-bindgen",
"winapi", "windows-targets 0.48.1",
] ]
[[package]] [[package]]
@ -1764,7 +1764,7 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
[[package]] [[package]]
name = "hermes" name = "hermes"
version = "0.1.13" version = "0.1.14"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"axum", "axum",
@ -1772,6 +1772,7 @@ dependencies = [
"base64 0.21.2", "base64 0.21.2",
"borsh 0.10.3", "borsh 0.10.3",
"byteorder", "byteorder",
"chrono",
"dashmap", "dashmap",
"derive_more", "derive_more",
"env_logger 0.10.0", "env_logger 0.10.0",
@ -1801,6 +1802,8 @@ dependencies = [
"strum", "strum",
"tokio", "tokio",
"tower-http", "tower-http",
"tracing",
"tracing-subscriber",
"utoipa", "utoipa",
"utoipa-swagger-ui", "utoipa-swagger-ui",
"wormhole-sdk", "wormhole-sdk",
@ -3226,6 +3229,16 @@ dependencies = [
"minimal-lexical", "minimal-lexical",
] ]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]] [[package]]
name = "num" name = "num"
version = "0.2.1" version = "0.2.1"
@ -3494,6 +3507,12 @@ dependencies = [
"vcpkg", "vcpkg",
] ]
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]] [[package]]
name = "owning_ref" name = "owning_ref"
version = "0.4.1" version = "0.4.1"
@ -4798,6 +4817,15 @@ dependencies = [
"keccak", "keccak",
] ]
[[package]]
name = "sharded-slab"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31"
dependencies = [
"lazy_static",
]
[[package]] [[package]]
name = "shell-words" name = "shell-words"
version = "1.1.0" version = "1.1.0"
@ -5737,6 +5765,16 @@ dependencies = [
"syn 2.0.26", "syn 2.0.26",
] ]
[[package]]
name = "thread_local"
version = "1.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152"
dependencies = [
"cfg-if",
"once_cell",
]
[[package]] [[package]]
name = "time" name = "time"
version = "0.1.45" version = "0.1.45"
@ -6027,6 +6065,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
dependencies = [ dependencies = [
"once_cell", "once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922"
dependencies = [
"lazy_static",
"log",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
dependencies = [
"nu-ansi-term",
"sharded-slab",
"smallvec",
"thread_local",
"tracing-core",
"tracing-log",
] ]
[[package]] [[package]]
@ -6293,6 +6357,12 @@ dependencies = [
"zip", "zip",
] ]
[[package]]
name = "valuable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]] [[package]]
name = "value-bag" name = "value-bag"
version = "1.4.1" version = "1.4.1"

View File

@ -1,43 +1,46 @@
[package] [package]
name = "hermes" name = "hermes"
version = "0.1.13" version = "0.1.14"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
anyhow = { version = "1.0.69" } anyhow = { version = "1.0.69" }
axum = { version = "0.6.20", features = ["json", "ws", "macros"] } axum = { version = "0.6.20", features = ["json", "ws", "macros"] }
axum-macros = { version = "0.3.8" } axum-macros = { version = "0.3.8" }
base64 = { version = "0.21.0" } base64 = { version = "0.21.0" }
borsh = { version = "0.10.3" } borsh = { version = "0.10.3" }
byteorder = { version = "1.4.3" } byteorder = { version = "1.4.3" }
dashmap = { version = "5.4.0" } chrono = { version = "0.4.28" }
derive_more = { version = "0.99.17" } dashmap = { version = "5.4.0" }
env_logger = { version = "0.10.0" } derive_more = { version = "0.99.17" }
futures = { version = "0.3.28" } env_logger = { version = "0.10.0" }
hex = { version = "0.4.3" } futures = { version = "0.3.28" }
humantime = { version = "2.1.0" } hex = { version = "0.4.3" }
lazy_static = { version = "1.4.0" } humantime = { version = "2.1.0" }
libc = { version = "0.2.140" } lazy_static = { version = "1.4.0" }
log = { version = "0.4.17" } libc = { version = "0.2.140" }
mock_instant = { version = "0.3.1", features = ["sync"] } log = { version = "0.4.17" }
prometheus-client = { version = "0.21.1" } mock_instant = { version = "0.3.1", features = ["sync"] }
pyth-sdk = { version = "0.8.0" } prometheus-client = { version = "0.21.1" }
pythnet-sdk = { path = "../pythnet/pythnet_sdk/", version = "2.0.0", features = ["strum"] } pyth-sdk = { version = "0.8.0" }
rand = { version = "0.8.5" } pythnet-sdk = { path = "../pythnet/pythnet_sdk/", version = "2.0.0", features = ["strum"] }
reqwest = { version = "0.11.14", features = ["blocking", "json"] } rand = { version = "0.8.5" }
secp256k1 = { version = "0.27.0", features = ["rand", "recovery", "serde"] } reqwest = { version = "0.11.14", features = ["blocking", "json"] }
serde = { version = "1.0.152", features = ["derive"] } secp256k1 = { version = "0.27.0", features = ["rand", "recovery", "serde"] }
serde_json = { version = "1.0.93" } serde = { version = "1.0.152", features = ["derive"] }
serde_qs = { version = "0.12.0", features = ["axum"] } serde_json = { version = "1.0.93" }
serde_wormhole = { git = "https://github.com/wormhole-foundation/wormhole", tag = "v2.17.1" } serde_qs = { version = "0.12.0", features = ["axum"] }
sha3 = { version = "0.10.4" } serde_wormhole = { git = "https://github.com/wormhole-foundation/wormhole", tag = "v2.17.1" }
structopt = { version = "0.3.26" } sha3 = { version = "0.10.4" }
strum = { version = "0.24.1", features = ["derive"] } structopt = { version = "0.3.26" }
tokio = { version = "1.26.0", features = ["full"] } strum = { version = "0.24.1", features = ["derive"] }
tower-http = { version = "0.4.0", features = ["cors"] } tokio = { version = "1.26.0", features = ["full"] }
utoipa = { version = "3.4.0", features = ["axum_extras"] } tower-http = { version = "0.4.0", features = ["cors"] }
utoipa-swagger-ui = { version = "3.1.4", features = ["axum"] } tracing = { version = "0.1.37", features = ["log"] }
wormhole-sdk = { git = "https://github.com/wormhole-foundation/wormhole", tag = "v2.17.1" } tracing-subscriber = { version = "0.3.17" }
utoipa = { version = "3.4.0", features = ["axum_extras"] }
utoipa-swagger-ui = { version = "3.1.4", features = ["axum"] }
wormhole-sdk = { git = "https://github.com/wormhole-foundation/wormhole", tag = "v2.17.1" }
# Setup LibP2P. Unfortunately the dependencies required by libp2p are shared # Setup LibP2P. Unfortunately the dependencies required by libp2p are shared
# with the dependencies required by many Solana components. This means that we # with the dependencies required by many Solana components. This means that we

View File

@ -47,8 +47,9 @@ impl State {
/// ///
/// Currently this is based on Axum due to the simplicity and strong ecosystem support for the /// Currently this is based on Axum due to the simplicity and strong ecosystem support for the
/// packages they are based on (tokio & hyper). /// packages they are based on (tokio & hyper).
#[tracing::instrument(skip(opts, store, update_rx))]
pub async fn run(opts: RunOptions, store: Arc<Store>, mut update_rx: Receiver<()>) -> Result<()> { pub async fn run(opts: RunOptions, store: Arc<Store>, mut update_rx: Receiver<()>) -> Result<()> {
log::info!("Starting RPC server on {}", opts.api_addr); tracing::info!(endpoint = %opts.api_addr, "Starting RPC Server.");
#[derive(OpenApi)] #[derive(OpenApi)]
#[openapi( #[openapi(
@ -109,7 +110,7 @@ pub async fn run(opts: RunOptions, store: Arc<Store>, mut update_rx: Receiver<()
// Causes a full application shutdown if an error occurs, we can't recover from this so // Causes a full application shutdown if an error occurs, we can't recover from this so
// we just quit. // we just quit.
if update_rx.recv().await.is_none() { if update_rx.recv().await.is_none() {
log::error!("Failed to receive update from store."); tracing::error!("Failed to receive update from store.");
crate::SHOULD_EXIT.store(true, Ordering::Release); crate::SHOULD_EXIT.store(true, Ordering::Release);
break; break;
} }
@ -117,7 +118,7 @@ pub async fn run(opts: RunOptions, store: Arc<Store>, mut update_rx: Receiver<()
notify_updates(state.ws.clone()).await; notify_updates(state.ws.clone()).await;
} }
log::info!("Shutting down websocket updates...") tracing::info!("Shutting down websocket updates...")
}); });
// Binds the axum's server to the configured address and port. This is a blocking call and will // Binds the axum's server to the configured address and port. This is a blocking call and will

View File

@ -61,10 +61,11 @@ pub async fn ws_route_handler(
ws.on_upgrade(|socket| websocket_handler(socket, state)) ws.on_upgrade(|socket| websocket_handler(socket, state))
} }
#[tracing::instrument(skip(stream, state))]
async fn websocket_handler(stream: WebSocket, state: super::State) { async fn websocket_handler(stream: WebSocket, state: super::State) {
let ws_state = state.ws.clone(); let ws_state = state.ws.clone();
let id = ws_state.subscriber_counter.fetch_add(1, Ordering::SeqCst); let id = ws_state.subscriber_counter.fetch_add(1, Ordering::SeqCst);
log::debug!("New websocket connection, assigning id: {}", id); tracing::debug!(id, "New Websocket Connection");
let (notify_sender, notify_receiver) = mpsc::channel::<()>(NOTIFICATIONS_CHAN_LEN); let (notify_sender, notify_receiver) = mpsc::channel::<()>(NOTIFICATIONS_CHAN_LEN);
let (sender, receiver) = stream.split(); let (sender, receiver) = stream.split();
@ -112,10 +113,11 @@ impl Subscriber {
} }
} }
#[tracing::instrument(skip(self))]
pub async fn run(&mut self) { pub async fn run(&mut self) {
while !self.closed { while !self.closed {
if let Err(e) = self.handle_next().await { if let Err(e) = self.handle_next().await {
log::debug!("Subscriber {}: Error handling next message: {}", self.id, e); tracing::debug!(subscriber = self.id, error = ?e, "Error Handling Subscriber Message.");
break; break;
} }
} }
@ -179,6 +181,7 @@ impl Subscriber {
Ok(()) Ok(())
} }
#[tracing::instrument(skip(self, message))]
async fn handle_client_message(&mut self, message: Message) -> Result<()> { async fn handle_client_message(&mut self, message: Message) -> Result<()> {
let maybe_client_message = match message { let maybe_client_message = match message {
Message::Close(_) => { Message::Close(_) => {
@ -186,13 +189,12 @@ impl Subscriber {
// list, instead when the Subscriber struct is dropped the channel // list, instead when the Subscriber struct is dropped the channel
// to subscribers list will be closed and it will eventually get // to subscribers list will be closed and it will eventually get
// removed. // removed.
log::trace!("Subscriber {} closed connection", self.id); tracing::trace!(id = self.id, "Subscriber Closed Connection.");
// Send the close message to gracefully shut down the connection // Send the close message to gracefully shut down the connection
// Otherwise the client might get an abnormal Websocket closure // Otherwise the client might get an abnormal Websocket closure
// error. // error.
self.sender.close().await?; self.sender.close().await?;
self.closed = true; self.closed = true;
return Ok(()); return Ok(());
} }
@ -222,6 +224,7 @@ impl Subscriber {
.await?; .await?;
return Ok(()); return Ok(());
} }
Ok(ClientMessage::Subscribe { Ok(ClientMessage::Subscribe {
ids, ids,
verbose, verbose,

View File

@ -27,14 +27,15 @@ mod store;
pub(crate) static SHOULD_EXIT: AtomicBool = AtomicBool::new(false); pub(crate) static SHOULD_EXIT: AtomicBool = AtomicBool::new(false);
/// Initialize the Application. This can be invoked either by real main, or by the Geyser plugin. /// Initialize the Application. This can be invoked either by real main, or by the Geyser plugin.
#[tracing::instrument]
async fn init() -> Result<()> { async fn init() -> Result<()> {
log::info!("Initializing Hermes..."); tracing::info!("Initializing Hermes...");
// Parse the command line arguments with StructOpt, will exit automatically on `--help` or // Parse the command line arguments with StructOpt, will exit automatically on `--help` or
// with invalid arguments. // with invalid arguments.
match config::Options::from_args() { match config::Options::from_args() {
config::Options::Run(opts) => { config::Options::Run(opts) => {
log::info!("Starting hermes service..."); tracing::info!("Starting hermes service...");
// The update channel is used to send store update notifications to the public API. // The update channel is used to send store update notifications to the public API.
let (update_tx, update_rx) = tokio::sync::mpsc::channel(1000); let (update_tx, update_rx) = tokio::sync::mpsc::channel(1000);
@ -45,8 +46,9 @@ async fn init() -> Result<()> {
// Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown. We // Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown. We
// also send off any notifications needed to close off any waiting tasks. // also send off any notifications needed to close off any waiting tasks.
spawn(async move { spawn(async move {
tracing::info!("Registered shutdown signal handler...");
tokio::signal::ctrl_c().await.unwrap(); tokio::signal::ctrl_c().await.unwrap();
log::info!("Shut down signal received, waiting for tasks..."); tracing::info!("Shut down signal received, waiting for tasks...");
SHOULD_EXIT.store(true, std::sync::atomic::Ordering::Release); SHOULD_EXIT.store(true, std::sync::atomic::Ordering::Release);
let _ = update_tx.send(()).await; let _ = update_tx.send(()).await;
}); });
@ -70,9 +72,20 @@ async fn init() -> Result<()> {
} }
#[tokio::main] #[tokio::main]
#[tracing::instrument]
async fn main() -> Result<()> { async fn main() -> Result<()> {
env_logger::init(); env_logger::init();
// Initialize a Tracing Subscriber
tracing::subscriber::set_global_default(
tracing_subscriber::fmt()
.compact()
.with_file(false)
.with_line_number(true)
.with_thread_ids(true)
.finish(),
)?;
// Launch the application. If it fails, print the full backtrace and exit. RUST_BACKTRACE // Launch the application. If it fails, print the full backtrace and exit. RUST_BACKTRACE
// should be set to 1 for this otherwise it will only print the top-level error. // should be set to 1 for this otherwise it will only print the top-level error.
if let Err(result) = init().await { if let Err(result) = init().await {

View File

@ -189,7 +189,11 @@ func RegisterObservationCallback(f C.callback_t, network_id, bootstrap_addrs, li
defer h.Close() defer h.Close()
topic := fmt.Sprintf("%s/%s", networkID, "broadcast") topic := fmt.Sprintf("%s/%s", networkID, "broadcast")
ps, err := pubsub.NewGossipSub(ctx, h) ps, err := pubsub.NewGossipSub(
ctx,
h,
pubsub.WithValidateQueueSize(1024),
)
if err != nil { if err != nil {
err := fmt.Errorf("Failed to create Pubsub: %w", err) err := fmt.Errorf("Failed to create Pubsub: %w", err)
fmt.Println(err) fmt.Println(err)
@ -205,7 +209,7 @@ func RegisterObservationCallback(f C.callback_t, network_id, bootstrap_addrs, li
defer th.Close() defer th.Close()
sub, err := th.Subscribe() sub, err := th.Subscribe(pubsub.WithBufferSize(1024))
if err != nil { if err != nil {
err := fmt.Errorf("Failed to subscribe topic: %w", err) err := fmt.Errorf("Failed to subscribe topic: %w", err)
fmt.Println(err) fmt.Println(err)

View File

@ -19,6 +19,10 @@ use {
}, },
anyhow::Result, anyhow::Result,
libp2p::Multiaddr, libp2p::Multiaddr,
pythnet_sdk::wire::v1::{
WormholeMessage,
WormholePayload,
},
std::{ std::{
ffi::{ ffi::{
c_char, c_char,
@ -34,6 +38,10 @@ use {
Mutex, Mutex,
}, },
}, },
wormhole_sdk::{
Address,
Chain,
},
}; };
extern "C" { extern "C" {
@ -53,7 +61,32 @@ pub struct ObservationC {
pub vaa_len: usize, pub vaa_len: usize,
} }
pub type Observation = Vec<u8>; /// A wrapper around a VAA observed from Wormhole.
///
/// This wrapper tracks a Span that allows tracking the VAA through the system. This Span is
/// expected to be tied to the `proxy` Span and so logging will always be traced back to the
/// associated `proxy` Span regardless of where in the system it is being used.
#[derive(Clone, Debug)]
pub struct Vaa {
/// Tracing span that travels with the bytes, so that events logged while this VAA is being
/// handled elsewhere can be entered into the same span.
pub span: tracing::Span,
/// The VAA bytes. This is the field exposed as `&[u8]` through `Deref`.
pub data: Vec<u8>,
}
// Two `Vaa`s compare equal when their bytes match; the attached Span is deliberately left out
// of the comparison.
impl PartialEq for Vaa {
    fn eq(&self, rhs: &Self) -> bool {
        self.data.as_slice() == rhs.data.as_slice()
    }
}
/// Lets a `Vaa` be borrowed as a plain byte slice, so code that only needs the bytes (such as the
/// store) can work with `&[u8]` and ignore the wrapper.
impl std::ops::Deref for Vaa {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        self.data.as_slice()
    }
}
// A Static Channel to pipe the `Observation` from the callback into the local Rust handler for // A Static Channel to pipe the `Observation` from the callback into the local Rust handler for
// observation messages. It has to be static for now because there's no way to capture state in // observation messages. It has to be static for now because there's no way to capture state in
@ -61,8 +94,8 @@ pub type Observation = Vec<u8>;
// TODO: Move this channel to the module level that spawns the services // TODO: Move this channel to the module level that spawns the services
lazy_static::lazy_static! { lazy_static::lazy_static! {
pub static ref OBSERVATIONS: ( pub static ref OBSERVATIONS: (
Mutex<Sender<Observation>>, Mutex<Sender<Vaa>>,
Mutex<Receiver<Observation>>, Mutex<Receiver<Vaa>>,
) = { ) = {
let (tx, rc) = std::sync::mpsc::channel(); let (tx, rc) = std::sync::mpsc::channel();
(Mutex::new(tx), Mutex::new(rc)) (Mutex::new(tx), Mutex::new(rc))
@ -72,20 +105,60 @@ lazy_static::lazy_static! {
/// This function is passed as a callback to the Go libp2p runtime, it passes observations back and /// This function is passed as a callback to the Go libp2p runtime, it passes observations back and
/// acts as a proxy forwarding these observations into our main loop. /// acts as a proxy forwarding these observations into our main loop.
#[no_mangle] #[no_mangle]
#[tracing::instrument(skip(o))]
extern "C" fn proxy(o: ObservationC) { extern "C" fn proxy(o: ObservationC) {
// Create a fixed slice from the pointer and length. // Create a fixed slice from the pointer and length.
let vaa = unsafe { std::slice::from_raw_parts(o.vaa, o.vaa_len) }.to_owned(); let vaa = unsafe { std::slice::from_raw_parts(o.vaa, o.vaa_len) }.to_owned();
// The chances of the mutex getting poisioned is very low and if it happens
// there is no way for us to recover from it. // Deserialize VAA to check Creation Time
let deserialized_vaa = {
serde_wormhole::from_slice::<wormhole_sdk::Vaa<&serde_wormhole::RawMessage>>(&vaa)
.map_err(|e| {
tracing::error!(error = ?e, "Failed to deserialize VAA.");
})
.ok()
}
.unwrap();
if deserialized_vaa.emitter_chain != Chain::Pythnet
|| deserialized_vaa.emitter_address != Address(pythnet_sdk::ACCUMULATOR_EMITTER_ADDRESS)
{
return; // Ignore VAA from other emitters
}
// Get the slot from the VAA.
let slot = match WormholeMessage::try_from_bytes(deserialized_vaa.payload)
.unwrap()
.payload
{
WormholePayload::Merkle(proof) => proof.slot,
};
// Create a Span tied to the Span of the curent proxy.
let span = tracing::span!(
parent: tracing::Span::current(),
tracing::Level::INFO,
"Observation",
slot = slot,
);
// Find the observation time for said VAA (which is a unix timestamp) and serialize as a ISO 8601 string.
let observed_time = deserialized_vaa.timestamp;
let observed_time = chrono::NaiveDateTime::from_timestamp_opt(observed_time as i64, 0).unwrap();
let observed_time = observed_time.format("%Y-%m-%dT%H:%M:%S.%fZ").to_string();
span.in_scope(|| tracing::info!(vaa_timestamp = observed_time, "Observed VAA"));
// The chances of the mutex getting poisioned is very low and if it happens there is no way for
// us to recover from it.
if OBSERVATIONS if OBSERVATIONS
.0 .0
.lock() .lock()
.map_err(|_| ()) .map_err(|_| ())
.and_then(|tx| tx.send(vaa).map_err(|_| ())) .and_then(|tx| tx.send(Vaa { span, data: vaa }).map_err(|_| ()))
.is_err() .is_err()
{ {
log::error!("Failed to lock p2p observation channel or to send observation.");
crate::SHOULD_EXIT.store(true, Ordering::Release); crate::SHOULD_EXIT.store(true, Ordering::Release);
tracing::error!("Failed to lock p2p observation channel or to send observation.");
} }
} }
@ -94,6 +167,7 @@ extern "C" fn proxy(o: ObservationC) {
/// TODO: handle_message should be capable of handling more than just Observations. But we don't /// TODO: handle_message should be capable of handling more than just Observations. But we don't
/// have our own P2P network, we pass it in to keep the code structure and read directly from the /// have our own P2P network, we pass it in to keep the code structure and read directly from the
/// OBSERVATIONS channel in the RPC for now. /// OBSERVATIONS channel in the RPC for now.
#[tracing::instrument(skip(wh_bootstrap_addrs, wh_listen_addrs))]
pub fn bootstrap( pub fn bootstrap(
network_id: String, network_id: String,
wh_bootstrap_addrs: Vec<Multiaddr>, wh_bootstrap_addrs: Vec<Multiaddr>,
@ -124,12 +198,16 @@ pub fn bootstrap(
wh_listen_addrs_cstr.as_ptr(), wh_listen_addrs_cstr.as_ptr(),
); );
} }
tracing::info!("Registered observation callback.");
Ok(()) Ok(())
} }
// Spawns the P2P layer as a separate thread via Go. // Spawns the P2P layer as a separate thread via Go.
#[tracing::instrument(skip(opts, store))]
pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> { pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
log::info!("Starting P2P server on {:?}", opts.wh_listen_addrs); tracing::info!(listeners = ?opts.wh_listen_addrs, "Starting P2P Server");
std::thread::spawn(|| { std::thread::spawn(|| {
if bootstrap( if bootstrap(
@ -139,7 +217,7 @@ pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
) )
.is_err() .is_err()
{ {
log::error!("Failed to bootstrap P2P server."); tracing::error!("Failed to bootstrap P2P server.");
crate::SHOULD_EXIT.store(true, Ordering::Release); crate::SHOULD_EXIT.store(true, Ordering::Release);
} }
}); });
@ -148,14 +226,14 @@ pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
// Listen in the background for new VAA's from the p2p layer // Listen in the background for new VAA's from the p2p layer
// and update the state accordingly. // and update the state accordingly.
while !crate::SHOULD_EXIT.load(Ordering::Acquire) { while !crate::SHOULD_EXIT.load(Ordering::Acquire) {
let vaa_bytes = tokio::task::spawn_blocking(|| { let vaa = tokio::task::spawn_blocking(|| {
let observation = OBSERVATIONS.1.lock(); let observation = OBSERVATIONS.1.lock();
let observation = match observation { let observation = match observation {
Ok(observation) => observation, Ok(observation) => observation,
Err(e) => { Err(e) => {
// This should never happen, but if it does, we want to panic and crash // This should never happen, but if it does, we want to panic and crash
// as it is not recoverable. // as it is not recoverable.
log::error!("Failed to lock p2p observation channel: {e}"); tracing::error!(error = ?e, "Failed to lock p2p observation channel.");
crate::SHOULD_EXIT.store(true, Ordering::Release); crate::SHOULD_EXIT.store(true, Ordering::Release);
return Err(anyhow::anyhow!("Failed to lock p2p observation channel")); return Err(anyhow::anyhow!("Failed to lock p2p observation channel"));
} }
@ -166,7 +244,7 @@ pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
Err(e) => { Err(e) => {
// This should never happen, but if it does, we want to shutdown the // This should never happen, but if it does, we want to shutdown the
// application as it is unrecoverable. // application as it is unrecoverable.
log::error!("Failed to receive p2p observation: {e}"); tracing::error!(error = ?e, "Failed to receive p2p observation.");
crate::SHOULD_EXIT.store(true, Ordering::Release); crate::SHOULD_EXIT.store(true, Ordering::Release);
Err(anyhow::anyhow!("Failed to receive p2p observation.")) Err(anyhow::anyhow!("Failed to receive p2p observation."))
} }
@ -174,15 +252,18 @@ pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
}) })
.await??; .await??;
vaa.span
.in_scope(|| tracing::info!("Received VAA from P2P layer."));
let store = store.clone(); let store = store.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = store.store_update(Update::Vaa(vaa_bytes)).await { if let Err(e) = store.store_update(Update::Vaa(vaa)).await {
log::error!("Failed to process VAA: {:?}", e); tracing::error!(error = ?e, "Failed to process VAA.");
} }
}); });
} }
log::info!("Shutting down P2P server..."); tracing::info!("Shutting down P2P server...");
Ok::<(), anyhow::Error>(()) Ok::<(), anyhow::Error>(())
}); });

View File

@ -156,7 +156,7 @@ pub async fn run(store: Arc<Store>, pythnet_ws_endpoint: String) -> Result<()> {
let account: Account = match update.value.account.decode() { let account: Account = match update.value.account.decode() {
Some(account) => account, Some(account) => account,
None => { None => {
log::error!("Failed to decode account from update: {:?}", update); tracing::error!(?update, "Failed to decode account from update.");
continue; continue;
} }
}; };
@ -179,20 +179,20 @@ pub async fn run(store: Arc<Store>, pythnet_ws_endpoint: String) -> Result<()> {
.store_update(Update::AccumulatorMessages(accumulator_messages)) .store_update(Update::AccumulatorMessages(accumulator_messages))
.await .await
{ {
log::error!("Failed to store accumulator messages: {:?}", err); tracing::error!(error = ?err, "Failed to store accumulator messages.");
} }
}); });
} else { } else {
log::error!( tracing::error!(
"Failed to verify the messages public key: {:?} != {:?}", ?candidate,
candidate, ?update.value.pubkey,
update.value.pubkey "Failed to verify message public keys.",
); );
} }
} }
Err(err) => { Err(err) => {
log::error!("Failed to parse AccumulatorMessages: {:?}", err); tracing::error!(error = ?err, "Failed to parse AccumulatorMessages.");
} }
}; };
} }
@ -222,10 +222,10 @@ async fn fetch_existing_guardian_sets(
let current = let current =
fetch_guardian_set(&client, wormhole_contract_addr, bridge.guardian_set_index).await?; fetch_guardian_set(&client, wormhole_contract_addr, bridge.guardian_set_index).await?;
log::info!( tracing::info!(
"Retrieved Current GuardianSet ({}): {}", guardian_set_index = bridge.guardian_set_index,
bridge.guardian_set_index, %current,
current "Retrieved Current GuardianSet.",
); );
store store
@ -242,10 +242,10 @@ async fn fetch_existing_guardian_sets(
) )
.await?; .await?;
log::info!( tracing::info!(
"Retrieved Previous GuardianSet ({}): {}", previous_guardian_set_index = bridge.guardian_set_index - 1,
bridge.guardian_set_index - 1, %previous,
previous "Retrieved Previous GuardianSet.",
); );
store store
@ -256,10 +256,11 @@ async fn fetch_existing_guardian_sets(
Ok(()) Ok(())
} }
#[tracing::instrument(skip(opts, store))]
pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> { pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
log::info!( tracing::info!(
"Starting Pythnet listener using {}", endpoint = opts.pythnet_ws_endpoint,
opts.pythnet_ws_endpoint "Started Pythnet Listener."
); );
fetch_existing_guardian_sets( fetch_existing_guardian_sets(
@ -277,17 +278,15 @@ pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
let current_time = Instant::now(); let current_time = Instant::now();
if let Err(ref e) = run(store.clone(), pythnet_ws_endpoint.clone()).await { if let Err(ref e) = run(store.clone(), pythnet_ws_endpoint.clone()).await {
log::error!("Error in Pythnet network listener: {:?}", e); tracing::error!(error = ?e, "Error in Pythnet network listener.");
if current_time.elapsed() < Duration::from_secs(30) { if current_time.elapsed() < Duration::from_secs(30) {
log::error!( tracing::error!("Pythnet listener restarting too quickly. Sleep 1s.");
"Pythnet network listener restarting too quickly. Sleeping for 1s"
);
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_secs(1)).await;
} }
} }
} }
log::info!("Shutting down Pythnet listener..."); tracing::info!("Shutting down Pythnet listener...");
}) })
}; };
@ -315,12 +314,12 @@ pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
{ {
Ok(_) => {} Ok(_) => {}
Err(err) => { Err(err) => {
log::error!("Failed to poll for new guardian sets: {:?}", err); tracing::error!(error = ?err, "Failed to poll for new guardian sets.")
} }
} }
} }
log::info!("Shutting down Pythnet guardian set poller..."); tracing::info!("Shutting down Pythnet guardian set poller...");
}) })
}; };

View File

@ -73,11 +73,7 @@ use {
mpsc::Sender, mpsc::Sender,
RwLock, RwLock,
}, },
wormhole_sdk::{ wormhole_sdk::Vaa,
Address,
Chain,
Vaa,
},
}; };
pub mod proof; pub mod proof;
@ -118,20 +114,15 @@ impl Store {
} }
/// Stores the update data in the store /// Stores the update data in the store
#[tracing::instrument(skip(self, update))]
pub async fn store_update(&self, update: Update) -> Result<()> { pub async fn store_update(&self, update: Update) -> Result<()> {
// The slot that the update is originating from. It should be available // The slot that the update is originating from. It should be available
// in all the updates. // in all the updates.
let slot = match update { let slot = match update {
Update::Vaa(vaa_bytes) => { Update::Vaa(update_vaa) => {
// FIXME: Move to wormhole.rs // FIXME: Move to wormhole.rs
let vaa = let vaa =
serde_wormhole::from_slice::<Vaa<&serde_wormhole::RawMessage>>(&vaa_bytes)?; serde_wormhole::from_slice::<Vaa<&serde_wormhole::RawMessage>>(&update_vaa)?;
if vaa.emitter_chain != Chain::Pythnet
|| vaa.emitter_address != Address(pythnet_sdk::ACCUMULATOR_EMITTER_ADDRESS)
{
return Ok(()); // Ignore VAA from other emitters
}
if self.observed_vaa_seqs.read().await.contains(&vaa.sequence) { if self.observed_vaa_seqs.read().await.contains(&vaa.sequence) {
return Ok(()); // Ignore VAA if we have already seen it return Ok(()); // Ignore VAA if we have already seen it
@ -142,13 +133,16 @@ impl Store {
let vaa = match vaa { let vaa = match vaa {
Ok(vaa) => vaa, Ok(vaa) => vaa,
Err(err) => { Err(err) => {
log::info!("Ignoring invalid VAA: {:?}", err); tracing::warn!(error = ?err, "Ignoring invalid VAA.");
return Ok(()); return Ok(());
} }
}; };
{ {
let mut observed_vaa_seqs = self.observed_vaa_seqs.write().await; let mut observed_vaa_seqs = self.observed_vaa_seqs.write().await;
if observed_vaa_seqs.contains(&vaa.sequence) {
return Ok(()); // Ignore VAA if we have already seen it
}
observed_vaa_seqs.insert(vaa.sequence); observed_vaa_seqs.insert(vaa.sequence);
while observed_vaa_seqs.len() > OBSERVED_CACHE_SIZE { while observed_vaa_seqs.len() > OBSERVED_CACHE_SIZE {
observed_vaa_seqs.pop_first(); observed_vaa_seqs.pop_first();
@ -157,16 +151,35 @@ impl Store {
match WormholeMessage::try_from_bytes(vaa.payload)?.payload { match WormholeMessage::try_from_bytes(vaa.payload)?.payload {
WormholePayload::Merkle(proof) => { WormholePayload::Merkle(proof) => {
log::info!("Storing merkle proof for slot {:?}", proof.slot,); update_vaa.span.in_scope(|| {
store_wormhole_merkle_verified_message(self, proof.clone(), vaa_bytes) tracing::info!(slot = proof.slot, "Storing VAA Merkle Proof.");
.await?; });
store_wormhole_merkle_verified_message(
self,
proof.clone(),
update_vaa.to_owned(),
)
.await?;
proof.slot proof.slot
} }
} }
} }
Update::AccumulatorMessages(accumulator_messages) => { Update::AccumulatorMessages(accumulator_messages) => {
let slot = accumulator_messages.slot; let slot = accumulator_messages.slot;
log::info!("Storing accumulator messages for slot {:?}.", slot,); if let Some(state) = self.storage.fetch_wormhole_merkle_state(slot).await? {
state.vaa.span.in_scope(|| {
tracing::info!(
slot = slot,
"Storing Accumulator Messages (existing Proof)."
);
});
} else {
tracing::info!(slot = slot, "Storing Accumulator Messages.");
}
self.storage self.storage
.store_accumulator_messages(accumulator_messages) .store_accumulator_messages(accumulator_messages)
.await?; .await?;
@ -185,6 +198,10 @@ impl Store {
_ => return Ok(()), _ => return Ok(()),
}; };
wormhole_merkle_state.vaa.span.in_scope(|| {
tracing::info!(slot = wormhole_merkle_state.root.slot, "Completed Update.");
});
// Once the accumulator reaches a complete state for a specific slot // Once the accumulator reaches a complete state for a specific slot
// we can build the message states // we can build the message states
self.build_message_states(accumulator_messages, wormhole_merkle_state) self.build_message_states(accumulator_messages, wormhole_merkle_state)
@ -200,6 +217,7 @@ impl Store {
Ok(()) Ok(())
} }
#[tracing::instrument(skip(self, accumulator_messages, wormhole_merkle_state))]
async fn build_message_states( async fn build_message_states(
&self, &self,
accumulator_messages: AccumulatorMessages, accumulator_messages: AccumulatorMessages,
@ -232,7 +250,7 @@ impl Store {
}) })
.collect::<Result<Vec<_>>>()?; .collect::<Result<Vec<_>>>()?;
log::info!("Message states len: {:?}", message_states.len()); tracing::info!(len = message_states.len(), "Storing Message States.");
self.storage.store_message_states(message_states).await?; self.storage.store_message_states(message_states).await?;
@ -385,7 +403,10 @@ mod test {
payload: serde_wormhole::RawMessage::new(wormhole_message.as_ref()), payload: serde_wormhole::RawMessage::new(wormhole_message.as_ref()),
}; };
updates.push(Update::Vaa(serde_wormhole::to_vec(&vaa).unwrap())); updates.push(Update::Vaa(crate::network::p2p::Vaa {
span: tracing::Span::current(),
data: serde_wormhole::to_vec(&vaa).unwrap(),
}));
updates updates
} }

View File

@ -1,8 +1,11 @@
use { use {
crate::store::{ crate::{
storage::MessageState, network::p2p::Vaa,
types::AccumulatorMessages, store::{
Store, storage::MessageState,
types::AccumulatorMessages,
Store,
},
}, },
anyhow::{ anyhow::{
anyhow, anyhow,
@ -36,26 +39,23 @@ pub const MAX_MESSAGE_IN_SINGLE_UPDATE_DATA: usize = 255;
#[derive(Clone, PartialEq, Debug)] #[derive(Clone, PartialEq, Debug)]
pub struct WormholeMerkleState { pub struct WormholeMerkleState {
pub root: WormholeMerkleRoot, pub root: WormholeMerkleRoot,
pub vaa: Vec<u8>, pub vaa: Vaa,
} }
#[derive(Clone, PartialEq, Debug)] #[derive(Clone, PartialEq, Debug)]
pub struct WormholeMerkleMessageProof { pub struct WormholeMerkleMessageProof {
pub vaa: Vec<u8>,
pub proof: MerklePath<Keccak160>, pub proof: MerklePath<Keccak160>,
pub vaa: Vaa,
} }
pub async fn store_wormhole_merkle_verified_message( pub async fn store_wormhole_merkle_verified_message(
store: &Store, store: &Store,
root: WormholeMerkleRoot, root: WormholeMerkleRoot,
vaa_bytes: Vec<u8>, vaa: Vaa,
) -> Result<()> { ) -> Result<()> {
store store
.storage .storage
.store_wormhole_merkle_state(WormholeMerkleState { .store_wormhole_merkle_state(WormholeMerkleState { root, vaa })
root,
vaa: vaa_bytes,
})
.await?; .await?;
Ok(()) Ok(())
} }
@ -108,9 +108,13 @@ pub fn construct_update_data(mut message_states: Vec<&MessageState>) -> Result<V
.vaa .vaa
.clone(); .clone();
vaa.span.in_scope(|| {
tracing::info!("Constructing update data for {} Messages.", messages.len())
});
Ok(to_vec::<_, byteorder::BE>(&AccumulatorUpdateData::new( Ok(to_vec::<_, byteorder::BE>(&AccumulatorUpdateData::new(
Proof::WormholeMerkle { Proof::WormholeMerkle {
vaa: vaa.into(), vaa: (*vaa).to_owned().into(),
updates: messages updates: messages
.iter() .iter()
.map(|message| { .map(|message| {

View File

@ -1,5 +1,6 @@
use { use {
super::proof::wormhole_merkle::WormholeMerkleMessageProof, super::proof::wormhole_merkle::WormholeMerkleMessageProof,
crate::network::p2p::Vaa,
borsh::BorshDeserialize, borsh::BorshDeserialize,
pythnet_sdk::messages::PriceFeedMessage, pythnet_sdk::messages::PriceFeedMessage,
}; };
@ -44,8 +45,9 @@ impl AccumulatorMessages {
} }
} }
#[derive(Debug)]
pub enum Update { pub enum Update {
Vaa(Vec<u8>), Vaa(Vaa),
AccumulatorMessages(AccumulatorMessages), AccumulatorMessages(AccumulatorMessages),
} }