Drozdziak1/p2w attest cont mapping reload (#330)

* pyth2wormhole-client: automatically crawl mapping based on interval

* Make the mapping crawl automation seamless

* pyth2wormhole-client: Make mapping crawling routine more robust

This change handles recoverable mapping-crawling
errors (e.g. a malformed price on a single product no longer
drops the otherwise-good prices and products elsewhere in the
mapping; a warning is logged instead)

* pyth2wormhole-client: Move mapping crawl sleep near logic it affects

* pyth2wormhole-client: remove stray comment

* pyth2wormhole: Fix faulty merge with master

* pyth2wormhole-client: Fix mapping crawl price counting

* pyth2wormhole-client: split daemon/non-daemon, improve readability and remove most warnings

* pyth2wormhole-client: parts-per-thousand -> basis points

* pyth2wormhole-client: inaccurate comment

* p2w-client: review advice - bp -> bps, std hasher -> sha256

* pyth2wormhole-client: reuse message queue across mapping lookups
Stanisław Drozd 2022-10-24 17:19:25 +02:00 committed by GitHub
parent d9e94b284d
commit 3eb2beabe7
10 changed files with 577 additions and 309 deletions

View File

@@ -1417,9 +1417,9 @@ dependencies = [
[[package]]
name = "generic-array"
version = "0.14.5"
version = "0.14.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd48d33ec7f05fbfa152300fdad764757cbded343c1aa1cff2fbaf4134851803"
checksum = "bff49e947297f3312447abdca79f45f4738097cc82b06e72054d2223f601f1b9"
dependencies = [
"serde",
"typenum",
@@ -2648,6 +2648,7 @@ dependencies = [
"clap 3.1.18",
"env_logger 0.8.4",
"futures",
"generic-array",
"log",
"p2w-sdk",
"pyth-client 0.5.1",
@@ -2655,6 +2656,7 @@ dependencies = [
"pyth2wormhole",
"serde",
"serde_yaml",
"sha3 0.10.6",
"shellexpand",
"solana-client",
"solana-program",
@@ -3353,9 +3355,9 @@ dependencies = [
[[package]]
name = "sha3"
version = "0.10.5"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2904bea16a1ae962b483322a1c7b81d976029203aea1f461e51cd7705db7ba9"
checksum = "bdf0c33fae925bdc080598b84bc15c55e7b9a4a43b3c704da051f977469691c9"
dependencies = [
"digest 0.10.5",
"keccak",
@@ -3844,7 +3846,7 @@ dependencies = [
"serde_bytes",
"serde_derive",
"sha2 0.10.2",
"sha3 0.10.5",
"sha3 0.10.6",
"solana-frozen-abi",
"solana-frozen-abi-macro",
"solana-sdk-macro",
@@ -4028,7 +4030,7 @@ dependencies = [
"serde_derive",
"serde_json",
"sha2 0.10.2",
"sha3 0.10.5",
"sha3 0.10.6",
"solana-frozen-abi",
"solana-frozen-abi-macro",
"solana-logger",
@@ -5221,7 +5223,7 @@ dependencies = [
"hex-literal",
"nom",
"primitive-types 0.11.1",
"sha3 0.10.5",
"sha3 0.10.6",
"thiserror",
]
@@ -5237,7 +5239,7 @@ dependencies = [
"hex-literal",
"nom",
"primitive-types 0.11.1",
"sha3 0.10.5",
"sha3 0.10.6",
"solana-program",
"thiserror",
"wormhole-core",

View File

@@ -31,6 +31,8 @@ solana-transaction-status = "=1.10.31"
solitaire = {git = "https://github.com/wormhole-foundation/wormhole", tag = "v2.8.9"}
tokio = {version = "1", features = ["sync", "rt-multi-thread", "time"]}
futures = "0.3.21"
sha3 = "0.10.6"
generic-array = "0.14.6"
[dev-dependencies]
pyth-client = "0.5.0"

View File

@@ -7,6 +7,8 @@ use std::{
str::FromStr,
};
use log::info;
use serde::{
de::Error,
Deserialize,
@@ -16,8 +18,10 @@ use serde::{
};
use solana_program::pubkey::Pubkey;
use crate::BatchState;
/// Pyth2wormhole config specific to attestation requests
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)]
pub struct AttestationConfig {
#[serde(default = "default_min_msg_reuse_interval_ms")]
pub min_msg_reuse_interval_ms: u64,
@@ -30,6 +34,15 @@ pub struct AttestationConfig {
default // Uses Option::default() which is None
)]
pub mapping_addr: Option<Pubkey>,
/// The known symbol list is reloaded at this interval to account
/// for mapping changes. Note: this interval only takes effect if
/// the mapping address is defined. Whenever it's time to look up
/// the mapping, new attestation jobs are started lazily, and only
/// if the mapping contents changed the known symbol list; the
/// pre-existing obsolete jobs are stopped only after the new ones
/// are up, to maintain uninterrupted cranking.
#[serde(default = "default_mapping_reload_interval_mins")]
pub mapping_reload_interval_mins: u64,
#[serde(default = "default_min_rpc_interval_ms")]
/// Rate-limiting minimum delay between RPC requests in milliseconds
pub min_rpc_interval_ms: u64,
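The #[serde(default = ...)] attributes above let existing YAML configs omit the new fields; serde falls back to the named default function instead of failing. A minimal sketch of that behavior, using a hypothetical pared-down struct (assumes the serde and serde_yaml crates):

use serde::Deserialize;

fn default_mapping_reload_interval_mins() -> u64 {
    15
}

#[derive(Debug, Deserialize)]
struct MiniCfg {
    // Omitted in the YAML? serde calls the default fn instead of erroring.
    #[serde(default = "default_mapping_reload_interval_mins")]
    mapping_reload_interval_mins: u64,
}

fn main() {
    let cfg: MiniCfg = serde_yaml::from_str("{}").unwrap(); // empty config
    assert_eq!(cfg.mapping_reload_interval_mins, 15); // default kicks in
}

This is what keeps pre-existing attestation configs valid after mapping_reload_interval_mins is introduced.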
@@ -49,7 +62,7 @@ impl AttestationConfig {
for existing_group in &self.symbol_groups {
for existing_sym in &existing_group.symbols {
// Check if new symbols mention this product
if let Some(mut prices) = new_symbols.get_mut(&existing_sym.product_addr) {
if let Some(prices) = new_symbols.get_mut(&existing_sym.product_addr) {
// Prune the price if it exists
prices.remove(&existing_sym.price_addr);
}
@@ -74,7 +87,7 @@ impl AttestationConfig {
.iter_mut()
.find(|g| g.group_name == group_name) // Advances the iterator and returns Some(item) on first hit
{
Some(mut existing_group) => existing_group.symbols.append(&mut new_symbols_vec),
Some(existing_group) => existing_group.symbols.append(&mut new_symbols_vec),
None if new_symbols_vec.len() != 0 => {
// Group does not exist, assume defaults
let new_group = SymbolGroup {
@@ -88,9 +101,30 @@ impl AttestationConfig {
None => {}
}
}
pub fn as_batches(&self, max_batch_size: usize) -> Vec<BatchState> {
self.symbol_groups
.iter()
.map(move |g| {
let conditions4closure = g.conditions.clone();
let name4closure = g.group_name.clone();
info!("Group {:?}, {} symbols", g.group_name, g.symbols.len(),);
// Divide group into batches
g.symbols
.as_slice()
.chunks(max_batch_size.clone())
.map(move |symbols| {
BatchState::new(name4closure.clone(), symbols, conditions4closure.clone())
})
})
.flatten()
.collect()
}
}
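as_batches flattens every symbol group into chunks of at most max_batch_size; slice::chunks never pads, so the remainder simply forms a smaller final batch. A standalone illustration with u32 stand-ins for P2WSymbol entries:

fn main() {
    let symbols: Vec<u32> = (0..5).collect(); // 5 stand-in symbols
    let max_batch_size = 2;
    let batches: Vec<&[u32]> = symbols.chunks(max_batch_size).collect();
    assert_eq!(batches.len(), 3); // 2 + 2 + 1 symbols
    assert_eq!(batches[2], &[4]); // the remainder rides alone
}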
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)]
pub struct SymbolGroup {
pub group_name: String,
/// Attestation conditions applied to all symbols in this group
@@ -106,6 +140,10 @@ pub const fn default_min_msg_reuse_interval_ms() -> u64 {
10_000 // 10s
}
pub const fn default_mapping_reload_interval_mins() -> u64 {
15
}
pub const fn default_min_rpc_interval_ms() -> u64 {
150
}
@@ -122,7 +160,7 @@ pub const fn default_max_batch_jobs() -> usize {
/// of the active conditions is met. Option<> fields can be
/// de-activated with None. All conditions are inactive by default,
/// except for the non-Option ones.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)]
pub struct AttestationConditions {
/// Baseline, unconditional attestation interval. Attestation is triggered if the specified interval has elapsed since the last attestation.
#[serde(default = "default_min_interval_secs")]
@@ -134,9 +172,10 @@ pub struct AttestationConditions {
#[serde(default = "default_max_batch_jobs")]
pub max_batch_jobs: usize,
/// Trigger attestation if price changes by the specified percentage.
/// Trigger attestation if price changes by the specified
/// percentage, expressed in integer basis points (1bps = 0.01%)
#[serde(default)]
pub price_changed_pct: Option<f64>,
pub price_changed_bps: Option<u64>,
/// Trigger attestation if publish_time advances at least the
/// specified amount.
@@ -152,11 +191,11 @@ impl AttestationConditions {
let AttestationConditions {
min_interval_secs: _min_interval_secs,
max_batch_jobs: _max_batch_jobs,
price_changed_pct,
price_changed_bps,
publish_time_min_delta_secs,
} = self;
price_changed_pct.is_some() || publish_time_min_delta_secs.is_some()
price_changed_bps.is_some() || publish_time_min_delta_secs.is_some()
}
}
@@ -165,14 +204,14 @@ impl Default for AttestationConditions {
Self {
min_interval_secs: default_min_interval_secs(),
max_batch_jobs: default_max_batch_jobs(),
price_changed_pct: None,
price_changed_bps: None,
publish_time_min_delta_secs: None,
}
}
}
/// Config entry for a Pyth product + price pair
#[derive(Clone, Default, Debug, Deserialize, Serialize, PartialEq, Eq)]
#[derive(Clone, Default, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)]
pub struct P2WSymbol {
/// User-defined human-readable name
pub name: Option<String>,
@@ -283,6 +322,7 @@ mod tests {
max_msg_accounts: 100_000,
min_rpc_interval_ms: 2123,
mapping_addr: None,
mapping_reload_interval_mins: 42,
symbol_groups: vec![fastbois, slowbois],
};
@@ -302,6 +342,7 @@ mod tests {
max_msg_accounts: 100,
min_rpc_interval_ms: 42422,
mapping_addr: None,
mapping_reload_interval_mins: 42,
symbol_groups: vec![],
};

View File

@@ -1,11 +1,8 @@
use futures::future::TryFutureExt;
use log::{
debug,
trace,
warn,
};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::Signature;
use pyth_sdk_solana::state::PriceAccount;
@@ -16,31 +13,29 @@ use std::time::{
use crate::{
AttestationConditions,
ErrBox,
P2WSymbol,
RLMutex,
};
/// Runtime representation of a batch. It refers to the original group
/// from the config.
#[derive(Debug)]
pub struct BatchState<'a> {
pub struct BatchState {
pub group_name: String,
pub symbols: &'a [P2WSymbol],
pub symbols: Vec<P2WSymbol>,
pub last_known_symbol_states: Vec<Option<PriceAccount>>,
pub conditions: AttestationConditions,
pub last_job_finished_at: Instant,
}
impl<'a> BatchState<'a> {
impl<'a> BatchState {
pub fn new(
group_name: String,
symbols: &'a [P2WSymbol],
symbols: &[P2WSymbol],
conditions: AttestationConditions,
) -> Self {
Self {
group_name,
symbols,
symbols: symbols.to_vec(),
conditions,
last_known_symbol_states: vec![None; symbols.len()],
last_job_finished_at: Instant::now(),
@@ -69,7 +64,7 @@ impl<'a> BatchState<'a> {
// Only lookup and compare symbols if the conditions require
if self.conditions.need_onchain_lookup() {
let mut new_symbol_states: Vec<Option<PriceAccount>> =
let new_symbol_states: Vec<Option<PriceAccount>> =
match c.get_multiple_accounts(&pubkeys).await {
Ok(acc_opts) => {
acc_opts
@@ -120,9 +115,9 @@ impl<'a> BatchState<'a> {
))
}
// price_changed_pct
} else if let Some(pct) = self.conditions.price_changed_pct {
let pct = pct.abs();
// price_changed_bps
} else if let Some(bps) = self.conditions.price_changed_bps {
let pct = bps as f64 / 100.0;
let price_pct_diff = ((old.agg.price as f64 - new.agg.price as f64)
/ old.agg.price as f64
* 100.0)
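Since 100 bps = 1%, the conversion above divides by 100 before comparing against the percentage diff. A worked standalone example of the threshold check (moved_enough is a hypothetical helper mirroring the logic, not the actual method):

fn moved_enough(old_price: f64, new_price: f64, bps: u64) -> bool {
    let threshold_pct = bps as f64 / 100.0; // e.g. 300 bps -> 3.0%
    let diff_pct = ((old_price - new_price) / old_price * 100.0).abs();
    diff_pct >= threshold_pct
}

fn main() {
    assert!(moved_enough(100.0, 103.5, 300)); // a 3.5% move trips a 300 bps condition
    assert!(!moved_enough(100.0, 101.0, 300)); // a 1% move does not
}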

View File

@@ -123,9 +123,7 @@ pub enum Action {
},
#[clap(about = "Print out emitter address for the specified pyth2wormhole contract")]
GetEmitter,
#[clap(
about = "Set the value of is_active config as ops_owner"
)]
#[clap(about = "Set the value of is_active config as ops_owner")]
SetIsActive {
/// Current ops owner keypair path
#[clap(
@@ -139,5 +137,5 @@ pub enum Action {
possible_values = ["true", "false"],
)]
new_is_active: String,
}
},
}

View File

@@ -10,6 +10,7 @@ use borsh::{
use log::{
debug,
trace,
warn,
};
use pyth_sdk_solana::state::{
load_mapping_account,
@@ -61,7 +62,6 @@ use std::collections::{
use p2w_sdk::P2WEmitter;
use pyth2wormhole::{
attest::P2W_MAX_BATCH_SIZE,
config::{
OldP2WConfigAccount,
P2WConfigAccount,
@@ -409,23 +409,41 @@ pub async fn crawl_pyth_mapping(
let mut ret = HashMap::new();
let mut n_mappings = 1; // We assume the first one must be valid
let mut n_products = 0;
let mut n_prices = 0;
let mut n_products_total = 0; // Grand total products in all mapping accounts
let mut n_prices_total = 0; // Grand total prices in all product accounts in all mapping accounts
let mut mapping_addr = first_mapping_addr.clone();
// loop until the last non-zero MappingAccount.next account
loop {
let mapping_bytes = rpc_client.get_account_data(&mapping_addr).await?;
let mapping = match load_mapping_account(&mapping_bytes) {
Ok(p) => p,
Err(e) => {
warn!(
"Mapping: Could not parse account {} as a Pyth mapping, crawling terminated. Error: {:?}",
mapping_addr, e
);
continue;
}
};
let mapping = load_mapping_account(&mapping_bytes)?;
// Products in this mapping account
let mut n_mapping_products = 0;
// loop through all products in this mapping; filter out zeroed-out empty product slots
for prod_addr in mapping.products.iter().filter(|p| *p != &Pubkey::default()) {
let prod_bytes = rpc_client.get_account_data(prod_addr).await?;
let prod = load_product_account(&prod_bytes)?;
let prod = match load_product_account(&prod_bytes) {
Ok(p) => p,
Err(e) => {
warn!("Mapping {}: Could not parse account {} as a Pyth product, skipping to next product. Error: {:?}", mapping_addr, prod_addr, e);
continue;
}
};
let mut price_addr = prod.px_acc.clone();
let mut n_prod_prices = 0;
// The product might have no price; this can happen in Tilt due to a race condition, a failed tx to add the price, etc.
if price_addr == Pubkey::default() {
@@ -441,32 +459,46 @@ pub async fn crawl_pyth_mapping(
// loop until the last non-zero PriceAccount.next account
loop {
let price_bytes = rpc_client.get_account_data(&price_addr).await?;
let price = load_price_account(&price_bytes)?;
let price = match load_price_account(&price_bytes) {
Ok(p) => p,
Err(e) => {
warn!("Product {}: Could not parse account {} as a Pyth price, skipping to next product. Error: {:?}", prod_addr, price_addr, e);
break;
}
};
// Append to existing set or create a new map entry
ret.entry(prod_addr.clone())
.or_insert(HashSet::new())
.insert(price_addr);
n_prices += 1;
n_prod_prices += 1;
if price.next == Pubkey::default() {
trace!("Product {}: processed {} prices", prod_addr, n_prices);
trace!(
"Product {}: processed {} price(s)",
prod_addr,
n_prod_prices
);
break;
}
price_addr = price.next.clone();
}
n_products += 1;
n_prices_total += n_prod_prices;
}
trace!(
"Mapping {}: processed {} products",
mapping_addr,
n_products
);
n_mapping_products += 1;
n_products_total += n_mapping_products;
// Traverse other mapping accounts if applicable
if mapping.next == Pubkey::default() {
trace!(
"Mapping {}: processed {} products",
mapping_addr,
n_mapping_products
);
break;
}
mapping_addr = mapping.next.clone();
@@ -474,7 +506,7 @@ pub async fn crawl_pyth_mapping(
}
debug!(
"Processed {} price(s) in {} product account(s), in {} mapping account(s)",
n_prices, n_products, n_mappings
n_prices_total, n_products_total, n_mappings
);
Ok(ret)
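crawl_pyth_mapping walks two levels of on-chain linked lists: mapping accounts chained through mapping.next and price accounts chained through price.next, each terminated by the all-zero default Pubkey. A toy sketch of the traversal pattern, with Addr and fetch() as hypothetical stand-ins for Pubkey and the RPC read:

#[derive(Clone, Copy, PartialEq, Default)]
struct Addr(u64); // stand-in for Pubkey; Default is the all-zero terminator

struct Node {
    next: Addr,
}

fn fetch(addr: Addr) -> Node {
    // Pretend RPC: each account points at the next one until the chain hits 0.
    Node { next: Addr(addr.0.saturating_sub(1)) }
}

fn main() {
    let mut addr = Addr(3);
    let mut visited = 0;
    loop {
        let node = fetch(addr);
        visited += 1;
        if node.next == Addr::default() {
            break; // a zeroed next pointer ends the chain
        }
        addr = node.next;
    }
    assert_eq!(visited, 3);
}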

View File

@@ -2,9 +2,7 @@ pub mod cli;
use std::{
fs::File,
pin::Pin,
sync::Arc,
thread,
time::{
Duration,
Instant,
@@ -12,22 +10,22 @@ use std::{
};
use clap::Parser;
use futures::future::{
Future,
FutureExt,
TryFuture,
TryFutureExt,
use futures::{
future::{
Future,
TryFutureExt,
},
};
use generic_array::GenericArray;
use log::{
debug,
error,
info,
trace,
warn,
LevelFilter,
};
use sha3::{Digest, Sha3_256};
use solana_client::{
client_error::ClientError,
nonblocking::rpc_client::RpcClient,
rpc_config::RpcTransactionConfig,
};
@@ -36,7 +34,6 @@ use solana_sdk::{
commitment_config::CommitmentConfig,
signature::{
read_keypair_file,
Signature,
},
signer::keypair::Keypair,
};
@@ -133,7 +130,7 @@ async fn main() -> Result<(), ErrBox> {
remove_ops_owner,
} => {
let old_config = get_config_account(&rpc_client, &p2w_addr).await?;
let new_ops_owner = if remove_ops_owner {
None
} else if let Some(given_ops_owner) = ops_owner_addr {
@@ -187,30 +184,50 @@ async fn main() -> Result<(), ErrBox> {
daemon,
} => {
// Load the attestation config yaml
let mut attestation_cfg: AttestationConfig =
let attestation_cfg: AttestationConfig =
serde_yaml::from_reader(File::open(attestation_cfg)?)?;
if let Some(mapping_addr) = attestation_cfg.mapping_addr.as_ref() {
let additional_accounts = crawl_pyth_mapping(&rpc_client, mapping_addr).await?;
info!("Additional mapping accounts:\n{:#?}", additional_accounts);
attestation_cfg.add_symbols(additional_accounts, "mapping".to_owned());
}
// Derive seeded accounts
let emitter_addr = P2WEmitter::key(None, &p2w_addr);
handle_attest(
cli.rpc_url,
cli.commitment,
payer,
p2w_addr,
attestation_cfg,
n_retries,
Duration::from_secs(retry_interval_secs),
Duration::from_secs(confirmation_timeout_secs),
daemon,
)
.await?;
info!("Using emitter addr {}", emitter_addr);
// Note: For global rate-limiting of RPC requests, we use a
// custom Mutex wrapper which enforces a delay of rpc_interval
// between RPC accesses.
let rpc_cfg = Arc::new(RLMutex::new(
RpcCfg {
url: cli.rpc_url,
timeout: Duration::from_secs(confirmation_timeout_secs),
commitment: cli.commitment.clone(),
},
Duration::from_millis(attestation_cfg.min_rpc_interval_ms),
));
if daemon {
handle_attest_daemon_mode(
rpc_cfg,
payer,
p2w_addr,
attestation_cfg,
)
.await?;
} else {
handle_attest_non_daemon_mode(
attestation_cfg,
rpc_cfg,
p2w_addr,
payer,
n_retries,
Duration::from_secs(retry_interval_secs),
)
.await?;
}
}
Action::GetEmitter => unreachable! {}, // It is handled early in this function.
Action::SetIsActive { ops_owner, new_is_active } => {
Action::SetIsActive {
ops_owner,
new_is_active,
} => {
let tx = gen_set_is_active_tx(
payer,
p2w_addr,
@@ -225,132 +242,138 @@ async fn main() -> Result<(), ErrBox> {
"Applied config:\n{:?}",
get_config_account(&rpc_client, &p2w_addr).await?
);
},
}
}
Ok(())
}
/// Send a series of batch attestations for symbols of an attestation config.
async fn handle_attest(
rpc_url: String,
commitment: CommitmentConfig,
/// Continuously send batch attestations for symbols of an attestation config.
async fn handle_attest_daemon_mode(
rpc_cfg: Arc<RLMutex<RpcCfg>>,
payer: Keypair,
p2w_addr: Pubkey,
attestation_cfg: AttestationConfig,
n_retries: usize,
retry_interval: Duration,
confirmation_timeout: Duration,
daemon: bool,
mut attestation_cfg: AttestationConfig,
) -> Result<(), ErrBox> {
// Derive seeded accounts
let emitter_addr = P2WEmitter::key(None, &p2w_addr);
info!("Using emitter addr {}", emitter_addr);
let config = get_config_account(
&RpcClient::new_with_timeout_and_commitment(
rpc_url.clone(),
confirmation_timeout,
commitment.clone(),
),
&p2w_addr,
)
.await?;
debug!("Symbol config:\n{:#?}", attestation_cfg);
info!(
"{} symbol groups read, dividing into batches",
attestation_cfg.symbol_groups.len(),
"Crawling mapping {:?} every {} minutes",
attestation_cfg.mapping_addr, attestation_cfg.mapping_reload_interval_mins
);
// Reused for failed batch retries
let mut batches: Vec<_> = attestation_cfg
.symbol_groups
.iter()
.map(|g| {
let conditions4closure = g.conditions.clone();
let name4closure = g.group_name.clone();
info!("Group {:?}, {} symbols", g.group_name, g.symbols.len(),);
// Divide group into batches
g.symbols
.as_slice()
.chunks(config.max_batch_size as usize)
.map(move |symbols| {
BatchState::new(name4closure.clone(), symbols, conditions4closure.clone())
})
})
.flatten()
.enumerate()
.map(|(idx, batch_state)| (idx + 1, batch_state))
.collect();
let batch_count = batches.len();
/// Note: For global rate-limiting of RPC requests, we use a
/// custom Mutex wrapper which enforces a delay of rpc_interval
/// between RPC accesses.
let rpc_cfg = Arc::new(RLMutex::new(
RpcCfg {
url: rpc_url,
timeout: confirmation_timeout,
commitment: commitment.clone(),
},
Duration::from_millis(attestation_cfg.min_rpc_interval_ms),
));
// Used for easier detection of config changes
let mut hasher = Sha3_256::new();
let mut old_sched_futs_state: Option<(JoinHandle<_>, GenericArray<u8, _>)> = None; // (old_futs_handle, old_config_hash)
// For enforcing min_msg_reuse_interval_ms, we keep a piece of
// state that creates or reuses accounts if enough time has
// passed. It is crucial that this queue is reused across mapping
// lookups, so that the previous symbol set's messages have enough
// time to be picked up by Wormhole guardians.
let message_q_mtx = Arc::new(Mutex::new(P2WMessageQueue::new(
Duration::from_millis(attestation_cfg.min_msg_reuse_interval_ms),
attestation_cfg.max_msg_accounts as usize,
)));
// Create attestation scheduling routines; see attestation_sched_job() for details
let mut attestation_sched_futs = batches.into_iter().map(|(batch_no, batch)| {
attestation_sched_job(
batch,
batch_no,
batch_count,
n_retries,
retry_interval,
daemon,
rpc_cfg.clone(),
p2w_addr,
config.clone(),
Keypair::from_bytes(&payer.to_bytes()).unwrap(),
message_q_mtx.clone(),
)
});
// This loop cranks attestations without interruption. This is
// achieved by spinning up a new up-to-date symbol set before
// letting go of the previous one. Additionally, a hash of the
// on-chain and attestation configs is used to prevent needless
// reloads of an unchanged symbol set.
loop {
let start_time = Instant::now(); // Helps keep mapping lookup timing accurate
info!("Spinning up attestation sched jobs");
let config = get_config_account(&lock_and_make_rpc(&rpc_cfg).await, &p2w_addr).await?;
let results = futures::future::join_all(attestation_sched_futs).await; // May never finish for daemon mode
info!("Got {} results", results.len());
// With daemon mode off, the sched jobs return from the
// join_all. We filter out errors and report them
let errors: Vec<_> = results
.iter()
.enumerate()
.filter_map(|(idx, r)| {
r.as_ref()
.err()
.map(|e| format!("Error {}: {:#?}\n", idx + 1, e))
})
.collect();
if !errors.is_empty() {
let err_lines = errors.join("\n");
let msg = format!(
"{} of {} batches failed:\n{}",
errors.len(),
batch_count,
err_lines
// Use the mapping if specified
if let Some(mapping_addr) = attestation_cfg.mapping_addr.as_ref() {
match crawl_pyth_mapping(&lock_and_make_rpc(&rpc_cfg).await, mapping_addr).await {
Ok(additional_accounts) => {
debug!(
"Crawled mapping {} data:\n{:#?}",
mapping_addr, additional_accounts
);
attestation_cfg.add_symbols(additional_accounts, "mapping".to_owned());
}
// De-escalate crawling errors; a temporary failure to
// look up the mapping should not crash the attester
Err(e) => {
error!("Could not crawl mapping {}: {:?}", mapping_addr, e);
}
}
}
debug!(
"Attestation config (includes mapping accounts):\n{:#?}",
attestation_cfg
);
error!("{}", msg);
return Err(msg.into());
// Hash currently known config
hasher.update(serde_yaml::to_vec(&attestation_cfg)?);
hasher.update(borsh::to_vec(&config)?);
let new_cfg_hash = hasher.finalize_reset();
if let Some((old_handle, old_cfg_hash)) = old_sched_futs_state.as_ref() {
// Ignore unchanged configs
if &new_cfg_hash == old_cfg_hash {
info!("Note: Attestation config and on-chain config unchanged, not stopping existing attestation sched jobs");
} else {
// Process changed config into attestation scheduling futures
info!("Spinning up attestation sched jobs");
// Start the new sched futures
let new_sched_futs_handle = tokio::spawn(prepare_attestation_sched_jobs(
&attestation_cfg,
&config,
&rpc_cfg,
&p2w_addr,
&payer,
message_q_mtx.clone()
));
// Quit old sched futures
old_handle.abort();
// The just-started futures become the ongoing attestation state
old_sched_futs_state = Some((new_sched_futs_handle, new_cfg_hash));
}
} else {
// Base case for first attestation attempt
old_sched_futs_state = Some((
tokio::spawn(prepare_attestation_sched_jobs(
&attestation_cfg,
&config,
&rpc_cfg,
&p2w_addr,
&payer,
message_q_mtx.clone()
)),
new_cfg_hash,
));
}
// Sum up elapsed time, wait for next run accurately
let target = Duration::from_secs(attestation_cfg.mapping_reload_interval_mins * 60);
let elapsed = start_time.elapsed();
let remaining = target.saturating_sub(elapsed);
if remaining == Duration::from_secs(0) {
warn!(
"Processing took more than desired mapping lookup interval of {} seconds, not sleeping. Consider increasing {}",
target.as_secs(),
// stringify prints the up-to-date setting name automatically
stringify!(attestation_cfg.mapping_reload_interval_mins)
);
} else {
info!(
"Processing new mapping took {}.{}s, next config/mapping refresh in {}.{}s",
elapsed.as_secs(),
elapsed.subsec_millis(),
remaining.as_secs(),
remaining.subsec_millis()
);
}
tokio::time::sleep(remaining).await;
}
Ok(())
@@ -373,33 +396,161 @@ async fn lock_and_make_rpc(rlmtx: &RLMutex<RpcCfg>) -> RpcClient {
RpcClient::new_with_timeout_and_commitment(url, timeout, commitment)
}
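RLMutex's internals are not part of this diff; one way such a rate-limited lock could work (an assumed sketch, not the crate's actual implementation) is a Mutex that remembers the last acquisition time and sleeps off any remaining interval before handing out the guard:

use std::time::{Duration, Instant};
use tokio::sync::{Mutex, MutexGuard};

struct RateLimited<T> {
    inner: Mutex<(Instant, T)>, // (last acquisition time, value)
    interval: Duration,
}

impl<T> RateLimited<T> {
    fn new(value: T, interval: Duration) -> Self {
        Self {
            inner: Mutex::new((Instant::now() - interval, value)),
            interval,
        }
    }

    async fn lock(&self) -> MutexGuard<'_, (Instant, T)> {
        let mut guard = self.inner.lock().await;
        let since_last = guard.0.elapsed();
        if since_last < self.interval {
            // Sleep while holding the lock so other callers queue up behind us.
            tokio::time::sleep(self.interval - since_last).await;
        }
        guard.0 = Instant::now();
        guard
    }
}

#[tokio::main]
async fn main() {
    let rl = RateLimited::new("rpc config", Duration::from_millis(150));
    for _ in 0..3 {
        let _guard = rl.lock().await; // acquisitions end up >= 150ms apart
    }
}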
/// A future that decides how a batch is sent.
///
/// In daemon mode, attestations of the batch are scheduled
/// continuously using spawn(), which means that a next attestation of
/// the same batch begins immediately when a condition is met without
/// waiting for the previous attempt to finish. Subsequent
/// attestations are started according to the attestation_conditions
/// field on the batch. Concurrent requests per batch are limited by
/// the max_batch_jobs field to prevent excess memory usage on network
/// slowdowns, etc.
///
/// With daemon_mode off, this future attempts only one blocking
/// attestation of the batch and returns the result.
async fn attestation_sched_job(
mut batch: BatchState<'_>,
batch_no: usize,
batch_count: usize,
n_retries: usize,
retry_interval: Duration,
daemon: bool,
/// Non-daemon attestation scheduling
async fn handle_attest_non_daemon_mode(
mut attestation_cfg: AttestationConfig,
rpc_cfg: Arc<RLMutex<RpcCfg>>,
p2w_addr: Pubkey,
config: Pyth2WormholeConfig,
payer: Keypair,
n_retries: usize,
retry_interval: Duration,
) -> Result<(), ErrBox> {
let p2w_cfg = get_config_account(&lock_and_make_rpc(&rpc_cfg).await, &p2w_addr).await?;
// Use the mapping if specified
if let Some(mapping_addr) = attestation_cfg.mapping_addr.as_ref() {
match crawl_pyth_mapping(&lock_and_make_rpc(&rpc_cfg).await, mapping_addr).await {
Ok(additional_accounts) => {
debug!(
"Crawled mapping {} data:\n{:#?}",
mapping_addr, additional_accounts
);
attestation_cfg.add_symbols(additional_accounts, "mapping".to_owned());
}
// De-escalate crawling errors; a temporary failure to
// look up the mapping should not crash the attester
Err(e) => {
error!("Could not crawl mapping {}: {:?}", mapping_addr, e);
}
}
}
debug!(
"Attestation config (includes mapping accounts):\n{:#?}",
attestation_cfg
);
let batches = attestation_cfg.as_batches(p2w_cfg.max_batch_size as usize);
let batch_count = batches.len();
// For enforcing min_msg_reuse_interval_ms, we keep a piece of
// state that creates or reuses accounts if enough time has
// passed
let message_q_mtx = Arc::new(Mutex::new(P2WMessageQueue::new(
Duration::from_millis(attestation_cfg.min_msg_reuse_interval_ms),
attestation_cfg.max_msg_accounts as usize,
)));
let retry_jobs = batches.into_iter().enumerate().map(|(idx, batch_state)| {
attestation_retry_job(AttestationRetryJobArgs {
batch_no: idx + 1,
batch_count: batch_count.clone(),
group_name: batch_state.group_name,
symbols: batch_state.symbols.clone(),
n_retries,
retry_interval: retry_interval.clone(),
rpc_cfg: rpc_cfg.clone(),
p2w_addr: p2w_addr.clone(),
p2w_config: p2w_cfg.clone(),
payer: Keypair::from_bytes(&payer.to_bytes()).unwrap(),
message_q_mtx: message_q_mtx.clone(),
})
});
let results = futures::future::join_all(retry_jobs).await;
// After completing, we count any errors coming from the sched
// futs.
let errors: Vec<_> = results
.iter()
.enumerate()
.filter_map(|(idx, r)| {
r.as_ref()
.err()
.map(|e| format!("Error {}: {:?}\n", idx + 1, e))
})
.collect();
if !errors.is_empty() {
let err_lines = errors.join("\n");
let msg = format!("{} batches failed:\n{}", errors.len(), err_lines);
error!("{}", msg);
return Err(msg.into());
}
Ok(())
}
/// Constructs attestation scheduling jobs from attestation config.
fn prepare_attestation_sched_jobs(
attestation_cfg: &AttestationConfig,
p2w_cfg: &Pyth2WormholeConfig,
rpc_cfg: &Arc<RLMutex<RpcCfg>>,
p2w_addr: &Pubkey,
payer: &Keypair,
message_q_mtx: Arc<Mutex<P2WMessageQueue>>,
) -> Result<(), ErrBoxSend> {
let mut retries_left = n_retries;
) -> futures::future::JoinAll<impl Future<Output = Result<(), ErrBoxSend>>> {
info!(
"{} symbol groups read, dividing into batches",
attestation_cfg.symbol_groups.len(),
);
// Flatten attestation config into a plain list of batches
let batches: Vec<_> = attestation_cfg.as_batches(p2w_cfg.max_batch_size as usize);
let batch_count = batches.len();
// Create attestation scheduling routines; see attestation_sched_job() for details
let attestation_sched_futs = batches.into_iter().enumerate().map(|(idx, batch)| {
attestation_sched_job(AttestationSchedJobArgs {
batch,
batch_no: idx + 1,
batch_count,
rpc_cfg: rpc_cfg.clone(),
p2w_addr: p2w_addr.clone(),
config: p2w_cfg.clone(),
payer: Keypair::from_bytes(&payer.to_bytes()).unwrap(),
message_q_mtx: message_q_mtx.clone(),
})
});
futures::future::join_all(attestation_sched_futs)
}
/// The argument count on attestation_sched_job got out of hand. This
/// struct helps keep the arguments in the correct order.
pub struct AttestationSchedJobArgs {
pub batch: BatchState,
pub batch_no: usize,
pub batch_count: usize,
pub rpc_cfg: Arc<RLMutex<RpcCfg>>,
pub p2w_addr: Pubkey,
pub config: Pyth2WormholeConfig,
pub payer: Keypair,
pub message_q_mtx: Arc<Mutex<P2WMessageQueue>>,
}
/// A future that decides how a batch is sent in daemon mode.
///
/// Attestations of the batch are scheduled continuously using
/// spawn(), which means that a next attestation of the same batch
/// begins immediately when a condition is met without waiting for the
/// previous attempt to finish. Subsequent attestations are started
/// according to the attestation_conditions field on the
/// batch. Concurrent requests per batch are limited by the
/// max_batch_jobs field to prevent excess memory usage on network
/// slowdowns, etc.
async fn attestation_sched_job(args: AttestationSchedJobArgs) -> Result<(), ErrBoxSend> {
let AttestationSchedJobArgs {
mut batch,
batch_no,
batch_count,
rpc_cfg,
p2w_addr,
config,
payer,
message_q_mtx,
} = args;
// Enforces the max batch job count
let sema = Arc::new(Semaphore::new(batch.conditions.max_batch_jobs));
loop {
@@ -408,115 +559,162 @@ async fn attestation_sched_job(
batch_no, batch_count, batch.group_name
);
let job = attestation_job(
rpc_cfg.clone(),
let job = attestation_job(AttestationJobArgs {
rlmtx: rpc_cfg.clone(),
batch_no,
batch_count,
batch.group_name.clone(),
group_name: batch.group_name.clone(),
p2w_addr,
config.clone(),
Keypair::from_bytes(&payer.to_bytes()).unwrap(), // Keypair has no clone
batch.symbols.to_vec(),
sema.clone(),
message_q_mtx.clone(),
);
config: config.clone(),
payer: Keypair::from_bytes(&payer.to_bytes()).unwrap(), // Keypair has no clone
symbols: batch.symbols.to_vec(),
max_jobs_sema: sema.clone(),
message_q_mtx: message_q_mtx.clone(),
});
if daemon {
// park this routine until a resend condition is met
loop {
if let Some(reason) = batch
.should_resend(&lock_and_make_rpc(&rpc_cfg).await)
.await
{
info!(
"Batch {}/{}, group {}: Resending (reason: {:?})",
batch_no, batch_count, batch.group_name, reason
);
break;
}
// park this routine until a resend condition is met
loop {
if let Some(reason) = batch
.should_resend(&lock_and_make_rpc(&rpc_cfg).await)
.await
{
info!(
"Batch {}/{}, group {}: Resending (reason: {:?})",
batch_no, batch_count, batch.group_name, reason
);
break;
}
}
if sema.available_permits() == 0 {
warn!(
"Batch {}/{}, group {:?}: Ran out of job \
if sema.available_permits() == 0 {
warn!(
"Batch {}/{}, group {:?}: Ran out of job \
permits, some attestation conditions may be \
delayed. For better accuracy, increase \
max_batch_jobs or adjust attestation \
conditions",
batch_no, batch_count, batch.group_name
);
}
// This short-lived permit prevents scheduling
// excess attestation jobs (which could eventually
// eat all memory). It is freed as soon as we
// leave this code block.
let _permit4sched = sema.acquire().await?;
let batch_no4err_msg = batch_no.clone();
let batch_count4err_msg = batch_count.clone();
let group_name4err_msg = batch.group_name.clone();
// We never get to error reporting in daemon mode, attach a map_err
let job_with_err_msg = job.map_err(move |e| {
warn!(
"Batch {}/{}, group {:?} ERR: {:#?}",
batch_no4err_msg, batch_count4err_msg, group_name4err_msg, e
);
e
});
// Spawn the job in background
let _detached_job: JoinHandle<_> = tokio::spawn(job_with_err_msg);
} else {
// Await and return the single result in non-daemon mode, with retries if necessary
match job.await {
Ok(_) => return Ok(()),
Err(e) => {
if retries_left == 0 {
return Err(e);
} else {
retries_left -= 1;
debug!(
"{}/{}, group {:?}: attestation failure: {}",
batch_no,
batch_count,
batch.group_name,
e.to_string()
);
info!(
"Batch {}/{}, group {:?}: retrying in {}.{}s, {} retries left",
batch_no,
batch_count,
batch.group_name,
retry_interval.as_secs(),
retry_interval.subsec_millis(),
retries_left,
);
tokio::time::sleep(retry_interval).await;
}
}
}
batch_no, batch_count, batch.group_name
);
}
// This short-lived permit prevents scheduling
// excess attestation jobs (which could eventually
// eat all memory). It is freed as soon as we
// leave this code block.
let _permit4sched = sema.acquire().await?;
let batch_no4err_msg = batch_no.clone();
let batch_count4err_msg = batch_count.clone();
let group_name4err_msg = batch.group_name.clone();
// We never get to error reporting in daemon mode, attach a map_err
let job_with_err_msg = job.map_err(move |e| {
warn!(
"Batch {}/{}, group {:?} ERR: {:#?}",
batch_no4err_msg, batch_count4err_msg, group_name4err_msg, e
);
e
});
// Spawn the job in background
let _detached_job: JoinHandle<_> = tokio::spawn(job_with_err_msg);
batch.last_job_finished_at = Instant::now();
}
}
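The max_batch_jobs cap is enforced with a tokio Semaphore: the scheduler takes a short-lived permit before spawning, and attestation_job holds its own permit for the attempt's duration. A minimal standalone sketch of capping in-flight jobs with acquire_owned:

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let sema = Arc::new(Semaphore::new(2)); // like max_batch_jobs = 2
    let mut handles = Vec::new();
    for job in 0..4 {
        // With no permits left, this await parks until a running job finishes.
        let permit = sema.clone().acquire_owned().await.unwrap();
        handles.push(tokio::spawn(async move {
            println!("job {} running", job);
            tokio::time::sleep(Duration::from_millis(50)).await;
            drop(permit); // frees a slot for the next job
        }));
    }
    for h in handles {
        h.await.unwrap();
    }
}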
pub struct AttestationRetryJobArgs {
pub batch_no: usize,
pub batch_count: usize,
pub group_name: String,
pub symbols: Vec<P2WSymbol>,
pub n_retries: usize,
pub retry_interval: Duration,
pub rpc_cfg: Arc<RLMutex<RpcCfg>>,
pub p2w_addr: Pubkey,
pub p2w_config: Pyth2WormholeConfig,
pub payer: Keypair,
pub message_q_mtx: Arc<Mutex<P2WMessageQueue>>,
}
/// A future that cranks a batch up to n_retries times, pausing for
/// retry_interval in between; Used exclusively in non-daemon mode
async fn attestation_retry_job(args: AttestationRetryJobArgs) -> Result<(), ErrBoxSend> {
let AttestationRetryJobArgs {
batch_no,
batch_count,
group_name,
symbols,
n_retries,
retry_interval,
rpc_cfg,
p2w_addr,
p2w_config,
payer,
message_q_mtx,
} = args;
let mut res = Err(format!(
"attestation_retry_job INTERNAL: Could not get a single attestation job result"
)
.into());
for _i in 0..=n_retries {
res = attestation_job(AttestationJobArgs {
rlmtx: rpc_cfg.clone(),
batch_no,
batch_count,
group_name: group_name.clone(),
p2w_addr,
config: p2w_config.clone(),
payer: Keypair::from_bytes(&payer.to_bytes()).unwrap(), // Keypair has no clone
symbols: symbols.clone(),
max_jobs_sema: Arc::new(Semaphore::new(1)), // Not important for non-daemon mode
message_q_mtx: message_q_mtx.clone(),
})
.await;
// Finish early on success
if res.is_ok() {
break;
}
tokio::time::sleep(retry_interval).await;
}
res
}
/// Arguments for attestation_job(). This struct rules out same-type
/// ordering errors due to the large argument count
pub struct AttestationJobArgs {
pub rlmtx: Arc<RLMutex<RpcCfg>>,
pub batch_no: usize,
pub batch_count: usize,
pub group_name: String,
pub p2w_addr: Pubkey,
pub config: Pyth2WormholeConfig,
pub payer: Keypair,
pub symbols: Vec<P2WSymbol>,
pub max_jobs_sema: Arc<Semaphore>,
pub message_q_mtx: Arc<Mutex<P2WMessageQueue>>,
}
/// A future for a single attempt to attest a batch on Solana.
async fn attestation_job(
rlmtx: Arc<RLMutex<RpcCfg>>,
batch_no: usize,
batch_count: usize,
group_name: String,
p2w_addr: Pubkey,
config: Pyth2WormholeConfig,
payer: Keypair,
symbols: Vec<P2WSymbol>,
max_jobs_sema: Arc<Semaphore>,
message_q_mtx: Arc<Mutex<P2WMessageQueue>>,
) -> Result<(), ErrBoxSend> {
async fn attestation_job(args: AttestationJobArgs) -> Result<(), ErrBoxSend> {
let AttestationJobArgs {
rlmtx,
batch_no,
batch_count,
group_name,
p2w_addr,
config,
payer,
symbols,
max_jobs_sema,
message_q_mtx,
} = args;
// Will be dropped after attestation is complete
let _permit = max_jobs_sema.acquire().await?;

View File

@@ -1,7 +1,6 @@
//! Re-usable message scheme for pyth2wormhole
use log::debug;
use solana_program::system_instruction;
use std::{
collections::VecDeque,
time::{
@@ -89,6 +88,7 @@ impl P2WMessageQueue {
}
}
#[cfg(test)]
pub mod test {
use super::*;

View File

@@ -38,7 +38,6 @@ pub type Pyth2WormholeConfig = Pyth2WormholeConfigV3;
pub type P2WConfigAccount<'b, const IsInitialized: AccountState> =
P2WConfigAccountV3<'b, IsInitialized>;
impl Owned for Pyth2WormholeConfig {
fn owner(&self) -> AccountOwner {
AccountOwner::This
@@ -120,7 +119,7 @@ impl From<Pyth2WormholeConfigV1> for Pyth2WormholeConfigV2 {
}
// Added ops_owner which can toggle the is_active field
#[derive(Clone, Default, BorshDeserialize, BorshSerialize)]
#[derive(Clone, Default, Hash, BorshDeserialize, BorshSerialize, PartialEq)]
#[cfg_attr(feature = "client", derive(Debug))]
pub struct Pyth2WormholeConfigV3 {
/// Authority owning this contract
@@ -162,7 +161,7 @@ impl From<Pyth2WormholeConfigV2> for Pyth2WormholeConfigV3 {
pyth_owner,
max_batch_size,
is_active: true,
ops_owner: None
ops_owner: None,
}
}
}

View File

@@ -184,6 +184,7 @@ if P2W_ATTESTATION_CFG is None:
cfg_yaml = f"""
---
mapping_addr: {mapping_addr}
mapping_reload_interval_mins: 1 # Very fast for testing purposes
min_rpc_interval_ms: 0 # RIP RPC
max_batch_jobs: 1000 # Where we're going there's no oomkiller
symbol_groups:
@@ -214,7 +215,7 @@ symbol_groups:
- group_name: longer_interval_sensitive_changes
conditions:
min_interval_secs: 10
price_changed_pct: 3
price_changed_bps: 300
symbols:
"""
@@ -232,7 +233,7 @@ symbol_groups:
- group_name: mapping
conditions:
min_interval_secs: 30
price_changed_pct: 5
price_changed_bps: 500
symbols: []
"""