attester: Add an on-chain last attestation timestamp and rate limit arg (#622)

* attester: Add an on-chain last attestation timestamp and rate limit

In consequence, attester clients are able to rate-limit attestations
among _all_ active attesters - because the new last attestation timestamp is kept up
to date on chain. Ultimately, this value being shared by concurrent clients,
this feature limits our tx expenses while fulfilling our preferred attestation rates.

* attester: Use custom error code instead of log for rate limit

* attester: Add defaults for default attestation conditions

* attester: Use a dedicated function for rate limit default

* attester: Option<u32> -> u32 rate limit interval

This lets users pass 0 to disable the feature (0-rate limiting means
no rate limiting at all), which was not possible with the Option type.
This commit is contained in:
Stanisław Drozd 2023-02-24 15:03:57 +01:00 committed by GitHub
parent ae88640422
commit 1978d73b94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 120 additions and 26 deletions

View File

@ -116,9 +116,10 @@ max_batch_jobs: 1000 # Where we're going there's no oomkiller
default_attestation_conditions:
min_interval_secs: 10
symbol_groups:
- group_name: fast_interval_only
- group_name: fast_interval_rate_limited
conditions:
min_interval_secs: 1
rate_limit_interval_secs: 2
symbols:
"""

View File

@ -66,6 +66,7 @@ pub struct AttestationConfig {
/// Attestation conditions that will be used for any symbols included in the mapping
/// that aren't explicitly in one of the groups below, and any groups without explicitly
/// configured attestation conditions.
#[serde(default)]
pub default_attestation_conditions: AttestationConditions,
/// Groups of symbols to publish.
@ -317,20 +318,35 @@ pub const fn default_min_interval_secs() -> u64 {
60
}
pub const fn default_rate_limit_interval_secs() -> u32 {
1
}
pub const fn default_max_batch_jobs() -> usize {
20
}
/// Spontaneous attestation triggers. Attestation is triggered if any
/// of the active conditions is met. Option<> fields can be
/// Per-group attestation resend rules. Attestation is triggered if
/// any of the active conditions is met. Option<> fields can be
/// de-activated with None. All conditions are inactive by default,
/// except for the non-Option ones.
#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)]
pub struct AttestationConditions {
/// Baseline, unconditional attestation interval. Attestation is triggered if the specified interval elapsed since last attestation.
/// Lower bound on attestation rate. Attestation is triggered
/// unconditionally whenever the specified interval elapses since
/// last attestation.
#[serde(default = "default_min_interval_secs")]
pub min_interval_secs: u64,
/// Upper bound on attestation rate. Attesting the same batch
/// before this many seconds pass fails the tx. This limit is
/// enforced on-chain, letting concurret attesters prevent
/// redundant batch resends and tx expenses. NOTE: The client
/// logic does not include rate limit failures in monitoring error
/// counts. 0 effectively disables this feature.
#[serde(default = "default_rate_limit_interval_secs")]
pub rate_limit_interval_secs: u32,
/// Limit concurrent attestation attempts per batch. This setting
/// should act only as a failsafe cap on resource consumption and is
/// best set well above the expected average number of jobs.
@ -358,6 +374,7 @@ impl AttestationConditions {
max_batch_jobs: _max_batch_jobs,
price_changed_bps,
publish_time_min_delta_secs,
rate_limit_interval_secs: _,
} = self;
price_changed_bps.is_some() || publish_time_min_delta_secs.is_some()
@ -371,6 +388,7 @@ impl Default for AttestationConditions {
max_batch_jobs: default_max_batch_jobs(),
price_changed_bps: None,
publish_time_min_delta_secs: None,
rate_limit_interval_secs: default_rate_limit_interval_secs(),
}
}
}

View File

@ -298,6 +298,9 @@ pub fn gen_attest_tx(
wh_msg_id: u64,
symbols: &[P2WSymbol],
latest_blockhash: Hash,
// Desired rate limit interval. If all of the symbols are over
// the limit, the tx will fail. 0 means off.
rate_limit_interval_secs: u32,
) -> Result<Transaction, ErrBoxSend> {
let emitter_addr = P2WEmitter::key(None, &p2w_addr);
@ -390,8 +393,9 @@ pub fn gen_attest_tx(
let ix_data = (
pyth_wormhole_attester::instruction::Instruction::Attest,
AttestData {
consistency_level: ConsistencyLevel::Confirmed,
consistency_level: ConsistencyLevel::Confirmed,
message_account_id: wh_msg_id,
rate_limit_interval_secs,
},
);

View File

@ -1,3 +1,9 @@
use {
pyth_wormhole_attester::error::AttesterCustomError,
solana_program::instruction::InstructionError,
solana_sdk::transaction::TransactionError,
};
pub mod cli;
use {
@ -585,6 +591,7 @@ async fn attestation_sched_job(args: AttestationSchedJobArgs) -> Result<(), ErrB
symbols: batch.symbols.to_vec(),
max_jobs_sema: sema.clone(),
message_q_mtx: message_q_mtx.clone(),
rate_limit_interval_secs: batch.conditions.rate_limit_interval_secs,
});
// This short-lived permit prevents scheduling excess
@ -603,16 +610,17 @@ async fn attestation_sched_job(args: AttestationSchedJobArgs) -> Result<(), ErrB
/// Arguments for attestation_job(). This struct rules out same-type
/// ordering errors due to the large argument count
pub struct AttestationJobArgs {
pub rlmtx: Arc<RLMutex<RpcCfg>>,
pub batch_no: usize,
pub batch_count: usize,
pub group_name: String,
pub p2w_addr: Pubkey,
pub config: Pyth2WormholeConfig,
pub payer: Keypair,
pub symbols: Vec<P2WSymbol>,
pub max_jobs_sema: Arc<Semaphore>,
pub message_q_mtx: Arc<Mutex<P2WMessageQueue>>,
pub rlmtx: Arc<RLMutex<RpcCfg>>,
pub batch_no: usize,
pub batch_count: usize,
pub group_name: String,
pub p2w_addr: Pubkey,
pub config: Pyth2WormholeConfig,
pub payer: Keypair,
pub symbols: Vec<P2WSymbol>,
pub max_jobs_sema: Arc<Semaphore>,
pub rate_limit_interval_secs: u32,
pub message_q_mtx: Arc<Mutex<P2WMessageQueue>>,
}
/// A future for a single attempt to attest a batch on Solana.
@ -627,6 +635,7 @@ async fn attestation_job(args: AttestationJobArgs) -> Result<(), ErrBoxSend> {
payer,
symbols,
max_jobs_sema,
rate_limit_interval_secs,
message_q_mtx,
} = args;
let batch_no4err_msg = batch_no;
@ -662,21 +671,37 @@ async fn attestation_job(args: AttestationJobArgs) -> Result<(), ErrBoxSend> {
let wh_msg_id = message_q_mtx.lock().await.get_account()?.id;
let tx_res: Result<_, ErrBoxSend> = gen_attest_tx(
let tx = gen_attest_tx(
p2w_addr,
&config,
&payer,
wh_msg_id,
symbols.as_slice(),
latest_blockhash,
);
rate_limit_interval_secs,
)?;
let tx_processing_start_time = Instant::now();
let sig = rpc
.send_and_confirm_transaction(&tx_res?)
.map_err(|e| -> ErrBoxSend { e.into() })
.await?;
let sig = match rpc.send_and_confirm_transaction(&tx).await {
Ok(s) => Ok(s),
Err(e) => match e.get_transaction_error() {
Some(TransactionError::InstructionError(_idx, InstructionError::Custom(code)))
if code == AttesterCustomError::AttestRateLimitReached as u32 =>
{
info!(
"Batch {}/{}, group {:?} OK: configured {} second rate limit interval reached, backing off",
batch_no, batch_count, group_name, rate_limit_interval_secs,
);
// Note: We return early if rate limit tx
// error is detected. This ensures that we
// don't count this attempt in ok/err
// monitoring and healthcheck counters.
return Ok(());
}
_other => Err(e),
},
}?;
let tx_data = rpc
.get_transaction_with_config(
&sig,

View File

@ -111,6 +111,7 @@ async fn test_happy_path() -> Result<(), p2wc::ErrBoxSend> {
0,
symbols.as_slice(),
ctx.last_blockhash,
0,
)?;
// NOTE: 2022-09-05

View File

@ -2,6 +2,7 @@ use {
crate::{
attestation_state::AttestationStatePDA,
config::P2WConfigAccount,
error::AttesterCustomError,
message::{
P2WMessage,
P2WMessageDrvData,
@ -127,8 +128,17 @@ pub struct Attest<'b> {
#[derive(BorshDeserialize, BorshSerialize)]
pub struct AttestData {
pub consistency_level: ConsistencyLevel,
pub message_account_id: u64,
pub consistency_level: ConsistencyLevel,
pub message_account_id: u64,
/// Fail the transaction if the global attestation rate of all
/// symbols in this batch is more frequent than the passed
/// interval. This is checked using the attestation time stored in
/// attestation state. This enables all of the clients to only
/// contribute attestations if their desired interval is not
/// already reached. If at least one symbol has been waiting
/// longer than this interval, we attest the whole batch. 0
/// effectively disables this feature.
pub rate_limit_interval_secs: u32,
}
pub fn attest(ctx: &ExecutionContext, accs: &mut Attest, data: AttestData) -> SoliResult<()> {
@ -180,6 +190,10 @@ pub fn attest(ctx: &ExecutionContext, accs: &mut Attest, data: AttestData) -> So
// Collect the validated symbols here for batch serialization
let mut attestations = Vec::with_capacity(price_pairs.len());
let this_attestation_time = accs.clock.unix_timestamp;
let mut over_rate_limit = true;
for (state, price) in price_pairs.into_iter() {
// Pyth must own the price
if accs.config.pyth_owner != *price.owner {
@ -200,8 +214,6 @@ pub fn attest(ctx: &ExecutionContext, accs: &mut Attest, data: AttestData) -> So
return Err(ProgramError::InvalidAccountData.into());
}
let attestation_time = accs.clock.unix_timestamp;
let price_data_ref = price.try_borrow_data()?;
// Parse the upstream Pyth struct to extract current publish
@ -214,6 +226,7 @@ pub fn attest(ctx: &ExecutionContext, accs: &mut Attest, data: AttestData) -> So
// Retrieve and rotate last_attested_tradind_publish_time
// Pick the value to store for the next attestation of this
// symbol. We use the prev_ value if the symbol is not
// currently being traded. The oracle marks the last known
@ -237,7 +250,7 @@ pub fn attest(ctx: &ExecutionContext, accs: &mut Attest, data: AttestData) -> So
// Build an attestatioin struct for this symbol using the just decided current value
let attestation = PriceAttestation::from_pyth_price_struct(
Identifier::new(price.key.to_bytes()),
attestation_time,
this_attestation_time,
current_last_attested_trading_publish_time,
price_struct,
);
@ -245,6 +258,21 @@ pub fn attest(ctx: &ExecutionContext, accs: &mut Attest, data: AttestData) -> So
// Save the new value for the next attestation of this symbol
state.0 .0.last_attested_trading_publish_time = new_last_attested_trading_publish_time;
// don't re-evaluate if at least one symbol was found to be under limit
if over_rate_limit {
// Evaluate rate limit - should be smaller than duration from last attestation
if this_attestation_time - state.0 .0.last_attestation_time
>= data.rate_limit_interval_secs as i64
{
over_rate_limit = false;
} else {
trace!("Price {:?}: over rate limit", price.key);
}
}
// Update last attestation time
state.0 .0.last_attestation_time = this_attestation_time;
// handling of last_attested_trading_publish_time ends here
if !state.0 .0.is_initialized() {
@ -272,6 +300,14 @@ pub fn attest(ctx: &ExecutionContext, accs: &mut Attest, data: AttestData) -> So
attestations.push(attestation);
}
// Do not proceed if none of the symbols is under rate limit
if over_rate_limit {
trace!("All symbols over limit, bailing out");
return Err(
ProgramError::Custom(AttesterCustomError::AttestRateLimitReached as u32).into(),
);
}
let batch_attestation = BatchPriceAttestation {
price_attestations: attestations,
};

View File

@ -25,6 +25,8 @@ use {
pub struct AttestationState {
/// The last trading publish_time this attester saw
pub last_attested_trading_publish_time: UnixTimestamp,
/// The last time this symbol was attested
pub last_attestation_time: UnixTimestamp,
}
impl Owned for AttestationState {

View File

@ -0,0 +1,6 @@
/// Append-only custom error list.
#[repr(u32)]
pub enum AttesterCustomError {
/// Explicitly checked for in client code, change carefully
AttestRateLimitReached = 13,
}

View File

@ -3,6 +3,7 @@
pub mod attest;
pub mod attestation_state;
pub mod config;
pub mod error;
pub mod initialize;
pub mod message;
pub mod migrate;