From 1bd6d1aa1a6c49e5d732d3990cb72d0b16ba93cf Mon Sep 17 00:00:00 2001 From: Serge Farny Date: Tue, 1 Oct 2024 10:23:40 +0200 Subject: [PATCH] Router: add a liquidity endpoint (#2) Fetch total liquidity (in US $) of a token reachable through autobahn --- bin/autobahn-router/src/edge_updater.rs | 22 ------- .../src/liquidity/liquidity_computer.rs | 57 ++++++++++++++++ .../src/liquidity/liquidity_provider.rs | 66 +++++++++++++++++++ .../src/liquidity/liquidity_updater.rs | 64 ++++++++++++++++++ bin/autobahn-router/src/liquidity/mod.rs | 7 ++ bin/autobahn-router/src/main.rs | 64 +++++++++++------- bin/autobahn-router/src/server/http_server.rs | 36 ++++++++++ lib/router-lib/src/model/liquidity_request.rs | 8 +++ .../src/model/liquidity_response.rs | 9 +++ lib/router-lib/src/model/mod.rs | 2 + 10 files changed, 288 insertions(+), 47 deletions(-) create mode 100644 bin/autobahn-router/src/liquidity/liquidity_computer.rs create mode 100644 bin/autobahn-router/src/liquidity/liquidity_provider.rs create mode 100644 bin/autobahn-router/src/liquidity/liquidity_updater.rs create mode 100644 bin/autobahn-router/src/liquidity/mod.rs create mode 100644 lib/router-lib/src/model/liquidity_request.rs create mode 100644 lib/router-lib/src/model/liquidity_response.rs diff --git a/bin/autobahn-router/src/edge_updater.rs b/bin/autobahn-router/src/edge_updater.rs index fa57aa0..f27e9d9 100644 --- a/bin/autobahn-router/src/edge_updater.rs +++ b/bin/autobahn-router/src/edge_updater.rs @@ -29,28 +29,6 @@ pub struct Dex { } impl Dex { - pub fn _desc(&self) -> String { - match &self.subscription_mode { - DexSubscriptionMode::Disabled => { - format!("Dex {} mode=Disabled", self.name) - } - DexSubscriptionMode::Accounts(subscribed_pks) => { - format!("Dex {} mode=gMa #pks={}", self.name, subscribed_pks.len()) - } - DexSubscriptionMode::Programs(subscribed_prgs) => format!( - "Dex {} mode=gPa program_ids={:?}", - self.name, subscribed_prgs - ), - DexSubscriptionMode::Mixed(m) => format!( - "Dex {} mode=mix #pks={} program_ids={:?}, tokens_for_owners={:?}", - self.name, - m.accounts.len(), - m.programs, - m.token_accounts_for_owner - ), - } - } - pub fn edges(&self) -> Vec> { let edges: Vec> = self .edges_per_pk diff --git a/bin/autobahn-router/src/liquidity/liquidity_computer.rs b/bin/autobahn-router/src/liquidity/liquidity_computer.rs new file mode 100644 index 0000000..d8f381d --- /dev/null +++ b/bin/autobahn-router/src/liquidity/liquidity_computer.rs @@ -0,0 +1,57 @@ +use crate::edge::Edge; +use router_lib::dex::AccountProviderView; +use std::sync::Arc; + +pub fn compute_liquidity( + edge: &Arc, + chain_data: &AccountProviderView, +) -> anyhow::Result { + let loaded = edge.prepare(&chain_data)?; + + let first_in_amount = edge + .state + .read() + .unwrap() + .cached_prices + .first() + .map(|x| x.0); + let Some(first_in_amount) = first_in_amount else { + anyhow::bail!("Too early to compute liquidity"); + }; + + let mut iter_counter = 0; + let mut has_failed = false; + let mut last_successful_in_amount = first_in_amount; + let mut next_in_amount = first_in_amount; + let mut last_successful_out_amount = 0; + let acceptable_price_impact = 0.3; + + loop { + if next_in_amount == 0 || iter_counter > 50 { + break; + } + iter_counter = iter_counter + 1; + + let quote = edge.quote(&loaded, &chain_data, next_in_amount); + let expected_output = (2.0 - acceptable_price_impact) * last_successful_out_amount as f64; + + let out_amount = quote.map(|x| x.out_amount).unwrap_or(0); + + if (out_amount as f64) < expected_output { + if has_failed { + break; + } + has_failed = true; + next_in_amount = next_in_amount + .saturating_add(last_successful_in_amount) + .saturating_div(2); + continue; + }; + + last_successful_in_amount = next_in_amount; + last_successful_out_amount = out_amount; + next_in_amount = next_in_amount.saturating_mul(2); + } + + Ok(last_successful_out_amount) +} diff --git a/bin/autobahn-router/src/liquidity/liquidity_provider.rs b/bin/autobahn-router/src/liquidity/liquidity_provider.rs new file mode 100644 index 0000000..6316169 --- /dev/null +++ b/bin/autobahn-router/src/liquidity/liquidity_provider.rs @@ -0,0 +1,66 @@ +use crate::token_cache::TokenCache; +use ordered_float::{FloatCore, Pow}; +use router_lib::price_feeds::price_cache::PriceCache; +use solana_program::pubkey::Pubkey; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; + +pub struct Liquidity { + pub liquidity_by_pool: HashMap, +} + +pub struct LiquidityProvider { + liquidity_by_mint: HashMap, + token_cache: TokenCache, + price_cache: PriceCache, +} + +pub type LiquidityProviderArcRw = Arc>; + +impl LiquidityProvider { + pub fn new(token_cache: TokenCache, price_cache: PriceCache) -> LiquidityProvider { + LiquidityProvider { + liquidity_by_mint: Default::default(), + token_cache, + price_cache, + } + } + + pub fn set_liquidity(&mut self, mint: Pubkey, pool: Pubkey, liquidity: u64) { + if let Some(cache) = self.liquidity_by_mint.get_mut(&mint) { + cache.liquidity_by_pool.insert(pool, liquidity); + } else { + self.liquidity_by_mint.insert( + mint, + Liquidity { + liquidity_by_pool: HashMap::from([(pool, liquidity)]), + }, + ); + } + } + + pub fn get_total_liquidity_native(&self, mint: Pubkey) -> u64 { + if let Some(cache) = self.liquidity_by_mint.get(&mint) { + let mut sum = 0u64; + for amount in cache.liquidity_by_pool.iter().map(|x| x.1) { + sum = sum.saturating_add(*amount); + } + sum + } else { + 0 + } + } + + pub fn get_total_liquidity_in_dollars(&self, mint: Pubkey) -> anyhow::Result { + let liquidity_native = self.get_total_liquidity_native(mint); + let price = self + .price_cache + .price_ui(mint) + .ok_or(anyhow::format_err!("no price"))?; + let decimal = self.token_cache.token(mint).map(|x| x.decimals)?; + + let liquidity_dollars = (liquidity_native as f64 / 10.0.pow(decimal as f64)) * price; + + Ok(liquidity_dollars) + } +} diff --git a/bin/autobahn-router/src/liquidity/liquidity_updater.rs b/bin/autobahn-router/src/liquidity/liquidity_updater.rs new file mode 100644 index 0000000..da0c3d5 --- /dev/null +++ b/bin/autobahn-router/src/liquidity/liquidity_updater.rs @@ -0,0 +1,64 @@ +use crate::debug_tools; +use crate::edge::Edge; +use crate::liquidity::liquidity_computer::compute_liquidity; +use crate::liquidity::liquidity_provider::LiquidityProviderArcRw; +use crate::util::tokio_spawn; +use itertools::Itertools; +use router_lib::dex::AccountProviderView; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::broadcast; +use tokio::task::JoinHandle; +use tracing::{debug, info}; + +pub fn spawn_liquidity_updater_job( + provider: LiquidityProviderArcRw, + edges: Vec>, + chain_data: AccountProviderView, + mut exit: broadcast::Receiver<()>, +) -> JoinHandle<()> { + let job = tokio_spawn("liquidity_updater", async move { + let mut refresh_all_interval = tokio::time::interval(Duration::from_secs(30)); + refresh_all_interval.tick().await; + + loop { + tokio::select! { + _ = exit.recv() => { + info!("shutting down liquidity_updater task"); + break; + } + _ = refresh_all_interval.tick() => { + refresh_liquidity(&provider, &edges, &chain_data); + } + } + } + }); + + job +} + +fn refresh_liquidity( + provider: &LiquidityProviderArcRw, + edges: &Vec>, + account_provider: &AccountProviderView, +) { + for edge in edges { + let liquidity = compute_liquidity(&edge, &account_provider); + if let Ok(liquidity) = liquidity { + provider + .write() + .unwrap() + .set_liquidity(edge.output_mint, edge.id.key(), liquidity); + } else { + debug!("Could not compute liquidity for {}", edge.id.desc()) + } + } + + for mint in edges.iter().map(|x| x.output_mint).unique() { + debug!( + "Liquidity for {} -> {}", + debug_tools::name(&mint), + provider.read().unwrap().get_total_liquidity_native(mint) + ) + } +} diff --git a/bin/autobahn-router/src/liquidity/mod.rs b/bin/autobahn-router/src/liquidity/mod.rs new file mode 100644 index 0000000..e86abe9 --- /dev/null +++ b/bin/autobahn-router/src/liquidity/mod.rs @@ -0,0 +1,7 @@ +mod liquidity_computer; +mod liquidity_provider; +mod liquidity_updater; + +pub use liquidity_provider::LiquidityProvider; +pub use liquidity_provider::LiquidityProviderArcRw; +pub use liquidity_updater::spawn_liquidity_updater_job; diff --git a/bin/autobahn-router/src/main.rs b/bin/autobahn-router/src/main.rs index 08e8424..118cb19 100644 --- a/bin/autobahn-router/src/main.rs +++ b/bin/autobahn-router/src/main.rs @@ -1,10 +1,35 @@ use crate::edge_updater::{spawn_updater_job, Dex}; +use crate::hot_mints::HotMintsCache; use crate::ix_builder::{SwapInstructionsBuilderImpl, SwapStepInstructionBuilderImpl}; +use crate::liquidity::{spawn_liquidity_updater_job, LiquidityProvider}; use crate::path_warmer::spawn_path_warmer_job; +use crate::prometheus_sync::PrometheusSync; +use crate::routing::Routing; +use crate::server::alt_provider::RpcAltProvider; +use crate::server::hash_provider::RpcHashProvider; +use crate::server::http_server::HttpServer; +use crate::server::live_account_provider::LiveAccountProvider; +use crate::server::route_provider::RoutingRouteProvider; +use crate::source::mint_accounts_source::{request_mint_metadata, Token}; +use crate::token_cache::{Decimals, TokenCache}; +use crate::tx_watcher::spawn_tx_watcher_jobs; +use crate::util::tokio_spawn; +use dex_orca::OrcaDex; use itertools::chain; use mango_feeds_connector::chain_data::ChainData; use mango_feeds_connector::SlotUpdate; use prelude::*; +use router_config_lib::{string_or_env, AccountDataSourceConfig, Config}; +use router_feed_lib::account_write::{AccountOrSnapshotUpdate, AccountWrite}; +use router_feed_lib::get_program_account::FeedMetadata; +use router_feed_lib::router_rpc_client::RouterRpcClient; +use router_feed_lib::router_rpc_wrapper::RouterRpcWrapper; +use router_lib::chain_data::ChainDataArcRw; +use router_lib::dex::{ + AccountProviderView, ChainDataAccountProvider, DexInterface, DexSubscriptionMode, +}; +use router_lib::mango; +use router_lib::price_feeds::composite::CompositePriceFeed; use router_lib::price_feeds::price_cache::PriceCache; use router_lib::price_feeds::price_feed::PriceFeed; use solana_client::nonblocking::rpc_client::RpcClient; @@ -18,31 +43,6 @@ use std::time::{Duration, Instant}; use tokio::sync::broadcast; use tokio::task::JoinHandle; -use crate::hot_mints::HotMintsCache; -use crate::prometheus_sync::PrometheusSync; -use crate::routing::Routing; -use crate::server::alt_provider::RpcAltProvider; -use crate::server::hash_provider::RpcHashProvider; -use crate::server::http_server::HttpServer; -use crate::server::live_account_provider::LiveAccountProvider; -use crate::server::route_provider::RoutingRouteProvider; -use crate::source::mint_accounts_source::{request_mint_metadata, Token}; -use crate::token_cache::{Decimals, TokenCache}; -use crate::tx_watcher::spawn_tx_watcher_jobs; -use crate::util::tokio_spawn; -use dex_orca::OrcaDex; -use router_config_lib::{string_or_env, AccountDataSourceConfig, Config}; -use router_feed_lib::account_write::{AccountOrSnapshotUpdate, AccountWrite}; -use router_feed_lib::get_program_account::FeedMetadata; -use router_feed_lib::router_rpc_client::RouterRpcClient; -use router_feed_lib::router_rpc_wrapper::RouterRpcWrapper; -use router_lib::chain_data::ChainDataArcRw; -use router_lib::dex::{ - AccountProviderView, ChainDataAccountProvider, DexInterface, DexSubscriptionMode, -}; -use router_lib::mango; -use router_lib::price_feeds::composite::CompositePriceFeed; - mod alt; mod debug_tools; mod dex; @@ -50,6 +50,7 @@ pub mod edge; mod edge_updater; mod hot_mints; pub mod ix_builder; +mod liquidity; mod metrics; mod mock; mod path_warmer; @@ -387,11 +388,23 @@ async fn main() -> anyhow::Result<()> { router_version as u8, )); + let liquidity_provider = Arc::new(RwLock::new(LiquidityProvider::new( + token_cache.clone(), + price_cache.clone(), + ))); + let liquidity_job = spawn_liquidity_updater_job( + liquidity_provider.clone(), + edges.clone(), + chain_data_wrapper, + exit_sender.subscribe(), + ); + let server_job = HttpServer::start( route_provider.clone(), hash_provider, alt_provider, live_account_provider, + liquidity_provider.clone(), ix_builder, config.clone(), exit_sender.subscribe(), @@ -504,6 +517,7 @@ async fn main() -> anyhow::Result<()> { tx_sender_job, tx_watcher_job, account_update_job, + liquidity_job, ] .into_iter() .chain(update_jobs.into_iter()) diff --git a/bin/autobahn-router/src/server/http_server.rs b/bin/autobahn-router/src/server/http_server.rs index 40459e5..407f929 100644 --- a/bin/autobahn-router/src/server/http_server.rs +++ b/bin/autobahn-router/src/server/http_server.rs @@ -21,12 +21,15 @@ use tower_http::cors::{AllowHeaders, AllowMethods, Any, CorsLayer}; use crate::alt::alt_optimizer; use crate::ix_builder::SwapInstructionsBuilder; +use crate::liquidity::{LiquidityProvider, LiquidityProviderArcRw}; use crate::routing_types::Route; use crate::server::alt_provider::AltProvider; use crate::server::hash_provider::HashProvider; use crate::{debug_tools, metrics}; use router_config_lib::Config; use router_lib::dex::{AccountProvider, AccountProviderView, SwapMode}; +use router_lib::model::liquidity_request::LiquidityRequest; +use router_lib::model::liquidity_response::LiquidityResponse; use router_lib::model::quote_response::{RoutePlan, SwapInfo}; // make sure the transaction can be executed @@ -49,6 +52,7 @@ impl HttpServer { hash_provider: Arc, alt_provider: Arc, live_account_provider: Arc, + liquidity_provider: LiquidityProviderArcRw, ix_builder: Arc, config: Config, exit: tokio::sync::broadcast::Receiver<()>, @@ -58,6 +62,7 @@ impl HttpServer { hash_provider, alt_provider, live_account_provider, + liquidity_provider, ix_builder, config, exit, @@ -80,6 +85,7 @@ impl HttpServer { hash_provider: Arc, alt_provider: Arc, live_account_provider: Arc, + liquidity_provider: LiquidityProviderArcRw, ix_builder: Arc, config: Config, exit: tokio::sync::broadcast::Receiver<()>, @@ -107,6 +113,7 @@ impl HttpServer { hash_provider, alt_provider, live_account_provider, + liquidity_provider, ix_builder, reprice_frequency, )?; @@ -462,6 +469,28 @@ impl HttpServer { Html("マンゴールーター") } + async fn liquidity_handler( + liquidity_provider: LiquidityProviderArcRw, + Form(input): Form, + ) -> Result, AppError> { + let mut result = HashMap::new(); + let reader = liquidity_provider.read().unwrap(); + + for mint_str in input.mints.split(",") { + let mint_str = mint_str.trim().to_string(); + let mint = Pubkey::from_str(&mint_str)?; + result.insert( + mint_str, + reader.get_total_liquidity_in_dollars(mint).unwrap_or(0.0), + ); + } + + drop(reader); + let json_response = serde_json::json!(LiquidityResponse { liquidity: result }); + + Ok(Json(json_response)) + } + fn extract_client_key(headers: &HeaderMap) -> &str { if let Some(client_key) = headers.get("x-client-key") { client_key.to_str().unwrap_or("invalid") @@ -482,6 +511,7 @@ impl HttpServer { hash_provider: Arc, alt_provider: Arc, live_account_provider: Arc, + liquidity_provider: LiquidityProviderArcRw, ix_builder: Arc, reprice_probability: f64, ) -> anyhow::Result> { @@ -495,6 +525,12 @@ impl HttpServer { router = router.route("/", routing::get(Self::handler)); + let lp = liquidity_provider.clone(); + router = router.route( + "/liquidity", + routing::get(move |form| Self::liquidity_handler(lp, form)), + ); + let alt = address_lookup_tables.clone(); let rp = route_provider.clone(); let hp = hash_provider.clone(); diff --git a/lib/router-lib/src/model/liquidity_request.rs b/lib/router-lib/src/model/liquidity_request.rs new file mode 100644 index 0000000..fdd87f0 --- /dev/null +++ b/lib/router-lib/src/model/liquidity_request.rs @@ -0,0 +1,8 @@ +use serde::Deserialize; + +#[derive(Deserialize, Debug)] +#[allow(dead_code)] +#[serde(rename_all = "camelCase")] +pub struct LiquidityRequest { + pub mints: String, +} diff --git a/lib/router-lib/src/model/liquidity_response.rs b/lib/router-lib/src/model/liquidity_response.rs new file mode 100644 index 0000000..970ba5b --- /dev/null +++ b/lib/router-lib/src/model/liquidity_response.rs @@ -0,0 +1,9 @@ +use serde_derive::{Deserialize, Serialize}; +use std::collections::HashMap; + +#[derive(Deserialize, Serialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +#[serde_with::serde_as] +pub struct LiquidityResponse { + pub liquidity: HashMap, +} diff --git a/lib/router-lib/src/model/mod.rs b/lib/router-lib/src/model/mod.rs index b689bb2..e05b067 100644 --- a/lib/router-lib/src/model/mod.rs +++ b/lib/router-lib/src/model/mod.rs @@ -1,3 +1,5 @@ +pub mod liquidity_request; +pub mod liquidity_response; pub mod quote_request; pub mod quote_response; pub mod swap_request;