Drozdziak1/refactor p2w autoattest py (#502)

* p2w_autoattest.py: Stop using non-daemon mode

* wormhole_attester: Remove non-daemon mode and its uses, v3.0.0

We used to need non-daemon mode to run a manual healthcheck on the
attester process and its configuration. Currently, we're able to
handle this much better with the built-in healthcheck HTTP
endpoint. For production, we should be able to get rid of
p2w_autoattest.py entirely.

Co-authored-by: Stan Drozd <stan@pyth.network>
This commit is contained in:
Stanisław Drozd 2023-01-17 16:01:16 +01:00 committed by GitHub
parent 9cbdeb3588
commit 99246c24d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 4 additions and 209 deletions

View File

@ -168,34 +168,6 @@ symbol_groups:
# modules like async HTTP requests and tokio runtime logs # modules like async HTTP requests and tokio runtime logs
os.environ["RUST_LOG"] = os.environ.get("RUST_LOG", "info") os.environ["RUST_LOG"] = os.environ.get("RUST_LOG", "info")
# Send the first attestation in one-shot mode for testing
first_attest_result = run_or_die(
[
"pwhac",
"--commitment",
"confirmed",
"--p2w-addr",
P2W_SOL_ADDRESS,
"--rpc-url",
SOL_RPC_URL,
"--payer",
SOL_PAYER_KEYPAIR,
"attest",
"-f",
P2W_ATTESTATION_CFG,
"--timeout",
P2W_RPC_TIMEOUT_SECS,
],
capture_output=True,
debug = True,
)
logging.info("p2w_autoattest ready to roll!")
# Let k8s know the service is up
readiness_thread = threading.Thread(target=readiness, daemon=True)
readiness_thread.start()
# Do not exit this script if a continuous attestation stops for # Do not exit this script if a continuous attestation stops for
# whatever reason (this avoids k8s restart penalty) # whatever reason (this avoids k8s restart penalty)
while True: while True:
@ -214,7 +186,6 @@ while True:
"attest", "attest",
"-f", "-f",
P2W_ATTESTATION_CFG, P2W_ATTESTATION_CFG,
"-d",
"--timeout", "--timeout",
P2W_RPC_TIMEOUT_SECS, P2W_RPC_TIMEOUT_SECS,
] ]

View File

@ -2710,7 +2710,7 @@ dependencies = [
[[package]] [[package]]
name = "pyth-wormhole-attester-client" name = "pyth-wormhole-attester-client"
version = "2.0.0" version = "3.0.0"
dependencies = [ dependencies = [
"borsh", "borsh",
"clap 3.1.18", "clap 3.1.18",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "pyth-wormhole-attester-client" name = "pyth-wormhole-attester-client"
version = "2.0.0" version = "3.0.0"
edition = "2018" edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

View File

@ -59,27 +59,6 @@ pub enum Action {
Attest { Attest {
#[clap(short = 'f', long = "--config", help = "Attestation YAML config")] #[clap(short = 'f', long = "--config", help = "Attestation YAML config")]
attestation_cfg: PathBuf, attestation_cfg: PathBuf,
#[clap(
short = 'n',
long = "--n-retries",
help = "How many times to retry send_transaction() on each batch before flagging a failure. Only active outside daemon mode",
default_value = "5"
)]
n_retries: usize,
#[clap(
short = 'i',
long = "--retry-interval",
help = "How long to wait between send_transaction
retries. Only active outside daemon mode",
default_value = "5"
)]
retry_interval_secs: u64,
#[clap(
short = 'd',
long = "--daemon",
help = "Do not stop attesting. In this mode, this program will behave more like a daemon and continuously attest the specified symbols."
)]
daemon: bool,
#[clap( #[clap(
short = 't', short = 't',
long = "--timeout", long = "--timeout",

View File

@ -223,11 +223,8 @@ async fn main() -> Result<(), ErrBox> {
} }
Action::Attest { Action::Attest {
ref attestation_cfg, ref attestation_cfg,
n_retries,
retry_interval_secs,
confirmation_timeout_secs, confirmation_timeout_secs,
metrics_bind_addr, metrics_bind_addr,
daemon,
} => { } => {
// Load the attestation config yaml // Load the attestation config yaml
let attestation_cfg: AttestationConfig = let attestation_cfg: AttestationConfig =
@ -249,26 +246,7 @@ async fn main() -> Result<(), ErrBox> {
Duration::from_millis(attestation_cfg.min_rpc_interval_ms), Duration::from_millis(attestation_cfg.min_rpc_interval_ms),
)); ));
if daemon { handle_attest(rpc_cfg, payer, p2w_addr, attestation_cfg, metrics_bind_addr).await?;
handle_attest_daemon_mode(
rpc_cfg,
payer,
p2w_addr,
attestation_cfg,
metrics_bind_addr,
)
.await?;
} else {
handle_attest_non_daemon_mode(
attestation_cfg,
rpc_cfg,
p2w_addr,
payer,
n_retries,
Duration::from_secs(retry_interval_secs),
)
.await?;
}
} }
Action::GetEmitter => unreachable! {}, // It is handled early in this function. Action::GetEmitter => unreachable! {}, // It is handled early in this function.
Action::SetIsActive { Action::SetIsActive {
@ -296,7 +274,7 @@ async fn main() -> Result<(), ErrBox> {
} }
/// Continuously send batch attestations for symbols of an attestation config. /// Continuously send batch attestations for symbols of an attestation config.
async fn handle_attest_daemon_mode( async fn handle_attest(
rpc_cfg: Arc<RLMutex<RpcCfg>>, rpc_cfg: Arc<RLMutex<RpcCfg>>,
payer: Keypair, payer: Keypair,
p2w_addr: Pubkey, p2w_addr: Pubkey,
@ -463,76 +441,6 @@ async fn lock_and_make_rpc(rlmtx: &RLMutex<RpcCfg>) -> RpcClient {
RpcClient::new_with_timeout_and_commitment(url, timeout, commitment) RpcClient::new_with_timeout_and_commitment(url, timeout, commitment)
} }
/// Non-daemon attestation scheduling
async fn handle_attest_non_daemon_mode(
attestation_cfg: AttestationConfig,
rpc_cfg: Arc<RLMutex<RpcCfg>>,
p2w_addr: Pubkey,
payer: Keypair,
n_retries: usize,
retry_interval: Duration,
) -> Result<(), ErrBox> {
let p2w_cfg = get_config_account(&lock_and_make_rpc(&rpc_cfg).await, &p2w_addr).await?;
let batch_config =
attestation_config_to_batches(&rpc_cfg, &attestation_cfg, p2w_cfg.max_batch_size as usize)
.await
.unwrap_or_else(|_| {
attestation_cfg.instantiate_batches(&[], p2w_cfg.max_batch_size as usize)
});
let batches: Vec<_> = batch_config
.into_iter()
.map(|x| BatchState::new(&x))
.collect();
let batch_count = batches.len();
// For enforcing min_msg_reuse_interval_ms, we keep a piece of
// state that creates or reuses accounts if enough time had
// passed
let message_q_mtx = Arc::new(Mutex::new(P2WMessageQueue::new(
Duration::from_millis(attestation_cfg.min_msg_reuse_interval_ms),
attestation_cfg.max_msg_accounts as usize,
)));
let retry_jobs = batches.into_iter().enumerate().map(|(idx, batch_state)| {
attestation_retry_job(AttestationRetryJobArgs {
batch_no: idx + 1,
batch_count,
group_name: batch_state.group_name,
symbols: batch_state.symbols,
n_retries,
retry_interval,
rpc_cfg: rpc_cfg.clone(),
p2w_addr,
p2w_config: p2w_cfg.clone(),
payer: Keypair::from_bytes(&payer.to_bytes()).unwrap(),
message_q_mtx: message_q_mtx.clone(),
})
});
let results = futures::future::join_all(retry_jobs).await;
// After completing, we count any errors coming from the sched
// futs.
let errors: Vec<_> = results
.iter()
.enumerate()
.filter_map(|(idx, r)| {
r.as_ref()
.err()
.map(|e| format!("Error {}: {:?}\n", idx + 1, e))
})
.collect();
if !errors.is_empty() {
let err_lines = errors.join("\n");
let msg = format!("{} batches failed:\n{}", errors.len(), err_lines);
error!("{}", msg);
return Err(msg.into());
}
Ok(())
}
/// Generate batches to attest by retrieving the on-chain product account data and grouping it /// Generate batches to attest by retrieving the on-chain product account data and grouping it
/// according to the configuration in `attestation_cfg`. /// according to the configuration in `attestation_cfg`.
@ -692,69 +600,6 @@ async fn attestation_sched_job(args: AttestationSchedJobArgs) -> Result<(), ErrB
} }
} }
pub struct AttestationRetryJobArgs {
pub batch_no: usize,
pub batch_count: usize,
pub group_name: String,
pub symbols: Vec<P2WSymbol>,
pub n_retries: usize,
pub retry_interval: Duration,
pub rpc_cfg: Arc<RLMutex<RpcCfg>>,
pub p2w_addr: Pubkey,
pub p2w_config: Pyth2WormholeConfig,
pub payer: Keypair,
pub message_q_mtx: Arc<Mutex<P2WMessageQueue>>,
}
/// A future that cranks a batch up to n_retries times, pausing for
/// retry_interval in between; Used exclusively in non-daemon mode
async fn attestation_retry_job(args: AttestationRetryJobArgs) -> Result<(), ErrBoxSend> {
let AttestationRetryJobArgs {
batch_no,
batch_count,
group_name,
symbols,
n_retries,
retry_interval,
rpc_cfg,
p2w_addr,
p2w_config,
payer,
message_q_mtx,
} = args;
let mut res = Err(
"attestation_retry_job INTERNAL: Could not get a single attestation job result"
.to_string()
.into(),
);
for _i in 0..=n_retries {
res = attestation_job(AttestationJobArgs {
rlmtx: rpc_cfg.clone(),
batch_no,
batch_count,
group_name: group_name.clone(),
p2w_addr,
config: p2w_config.clone(),
payer: Keypair::from_bytes(&payer.to_bytes()).unwrap(), // Keypair has no clone
symbols: symbols.clone(),
max_jobs_sema: Arc::new(Semaphore::new(1)), // Not important for non-daemon mode
message_q_mtx: message_q_mtx.clone(),
})
.await;
// Finish early on success
if res.is_ok() {
break;
}
tokio::time::sleep(retry_interval).await;
}
res
}
/// Arguments for attestation_job(). This struct rules out same-type /// Arguments for attestation_job(). This struct rules out same-type
/// ordering errors due to the large argument count /// ordering errors due to the large argument count
pub struct AttestationJobArgs { pub struct AttestationJobArgs {