pyth2wormhole-client:should_resend(): optimize interval-only batches (#338)
* pyth2wormhole-client:should_resend(): optimize interval-only batches Most attestation conditions in prod are interval-only. This change ensures that interval-only batches don't waste time on the on-chain state lookup request. * pyth2wormhole: Fix build error, redundant new_symbol_states decl * pyth2wormhole-client: is_onchain -> needs_onchain_lookup * pyth2wormhole-client: typo
This commit is contained in:
parent
124589da09
commit
6407eaa244
|
@ -144,6 +144,22 @@ pub struct AttestationConditions {
|
|||
pub publish_time_min_delta_secs: Option<u64>,
|
||||
}
|
||||
|
||||
impl AttestationConditions {
|
||||
/// Used by should_resend() to check if it needs to make the expensive RPC request
|
||||
pub fn need_onchain_lookup(&self) -> bool {
|
||||
// Bug trap for new fields that also need to be included in
|
||||
// the returned expression
|
||||
let AttestationConditions {
|
||||
min_interval_secs: _min_interval_secs,
|
||||
max_batch_jobs: _max_batch_jobs,
|
||||
price_changed_pct,
|
||||
publish_time_min_delta_secs,
|
||||
} = self;
|
||||
|
||||
price_changed_pct.is_some() || publish_time_min_delta_secs.is_some()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for AttestationConditions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
|
|
|
@ -57,35 +57,6 @@ impl<'a> BatchState<'a> {
|
|||
let sym_count = self.symbols.len();
|
||||
let pubkeys: Vec<_> = self.symbols.iter().map(|s| s.price_addr).collect();
|
||||
|
||||
// Always learn the current on-chain state for each symbol, use None values if lookup fails
|
||||
let mut new_symbol_states: Vec<Option<PriceAccount>> = match c
|
||||
.get_multiple_accounts(&pubkeys)
|
||||
.await
|
||||
{
|
||||
Ok(acc_opts) => {
|
||||
acc_opts
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(idx, opt)| {
|
||||
// Take each Some(acc), make it None and log on load_price_account() error
|
||||
opt.and_then(|acc| {
|
||||
pyth_sdk_solana::state::load_price_account(&acc.data)
|
||||
.cloned() // load_price_account() transmutes the data reference into another reference, and owning acc_opts is not enough
|
||||
.map_err(|e| {
|
||||
warn!("Could not parse symbol {}/{}: {}", idx, sym_count, e);
|
||||
e
|
||||
})
|
||||
.ok() // Err becomes None
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Could not look up any symbols on-chain: {}", e);
|
||||
vec![None; sym_count]
|
||||
}
|
||||
};
|
||||
|
||||
// min interval
|
||||
if self.last_job_finished_at.elapsed()
|
||||
> Duration::from_secs(self.conditions.min_interval_secs)
|
||||
|
@ -96,14 +67,47 @@ impl<'a> BatchState<'a> {
|
|||
));
|
||||
}
|
||||
|
||||
for (idx, old_new_tup) in self
|
||||
.last_known_symbol_states
|
||||
.iter_mut() // Borrow mutably to make the update easier
|
||||
.zip(new_symbol_states.iter())
|
||||
.enumerate()
|
||||
{
|
||||
// Only evaluate this symbol if a triggering condition is not already met
|
||||
if ret.is_none() {
|
||||
// Only lookup and compare symbols if the conditions require
|
||||
if self.conditions.need_onchain_lookup() {
|
||||
let mut new_symbol_states: Vec<Option<PriceAccount>> =
|
||||
match c.get_multiple_accounts(&pubkeys).await {
|
||||
Ok(acc_opts) => {
|
||||
acc_opts
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(idx, opt)| {
|
||||
// Take each Some(acc), make it None and log on load_price_account() error
|
||||
opt.and_then(|acc| {
|
||||
pyth_sdk_solana::state::load_price_account(&acc.data)
|
||||
.cloned() // load_price_account() transmutes the data reference into another reference, and owning acc_opts is not enough
|
||||
.map_err(|e| {
|
||||
warn!(
|
||||
"Could not parse symbol {}/{}: {}",
|
||||
idx, sym_count, e
|
||||
);
|
||||
e
|
||||
})
|
||||
.ok() // Err becomes None
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Could not look up any symbols on-chain: {}", e);
|
||||
vec![None; sym_count]
|
||||
}
|
||||
};
|
||||
|
||||
for (idx, old_new_tup) in self
|
||||
.last_known_symbol_states
|
||||
.iter_mut() // Borrow mutably to make the update easier
|
||||
.zip(new_symbol_states.iter())
|
||||
.enumerate()
|
||||
{
|
||||
// Only evaluate this symbol if a triggering condition is not already met
|
||||
if ret.is_some() {
|
||||
break;
|
||||
}
|
||||
match old_new_tup {
|
||||
(Some(old), Some(new)) => {
|
||||
// publish_time_changed
|
||||
|
@ -143,19 +147,18 @@ impl<'a> BatchState<'a> {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update with newer state only if a condition was met. We
|
||||
// don't want to shadow changes that may happen over a larger
|
||||
// period between state lookups.
|
||||
if ret.is_some() {
|
||||
for (old, new) in self
|
||||
.last_known_symbol_states
|
||||
.iter_mut()
|
||||
.zip(new_symbol_states.into_iter())
|
||||
{
|
||||
if new.is_some() {
|
||||
*old = new;
|
||||
// Update with newer state only if a condition was met. We
|
||||
// don't want to shadow changes that may happen over a larger
|
||||
// period between state lookups.
|
||||
if ret.is_some() {
|
||||
for (old, new) in self
|
||||
.last_known_symbol_states
|
||||
.iter_mut()
|
||||
.zip(new_symbol_states.into_iter())
|
||||
{
|
||||
if new.is_some() {
|
||||
*old = new;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue