Configure symbol groups by name (#403)

* Refactor to make this change easier

* stop mutating AttestationConfig

* get the product names also

* ok

* refactor

* cleanup

* more cleanup

* more cleanup

* comment

* i think this works

* fix stuff

* clippy

* more cleanup

* main

* main

* fix formatting

* blah

* test

* cleanup

* fix python

* config

* fix test

* grr

* grr

* comments

Co-authored-by: Jayant Krishnamurthy <jkrishnamurthy@jumptrading.com>
This commit is contained in:
Jayant Krishnamurthy 2022-12-02 09:35:33 -08:00 committed by GitHub
parent 06b24609e1
commit 3beffdfe46
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 489 additions and 219 deletions

View File

@ -1,5 +1,5 @@
#![deny(warnings)] #![deny(warnings)]
#![allow(clippy::result_large_err)] #![allow(clippy::result_unit_err)]
use { use {
anchor_lang::{ anchor_lang::{

View File

@ -1,6 +1,15 @@
use { use {
crate::BatchState, crate::{
log::info, attestation_cfg::SymbolConfig::{
Key,
Name,
},
P2WProductAccount,
},
log::{
info,
warn,
},
serde::{ serde::{
de::Error, de::Error,
Deserialize, Deserialize,
@ -14,25 +23,26 @@ use {
HashMap, HashMap,
HashSet, HashSet,
}, },
iter,
str::FromStr, str::FromStr,
}, },
}; };
/// Pyth2wormhole config specific to attestation requests /// Pyth2wormhole config specific to attestation requests
#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)] #[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)]
pub struct AttestationConfig { pub struct AttestationConfig {
#[serde(default = "default_min_msg_reuse_interval_ms")] #[serde(default = "default_min_msg_reuse_interval_ms")]
pub min_msg_reuse_interval_ms: u64, pub min_msg_reuse_interval_ms: u64,
#[serde(default = "default_max_msg_accounts")] #[serde(default = "default_max_msg_accounts")]
pub max_msg_accounts: u64, pub max_msg_accounts: u64,
/// Optionally, we take a mapping account to add remaining symbols from a Pyth deployments. These symbols are processed under attestation conditions for the `default` symbol group.
/// Optionally, we take a mapping account to add remaining symbols from a Pyth deployments.
/// These symbols are processed under `default_attestation_conditions`.
#[serde( #[serde(
deserialize_with = "opt_pubkey_string_de", deserialize_with = "opt_pubkey_string_de",
serialize_with = "opt_pubkey_string_ser", serialize_with = "opt_pubkey_string_ser",
default // Uses Option::default() which is None default // Uses Option::default() which is None
)] )]
pub mapping_addr: Option<Pubkey>, pub mapping_addr: Option<Pubkey>,
/// The known symbol list will be reloaded based off this /// The known symbol list will be reloaded based off this
/// interval, to account for mapping changes. Note: This interval /// interval, to account for mapping changes. Note: This interval
/// will only work if the mapping address is defined. Whenever /// will only work if the mapping address is defined. Whenever
@ -41,88 +51,230 @@ pub struct AttestationConfig {
/// symbol list, and before stopping the pre-existing obsolete /// symbol list, and before stopping the pre-existing obsolete
/// jobs to maintain uninterrupted cranking. /// jobs to maintain uninterrupted cranking.
#[serde(default = "default_mapping_reload_interval_mins")] #[serde(default = "default_mapping_reload_interval_mins")]
pub mapping_reload_interval_mins: u64, pub mapping_reload_interval_mins: u64,
#[serde(default = "default_min_rpc_interval_ms")] #[serde(default = "default_min_rpc_interval_ms")]
/// Rate-limiting minimum delay between RPC requests in milliseconds" /// Rate-limiting minimum delay between RPC requests in milliseconds
pub min_rpc_interval_ms: u64, pub min_rpc_interval_ms: u64,
pub symbol_groups: Vec<SymbolGroup>, /// Attestation conditions that will be used for any symbols included in the mapping
/// that aren't explicitly in one of the groups below, and any groups without explicitly
/// configured attestation conditions.
pub default_attestation_conditions: AttestationConditions,
/// Groups of symbols to publish.
pub symbol_groups: Vec<SymbolGroupConfig>,
} }
impl AttestationConfig { impl AttestationConfig {
/// Merges new symbols into the attestation config. Pre-existing /// Instantiate the batches of symbols to attest by matching the config against the collection
/// new symbols are ignored. The new_group_name group can already /// of on-chain product accounts.
/// exist - symbols will be appended to `symbols` field. pub fn instantiate_batches(
pub fn add_symbols( &self,
&mut self, product_accounts: &[P2WProductAccount],
mut new_symbols: HashMap<Pubkey, HashSet<Pubkey>>, max_batch_size: usize,
group_name: String, // Which group is extended by the new symbols ) -> Vec<SymbolBatch> {
) { // Construct mapping from the name of each product account to its corresponding symbols
// Remove pre-existing symbols from the new symbols collection let mut name_to_symbols: HashMap<String, Vec<P2WSymbol>> = HashMap::new();
for existing_group in &self.symbol_groups { for product_account in product_accounts {
for existing_sym in &existing_group.symbols { for price_account_key in &product_account.price_account_keys {
// Check if new symbols mention this product if let Some(name) = &product_account.name {
if let Some(prices) = new_symbols.get_mut(&existing_sym.product_addr) { let symbol = P2WSymbol {
// Prune the price if exists name: Some(name.clone()),
prices.remove(&existing_sym.price_addr); product_addr: product_account.key,
price_addr: *price_account_key,
};
name_to_symbols
.entry(name.clone())
.or_insert(vec![])
.push(symbol);
} }
} }
} }
// Turn the pruned symbols into P2WSymbol structs // Instantiate batches from the configured symbol groups.
let mut new_symbols_vec = new_symbols let mut configured_batches: Vec<SymbolBatch> = vec![];
.drain() // Makes us own the elements and lets us move them for group in &self.symbol_groups {
.flat_map(|(prod, prices)| iter::zip(iter::repeat(prod), prices)) // Flatten the tuple iterators let group_symbols: Vec<P2WSymbol> = group
.map(|(prod, price)| P2WSymbol { .symbols
name: None, .iter()
product_addr: prod, .flat_map(|symbol| match &symbol {
price_addr: price, Key {
}) name,
.collect::<Vec<P2WSymbol>>(); product,
price,
} => {
vec![P2WSymbol {
name: name.clone(),
product_addr: *product,
price_addr: *price,
}]
}
Name { name } => {
let maybe_matched_symbols: Option<&Vec<P2WSymbol>> =
name_to_symbols.get(name);
if let Some(matched_symbols) = maybe_matched_symbols {
matched_symbols.clone()
} else {
// It's slightly unfortunate that this is a warning, but it seems better than crashing.
// The data in the mapping account can change while the attester is running and trigger this case,
// which means that it is not necessarily a configuration problem.
// Note that any named symbols in the config which fail to match will still be included
// in the remaining_symbols group below.
warn!(
"Could not find product account for configured symbol {}",
name
);
vec![]
}
}
})
.collect();
// Find and extend OR create the group of specified name let group_conditions = group
match self .conditions
.symbol_groups .as_ref()
.iter_mut() .unwrap_or(&self.default_attestation_conditions);
.find(|g| g.group_name == group_name) // Advances the iterator and returns Some(item) on first hit configured_batches.extend(AttestationConfig::partition_into_batches(
{ &group.group_name,
Some(existing_group) => existing_group.symbols.append(&mut new_symbols_vec), max_batch_size,
None if !new_symbols_vec.is_empty() => { group_conditions,
// Group does not exist, assume defaults group_symbols,
let new_group = SymbolGroup { ))
group_name,
conditions: Default::default(),
symbols: new_symbols_vec,
};
self.symbol_groups.push(new_group);
}
None => {}
} }
// Find any accounts not included in existing batches and group them into a remainder batch
let existing_price_accounts: HashSet<Pubkey> = configured_batches
.iter()
.flat_map(|batch| batch.symbols.iter().map(|symbol| symbol.price_addr))
.chain(
configured_batches
.iter()
.flat_map(|batch| batch.symbols.iter().map(|symbol| symbol.price_addr)),
)
.collect();
let mut remaining_symbols: Vec<P2WSymbol> = vec![];
for product_account in product_accounts {
for price_account_key in &product_account.price_account_keys {
if !existing_price_accounts.contains(price_account_key) {
let symbol = P2WSymbol {
name: product_account.name.clone(),
product_addr: product_account.key,
price_addr: *price_account_key,
};
remaining_symbols.push(symbol);
}
}
}
let remaining_batches = AttestationConfig::partition_into_batches(
&"mapping".to_owned(),
max_batch_size,
&self.default_attestation_conditions,
remaining_symbols,
);
let all_batches = configured_batches
.into_iter()
.chain(remaining_batches.into_iter())
.collect::<Vec<SymbolBatch>>();
for batch in &all_batches {
info!(
"Batch {:?}, {} symbols",
batch.group_name,
batch.symbols.len(),
);
}
all_batches
} }
pub fn as_batches(&self, max_batch_size: usize) -> Vec<BatchState> { /// Partition symbols into a collection of batches, each of which contains no more than
self.symbol_groups /// `max_batch_size` symbols.
.iter() fn partition_into_batches(
.flat_map(move |g| { batch_name: &String,
let conditions4closure = g.conditions.clone(); max_batch_size: usize,
let name4closure = g.group_name.clone(); conditions: &AttestationConditions,
symbols: Vec<P2WSymbol>,
info!("Group {:?}, {} symbols", g.group_name, g.symbols.len(),); ) -> Vec<SymbolBatch> {
symbols
// Divide group into batches .as_slice()
g.symbols .chunks(max_batch_size)
.as_slice() .map(move |batch_symbols| SymbolBatch {
.chunks(max_batch_size) group_name: batch_name.to_owned(),
.map(move |symbols| { symbols: batch_symbols.to_vec(),
BatchState::new(name4closure.clone(), symbols, conditions4closure.clone()) conditions: conditions.clone(),
})
}) })
.collect() .collect()
} }
} }
#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)] #[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)]
pub struct SymbolGroup { pub struct SymbolGroupConfig {
pub group_name: String,
/// Attestation conditions applied to all symbols in this group
/// If not provided, use the default attestation conditions from `AttestationConfig`.
pub conditions: Option<AttestationConditions>,
/// The symbols to publish in this group.
pub symbols: Vec<SymbolConfig>,
}
/// Config entry for a symbol to attest.
#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SymbolConfig {
/// A symbol specified by its product name.
Name {
/// The name of the symbol. This name is matched against the "symbol" field in the product
/// account metadata. If multiple price accounts have this name (either because 2 product
/// accounts have the same symbol or a single product account has multiple price accounts),
/// it matches *all* of them and puts them into this group.
name: String,
},
/// A symbol specified by its product and price account keys.
Key {
/// Optional human-readable name for the symbol (for logging purposes).
/// This field does not need to match the on-chain data for the product.
name: Option<String>,
#[serde(
deserialize_with = "pubkey_string_de",
serialize_with = "pubkey_string_ser"
)]
product: Pubkey,
#[serde(
deserialize_with = "pubkey_string_de",
serialize_with = "pubkey_string_ser"
)]
price: Pubkey,
},
}
impl ToString for SymbolConfig {
fn to_string(&self) -> String {
match &self {
Name { name } => name.clone(),
Key {
name: Some(name),
product: _,
price: _,
} => name.clone(),
Key {
name: None,
product,
price: _,
} => {
format!("Unnamed product {}", product)
}
}
}
}
/// A batch of symbols that's ready to be attested. Includes all necessary information
/// (such as price/product account keys).
#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)]
pub struct SymbolBatch {
pub group_name: String, pub group_name: String,
/// Attestation conditions applied to all symbols in this group /// Attestation conditions applied to all symbols in this group
pub conditions: AttestationConditions, pub conditions: AttestationConditions,
@ -157,7 +309,7 @@ pub const fn default_max_batch_jobs() -> usize {
/// of the active conditions is met. Option<> fields can be /// of the active conditions is met. Option<> fields can be
/// de-activated with None. All conditions are inactive by default, /// de-activated with None. All conditions are inactive by default,
/// except for the non-Option ones. /// except for the non-Option ones.
#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)] #[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)]
pub struct AttestationConditions { pub struct AttestationConditions {
/// Baseline, unconditional attestation interval. Attestation is triggered if the specified interval elapsed since last attestation. /// Baseline, unconditional attestation interval. Attestation is triggered if the specified interval elapsed since last attestation.
#[serde(default = "default_min_interval_secs")] #[serde(default = "default_min_interval_secs")]
@ -207,7 +359,6 @@ impl Default for AttestationConditions {
} }
} }
/// Config entry for a Pyth product + price pair
#[derive(Clone, Default, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] #[derive(Clone, Default, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)]
pub struct P2WSymbol { pub struct P2WSymbol {
/// User-defined human-readable name /// User-defined human-readable name
@ -274,54 +425,59 @@ where
mod tests { mod tests {
use { use {
super::*, super::*,
crate::attestation_cfg::SymbolConfig::{
Key,
Name,
},
solitaire::ErrBox, solitaire::ErrBox,
}; };
#[test] #[test]
fn test_sanity() -> Result<(), ErrBox> { fn test_sanity() -> Result<(), ErrBox> {
let fastbois = SymbolGroup { let fastbois = SymbolGroupConfig {
group_name: "fast bois".to_owned(), group_name: "fast bois".to_owned(),
conditions: AttestationConditions { conditions: Some(AttestationConditions {
min_interval_secs: 5, min_interval_secs: 5,
..Default::default() ..Default::default()
}, }),
symbols: vec![ symbols: vec![
P2WSymbol { Name {
name: Some("ETHUSD".to_owned()), name: "ETHUSD".to_owned(),
..Default::default()
}, },
P2WSymbol { Key {
name: Some("BTCUSD".to_owned()), name: Some("BTCUSD".to_owned()),
..Default::default() product: Pubkey::new_unique(),
price: Pubkey::new_unique(),
}, },
], ],
}; };
let slowbois = SymbolGroup { let slowbois = SymbolGroupConfig {
group_name: "slow bois".to_owned(), group_name: "slow bois".to_owned(),
conditions: AttestationConditions { conditions: Some(AttestationConditions {
min_interval_secs: 200, min_interval_secs: 200,
..Default::default() ..Default::default()
}, }),
symbols: vec![ symbols: vec![
P2WSymbol { Name {
name: Some("CNYAUD".to_owned()), name: "CNYAUD".to_owned(),
..Default::default()
}, },
P2WSymbol { Key {
name: Some("INRPLN".to_owned()), name: None,
..Default::default() product: Pubkey::new_unique(),
price: Pubkey::new_unique(),
}, },
], ],
}; };
let cfg = AttestationConfig { let cfg = AttestationConfig {
min_msg_reuse_interval_ms: 1000, min_msg_reuse_interval_ms: 1000,
max_msg_accounts: 100_000, max_msg_accounts: 100_000,
min_rpc_interval_ms: 2123, min_rpc_interval_ms: 2123,
mapping_addr: None, mapping_addr: None,
mapping_reload_interval_mins: 42, mapping_reload_interval_mins: 42,
symbol_groups: vec![fastbois, slowbois], default_attestation_conditions: AttestationConditions::default(),
symbol_groups: vec![fastbois, slowbois],
}; };
let serialized = serde_yaml::to_string(&cfg)?; let serialized = serde_yaml::to_string(&cfg)?;
@ -334,44 +490,128 @@ mod tests {
} }
#[test] #[test]
fn test_add_symbols_works() -> Result<(), ErrBox> { fn test_instantiate_batches() -> Result<(), ErrBox> {
let empty_config = AttestationConfig { let btc_product_key = Pubkey::new_unique();
min_msg_reuse_interval_ms: 1000, let btc_price_key = Pubkey::new_unique();
max_msg_accounts: 100,
min_rpc_interval_ms: 42422, let eth_product_key = Pubkey::new_unique();
mapping_addr: None, let eth_price_key_1 = Pubkey::new_unique();
mapping_reload_interval_mins: 42, let eth_price_key_2 = Pubkey::new_unique();
symbol_groups: vec![],
let unk_product_key = Pubkey::new_unique();
let unk_price_key = Pubkey::new_unique();
let eth_dup_product_key = Pubkey::new_unique();
let eth_dup_price_key = Pubkey::new_unique();
let attestation_conditions_1 = AttestationConditions {
min_interval_secs: 5,
..Default::default()
}; };
let mock_new_symbols = (0..255) let products = vec![
.map(|sym_idx| { P2WProductAccount {
let mut mock_prod_bytes = [0u8; 32]; name: Some("ETHUSD".to_owned()),
mock_prod_bytes[31] = sym_idx; key: eth_product_key,
price_account_keys: vec![eth_price_key_1, eth_price_key_2],
},
P2WProductAccount {
name: None,
key: unk_product_key,
price_account_keys: vec![unk_price_key],
},
];
let mut mock_prices = HashSet::new(); let group1 = SymbolGroupConfig {
for _px_idx in 1..=5 { group_name: "group 1".to_owned(),
let mut mock_price_bytes = [0u8; 32]; conditions: Some(attestation_conditions_1.clone()),
mock_price_bytes[31] = sym_idx; symbols: vec![
mock_prices.insert(Pubkey::new_from_array(mock_price_bytes)); Key {
name: Some("BTCUSD".to_owned()),
price: btc_price_key,
product: btc_product_key,
},
Name {
name: "ETHUSD".to_owned(),
},
],
};
let group2 = SymbolGroupConfig {
group_name: "group 2".to_owned(),
conditions: None,
symbols: vec![Key {
name: Some("ETHUSD".to_owned()),
price: eth_dup_price_key,
product: eth_dup_product_key,
}],
};
let default_attestation_conditions = AttestationConditions {
min_interval_secs: 1,
..Default::default()
};
let cfg = AttestationConfig {
min_msg_reuse_interval_ms: 1000,
max_msg_accounts: 100_000,
min_rpc_interval_ms: 2123,
mapping_addr: None,
mapping_reload_interval_mins: 42,
default_attestation_conditions: default_attestation_conditions.clone(),
symbol_groups: vec![group1, group2],
};
let batches = cfg.instantiate_batches(&products, 2);
assert_eq!(
batches,
vec![
SymbolBatch {
group_name: "group 1".to_owned(),
conditions: attestation_conditions_1.clone(),
symbols: vec![
P2WSymbol {
name: Some("BTCUSD".to_owned()),
product_addr: btc_product_key,
price_addr: btc_price_key,
},
P2WSymbol {
name: Some("ETHUSD".to_owned()),
product_addr: eth_product_key,
price_addr: eth_price_key_1,
}
],
},
SymbolBatch {
group_name: "group 1".to_owned(),
conditions: attestation_conditions_1,
symbols: vec![P2WSymbol {
name: Some("ETHUSD".to_owned()),
product_addr: eth_product_key,
price_addr: eth_price_key_2,
}],
},
SymbolBatch {
group_name: "group 2".to_owned(),
conditions: default_attestation_conditions.clone(),
symbols: vec![P2WSymbol {
name: Some("ETHUSD".to_owned()),
product_addr: eth_dup_product_key,
price_addr: eth_dup_price_key,
}],
},
SymbolBatch {
group_name: "mapping".to_owned(),
conditions: default_attestation_conditions,
symbols: vec![P2WSymbol {
name: None,
product_addr: unk_product_key,
price_addr: unk_price_key,
}],
} }
]
(Pubkey::new_from_array(mock_prod_bytes), mock_prices) );
})
.collect::<HashMap<Pubkey, HashSet<Pubkey>>>();
let mut config1 = empty_config.clone();
config1.add_symbols(mock_new_symbols.clone(), "default".to_owned());
let mut config2 = config1.clone();
// Should not be created because there's no new symbols to add
// (we're adding identical mock_new_symbols again)
config2.add_symbols(mock_new_symbols, "default2".to_owned());
assert_ne!(config1, empty_config); // Check that config grows from empty
assert_eq!(config1, config2); // Check that no changes are made if all symbols are already in there
Ok(()) Ok(())
} }

View File

@ -1,5 +1,6 @@
use { use {
crate::{ crate::{
attestation_cfg::SymbolBatch,
AttestationConditions, AttestationConditions,
P2WSymbol, P2WSymbol,
}, },
@ -27,17 +28,13 @@ pub struct BatchState {
} }
impl<'a> BatchState { impl<'a> BatchState {
pub fn new( pub fn new(group: &SymbolBatch) -> Self {
group_name: String,
symbols: &[P2WSymbol],
conditions: AttestationConditions,
) -> Self {
Self { Self {
group_name, group_name: group.group_name.clone(),
symbols: symbols.to_vec(), symbols: group.symbols.clone(),
conditions, conditions: group.conditions.clone(),
last_known_symbol_states: vec![None; symbols.len()], last_known_symbol_states: vec![None; group.symbols.len()],
last_job_finished_at: Instant::now(), last_job_finished_at: Instant::now(),
} }
} }

View File

@ -80,10 +80,6 @@ use {
AccountState, AccountState,
ErrBox, ErrBox,
}, },
std::collections::{
HashMap,
HashSet,
},
}; };
/// Future-friendly version of solitaire::ErrBox /// Future-friendly version of solitaire::ErrBox
@ -402,8 +398,8 @@ pub fn gen_attest_tx(
pub async fn crawl_pyth_mapping( pub async fn crawl_pyth_mapping(
rpc_client: &RpcClient, rpc_client: &RpcClient,
first_mapping_addr: &Pubkey, first_mapping_addr: &Pubkey,
) -> Result<HashMap<Pubkey, HashSet<Pubkey>>, ErrBox> { ) -> Result<Vec<P2WProductAccount>, ErrBox> {
let mut ret = HashMap::new(); let mut ret: Vec<P2WProductAccount> = vec![];
let mut n_mappings = 1; // We assume the first one must be valid let mut n_mappings = 1; // We assume the first one must be valid
let mut n_products_total = 0; // Grand total products in all mapping accounts let mut n_products_total = 0; // Grand total products in all mapping accounts
@ -439,6 +435,13 @@ pub async fn crawl_pyth_mapping(
} }
}; };
let mut prod_name = None;
for (key, val) in prod.iter() {
if key.eq_ignore_ascii_case("symbol") {
prod_name = Some(val.to_owned());
}
}
let mut price_addr = prod.px_acc; let mut price_addr = prod.px_acc;
let mut n_prod_prices = 0; let mut n_prod_prices = 0;
@ -454,6 +457,7 @@ pub async fn crawl_pyth_mapping(
} }
// loop until the last non-zero PriceAccount.next account // loop until the last non-zero PriceAccount.next account
let mut price_accounts: Vec<Pubkey> = vec![];
loop { loop {
let price_bytes = rpc_client.get_account_data(&price_addr).await?; let price_bytes = rpc_client.get_account_data(&price_addr).await?;
let price = match load_price_account(&price_bytes) { let price = match load_price_account(&price_bytes) {
@ -464,11 +468,7 @@ pub async fn crawl_pyth_mapping(
} }
}; };
// Append to existing set or create a new map entry price_accounts.push(price_addr);
ret.entry(*prod_addr)
.or_insert(HashSet::new())
.insert(price_addr);
n_prod_prices += 1; n_prod_prices += 1;
if price.next == Pubkey::default() { if price.next == Pubkey::default() {
@ -482,6 +482,11 @@ pub async fn crawl_pyth_mapping(
price_addr = price.next; price_addr = price.next;
} }
ret.push(P2WProductAccount {
key: *prod_addr,
name: prod_name.clone(),
price_account_keys: price_accounts,
});
n_prices_total += n_prod_prices; n_prices_total += n_prod_prices;
} }
@ -508,3 +513,10 @@ pub async fn crawl_pyth_mapping(
Ok(ret) Ok(ret)
} }
#[derive(Clone, Debug)]
pub struct P2WProductAccount {
pub key: Pubkey,
pub name: Option<String>,
pub price_account_keys: Vec<Pubkey>,
}

View File

@ -1,5 +1,3 @@
pub mod cli;
use { use {
clap::Parser, clap::Parser,
cli::{ cli::{
@ -30,7 +28,23 @@ use {
attest::P2W_MAX_BATCH_SIZE, attest::P2W_MAX_BATCH_SIZE,
Pyth2WormholeConfig, Pyth2WormholeConfig,
}, },
pyth2wormhole_client::*, pyth2wormhole_client::{
attestation_cfg::SymbolBatch,
crawl_pyth_mapping,
gen_attest_tx,
gen_init_tx,
gen_migrate_tx,
gen_set_config_tx,
gen_set_is_active_tx,
get_config_account,
start_metrics_server,
AttestationConfig,
BatchState,
ErrBoxSend,
P2WMessageQueue,
P2WSymbol,
RLMutex,
},
sha3::{ sha3::{
Digest, Digest,
Sha3_256, Sha3_256,
@ -68,6 +82,8 @@ use {
}, },
}; };
pub mod cli;
pub const SEQNO_PREFIX: &str = "Program log: Sequence: "; pub const SEQNO_PREFIX: &str = "Program log: Sequence: ";
lazy_static! { lazy_static! {
@ -272,7 +288,7 @@ async fn handle_attest_daemon_mode(
rpc_cfg: Arc<RLMutex<RpcCfg>>, rpc_cfg: Arc<RLMutex<RpcCfg>>,
payer: Keypair, payer: Keypair,
p2w_addr: Pubkey, p2w_addr: Pubkey,
mut attestation_cfg: AttestationConfig, attestation_cfg: AttestationConfig,
metrics_bind_addr: SocketAddr, metrics_bind_addr: SocketAddr,
) -> Result<(), ErrBox> { ) -> Result<(), ErrBox> {
tokio::spawn(start_metrics_server(metrics_bind_addr)); tokio::spawn(start_metrics_server(metrics_bind_addr));
@ -298,6 +314,7 @@ async fn handle_attest_daemon_mode(
attestation_cfg.max_msg_accounts as usize, attestation_cfg.max_msg_accounts as usize,
))); )));
let mut batch_cfg = vec![];
// This loop cranks attestations without interruption. This is // This loop cranks attestations without interruption. This is
// achieved by spinning up a new up-to-date symbol set before // achieved by spinning up a new up-to-date symbol set before
// letting go of the previous one. Additionally, hash of on-chain // letting go of the previous one. Additionally, hash of on-chain
@ -318,29 +335,18 @@ async fn handle_attest_daemon_mode(
}; };
// Use the mapping if specified // Use the mapping if specified
if let Some(mapping_addr) = attestation_cfg.mapping_addr.as_ref() { // If we cannot query the mapping account, retain the existing batch configuration.
match crawl_pyth_mapping(&lock_and_make_rpc(&rpc_cfg).await, mapping_addr).await { batch_cfg = attestation_config_to_batches(
Ok(additional_accounts) => { &rpc_cfg,
debug!( &attestation_cfg,
"Crawled mapping {} data:\n{:#?}", config.max_batch_size as usize,
mapping_addr, additional_accounts )
); .await
attestation_cfg.add_symbols(additional_accounts, "mapping".to_owned()); .unwrap_or(batch_cfg);
}
// De-escalate crawling errors; A temporary failure to
// look up the mapping should not crash the attester
Err(e) => {
error!("Could not crawl mapping {}: {:?}", mapping_addr, e);
}
}
}
debug!(
"Attestation config (includes mapping accounts):\n{:#?}",
attestation_cfg
);
// Hash currently known config // Hash currently known config
hasher.update(serde_yaml::to_vec(&attestation_cfg)?); hasher.update(serde_yaml::to_vec(&batch_cfg)?);
hasher.update(borsh::to_vec(&config)?); hasher.update(borsh::to_vec(&config)?);
let new_cfg_hash = hasher.finalize_reset(); let new_cfg_hash = hasher.finalize_reset();
@ -354,7 +360,7 @@ async fn handle_attest_daemon_mode(
info!("Spinning up attestation sched jobs"); info!("Spinning up attestation sched jobs");
// Start the new sched futures // Start the new sched futures
let new_sched_futs_handle = tokio::spawn(prepare_attestation_sched_jobs( let new_sched_futs_handle = tokio::spawn(prepare_attestation_sched_jobs(
&attestation_cfg, &batch_cfg,
&config, &config,
&rpc_cfg, &rpc_cfg,
&p2w_addr, &p2w_addr,
@ -372,7 +378,7 @@ async fn handle_attest_daemon_mode(
// Base case for first attestation attempt // Base case for first attestation attempt
old_sched_futs_state = Some(( old_sched_futs_state = Some((
tokio::spawn(prepare_attestation_sched_jobs( tokio::spawn(prepare_attestation_sched_jobs(
&attestation_cfg, &batch_cfg,
&config, &config,
&rpc_cfg, &rpc_cfg,
&p2w_addr, &p2w_addr,
@ -429,7 +435,7 @@ async fn lock_and_make_rpc(rlmtx: &RLMutex<RpcCfg>) -> RpcClient {
/// Non-daemon attestation scheduling /// Non-daemon attestation scheduling
async fn handle_attest_non_daemon_mode( async fn handle_attest_non_daemon_mode(
mut attestation_cfg: AttestationConfig, attestation_cfg: AttestationConfig,
rpc_cfg: Arc<RLMutex<RpcCfg>>, rpc_cfg: Arc<RLMutex<RpcCfg>>,
p2w_addr: Pubkey, p2w_addr: Pubkey,
payer: Keypair, payer: Keypair,
@ -438,29 +444,17 @@ async fn handle_attest_non_daemon_mode(
) -> Result<(), ErrBox> { ) -> Result<(), ErrBox> {
let p2w_cfg = get_config_account(&lock_and_make_rpc(&rpc_cfg).await, &p2w_addr).await?; let p2w_cfg = get_config_account(&lock_and_make_rpc(&rpc_cfg).await, &p2w_addr).await?;
// Use the mapping if specified let batch_config =
if let Some(mapping_addr) = attestation_cfg.mapping_addr.as_ref() { attestation_config_to_batches(&rpc_cfg, &attestation_cfg, p2w_cfg.max_batch_size as usize)
match crawl_pyth_mapping(&lock_and_make_rpc(&rpc_cfg).await, mapping_addr).await { .await
Ok(additional_accounts) => { .unwrap_or_else(|_| {
debug!( attestation_cfg.instantiate_batches(&[], p2w_cfg.max_batch_size as usize)
"Crawled mapping {} data:\n{:#?}", });
mapping_addr, additional_accounts
);
attestation_cfg.add_symbols(additional_accounts, "mapping".to_owned());
}
// De-escalate crawling errors; A temporary failure to
// look up the mapping should not crash the attester
Err(e) => {
error!("Could not crawl mapping {}: {:?}", mapping_addr, e);
}
}
}
debug!(
"Attestation config (includes mapping accounts):\n{:#?}",
attestation_cfg
);
let batches = attestation_cfg.as_batches(p2w_cfg.max_batch_size as usize); let batches: Vec<_> = batch_config
.into_iter()
.map(|x| BatchState::new(&x))
.collect();
let batch_count = batches.len(); let batch_count = batches.len();
// For enforcing min_msg_reuse_interval_ms, we keep a piece of // For enforcing min_msg_reuse_interval_ms, we keep a piece of
@ -510,22 +504,45 @@ async fn handle_attest_non_daemon_mode(
Ok(()) Ok(())
} }
/// Generate batches to attest by retrieving the on-chain product account data and grouping it
/// according to the configuration in `attestation_cfg`.
async fn attestation_config_to_batches(
rpc_cfg: &Arc<RLMutex<RpcCfg>>,
attestation_cfg: &AttestationConfig,
max_batch_size: usize,
) -> Result<Vec<SymbolBatch>, ErrBox> {
// Use the mapping if specified
let products = if let Some(mapping_addr) = attestation_cfg.mapping_addr.as_ref() {
let product_accounts_res =
crawl_pyth_mapping(&lock_and_make_rpc(rpc_cfg).await, mapping_addr).await;
if let Err(err) = &product_accounts_res {
error!(
"Could not crawl mapping {}: {:?}",
attestation_cfg.mapping_addr.unwrap_or_default(),
err
);
}
product_accounts_res?
} else {
vec![]
};
Ok(attestation_cfg.instantiate_batches(&products, max_batch_size))
}
/// Constructs attestation scheduling jobs from attestation config. /// Constructs attestation scheduling jobs from attestation config.
fn prepare_attestation_sched_jobs( fn prepare_attestation_sched_jobs(
attestation_cfg: &AttestationConfig, batch_cfg: &[SymbolBatch],
p2w_cfg: &Pyth2WormholeConfig, p2w_cfg: &Pyth2WormholeConfig,
rpc_cfg: &Arc<RLMutex<RpcCfg>>, rpc_cfg: &Arc<RLMutex<RpcCfg>>,
p2w_addr: &Pubkey, p2w_addr: &Pubkey,
payer: &Keypair, payer: &Keypair,
message_q_mtx: Arc<Mutex<P2WMessageQueue>>, message_q_mtx: Arc<Mutex<P2WMessageQueue>>,
) -> futures::future::JoinAll<impl Future<Output = Result<(), ErrBoxSend>>> { ) -> futures::future::JoinAll<impl Future<Output = Result<(), ErrBoxSend>>> {
info!(
"{} symbol groups read, dividing into batches",
attestation_cfg.symbol_groups.len(),
);
// Flatten attestation config into a plain list of batches // Flatten attestation config into a plain list of batches
let batches: Vec<_> = attestation_cfg.as_batches(p2w_cfg.max_batch_size as usize); let batches: Vec<_> = batch_cfg.iter().map(BatchState::new).collect();
let batch_count = batches.len(); let batch_count = batches.len();

View File

@ -121,7 +121,7 @@ impl From<Pyth2WormholeConfigV1> for Pyth2WormholeConfigV2 {
} }
// Added ops_owner which can toggle the is_active field // Added ops_owner which can toggle the is_active field
#[derive(Clone, Default, Hash, BorshDeserialize, BorshSerialize, PartialEq)] #[derive(Clone, Default, Hash, BorshDeserialize, BorshSerialize, PartialEq, Eq)]
#[cfg_attr(feature = "client", derive(Debug))] #[cfg_attr(feature = "client", derive(Debug))]
pub struct Pyth2WormholeConfigV3 { pub struct Pyth2WormholeConfigV3 {
/// Authority owning this contract /// Authority owning this contract

View File

@ -137,6 +137,8 @@ mapping_addr: {mapping_addr}
mapping_reload_interval_mins: 1 # Very fast for testing purposes mapping_reload_interval_mins: 1 # Very fast for testing purposes
min_rpc_interval_ms: 0 # RIP RPC min_rpc_interval_ms: 0 # RIP RPC
max_batch_jobs: 1000 # Where we're going there's no oomkiller max_batch_jobs: 1000 # Where we're going there's no oomkiller
default_attestation_conditions:
min_interval_secs: 60
symbol_groups: symbol_groups:
- group_name: fast_interval_only - group_name: fast_interval_only
conditions: conditions:
@ -155,9 +157,10 @@ symbol_groups:
product = thing["product"] product = thing["product"]
cfg_yaml += f""" cfg_yaml += f"""
- name: {name} - type: key
price_addr: {price} name: {name}
product_addr: {product}""" price: {price}
product: {product}"""
# End of fast_interval_only # End of fast_interval_only
@ -175,9 +178,10 @@ symbol_groups:
product = stuff["product"] product = stuff["product"]
cfg_yaml += f""" cfg_yaml += f"""
- name: {name} - type: key
price_addr: {price} name: {name}
product_addr: {product}""" price: {price}
product: {product}"""
cfg_yaml += f""" cfg_yaml += f"""
- group_name: mapping - group_name: mapping