pyth2wormhole-client: Implement and use daemon mode (#192)

* pyth2wormhole-client: Implement and use daemon mode

commit-id:7001bdf7

* pyth2wormhole-client: harden CLI, simplify attestation loop

commit-id:ad5e4857

* pyth2wormhole-client: Use requires_if for batch_interval/daemon

commit-id:0da03fd5
This commit is contained in:
Stanisław Drozd 2022-05-02 13:08:03 +02:00 committed by GitHub
parent 8635668417
commit 97ebef0c6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 131 additions and 34 deletions

View File

@ -1918,9 +1918,9 @@ dependencies = [
[[package]] [[package]]
name = "pyth-sdk" name = "pyth-sdk"
version = "0.2.0" version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c610102a39fc4bae29a3b5a628ee134d25afb3dca3937692f5e634f1287fe0b4" checksum = "cb06993b8c8ad7f50042e8b6b6ae4ed2a98722495845b12efc9a12f4301b901b"
dependencies = [ dependencies = [
"borsh", "borsh",
"borsh-derive", "borsh-derive",
@ -1930,9 +1930,9 @@ dependencies = [
[[package]] [[package]]
name = "pyth-sdk-solana" name = "pyth-sdk-solana"
version = "0.2.0" version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1fdc94592a28fa829b0d6fa619392b1a1744048e5b78a74a4ba93cf541eddae" checksum = "b83f33cbdeccc350e021f6b4bc714655aa38352fac80a93b16b1902863aedb62"
dependencies = [ dependencies = [
"borsh", "borsh",
"borsh-derive", "borsh-derive",

View File

@ -7,7 +7,7 @@ use clap::Clap;
#[derive(Clap)] #[derive(Clap)]
#[clap( #[clap(
about = "A client for the pyth2wormhole Solana program", about = "A client for the pyth2wormhole Solana program",
author = "The Wormhole Project" author = "Pyth Network Contributors"
)] )]
pub struct Cli { pub struct Cli {
#[clap( #[clap(
@ -53,21 +53,35 @@ pub enum Action {
#[clap( #[clap(
short = 'n', short = 'n',
long = "--n-retries", long = "--n-retries",
about = "How many times to retry send_transaction() on each batch on failure", about = "How many times to retry send_transaction() on each batch before flagging a failure.",
default_value = "5" default_value = "5"
)] )]
n_retries: usize, n_retries: usize,
#[clap(
short = 'd',
long = "--daemon",
about = "Do not stop attesting. In this mode, this program will behave more like a daemon and continuously attest the specified symbols.",
)]
daemon: bool,
#[clap(
short = 'b',
long = "--batch-interval",
about = "How often in seconds to transmit each batch. Only active with --daemon.",
default_value = "30",
requires_if("true", "daemon"),
)]
batch_interval_secs: u64,
#[clap( #[clap(
short = 't', short = 't',
long = "--timeout", long = "--timeout",
about = "How many seconds to wait before giving up on get_transaction() for each batch", about = "How many seconds to wait before giving up on get_transaction() for tx confirmation.",
default_value = "40" default_value = "40"
)] )]
conf_timeout_secs: u64, conf_timeout_secs: u64,
#[clap( #[clap(
short = 'i', short = 'i',
long = "--rpc-interval", long = "--rpc-interval",
about = "How many milliseconds to waist between SOL RPC requests", about = "How many milliseconds to wait between SOL RPC requests",
default_value = "200" default_value = "200"
)] )]
rpc_interval_ms: u64, rpc_interval_ms: u64,

View File

@ -94,6 +94,8 @@ fn main() -> Result<(), ErrBox> {
Action::Attest { Action::Attest {
ref attestation_cfg, ref attestation_cfg,
n_retries, n_retries,
daemon,
batch_interval_secs,
conf_timeout_secs, conf_timeout_secs,
rpc_interval_ms, rpc_interval_ms,
} => { } => {
@ -107,6 +109,8 @@ fn main() -> Result<(), ErrBox> {
p2w_addr, p2w_addr,
&attestation_cfg, &attestation_cfg,
n_retries, n_retries,
daemon,
Duration::from_secs(batch_interval_secs),
Duration::from_secs(conf_timeout_secs), Duration::from_secs(conf_timeout_secs),
Duration::from_millis(rpc_interval_ms), Duration::from_millis(rpc_interval_ms),
)?; )?;
@ -129,12 +133,18 @@ pub enum BatchTxState<'a> {
sent_at: Instant, sent_at: Instant,
}, },
Success { Success {
symbols: &'a [P2WSymbol],
occured_at: Instant,
seqno: String, seqno: String,
}, },
FailedSend { FailedSend {
symbols: &'a [P2WSymbol],
occured_at: Instant,
last_err: ErrBox, last_err: ErrBox,
}, },
FailedConfirm { FailedConfirm {
symbols: &'a [P2WSymbol],
occured_at: Instant,
last_err: ErrBox, last_err: ErrBox,
}, },
} }
@ -148,6 +158,8 @@ fn handle_attest(
p2w_addr: Pubkey, p2w_addr: Pubkey,
attestation_cfg: &AttestationConfig, attestation_cfg: &AttestationConfig,
n_retries: usize, n_retries: usize,
daemon: bool,
batch_interval: Duration,
conf_timeout: Duration, conf_timeout: Duration,
rpc_interval: Duration, rpc_interval: Duration,
) -> Result<(), ErrBox> { ) -> Result<(), ErrBox> {
@ -195,10 +207,11 @@ fn handle_attest(
}) })
.collect(); .collect();
// NOTE(2022-04-26): only increment this if `daemon` is false
let mut finished_count = 0; let mut finished_count = 0;
// TODO(2021-03-09): Extract logic into helper functions // TODO(2021-03-09): Extract logic into helper functions
while finished_count < batches.len() { while daemon || finished_count < batches.len() {
finished_count = 0; finished_count = 0;
for (batch_no, state) in batches.iter_mut() { for (batch_no, state) in batches.iter_mut() {
match state { match state {
@ -279,7 +292,11 @@ fn handle_attest(
batch_count, batch_count,
n_retries + 1 n_retries + 1
); );
*state = BatchTxState::FailedSend { last_err: e }; *state = BatchTxState::FailedSend {
symbols,
occured_at: Instant::now(),
last_err: e,
};
} }
} }
} }
@ -317,7 +334,11 @@ fn handle_attest(
println!("Sequence number: {}", seqno); println!("Sequence number: {}", seqno);
info!("Batch {}/{}: OK, seqno {}", batch_no, batch_count, seqno); info!("Batch {}/{}: OK, seqno {}", batch_no, batch_count, seqno);
*state = BatchTxState::Success { seqno }; *state = BatchTxState::Success {
symbols,
seqno,
occured_at: Instant::now(),
};
} }
Err(e) => { Err(e) => {
let elapsed = sent_at.elapsed(); let elapsed = sent_at.elapsed();
@ -359,15 +380,44 @@ fn handle_attest(
batch_count, batch_count,
n_retries + 1 n_retries + 1
); );
*state = BatchTxState::FailedConfirm { last_err: e }; *state = BatchTxState::FailedConfirm {
symbols,
occured_at: Instant::now(),
last_err: e,
};
} }
} }
} }
} }
} }
Success { .. } | FailedSend { .. } | FailedConfirm { .. } => { Success {
finished_count += 1; // Gather terminal states for top-level loop exit symbols,
continue; // No requests were made, skip sleep occured_at,
..
}
| FailedSend {
symbols,
occured_at,
..
}
| FailedConfirm {
symbols,
occured_at,
..
} => {
// We only try to re-schedule under --daemon
if daemon {
if occured_at.elapsed() > batch_interval {
*state = BatchTxState::Sending {
symbols,
attempt_no: 1,
};
}
}
// Track the finished batches
finished_count += 1;
continue; // No RPC requests are made any of these cases, skip sleep
} }
} }
@ -382,7 +432,7 @@ fn handle_attest(
use BatchTxState::*; use BatchTxState::*;
match state { match state {
Success { .. } => {} Success { .. } => {}
FailedSend { last_err } | FailedConfirm { last_err } => { FailedSend { last_err, .. } | FailedConfirm { last_err, .. } => {
errors.push(last_err.to_string()) errors.push(last_err.to_string())
} }
other => { other => {

View File

@ -10,6 +10,7 @@ import threading
import time import time
from http.client import HTTPConnection from http.client import HTTPConnection
from http.server import BaseHTTPRequestHandler, HTTPServer from http.server import BaseHTTPRequestHandler, HTTPServer
from subprocess import PIPE, STDOUT, Popen
from pyth_utils import * from pyth_utils import *
@ -20,7 +21,6 @@ logging.basicConfig(
P2W_SOL_ADDRESS = os.environ.get( P2W_SOL_ADDRESS = os.environ.get(
"P2W_SOL_ADDRESS", "P2WH424242424242424242424242424242424242424" "P2W_SOL_ADDRESS", "P2WH424242424242424242424242424242424242424"
) )
P2W_ATTEST_INTERVAL = float(os.environ.get("P2W_ATTEST_INTERVAL", 5))
P2W_OWNER_KEYPAIR = os.environ.get( P2W_OWNER_KEYPAIR = os.environ.get(
"P2W_OWNER_KEYPAIR", "/usr/src/solana/keys/p2w_owner.json" "P2W_OWNER_KEYPAIR", "/usr/src/solana/keys/p2w_owner.json"
) )
@ -36,12 +36,15 @@ WORMHOLE_ADDRESS = os.environ.get(
"WORMHOLE_ADDRESS", "Bridge1p5gheXUvJ6jGWGeCsgPKgnE3YgdGKRVCMY9o" "WORMHOLE_ADDRESS", "Bridge1p5gheXUvJ6jGWGeCsgPKgnE3YgdGKRVCMY9o"
) )
P2W_EXIT_ON_ERROR = os.environ.get("P2W_EXIT_ON_ERROR", "False").lower() == "true" P2W_MAX_LOG_LINES = int(os.environ.get("P2W_MAX_LOG_LINES", 1000))
ATTESTATIONS = { ATTESTATIONS = {
"pendingSeqnos": [], "pendingSeqnos": [],
} }
SEQNO_REGEX = re.compile(r"Sequence number: (\d+)")
class P2WAutoattestStatusEndpoint(BaseHTTPRequestHandler): class P2WAutoattestStatusEndpoint(BaseHTTPRequestHandler):
""" """
@ -86,6 +89,16 @@ if SOL_AIRDROP_AMT > 0:
], ],
) )
def find_and_log_seqnos(s):
# parse seqnos
matches = SEQNO_REGEX.findall(s)
seqnos = list(map(lambda m: int(m), matches))
ATTESTATIONS["pendingSeqnos"] += seqnos
if len(seqnos) > 0:
logging.info(f"{len(seqnos)} batch seqno(s) received: {seqnos})")
if P2W_INITIALIZE_SOL_CONTRACT is not None: if P2W_INITIALIZE_SOL_CONTRACT is not None:
# Get actor pubkeys # Get actor pubkeys
@ -188,7 +201,8 @@ symbols:"""
f.flush() f.flush()
attest_result = run_or_die( # Send the first attestation in one-shot mode for testing
first_attest_result = run_or_die(
[ [
"pyth2wormhole-client", "pyth2wormhole-client",
"--log-level", "--log-level",
@ -207,7 +221,8 @@ attest_result = run_or_die(
) )
logging.info("p2w_autoattest ready to roll!") logging.info("p2w_autoattest ready to roll!")
logging.info(f"Attest Interval: {P2W_ATTEST_INTERVAL}")
find_and_log_seqnos(first_attest_result.stdout)
# Serve p2w endpoint # Serve p2w endpoint
endpoint_thread = threading.Thread(target=serve_attestations, daemon=True) endpoint_thread = threading.Thread(target=serve_attestations, daemon=True)
@ -217,18 +232,12 @@ endpoint_thread.start()
readiness_thread = threading.Thread(target=readiness, daemon=True) readiness_thread = threading.Thread(target=readiness, daemon=True)
readiness_thread.start() readiness_thread.start()
seqno_regex = re.compile(r"Sequence number: (\d+)")
# Do not exit this script if a continuous attestation stops for
# whatever reason (this avoids k8s restart penalty)
while True: while True:
matches = seqno_regex.findall(attest_result.stdout) # Start the child process in daemon mode
p2w_client_process = Popen(
seqnos = list(map(lambda m: int(m), matches))
ATTESTATIONS["pendingSeqnos"] += seqnos
logging.info(f"{len(seqnos)} batch seqno(s) received: {seqnos})")
attest_result = run_or_die(
[ [
"pyth2wormhole-client", "pyth2wormhole-client",
"--log-level", "--log-level",
@ -242,8 +251,32 @@ while True:
"attest", "attest",
"-f", "-f",
P2W_ATTESTATION_CFG, P2W_ATTESTATION_CFG,
"-d",
], ],
capture_output=True, stdout=PIPE,
die=P2W_EXIT_ON_ERROR, stderr=STDOUT,
) text=True,
time.sleep(P2W_ATTEST_INTERVAL) )
saved_log_lines = []
# Keep listening for seqnos until the program exits
while p2w_client_process.poll() is None:
line = p2w_client_process.stdout.readline()
# Always pass output to the debug level
logging.debug(f"pyth2wormhole-client: {line}")
find_and_log_seqnos(line)
# Extend with new line
saved_log_lines.append(line)
# trim back to specified maximum
if len(saved_log_lines) > P2W_MAX_LOG_LINES:
saved_log_lines.pop(0)
# Yell if the supposedly non-stop attestation process exits
logging.warn(f"pyth2wormhole-client stopped unexpectedly with code {p2w_client_process.retcode}")
logging.warn(f"Last {len(saved_log_lines)} log lines:\n{(saved_log_lines)}")