diff --git a/solana/pyth2wormhole/Cargo.lock b/solana/pyth2wormhole/Cargo.lock
index ff0dc08e..afe52a36 100644
--- a/solana/pyth2wormhole/Cargo.lock
+++ b/solana/pyth2wormhole/Cargo.lock
@@ -1417,9 +1417,9 @@ dependencies = [
 
 [[package]]
 name = "generic-array"
-version = "0.14.5"
+version = "0.14.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fd48d33ec7f05fbfa152300fdad764757cbded343c1aa1cff2fbaf4134851803"
+checksum = "bff49e947297f3312447abdca79f45f4738097cc82b06e72054d2223f601f1b9"
 dependencies = [
  "serde",
  "typenum",
@@ -2648,6 +2648,7 @@ dependencies = [
  "clap 3.1.18",
  "env_logger 0.8.4",
  "futures",
+ "generic-array",
  "log",
  "p2w-sdk",
  "pyth-client 0.5.1",
@@ -2655,6 +2656,7 @@ dependencies = [
  "pyth2wormhole",
  "serde",
  "serde_yaml",
+ "sha3 0.10.6",
  "shellexpand",
  "solana-client",
  "solana-program",
@@ -3353,9 +3355,9 @@ dependencies = [
 
 [[package]]
 name = "sha3"
-version = "0.10.5"
+version = "0.10.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e2904bea16a1ae962b483322a1c7b81d976029203aea1f461e51cd7705db7ba9"
+checksum = "bdf0c33fae925bdc080598b84bc15c55e7b9a4a43b3c704da051f977469691c9"
 dependencies = [
  "digest 0.10.5",
  "keccak",
@@ -3844,7 +3846,7 @@ dependencies = [
  "serde_bytes",
  "serde_derive",
  "sha2 0.10.2",
- "sha3 0.10.5",
+ "sha3 0.10.6",
  "solana-frozen-abi",
  "solana-frozen-abi-macro",
  "solana-sdk-macro",
@@ -4028,7 +4030,7 @@ dependencies = [
  "serde_derive",
  "serde_json",
  "sha2 0.10.2",
- "sha3 0.10.5",
+ "sha3 0.10.6",
  "solana-frozen-abi",
  "solana-frozen-abi-macro",
  "solana-logger",
@@ -5221,7 +5223,7 @@ dependencies = [
  "hex-literal",
  "nom",
  "primitive-types 0.11.1",
- "sha3 0.10.5",
+ "sha3 0.10.6",
  "thiserror",
 ]
 
@@ -5237,7 +5239,7 @@ dependencies = [
  "hex-literal",
  "nom",
  "primitive-types 0.11.1",
- "sha3 0.10.5",
+ "sha3 0.10.6",
  "solana-program",
  "thiserror",
  "wormhole-core",
diff --git a/solana/pyth2wormhole/client/Cargo.toml b/solana/pyth2wormhole/client/Cargo.toml
index ae627cd9..81a62a80 100644
--- a/solana/pyth2wormhole/client/Cargo.toml
+++ b/solana/pyth2wormhole/client/Cargo.toml
@@ -31,6 +31,8 @@ solana-transaction-status = "=1.10.31"
 solitaire = {git = "https://github.com/wormhole-foundation/wormhole", tag = "v2.8.9"}
 tokio = {version = "1", features = ["sync", "rt-multi-thread", "time"]}
 futures = "0.3.21"
+sha3 = "0.10.6"
+generic-array = "0.14.6"
 
 [dev-dependencies]
 pyth-client = "0.5.0"
diff --git a/solana/pyth2wormhole/client/src/attestation_cfg.rs b/solana/pyth2wormhole/client/src/attestation_cfg.rs
index f307717c..c81e9a43 100644
--- a/solana/pyth2wormhole/client/src/attestation_cfg.rs
+++ b/solana/pyth2wormhole/client/src/attestation_cfg.rs
@@ -7,6 +7,8 @@ use std::{
     str::FromStr,
 };
 
+use log::info;
+
 use serde::{
     de::Error,
     Deserialize,
@@ -16,8 +18,10 @@ use serde::{
 };
 use solana_program::pubkey::Pubkey;
 
+use crate::BatchState;
+
 /// Pyth2wormhole config specific to attestation requests
-#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
+#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)]
 pub struct AttestationConfig {
     #[serde(default = "default_min_msg_reuse_interval_ms")]
     pub min_msg_reuse_interval_ms: u64,
@@ -30,6 +34,15 @@ pub struct AttestationConfig {
         default // Uses Option::default() which is None
     )]
     pub mapping_addr: Option<Pubkey>,
+    /// The known symbol list is reloaded at this interval to account
+    /// for mapping changes. Note: this interval only takes effect if
+    /// the mapping address is defined. Whenever the mapping is looked
+    /// up, new attestation jobs are started lazily, and only if the
+    /// mapping contents changed the known symbol list; the obsolete
+    /// jobs are stopped only after the new ones are started, keeping
+    /// the cranking uninterrupted.
+    #[serde(default = "default_mapping_reload_interval_mins")]
+    pub mapping_reload_interval_mins: u64,
     #[serde(default = "default_min_rpc_interval_ms")]
     /// Rate-limiting minimum delay between RPC requests in milliseconds"
     pub min_rpc_interval_ms: u64,
@@ -49,7 +62,7 @@ impl AttestationConfig {
         for existing_group in &self.symbol_groups {
             for existing_sym in &existing_group.symbols {
                 // Check if new symbols mention this product
-                if let Some(mut prices) = new_symbols.get_mut(&existing_sym.product_addr) {
+                if let Some(prices) = new_symbols.get_mut(&existing_sym.product_addr) {
                     // Prune the price if exists
                     prices.remove(&existing_sym.price_addr);
                 }
@@ -74,7 +87,7 @@ impl AttestationConfig {
             .iter_mut()
             .find(|g| g.group_name == group_name) // Advances the iterator and returns Some(item) on first hit
         {
-            Some(mut existing_group) => existing_group.symbols.append(&mut new_symbols_vec),
+            Some(existing_group) => existing_group.symbols.append(&mut new_symbols_vec),
             None if new_symbols_vec.len() != 0 => {
                 // Group does not exist, assume defaults
                 let new_group = SymbolGroup {
@@ -88,9 +101,30 @@ impl AttestationConfig {
             None => {}
         }
     }
+
+    pub fn as_batches(&self, max_batch_size: usize) -> Vec<BatchState> {
+        self.symbol_groups
+            .iter()
+            .map(move |g| {
+                let conditions4closure = g.conditions.clone();
+                let name4closure = g.group_name.clone();
+
+                info!("Group {:?}, {} symbols", g.group_name, g.symbols.len(),);
+
+                // Divide group into batches
+                g.symbols
+                    .as_slice()
+                    .chunks(max_batch_size.clone())
+                    .map(move |symbols| {
+                        BatchState::new(name4closure.clone(), symbols, conditions4closure.clone())
+                    })
+            })
+            .flatten()
+            .collect()
+    }
 }
 
-#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
+#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)]
 pub struct SymbolGroup {
     pub group_name: String,
     /// Attestation conditions applied to all symbols in this group
     pub conditions: AttestationConditions,
@@ -106,6 +140,10 @@ pub const fn default_min_msg_reuse_interval_ms() -> u64 {
     10_000 // 10s
 }
 
+pub const fn default_mapping_reload_interval_mins() -> u64 {
+    15
+}
+
 pub const fn default_min_rpc_interval_ms() -> u64 {
     150
 }
@@ -122,7 +160,7 @@ pub const fn default_max_batch_jobs() -> usize {
 /// of the active conditions is met. Option<> fields can be
 /// de-activated with None. All conditions are inactive by default,
 /// except for the non-Option ones.
-#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
+#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)]
 pub struct AttestationConditions {
     /// Baseline, unconditional attestation interval. Attestation is triggered if the specified interval elapsed since last attestation.
     #[serde(default = "default_min_interval_secs")]
@@ -134,9 +172,10 @@ pub struct AttestationConditions {
     #[serde(default = "default_max_batch_jobs")]
     pub max_batch_jobs: usize,
 
-    /// Trigger attestation if price changes by the specified percentage.
+    /// Trigger attestation if price changes by the specified
+    /// percentage, expressed in integer basis points (1bps = 0.01%)
     #[serde(default)]
-    pub price_changed_pct: Option<f64>,
+    pub price_changed_bps: Option<u64>,
 
     /// Trigger attestation if publish_time advances at least the
     /// specified amount.
@@ -152,11 +191,11 @@ impl AttestationConditions {
         let AttestationConditions {
             min_interval_secs: _min_interval_secs,
             max_batch_jobs: _max_batch_jobs,
-            price_changed_pct,
+            price_changed_bps,
             publish_time_min_delta_secs,
         } = self;
 
-        price_changed_pct.is_some() || publish_time_min_delta_secs.is_some()
+        price_changed_bps.is_some() || publish_time_min_delta_secs.is_some()
     }
 }
 
@@ -165,14 +204,14 @@ impl Default for AttestationConditions {
         Self {
             min_interval_secs: default_min_interval_secs(),
             max_batch_jobs: default_max_batch_jobs(),
-            price_changed_pct: None,
+            price_changed_bps: None,
             publish_time_min_delta_secs: None,
         }
     }
 }
 
 /// Config entry for a Pyth product + price pair
-#[derive(Clone, Default, Debug, Deserialize, Serialize, PartialEq, Eq)]
+#[derive(Clone, Default, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)]
 pub struct P2WSymbol {
     /// User-defined human-readable name
     pub name: Option<String>,
@@ -283,6 +322,7 @@ mod tests {
             max_msg_accounts: 100_000,
             min_rpc_interval_ms: 2123,
             mapping_addr: None,
+            mapping_reload_interval_mins: 42,
             symbol_groups: vec![fastbois, slowbois],
         };
 
@@ -302,6 +342,7 @@ mod tests {
             max_msg_accounts: 100,
             min_rpc_interval_ms: 42422,
             mapping_addr: None,
+            mapping_reload_interval_mins: 42,
             symbol_groups: vec![],
         };
 
diff --git a/solana/pyth2wormhole/client/src/batch_state.rs b/solana/pyth2wormhole/client/src/batch_state.rs
index cf74830f..62f914af 100644
--- a/solana/pyth2wormhole/client/src/batch_state.rs
+++ b/solana/pyth2wormhole/client/src/batch_state.rs
@@ -1,11 +1,8 @@
-use futures::future::TryFutureExt;
 use log::{
     debug,
-    trace,
     warn,
 };
 use solana_client::nonblocking::rpc_client::RpcClient;
-use solana_sdk::signature::Signature;
 
 use pyth_sdk_solana::state::PriceAccount;
 
@@ -16,31 +13,29 @@ use std::time::{
 
 use crate::{
     AttestationConditions,
-    ErrBox,
     P2WSymbol,
-    RLMutex,
 };
 
 /// Runtime representation of a batch. It refers to the original group
 /// from the config.
 #[derive(Debug)]
-pub struct BatchState<'a> {
+pub struct BatchState {
     pub group_name: String,
-    pub symbols: &'a [P2WSymbol],
+    pub symbols: Vec<P2WSymbol>,
     pub last_known_symbol_states: Vec<Option<PriceAccount>>,
     pub conditions: AttestationConditions,
     pub last_job_finished_at: Instant,
 }
 
-impl<'a> BatchState<'a> {
+impl BatchState {
     pub fn new(
         group_name: String,
-        symbols: &'a [P2WSymbol],
+        symbols: &[P2WSymbol],
         conditions: AttestationConditions,
     ) -> Self {
         Self {
             group_name,
-            symbols,
+            symbols: symbols.to_vec(),
             conditions,
             last_known_symbol_states: vec![None; symbols.len()],
             last_job_finished_at: Instant::now(),
@@ -69,7 +64,7 @@ impl<'a> BatchState<'a> {
 
         // Only lookup and compare symbols if the conditions require
         if self.conditions.need_onchain_lookup() {
-            let mut new_symbol_states: Vec<Option<PriceAccount>> =
+            let new_symbol_states: Vec<Option<PriceAccount>> =
                 match c.get_multiple_accounts(&pubkeys).await {
                     Ok(acc_opts) => {
                         acc_opts
@@ -120,9 +115,9 @@ impl<'a> BatchState<'a> {
                         ))
                     }
 
-                // price_changed_pct
-                } else if let Some(pct) = self.conditions.price_changed_pct {
-                    let pct = pct.abs();
+                // price_changed_bps
+                } else if let Some(bps) = self.conditions.price_changed_bps {
+                    let pct = bps as f64 / 100.0;
 
                     let price_pct_diff = ((old.agg.price as f64 - new.agg.price as f64)
                         / old.agg.price as f64
                         * 100.0)
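Note on the `price_changed_pct` → `price_changed_bps` switch above: the trigger threshold is now an integer number of basis points (1 bps = 0.01%), so a config that previously said `3` (percent) must now say `300`. A minimal, self-contained sketch of the comparison that `BatchState::should_resend` performs with the new field — the function name and standalone setup are illustrative, and the absolute-value comparison is assumed from the surrounding code:

```rust
/// Mirrors the bps-based resend condition: convert basis points to
/// percent, then compare against the absolute percentage price move.
fn price_moved_enough(old_price: i64, new_price: i64, threshold_bps: u64) -> bool {
    let threshold_pct = threshold_bps as f64 / 100.0; // e.g. 300 bps -> 3.0%
    let price_pct_diff =
        ((old_price as f64 - new_price as f64) / old_price as f64 * 100.0).abs();
    price_pct_diff > threshold_pct
}

fn main() {
    // A move from 10_000 to 10_350 is 3.5%: it crosses a 300 bps (3%) threshold...
    assert!(price_moved_enough(10_000, 10_350, 300));
    // ...but not a 500 bps (5%) one.
    assert!(!price_moved_enough(10_000, 10_350, 500));
}
```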
diff --git a/solana/pyth2wormhole/client/src/cli.rs b/solana/pyth2wormhole/client/src/cli.rs
index b0a80ce7..7105da8d 100644
--- a/solana/pyth2wormhole/client/src/cli.rs
+++ b/solana/pyth2wormhole/client/src/cli.rs
@@ -123,9 +123,7 @@ pub enum Action {
     },
     #[clap(about = "Print out emitter address for the specified pyth2wormhole contract")]
     GetEmitter,
-    #[clap(
-        about = "Set the value of is_active config as ops_owner"
-    )]
+    #[clap(about = "Set the value of is_active config as ops_owner")]
     SetIsActive {
         /// Current ops owner keypair path
         #[clap(
@@ -139,5 +137,5 @@ pub enum Action {
             possible_values = ["true", "false"],
         )]
         new_is_active: String,
-    }
+    },
 }
diff --git a/solana/pyth2wormhole/client/src/lib.rs b/solana/pyth2wormhole/client/src/lib.rs
index 098850b7..57c3d366 100644
--- a/solana/pyth2wormhole/client/src/lib.rs
+++ b/solana/pyth2wormhole/client/src/lib.rs
@@ -10,6 +10,7 @@ use borsh::{
 use log::{
     debug,
     trace,
+    warn,
 };
 use pyth_sdk_solana::state::{
     load_mapping_account,
@@ -61,7 +62,6 @@ use std::collections::{
 use p2w_sdk::P2WEmitter;
 
 use pyth2wormhole::{
-    attest::P2W_MAX_BATCH_SIZE,
     config::{
         OldP2WConfigAccount,
         P2WConfigAccount,
@@ -409,23 +409,41 @@ pub async fn crawl_pyth_mapping(
     let mut ret = HashMap::new();
 
     let mut n_mappings = 1; // We assume the first one must be valid
-    let mut n_products = 0;
-    let mut n_prices = 0;
+    let mut n_products_total = 0; // Grand total products in all mapping accounts
+    let mut n_prices_total = 0; // Grand total prices in all product accounts in all mapping accounts
 
     let mut mapping_addr = first_mapping_addr.clone();
 
     // loop until the last non-zero MappingAccount.next account
     loop {
        let mapping_bytes = rpc_client.get_account_data(&mapping_addr).await?;
+        let mapping = match load_mapping_account(&mapping_bytes) {
+            Ok(p) => p,
+            Err(e) => {
+                warn!(
+                    "Mapping: Could not parse account {} as a Pyth mapping, crawling terminated. Error: {:?}",
+                    mapping_addr, e
+                );
+                break;
+            }
+        };
 
-        let mapping = load_mapping_account(&mapping_bytes)?;
+        // Products in this mapping account
+        let mut n_mapping_products = 0;
 
         // loop through all products in this mapping; filter out zeroed-out empty product slots
         for prod_addr in mapping.products.iter().filter(|p| *p != &Pubkey::default()) {
             let prod_bytes = rpc_client.get_account_data(prod_addr).await?;
-            let prod = load_product_account(&prod_bytes)?;
+            let prod = match load_product_account(&prod_bytes) {
+                Ok(p) => p,
+                Err(e) => {
+                    warn!("Mapping {}: Could not parse account {} as a Pyth product, skipping to next product. Error: {:?}", mapping_addr, prod_addr, e);
+                    continue;
+                }
+            };
 
             let mut price_addr = prod.px_acc.clone();
+            let mut n_prod_prices = 0;
 
             // the product might have no price, can happen in tilt due to race-condition, failed tx to add price, ...
             if price_addr == Pubkey::default() {
@@ -441,32 +459,46 @@ pub async fn crawl_pyth_mapping(
             // loop until the last non-zero PriceAccount.next account
             loop {
                 let price_bytes = rpc_client.get_account_data(&price_addr).await?;
-                let price = load_price_account(&price_bytes)?;
+                let price = match load_price_account(&price_bytes) {
+                    Ok(p) => p,
+                    Err(e) => {
+                        warn!("Product {}: Could not parse account {} as a Pyth price, skipping to next product. Error: {:?}", prod_addr, price_addr, e);
+                        break;
+                    }
+                };
 
                 // Append to existing set or create a new map entry
                 ret.entry(prod_addr.clone())
                     .or_insert(HashSet::new())
                     .insert(price_addr);
 
-                n_prices += 1;
+                n_prod_prices += 1;
 
                 if price.next == Pubkey::default() {
-                    trace!("Product {}: processed {} prices", prod_addr, n_prices);
+                    trace!(
+                        "Product {}: processed {} price(s)",
+                        prod_addr,
+                        n_prod_prices
+                    );
                     break;
                 }
+
                 price_addr = price.next.clone();
             }
 
-            n_products += 1;
+            n_mapping_products += 1;
+            n_prices_total += n_prod_prices;
         }
 
-        trace!(
-            "Mapping {}: processed {} products",
-            mapping_addr,
-            n_products
-        );
+        n_products_total += n_mapping_products;
+        trace!(
+            "Mapping {}: processed {} products",
+            mapping_addr,
+            n_mapping_products
+        );
 
         // Traverse other mapping accounts if applicable
         if mapping.next == Pubkey::default() {
             break;
         }
 
         mapping_addr = mapping.next.clone();
@@ -474,7 +506,7 @@ pub async fn crawl_pyth_mapping(
     }
     debug!(
         "Processed {} price(s) in {} product account(s), in {} mapping account(s)",
-        n_prices, n_products, n_mappings
+        n_prices_total, n_products_total, n_mappings
     );
 
     Ok(ret)
diff --git a/solana/pyth2wormhole/client/src/main.rs b/solana/pyth2wormhole/client/src/main.rs
index cd201bc9..b9514da0 100644
--- a/solana/pyth2wormhole/client/src/main.rs
+++ b/solana/pyth2wormhole/client/src/main.rs
@@ -2,9 +2,7 @@ pub mod cli;
 
 use std::{
     fs::File,
-    pin::Pin,
     sync::Arc,
-    thread,
     time::{
         Duration,
         Instant,
@@ -12,22 +10,22 @@ use std::{
 };
 
 use clap::Parser;
-use futures::future::{
-    Future,
-    FutureExt,
-    TryFuture,
-    TryFutureExt,
+use futures::{
+    future::{
+        Future,
+        TryFutureExt,
+    },
 };
+use generic_array::{typenum::U32, GenericArray};
 use log::{
     debug,
     error,
     info,
-    trace,
     warn,
     LevelFilter,
 };
+use sha3::{Digest, Sha3_256};
 use solana_client::{
-    client_error::ClientError,
     nonblocking::rpc_client::RpcClient,
     rpc_config::RpcTransactionConfig,
 };
@@ -36,7 +34,6 @@ use solana_sdk::{
     commitment_config::CommitmentConfig,
     signature::{
         read_keypair_file,
-        Signature,
     },
     signer::keypair::Keypair,
 };
@@ -133,7 +130,7 @@ async fn main() -> Result<(), ErrBox> {
             remove_ops_owner,
         } => {
             let old_config = get_config_account(&rpc_client, &p2w_addr).await?;
-            
+
             let new_ops_owner = if remove_ops_owner {
                 None
             } else if let Some(given_ops_owner) = ops_owner_addr {
@@ -187,30 +184,50 @@ async fn main() -> Result<(), ErrBox> {
             daemon,
         } => {
             // Load the attestation config yaml
-            let mut attestation_cfg: AttestationConfig =
+            let attestation_cfg: AttestationConfig =
                 serde_yaml::from_reader(File::open(attestation_cfg)?)?;
 
-            if let Some(mapping_addr) = attestation_cfg.mapping_addr.as_ref() {
-                let additional_accounts = crawl_pyth_mapping(&rpc_client, mapping_addr).await?;
-                info!("Additional mapping accounts:\n{:#?}", additional_accounts);
-                attestation_cfg.add_symbols(additional_accounts, "mapping".to_owned());
-            }
+            // Derive seeded accounts
+            let emitter_addr = P2WEmitter::key(None, &p2w_addr);
 
-            handle_attest(
-                cli.rpc_url,
-                cli.commitment,
-                payer,
-                p2w_addr,
-                attestation_cfg,
-                n_retries,
-                Duration::from_secs(retry_interval_secs),
-                Duration::from_secs(confirmation_timeout_secs),
-                daemon,
-            )
-            .await?;
+            info!("Using emitter addr {}", emitter_addr);
+
+            // Note: For global rate-limiting of RPC requests, we use a
+            // custom Mutex wrapper which enforces a delay of rpc_interval
+            // between RPC accesses.
+            let rpc_cfg = Arc::new(RLMutex::new(
+                RpcCfg {
+                    url: cli.rpc_url,
+                    timeout: Duration::from_secs(confirmation_timeout_secs),
+                    commitment: cli.commitment.clone(),
+                },
+                Duration::from_millis(attestation_cfg.min_rpc_interval_ms),
+            ));
+
+            if daemon {
+                handle_attest_daemon_mode(
+                    rpc_cfg,
+                    payer,
+                    p2w_addr,
+                    attestation_cfg,
+                )
+                .await?;
+            } else {
+                handle_attest_non_daemon_mode(
+                    attestation_cfg,
+                    rpc_cfg,
+                    p2w_addr,
+                    payer,
+                    n_retries,
+                    Duration::from_secs(retry_interval_secs),
+                )
+                .await?;
+            }
         }
         Action::GetEmitter => unreachable! {}, // It is handled early in this function.
-        Action::SetIsActive { ops_owner, new_is_active } => {
+        Action::SetIsActive {
+            ops_owner,
+            new_is_active,
+        } => {
             let tx = gen_set_is_active_tx(
                 payer,
                 p2w_addr,
@@ -225,132 +242,138 @@ async fn main() -> Result<(), ErrBox> {
         "Applied config:\n{:?}",
         get_config_account(&rpc_client, &p2w_addr).await?
     );
-        },
+        }
     }
 
     Ok(())
 }
 
-/// Send a series of batch attestations for symbols of an attestation config.
-async fn handle_attest(
-    rpc_url: String,
-    commitment: CommitmentConfig,
+/// Continuously send batch attestations for symbols of an attestation config.
+async fn handle_attest_daemon_mode(
+    rpc_cfg: Arc<RLMutex<RpcCfg>>,
     payer: Keypair,
     p2w_addr: Pubkey,
-    attestation_cfg: AttestationConfig,
-    n_retries: usize,
-    retry_interval: Duration,
-    confirmation_timeout: Duration,
-    daemon: bool,
+    mut attestation_cfg: AttestationConfig,
 ) -> Result<(), ErrBox> {
-    // Derive seeded accounts
-    let emitter_addr = P2WEmitter::key(None, &p2w_addr);
-
-    info!("Using emitter addr {}", emitter_addr);
-
-    let config = get_config_account(
-        &RpcClient::new_with_timeout_and_commitment(
-            rpc_url.clone(),
-            confirmation_timeout,
-            commitment.clone(),
-        ),
-        &p2w_addr,
-    )
-    .await?;
-
-    debug!("Symbol config:\n{:#?}", attestation_cfg);
-
     info!(
-        "{} symbol groups read, dividing into batches",
-        attestation_cfg.symbol_groups.len(),
+        "Crawling mapping {:?} every {} minutes",
+        attestation_cfg.mapping_addr, attestation_cfg.mapping_reload_interval_mins
     );
 
-    // Reused for failed batch retries
-    let mut batches: Vec<_> = attestation_cfg
-        .symbol_groups
-        .iter()
-        .map(|g| {
-            let conditions4closure = g.conditions.clone();
-            let name4closure = g.group_name.clone();
-
-            info!("Group {:?}, {} symbols", g.group_name, g.symbols.len(),);
-
-            // Divide group into batches
-            g.symbols
-                .as_slice()
-                .chunks(config.max_batch_size as usize)
-                .map(move |symbols| {
-                    BatchState::new(name4closure.clone(), symbols, conditions4closure.clone())
-                })
-        })
-        .flatten()
-        .enumerate()
-        .map(|(idx, batch_state)| (idx + 1, batch_state))
-        .collect();
-    let batch_count = batches.len();
+    // Used for easier detection of config changes
+    let mut hasher = Sha3_256::new();
+    let mut old_sched_futs_state: Option<(JoinHandle<_>, GenericArray<u8, U32>)> = None; // (old_futs_handle, old_config_hash)
 
-    /// Note: For global rate-limitting of RPC requests, we use a
-    /// custom Mutex wrapper which enforces a delay of rpc_interval
-    /// between RPC accesses.
-    let rpc_cfg = Arc::new(RLMutex::new(
-        RpcCfg {
-            url: rpc_url,
-            timeout: confirmation_timeout,
-            commitment: commitment.clone(),
-        },
-        Duration::from_millis(attestation_cfg.min_rpc_interval_ms),
-    ));
+    // For enforcing min_msg_reuse_interval_ms, we keep a piece of
+    // state that creates or reuses accounts if enough time had
+    // passed. It is crucial that this queue is reused across mapping
+    // lookups, so that previous symbol set's messages have enough
+    // time to be picked up by Wormhole guardians.
     let message_q_mtx = Arc::new(Mutex::new(P2WMessageQueue::new(
         Duration::from_millis(attestation_cfg.min_msg_reuse_interval_ms),
         attestation_cfg.max_msg_accounts as usize,
     )));
 
-    // Create attestation scheduling routines; see attestation_sched_job() for details
-    let mut attestation_sched_futs = batches.into_iter().map(|(batch_no, batch)| {
-        attestation_sched_job(
-            batch,
-            batch_no,
-            batch_count,
-            n_retries,
-            retry_interval,
-            daemon,
-            rpc_cfg.clone(),
-            p2w_addr,
-            config.clone(),
-            Keypair::from_bytes(&payer.to_bytes()).unwrap(),
-            message_q_mtx.clone(),
-        )
-    });
+    // This loop cranks attestations without interruption. This is
+    // achieved by spinning up a new up-to-date symbol set before
+    // letting go of the previous one. Additionally, hash of on-chain
+    // and attestation configs is used to prevent needless reloads of
+    // an unchanged symbol set.
+    loop {
+        let start_time = Instant::now(); // Helps timekeep mapping lookups accurately
 
-    info!("Spinning up attestation sched jobs");
+        let config = get_config_account(&lock_and_make_rpc(&rpc_cfg).await, &p2w_addr).await?;
 
-    let results = futures::future::join_all(attestation_sched_futs).await; // May never finish for daemon mode
-
-    info!("Got {} results", results.len());
-
-    // With daemon mode off, the sched jobs return from the
-    // join_all. We filter out errors and report them
-    let errors: Vec<_> = results
-        .iter()
-        .enumerate()
-        .filter_map(|(idx, r)| {
-            r.as_ref()
-                .err()
-                .map(|e| format!("Error {}: {:#?}\n", idx + 1, e))
-        })
-        .collect();
-
-    if !errors.is_empty() {
-        let err_lines = errors.join("\n");
-        let msg = format!(
-            "{} of {} batches failed:\n{}",
-            errors.len(),
-            batch_count,
-            err_lines
+        // Use the mapping if specified
+        if let Some(mapping_addr) = attestation_cfg.mapping_addr.as_ref() {
+            match crawl_pyth_mapping(&lock_and_make_rpc(&rpc_cfg).await, mapping_addr).await {
+                Ok(additional_accounts) => {
+                    debug!(
+                        "Crawled mapping {} data:\n{:#?}",
+                        mapping_addr, additional_accounts
+                    );
+                    attestation_cfg.add_symbols(additional_accounts, "mapping".to_owned());
+                }
+                // De-escalate crawling errors; a temporary failure to
+                // look up the mapping should not crash the attester
+                Err(e) => {
+                    error!("Could not crawl mapping {}: {:?}", mapping_addr, e);
+                }
+            }
+        }
+        debug!(
+            "Attestation config (includes mapping accounts):\n{:#?}",
+            attestation_cfg
         );
-        error!("{}", msg);
-        return Err(msg.into());
+
+        // Hash currently known config
+        hasher.update(serde_yaml::to_vec(&attestation_cfg)?);
+        hasher.update(borsh::to_vec(&config)?);
+
+        let new_cfg_hash = hasher.finalize_reset();
+
+        if let Some((old_handle, old_cfg_hash)) = old_sched_futs_state.as_ref() {
+            // Ignore unchanged configs
+            if &new_cfg_hash == old_cfg_hash {
+                info!("Note: Attestation config and on-chain config unchanged, not stopping existing attestation sched jobs");
+            } else {
+                // Process changed config into attestation scheduling futures
+                info!("Spinning up attestation sched jobs");
+                // Start the new sched futures
+                let new_sched_futs_handle = tokio::spawn(prepare_attestation_sched_jobs(
+                    &attestation_cfg,
+                    &config,
+                    &rpc_cfg,
+                    &p2w_addr,
+                    &payer,
+                    message_q_mtx.clone()
+                ));
+
+                // Quit old sched futures
+                old_handle.abort();
+
+                // The just started futures become the on-going attestation state
+                old_sched_futs_state = Some((new_sched_futs_handle, new_cfg_hash));
+            }
+        } else {
+            // Base case for first attestation attempt
+            old_sched_futs_state = Some((
+                tokio::spawn(prepare_attestation_sched_jobs(
+                    &attestation_cfg,
+                    &config,
+                    &rpc_cfg,
+                    &p2w_addr,
+                    &payer,
+                    message_q_mtx.clone()
+                )),
+                new_cfg_hash,
+            ));
+        }
+
+        // Sum up elapsed time, wait for next run accurately
+        let target = Duration::from_secs(attestation_cfg.mapping_reload_interval_mins * 60);
+        let elapsed = start_time.elapsed();
+
+        let remaining = target.saturating_sub(elapsed);
+
+        if remaining == Duration::from_secs(0) {
+            warn!(
+                "Processing took more than desired mapping lookup interval of {} seconds, not sleeping. Consider increasing {}",
+                target.as_secs(),
+                // stringify prints the up-to-date setting name automatically
+                stringify!(attestation_cfg.mapping_reload_interval_mins)
+            );
+        } else {
+            info!(
+                "Processing new mapping took {}.{}s, next config/mapping refresh in {}.{}s",
+                elapsed.as_secs(),
+                elapsed.subsec_millis(),
+                remaining.as_secs(),
+                remaining.subsec_millis()
+            );
+        }
+
+        tokio::time::sleep(remaining).await;
     }
 
     Ok(())
 }
@@ -373,33 +396,161 @@ async fn lock_and_make_rpc(rlmtx: &RLMutex<RpcCfg>) -> RpcClient {
     RpcClient::new_with_timeout_and_commitment(url, timeout, commitment)
 }
 
-/// A future that decides how a batch is sent.
-///
-/// In daemon mode, attestations of the batch are scheduled
-/// continuously using spawn(), which means that a next attestation of
-/// the same batch begins immediately when a condition is met without
-/// waiting for the previous attempt to finish. Subsequent
-/// attestations are started according to the attestation_conditions
-/// field on the batch. Concurrent requests per batch are limited by
-/// the max_batch_jobs field to prevent excess memory usage on network
-/// slowdowns etc..
-///
-/// With daemon_mode off, this future attempts only one blocking
-/// attestation of the batch and returns the result.
-async fn attestation_sched_job(
-    mut batch: BatchState<'_>,
-    batch_no: usize,
-    batch_count: usize,
-    n_retries: usize,
-    retry_interval: Duration,
-    daemon: bool,
+/// Non-daemon attestation scheduling
+async fn handle_attest_non_daemon_mode(
+    mut attestation_cfg: AttestationConfig,
     rpc_cfg: Arc<RLMutex<RpcCfg>>,
     p2w_addr: Pubkey,
-    config: Pyth2WormholeConfig,
     payer: Keypair,
+    n_retries: usize,
+    retry_interval: Duration,
+) -> Result<(), ErrBox> {
+    let p2w_cfg = get_config_account(&lock_and_make_rpc(&rpc_cfg).await, &p2w_addr).await?;
+
+    // Use the mapping if specified
+    if let Some(mapping_addr) = attestation_cfg.mapping_addr.as_ref() {
+        match crawl_pyth_mapping(&lock_and_make_rpc(&rpc_cfg).await, mapping_addr).await {
+            Ok(additional_accounts) => {
+                debug!(
+                    "Crawled mapping {} data:\n{:#?}",
+                    mapping_addr, additional_accounts
+                );
+                attestation_cfg.add_symbols(additional_accounts, "mapping".to_owned());
+            }
+            // De-escalate crawling errors; a temporary failure to
+            // look up the mapping should not crash the attester
+            Err(e) => {
+                error!("Could not crawl mapping {}: {:?}", mapping_addr, e);
+            }
+        }
+    }
+    debug!(
+        "Attestation config (includes mapping accounts):\n{:#?}",
+        attestation_cfg
+    );
+
+    let batches = attestation_cfg.as_batches(p2w_cfg.max_batch_size as usize);
+    let batch_count = batches.len();
+
+    // For enforcing min_msg_reuse_interval_ms, we keep a piece of
+    // state that creates or reuses accounts if enough time had
+    // passed
+    let message_q_mtx = Arc::new(Mutex::new(P2WMessageQueue::new(
+        Duration::from_millis(attestation_cfg.min_msg_reuse_interval_ms),
+        attestation_cfg.max_msg_accounts as usize,
+    )));
+
+    let retry_jobs = batches.into_iter().enumerate().map(|(idx, batch_state)| {
+        attestation_retry_job(AttestationRetryJobArgs {
+            batch_no: idx + 1,
+            batch_count: batch_count.clone(),
+            group_name: batch_state.group_name,
+            symbols: batch_state.symbols.clone(),
+            n_retries,
+            retry_interval: retry_interval.clone(),
+            rpc_cfg: rpc_cfg.clone(),
+            p2w_addr: p2w_addr.clone(),
+            p2w_config: p2w_cfg.clone(),
+            payer: Keypair::from_bytes(&payer.to_bytes()).unwrap(),
+            message_q_mtx: message_q_mtx.clone(),
+        })
+    });
+
+    let results = futures::future::join_all(retry_jobs).await;
+
+    // After completing, we count any errors coming from the retry
+    // jobs.
+    let errors: Vec<_> = results
+        .iter()
+        .enumerate()
+        .filter_map(|(idx, r)| {
+            r.as_ref()
+                .err()
+                .map(|e| format!("Error {}: {:?}\n", idx + 1, e))
+        })
+        .collect();
+
+    if !errors.is_empty() {
+        let err_lines = errors.join("\n");
+        let msg = format!("{} batches failed:\n{}", errors.len(), err_lines);
+        error!("{}", msg);
+        return Err(msg.into());
+    }
+    Ok(())
+}
+
+/// Constructs attestation scheduling jobs from attestation config.
+fn prepare_attestation_sched_jobs(
+    attestation_cfg: &AttestationConfig,
+    p2w_cfg: &Pyth2WormholeConfig,
+    rpc_cfg: &Arc<RLMutex<RpcCfg>>,
+    p2w_addr: &Pubkey,
+    payer: &Keypair,
     message_q_mtx: Arc<Mutex<P2WMessageQueue>>,
-) -> Result<(), ErrBoxSend> {
-    let mut retries_left = n_retries;
+) -> futures::future::JoinAll<impl Future<Output = Result<(), ErrBoxSend>>> {
+    info!(
+        "{} symbol groups read, dividing into batches",
+        attestation_cfg.symbol_groups.len(),
+    );
+
+    // Flatten attestation config into a plain list of batches
+    let batches: Vec<_> = attestation_cfg.as_batches(p2w_cfg.max_batch_size as usize);
+
+    let batch_count = batches.len();
+
+    // Create attestation scheduling routines; see attestation_sched_job() for details
+    let attestation_sched_futs = batches.into_iter().enumerate().map(|(idx, batch)| {
+        attestation_sched_job(AttestationSchedJobArgs {
+            batch,
+            batch_no: idx + 1,
+            batch_count,
+            rpc_cfg: rpc_cfg.clone(),
+            p2w_addr: p2w_addr.clone(),
+            config: p2w_cfg.clone(),
+            payer: Keypair::from_bytes(&payer.to_bytes()).unwrap(),
+            message_q_mtx: message_q_mtx.clone(),
+        })
+    });
+
+    futures::future::join_all(attestation_sched_futs)
+}
+
+/// The argument count on attestation_sched_job got out of hand. This
+/// helps keep the correct order in check.
+pub struct AttestationSchedJobArgs {
+    pub batch: BatchState,
+    pub batch_no: usize,
+    pub batch_count: usize,
+    pub rpc_cfg: Arc<RLMutex<RpcCfg>>,
+    pub p2w_addr: Pubkey,
+    pub config: Pyth2WormholeConfig,
+    pub payer: Keypair,
+    pub message_q_mtx: Arc<Mutex<P2WMessageQueue>>,
+}
+
+/// A future that decides how a batch is sent in daemon mode.
+///
+/// Attestations of the batch are scheduled continuously using
+/// spawn(), which means that a next attestation of the same batch
+/// begins immediately when a condition is met without waiting for the
+/// previous attempt to finish. Subsequent attestations are started
+/// according to the attestation_conditions field on the
+/// batch. Concurrent requests per batch are limited by the
+/// max_batch_jobs field to prevent excess memory usage on network
+/// slowdowns etc.
+async fn attestation_sched_job(args: AttestationSchedJobArgs) -> Result<(), ErrBoxSend> {
+    let AttestationSchedJobArgs {
+        mut batch,
+        batch_no,
+        batch_count,
+        rpc_cfg,
+        p2w_addr,
+        config,
+        payer,
+        message_q_mtx,
+    } = args;
+
     // Enforces the max batch job count
     let sema = Arc::new(Semaphore::new(batch.conditions.max_batch_jobs));
     loop {
@@ -408,115 +559,162 @@ async fn attestation_sched_job(
         debug!(
             "Batch {}/{}, group {:?}: Scheduling attestation job",
             batch_no, batch_count, batch.group_name
         );
 
-        let job = attestation_job(
-            rpc_cfg.clone(),
+        let job = attestation_job(AttestationJobArgs {
+            rlmtx: rpc_cfg.clone(),
             batch_no,
             batch_count,
-            batch.group_name.clone(),
+            group_name: batch.group_name.clone(),
             p2w_addr,
-            config.clone(),
-            Keypair::from_bytes(&payer.to_bytes()).unwrap(), // Keypair has no clone
-            batch.symbols.to_vec(),
-            sema.clone(),
-            message_q_mtx.clone(),
-        );
+            config: config.clone(),
+            payer: Keypair::from_bytes(&payer.to_bytes()).unwrap(), // Keypair has no clone
+            symbols: batch.symbols.to_vec(),
+            max_jobs_sema: sema.clone(),
+            message_q_mtx: message_q_mtx.clone(),
+        });
 
-        if daemon {
-            // park this routine until a resend condition is met
-            loop {
-                if let Some(reason) = batch
-                    .should_resend(&lock_and_make_rpc(&rpc_cfg).await)
-                    .await
-                {
-                    info!(
-                        "Batch {}/{}, group {}: Resending (reason: {:?})",
-                        batch_no, batch_count, batch.group_name, reason
-                    );
-                    break;
-                }
+        // park this routine until a resend condition is met
+        loop {
+            if let Some(reason) = batch
+                .should_resend(&lock_and_make_rpc(&rpc_cfg).await)
+                .await
+            {
+                info!(
+                    "Batch {}/{}, group {}: Resending (reason: {:?})",
+                    batch_no, batch_count, batch.group_name, reason
+                );
+                break;
             }
+        }
 
-            if sema.available_permits() == 0 {
-                warn!(
-                    "Batch {}/{}, group {:?}: Ran out of job \
+        if sema.available_permits() == 0 {
+            warn!(
+                "Batch {}/{}, group {:?}: Ran out of job \
                      permits, some attestation conditions may be \
                      delayed. For better accuracy, increase \
                      max_batch_jobs or adjust attestation \
                      conditions",
-                    batch_no, batch_count, batch.group_name
-                );
-            }
+                batch_no, batch_count, batch.group_name
+            );
+        }
 
-            // This short-lived permit prevents scheduling
-            // excess attestation jobs (which could eventually
-            // eat all memory). It is freed as soon as we
-            // leave this code block.
-            let _permit4sched = sema.acquire().await?;
-
-            let batch_no4err_msg = batch_no.clone();
-            let batch_count4err_msg = batch_count.clone();
-            let group_name4err_msg = batch.group_name.clone();
-
-            // We never get to error reporting in daemon mode, attach a map_err
-            let job_with_err_msg = job.map_err(move |e| {
-                warn!(
-                    "Batch {}/{}, group {:?} ERR: {:#?}",
-                    batch_no4err_msg, batch_count4err_msg, group_name4err_msg, e
-                );
-                e
-            });
-
-            // Spawn the job in background
-            let _detached_job: JoinHandle<_> = tokio::spawn(job_with_err_msg);
-        } else {
-            // Await and return the single result in non-daemon mode, with retries if necessary
-            match job.await {
-                Ok(_) => return Ok(()),
-                Err(e) => {
-                    if retries_left == 0 {
-                        return Err(e);
-                    } else {
-                        retries_left -= 1;
-                        debug!(
-                            "{}/{}, group {:?}: attestation failure: {}",
-                            batch_no,
-                            batch_count,
-                            batch.group_name,
-                            e.to_string()
-                        );
-                        info!(
-                            "Batch {}/{}, group {:?}: retrying in {}.{}s, {} retries left",
-                            batch_no,
-                            batch_count,
-                            batch.group_name,
-                            retry_interval.as_secs(),
-                            retry_interval.subsec_millis(),
-                            retries_left,
-                        );
-
-                        tokio::time::sleep(retry_interval).await;
-                    }
-                }
-            }
-        }
+        // This short-lived permit prevents scheduling
+        // excess attestation jobs (which could eventually
+        // eat all memory). It is freed as soon as we
+        // leave this code block.
+        let _permit4sched = sema.acquire().await?;
+
+        let batch_no4err_msg = batch_no.clone();
+        let batch_count4err_msg = batch_count.clone();
+        let group_name4err_msg = batch.group_name.clone();
+
+        // We never get to error reporting in daemon mode, attach a map_err
+        let job_with_err_msg = job.map_err(move |e| {
+            warn!(
+                "Batch {}/{}, group {:?} ERR: {:#?}",
+                batch_no4err_msg, batch_count4err_msg, group_name4err_msg, e
+            );
+            e
+        });
+
+        // Spawn the job in background
+        let _detached_job: JoinHandle<_> = tokio::spawn(job_with_err_msg);
+
         batch.last_job_finished_at = Instant::now();
     }
 }
 
+pub struct AttestationRetryJobArgs {
+    pub batch_no: usize,
+    pub batch_count: usize,
+    pub group_name: String,
+    pub symbols: Vec<P2WSymbol>,
+    pub n_retries: usize,
+    pub retry_interval: Duration,
+    pub rpc_cfg: Arc<RLMutex<RpcCfg>>,
+    pub p2w_addr: Pubkey,
+    pub p2w_config: Pyth2WormholeConfig,
+    pub payer: Keypair,
+    pub message_q_mtx: Arc<Mutex<P2WMessageQueue>>,
+}
+
+/// A future that cranks a batch up to n_retries times, pausing for
+/// retry_interval in between; used exclusively in non-daemon mode
+async fn attestation_retry_job(args: AttestationRetryJobArgs) -> Result<(), ErrBoxSend> {
+    let AttestationRetryJobArgs {
+        batch_no,
+        batch_count,
+        group_name,
+        symbols,
+        n_retries,
+        retry_interval,
+        rpc_cfg,
+        p2w_addr,
+        p2w_config,
+        payer,
+        message_q_mtx,
+    } = args;
+
+    let mut res = Err(format!(
+        "attestation_retry_job INTERNAL: Could not get a single attestation job result"
+    )
+    .into());
+
+    for _i in 0..=n_retries {
+        res = attestation_job(AttestationJobArgs {
+            rlmtx: rpc_cfg.clone(),
+            batch_no,
+            batch_count,
+            group_name: group_name.clone(),
+            p2w_addr,
+            config: p2w_config.clone(),
+            payer: Keypair::from_bytes(&payer.to_bytes()).unwrap(), // Keypair has no clone
+            symbols: symbols.clone(),
+            max_jobs_sema: Arc::new(Semaphore::new(1)), // Not important for non-daemon mode
+            message_q_mtx: message_q_mtx.clone(),
+        })
+        .await;
+
+        // Finish early on success
+        if res.is_ok() {
+            break;
+        }
+
+        tokio::time::sleep(retry_interval).await;
+    }
+
+    res
+}
+
+/// Arguments for attestation_job(). This struct rules out same-type
+/// ordering errors due to the large argument count
+pub struct AttestationJobArgs {
+    pub rlmtx: Arc<RLMutex<RpcCfg>>,
+    pub batch_no: usize,
+    pub batch_count: usize,
+    pub group_name: String,
+    pub p2w_addr: Pubkey,
+    pub config: Pyth2WormholeConfig,
+    pub payer: Keypair,
+    pub symbols: Vec<P2WSymbol>,
+    pub max_jobs_sema: Arc<Semaphore>,
+    pub message_q_mtx: Arc<Mutex<P2WMessageQueue>>,
+}
+
 /// A future for a single attempt to attest a batch on Solana.
-async fn attestation_job(
-    rlmtx: Arc<RLMutex<RpcCfg>>,
-    batch_no: usize,
-    batch_count: usize,
-    group_name: String,
-    p2w_addr: Pubkey,
-    config: Pyth2WormholeConfig,
-    payer: Keypair,
-    symbols: Vec<P2WSymbol>,
-    max_jobs_sema: Arc<Semaphore>,
-    message_q_mtx: Arc<Mutex<P2WMessageQueue>>,
-) -> Result<(), ErrBoxSend> {
+async fn attestation_job(args: AttestationJobArgs) -> Result<(), ErrBoxSend> {
+    let AttestationJobArgs {
+        rlmtx,
+        batch_no,
+        batch_count,
+        group_name,
+        p2w_addr,
+        config,
+        payer,
+        symbols,
+        max_jobs_sema,
+        message_q_mtx,
+    } = args;
+
     // Will be dropped after attestation is complete
     let _permit = max_jobs_sema.acquire().await?;
diff --git a/solana/pyth2wormhole/client/src/message.rs b/solana/pyth2wormhole/client/src/message.rs
index 0ad7a293..9b7d20b4 100644
--- a/solana/pyth2wormhole/client/src/message.rs
+++ b/solana/pyth2wormhole/client/src/message.rs
@@ -1,7 +1,6 @@
 //! Re-usable message scheme for pyth2wormhole
 
 use log::debug;
-use solana_program::system_instruction;
 use std::{
     collections::VecDeque,
     time::{
@@ -89,6 +88,7 @@ impl P2WMessageQueue {
     }
 }
 
+#[cfg(test)]
 pub mod test {
     use super::*;
 
diff --git a/solana/pyth2wormhole/program/src/config.rs b/solana/pyth2wormhole/program/src/config.rs
index 396275cc..30e32bae 100644
--- a/solana/pyth2wormhole/program/src/config.rs
+++ b/solana/pyth2wormhole/program/src/config.rs
@@ -38,7 +38,6 @@ pub type Pyth2WormholeConfig = Pyth2WormholeConfigV3;
 pub type P2WConfigAccount<'b, const IsInitialized: AccountState> =
     P2WConfigAccountV3<'b, IsInitialized>;
 
-
 impl Owned for Pyth2WormholeConfig {
     fn owner(&self) -> AccountOwner {
         AccountOwner::This
@@ -120,7 +119,7 @@ impl From<Pyth2WormholeConfigV1> for Pyth2WormholeConfigV2 {
 }
 
 // Added ops_owner which can toggle the is_active field
-#[derive(Clone, Default, BorshDeserialize, BorshSerialize)]
+#[derive(Clone, Default, Hash, BorshDeserialize, BorshSerialize, PartialEq)]
 #[cfg_attr(feature = "client", derive(Debug))]
 pub struct Pyth2WormholeConfigV3 {
     /// Authority owning this contract
@@ -162,7 +161,7 @@ impl From<Pyth2WormholeConfigV2> for Pyth2WormholeConfigV3 {
             pyth_owner,
             max_batch_size,
             is_active: true,
-            ops_owner: None
+            ops_owner: None,
         }
     }
 }
diff --git a/third_party/pyth/p2w_autoattest.py b/third_party/pyth/p2w_autoattest.py
index 9e4875fa..9c996b68 100755
--- a/third_party/pyth/p2w_autoattest.py
+++ b/third_party/pyth/p2w_autoattest.py
@@ -184,6 +184,7 @@ if P2W_ATTESTATION_CFG is None:
     cfg_yaml = f"""
 ---
 mapping_addr: {mapping_addr}
+mapping_reload_interval_mins: 1 # Very fast for testing purposes
 min_rpc_interval_ms: 0 # RIP RPC
 max_batch_jobs: 1000 # Where we're going there's no oomkiller
 symbol_groups:
@@ -214,7 +215,7 @@ symbol_groups:
   - group_name: longer_interval_sensitive_changes
     conditions:
       min_interval_secs: 10
-      price_changed_pct: 3
+      price_changed_bps: 300
     symbols:
 """
 
@@ -232,7 +233,7 @@ symbol_groups:
   - group_name: mapping
     conditions:
       min_interval_secs: 30
-      price_changed_pct: 5
+      price_changed_bps: 500
     symbols: []
 """
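The daemon-mode rewrite in main.rs gates job respawning on a SHA3-256 digest of the serialized attestation config plus the on-chain config, so a mapping reload that yields the same symbol set leaves the running attestation jobs untouched. A self-contained sketch of that change-detection idea, reduced to plain byte slices (the helper name and sample inputs are illustrative, not part of this patch):

```rust
use sha3::{Digest, Sha3_256};

/// Digest the two config serializations; equal digests across reloads
/// mean the existing attestation jobs can keep running undisturbed.
fn config_digest(attestation_cfg: &[u8], onchain_cfg: &[u8]) -> Vec<u8> {
    let mut hasher = Sha3_256::new();
    hasher.update(attestation_cfg);
    hasher.update(onchain_cfg);
    hasher.finalize().to_vec()
}

fn main() {
    let old = config_digest(b"mapping_reload_interval_mins: 15", b"max_batch_size: 5");
    let new = config_digest(b"mapping_reload_interval_mins: 15", b"max_batch_size: 5");
    if old == new {
        println!("configs unchanged; keeping existing sched jobs");
    } else {
        println!("config changed; aborting old jobs and spawning replacements");
    }
}
```

Migration note for existing attestation configs: thresholds move from percent to basis points, so `price_changed_pct: 3` becomes `price_changed_bps: 300` and `price_changed_pct: 5` becomes `price_changed_bps: 500`, exactly as the p2w_autoattest.py test config above shows. The new `mapping_reload_interval_mins` key is optional, defaults to 15, and only has an effect when `mapping_addr` is set.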