From 3beffdfe464a5781b19f14760d7c5bf356974aec Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Fri, 2 Dec 2022 09:35:33 -0800 Subject: [PATCH] Configure symbol groups by name (#403) * Refactor to make this change easier * stop mutating AttestationConfig * get the product names also * ok * refactor * cleanup * more cleanup * more cleanup * comment * i think this works * fix stuff * clippy * more cleanup * main * main * fix formatting * blah * test * cleanup * fix python * config * fix test * grr * grr * comments Co-authored-by: Jayant Krishnamurthy --- .../programs/remote-executor/src/lib.rs | 2 +- .../client/src/attestation_cfg.rs | 506 +++++++++++++----- .../pyth2wormhole/client/src/batch_state.rs | 17 +- solana/pyth2wormhole/client/src/lib.rs | 34 +- solana/pyth2wormhole/client/src/main.rs | 131 +++-- solana/pyth2wormhole/program/src/config.rs | 2 +- third_party/pyth/p2w_autoattest.py | 16 +- 7 files changed, 489 insertions(+), 219 deletions(-) diff --git a/pythnet/remote-executor/programs/remote-executor/src/lib.rs b/pythnet/remote-executor/programs/remote-executor/src/lib.rs index bacb4251..74afed7d 100644 --- a/pythnet/remote-executor/programs/remote-executor/src/lib.rs +++ b/pythnet/remote-executor/programs/remote-executor/src/lib.rs @@ -1,5 +1,5 @@ #![deny(warnings)] -#![allow(clippy::result_large_err)] +#![allow(clippy::result_unit_err)] use { anchor_lang::{ diff --git a/solana/pyth2wormhole/client/src/attestation_cfg.rs b/solana/pyth2wormhole/client/src/attestation_cfg.rs index 76c0177c..5c0ce34d 100644 --- a/solana/pyth2wormhole/client/src/attestation_cfg.rs +++ b/solana/pyth2wormhole/client/src/attestation_cfg.rs @@ -1,6 +1,15 @@ use { - crate::BatchState, - log::info, + crate::{ + attestation_cfg::SymbolConfig::{ + Key, + Name, + }, + P2WProductAccount, + }, + log::{ + info, + warn, + }, serde::{ de::Error, Deserialize, @@ -14,25 +23,26 @@ use { HashMap, HashSet, }, - iter, str::FromStr, }, }; /// Pyth2wormhole config specific to attestation requests -#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)] +#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] pub struct AttestationConfig { #[serde(default = "default_min_msg_reuse_interval_ms")] - pub min_msg_reuse_interval_ms: u64, + pub min_msg_reuse_interval_ms: u64, #[serde(default = "default_max_msg_accounts")] - pub max_msg_accounts: u64, - /// Optionally, we take a mapping account to add remaining symbols from a Pyth deployments. These symbols are processed under attestation conditions for the `default` symbol group. + pub max_msg_accounts: u64, + + /// Optionally, we take a mapping account to add remaining symbols from a Pyth deployments. + /// These symbols are processed under `default_attestation_conditions`. #[serde( deserialize_with = "opt_pubkey_string_de", serialize_with = "opt_pubkey_string_ser", default // Uses Option::default() which is None )] - pub mapping_addr: Option, + pub mapping_addr: Option, /// The known symbol list will be reloaded based off this /// interval, to account for mapping changes. Note: This interval /// will only work if the mapping address is defined. Whenever @@ -41,88 +51,230 @@ pub struct AttestationConfig { /// symbol list, and before stopping the pre-existing obsolete /// jobs to maintain uninterrupted cranking. #[serde(default = "default_mapping_reload_interval_mins")] - pub mapping_reload_interval_mins: u64, + pub mapping_reload_interval_mins: u64, #[serde(default = "default_min_rpc_interval_ms")] - /// Rate-limiting minimum delay between RPC requests in milliseconds" - pub min_rpc_interval_ms: u64, - pub symbol_groups: Vec, + /// Rate-limiting minimum delay between RPC requests in milliseconds + pub min_rpc_interval_ms: u64, + /// Attestation conditions that will be used for any symbols included in the mapping + /// that aren't explicitly in one of the groups below, and any groups without explicitly + /// configured attestation conditions. + pub default_attestation_conditions: AttestationConditions, + + /// Groups of symbols to publish. + pub symbol_groups: Vec, } impl AttestationConfig { - /// Merges new symbols into the attestation config. Pre-existing - /// new symbols are ignored. The new_group_name group can already - /// exist - symbols will be appended to `symbols` field. - pub fn add_symbols( - &mut self, - mut new_symbols: HashMap>, - group_name: String, // Which group is extended by the new symbols - ) { - // Remove pre-existing symbols from the new symbols collection - for existing_group in &self.symbol_groups { - for existing_sym in &existing_group.symbols { - // Check if new symbols mention this product - if let Some(prices) = new_symbols.get_mut(&existing_sym.product_addr) { - // Prune the price if exists - prices.remove(&existing_sym.price_addr); + /// Instantiate the batches of symbols to attest by matching the config against the collection + /// of on-chain product accounts. + pub fn instantiate_batches( + &self, + product_accounts: &[P2WProductAccount], + max_batch_size: usize, + ) -> Vec { + // Construct mapping from the name of each product account to its corresponding symbols + let mut name_to_symbols: HashMap> = HashMap::new(); + for product_account in product_accounts { + for price_account_key in &product_account.price_account_keys { + if let Some(name) = &product_account.name { + let symbol = P2WSymbol { + name: Some(name.clone()), + product_addr: product_account.key, + price_addr: *price_account_key, + }; + + name_to_symbols + .entry(name.clone()) + .or_insert(vec![]) + .push(symbol); } } } - // Turn the pruned symbols into P2WSymbol structs - let mut new_symbols_vec = new_symbols - .drain() // Makes us own the elements and lets us move them - .flat_map(|(prod, prices)| iter::zip(iter::repeat(prod), prices)) // Flatten the tuple iterators - .map(|(prod, price)| P2WSymbol { - name: None, - product_addr: prod, - price_addr: price, - }) - .collect::>(); + // Instantiate batches from the configured symbol groups. + let mut configured_batches: Vec = vec![]; + for group in &self.symbol_groups { + let group_symbols: Vec = group + .symbols + .iter() + .flat_map(|symbol| match &symbol { + Key { + name, + product, + price, + } => { + vec![P2WSymbol { + name: name.clone(), + product_addr: *product, + price_addr: *price, + }] + } + Name { name } => { + let maybe_matched_symbols: Option<&Vec> = + name_to_symbols.get(name); + if let Some(matched_symbols) = maybe_matched_symbols { + matched_symbols.clone() + } else { + // It's slightly unfortunate that this is a warning, but it seems better than crashing. + // The data in the mapping account can change while the attester is running and trigger this case, + // which means that it is not necessarily a configuration problem. + // Note that any named symbols in the config which fail to match will still be included + // in the remaining_symbols group below. + warn!( + "Could not find product account for configured symbol {}", + name + ); + vec![] + } + } + }) + .collect(); - // Find and extend OR create the group of specified name - match self - .symbol_groups - .iter_mut() - .find(|g| g.group_name == group_name) // Advances the iterator and returns Some(item) on first hit - { - Some(existing_group) => existing_group.symbols.append(&mut new_symbols_vec), - None if !new_symbols_vec.is_empty() => { - // Group does not exist, assume defaults - let new_group = SymbolGroup { - group_name, - conditions: Default::default(), - symbols: new_symbols_vec, - }; - - self.symbol_groups.push(new_group); - } - None => {} + let group_conditions = group + .conditions + .as_ref() + .unwrap_or(&self.default_attestation_conditions); + configured_batches.extend(AttestationConfig::partition_into_batches( + &group.group_name, + max_batch_size, + group_conditions, + group_symbols, + )) } + + // Find any accounts not included in existing batches and group them into a remainder batch + let existing_price_accounts: HashSet = configured_batches + .iter() + .flat_map(|batch| batch.symbols.iter().map(|symbol| symbol.price_addr)) + .chain( + configured_batches + .iter() + .flat_map(|batch| batch.symbols.iter().map(|symbol| symbol.price_addr)), + ) + .collect(); + + let mut remaining_symbols: Vec = vec![]; + for product_account in product_accounts { + for price_account_key in &product_account.price_account_keys { + if !existing_price_accounts.contains(price_account_key) { + let symbol = P2WSymbol { + name: product_account.name.clone(), + product_addr: product_account.key, + price_addr: *price_account_key, + }; + remaining_symbols.push(symbol); + } + } + } + let remaining_batches = AttestationConfig::partition_into_batches( + &"mapping".to_owned(), + max_batch_size, + &self.default_attestation_conditions, + remaining_symbols, + ); + + let all_batches = configured_batches + .into_iter() + .chain(remaining_batches.into_iter()) + .collect::>(); + + for batch in &all_batches { + info!( + "Batch {:?}, {} symbols", + batch.group_name, + batch.symbols.len(), + ); + } + + all_batches } - pub fn as_batches(&self, max_batch_size: usize) -> Vec { - self.symbol_groups - .iter() - .flat_map(move |g| { - let conditions4closure = g.conditions.clone(); - let name4closure = g.group_name.clone(); - - info!("Group {:?}, {} symbols", g.group_name, g.symbols.len(),); - - // Divide group into batches - g.symbols - .as_slice() - .chunks(max_batch_size) - .map(move |symbols| { - BatchState::new(name4closure.clone(), symbols, conditions4closure.clone()) - }) + /// Partition symbols into a collection of batches, each of which contains no more than + /// `max_batch_size` symbols. + fn partition_into_batches( + batch_name: &String, + max_batch_size: usize, + conditions: &AttestationConditions, + symbols: Vec, + ) -> Vec { + symbols + .as_slice() + .chunks(max_batch_size) + .map(move |batch_symbols| SymbolBatch { + group_name: batch_name.to_owned(), + symbols: batch_symbols.to_vec(), + conditions: conditions.clone(), }) .collect() } } -#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)] -pub struct SymbolGroup { +#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] +pub struct SymbolGroupConfig { + pub group_name: String, + /// Attestation conditions applied to all symbols in this group + /// If not provided, use the default attestation conditions from `AttestationConfig`. + pub conditions: Option, + + /// The symbols to publish in this group. + pub symbols: Vec, +} + +/// Config entry for a symbol to attest. +#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum SymbolConfig { + /// A symbol specified by its product name. + Name { + /// The name of the symbol. This name is matched against the "symbol" field in the product + /// account metadata. If multiple price accounts have this name (either because 2 product + /// accounts have the same symbol or a single product account has multiple price accounts), + /// it matches *all* of them and puts them into this group. + name: String, + }, + /// A symbol specified by its product and price account keys. + Key { + /// Optional human-readable name for the symbol (for logging purposes). + /// This field does not need to match the on-chain data for the product. + name: Option, + + #[serde( + deserialize_with = "pubkey_string_de", + serialize_with = "pubkey_string_ser" + )] + product: Pubkey, + #[serde( + deserialize_with = "pubkey_string_de", + serialize_with = "pubkey_string_ser" + )] + price: Pubkey, + }, +} + +impl ToString for SymbolConfig { + fn to_string(&self) -> String { + match &self { + Name { name } => name.clone(), + Key { + name: Some(name), + product: _, + price: _, + } => name.clone(), + Key { + name: None, + product, + price: _, + } => { + format!("Unnamed product {}", product) + } + } + } +} + +/// A batch of symbols that's ready to be attested. Includes all necessary information +/// (such as price/product account keys). +#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] +pub struct SymbolBatch { pub group_name: String, /// Attestation conditions applied to all symbols in this group pub conditions: AttestationConditions, @@ -157,7 +309,7 @@ pub const fn default_max_batch_jobs() -> usize { /// of the active conditions is met. Option<> fields can be /// de-activated with None. All conditions are inactive by default, /// except for the non-Option ones. -#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)] +#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] pub struct AttestationConditions { /// Baseline, unconditional attestation interval. Attestation is triggered if the specified interval elapsed since last attestation. #[serde(default = "default_min_interval_secs")] @@ -207,7 +359,6 @@ impl Default for AttestationConditions { } } -/// Config entry for a Pyth product + price pair #[derive(Clone, Default, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] pub struct P2WSymbol { /// User-defined human-readable name @@ -274,54 +425,59 @@ where mod tests { use { super::*, + crate::attestation_cfg::SymbolConfig::{ + Key, + Name, + }, solitaire::ErrBox, }; #[test] fn test_sanity() -> Result<(), ErrBox> { - let fastbois = SymbolGroup { + let fastbois = SymbolGroupConfig { group_name: "fast bois".to_owned(), - conditions: AttestationConditions { + conditions: Some(AttestationConditions { min_interval_secs: 5, ..Default::default() - }, + }), symbols: vec![ - P2WSymbol { - name: Some("ETHUSD".to_owned()), - ..Default::default() + Name { + name: "ETHUSD".to_owned(), }, - P2WSymbol { - name: Some("BTCUSD".to_owned()), - ..Default::default() + Key { + name: Some("BTCUSD".to_owned()), + product: Pubkey::new_unique(), + price: Pubkey::new_unique(), }, ], }; - let slowbois = SymbolGroup { + let slowbois = SymbolGroupConfig { group_name: "slow bois".to_owned(), - conditions: AttestationConditions { + conditions: Some(AttestationConditions { min_interval_secs: 200, ..Default::default() - }, + }), symbols: vec![ - P2WSymbol { - name: Some("CNYAUD".to_owned()), - ..Default::default() + Name { + name: "CNYAUD".to_owned(), }, - P2WSymbol { - name: Some("INRPLN".to_owned()), - ..Default::default() + Key { + name: None, + product: Pubkey::new_unique(), + price: Pubkey::new_unique(), }, ], }; let cfg = AttestationConfig { - min_msg_reuse_interval_ms: 1000, - max_msg_accounts: 100_000, - min_rpc_interval_ms: 2123, - mapping_addr: None, - mapping_reload_interval_mins: 42, - symbol_groups: vec![fastbois, slowbois], + min_msg_reuse_interval_ms: 1000, + max_msg_accounts: 100_000, + min_rpc_interval_ms: 2123, + mapping_addr: None, + mapping_reload_interval_mins: 42, + default_attestation_conditions: AttestationConditions::default(), + symbol_groups: vec![fastbois, slowbois], }; let serialized = serde_yaml::to_string(&cfg)?; @@ -334,44 +490,128 @@ mod tests { } #[test] - fn test_add_symbols_works() -> Result<(), ErrBox> { - let empty_config = AttestationConfig { - min_msg_reuse_interval_ms: 1000, - max_msg_accounts: 100, - min_rpc_interval_ms: 42422, - mapping_addr: None, - mapping_reload_interval_mins: 42, - symbol_groups: vec![], + fn test_instantiate_batches() -> Result<(), ErrBox> { + let btc_product_key = Pubkey::new_unique(); + let btc_price_key = Pubkey::new_unique(); + + let eth_product_key = Pubkey::new_unique(); + let eth_price_key_1 = Pubkey::new_unique(); + let eth_price_key_2 = Pubkey::new_unique(); + + let unk_product_key = Pubkey::new_unique(); + let unk_price_key = Pubkey::new_unique(); + + let eth_dup_product_key = Pubkey::new_unique(); + let eth_dup_price_key = Pubkey::new_unique(); + + let attestation_conditions_1 = AttestationConditions { + min_interval_secs: 5, + ..Default::default() }; - let mock_new_symbols = (0..255) - .map(|sym_idx| { - let mut mock_prod_bytes = [0u8; 32]; - mock_prod_bytes[31] = sym_idx; + let products = vec![ + P2WProductAccount { + name: Some("ETHUSD".to_owned()), + key: eth_product_key, + price_account_keys: vec![eth_price_key_1, eth_price_key_2], + }, + P2WProductAccount { + name: None, + key: unk_product_key, + price_account_keys: vec![unk_price_key], + }, + ]; - let mut mock_prices = HashSet::new(); - for _px_idx in 1..=5 { - let mut mock_price_bytes = [0u8; 32]; - mock_price_bytes[31] = sym_idx; - mock_prices.insert(Pubkey::new_from_array(mock_price_bytes)); + let group1 = SymbolGroupConfig { + group_name: "group 1".to_owned(), + conditions: Some(attestation_conditions_1.clone()), + symbols: vec![ + Key { + name: Some("BTCUSD".to_owned()), + price: btc_price_key, + product: btc_product_key, + }, + Name { + name: "ETHUSD".to_owned(), + }, + ], + }; + + let group2 = SymbolGroupConfig { + group_name: "group 2".to_owned(), + conditions: None, + symbols: vec![Key { + name: Some("ETHUSD".to_owned()), + price: eth_dup_price_key, + product: eth_dup_product_key, + }], + }; + + let default_attestation_conditions = AttestationConditions { + min_interval_secs: 1, + ..Default::default() + }; + + let cfg = AttestationConfig { + min_msg_reuse_interval_ms: 1000, + max_msg_accounts: 100_000, + min_rpc_interval_ms: 2123, + mapping_addr: None, + mapping_reload_interval_mins: 42, + default_attestation_conditions: default_attestation_conditions.clone(), + symbol_groups: vec![group1, group2], + }; + + let batches = cfg.instantiate_batches(&products, 2); + + assert_eq!( + batches, + vec![ + SymbolBatch { + group_name: "group 1".to_owned(), + conditions: attestation_conditions_1.clone(), + symbols: vec![ + P2WSymbol { + name: Some("BTCUSD".to_owned()), + product_addr: btc_product_key, + price_addr: btc_price_key, + }, + P2WSymbol { + name: Some("ETHUSD".to_owned()), + product_addr: eth_product_key, + price_addr: eth_price_key_1, + } + ], + }, + SymbolBatch { + group_name: "group 1".to_owned(), + conditions: attestation_conditions_1, + symbols: vec![P2WSymbol { + name: Some("ETHUSD".to_owned()), + product_addr: eth_product_key, + price_addr: eth_price_key_2, + }], + }, + SymbolBatch { + group_name: "group 2".to_owned(), + conditions: default_attestation_conditions.clone(), + symbols: vec![P2WSymbol { + name: Some("ETHUSD".to_owned()), + product_addr: eth_dup_product_key, + price_addr: eth_dup_price_key, + }], + }, + SymbolBatch { + group_name: "mapping".to_owned(), + conditions: default_attestation_conditions, + symbols: vec![P2WSymbol { + name: None, + product_addr: unk_product_key, + price_addr: unk_price_key, + }], } - - (Pubkey::new_from_array(mock_prod_bytes), mock_prices) - }) - .collect::>>(); - - let mut config1 = empty_config.clone(); - - config1.add_symbols(mock_new_symbols.clone(), "default".to_owned()); - - let mut config2 = config1.clone(); - - // Should not be created because there's no new symbols to add - // (we're adding identical mock_new_symbols again) - config2.add_symbols(mock_new_symbols, "default2".to_owned()); - - assert_ne!(config1, empty_config); // Check that config grows from empty - assert_eq!(config1, config2); // Check that no changes are made if all symbols are already in there + ] + ); Ok(()) } diff --git a/solana/pyth2wormhole/client/src/batch_state.rs b/solana/pyth2wormhole/client/src/batch_state.rs index 224f45e8..7574e2f2 100644 --- a/solana/pyth2wormhole/client/src/batch_state.rs +++ b/solana/pyth2wormhole/client/src/batch_state.rs @@ -1,5 +1,6 @@ use { crate::{ + attestation_cfg::SymbolBatch, AttestationConditions, P2WSymbol, }, @@ -27,17 +28,13 @@ pub struct BatchState { } impl<'a> BatchState { - pub fn new( - group_name: String, - symbols: &[P2WSymbol], - conditions: AttestationConditions, - ) -> Self { + pub fn new(group: &SymbolBatch) -> Self { Self { - group_name, - symbols: symbols.to_vec(), - conditions, - last_known_symbol_states: vec![None; symbols.len()], - last_job_finished_at: Instant::now(), + group_name: group.group_name.clone(), + symbols: group.symbols.clone(), + conditions: group.conditions.clone(), + last_known_symbol_states: vec![None; group.symbols.len()], + last_job_finished_at: Instant::now(), } } diff --git a/solana/pyth2wormhole/client/src/lib.rs b/solana/pyth2wormhole/client/src/lib.rs index 78e134c8..1cc29029 100644 --- a/solana/pyth2wormhole/client/src/lib.rs +++ b/solana/pyth2wormhole/client/src/lib.rs @@ -80,10 +80,6 @@ use { AccountState, ErrBox, }, - std::collections::{ - HashMap, - HashSet, - }, }; /// Future-friendly version of solitaire::ErrBox @@ -402,8 +398,8 @@ pub fn gen_attest_tx( pub async fn crawl_pyth_mapping( rpc_client: &RpcClient, first_mapping_addr: &Pubkey, -) -> Result>, ErrBox> { - let mut ret = HashMap::new(); +) -> Result, ErrBox> { + let mut ret: Vec = vec![]; let mut n_mappings = 1; // We assume the first one must be valid let mut n_products_total = 0; // Grand total products in all mapping accounts @@ -439,6 +435,13 @@ pub async fn crawl_pyth_mapping( } }; + let mut prod_name = None; + for (key, val) in prod.iter() { + if key.eq_ignore_ascii_case("symbol") { + prod_name = Some(val.to_owned()); + } + } + let mut price_addr = prod.px_acc; let mut n_prod_prices = 0; @@ -454,6 +457,7 @@ pub async fn crawl_pyth_mapping( } // loop until the last non-zero PriceAccount.next account + let mut price_accounts: Vec = vec![]; loop { let price_bytes = rpc_client.get_account_data(&price_addr).await?; let price = match load_price_account(&price_bytes) { @@ -464,11 +468,7 @@ pub async fn crawl_pyth_mapping( } }; - // Append to existing set or create a new map entry - ret.entry(*prod_addr) - .or_insert(HashSet::new()) - .insert(price_addr); - + price_accounts.push(price_addr); n_prod_prices += 1; if price.next == Pubkey::default() { @@ -482,6 +482,11 @@ pub async fn crawl_pyth_mapping( price_addr = price.next; } + ret.push(P2WProductAccount { + key: *prod_addr, + name: prod_name.clone(), + price_account_keys: price_accounts, + }); n_prices_total += n_prod_prices; } @@ -508,3 +513,10 @@ pub async fn crawl_pyth_mapping( Ok(ret) } + +#[derive(Clone, Debug)] +pub struct P2WProductAccount { + pub key: Pubkey, + pub name: Option, + pub price_account_keys: Vec, +} diff --git a/solana/pyth2wormhole/client/src/main.rs b/solana/pyth2wormhole/client/src/main.rs index b1d915f6..235ce55f 100644 --- a/solana/pyth2wormhole/client/src/main.rs +++ b/solana/pyth2wormhole/client/src/main.rs @@ -1,5 +1,3 @@ -pub mod cli; - use { clap::Parser, cli::{ @@ -30,7 +28,23 @@ use { attest::P2W_MAX_BATCH_SIZE, Pyth2WormholeConfig, }, - pyth2wormhole_client::*, + pyth2wormhole_client::{ + attestation_cfg::SymbolBatch, + crawl_pyth_mapping, + gen_attest_tx, + gen_init_tx, + gen_migrate_tx, + gen_set_config_tx, + gen_set_is_active_tx, + get_config_account, + start_metrics_server, + AttestationConfig, + BatchState, + ErrBoxSend, + P2WMessageQueue, + P2WSymbol, + RLMutex, + }, sha3::{ Digest, Sha3_256, @@ -68,6 +82,8 @@ use { }, }; +pub mod cli; + pub const SEQNO_PREFIX: &str = "Program log: Sequence: "; lazy_static! { @@ -272,7 +288,7 @@ async fn handle_attest_daemon_mode( rpc_cfg: Arc>, payer: Keypair, p2w_addr: Pubkey, - mut attestation_cfg: AttestationConfig, + attestation_cfg: AttestationConfig, metrics_bind_addr: SocketAddr, ) -> Result<(), ErrBox> { tokio::spawn(start_metrics_server(metrics_bind_addr)); @@ -298,6 +314,7 @@ async fn handle_attest_daemon_mode( attestation_cfg.max_msg_accounts as usize, ))); + let mut batch_cfg = vec![]; // This loop cranks attestations without interruption. This is // achieved by spinning up a new up-to-date symbol set before // letting go of the previous one. Additionally, hash of on-chain @@ -318,29 +335,18 @@ async fn handle_attest_daemon_mode( }; // Use the mapping if specified - if let Some(mapping_addr) = attestation_cfg.mapping_addr.as_ref() { - match crawl_pyth_mapping(&lock_and_make_rpc(&rpc_cfg).await, mapping_addr).await { - Ok(additional_accounts) => { - debug!( - "Crawled mapping {} data:\n{:#?}", - mapping_addr, additional_accounts - ); - attestation_cfg.add_symbols(additional_accounts, "mapping".to_owned()); - } - // De-escalate crawling errors; A temporary failure to - // look up the mapping should not crash the attester - Err(e) => { - error!("Could not crawl mapping {}: {:?}", mapping_addr, e); - } - } - } - debug!( - "Attestation config (includes mapping accounts):\n{:#?}", - attestation_cfg - ); + // If we cannot query the mapping account, retain the existing batch configuration. + batch_cfg = attestation_config_to_batches( + &rpc_cfg, + &attestation_cfg, + config.max_batch_size as usize, + ) + .await + .unwrap_or(batch_cfg); + // Hash currently known config - hasher.update(serde_yaml::to_vec(&attestation_cfg)?); + hasher.update(serde_yaml::to_vec(&batch_cfg)?); hasher.update(borsh::to_vec(&config)?); let new_cfg_hash = hasher.finalize_reset(); @@ -354,7 +360,7 @@ async fn handle_attest_daemon_mode( info!("Spinning up attestation sched jobs"); // Start the new sched futures let new_sched_futs_handle = tokio::spawn(prepare_attestation_sched_jobs( - &attestation_cfg, + &batch_cfg, &config, &rpc_cfg, &p2w_addr, @@ -372,7 +378,7 @@ async fn handle_attest_daemon_mode( // Base case for first attestation attempt old_sched_futs_state = Some(( tokio::spawn(prepare_attestation_sched_jobs( - &attestation_cfg, + &batch_cfg, &config, &rpc_cfg, &p2w_addr, @@ -429,7 +435,7 @@ async fn lock_and_make_rpc(rlmtx: &RLMutex) -> RpcClient { /// Non-daemon attestation scheduling async fn handle_attest_non_daemon_mode( - mut attestation_cfg: AttestationConfig, + attestation_cfg: AttestationConfig, rpc_cfg: Arc>, p2w_addr: Pubkey, payer: Keypair, @@ -438,29 +444,17 @@ async fn handle_attest_non_daemon_mode( ) -> Result<(), ErrBox> { let p2w_cfg = get_config_account(&lock_and_make_rpc(&rpc_cfg).await, &p2w_addr).await?; - // Use the mapping if specified - if let Some(mapping_addr) = attestation_cfg.mapping_addr.as_ref() { - match crawl_pyth_mapping(&lock_and_make_rpc(&rpc_cfg).await, mapping_addr).await { - Ok(additional_accounts) => { - debug!( - "Crawled mapping {} data:\n{:#?}", - mapping_addr, additional_accounts - ); - attestation_cfg.add_symbols(additional_accounts, "mapping".to_owned()); - } - // De-escalate crawling errors; A temporary failure to - // look up the mapping should not crash the attester - Err(e) => { - error!("Could not crawl mapping {}: {:?}", mapping_addr, e); - } - } - } - debug!( - "Attestation config (includes mapping accounts):\n{:#?}", - attestation_cfg - ); + let batch_config = + attestation_config_to_batches(&rpc_cfg, &attestation_cfg, p2w_cfg.max_batch_size as usize) + .await + .unwrap_or_else(|_| { + attestation_cfg.instantiate_batches(&[], p2w_cfg.max_batch_size as usize) + }); - let batches = attestation_cfg.as_batches(p2w_cfg.max_batch_size as usize); + let batches: Vec<_> = batch_config + .into_iter() + .map(|x| BatchState::new(&x)) + .collect(); let batch_count = batches.len(); // For enforcing min_msg_reuse_interval_ms, we keep a piece of @@ -510,22 +504,45 @@ async fn handle_attest_non_daemon_mode( Ok(()) } +/// Generate batches to attest by retrieving the on-chain product account data and grouping it +/// according to the configuration in `attestation_cfg`. +async fn attestation_config_to_batches( + rpc_cfg: &Arc>, + attestation_cfg: &AttestationConfig, + max_batch_size: usize, +) -> Result, ErrBox> { + // Use the mapping if specified + let products = if let Some(mapping_addr) = attestation_cfg.mapping_addr.as_ref() { + let product_accounts_res = + crawl_pyth_mapping(&lock_and_make_rpc(rpc_cfg).await, mapping_addr).await; + + if let Err(err) = &product_accounts_res { + error!( + "Could not crawl mapping {}: {:?}", + attestation_cfg.mapping_addr.unwrap_or_default(), + err + ); + } + + product_accounts_res? + } else { + vec![] + }; + + Ok(attestation_cfg.instantiate_batches(&products, max_batch_size)) +} + /// Constructs attestation scheduling jobs from attestation config. fn prepare_attestation_sched_jobs( - attestation_cfg: &AttestationConfig, + batch_cfg: &[SymbolBatch], p2w_cfg: &Pyth2WormholeConfig, rpc_cfg: &Arc>, p2w_addr: &Pubkey, payer: &Keypair, message_q_mtx: Arc>, ) -> futures::future::JoinAll>> { - info!( - "{} symbol groups read, dividing into batches", - attestation_cfg.symbol_groups.len(), - ); - // Flatten attestation config into a plain list of batches - let batches: Vec<_> = attestation_cfg.as_batches(p2w_cfg.max_batch_size as usize); + let batches: Vec<_> = batch_cfg.iter().map(BatchState::new).collect(); let batch_count = batches.len(); diff --git a/solana/pyth2wormhole/program/src/config.rs b/solana/pyth2wormhole/program/src/config.rs index c2950821..e8e0919d 100644 --- a/solana/pyth2wormhole/program/src/config.rs +++ b/solana/pyth2wormhole/program/src/config.rs @@ -121,7 +121,7 @@ impl From for Pyth2WormholeConfigV2 { } // Added ops_owner which can toggle the is_active field -#[derive(Clone, Default, Hash, BorshDeserialize, BorshSerialize, PartialEq)] +#[derive(Clone, Default, Hash, BorshDeserialize, BorshSerialize, PartialEq, Eq)] #[cfg_attr(feature = "client", derive(Debug))] pub struct Pyth2WormholeConfigV3 { /// Authority owning this contract diff --git a/third_party/pyth/p2w_autoattest.py b/third_party/pyth/p2w_autoattest.py index 39fd3d97..122fdc4c 100755 --- a/third_party/pyth/p2w_autoattest.py +++ b/third_party/pyth/p2w_autoattest.py @@ -137,6 +137,8 @@ mapping_addr: {mapping_addr} mapping_reload_interval_mins: 1 # Very fast for testing purposes min_rpc_interval_ms: 0 # RIP RPC max_batch_jobs: 1000 # Where we're going there's no oomkiller +default_attestation_conditions: + min_interval_secs: 60 symbol_groups: - group_name: fast_interval_only conditions: @@ -155,9 +157,10 @@ symbol_groups: product = thing["product"] cfg_yaml += f""" - - name: {name} - price_addr: {price} - product_addr: {product}""" + - type: key + name: {name} + price: {price} + product: {product}""" # End of fast_interval_only @@ -175,9 +178,10 @@ symbol_groups: product = stuff["product"] cfg_yaml += f""" - - name: {name} - price_addr: {price} - product_addr: {product}""" + - type: key + name: {name} + price: {price} + product: {product}""" cfg_yaml += f""" - group_name: mapping