From 78d59bc5034f957af2fa630a79d42b01a2f4ece1 Mon Sep 17 00:00:00 2001 From: Serge Farny Date: Wed, 2 Oct 2024 15:17:07 +0200 Subject: [PATCH 1/5] Router: lazy compute prices for cold mints --- bin/autobahn-router/src/edge.rs | 29 ++++++++++-- bin/autobahn-router/src/edge_updater.rs | 18 +++++++- bin/autobahn-router/src/hot_mints.rs | 45 +++++++++++++++++++ bin/autobahn-router/src/main.rs | 10 ++++- bin/autobahn-router/src/routing.rs | 24 +++++++++- .../src/server/route_provider.rs | 6 +++ 6 files changed, 124 insertions(+), 8 deletions(-) diff --git a/bin/autobahn-router/src/edge.rs b/bin/autobahn-router/src/edge.rs index 1a67157..8631c7f 100644 --- a/bin/autobahn-router/src/edge.rs +++ b/bin/autobahn-router/src/edge.rs @@ -17,6 +17,7 @@ pub struct EdgeState { // TODO: it may be much better to store this centrally, so it's cheap to take a snapshot pub cached_prices: Vec<(u64, f64, f64)>, is_valid: bool, + is_dirty: bool, pub last_update: u64, pub last_update_slot: u64, @@ -153,7 +154,7 @@ impl Edge { }) .collect_vec(); - debug!(input_mint = %self.input_mint, pool = %self.key(), multiplier = multiplier, price = price, amounts = amounts.iter().join(";"), "price_data"); + trace!(input_mint = %self.input_mint, pool = %self.key(), multiplier = multiplier, price = price, amounts = amounts.iter().join(";"), "price_data"); let overflow = amounts.iter().any(|x| *x == u64::MAX); if overflow { @@ -166,6 +167,7 @@ impl Edge { state.last_update_slot = chain_data.newest_processed_slot(); state.cached_prices.clear(); state.is_valid = false; + state.is_dirty = false; return; } @@ -196,6 +198,7 @@ impl Edge { state.last_update_slot = chain_data.newest_processed_slot(); state.cached_prices.clear(); state.is_valid = true; + state.is_dirty = false; if let Some(timestamp) = state.cooldown_until { if timestamp < state.last_update { @@ -229,6 +232,26 @@ impl Edge { } } + pub fn mark_as_dirty(&self) { + let mut state = self.state.write().unwrap(); + state.is_dirty = true; + } + + pub fn update_if_needed( + &self, + chain_data: &AccountProviderView, + token_cache: &TokenCache, + price_cache: &PriceCache, + path_warming_amounts: &Vec, + ) { + if !self.state.read().unwrap().is_dirty { + return; + } + + debug!("Lazily updating {}->{}", debug_tools::name(&self.input_mint), debug_tools::name(&self.output_mint)); + self.update(chain_data, token_cache, price_cache, path_warming_amounts) + } + pub fn update( &self, chain_data: &AccountProviderView, @@ -259,7 +282,7 @@ impl EdgeState { /// Returns the price (in native/native) and ln(price) most applicable for the in amount /// Returns None if invalid pub fn cached_price_for(&self, in_amount: u64) -> Option<(f64, f64)> { - if !self.is_valid() || self.cached_prices.is_empty() { + if !self.is_valid() || self.cached_prices.is_empty() || self.is_dirty { return None; } @@ -272,7 +295,7 @@ impl EdgeState { } pub fn cached_price_exact_out_for(&self, out_amount: u64) -> Option<(f64, f64)> { - if !self.is_valid_out() { + if !self.is_valid_out() || self.cached_prices.is_empty() || self.is_dirty { return None; } diff --git a/bin/autobahn-router/src/edge_updater.rs b/bin/autobahn-router/src/edge_updater.rs index f27e9d9..cb32f11 100644 --- a/bin/autobahn-router/src/edge_updater.rs +++ b/bin/autobahn-router/src/edge_updater.rs @@ -1,4 +1,5 @@ use crate::edge::Edge; +use crate::hot_mints::{HotMintUpdate, HotMintsCache}; use crate::metrics; use crate::token_cache::TokenCache; use crate::util::tokio_spawn; @@ -11,7 +12,7 @@ use router_lib::price_feeds::price_cache::PriceCache; use router_lib::price_feeds::price_feed::PriceUpdate; use solana_program::pubkey::Pubkey; use std::collections::{HashMap, HashSet}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; @@ -76,6 +77,8 @@ pub fn spawn_updater_job( token_cache: TokenCache, price_cache: PriceCache, path_warming_amounts: Vec, + hot_mints: Arc>, + mut hot_mints_receiver: broadcast::Receiver, register_mint_sender: async_channel::Sender, ready_sender: async_channel::Sender<()>, mut slot_updates: broadcast::Receiver, @@ -184,6 +187,19 @@ pub fn spawn_updater_job( }, Ok(price_upd) = price_updates.recv() => { if let Some(impacted_edges) = updater.state.edges_per_mint.get(&price_upd.mint) { + let reader = hot_mints.read().unwrap(); + for edge in impacted_edges { + if reader.is_hot(&edge.input_mint) { + updater.state.dirty_edges.insert(edge.unique_id(), edge.clone()); + } + else { + edge.mark_as_dirty(); + } + } + }; + }, + Ok(hot_mint_upd) = hot_mints_receiver.recv() => { + if let Some(impacted_edges) = updater.state.edges_per_mint.get(&hot_mint_upd.mint) { for edge in impacted_edges { updater.state.dirty_edges.insert(edge.unique_id(), edge.clone()); } diff --git a/bin/autobahn-router/src/hot_mints.rs b/bin/autobahn-router/src/hot_mints.rs index d6edc6a..9400042 100644 --- a/bin/autobahn-router/src/hot_mints.rs +++ b/bin/autobahn-router/src/hot_mints.rs @@ -1,8 +1,10 @@ use crate::debug_tools; use router_config_lib::HotMintsConfig; +use serde_derive::{Deserialize, Serialize}; use solana_program::pubkey::Pubkey; use std::collections::{HashSet, VecDeque}; use std::str::FromStr; +use tokio::sync::broadcast; use tracing::info; pub struct HotMintsCache { @@ -10,9 +12,41 @@ pub struct HotMintsCache { always_hot: HashSet, latest_unordered: HashSet, latest_ordered: VecDeque, + sender: Option>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HotMintUpdate { + pub mint: Pubkey, } impl HotMintsCache { + pub fn new_with_watcher( + config: &Option, + sender: broadcast::Sender, + ) -> Self { + let config = config.clone().unwrap_or(HotMintsConfig { + always_hot_mints: vec![ + "So11111111111111111111111111111111111111112".to_string(), + "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v".to_string(), + "Es9vMFrzaCERmJfrF4H2FYD4KCoNkY11McCe8BenwNYB".to_string(), + ], + keep_latest_count: 100, + }); + + HotMintsCache { + max_count: config.keep_latest_count, + always_hot: config + .always_hot_mints + .iter() + .map(|x| Pubkey::from_str(x).unwrap()) + .collect(), + latest_unordered: Default::default(), + latest_ordered: Default::default(), + sender: Some(sender), + } + } + pub fn new(config: &Option) -> Self { let config = config.clone().unwrap_or(HotMintsConfig { always_hot_mints: vec![ @@ -32,6 +66,7 @@ impl HotMintsCache { .collect(), latest_unordered: Default::default(), latest_ordered: Default::default(), + sender: None, } } @@ -54,6 +89,12 @@ impl HotMintsCache { } if self.latest_unordered.insert(pubkey) { + if let Some(sender) = &self.sender { + if sender.receiver_count() > 0 { + sender.send(HotMintUpdate { mint: pubkey }).unwrap(); + } + } + info!("Adding {} to hot mints", debug_tools::name(&pubkey)); } self.latest_ordered.push_front(pubkey); @@ -66,6 +107,10 @@ impl HotMintsCache { .copied() .collect() } + + pub fn is_hot(&self, mint: &Pubkey) -> bool { + self.latest_unordered.contains(mint) || self.always_hot.contains(mint) + } } #[cfg(test)] diff --git a/bin/autobahn-router/src/main.rs b/bin/autobahn-router/src/main.rs index 9fe7037..83389db 100644 --- a/bin/autobahn-router/src/main.rs +++ b/bin/autobahn-router/src/main.rs @@ -1,5 +1,5 @@ use crate::edge_updater::{spawn_updater_job, Dex}; -use crate::hot_mints::HotMintsCache; +use crate::hot_mints::{HotMintUpdate, HotMintsCache}; use crate::ix_builder::{SwapInstructionsBuilderImpl, SwapStepInstructionBuilderImpl}; use crate::liquidity::{spawn_liquidity_updater_job, LiquidityProvider}; use crate::path_warmer::spawn_path_warmer_job; @@ -99,7 +99,11 @@ async fn main() -> anyhow::Result<()> { let config = Config::load(&args[1])?; let router_version = RouterVersion::OverestimateAmount; - let hot_mints = Arc::new(RwLock::new(HotMintsCache::new(&config.hot_mints))); + let (hot_mint_sender, hot_mint_receiver) = broadcast::channel::(20); + let hot_mints = Arc::new(RwLock::new(HotMintsCache::new_with_watcher( + &config.hot_mints, + hot_mint_sender.clone(), + ))); let mango_data = match mango::mango_fetcher::fetch_mango_data().await { Err(e) => { @@ -348,6 +352,8 @@ async fn main() -> anyhow::Result<()> { token_cache.clone(), price_cache.clone(), path_warming_amounts.clone(), + hot_mints.clone(), + hot_mint_sender.subscribe(), price_feed.register_mint_sender(), ready_channels[i].0.clone(), rpc_slot_sender.subscribe(), diff --git a/bin/autobahn-router/src/routing.rs b/bin/autobahn-router/src/routing.rs index f7ed222..74d65cb 100644 --- a/bin/autobahn-router/src/routing.rs +++ b/bin/autobahn-router/src/routing.rs @@ -2,19 +2,20 @@ use crate::debug_tools; use crate::metrics; use crate::prelude::*; use crate::routing_objectpool::RoutingObjectPools; +use crate::routing_types::*; +use crate::token_cache::TokenCache; use mango_feeds_connector::chain_data::AccountData; use ordered_float::NotNan; use router_config_lib::Config; use router_lib::dex::SwapMode; use router_lib::dex::{AccountProviderView, DexEdge}; +use router_lib::price_feeds::price_cache::PriceCache; use std::collections::hash_map::Entry::{Occupied, Vacant}; use std::time::{Duration, Instant}; use std::u64; use thiserror::Error; use tracing::Level; -use crate::routing_types::*; - #[derive(Error, Debug)] pub enum RoutingError { #[error("unsupported input mint {0:?}")] @@ -1889,6 +1890,25 @@ impl Routing { " - available edge" ); } + + pub fn lazy_compute_prices( + &self, + chain_data: &AccountProviderView, + token_cache: &TokenCache, + price_cache: &PriceCache, + mint: &Pubkey, + ) { + for edge in &self.edges { + if edge.input_mint.eq(mint) { + edge.update_if_needed( + chain_data, + token_cache, + price_cache, + &self.path_warming_amounts, + ) + } + } + } } #[cfg(test)] diff --git a/bin/autobahn-router/src/server/route_provider.rs b/bin/autobahn-router/src/server/route_provider.rs index b331304..8a4f948 100644 --- a/bin/autobahn-router/src/server/route_provider.rs +++ b/bin/autobahn-router/src/server/route_provider.rs @@ -91,6 +91,12 @@ impl RouteProvider for RoutingRouteProvider { hot_mints_guard.get() }; + // ensure new hot mints are ready (edge cached_price should be available) + self.routing + .lazy_compute_prices(&self.chain_data, &self.tokens, &self.prices, &from_mint); + self.routing + .lazy_compute_prices(&self.chain_data, &self.tokens, &self.prices, &to_mint); + let route = self.routing.find_best_route( &self.chain_data, &from_mint, From 6ee0ae7ee189d08a151aefcc2490470820ed8d55 Mon Sep 17 00:00:00 2001 From: Serge Farny Date: Mon, 7 Oct 2024 14:21:46 +0200 Subject: [PATCH 2/5] Router: add USDT to always hot mints --- bin/autobahn-router/template-config.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/bin/autobahn-router/template-config.toml b/bin/autobahn-router/template-config.toml index 40fdd0a..87187f5 100644 --- a/bin/autobahn-router/template-config.toml +++ b/bin/autobahn-router/template-config.toml @@ -109,6 +109,7 @@ min_quote_out_to_in_amount_ratio = 0.65 always_hot_mints = [ "So11111111111111111111111111111111111111112", # SOL "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", # USDC + "Es9vMFrzaCERmJfrF4H2FYD4KCoNkY11McCe8BenwNYB", # USDT ] keep_latest_count = 50 From 1260eeb88c7a4055371d42aac4fa91d067266481 Mon Sep 17 00:00:00 2001 From: Serge Farny Date: Tue, 8 Oct 2024 11:43:06 +0200 Subject: [PATCH 3/5] Router: lazy update prices based on quote request direction --- bin/autobahn-router/src/routing.rs | 5 +++-- bin/autobahn-router/src/server/route_provider.rs | 4 +--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/bin/autobahn-router/src/routing.rs b/bin/autobahn-router/src/routing.rs index 74d65cb..5985df0 100644 --- a/bin/autobahn-router/src/routing.rs +++ b/bin/autobahn-router/src/routing.rs @@ -1896,10 +1896,11 @@ impl Routing { chain_data: &AccountProviderView, token_cache: &TokenCache, price_cache: &PriceCache, - mint: &Pubkey, + input_mint: &Pubkey, + output_mint: &Pubkey, ) { for edge in &self.edges { - if edge.input_mint.eq(mint) { + if edge.input_mint.eq(input_mint) || edge.output_mint.eq(output_mint) { edge.update_if_needed( chain_data, token_cache, diff --git a/bin/autobahn-router/src/server/route_provider.rs b/bin/autobahn-router/src/server/route_provider.rs index 8a4f948..7a29cc5 100644 --- a/bin/autobahn-router/src/server/route_provider.rs +++ b/bin/autobahn-router/src/server/route_provider.rs @@ -93,9 +93,7 @@ impl RouteProvider for RoutingRouteProvider { // ensure new hot mints are ready (edge cached_price should be available) self.routing - .lazy_compute_prices(&self.chain_data, &self.tokens, &self.prices, &from_mint); - self.routing - .lazy_compute_prices(&self.chain_data, &self.tokens, &self.prices, &to_mint); + .lazy_compute_prices(&self.chain_data, &self.tokens, &self.prices, &from_mint, &to_mint); let route = self.routing.find_best_route( &self.chain_data, From de74c464238b42fb5ea17a74b9d01fb4723d8406 Mon Sep 17 00:00:00 2001 From: Serge Farny Date: Tue, 8 Oct 2024 15:04:52 +0200 Subject: [PATCH 4/5] Router: add comment --- bin/autobahn-router/src/edge.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/bin/autobahn-router/src/edge.rs b/bin/autobahn-router/src/edge.rs index 8631c7f..6fd0f7a 100644 --- a/bin/autobahn-router/src/edge.rs +++ b/bin/autobahn-router/src/edge.rs @@ -16,8 +16,13 @@ pub struct EdgeState { /// List of (input, price, ln-price) pairs, sorted by input asc // TODO: it may be much better to store this centrally, so it's cheap to take a snapshot pub cached_prices: Vec<(u64, f64, f64)>, + + /// Will be invalid if we fail to compute a quote is_valid: bool, + + /// Will be dirty if we got some account update for this edge, but didn't recompute the price cache yet is_dirty: bool, + pub last_update: u64, pub last_update_slot: u64, From c6722bb68ef67671586ef0fff1960554b8a72fd1 Mon Sep 17 00:00:00 2001 From: Serge Farny Date: Tue, 8 Oct 2024 15:05:12 +0200 Subject: [PATCH 5/5] Router: formatting --- bin/autobahn-router/src/edge.rs | 6 +++++- bin/autobahn-router/src/server/route_provider.rs | 9 +++++++-- lib/router-lib/src/lib.rs | 4 ++-- programs/autobahn-executor/src/lib.rs | 1 - 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/bin/autobahn-router/src/edge.rs b/bin/autobahn-router/src/edge.rs index 6fd0f7a..03022ac 100644 --- a/bin/autobahn-router/src/edge.rs +++ b/bin/autobahn-router/src/edge.rs @@ -253,7 +253,11 @@ impl Edge { return; } - debug!("Lazily updating {}->{}", debug_tools::name(&self.input_mint), debug_tools::name(&self.output_mint)); + debug!( + "Lazily updating {}->{}", + debug_tools::name(&self.input_mint), + debug_tools::name(&self.output_mint) + ); self.update(chain_data, token_cache, price_cache, path_warming_amounts) } diff --git a/bin/autobahn-router/src/server/route_provider.rs b/bin/autobahn-router/src/server/route_provider.rs index 7a29cc5..1137db8 100644 --- a/bin/autobahn-router/src/server/route_provider.rs +++ b/bin/autobahn-router/src/server/route_provider.rs @@ -92,8 +92,13 @@ impl RouteProvider for RoutingRouteProvider { }; // ensure new hot mints are ready (edge cached_price should be available) - self.routing - .lazy_compute_prices(&self.chain_data, &self.tokens, &self.prices, &from_mint, &to_mint); + self.routing.lazy_compute_prices( + &self.chain_data, + &self.tokens, + &self.prices, + &from_mint, + &to_mint, + ); let route = self.routing.find_best_route( &self.chain_data, diff --git a/lib/router-lib/src/lib.rs b/lib/router-lib/src/lib.rs index 03cc776..499a176 100644 --- a/lib/router-lib/src/lib.rs +++ b/lib/router-lib/src/lib.rs @@ -9,6 +9,6 @@ pub mod test_tools; pub mod utils; pub mod autobahn_executor { - use solana_sdk::declare_id; - declare_id!("AutobNFLMzX1rFCDgwWpwr3ztG5c1oDbSrGq7Jj2LgE"); + use solana_sdk::declare_id; + declare_id!("AutobNFLMzX1rFCDgwWpwr3ztG5c1oDbSrGq7Jj2LgE"); } diff --git a/programs/autobahn-executor/src/lib.rs b/programs/autobahn-executor/src/lib.rs index 50dd562..367868a 100644 --- a/programs/autobahn-executor/src/lib.rs +++ b/programs/autobahn-executor/src/lib.rs @@ -13,7 +13,6 @@ use solana_program::program_error::ProgramError; use solana_program::program_pack::Pack; use solana_program::{account_info::AccountInfo, pubkey::Pubkey}; - #[cfg(not(feature = "no-entrypoint"))] use {default_env::default_env, solana_program::entrypoint, solana_security_txt::security_txt};