From f0552e5f1b6c835c5a1c03bcd63d32ff11b7b4d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stanis=C5=82aw=20Drozd?= Date: Mon, 18 Jul 2022 14:40:03 +0200 Subject: [PATCH] pyth2wormhole-client: refactor into fully-async futures-based approach (#219) * pyth2wormhole-client: refactor into fully-async futures-based approach commit-id:2ed35045 * p2w-client: Change inconsistent rpc constructor commit-id:cb3b2ff2 * p2w-client: Move job creation to a function, simplify comment commit-id:35328b38 * pyth2wormhole-client: Use get_multiple_accounts commit-id:7fc85157 * Implement a rate-limited mutex for RPC client commit-id:1a243063 * pyth2wormhole-client: only guard beginning new requests in RLMutex commit-id:d8251474 * pyth2wormhole-client: RLMutex: ensure the inner guard is not dropped commit-id:c3513f5e * pyth2wormhole-client: Clarify attestation_sched_futs comment commit-id:97033670 * pyth2wormhole-client: Use CommitmentConfig's native FromStr parsing commit-id:835d7125 * pyth2wormhole-client: doc comment typo commit-id:5ee388de * pyth2wormhole-client: move closures to their own async functions This makes the main.rs async attestation routines easier to read. commit-id:3565a744 * pyth2wormhole-client: fix merge typo * pyth2wormhole-client: Apply Tom's readability advice * pyth2wormhole-client reword attestation_sched_job() comment * pyth2wormhole-client: expand attestation_sched_job() comment * pyth2wormhole-client: e x p a n d the comment * Trigger CI * p2w-client/main.rs: correct missing awaits after merge --- solana/pyth2wormhole/Cargo.lock | 2 + solana/pyth2wormhole/client/Cargo.toml | 2 + .../client/src/attestation_cfg.rs | 18 +- .../pyth2wormhole/client/src/batch_state.rs | 111 ++- solana/pyth2wormhole/client/src/cli.rs | 32 +- solana/pyth2wormhole/client/src/lib.rs | 38 +- solana/pyth2wormhole/client/src/main.rs | 632 +++++++++--------- solana/pyth2wormhole/client/src/util.rs | 97 +++ .../pyth2wormhole/client/tests/test_attest.rs | 2 +- solana/pyth2wormhole/program/src/migrate.rs | 9 +- third_party/pyth/p2w_autoattest.py | 17 +- 11 files changed, 541 insertions(+), 419 deletions(-) create mode 100644 solana/pyth2wormhole/client/src/util.rs diff --git a/solana/pyth2wormhole/Cargo.lock b/solana/pyth2wormhole/Cargo.lock index f428c1dd..ad00c217 100644 --- a/solana/pyth2wormhole/Cargo.lock +++ b/solana/pyth2wormhole/Cargo.lock @@ -2153,6 +2153,7 @@ dependencies = [ "borsh", "clap 3.1.18", "env_logger 0.8.4", + "futures", "log", "p2w-sdk", "pyth-client 0.5.1", @@ -2168,6 +2169,7 @@ dependencies = [ "solana-transaction-status", "solitaire", "solitaire-client", + "tokio", "wormhole-bridge-solana", ] diff --git a/solana/pyth2wormhole/client/Cargo.toml b/solana/pyth2wormhole/client/Cargo.toml index 35a6a3ae..47217474 100644 --- a/solana/pyth2wormhole/client/Cargo.toml +++ b/solana/pyth2wormhole/client/Cargo.toml @@ -29,6 +29,8 @@ solana-sdk = "=1.10.13" solana-transaction-status = "=1.10.13" solitaire-client = {path = "../../solitaire/client"} solitaire = {path = "../../solitaire/program"} +tokio = {version = "1", features = ["sync", "rt", "time"]} +futures = "0.3.21" [dev-dependencies] pyth-client = "0.5.0" diff --git a/solana/pyth2wormhole/client/src/attestation_cfg.rs b/solana/pyth2wormhole/client/src/attestation_cfg.rs index 10bb8281..4e2dffd4 100644 --- a/solana/pyth2wormhole/client/src/attestation_cfg.rs +++ b/solana/pyth2wormhole/client/src/attestation_cfg.rs @@ -26,20 +26,30 @@ pub struct SymbolGroup { pub symbols: Vec, } -pub const fn DEFAULT_MIN_INTERVAL_SECS() -> u64 { +pub const fn 
default_min_interval_secs() -> u64 { 60 } +pub const fn default_max_batch_jobs() -> usize { + 20 +} + /// Spontaneous attestation triggers. Attestation is triggered if any /// of the active conditions is met. Option<> fields can be /// de-activated with None. All conditions are inactive by default, -/// except for min_interval_secs set to 1 minute. +/// except for the non-Option ones. #[derive(Clone, Default, Debug, Deserialize, Serialize, PartialEq)] pub struct AttestationConditions { /// Baseline, unconditional attestation interval. Attestation is triggered if the specified interval elapsed since last attestation. - #[serde(default = "DEFAULT_MIN_INTERVAL_SECS")] + #[serde(default = "default_min_interval_secs")] pub min_interval_secs: u64, + /// Limit concurrent attestation attempts per batch. This setting + /// should act only as a failsafe cap on resource consumption and is + /// best set well above the expected average number of jobs. + #[serde(default = "default_max_batch_jobs")] + pub max_batch_jobs: usize, + /// Trigger attestation if price changes by the specified percentage. #[serde(default)] pub price_changed_pct: Option, @@ -51,7 +61,7 @@ pub struct AttestationConditions { } /// Config entry for a Pyth product + price pair -#[derive(Default, Debug, Deserialize, Serialize, PartialEq, Eq)] +#[derive(Clone, Default, Debug, Deserialize, Serialize, PartialEq, Eq)] pub struct P2WSymbol { /// User-defined human-readable name pub name: Option, diff --git a/solana/pyth2wormhole/client/src/batch_state.rs b/solana/pyth2wormhole/client/src/batch_state.rs index f09b69a7..fd5078c6 100644 --- a/solana/pyth2wormhole/client/src/batch_state.rs +++ b/solana/pyth2wormhole/client/src/batch_state.rs @@ -1,8 +1,10 @@ +use futures::future::TryFutureExt; use log::{ debug, + trace, warn, }; -use solana_client::rpc_client::RpcClient; +use solana_client::nonblocking::rpc_client::RpcClient; use solana_sdk::signature::Signature; use pyth_sdk_solana::state::PriceAccount; @@ -16,17 +18,18 @@ use crate::{ AttestationConditions, ErrBox, P2WSymbol, + RLMutex, }; +/// Runtime representation of a batch. It refers to the original group +/// from the config. #[derive(Debug)] pub struct BatchState<'a> { pub group_name: String, pub symbols: &'a [P2WSymbol], pub last_known_symbol_states: Vec>, pub conditions: AttestationConditions, - status: BatchTxStatus, - status_changed_at: Instant, - pub last_success_at: Option, + pub last_job_finished_at: Instant, } impl<'a> BatchState<'a> { @@ -40,63 +43,51 @@ impl<'a> BatchState<'a> { symbols, conditions, last_known_symbol_states: vec![None; symbols.len()], - status: BatchTxStatus::Sending { attempt_no: 1 }, - status_changed_at: Instant::now(), - last_success_at: None, + last_job_finished_at: Instant::now(), } } - /// Ensure only set_status() alters the timestamp - pub fn get_status_changed_at(&self) -> &Instant { - &self.status_changed_at - } - pub fn get_status(&self) -> &BatchTxStatus { - &self.status - } - - /// Ensure that status changes are accompanied by a timestamp bump - pub fn set_status(&mut self, s: BatchTxStatus) { - self.status_changed_at = Instant::now(); - self.status = s; - } /// Evaluate the configured attestation conditions for this /// batch. RPC is used to update last known state. Returns /// Some("") if any trigger condition was met. Only the /// first encountered condition is mentioned. 
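Review note: the conditions described above are evaluated first-match-wins. A condensed sketch of that semantics (hypothetical helper, not part of this patch; assumes price_changed_pct is an Option<f64> and that the on-chain state was already refreshed):

    use std::time::Duration;

    // Sketch: conditions are checked in a fixed order and the first hit wins,
    // so the returned reason names only one trigger even when several are met.
    fn first_trigger(
        elapsed: Duration,
        observed_change_pct: Option<f64>,
        c: &AttestationConditions,
    ) -> Option<String> {
        if elapsed > Duration::from_secs(c.min_interval_secs) {
            return Some(format!("minimum interval of {}s elapsed", c.min_interval_secs));
        }
        match (observed_change_pct, c.price_changed_pct) {
            (Some(observed), Some(threshold)) if observed.abs() >= threshold => {
                Some(format!("price changed by {:.2}%", observed))
            }
            _ => None,
        }
    }
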
- pub fn should_resend(&mut self, c: &RpcClient) -> Option { + pub async fn should_resend(&mut self, c: &RpcClient) -> Option { let mut ret = None; let sym_count = self.symbols.len(); - let mut new_symbol_states: Vec> = Vec::with_capacity(sym_count); - for (idx, sym) in self.symbols.iter().enumerate() { - let new_state = match c - .get_account_data(&sym.price_addr) - .map_err(|e| e.to_string()) - .and_then(|bytes| { - pyth_sdk_solana::state::load_price_account(&bytes) - .map(|state| state.clone()) - .map_err(|e| e.to_string()) - }) { - Ok(state) => Some(state), - Err(e) => { - warn!( - "Symbol {} ({}/{}): Could not look up state: {}", - sym.name - .as_ref() - .unwrap_or(&format!("Unnamed product {}", sym.product_addr)), - idx + 1, - sym_count, - e.to_string() - ); - None - } - }; + let pubkeys: Vec<_> = self.symbols.iter().map(|s| s.price_addr).collect(); - new_symbol_states.push(new_state); - } + // Always learn the current on-chain state for each symbol, use None values if lookup fails + let mut new_symbol_states: Vec> = match c + .get_multiple_accounts(&pubkeys) + .await + { + Ok(acc_opts) => { + acc_opts + .into_iter() + .enumerate() + .map(|(idx, opt)| { + // Take each Some(acc), make it None and log on load_price_account() error + opt.and_then(|acc| { + pyth_sdk_solana::state::load_price_account(&acc.data) + .cloned() // load_price_account() transmutes the data reference into another reference, and owning acc_opts is not enough + .map_err(|e| { + warn!("Could not parse symbol {}/{}: {}", idx, sym_count, e); + e + }) + .ok() // Err becomes None + }) + }) + .collect() + } + Err(e) => { + warn!("Could not look up any symbols on-chain: {}", e); + vec![None; sym_count] + } + }; // min interval - if self.get_status_changed_at().elapsed() + if self.last_job_finished_at.elapsed() > Duration::from_secs(self.conditions.min_interval_secs) { ret = Some(format!( @@ -154,7 +145,9 @@ impl<'a> BatchState<'a> { } } - // Update with newer state if a condition was met + // Update with newer state only if a condition was met. We + // don't want to shadow changes that may happen over a larger + // period between state lookups. if ret.is_some() { for (old, new) in self .last_known_symbol_states @@ -170,23 +163,3 @@ impl<'a> BatchState<'a> { return ret; } } - -#[derive(Debug)] -pub enum BatchTxStatus { - Sending { - attempt_no: usize, - }, - Confirming { - attempt_no: usize, - signature: Signature, - }, - Success { - seqno: String, - }, - FailedSend { - last_err: ErrBox, - }, - FailedConfirm { - last_err: ErrBox, - }, -} diff --git a/solana/pyth2wormhole/client/src/cli.rs b/solana/pyth2wormhole/client/src/cli.rs index 7391c613..dfd523fc 100644 --- a/solana/pyth2wormhole/client/src/cli.rs +++ b/solana/pyth2wormhole/client/src/cli.rs @@ -1,6 +1,7 @@ //! 
CLI options use solana_program::pubkey::Pubkey; +use solana_sdk::commitment_config::CommitmentConfig; use std::path::PathBuf; use clap::{ @@ -29,6 +30,14 @@ pub struct Cli { pub payer: String, #[clap(short, long, default_value = "http://localhost:8899")] pub rpc_url: String, + #[clap( + long = "rpc-interval", + default_value = "150", + help = "Rate-limiting minimum delay between RPC requests in milliseconds" + )] + pub rpc_interval_ms: u64, + #[clap(long, default_value = "confirmed")] + pub commitment: CommitmentConfig, #[clap(long)] pub p2w_addr: Pubkey, #[clap(subcommand)] @@ -60,10 +69,18 @@ pub enum Action { #[clap( short = 'n', long = "--n-retries", - help = "How many times to retry send_transaction() on each batch before flagging a failure.", + help = "How many times to retry send_transaction() on each batch before flagging a failure. Only active outside daemon mode", default_value = "5" )] n_retries: usize, + #[clap( + short = 'i', + long = "--retry-interval", + help = "How long to wait between send_transaction + retries. Only active outside daemon mode", + default_value = "5" + )] + retry_interval_secs: u64, #[clap( short = 'd', long = "--daemon", @@ -73,17 +90,10 @@ pub enum Action { #[clap( short = 't', long = "--timeout", - help = "How many seconds to wait before giving up on get_transaction() for tx confirmation.", - default_value = "40" + help = "How many seconds to wait before giving up on tx confirmation.", + default_value = "20" )] - conf_timeout_secs: u64, - #[clap( - short = 'i', - long = "--rpc-interval", - help = "How many milliseconds to wait between SOL RPC requests", - default_value = "200" - )] - rpc_interval_ms: u64, + confirmation_timeout_secs: u64, }, #[clap(about = "Retrieve a pyth2wormhole program's current settings")] GetConfig, diff --git a/solana/pyth2wormhole/client/src/lib.rs b/solana/pyth2wormhole/client/src/lib.rs index 1d2879b7..210a8fa4 100644 --- a/solana/pyth2wormhole/client/src/lib.rs +++ b/solana/pyth2wormhole/client/src/lib.rs @@ -1,11 +1,12 @@ pub mod attestation_cfg; pub mod batch_state; +pub mod util; use borsh::{ BorshDeserialize, BorshSerialize, }; -use solana_client::rpc_client::RpcClient; +use solana_client::nonblocking::rpc_client::RpcClient; use solana_program::{ hash::Hash, instruction::{ @@ -51,19 +52,24 @@ use pyth2wormhole::{ migrate::MigrateAccounts, set_config::SetConfigAccounts, AttestData, - Pyth2WormholeConfig, }; +pub use pyth2wormhole::Pyth2WormholeConfig; + pub use attestation_cfg::{ AttestationConditions, AttestationConfig, P2WSymbol, }; -pub use batch_state::{ - BatchState, - BatchTxStatus, +pub use batch_state::BatchState; +pub use util::{ + RLMutex, + RLMutexGuard, }; +/// Future-friendly version of solitaire::ErrBox +pub type ErrBoxSend = Box; + pub fn gen_init_tx( payer: Keypair, p2w_addr: Pubkey, @@ -159,14 +165,17 @@ pub fn gen_migrate_tx( } /// Get the current config account data for given p2w program address -pub fn get_config_account( +pub async fn get_config_account( rpc_client: &RpcClient, p2w_addr: &Pubkey, ) -> Result { let p2w_config_addr = P2WConfigAccount::<{ AccountState::Initialized }>::key(None, p2w_addr); let config = Pyth2WormholeConfig::try_from_slice( - rpc_client.get_account_data(&p2w_config_addr)?.as_slice(), + rpc_client + .get_account_data(&p2w_config_addr) + .await? 
+ .as_slice(), )?; Ok(config) @@ -181,7 +190,7 @@ pub fn gen_attest_tx( symbols: &[P2WSymbol], wh_msg: &Keypair, latest_blockhash: Hash, -) -> Result { +) -> Result { let emitter_addr = P2WEmitter::key(None, &p2w_addr); let seq_addr = Sequence::key( @@ -193,11 +202,11 @@ pub fn gen_attest_tx( let p2w_config_addr = P2WConfigAccount::<{ AccountState::Initialized }>::key(None, &p2w_addr); if symbols.len() > p2w_config.max_batch_size as usize { - return Err(format!( + return Err((format!( "Expected up to {} symbols for batch, {} were found", p2w_config.max_batch_size, symbols.len() - ) + )) .into()); } // Initial attest() accounts @@ -267,7 +276,14 @@ pub fn gen_attest_tx( }, ); - let ix = Instruction::new_with_bytes(p2w_addr, ix_data.try_to_vec()?.as_slice(), acc_metas); + let ix = Instruction::new_with_bytes( + p2w_addr, + ix_data + .try_to_vec() + .map_err(|e| -> ErrBoxSend { Box::new(e) })? + .as_slice(), + acc_metas, + ); let tx_signed = Transaction::new_signed_with_payer::>( &[ix], diff --git a/solana/pyth2wormhole/client/src/main.rs b/solana/pyth2wormhole/client/src/main.rs index c7d56f61..26d0be19 100644 --- a/solana/pyth2wormhole/client/src/main.rs +++ b/solana/pyth2wormhole/client/src/main.rs @@ -2,6 +2,8 @@ pub mod cli; use std::{ fs::File, + pin::Pin, + sync::Arc, thread, time::{ Duration, @@ -10,6 +12,12 @@ use std::{ }; use clap::Parser; +use futures::future::{ + Future, + FutureExt, + TryFuture, + TryFutureExt, +}; use log::{ debug, error, @@ -18,7 +26,10 @@ use log::{ warn, LevelFilter, }; -use solana_client::rpc_client::RpcClient; +use solana_client::{ + client_error::ClientError, + nonblocking::rpc_client::RpcClient, +}; use solana_program::pubkey::Pubkey; use solana_sdk::{ commitment_config::CommitmentConfig, @@ -33,6 +44,10 @@ use solitaire::{ ErrBox, }; use solitaire_client::Keypair; +use tokio::{ + sync::Semaphore, + task::JoinHandle, +}; use cli::{ Action, @@ -49,16 +64,18 @@ use pyth2wormhole_client::*; pub const SEQNO_PREFIX: &'static str = "Program log: Sequence: "; -fn main() -> Result<(), ErrBox> { +#[tokio::main] +async fn main() -> Result<(), ErrBox> { let cli = Cli::parse(); init_logging(cli.log_level); let payer = read_keypair_file(&*shellexpand::tilde(&cli.payer))?; - let rpc_client = RpcClient::new_with_commitment(cli.rpc_url, CommitmentConfig::confirmed()); + + let rpc_client = RpcClient::new_with_commitment(cli.rpc_url.clone(), cli.commitment.clone()); let p2w_addr = cli.p2w_addr; - let latest_blockhash = rpc_client.get_latest_blockhash()?; + let latest_blockhash = rpc_client.get_latest_blockhash().await?; match cli.action { Action::Init { @@ -79,10 +96,16 @@ fn main() -> Result<(), ErrBox> { }, latest_blockhash, )?; - rpc_client.send_and_confirm_transaction_with_spinner(&tx)?; + rpc_client + .send_and_confirm_transaction_with_spinner(&tx) + .await?; + println!( + "Initialized with conifg:\n{:?}", + get_config_account(&rpc_client, &p2w_addr).await? 
+ ); } Action::GetConfig => { - println!("{:?}", get_config_account(&rpc_client, &p2w_addr)?); + println!("{:?}", get_config_account(&rpc_client, &p2w_addr).await?); } Action::SetConfig { ref owner, @@ -91,7 +114,7 @@ fn main() -> Result<(), ErrBox> { new_pyth_owner_addr, is_active, } => { - let old_config = get_config_account(&rpc_client, &p2w_addr)?; + let old_config = get_config_account(&rpc_client, &p2w_addr).await?; let tx = gen_set_config_tx( payer, p2w_addr, @@ -105,10 +128,12 @@ fn main() -> Result<(), ErrBox> { }, latest_blockhash, )?; - rpc_client.send_and_confirm_transaction_with_spinner(&tx)?; + rpc_client + .send_and_confirm_transaction_with_spinner(&tx) + .await?; println!( "Applied conifg:\n{:?}", - get_config_account(&rpc_client, &p2w_addr)? + get_config_account(&rpc_client, &p2w_addr).await? ); } Action::Migrate { @@ -120,58 +145,69 @@ fn main() -> Result<(), ErrBox> { read_keypair_file(&*shellexpand::tilde(&owner))?, latest_blockhash, )?; - rpc_client.send_and_confirm_transaction_with_spinner(&tx)?; + rpc_client.send_and_confirm_transaction_with_spinner(&tx).await?; println!( "Applied conifg:\n{:?}", - get_config_account(&rpc_client, &p2w_addr)? + get_config_account(&rpc_client, &p2w_addr).await? ); } Action::Attest { ref attestation_cfg, n_retries, + retry_interval_secs, + confirmation_timeout_secs, daemon, - conf_timeout_secs, - rpc_interval_ms, } => { // Load the attestation config yaml let attestation_cfg: AttestationConfig = serde_yaml::from_reader(File::open(attestation_cfg)?)?; handle_attest( - &rpc_client, + cli.rpc_url, + Duration::from_millis(cli.rpc_interval_ms), + cli.commitment, payer, p2w_addr, - &attestation_cfg, + attestation_cfg, n_retries, + Duration::from_secs(retry_interval_secs), + Duration::from_secs(confirmation_timeout_secs), daemon, - Duration::from_secs(conf_timeout_secs), - Duration::from_millis(rpc_interval_ms), - )?; + ) + .await?; } } Ok(()) } -use BatchTxStatus::*; - /// Send a series of batch attestations for symbols of an attestation config. -fn handle_attest( - rpc_client: &RpcClient, +async fn handle_attest( + rpc_url: String, + rpc_interval: Duration, + commitment: CommitmentConfig, payer: Keypair, p2w_addr: Pubkey, - attestation_cfg: &AttestationConfig, + attestation_cfg: AttestationConfig, n_retries: usize, + retry_interval: Duration, + confirmation_timeout: Duration, daemon: bool, - conf_timeout: Duration, - rpc_interval: Duration, ) -> Result<(), ErrBox> { // Derive seeded accounts let emitter_addr = P2WEmitter::key(None, &p2w_addr); info!("Using emitter addr {}", emitter_addr); - let config = get_config_account(rpc_client, &p2w_addr)?; + let config = get_config_account( + &RpcClient::new_with_timeout_and_commitment( + rpc_url.clone(), + confirmation_timeout, + commitment.clone(), + ), + &p2w_addr, + ) + .await?; debug!("Symbol config:\n{:#?}", attestation_cfg); @@ -185,7 +221,6 @@ fn handle_attest( .symbol_groups .iter() .map(|g| { - // FIXME: The forbidden nested closure move technique (a lost art of pleasing the borrow checker) let conditions4closure = g.conditions.clone(); let name4closure = g.group_name.clone(); @@ -205,301 +240,276 @@ fn handle_attest( .collect(); let batch_count = batches.len(); - // NOTE(2022-04-26): only increment this if `daemon` is false - let mut finished_count = 0; + /// Note: For global rate-limitting of RPC requests, we use a + /// custom Mutex wrapper which enforces a delay of rpc_interval + /// between RPC accesses. 
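Review note: the practical effect, sketched below against the lock_and_make_rpc helper added further down in this patch, is that the delay gates only the start of each request; the guard is dropped as soon as the client is constructed:

    // Sketch: consecutive acquisitions are spaced at least rpc_interval apart.
    // Because RLMutexGuard records its release time on drop, a long-running
    // RPC call does not delay the next caller beyond the configured interval.
    let rpc_a = lock_and_make_rpc(&rpc_cfg).await; // returns promptly
    let rpc_b = lock_and_make_rpc(&rpc_cfg).await; // parks for the interval remainder
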
+ let rpc_cfg = Arc::new(RLMutex::new( + RpcCfg { + url: rpc_url, + timeout: confirmation_timeout, + commitment: commitment.clone(), + }, + rpc_interval, + )); - // Stats - // TODO(2022-05-12): These should become Prometheus metrics in the future - let mut stats_start_time = Instant::now(); - let mut tx_successes = 0; - let mut tx_send_failures = 0; - let mut tx_confirm_failures = 0; + // Create attestation scheduling routines; see attestation_sched_job() for details + let mut attestation_sched_futs = batches.into_iter().map(|(batch_no, batch)| { + attestation_sched_job( + batch, + batch_no, + batch_count, + n_retries, + retry_interval, + daemon, + rpc_cfg.clone(), + p2w_addr, + config.clone(), + Keypair::from_bytes(&payer.to_bytes()).unwrap(), + ) + }); - let mut batch_wait_times = Duration::ZERO; + info!("Spinning up attestation sched jobs"); - // TODO(2021-03-09): Extract logic into helper functions - while daemon || finished_count < batches.len() { - finished_count = 0; - for (batch_no, state) in batches.iter_mut() { - match state.get_status().clone() { - Sending { attempt_no } => { - info!( - "Batch {}/{} contents (group {:?}): {:?}", - batch_no, - batch_count, - state.group_name, - state - .symbols - .iter() - .map(|s| s - .name - .clone() - .unwrap_or(format!("unnamed product {:?}", s.product_addr))) - .collect::>() - ); + let results = futures::future::join_all(attestation_sched_futs).await; // May never finish for daemon mode - // Send the transaction - let res = rpc_client - .get_latest_blockhash() - .map_err(|e| -> ErrBox { e.into() }) - .and_then(|latest_blockhash| { - let tx_signed = gen_attest_tx( - p2w_addr, - &config, - &payer, - state.symbols, - &Keypair::new(), - latest_blockhash, - )?; + info!("Got {} results", results.len()); - rpc_client - .send_transaction(&tx_signed) - .map_err(|e| -> ErrBox { e.into() }) - }); - - // Individual batch errors mustn't prevent other batches from being sent. 
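Review note: the refactor keeps this guarantee structurally. join_all, used above for the scheduling futures, never short-circuits, so a failing batch surfaces as an Err in its own slot while sibling batches run to completion. A minimal sketch (with `jobs` as a hypothetical stand-in for attestation_sched_futs):

    // Sketch: each future yields its own Result; one Err cancels nothing.
    let results: Vec<Result<(), ErrBoxSend>> = futures::future::join_all(jobs).await;
    for (batch_no, res) in results.iter().enumerate() {
        if let Err(e) = res {
            error!("Batch {}: {}", batch_no + 1, e);
        }
    }
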
- match res { - Ok(signature) => { - info!( - "Batch {}/{} (group {:?}) tx send: OK (Attempt {} of {})", - batch_no, batch_count, state.group_name, attempt_no, n_retries - ); - - state.set_status(Confirming { - attempt_no: *attempt_no, - signature, - }); - } - Err(e) => { - let msg = format!( - "Batch {}/{} (group {:?}) tx send error (attempt {} of {}): {}", - batch_no, - batch_count, - state.group_name, - attempt_no, - n_retries + 1, - e.to_string() - ); - warn!("{}", &msg); - - tx_send_failures += 1; - - if attempt_no < &n_retries { - state.set_status(Sending { - attempt_no: attempt_no + 1, - }) - } else { - // This batch failed all attempts, note the error but do not schedule for retry - error!( - "Batch {}/{} (group {:?}) tx send: All {} attempts failed", - state.group_name, - batch_no, - batch_count, - n_retries + 1 - ); - state.set_status(FailedSend { last_err: e }); - } - } - } - } - Confirming { - attempt_no, - signature, - } => { - let res = rpc_client - .get_transaction(&signature, UiTransactionEncoding::Json) - .map_err(|e| -> ErrBox { e.into() }) - .and_then(|this_tx| { - this_tx - .transaction - .meta - .and_then(|meta| meta.log_messages) - .and_then(|logs| { - let mut seqno = None; - for log in logs { - if log.starts_with(SEQNO_PREFIX) { - seqno = Some(log.replace(SEQNO_PREFIX, "")); - break; - } - } - seqno - }) - .ok_or_else(|| format!("No seqno in program logs").into()) - }); - - match res { - Ok(seqno) => { - // NOTE(2022-03-09): p2w_autoattest.py relies on parsing this println!() - println!("Sequence number: {}", seqno); - info!("Batch {}/{}: OK, seqno {}", batch_no, batch_count, seqno); - - tx_successes += 1; - - // Include delay for average - if let Some(t) = state.last_success_at.as_ref() { - batch_wait_times += t.elapsed(); - } - - state.last_success_at = Some(Instant::now()); - state.set_status(Success { seqno }); - } - Err(e) => { - let elapsed = state.get_status_changed_at().elapsed(); - let msg = format!( - "Batch {}/{} (group {:?}) tx confirmation failed ({}.{}/{}.{}): {}", - batch_no, - batch_count, - state.group_name, - elapsed.as_secs(), - elapsed.subsec_millis(), - conf_timeout.as_secs(), - conf_timeout.subsec_millis(), - e.to_string() - ); - debug!("{}", &msg); // Output volume usually not suitable for warn!() - - if elapsed > conf_timeout { - // This batch exceeded the timeout, - // note the error and schedule for a - // fresh send attempt - warn!( - "Batch {}/{} (group {:?}) tx confirm: Took more than {}.{} seconds (attempt {} of {}): {}", - state.group_name, - batch_no, - batch_count, - conf_timeout.as_secs(), - conf_timeout.subsec_millis(), - attempt_no, n_retries, - msg - ); - - tx_confirm_failures += 1; - - if attempt_no < &n_retries { - state.set_status(Sending { - attempt_no: attempt_no + 1, - }); - } else { - error!( - "Batch {}/{} (group {:?}) tx confirm: All {} attempts failed", - state.group_name, - batch_no, - batch_count, - n_retries + 1 - ); - state.set_status(FailedConfirm { last_err: e }); - } - } - } - } - } - Success { .. } | FailedSend { .. } | FailedConfirm { .. 
} => { - // We only try to re-schedule under --daemon - if daemon { - if let Some(reason) = state.should_resend(rpc_client) { - info!( - "Batch {}/{} (group {:?}): resending (reason: {})", - batch_no, batch_count, state.group_name, reason, - ); - state.set_status(Sending { attempt_no: 1 }); - } else { - let elapsed = state.get_status_changed_at().elapsed(); - trace!( - "Batch {}/{} (group {:?}): waiting ({}.{}s elapsed)", - batch_no, - batch_count, - state.group_name, - elapsed.as_secs(), - elapsed.subsec_millis(), - ) - } - } else { - // Track the finished batches outside daemon mode - finished_count += 1; - - // No RPC requests are made on terminal states outside daemon mode, skip sleep - continue; - } - } - } - - thread::sleep(rpc_interval); - } - - // Print stats on every pass through the batches - let stats_seconds_elapsed = stats_start_time.elapsed().as_millis() as f64 / 1000.0; - let tps = tx_successes as f64 / stats_seconds_elapsed; - let sym_ps = tps * config.max_batch_size as f64; - - let mut total = (tx_successes + tx_send_failures + tx_confirm_failures) as f64; - // Avoid division by 0 - if total < 0.0001 { - total = 0.0001; - } - - let successes_pct = tx_successes as f64 / total * 100.0; - let send_failures_pct = tx_send_failures as f64 / total * 100.0; - let confirm_failures_pct = tx_confirm_failures as f64 / total * 100.0; - - info!( - "Stats since start: -Runtime: {}s -TPS: {:.4} tx/s ({:.4} symbols/s) -Total attestation attempts: {} -Successful txs: {} ({:.2})% -Sending failures: {} ({:.2})% -Confirmation failures: {} ({:.2})% -Average batch resend delay: {:.2}s", - stats_start_time.elapsed().as_secs(), - tps, - sym_ps, - total, - tx_successes, - successes_pct, - tx_send_failures, - send_failures_pct, - tx_confirm_failures, - confirm_failures_pct, - batch_wait_times.as_secs() as f64 / (tx_successes as f64 + 0.000001), - ); - } - - let mut errors = Vec::new(); - - // Filter out errors - for (batch_no, state) in batches { - match state.get_status() { - Success { .. } => {} - FailedSend { last_err, .. } | FailedConfirm { last_err, .. } => { - errors.push(last_err.to_string()) - } - other => { - // Be loud about non-terminal states left behind - let msg = format!( - "INTERNAL: Batch {} left in non-terminal state {:#?}", - batch_no, other - ); - - error!("{}", msg); - - errors.push(msg); - } - } - } + // With daemon mode off, the sched jobs return from the + // join_all. We filter out errors and report them + let errors: Vec<_> = results + .iter() + .filter_map(|r| r.as_ref().err().map(|e| e.to_string())) + .collect(); if !errors.is_empty() { - let err_list = errors.join("\n"); - - Err(format!( + let err_lines = errors.join("\n"); + let msg = format!( "{} of {} batches failed:\n{}", errors.len(), batch_count, - err_list - ) - .into()) - } else { - Ok(()) + err_lines + ); + error!("{}", msg); + return Err(msg.into()); } + + Ok(()) +} + +#[derive(Clone)] +pub struct RpcCfg { + pub url: String, + pub timeout: Duration, + pub commitment: CommitmentConfig, +} + +/// Helper function for claiming the rate-limited mutex and constructing an RPC instance +async fn lock_and_make_rpc(rlmtx: &RLMutex) -> RpcClient { + let RpcCfg { + url, + timeout, + commitment, + } = rlmtx.lock().await.clone(); + RpcClient::new_with_timeout_and_commitment(url, timeout, commitment) +} + +/// A future that decides how a batch is sent. 
+/// +/// In daemon mode, attestations of the batch are scheduled +/// continuously using spawn(), which means that a next attestation of +/// the same batch begins immediately when a condition is met without +/// waiting for the previous attempt to finish. Subsequent +/// attestations are started according to the attestation_conditions +/// field on the batch. Concurrent requests per batch are limited by +/// the max_batch_jobs field to prevent excess memory usage on network +/// slowdowns etc.. +/// +/// With daemon_mode off, this future attempts only one blocking +/// attestation of the batch and returns the result. +async fn attestation_sched_job( + mut batch: BatchState<'_>, + batch_no: usize, + batch_count: usize, + n_retries: usize, + retry_interval: Duration, + daemon: bool, + rpc_cfg: Arc>, + p2w_addr: Pubkey, + config: Pyth2WormholeConfig, + payer: Keypair, +) -> Result<(), ErrBoxSend> { + let mut retries_left = n_retries; + // Enforces the max batch job count + let sema = Arc::new(Semaphore::new(batch.conditions.max_batch_jobs)); + loop { + debug!( + "Batch {}/{}, group {:?}: Scheduling attestation job", + batch_no, batch_count, batch.group_name + ); + + let job = attestation_job( + rpc_cfg.clone(), + batch_no, + batch_count, + batch.group_name.clone(), + p2w_addr, + config.clone(), + Keypair::from_bytes(&payer.to_bytes()).unwrap(), // Keypair has no clone + batch.symbols.to_vec(), + sema.clone(), + ); + + if daemon { + // park this routine until a resend condition is met + loop { + if let Some(reason) = batch + .should_resend(&lock_and_make_rpc(&rpc_cfg).await) + .await + { + info!( + "Batch {}/{}, group {}: Resending (reason: {:?})", + batch_no, batch_count, batch.group_name, reason + ); + break; + } + } + + if sema.available_permits() == 0 { + warn!( + "Batch {}/{}, group {:?}: Ran out of job \ + permits, some attestation conditions may be \ + delayed. For better accuracy, increase \ + max_batch_jobs or adjust attestation \ + conditions", + batch_no, batch_count, batch.group_name + ); + } + + // This short-lived permit prevents scheduling + // excess attestation jobs (which could eventually + // eat all memory). It is freed as soon as we + // leave this code block. + let _permit4sched = sema.acquire().await?; + + let batch_no4err_msg = batch_no.clone(); + let batch_count4err_msg = batch_count.clone(); + let group_name4err_msg = batch.group_name.clone(); + + // We never get to error reporting in daemon mode, attach a map_err + let job_with_err_msg = job.map_err(move |e| async move { + warn!( + "Batch {}/{}, group {:?} ERR: {}", + batch_no4err_msg, + batch_count4err_msg, + group_name4err_msg, + e.to_string() + ); + e + }); + + // Spawn the job in background + let _detached_job: JoinHandle<_> = tokio::spawn(job_with_err_msg); + } else { + // Await and return the single result in non-daemon mode, with retries if necessary + match job.await { + Ok(_) => return Ok(()), + Err(e) => { + if retries_left == 0 { + return Err(e); + } else { + retries_left -= 1; + debug!( + "{}/{}, group {:?}: attestation failure: {}", + batch_no, + batch_count, + batch.group_name, + e.to_string() + ); + info!( + "Batch {}/{}, group {:?}: retrying in {}.{}s, {} retries left", + batch_no, + batch_count, + batch.group_name, + retry_interval.as_secs(), + retry_interval.subsec_millis(), + retries_left, + ); + + tokio::time::sleep(retry_interval).await; + } + } + } + } + + batch.last_job_finished_at = Instant::now(); + } +} + +/// A future for a single attempt to attest a batch on Solana. 
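Review note on the scheduling loop above, before moving into attestation_job below: the daemon branch pairs two primitives, a short-lived Semaphore permit that decides whether another job may be scheduled, and tokio::spawn so a slow attestation never blocks the next condition check. The shape, reduced to a sketch (hypothetical free function; the real code also threads batch state and retry bookkeeping through the loop):

    use std::sync::Arc;
    use tokio::sync::Semaphore;

    async fn schedule_one(sema: Arc<Semaphore>) -> Result<(), tokio::sync::AcquireError> {
        // Taken only long enough to decide we may schedule, mirroring
        // _permit4sched above; released when this function returns.
        let _permit = sema.acquire().await?;
        tokio::spawn(async move {
            // The long-running attestation attempt would go here; in the
            // patch it takes its own permit from the same semaphore and
            // holds it for the job's full duration.
        });
        Ok(())
    }
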
+async fn attestation_job( + rlmtx: Arc>, + batch_no: usize, + batch_count: usize, + group_name: String, + p2w_addr: Pubkey, + config: Pyth2WormholeConfig, + payer: Keypair, + symbols: Vec, + max_jobs_sema: Arc, +) -> Result<(), ErrBoxSend> { + // Will be dropped after attestation is complete + let _permit = max_jobs_sema.acquire().await?; + + debug!( + "Batch {}/{}, group {:?}: Starting attestation job", + batch_no, batch_count, group_name + ); + let rpc = lock_and_make_rpc(&*rlmtx).await; // Reuse the same lock for the blockhash/tx/get_transaction + let latest_blockhash = rpc + .get_latest_blockhash() + .map_err(|e| -> ErrBoxSend { e.into() }) + .await?; + + let tx_res: Result<_, ErrBoxSend> = gen_attest_tx( + p2w_addr, + &config, + &payer, + symbols.as_slice(), + &Keypair::new(), + latest_blockhash, + ); + let tx = tx_res?; + let sig = rpc + .send_and_confirm_transaction(&tx) + .map_err(|e| -> ErrBoxSend { e.into() }) + .await?; + let tx_data = rpc + .get_transaction(&sig, UiTransactionEncoding::Json) + .map_err(|e| -> ErrBoxSend { e.into() }) + .await?; + let seqno = tx_data + .transaction + .meta + .and_then(|meta| meta.log_messages) + .and_then(|logs| { + let mut seqno = None; + for log in logs { + if log.starts_with(SEQNO_PREFIX) { + seqno = Some(log.replace(SEQNO_PREFIX, "")); + break; + } + } + seqno + }) + .ok_or_else(|| -> ErrBoxSend { format!("No seqno in program logs").into() })?; + + info!( + "Batch {}/{}, group {:?} OK", + batch_no, batch_count, group_name + ); + // NOTE(2022-03-09): p2w_autoattest.py relies on parsing this println!{} + println!("Sequence number: {}", seqno); + Result::<(), ErrBoxSend>::Ok(()) } fn init_logging(verbosity: u32) { diff --git a/solana/pyth2wormhole/client/src/util.rs b/solana/pyth2wormhole/client/src/util.rs new file mode 100644 index 00000000..c87a0db2 --- /dev/null +++ b/solana/pyth2wormhole/client/src/util.rs @@ -0,0 +1,97 @@ +use log::trace; + +use std::{ + ops::{ + Deref, + DerefMut, + }, + time::{ + Duration, + Instant, + }, +}; +use tokio::sync::{ + Mutex, + MutexGuard, +}; + +/// Rate-limited mutex. 
Ensures there's a period of minimum rl_interval between lock acquisitions +pub struct RLMutex { + mtx: Mutex>, + rl_interval: Duration, +} + +/// Helper to make the last_released writes also guarded by the mutex +pub struct RLMutexState { + /// Helps make sure regular passage of time is subtracted from sleep duration + last_released: Instant, + val: T, +} + +impl Deref for RLMutexState { + type Target = T; + fn deref(&self) -> &Self::Target { + &self.val + } +} + +impl DerefMut for RLMutexState { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.val + } +} + +/// Helper wrapper to record lock release times via Drop +pub struct RLMutexGuard<'a, T> { + guard: MutexGuard<'a, RLMutexState>, +} + +impl<'a, T> Drop for RLMutexGuard<'a, T> { + fn drop(&mut self) { + let state: &mut RLMutexState = + MutexGuard::<'a, RLMutexState>::deref_mut(&mut self.guard); + state.last_released = Instant::now(); + } +} + +impl<'a, T> Deref for RLMutexGuard<'a, T> { + type Target = T; + fn deref(&self) -> &Self::Target { + self.guard.deref() + } +} + +impl<'a, T> DerefMut for RLMutexGuard<'a, T> { + fn deref_mut(&mut self) -> &mut T { + self.guard.deref_mut() + } +} + +impl RLMutex { + pub fn new(val: T, rl_interval: Duration) -> Self { + Self { + mtx: Mutex::new(RLMutexState { + last_released: Instant::now() - rl_interval, + val, + }), + rl_interval, + } + } + + pub async fn lock(&self) -> RLMutexGuard<'_, T> { + let guard = self.mtx.lock().await; + let elapsed = guard.last_released.elapsed(); + if elapsed < self.rl_interval { + let sleep_time = self.rl_interval - elapsed; + trace!( + "RLMutex: Parking lock future for {}.{}s", + sleep_time.as_secs(), + sleep_time.subsec_millis() + ); + + tokio::time::sleep(sleep_time).await; + } + + RLMutexGuard { guard } + } +} diff --git a/solana/pyth2wormhole/client/tests/test_attest.rs b/solana/pyth2wormhole/client/tests/test_attest.rs index 45a4326c..0109bbf4 100644 --- a/solana/pyth2wormhole/client/tests/test_attest.rs +++ b/solana/pyth2wormhole/client/tests/test_attest.rs @@ -37,7 +37,7 @@ use fixtures::{ }; #[tokio::test] -async fn test_happy_path() -> Result<(), solitaire::ErrBox> { +async fn test_happy_path() -> Result<(), p2wc::ErrBoxSend> { // Programs let p2w_program_id = Pubkey::new_unique(); let wh_fixture_program_id = Pubkey::new_unique(); diff --git a/solana/pyth2wormhole/program/src/migrate.rs b/solana/pyth2wormhole/program/src/migrate.rs index d52edacc..4035e7ad 100644 --- a/solana/pyth2wormhole/program/src/migrate.rs +++ b/solana/pyth2wormhole/program/src/migrate.rs @@ -51,11 +51,7 @@ impl<'b> InstructionContext<'b> for Migrate<'b> { } } -pub fn migrate( - ctx: &ExecutionContext, - accs: &mut Migrate, - data: (), -) -> SoliResult<()> { +pub fn migrate(ctx: &ExecutionContext, accs: &mut Migrate, data: ()) -> SoliResult<()> { let old_config: &OldPyth2WormholeConfig = &accs.old_config.1; if &old_config.owner != accs.current_owner.info().key { @@ -82,7 +78,8 @@ pub fn migrate( **accs.old_config.info().lamports.borrow_mut() = 0; // Credit payer with saved balance - accs.payer.info() + accs.payer + .info() .lamports .borrow_mut() .checked_add(old_config_balance_val) diff --git a/third_party/pyth/p2w_autoattest.py b/third_party/pyth/p2w_autoattest.py index bf21069d..f29046d0 100755 --- a/third_party/pyth/p2w_autoattest.py +++ b/third_party/pyth/p2w_autoattest.py @@ -184,9 +184,9 @@ if P2W_ATTESTATION_CFG is None: cfg_yaml = """ --- symbol_groups: - - group_name: things + - group_name: fast_interval_only conditions: - min_interval_secs: 17 + 
min_interval_secs: 3 symbols: """ @@ -206,9 +206,10 @@ symbol_groups: product_addr: {product}""" cfg_yaml += f""" - - group_name: stuff + - group_name: longer_interval_sensitive_changes conditions: - min_interval_secs: 19 + min_interval_secs: 10 + price_changed_pct: 3 symbols: """ @@ -231,8 +232,10 @@ symbol_groups: first_attest_result = run_or_die( [ "pyth2wormhole-client", + "--commitment", + "finalized", "--log-level", - "4", + "3", "--p2w-addr", P2W_SOL_ADDRESS, "--rpc-url", @@ -266,8 +269,10 @@ while True: p2w_client_process = Popen( [ "pyth2wormhole-client", + "--commitment", + "finalized", "--log-level", - "4", + "3", "--p2w-addr", P2W_SOL_ADDRESS, "--rpc-url",
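
Postscript on the new util.rs primitive: RLMutex is generic over the guarded value, so the rate limiting composes with any shared state, not just RpcCfg. A minimal usage sketch (hypothetical, assuming a Tokio runtime and that RLMutex is in scope via the client crate's re-export; the waits are lower bounds, not exact timings):

    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        let shared = RLMutex::new(0u64, Duration::from_millis(150));
        for i in 0..3 {
            // Each lock() waits out whatever remains of the 150 ms interval
            // since the previous guard was dropped, then derefs to the value.
            let mut guard = shared.lock().await;
            *guard += 1;
            println!("acquisition {}: value = {}", i, *guard);
        }
    }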