p2w-client: Implement additional trigger conditions (#207)

* p2w-client: Implement additional trigger conditions

From now on, we support price change by a given percentage as well as
publish time changes for triggering attestations.

commit-id:9bd145e1

* p2w-client: Harden price_pct_change for negative values

commit-id:cb679208

* p2w-client: Make sure we always update each symbol's state

commit-id:27f276dc

* p2w-client: include resend state lookups in the RPC interval setting

commit-id:68de125f

* p2w-client: Improve local symbol state handling, min interval=60s

With this change, we update local state only if we meet a
condition. Additionally, the publish_time change becomes a
configurable minimal delta.

commit-id:f8139cd6

* p2w-client: Fix a state update bug for state updates

commit-id:29f44a39
This commit is contained in:
Stanisław Drozd 2022-05-12 14:39:49 +02:00 committed by GitHub
parent 7b1dbc1938
commit 941017de4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 185 additions and 32 deletions

View File

@ -1977,6 +1977,7 @@ dependencies = [
"log",
"p2w-sdk",
"pyth-client 0.5.0",
"pyth-sdk-solana",
"pyth2wormhole",
"serde",
"serde_yaml",

View File

@ -20,6 +20,7 @@ log = "0.4.14"
wormhole-bridge-solana = {path = "../../bridge/program"}
pyth2wormhole = {path = "../program"}
p2w-sdk = { path = "../../../third_party/pyth/p2w-sdk/rust", features=["solana"] }
pyth-sdk-solana = "0.4.0"
serde = "1"
serde_yaml = "0.8"
shellexpand = "2.1.0"

View File

@ -13,12 +13,12 @@ use serde::{
use solana_program::pubkey::Pubkey;
/// Pyth2wormhole config specific to attestation requests
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub struct AttestationConfig {
pub symbol_groups: Vec<SymbolGroup>,
}
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub struct SymbolGroup {
pub group_name: String,
/// Attestation conditions applied to all symbols in this group
@ -26,10 +26,28 @@ pub struct SymbolGroup {
pub symbols: Vec<P2WSymbol>,
}
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub const fn DEFAULT_MIN_INTERVAL_SECS() -> u64 {
60
}
/// Spontaneous attestation triggers. Attestation is triggered if any
/// of the active conditions is met. Option<> fields can be
/// de-activated with None. All conditions are inactive by default,
/// except for min_interval_secs set to 1 minute.
#[derive(Clone, Default, Debug, Deserialize, Serialize, PartialEq)]
pub struct AttestationConditions {
/// How often to attest
pub min_freq_secs: u64,
/// Baseline, unconditional attestation interval. Attestation is triggered if the specified interval elapsed since last attestation.
#[serde(default = "DEFAULT_MIN_INTERVAL_SECS")]
pub min_interval_secs: u64,
/// Trigger attestation if price changes by the specified percentage.
#[serde(default)]
pub price_changed_pct: Option<f64>,
/// Trigger attestation if publish_time advances at least the
/// specified amount.
#[serde(default)]
pub publish_time_min_delta_secs: Option<u64>,
}
/// Config entry for a Pyth product + price pair
@ -50,6 +68,14 @@ pub struct P2WSymbol {
pub price_addr: Pubkey,
}
impl ToString for P2WSymbol {
fn to_string(&self) -> String {
self.name
.clone()
.unwrap_or(format!("Unnamed product {}", self.product_addr))
}
}
// Helper methods for strinigified SOL addresses
fn pubkey_string_ser<S>(k: &Pubkey, ser: S) -> Result<S::Ok, S::Error>
@ -78,7 +104,10 @@ mod tests {
fn test_sanity() -> Result<(), ErrBox> {
let fastbois = SymbolGroup {
group_name: "fast bois".to_owned(),
conditions: AttestationConditions { min_freq_secs: 5 },
conditions: AttestationConditions {
min_interval_secs: 5,
..Default::default()
},
symbols: vec![
P2WSymbol {
name: Some("ETHUSD".to_owned()),
@ -93,7 +122,10 @@ mod tests {
let slowbois = SymbolGroup {
group_name: "slow bois".to_owned(),
conditions: AttestationConditions { min_freq_secs: 200 },
conditions: AttestationConditions {
min_interval_secs: 200,
..Default::default()
},
symbols: vec![
P2WSymbol {
name: Some("CNYAUD".to_owned()),

View File

@ -1,6 +1,16 @@
use log::{
debug,
warn,
};
use solana_client::rpc_client::RpcClient;
use solana_sdk::signature::Signature;
use std::time::Instant;
use pyth_sdk_solana::state::PriceAccount;
use std::time::{
Duration,
Instant,
};
use crate::{
AttestationConditions,
@ -12,6 +22,7 @@ use crate::{
pub struct BatchState<'a> {
pub group_name: String,
pub symbols: &'a [P2WSymbol],
pub last_known_symbol_states: Vec<Option<PriceAccount>>,
pub conditions: AttestationConditions,
status: BatchTxStatus,
status_changed_at: Instant,
@ -27,6 +38,7 @@ impl<'a> BatchState<'a> {
group_name,
symbols,
conditions,
last_known_symbol_states: vec![None; symbols.len()],
status: BatchTxStatus::Sending { attempt_no: 1 },
status_changed_at: Instant::now(),
}
@ -38,11 +50,123 @@ impl<'a> BatchState<'a> {
pub fn get_status(&self) -> &BatchTxStatus {
&self.status
}
/// Ensure that status changes are accompanied by a timestamp bump
pub fn set_status(&mut self, s: BatchTxStatus) {
self.status_changed_at = Instant::now();
self.status = s;
}
/// Evaluate the configured attestation conditions for this
/// batch. RPC is used to update last known state. Returns
/// Some("<reason>") if any trigger condition was met. Only the
/// first encountered condition is mentioned.
pub fn should_resend(&mut self, c: &RpcClient) -> Option<String> {
let mut ret = None;
let sym_count = self.symbols.len();
let mut new_symbol_states: Vec<Option<PriceAccount>> = Vec::with_capacity(sym_count);
for (idx, sym) in self.symbols.iter().enumerate() {
let new_state = match c
.get_account_data(&sym.price_addr)
.map_err(|e| e.to_string())
.and_then(|bytes| {
pyth_sdk_solana::state::load_price_account(&bytes)
.map(|state| state.clone())
.map_err(|e| e.to_string())
}) {
Ok(state) => Some(state),
Err(e) => {
warn!(
"Symbol {} ({}/{}): Could not look up state: {}",
sym.name
.as_ref()
.unwrap_or(&format!("Unnamed product {}", sym.product_addr)),
idx + 1,
sym_count,
e.to_string()
);
None
}
};
new_symbol_states.push(new_state);
}
// min interval
if self.get_status_changed_at().elapsed()
> Duration::from_secs(self.conditions.min_interval_secs)
{
ret = Some(format!(
"minimum interval of {}s elapsed since last state change",
self.conditions.min_interval_secs
));
}
for (idx, old_new_tup) in self
.last_known_symbol_states
.iter_mut() // Borrow mutably to make the update easier
.zip(new_symbol_states.iter())
.enumerate()
{
// Only evaluate this symbol if a triggering condition is not already met
if ret.is_none() {
match old_new_tup {
(Some(old), Some(new)) => {
// publish_time_changed
if let Some(min_delta_secs) = self.conditions.publish_time_min_delta_secs {
if new.timestamp - old.timestamp > min_delta_secs as i64 {
ret = Some(format!(
"publish_time advanced by at least {}s for {:?}",
min_delta_secs,
self.symbols[idx].to_string(),
))
}
// price_changed_pct
} else if let Some(pct) = self.conditions.price_changed_pct {
let pct = pct.abs();
let price_pct_diff = ((old.agg.price as f64 - new.agg.price as f64)
/ old.agg.price as f64
* 100.0)
.abs();
if price_pct_diff > pct {
ret = Some(format!(
"price moved by at least {}% for {:?}",
pct,
self.symbols[idx].to_string()
))
}
}
}
_ => {
debug!(
"Symbol {:?} {}/{}, old or new state value is None, skipping...",
self.symbols[idx].to_string(),
idx + 1,
sym_count
);
}
}
}
}
// Update with newer state if a condition was met
if ret.is_some() {
for (old, new) in self
.last_known_symbol_states
.iter_mut()
.zip(new_symbol_states.into_iter())
{
if new.is_some() {
*old = new;
}
}
}
return ret;
}
}
#[derive(Debug)]

View File

@ -14,8 +14,8 @@ use log::{
debug,
error,
info,
warn,
trace,
warn,
LevelFilter,
};
use solana_client::rpc_client::RpcClient;
@ -161,19 +161,13 @@ fn handle_attest(
g.symbols
.as_slice()
.chunks(config.max_batch_size as usize)
.enumerate()
.map(move |(idx, symbols)| {
(
idx + 1,
BatchState::new(
name4closure.clone(),
symbols,
conditions4closure.clone(),
),
)
.map(move |symbols| {
BatchState::new(name4closure.clone(), symbols, conditions4closure.clone())
})
})
.flatten()
.enumerate()
.map(|(idx, batch_state)| (idx + 1, batch_state))
.collect();
let batch_count = batches.len();
@ -347,29 +341,30 @@ fn handle_attest(
Success { .. } | FailedSend { .. } | FailedConfirm { .. } => {
// We only try to re-schedule under --daemon
if daemon {
if state.get_status_changed_at().elapsed()
> Duration::from_secs(state.conditions.min_freq_secs)
{
if let Some(reason) = state.should_resend(rpc_client) {
info!(
"Batch {}/{} (group {:?}): resending (reason: {})",
batch_no, batch_count, state.group_name, reason,
);
state.set_status(Sending { attempt_no: 1 });
} else {
let elapsed = state.get_status_changed_at().elapsed();
trace!(
"Batch {}/{} (group {:?}): waiting ({}.{}/{}.{})",
"Batch {}/{} (group {:?}): waiting ({}.{}s elapsed)",
batch_no,
batch_count,
state.group_name,
elapsed.as_secs(),
elapsed.subsec_millis(),
conf_timeout.as_secs(),
conf_timeout.subsec_millis()
)
}
} else {
// Track the finished batches outside daemon mode
finished_count += 1;
// No RPC requests are made on terminal states outside daemon mode, skip sleep
continue;
}
// Track the finished batches
finished_count += 1;
continue; // No RPC requests are made any of these cases, skip sleep
}
}

View File

@ -186,7 +186,7 @@ if P2W_ATTESTATION_CFG is None:
symbol_groups:
- group_name: things
conditions:
min_freq_secs: 17
min_interval_secs: 17
symbols:
"""
@ -208,7 +208,7 @@ symbol_groups:
cfg_yaml += f"""
- group_name: stuff
conditions:
min_freq_secs: 19
min_interval_secs: 19
symbols:
"""