Router: add a liquidity endpoint (#2)

Fetch total liquidity (in US $) of a token reachable through autobahn
This commit is contained in:
Serge Farny 2024-10-01 10:23:40 +02:00 committed by GitHub
parent 9bd763fd2d
commit 1bd6d1aa1a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 288 additions and 47 deletions

View File

@ -29,28 +29,6 @@ pub struct Dex {
} }
impl Dex { impl Dex {
pub fn _desc(&self) -> String {
match &self.subscription_mode {
DexSubscriptionMode::Disabled => {
format!("Dex {} mode=Disabled", self.name)
}
DexSubscriptionMode::Accounts(subscribed_pks) => {
format!("Dex {} mode=gMa #pks={}", self.name, subscribed_pks.len())
}
DexSubscriptionMode::Programs(subscribed_prgs) => format!(
"Dex {} mode=gPa program_ids={:?}",
self.name, subscribed_prgs
),
DexSubscriptionMode::Mixed(m) => format!(
"Dex {} mode=mix #pks={} program_ids={:?}, tokens_for_owners={:?}",
self.name,
m.accounts.len(),
m.programs,
m.token_accounts_for_owner
),
}
}
pub fn edges(&self) -> Vec<Arc<Edge>> { pub fn edges(&self) -> Vec<Arc<Edge>> {
let edges: Vec<Arc<Edge>> = self let edges: Vec<Arc<Edge>> = self
.edges_per_pk .edges_per_pk

View File

@ -0,0 +1,57 @@
use crate::edge::Edge;
use router_lib::dex::AccountProviderView;
use std::sync::Arc;
pub fn compute_liquidity(
edge: &Arc<Edge>,
chain_data: &AccountProviderView,
) -> anyhow::Result<u64> {
let loaded = edge.prepare(&chain_data)?;
let first_in_amount = edge
.state
.read()
.unwrap()
.cached_prices
.first()
.map(|x| x.0);
let Some(first_in_amount) = first_in_amount else {
anyhow::bail!("Too early to compute liquidity");
};
let mut iter_counter = 0;
let mut has_failed = false;
let mut last_successful_in_amount = first_in_amount;
let mut next_in_amount = first_in_amount;
let mut last_successful_out_amount = 0;
let acceptable_price_impact = 0.3;
loop {
if next_in_amount == 0 || iter_counter > 50 {
break;
}
iter_counter = iter_counter + 1;
let quote = edge.quote(&loaded, &chain_data, next_in_amount);
let expected_output = (2.0 - acceptable_price_impact) * last_successful_out_amount as f64;
let out_amount = quote.map(|x| x.out_amount).unwrap_or(0);
if (out_amount as f64) < expected_output {
if has_failed {
break;
}
has_failed = true;
next_in_amount = next_in_amount
.saturating_add(last_successful_in_amount)
.saturating_div(2);
continue;
};
last_successful_in_amount = next_in_amount;
last_successful_out_amount = out_amount;
next_in_amount = next_in_amount.saturating_mul(2);
}
Ok(last_successful_out_amount)
}

View File

@ -0,0 +1,66 @@
use crate::token_cache::TokenCache;
use ordered_float::{FloatCore, Pow};
use router_lib::price_feeds::price_cache::PriceCache;
use solana_program::pubkey::Pubkey;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
pub struct Liquidity {
pub liquidity_by_pool: HashMap<Pubkey, u64>,
}
pub struct LiquidityProvider {
liquidity_by_mint: HashMap<Pubkey, Liquidity>,
token_cache: TokenCache,
price_cache: PriceCache,
}
pub type LiquidityProviderArcRw = Arc<RwLock<LiquidityProvider>>;
impl LiquidityProvider {
pub fn new(token_cache: TokenCache, price_cache: PriceCache) -> LiquidityProvider {
LiquidityProvider {
liquidity_by_mint: Default::default(),
token_cache,
price_cache,
}
}
pub fn set_liquidity(&mut self, mint: Pubkey, pool: Pubkey, liquidity: u64) {
if let Some(cache) = self.liquidity_by_mint.get_mut(&mint) {
cache.liquidity_by_pool.insert(pool, liquidity);
} else {
self.liquidity_by_mint.insert(
mint,
Liquidity {
liquidity_by_pool: HashMap::from([(pool, liquidity)]),
},
);
}
}
pub fn get_total_liquidity_native(&self, mint: Pubkey) -> u64 {
if let Some(cache) = self.liquidity_by_mint.get(&mint) {
let mut sum = 0u64;
for amount in cache.liquidity_by_pool.iter().map(|x| x.1) {
sum = sum.saturating_add(*amount);
}
sum
} else {
0
}
}
pub fn get_total_liquidity_in_dollars(&self, mint: Pubkey) -> anyhow::Result<f64> {
let liquidity_native = self.get_total_liquidity_native(mint);
let price = self
.price_cache
.price_ui(mint)
.ok_or(anyhow::format_err!("no price"))?;
let decimal = self.token_cache.token(mint).map(|x| x.decimals)?;
let liquidity_dollars = (liquidity_native as f64 / 10.0.pow(decimal as f64)) * price;
Ok(liquidity_dollars)
}
}

View File

@ -0,0 +1,64 @@
use crate::debug_tools;
use crate::edge::Edge;
use crate::liquidity::liquidity_computer::compute_liquidity;
use crate::liquidity::liquidity_provider::LiquidityProviderArcRw;
use crate::util::tokio_spawn;
use itertools::Itertools;
use router_lib::dex::AccountProviderView;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tracing::{debug, info};
pub fn spawn_liquidity_updater_job(
provider: LiquidityProviderArcRw,
edges: Vec<Arc<Edge>>,
chain_data: AccountProviderView,
mut exit: broadcast::Receiver<()>,
) -> JoinHandle<()> {
let job = tokio_spawn("liquidity_updater", async move {
let mut refresh_all_interval = tokio::time::interval(Duration::from_secs(30));
refresh_all_interval.tick().await;
loop {
tokio::select! {
_ = exit.recv() => {
info!("shutting down liquidity_updater task");
break;
}
_ = refresh_all_interval.tick() => {
refresh_liquidity(&provider, &edges, &chain_data);
}
}
}
});
job
}
fn refresh_liquidity(
provider: &LiquidityProviderArcRw,
edges: &Vec<Arc<Edge>>,
account_provider: &AccountProviderView,
) {
for edge in edges {
let liquidity = compute_liquidity(&edge, &account_provider);
if let Ok(liquidity) = liquidity {
provider
.write()
.unwrap()
.set_liquidity(edge.output_mint, edge.id.key(), liquidity);
} else {
debug!("Could not compute liquidity for {}", edge.id.desc())
}
}
for mint in edges.iter().map(|x| x.output_mint).unique() {
debug!(
"Liquidity for {} -> {}",
debug_tools::name(&mint),
provider.read().unwrap().get_total_liquidity_native(mint)
)
}
}

View File

@ -0,0 +1,7 @@
mod liquidity_computer;
mod liquidity_provider;
mod liquidity_updater;
pub use liquidity_provider::LiquidityProvider;
pub use liquidity_provider::LiquidityProviderArcRw;
pub use liquidity_updater::spawn_liquidity_updater_job;

View File

@ -1,10 +1,35 @@
use crate::edge_updater::{spawn_updater_job, Dex}; use crate::edge_updater::{spawn_updater_job, Dex};
use crate::hot_mints::HotMintsCache;
use crate::ix_builder::{SwapInstructionsBuilderImpl, SwapStepInstructionBuilderImpl}; use crate::ix_builder::{SwapInstructionsBuilderImpl, SwapStepInstructionBuilderImpl};
use crate::liquidity::{spawn_liquidity_updater_job, LiquidityProvider};
use crate::path_warmer::spawn_path_warmer_job; use crate::path_warmer::spawn_path_warmer_job;
use crate::prometheus_sync::PrometheusSync;
use crate::routing::Routing;
use crate::server::alt_provider::RpcAltProvider;
use crate::server::hash_provider::RpcHashProvider;
use crate::server::http_server::HttpServer;
use crate::server::live_account_provider::LiveAccountProvider;
use crate::server::route_provider::RoutingRouteProvider;
use crate::source::mint_accounts_source::{request_mint_metadata, Token};
use crate::token_cache::{Decimals, TokenCache};
use crate::tx_watcher::spawn_tx_watcher_jobs;
use crate::util::tokio_spawn;
use dex_orca::OrcaDex;
use itertools::chain; use itertools::chain;
use mango_feeds_connector::chain_data::ChainData; use mango_feeds_connector::chain_data::ChainData;
use mango_feeds_connector::SlotUpdate; use mango_feeds_connector::SlotUpdate;
use prelude::*; use prelude::*;
use router_config_lib::{string_or_env, AccountDataSourceConfig, Config};
use router_feed_lib::account_write::{AccountOrSnapshotUpdate, AccountWrite};
use router_feed_lib::get_program_account::FeedMetadata;
use router_feed_lib::router_rpc_client::RouterRpcClient;
use router_feed_lib::router_rpc_wrapper::RouterRpcWrapper;
use router_lib::chain_data::ChainDataArcRw;
use router_lib::dex::{
AccountProviderView, ChainDataAccountProvider, DexInterface, DexSubscriptionMode,
};
use router_lib::mango;
use router_lib::price_feeds::composite::CompositePriceFeed;
use router_lib::price_feeds::price_cache::PriceCache; use router_lib::price_feeds::price_cache::PriceCache;
use router_lib::price_feeds::price_feed::PriceFeed; use router_lib::price_feeds::price_feed::PriceFeed;
use solana_client::nonblocking::rpc_client::RpcClient; use solana_client::nonblocking::rpc_client::RpcClient;
@ -18,31 +43,6 @@ use std::time::{Duration, Instant};
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use crate::hot_mints::HotMintsCache;
use crate::prometheus_sync::PrometheusSync;
use crate::routing::Routing;
use crate::server::alt_provider::RpcAltProvider;
use crate::server::hash_provider::RpcHashProvider;
use crate::server::http_server::HttpServer;
use crate::server::live_account_provider::LiveAccountProvider;
use crate::server::route_provider::RoutingRouteProvider;
use crate::source::mint_accounts_source::{request_mint_metadata, Token};
use crate::token_cache::{Decimals, TokenCache};
use crate::tx_watcher::spawn_tx_watcher_jobs;
use crate::util::tokio_spawn;
use dex_orca::OrcaDex;
use router_config_lib::{string_or_env, AccountDataSourceConfig, Config};
use router_feed_lib::account_write::{AccountOrSnapshotUpdate, AccountWrite};
use router_feed_lib::get_program_account::FeedMetadata;
use router_feed_lib::router_rpc_client::RouterRpcClient;
use router_feed_lib::router_rpc_wrapper::RouterRpcWrapper;
use router_lib::chain_data::ChainDataArcRw;
use router_lib::dex::{
AccountProviderView, ChainDataAccountProvider, DexInterface, DexSubscriptionMode,
};
use router_lib::mango;
use router_lib::price_feeds::composite::CompositePriceFeed;
mod alt; mod alt;
mod debug_tools; mod debug_tools;
mod dex; mod dex;
@ -50,6 +50,7 @@ pub mod edge;
mod edge_updater; mod edge_updater;
mod hot_mints; mod hot_mints;
pub mod ix_builder; pub mod ix_builder;
mod liquidity;
mod metrics; mod metrics;
mod mock; mod mock;
mod path_warmer; mod path_warmer;
@ -387,11 +388,23 @@ async fn main() -> anyhow::Result<()> {
router_version as u8, router_version as u8,
)); ));
let liquidity_provider = Arc::new(RwLock::new(LiquidityProvider::new(
token_cache.clone(),
price_cache.clone(),
)));
let liquidity_job = spawn_liquidity_updater_job(
liquidity_provider.clone(),
edges.clone(),
chain_data_wrapper,
exit_sender.subscribe(),
);
let server_job = HttpServer::start( let server_job = HttpServer::start(
route_provider.clone(), route_provider.clone(),
hash_provider, hash_provider,
alt_provider, alt_provider,
live_account_provider, live_account_provider,
liquidity_provider.clone(),
ix_builder, ix_builder,
config.clone(), config.clone(),
exit_sender.subscribe(), exit_sender.subscribe(),
@ -504,6 +517,7 @@ async fn main() -> anyhow::Result<()> {
tx_sender_job, tx_sender_job,
tx_watcher_job, tx_watcher_job,
account_update_job, account_update_job,
liquidity_job,
] ]
.into_iter() .into_iter()
.chain(update_jobs.into_iter()) .chain(update_jobs.into_iter())

View File

@ -21,12 +21,15 @@ use tower_http::cors::{AllowHeaders, AllowMethods, Any, CorsLayer};
use crate::alt::alt_optimizer; use crate::alt::alt_optimizer;
use crate::ix_builder::SwapInstructionsBuilder; use crate::ix_builder::SwapInstructionsBuilder;
use crate::liquidity::{LiquidityProvider, LiquidityProviderArcRw};
use crate::routing_types::Route; use crate::routing_types::Route;
use crate::server::alt_provider::AltProvider; use crate::server::alt_provider::AltProvider;
use crate::server::hash_provider::HashProvider; use crate::server::hash_provider::HashProvider;
use crate::{debug_tools, metrics}; use crate::{debug_tools, metrics};
use router_config_lib::Config; use router_config_lib::Config;
use router_lib::dex::{AccountProvider, AccountProviderView, SwapMode}; use router_lib::dex::{AccountProvider, AccountProviderView, SwapMode};
use router_lib::model::liquidity_request::LiquidityRequest;
use router_lib::model::liquidity_response::LiquidityResponse;
use router_lib::model::quote_response::{RoutePlan, SwapInfo}; use router_lib::model::quote_response::{RoutePlan, SwapInfo};
// make sure the transaction can be executed // make sure the transaction can be executed
@ -49,6 +52,7 @@ impl HttpServer {
hash_provider: Arc<THashProvider>, hash_provider: Arc<THashProvider>,
alt_provider: Arc<TAltProvider>, alt_provider: Arc<TAltProvider>,
live_account_provider: Arc<TAccountProvider>, live_account_provider: Arc<TAccountProvider>,
liquidity_provider: LiquidityProviderArcRw,
ix_builder: Arc<TIxBuilder>, ix_builder: Arc<TIxBuilder>,
config: Config, config: Config,
exit: tokio::sync::broadcast::Receiver<()>, exit: tokio::sync::broadcast::Receiver<()>,
@ -58,6 +62,7 @@ impl HttpServer {
hash_provider, hash_provider,
alt_provider, alt_provider,
live_account_provider, live_account_provider,
liquidity_provider,
ix_builder, ix_builder,
config, config,
exit, exit,
@ -80,6 +85,7 @@ impl HttpServer {
hash_provider: Arc<THashProvider>, hash_provider: Arc<THashProvider>,
alt_provider: Arc<TAltProvider>, alt_provider: Arc<TAltProvider>,
live_account_provider: Arc<TAccountProvider>, live_account_provider: Arc<TAccountProvider>,
liquidity_provider: LiquidityProviderArcRw,
ix_builder: Arc<TIxBuilder>, ix_builder: Arc<TIxBuilder>,
config: Config, config: Config,
exit: tokio::sync::broadcast::Receiver<()>, exit: tokio::sync::broadcast::Receiver<()>,
@ -107,6 +113,7 @@ impl HttpServer {
hash_provider, hash_provider,
alt_provider, alt_provider,
live_account_provider, live_account_provider,
liquidity_provider,
ix_builder, ix_builder,
reprice_frequency, reprice_frequency,
)?; )?;
@ -462,6 +469,28 @@ impl HttpServer {
Html("マンゴールーター") Html("マンゴールーター")
} }
async fn liquidity_handler(
liquidity_provider: LiquidityProviderArcRw,
Form(input): Form<LiquidityRequest>,
) -> Result<Json<Value>, AppError> {
let mut result = HashMap::new();
let reader = liquidity_provider.read().unwrap();
for mint_str in input.mints.split(",") {
let mint_str = mint_str.trim().to_string();
let mint = Pubkey::from_str(&mint_str)?;
result.insert(
mint_str,
reader.get_total_liquidity_in_dollars(mint).unwrap_or(0.0),
);
}
drop(reader);
let json_response = serde_json::json!(LiquidityResponse { liquidity: result });
Ok(Json(json_response))
}
fn extract_client_key(headers: &HeaderMap) -> &str { fn extract_client_key(headers: &HeaderMap) -> &str {
if let Some(client_key) = headers.get("x-client-key") { if let Some(client_key) = headers.get("x-client-key") {
client_key.to_str().unwrap_or("invalid") client_key.to_str().unwrap_or("invalid")
@ -482,6 +511,7 @@ impl HttpServer {
hash_provider: Arc<THashProvider>, hash_provider: Arc<THashProvider>,
alt_provider: Arc<TAltProvider>, alt_provider: Arc<TAltProvider>,
live_account_provider: Arc<TAccountProvider>, live_account_provider: Arc<TAccountProvider>,
liquidity_provider: LiquidityProviderArcRw,
ix_builder: Arc<TIxBuilder>, ix_builder: Arc<TIxBuilder>,
reprice_probability: f64, reprice_probability: f64,
) -> anyhow::Result<Router<()>> { ) -> anyhow::Result<Router<()>> {
@ -495,6 +525,12 @@ impl HttpServer {
router = router.route("/", routing::get(Self::handler)); router = router.route("/", routing::get(Self::handler));
let lp = liquidity_provider.clone();
router = router.route(
"/liquidity",
routing::get(move |form| Self::liquidity_handler(lp, form)),
);
let alt = address_lookup_tables.clone(); let alt = address_lookup_tables.clone();
let rp = route_provider.clone(); let rp = route_provider.clone();
let hp = hash_provider.clone(); let hp = hash_provider.clone();

View File

@ -0,0 +1,8 @@
use serde::Deserialize;
#[derive(Deserialize, Debug)]
#[allow(dead_code)]
#[serde(rename_all = "camelCase")]
pub struct LiquidityRequest {
pub mints: String,
}

View File

@ -0,0 +1,9 @@
use serde_derive::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
#[serde_with::serde_as]
pub struct LiquidityResponse {
pub liquidity: HashMap<String, f64>,
}

View File

@ -1,3 +1,5 @@
pub mod liquidity_request;
pub mod liquidity_response;
pub mod quote_request; pub mod quote_request;
pub mod quote_response; pub mod quote_response;
pub mod swap_request; pub mod swap_request;