From 99246c24d4aa47c453bb225601acb8f2d9824ff1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stanis=C5=82aw=20Drozd?= Date: Tue, 17 Jan 2023 16:01:16 +0100 Subject: [PATCH] Drozdziak1/refactor p2w autoattest py (#502) * p2w_autoattest.py: Stop using non-daemon mode * wormhole_attester: Remove non-daemon mode and its uses, v3.0.0 We used to need non-daemon mode to run a manual healthcheck on the attester process and its configuration. Currently, we're able to handle this much better with the built-in healthcheck HTTP endpoint. For production, we should be able to get rid of p2w_autoattest.py entirely. Co-authored-by: Stan Drozd --- third_party/pyth/p2w_autoattest.py | 29 ----- wormhole-attester/Cargo.lock | 2 +- wormhole-attester/client/Cargo.toml | 2 +- wormhole-attester/client/src/cli.rs | 21 ---- wormhole-attester/client/src/main.rs | 159 +-------------------------- 5 files changed, 4 insertions(+), 209 deletions(-) diff --git a/third_party/pyth/p2w_autoattest.py b/third_party/pyth/p2w_autoattest.py index 8f8ed40a..da89c042 100755 --- a/third_party/pyth/p2w_autoattest.py +++ b/third_party/pyth/p2w_autoattest.py @@ -168,34 +168,6 @@ symbol_groups: # modules like async HTTP requests and tokio runtime logs os.environ["RUST_LOG"] = os.environ.get("RUST_LOG", "info") -# Send the first attestation in one-shot mode for testing -first_attest_result = run_or_die( - [ - "pwhac", - "--commitment", - "confirmed", - "--p2w-addr", - P2W_SOL_ADDRESS, - "--rpc-url", - SOL_RPC_URL, - "--payer", - SOL_PAYER_KEYPAIR, - "attest", - "-f", - P2W_ATTESTATION_CFG, - "--timeout", - P2W_RPC_TIMEOUT_SECS, - ], - capture_output=True, - debug = True, -) - -logging.info("p2w_autoattest ready to roll!") - -# Let k8s know the service is up -readiness_thread = threading.Thread(target=readiness, daemon=True) -readiness_thread.start() - # Do not exit this script if a continuous attestation stops for # whatever reason (this avoids k8s restart penalty) while True: @@ -214,7 +186,6 @@ while True: "attest", 
"-f", P2W_ATTESTATION_CFG, - "-d", "--timeout", P2W_RPC_TIMEOUT_SECS, ] diff --git a/wormhole-attester/Cargo.lock b/wormhole-attester/Cargo.lock index cfa74419..3ec78af4 100644 --- a/wormhole-attester/Cargo.lock +++ b/wormhole-attester/Cargo.lock @@ -2710,7 +2710,7 @@ dependencies = [ [[package]] name = "pyth-wormhole-attester-client" -version = "2.0.0" +version = "3.0.0" dependencies = [ "borsh", "clap 3.1.18", diff --git a/wormhole-attester/client/Cargo.toml b/wormhole-attester/client/Cargo.toml index 6ba2adf0..be98f4b7 100644 --- a/wormhole-attester/client/Cargo.toml +++ b/wormhole-attester/client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-wormhole-attester-client" -version = "2.0.0" +version = "3.0.0" edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/wormhole-attester/client/src/cli.rs b/wormhole-attester/client/src/cli.rs index 63d519f3..ee213564 100644 --- a/wormhole-attester/client/src/cli.rs +++ b/wormhole-attester/client/src/cli.rs @@ -59,27 +59,6 @@ pub enum Action { Attest { #[clap(short = 'f', long = "--config", help = "Attestation YAML config")] attestation_cfg: PathBuf, - #[clap( - short = 'n', - long = "--n-retries", - help = "How many times to retry send_transaction() on each batch before flagging a failure. Only active outside daemon mode", - default_value = "5" - )] - n_retries: usize, - #[clap( - short = 'i', - long = "--retry-interval", - help = "How long to wait between send_transaction - retries. Only active outside daemon mode", - default_value = "5" - )] - retry_interval_secs: u64, - #[clap( - short = 'd', - long = "--daemon", - help = "Do not stop attesting. In this mode, this program will behave more like a daemon and continuously attest the specified symbols." 
- )] - daemon: bool, #[clap( short = 't', long = "--timeout", diff --git a/wormhole-attester/client/src/main.rs b/wormhole-attester/client/src/main.rs index f1b73745..725d4d9a 100644 --- a/wormhole-attester/client/src/main.rs +++ b/wormhole-attester/client/src/main.rs @@ -223,11 +223,8 @@ async fn main() -> Result<(), ErrBox> { } Action::Attest { ref attestation_cfg, - n_retries, - retry_interval_secs, confirmation_timeout_secs, metrics_bind_addr, - daemon, } => { // Load the attestation config yaml let attestation_cfg: AttestationConfig = @@ -249,26 +246,7 @@ async fn main() -> Result<(), ErrBox> { Duration::from_millis(attestation_cfg.min_rpc_interval_ms), )); - if daemon { - handle_attest_daemon_mode( - rpc_cfg, - payer, - p2w_addr, - attestation_cfg, - metrics_bind_addr, - ) - .await?; - } else { - handle_attest_non_daemon_mode( - attestation_cfg, - rpc_cfg, - p2w_addr, - payer, - n_retries, - Duration::from_secs(retry_interval_secs), - ) - .await?; - } + handle_attest(rpc_cfg, payer, p2w_addr, attestation_cfg, metrics_bind_addr).await?; } Action::GetEmitter => unreachable! {}, // It is handled early in this function. Action::SetIsActive { @@ -296,7 +274,7 @@ async fn main() -> Result<(), ErrBox> { } /// Continuously send batch attestations for symbols of an attestation config. 
-async fn handle_attest_daemon_mode( +async fn handle_attest( rpc_cfg: Arc>, payer: Keypair, p2w_addr: Pubkey, @@ -463,76 +441,6 @@ async fn lock_and_make_rpc(rlmtx: &RLMutex) -> RpcClient { RpcClient::new_with_timeout_and_commitment(url, timeout, commitment) } -/// Non-daemon attestation scheduling -async fn handle_attest_non_daemon_mode( - attestation_cfg: AttestationConfig, - rpc_cfg: Arc>, - p2w_addr: Pubkey, - payer: Keypair, - n_retries: usize, - retry_interval: Duration, -) -> Result<(), ErrBox> { - let p2w_cfg = get_config_account(&lock_and_make_rpc(&rpc_cfg).await, &p2w_addr).await?; - - let batch_config = - attestation_config_to_batches(&rpc_cfg, &attestation_cfg, p2w_cfg.max_batch_size as usize) - .await - .unwrap_or_else(|_| { - attestation_cfg.instantiate_batches(&[], p2w_cfg.max_batch_size as usize) - }); - - let batches: Vec<_> = batch_config - .into_iter() - .map(|x| BatchState::new(&x)) - .collect(); - let batch_count = batches.len(); - - // For enforcing min_msg_reuse_interval_ms, we keep a piece of - // state that creates or reuses accounts if enough time had - // passed - let message_q_mtx = Arc::new(Mutex::new(P2WMessageQueue::new( - Duration::from_millis(attestation_cfg.min_msg_reuse_interval_ms), - attestation_cfg.max_msg_accounts as usize, - ))); - - let retry_jobs = batches.into_iter().enumerate().map(|(idx, batch_state)| { - attestation_retry_job(AttestationRetryJobArgs { - batch_no: idx + 1, - batch_count, - group_name: batch_state.group_name, - symbols: batch_state.symbols, - n_retries, - retry_interval, - rpc_cfg: rpc_cfg.clone(), - p2w_addr, - p2w_config: p2w_cfg.clone(), - payer: Keypair::from_bytes(&payer.to_bytes()).unwrap(), - message_q_mtx: message_q_mtx.clone(), - }) - }); - - let results = futures::future::join_all(retry_jobs).await; - - // After completing, we count any errors coming from the sched - // futs. 
- let errors: Vec<_> = results - .iter() - .enumerate() - .filter_map(|(idx, r)| { - r.as_ref() - .err() - .map(|e| format!("Error {}: {:?}\n", idx + 1, e)) - }) - .collect(); - - if !errors.is_empty() { - let err_lines = errors.join("\n"); - let msg = format!("{} batches failed:\n{}", errors.len(), err_lines); - error!("{}", msg); - return Err(msg.into()); - } - Ok(()) -} /// Generate batches to attest by retrieving the on-chain product account data and grouping it /// according to the configuration in `attestation_cfg`. @@ -692,69 +600,6 @@ async fn attestation_sched_job(args: AttestationSchedJobArgs) -> Result<(), ErrB } } -pub struct AttestationRetryJobArgs { - pub batch_no: usize, - pub batch_count: usize, - pub group_name: String, - pub symbols: Vec, - pub n_retries: usize, - pub retry_interval: Duration, - pub rpc_cfg: Arc>, - pub p2w_addr: Pubkey, - pub p2w_config: Pyth2WormholeConfig, - pub payer: Keypair, - pub message_q_mtx: Arc>, -} - -/// A future that cranks a batch up to n_retries times, pausing for -/// retry_interval in between; Used exclusively in non-daemon mode -async fn attestation_retry_job(args: AttestationRetryJobArgs) -> Result<(), ErrBoxSend> { - let AttestationRetryJobArgs { - batch_no, - batch_count, - group_name, - symbols, - n_retries, - retry_interval, - rpc_cfg, - p2w_addr, - p2w_config, - payer, - message_q_mtx, - } = args; - - let mut res = Err( - "attestation_retry_job INTERNAL: Could not get a single attestation job result" - .to_string() - .into(), - ); - - for _i in 0..=n_retries { - res = attestation_job(AttestationJobArgs { - rlmtx: rpc_cfg.clone(), - batch_no, - batch_count, - group_name: group_name.clone(), - p2w_addr, - config: p2w_config.clone(), - payer: Keypair::from_bytes(&payer.to_bytes()).unwrap(), // Keypair has no clone - symbols: symbols.clone(), - max_jobs_sema: Arc::new(Semaphore::new(1)), // Not important for non-daemon mode - message_q_mtx: message_q_mtx.clone(), - }) - .await; - - // Finish early on success - 
if res.is_ok() { - break; - } - - tokio::time::sleep(retry_interval).await; - } - - res -} - /// Arguments for attestation_job(). This struct rules out same-type /// ordering errors due to the large argument count pub struct AttestationJobArgs {