This commit is contained in:
Serge Farny 2024-11-01 18:18:42 +09:00 committed by GitHub
commit bff5a24772
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 138 additions and 8 deletions

View File

@ -16,7 +16,13 @@ pub struct EdgeState {
/// List of (input, price, ln-price) pairs, sorted by input asc
// TODO: it may be much better to store this centrally, so it's cheap to take a snapshot
pub cached_prices: Vec<(u64, f64, f64)>,
/// Will be invalid if we fail to compute a quote
is_valid: bool,
/// Will be dirty if we got some account update for this edge, but didn't recompute the price cache yet
is_dirty: bool,
pub last_update: u64,
pub last_update_slot: u64,
@ -153,7 +159,7 @@ impl Edge {
})
.collect_vec();
debug!(input_mint = %self.input_mint, pool = %self.key(), multiplier = multiplier, price = price, amounts = amounts.iter().join(";"), "price_data");
trace!(input_mint = %self.input_mint, pool = %self.key(), multiplier = multiplier, price = price, amounts = amounts.iter().join(";"), "price_data");
let overflow = amounts.iter().any(|x| *x == u64::MAX);
if overflow {
@ -166,6 +172,7 @@ impl Edge {
state.last_update_slot = chain_data.newest_processed_slot();
state.cached_prices.clear();
state.is_valid = false;
state.is_dirty = false;
return;
}
@ -196,6 +203,7 @@ impl Edge {
state.last_update_slot = chain_data.newest_processed_slot();
state.cached_prices.clear();
state.is_valid = true;
state.is_dirty = false;
if let Some(timestamp) = state.cooldown_until {
if timestamp < state.last_update {
@ -229,6 +237,30 @@ impl Edge {
}
}
pub fn mark_as_dirty(&self) {
let mut state = self.state.write().unwrap();
state.is_dirty = true;
}
pub fn update_if_needed(
&self,
chain_data: &AccountProviderView,
token_cache: &TokenCache,
price_cache: &PriceCache,
path_warming_amounts: &Vec<u64>,
) {
if !self.state.read().unwrap().is_dirty {
return;
}
debug!(
"Lazily updating {}->{}",
debug_tools::name(&self.input_mint),
debug_tools::name(&self.output_mint)
);
self.update(chain_data, token_cache, price_cache, path_warming_amounts)
}
pub fn update(
&self,
chain_data: &AccountProviderView,
@ -259,7 +291,7 @@ impl EdgeState {
/// Returns the price (in native/native) and ln(price) most applicable for the in amount
/// Returns None if invalid
pub fn cached_price_for(&self, in_amount: u64) -> Option<(f64, f64)> {
if !self.is_valid() || self.cached_prices.is_empty() {
if !self.is_valid() || self.cached_prices.is_empty() || self.is_dirty {
return None;
}
@ -272,7 +304,7 @@ impl EdgeState {
}
pub fn cached_price_exact_out_for(&self, out_amount: u64) -> Option<(f64, f64)> {
if !self.is_valid_out() {
if !self.is_valid_out() || self.cached_prices.is_empty() || self.is_dirty {
return None;
}

View File

@ -1,4 +1,5 @@
use crate::edge::Edge;
use crate::hot_mints::{HotMintUpdate, HotMintsCache};
use crate::metrics;
use crate::token_cache::TokenCache;
use crate::util::tokio_spawn;
@ -11,7 +12,7 @@ use router_lib::price_feeds::price_cache::PriceCache;
use router_lib::price_feeds::price_feed::PriceUpdate;
use solana_program::pubkey::Pubkey;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
@ -76,6 +77,8 @@ pub fn spawn_updater_job(
token_cache: TokenCache,
price_cache: PriceCache,
path_warming_amounts: Vec<u64>,
hot_mints: Arc<RwLock<HotMintsCache>>,
mut hot_mints_receiver: broadcast::Receiver<HotMintUpdate>,
register_mint_sender: async_channel::Sender<Pubkey>,
ready_sender: async_channel::Sender<()>,
mut slot_updates: broadcast::Receiver<u64>,
@ -184,6 +187,19 @@ pub fn spawn_updater_job(
},
Ok(price_upd) = price_updates.recv() => {
if let Some(impacted_edges) = updater.state.edges_per_mint.get(&price_upd.mint) {
let reader = hot_mints.read().unwrap();
for edge in impacted_edges {
if reader.is_hot(&edge.input_mint) {
updater.state.dirty_edges.insert(edge.unique_id(), edge.clone());
}
else {
edge.mark_as_dirty();
}
}
};
},
Ok(hot_mint_upd) = hot_mints_receiver.recv() => {
if let Some(impacted_edges) = updater.state.edges_per_mint.get(&hot_mint_upd.mint) {
for edge in impacted_edges {
updater.state.dirty_edges.insert(edge.unique_id(), edge.clone());
}

View File

@ -1,8 +1,10 @@
use crate::debug_tools;
use router_config_lib::HotMintsConfig;
use serde_derive::{Deserialize, Serialize};
use solana_program::pubkey::Pubkey;
use std::collections::{HashSet, VecDeque};
use std::str::FromStr;
use tokio::sync::broadcast;
use tracing::info;
pub struct HotMintsCache {
@ -10,9 +12,41 @@ pub struct HotMintsCache {
always_hot: HashSet<Pubkey>,
latest_unordered: HashSet<Pubkey>,
latest_ordered: VecDeque<Pubkey>,
sender: Option<broadcast::Sender<HotMintUpdate>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HotMintUpdate {
pub mint: Pubkey,
}
impl HotMintsCache {
pub fn new_with_watcher(
config: &Option<HotMintsConfig>,
sender: broadcast::Sender<HotMintUpdate>,
) -> Self {
let config = config.clone().unwrap_or(HotMintsConfig {
always_hot_mints: vec![
"So11111111111111111111111111111111111111112".to_string(),
"EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v".to_string(),
"Es9vMFrzaCERmJfrF4H2FYD4KCoNkY11McCe8BenwNYB".to_string(),
],
keep_latest_count: 100,
});
HotMintsCache {
max_count: config.keep_latest_count,
always_hot: config
.always_hot_mints
.iter()
.map(|x| Pubkey::from_str(x).unwrap())
.collect(),
latest_unordered: Default::default(),
latest_ordered: Default::default(),
sender: Some(sender),
}
}
pub fn new(config: &Option<HotMintsConfig>) -> Self {
let config = config.clone().unwrap_or(HotMintsConfig {
always_hot_mints: vec![
@ -32,6 +66,7 @@ impl HotMintsCache {
.collect(),
latest_unordered: Default::default(),
latest_ordered: Default::default(),
sender: None,
}
}
@ -54,6 +89,12 @@ impl HotMintsCache {
}
if self.latest_unordered.insert(pubkey) {
if let Some(sender) = &self.sender {
if sender.receiver_count() > 0 {
sender.send(HotMintUpdate { mint: pubkey }).unwrap();
}
}
info!("Adding {} to hot mints", debug_tools::name(&pubkey));
}
self.latest_ordered.push_front(pubkey);
@ -66,6 +107,10 @@ impl HotMintsCache {
.copied()
.collect()
}
pub fn is_hot(&self, mint: &Pubkey) -> bool {
self.latest_unordered.contains(mint) || self.always_hot.contains(mint)
}
}
#[cfg(test)]

View File

@ -1,5 +1,5 @@
use crate::edge_updater::{spawn_updater_job, Dex};
use crate::hot_mints::HotMintsCache;
use crate::hot_mints::{HotMintUpdate, HotMintsCache};
use crate::ix_builder::{SwapInstructionsBuilderImpl, SwapStepInstructionBuilderImpl};
use crate::liquidity::{spawn_liquidity_updater_job, LiquidityProvider};
use crate::path_warmer::spawn_path_warmer_job;
@ -99,7 +99,11 @@ async fn main() -> anyhow::Result<()> {
let config = Config::load(&args[1])?;
let router_version = RouterVersion::OverestimateAmount;
let hot_mints = Arc::new(RwLock::new(HotMintsCache::new(&config.hot_mints)));
let (hot_mint_sender, hot_mint_receiver) = broadcast::channel::<HotMintUpdate>(20);
let hot_mints = Arc::new(RwLock::new(HotMintsCache::new_with_watcher(
&config.hot_mints,
hot_mint_sender.clone(),
)));
let mango_data = match mango::mango_fetcher::fetch_mango_data().await {
Err(e) => {
@ -348,6 +352,8 @@ async fn main() -> anyhow::Result<()> {
token_cache.clone(),
price_cache.clone(),
path_warming_amounts.clone(),
hot_mints.clone(),
hot_mint_sender.subscribe(),
price_feed.register_mint_sender(),
ready_channels[i].0.clone(),
rpc_slot_sender.subscribe(),

View File

@ -2,19 +2,20 @@ use crate::debug_tools;
use crate::metrics;
use crate::prelude::*;
use crate::routing_objectpool::RoutingObjectPools;
use crate::routing_types::*;
use crate::token_cache::TokenCache;
use mango_feeds_connector::chain_data::AccountData;
use ordered_float::NotNan;
use router_config_lib::Config;
use router_lib::dex::SwapMode;
use router_lib::dex::{AccountProviderView, DexEdge};
use router_lib::price_feeds::price_cache::PriceCache;
use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::time::{Duration, Instant};
use std::u64;
use thiserror::Error;
use tracing::Level;
use crate::routing_types::*;
#[derive(Error, Debug)]
pub enum RoutingError {
#[error("unsupported input mint {0:?}")]
@ -1889,6 +1890,26 @@ impl Routing {
" - available edge"
);
}
pub fn lazy_compute_prices(
&self,
chain_data: &AccountProviderView,
token_cache: &TokenCache,
price_cache: &PriceCache,
input_mint: &Pubkey,
output_mint: &Pubkey,
) {
for edge in &self.edges {
if edge.input_mint.eq(input_mint) || edge.output_mint.eq(output_mint) {
edge.update_if_needed(
chain_data,
token_cache,
price_cache,
&self.path_warming_amounts,
)
}
}
}
}
#[cfg(test)]

View File

@ -91,6 +91,15 @@ impl RouteProvider for RoutingRouteProvider {
hot_mints_guard.get()
};
// ensure new hot mints are ready (edge cached_price should be available)
self.routing.lazy_compute_prices(
&self.chain_data,
&self.tokens,
&self.prices,
&from_mint,
&to_mint,
);
let route = self.routing.find_best_route(
&self.chain_data,
&from_mint,

View File

@ -109,6 +109,7 @@ min_quote_out_to_in_amount_ratio = 0.65
always_hot_mints = [
"So11111111111111111111111111111111111111112", # SOL
"EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", # USDC
"Es9vMFrzaCERmJfrF4H2FYD4KCoNkY11McCe8BenwNYB", # USDT
]
keep_latest_count = 50