From ea91d9d353debc75aa599ea6dc3c08813776f9d0 Mon Sep 17 00:00:00 2001 From: Christian Kamm Date: Wed, 7 Feb 2024 12:52:32 +0100 Subject: [PATCH] rust client: optionally derive prio fees from feed (#866) This allows connecting to a lite-rpc feed to receive block priority updates and computing recently good priority fee values based on that. --- Cargo.lock | 2 + bin/cli/src/main.rs | 11 +- bin/keeper/src/crank.rs | 3 + bin/keeper/src/main.rs | 31 ++-- bin/keeper/src/taker.rs | 8 +- bin/liquidator/src/main.rs | 44 +++-- bin/liquidator/src/trigger_tcs.rs | 6 +- bin/settler/src/main.rs | 28 ++-- bin/settler/src/settle.rs | 2 +- lib/client/Cargo.toml | 2 + lib/client/src/client.rs | 36 ++++- lib/client/src/jupiter/v4.rs | 7 +- lib/client/src/jupiter/v6.rs | 7 +- lib/client/src/lib.rs | 2 + lib/client/src/priority_fees.rs | 240 ++++++++++++++++++++++++++++ lib/client/src/priority_fees_cli.rs | 80 ++++++++++ 16 files changed, 456 insertions(+), 53 deletions(-) create mode 100644 lib/client/src/priority_fees.rs create mode 100644 lib/client/src/priority_fees_cli.rs diff --git a/Cargo.lock b/Cargo.lock index b4a48109e..26b1068d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3445,6 +3445,7 @@ dependencies = [ "atty", "base64 0.13.1", "bincode", + "clap 3.2.25", "derive_builder", "fixed 1.11.0 (git+https://github.com/blockworks-foundation/fixed.git?branch=v1.11.0-borsh0_10-mango)", "futures 0.3.28", @@ -3469,6 +3470,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tokio-tungstenite 0.17.2", "tracing", "tracing-subscriber", ] diff --git a/bin/cli/src/main.rs b/bin/cli/src/main.rs index a27e0245c..90b3a74f6 100644 --- a/bin/cli/src/main.rs +++ b/bin/cli/src/main.rs @@ -137,10 +137,13 @@ impl Rpc { .cluster(anchor_client::Cluster::from_str(&self.url)?) .commitment(solana_sdk::commitment_config::CommitmentConfig::confirmed()) .fee_payer(Some(Arc::new(fee_payer))) - .transaction_builder_config(TransactionBuilderConfig { - prioritization_micro_lamports: Some(5), - compute_budget_per_instruction: Some(250_000), - }) + .transaction_builder_config( + TransactionBuilderConfig::builder() + .prioritization_micro_lamports(Some(5)) + .compute_budget_per_instruction(Some(250_000)) + .build() + .unwrap(), + ) .build() .unwrap()) } diff --git a/bin/keeper/src/crank.rs b/bin/keeper/src/crank.rs index 90569d765..8082f41a8 100644 --- a/bin/keeper/src/crank.rs +++ b/bin/keeper/src/crank.rs @@ -12,6 +12,7 @@ use solana_sdk::{ instruction::{AccountMeta, Instruction}, pubkey::Pubkey, }; +use tokio::task::JoinHandle; use tracing::*; use warp::Filter; @@ -80,6 +81,7 @@ pub async fn runner( interval_consume_events: u64, interval_update_funding: u64, interval_check_for_changes_and_abort: u64, + extra_jobs: Vec>, ) -> Result<(), anyhow::Error> { let handles1 = mango_client .context @@ -144,6 +146,7 @@ pub async fn runner( ), serve_metrics(), debugging_handle, + futures::future::join_all(extra_jobs), ); Ok(()) diff --git a/bin/keeper/src/main.rs b/bin/keeper/src/main.rs index 02f374f19..dad08649e 100644 --- a/bin/keeper/src/main.rs +++ b/bin/keeper/src/main.rs @@ -8,7 +8,8 @@ use anchor_client::Cluster; use clap::{Parser, Subcommand}; use mango_v4_client::{ - keypair_from_cli, Client, FallbackOracleConfig, MangoClient, TransactionBuilderConfig, + keypair_from_cli, priority_fees_cli, Client, FallbackOracleConfig, MangoClient, + TransactionBuilderConfig, }; use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::pubkey::Pubkey; @@ -63,9 +64,12 @@ struct Cli { #[clap(long, env, default_value_t = 10)] timeout: u64, - /// prioritize each transaction with this many microlamports/cu - #[clap(long, env, default_value = "0")] - prioritization_micro_lamports: u64, + #[clap(flatten)] + prioritization_fee_cli: priority_fees_cli::PriorityFeeArgs, + + /// url to the lite-rpc websocket, optional + #[clap(long, env, default_value = "")] + lite_rpc_url: String, } #[derive(Subcommand, Debug, Clone)] @@ -87,6 +91,10 @@ async fn main() -> Result<(), anyhow::Error> { }; let cli = Cli::parse_from(args); + let (prio_provider, prio_jobs) = cli + .prioritization_fee_cli + .make_prio_provider(cli.lite_rpc_url.clone())?; + let owner = Arc::new(keypair_from_cli(&cli.owner)); let rpc_url = cli.rpc_url; @@ -105,11 +113,13 @@ async fn main() -> Result<(), anyhow::Error> { .commitment(commitment) .fee_payer(Some(owner.clone())) .timeout(Duration::from_secs(cli.timeout)) - .transaction_builder_config(TransactionBuilderConfig { - prioritization_micro_lamports: (cli.prioritization_micro_lamports > 0) - .then_some(cli.prioritization_micro_lamports), - compute_budget_per_instruction: None, - }) + .transaction_builder_config( + TransactionBuilderConfig::builder() + .priority_fee_provider(prio_provider) + .compute_budget_per_instruction(None) + .build() + .unwrap(), + ) .fallback_oracle_config(FallbackOracleConfig::Never) .build() .unwrap(), @@ -143,12 +153,13 @@ async fn main() -> Result<(), anyhow::Error> { cli.interval_consume_events, cli.interval_update_funding, cli.interval_check_new_listings_and_abort, + prio_jobs, ) .await } Command::Taker { .. } => { let client = mango_client.clone(); - taker::runner(client, debugging_handle).await + taker::runner(client, debugging_handle, prio_jobs).await } } } diff --git a/bin/keeper/src/taker.rs b/bin/keeper/src/taker.rs index 90a56d0bb..70f024df0 100644 --- a/bin/keeper/src/taker.rs +++ b/bin/keeper/src/taker.rs @@ -10,13 +10,15 @@ use mango_v4::{ accounts_ix::{Serum3OrderType, Serum3SelfTradeBehavior, Serum3Side}, state::TokenIndex, }; +use tokio::task::JoinHandle; use tracing::*; use crate::MangoClient; pub async fn runner( mango_client: Arc, - _debugging_handle: impl Future, + debugging_handle: impl Future, + extra_jobs: Vec>, ) -> Result<(), anyhow::Error> { ensure_deposit(&mango_client).await?; ensure_oo(&mango_client).await?; @@ -53,7 +55,9 @@ pub async fn runner( futures::join!( futures::future::join_all(handles1), - futures::future::join_all(handles2) + futures::future::join_all(handles2), + debugging_handle, + futures::future::join_all(extra_jobs), ); Ok(()) diff --git a/bin/liquidator/src/main.rs b/bin/liquidator/src/main.rs index 2d4e1cc1d..ed091e55f 100644 --- a/bin/liquidator/src/main.rs +++ b/bin/liquidator/src/main.rs @@ -7,6 +7,7 @@ use anchor_client::Cluster; use anyhow::Context; use clap::Parser; use mango_v4::state::{PerpMarketIndex, TokenIndex}; +use mango_v4_client::priority_fees_cli; use mango_v4_client::AsyncChannelSendUnlessFull; use mango_v4_client::{ account_update_stream, chain_data, error_tracking::ErrorTracking, jupiter, keypair_from_cli, @@ -148,9 +149,12 @@ struct Cli { #[clap(long, env, value_enum, default_value = "swap-sell-into-buy")] tcs_mode: TcsMode, - /// prioritize each transaction with this many microlamports/cu - #[clap(long, env, default_value = "0")] - prioritization_micro_lamports: u64, + #[clap(flatten)] + prioritization_fee_cli: priority_fees_cli::PriorityFeeArgs, + + /// url to the lite-rpc websocket, optional + #[clap(long, env, default_value = "")] + lite_rpc_url: String, /// compute limit requested for liquidation instructions #[clap(long, env, default_value = "250000")] @@ -189,20 +193,31 @@ pub fn encode_address(addr: &Pubkey) -> String { async fn main() -> anyhow::Result<()> { mango_v4_client::tracing_subscriber_init(); - let args = if let Ok(cli_dotenv) = CliDotenv::try_parse() { + let args: Vec = if let Ok(cli_dotenv) = CliDotenv::try_parse() { dotenv::from_path(cli_dotenv.dotenv)?; - cli_dotenv.remaining_args + std::env::args_os() + .take(1) + .chain(cli_dotenv.remaining_args.into_iter()) + .collect() } else { dotenv::dotenv().ok(); std::env::args_os().collect() }; let cli = Cli::parse_from(args); - let liqor_owner = Arc::new(keypair_from_cli(&cli.liqor_owner)); + // + // Priority fee setup + // + let (prio_provider, prio_jobs) = cli + .prioritization_fee_cli + .make_prio_provider(cli.lite_rpc_url.clone())?; + // + // Client setup + // + let liqor_owner = Arc::new(keypair_from_cli(&cli.liqor_owner)); let rpc_url = cli.rpc_url; let ws_url = rpc_url.replace("https", "wss"); - let rpc_timeout = Duration::from_secs(10); let cluster = Cluster::Custom(rpc_url.clone(), ws_url.clone()); let commitment = CommitmentConfig::processed(); @@ -214,12 +229,14 @@ async fn main() -> anyhow::Result<()> { .jupiter_v4_url(cli.jupiter_v4_url) .jupiter_v6_url(cli.jupiter_v6_url) .jupiter_token(cli.jupiter_token) - .transaction_builder_config(TransactionBuilderConfig { - prioritization_micro_lamports: (cli.prioritization_micro_lamports > 0) - .then_some(cli.prioritization_micro_lamports), - // Liquidation and tcs triggers set their own budgets, this is a default for other tx - compute_budget_per_instruction: Some(250_000), - }) + .transaction_builder_config( + TransactionBuilderConfig::builder() + .priority_fee_provider(prio_provider) + // Liquidation and tcs triggers set their own budgets, this is a default for other tx + .compute_budget_per_instruction(Some(250_000)) + .build() + .unwrap(), + ) .override_send_transaction_urls(cli.override_send_transaction_url) .build() .unwrap(); @@ -584,6 +601,7 @@ async fn main() -> anyhow::Result<()> { check_changes_for_abort_job, ] .into_iter() + .chain(prio_jobs.into_iter()) .collect(); jobs.next().await; diff --git a/bin/liquidator/src/trigger_tcs.rs b/bin/liquidator/src/trigger_tcs.rs index c8b914698..7e3f00203 100644 --- a/bin/liquidator/src/trigger_tcs.rs +++ b/bin/liquidator/src/trigger_tcs.rs @@ -14,7 +14,7 @@ use mango_v4::{ use mango_v4_client::{chain_data, jupiter, MangoClient, TransactionBuilder}; use anyhow::Context as AnyhowContext; -use solana_sdk::{signature::Signature, signer::Signer}; +use solana_sdk::signature::Signature; use tracing::*; use {fixed::types::I80F48, solana_sdk::pubkey::Pubkey}; @@ -1168,10 +1168,8 @@ impl Context { let fee_payer = self.mango_client.client.fee_payer(); TransactionBuilder { instructions: vec![compute_ix], - address_lookup_tables: vec![], - payer: fee_payer.pubkey(), signers: vec![self.mango_client.owner.clone(), fee_payer], - config: self.mango_client.client.config().transaction_builder_config, + ..self.mango_client.transaction_builder().await? } }; diff --git a/bin/settler/src/main.rs b/bin/settler/src/main.rs index 31391a81b..57f408a21 100644 --- a/bin/settler/src/main.rs +++ b/bin/settler/src/main.rs @@ -6,8 +6,8 @@ use anchor_client::Cluster; use clap::Parser; use mango_v4::state::{PerpMarketIndex, TokenIndex}; use mango_v4_client::{ - account_update_stream, chain_data, keypair_from_cli, snapshot_source, websocket_source, Client, - MangoClient, MangoGroupContext, TransactionBuilderConfig, + account_update_stream, chain_data, keypair_from_cli, priority_fees_cli, snapshot_source, + websocket_source, Client, MangoClient, MangoGroupContext, TransactionBuilderConfig, }; use tracing::*; @@ -61,9 +61,12 @@ struct Cli { #[clap(long, env, default_value = "100")] get_multiple_accounts_count: usize, - /// prioritize each transaction with this many microlamports/cu - #[clap(long, env, default_value = "0")] - prioritization_micro_lamports: u64, + #[clap(flatten)] + prioritization_fee_cli: priority_fees_cli::PriorityFeeArgs, + + /// url to the lite-rpc websocket, optional + #[clap(long, env, default_value = "")] + lite_rpc_url: String, /// compute budget for each instruction #[clap(long, env, default_value = "250000")] @@ -87,6 +90,10 @@ async fn main() -> anyhow::Result<()> { }; let cli = Cli::parse_from(args); + let (prio_provider, prio_jobs) = cli + .prioritization_fee_cli + .make_prio_provider(cli.lite_rpc_url.clone())?; + let settler_owner = Arc::new(keypair_from_cli(&cli.settler_owner)); let rpc_url = cli.rpc_url; @@ -100,11 +107,11 @@ async fn main() -> anyhow::Result<()> { commitment, settler_owner.clone(), Some(rpc_timeout), - TransactionBuilderConfig { - prioritization_micro_lamports: (cli.prioritization_micro_lamports > 0) - .then_some(cli.prioritization_micro_lamports), - compute_budget_per_instruction: Some(cli.compute_budget_per_instruction), - }, + TransactionBuilderConfig::builder() + .compute_budget_per_instruction(Some(cli.compute_budget_per_instruction)) + .priority_fee_provider(prio_provider) + .build() + .unwrap(), ); // The representation of current on-chain account data @@ -352,6 +359,7 @@ async fn main() -> anyhow::Result<()> { check_changes_for_abort_job, ] .into_iter() + .chain(prio_jobs.into_iter()) .collect(); jobs.next().await; diff --git a/bin/settler/src/settle.rs b/bin/settler/src/settle.rs index 9d66f9214..edd84c700 100644 --- a/bin/settler/src/settle.rs +++ b/bin/settler/src/settle.rs @@ -284,7 +284,7 @@ impl<'a> SettleBatchProcessor<'a> { address_lookup_tables: self.address_lookup_tables.clone(), payer: fee_payer.pubkey(), signers: vec![fee_payer], - config: client.config().transaction_builder_config, + config: client.config().transaction_builder_config.clone(), } .transaction_with_blockhash(self.blockhash) } diff --git a/lib/client/Cargo.toml b/lib/client/Cargo.toml index b986fd0bf..cc1c29a9d 100644 --- a/lib/client/Cargo.toml +++ b/lib/client/Cargo.toml @@ -15,6 +15,7 @@ async-channel = "1.6" async-once-cell = { version = "0.4.2", features = ["unpin"] } async-trait = "0.1.52" atty = "0.2" +clap = { version = "3.1.8", features = ["derive", "env"] } derive_builder = "0.12.0" fixed = { workspace = true, features = ["serde", "borsh"] } futures = "0.3.25" @@ -38,6 +39,7 @@ thiserror = "1.0.31" reqwest = "0.11.17" tokio = { version = "1", features = ["full"] } tokio-stream = { version = "0.1.9"} +tokio-tungstenite = "0.17.0" serde = "1.0.141" serde_json = "1.0.82" base64 = "0.13.0" diff --git a/lib/client/src/client.rs b/lib/client/src/client.rs index a3a5d0df7..b1584d197 100644 --- a/lib/client/src/client.rs +++ b/lib/client/src/client.rs @@ -29,6 +29,7 @@ use crate::confirm_transaction::{wait_for_transaction_confirmation, RpcConfirmTr use crate::context::MangoGroupContext; use crate::gpa::{fetch_anchor_account, fetch_mango_accounts}; use crate::health_cache; +use crate::priority_fees::{FixedPriorityFeeProvider, PriorityFeeProvider}; use crate::util::PreparedInstructions; use crate::{jupiter, util}; use solana_address_lookup_table_program::state::AddressLookupTable; @@ -53,7 +54,7 @@ use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signer::Si pub const MAX_ACCOUNTS_PER_TRANSACTION: usize = 64; // very close to anchor_client::Client, which unfortunately has no accessors or Clone -#[derive(Clone, Debug, Builder)] +#[derive(Clone, Builder)] #[builder(name = "ClientBuilder", build_fn(name = "build_config"))] pub struct ClientConfig { /// RPC url @@ -376,7 +377,7 @@ impl MangoClient { address_lookup_tables: vec![], payer: payer.pubkey(), signers: vec![owner, payer], - config: client.config.transaction_builder_config, + config: client.config.transaction_builder_config.clone(), } .send_and_confirm(&client) .await?; @@ -1858,7 +1859,7 @@ impl MangoClient { address_lookup_tables: self.mango_address_lookup_tables().await?, payer: fee_payer.pubkey(), signers: vec![fee_payer], - config: self.client.config.transaction_builder_config, + config: self.client.config.transaction_builder_config.clone(), }) } @@ -1872,7 +1873,7 @@ impl MangoClient { address_lookup_tables: vec![], payer: fee_payer.pubkey(), signers: vec![fee_payer], - config: self.client.config.transaction_builder_config, + config: self.client.config.transaction_builder_config.clone(), } .simulate(&self.client) .await @@ -1951,14 +1952,30 @@ impl Default for FallbackOracleConfig { } } -#[derive(Copy, Clone, Debug, Default)] +#[derive(Clone, Default, Builder)] pub struct TransactionBuilderConfig { /// adds a SetComputeUnitPrice instruction in front if none exists - pub prioritization_micro_lamports: Option, + pub priority_fee_provider: Option>, /// adds a SetComputeUnitBudget instruction if none exists pub compute_budget_per_instruction: Option, } +impl TransactionBuilderConfig { + pub fn builder() -> TransactionBuilderConfigBuilder { + TransactionBuilderConfigBuilder::default() + } +} + +impl TransactionBuilderConfigBuilder { + pub fn prioritization_micro_lamports(&mut self, cu: Option) -> &mut Self { + self.priority_fee_provider( + cu.map(|cu| { + Arc::new(FixedPriorityFeeProvider::new(cu)) as Arc + }), + ) + } +} + pub struct TransactionBuilder { pub instructions: Vec, pub address_lookup_tables: Vec, @@ -2012,7 +2029,12 @@ impl TransactionBuilder { ); } - let cu_prio = self.config.prioritization_micro_lamports.unwrap_or(0); + let cu_prio = self + .config + .priority_fee_provider + .as_ref() + .map(|provider| provider.compute_unit_fee_microlamports()) + .unwrap_or(0); if !has_compute_unit_price && cu_prio > 0 { ixs.insert(0, ComputeBudgetInstruction::set_compute_unit_price(cu_prio)); } diff --git a/lib/client/src/jupiter/v4.rs b/lib/client/src/jupiter/v4.rs index 85bbb6eea..2c6dfb271 100644 --- a/lib/client/src/jupiter/v4.rs +++ b/lib/client/src/jupiter/v4.rs @@ -338,7 +338,12 @@ impl<'a> JupiterV4<'a> { address_lookup_tables, payer, signers: vec![self.mango_client.owner.clone()], - config: self.mango_client.client.config().transaction_builder_config, + config: self + .mango_client + .client + .config() + .transaction_builder_config + .clone(), }) } diff --git a/lib/client/src/jupiter/v6.rs b/lib/client/src/jupiter/v6.rs index 1d79371d9..6c73fc741 100644 --- a/lib/client/src/jupiter/v6.rs +++ b/lib/client/src/jupiter/v6.rs @@ -388,7 +388,12 @@ impl<'a> JupiterV6<'a> { address_lookup_tables, payer, signers: vec![self.mango_client.owner.clone()], - config: self.mango_client.client.config().transaction_builder_config, + config: self + .mango_client + .client + .config() + .transaction_builder_config + .clone(), }) } diff --git a/lib/client/src/lib.rs b/lib/client/src/lib.rs index a584630fa..882a931f6 100644 --- a/lib/client/src/lib.rs +++ b/lib/client/src/lib.rs @@ -15,6 +15,8 @@ pub mod gpa; pub mod health_cache; pub mod jupiter; pub mod perp_pnl; +pub mod priority_fees; +pub mod priority_fees_cli; pub mod snapshot_source; mod util; pub mod websocket_source; diff --git a/lib/client/src/priority_fees.rs b/lib/client/src/priority_fees.rs new file mode 100644 index 000000000..179fc0361 --- /dev/null +++ b/lib/client/src/priority_fees.rs @@ -0,0 +1,240 @@ +use futures::{SinkExt, StreamExt}; +use jsonrpc_core::{MethodCall, Notification, Params, Version}; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant}; +use tokio::sync::broadcast; +use tokio::task::JoinHandle; +use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::protocol::Message; +use tracing::*; + +pub trait PriorityFeeProvider: Sync + Send { + fn compute_unit_fee_microlamports(&self) -> u64; +} + +pub struct FixedPriorityFeeProvider { + pub compute_unit_fee_microlamports: u64, +} + +impl FixedPriorityFeeProvider { + pub fn new(fee_microlamports: u64) -> Self { + Self { + compute_unit_fee_microlamports: fee_microlamports, + } + } +} + +impl PriorityFeeProvider for FixedPriorityFeeProvider { + fn compute_unit_fee_microlamports(&self) -> u64 { + self.compute_unit_fee_microlamports + } +} + +#[derive(Builder)] +pub struct EmaPriorityFeeProviderConfig { + pub percentile: u8, + + #[builder(default = "0.2")] + pub alpha: f64, + + pub fallback_prio: u64, + + #[builder(default = "Duration::from_secs(15)")] + pub max_age: Duration, +} + +impl EmaPriorityFeeProviderConfig { + pub fn builder() -> EmaPriorityFeeProviderConfigBuilder { + EmaPriorityFeeProviderConfigBuilder::default() + } +} + +#[derive(Default)] +struct CuPercentileEmaPriorityFeeProviderData { + ema: f64, + last_update: Option, +} + +pub struct CuPercentileEmaPriorityFeeProvider { + data: RwLock, + config: EmaPriorityFeeProviderConfig, +} + +impl PriorityFeeProvider for CuPercentileEmaPriorityFeeProvider { + fn compute_unit_fee_microlamports(&self) -> u64 { + let data = self.data.read().unwrap(); + if let Some(last_update) = data.last_update { + if Instant::now().duration_since(last_update) > self.config.max_age { + return self.config.fallback_prio; + } + } else { + return self.config.fallback_prio; + } + data.ema as u64 + } +} + +impl CuPercentileEmaPriorityFeeProvider { + pub fn run( + config: EmaPriorityFeeProviderConfig, + sender: &broadcast::Sender, + ) -> (Arc, JoinHandle<()>) { + let this = Arc::new(Self { + data: Default::default(), + config, + }); + let handle = tokio::spawn({ + let this_c = this.clone(); + let rx = sender.subscribe(); + async move { Self::run_update_job(this_c, rx).await } + }); + (this, handle) + } + + async fn run_update_job(provider: Arc, mut rx: broadcast::Receiver) { + let config = &provider.config; + loop { + let block_prios = rx.recv().await.unwrap(); + let prio = match block_prios.by_cu_percentile.get(&config.percentile) { + Some(v) => *v as f64, + None => { + error!("percentile not available: {}", config.percentile); + continue; + } + }; + + let mut data = provider.data.write().unwrap(); + data.ema = data.ema * (1.0 - config.alpha) + config.alpha * prio; + data.last_update = Some(Instant::now()); + } + } +} + +#[derive(Clone, Default, Debug)] +pub struct BlockPrioFees { + pub slot: u64, + // prio fee percentile in percent -> prio fee + pub percentile: HashMap, + // cu percentile in percent -> median prio fee of the group + pub by_cu_percentile: HashMap, +} + +#[derive(serde::Deserialize)] +struct BlockPrioritizationFeesNotificationContext { + slot: u64, +} + +#[derive(serde::Deserialize)] +struct BlockPrioritizationFeesNotificationValue { + by_tx: Vec, + by_tx_percentiles: Vec, + by_cu: Vec, + by_cu_percentiles: Vec, +} + +#[derive(serde::Deserialize)] +struct BlockPrioritizationFeesNotificationParams { + context: BlockPrioritizationFeesNotificationContext, + value: BlockPrioritizationFeesNotificationValue, +} + +fn as_block_prioritization_fees_notification( + notification_str: &str, +) -> anyhow::Result> { + let notification: Notification = match serde_json::from_str(¬ification_str) { + Ok(v) => v, + Err(_) => return Ok(None), // not a notification at all + }; + if notification.method != "blockPrioritizationFeesNotification" { + return Ok(None); + } + let map = match notification.params { + Params::Map(m) => m, + _ => anyhow::bail!("unexpected params, expected map"), + }; + let result = map + .get("result") + .ok_or(anyhow::anyhow!("missing params.result"))? + .clone(); + + let mut data = BlockPrioFees::default(); + let v: BlockPrioritizationFeesNotificationParams = serde_json::from_value(result)?; + data.slot = v.context.slot; + for (percentile, prio) in v.value.by_tx_percentiles.iter().zip(v.value.by_tx.iter()) { + let int_perc: u8 = ((percentile * 100.0) as u64).try_into()?; + data.percentile.insert(int_perc, *prio); + } + for (percentile, prio) in v.value.by_cu_percentiles.iter().zip(v.value.by_cu.iter()) { + let int_perc: u8 = ((percentile * 100.0) as u64).try_into()?; + data.by_cu_percentile.insert(int_perc, *prio); + } + + Ok(Some(data)) +} + +async fn connect_and_broadcast( + url: &str, + sender: &broadcast::Sender, +) -> anyhow::Result<()> { + let (ws_stream, _) = connect_async(url).await?; + let (mut write, mut read) = ws_stream.split(); + + // Create a JSON-RPC request + let call = MethodCall { + jsonrpc: Some(Version::V2), + method: "blockPrioritizationFeesSubscribe".to_string(), + params: Params::None, + id: jsonrpc_core::Id::Num(1), + }; + + let request = serde_json::to_string(&call).unwrap(); + write.send(Message::Text(request)).await?; + + loop { + let timeout = tokio::time::sleep(Duration::from_secs(20)); + tokio::select! { + message = read.next() => { + match message { + Some(Ok(Message::Text(text))) => { + if let Some(block_prio) = as_block_prioritization_fees_notification(&text)? { + // Failure might just mean there is no receiver right now + let _ = sender.send(block_prio); + } + } + Some(Ok(Message::Ping(..))) => {} + Some(Ok(Message::Pong(..))) => {} + Some(Ok(msg @ _)) => { + anyhow::bail!("received a non-text message: {:?}", msg); + }, + Some(Err(e)) => { + anyhow::bail!("error receiving message: {}", e); + } + None => { + anyhow::bail!("websocket stream closed"); + } + } + }, + _ = timeout => { + anyhow::bail!("timeout"); + } + } + } +} + +async fn connect_and_broadcast_loop(url: &str, sender: broadcast::Sender) { + loop { + if let Err(err) = connect_and_broadcast(url, &sender).await { + info!("recent block prio feed error, restarting: {err:?}"); + } + } +} + +pub fn run_broadcast_from_websocket_feed( + url: String, +) -> (broadcast::Sender, JoinHandle<()>) { + let (sender, _) = broadcast::channel(10); + let sender_c = sender.clone(); + let handle = tokio::spawn(async move { connect_and_broadcast_loop(&url, sender_c).await }); + (sender, handle) +} diff --git a/lib/client/src/priority_fees_cli.rs b/lib/client/src/priority_fees_cli.rs new file mode 100644 index 000000000..c2f44bc34 --- /dev/null +++ b/lib/client/src/priority_fees_cli.rs @@ -0,0 +1,80 @@ +use std::sync::Arc; +use tokio::task::JoinHandle; +use tracing::*; + +use crate::priority_fees::*; + +#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq)] +enum PriorityFeeStyleArg { + None, + Fixed, + LiteRpcCuPercentileEma, +} + +#[derive(clap::Args, Debug, Clone)] +pub struct PriorityFeeArgs { + /// choose prio fee style + #[clap(long, env, value_enum, default_value = "none")] + prioritization_style: PriorityFeeStyleArg, + + /// prioritize each transaction with this many microlamports/cu + /// + /// for dynamic prio styles, this is the fallback value + #[clap(long, env, default_value = "0")] + prioritization_micro_lamports: u64, + + #[clap(long, env, default_value = "50")] + prioritization_ema_percentile: u8, + + #[clap(long, env, default_value = "0.2")] + prioritization_ema_alpha: f64, +} + +impl PriorityFeeArgs { + pub fn make_prio_provider( + &self, + lite_rpc_url: String, + ) -> anyhow::Result<(Option>, Vec>)> { + let prio_style; + if self.prioritization_micro_lamports > 0 + && self.prioritization_style == PriorityFeeStyleArg::None + { + info!("forcing prioritization-style to fixed, since prioritization-micro-lamports was set"); + prio_style = PriorityFeeStyleArg::Fixed; + } else { + prio_style = self.prioritization_style; + } + + Ok(match prio_style { + PriorityFeeStyleArg::None => (None, vec![]), + PriorityFeeStyleArg::Fixed => ( + Some(Arc::new(FixedPriorityFeeProvider::new( + self.prioritization_micro_lamports, + ))), + vec![], + ), + PriorityFeeStyleArg::LiteRpcCuPercentileEma => { + if lite_rpc_url.is_empty() { + anyhow::bail!("cannot use recent-cu-percentile-ema prioritization style without a lite-rpc url"); + } + let (block_prio_broadcaster, block_prio_job) = + run_broadcast_from_websocket_feed(lite_rpc_url); + let (prio_fee_provider, prio_fee_provider_job) = + CuPercentileEmaPriorityFeeProvider::run( + EmaPriorityFeeProviderConfig::builder() + .percentile(75) + .fallback_prio(self.prioritization_micro_lamports) + .alpha(self.prioritization_ema_alpha) + .percentile(self.prioritization_ema_percentile) + .build() + .unwrap(), + &block_prio_broadcaster, + ); + ( + Some(prio_fee_provider), + vec![block_prio_job, prio_fee_provider_job], + ) + } + }) + } +}