rust client: optionally derive prio fees from feed (#866)

This allows connecting to a lite-rpc feed to receive block priority
updates and computing recently good priority fee values based on that.
This commit is contained in:
Christian Kamm 2024-02-07 12:52:32 +01:00 committed by GitHub
parent d9a9c7d664
commit ea91d9d353
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 456 additions and 53 deletions

2
Cargo.lock generated
View File

@ -3445,6 +3445,7 @@ dependencies = [
"atty",
"base64 0.13.1",
"bincode",
"clap 3.2.25",
"derive_builder",
"fixed 1.11.0 (git+https://github.com/blockworks-foundation/fixed.git?branch=v1.11.0-borsh0_10-mango)",
"futures 0.3.28",
@ -3469,6 +3470,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-stream",
"tokio-tungstenite 0.17.2",
"tracing",
"tracing-subscriber",
]

View File

@ -137,10 +137,13 @@ impl Rpc {
.cluster(anchor_client::Cluster::from_str(&self.url)?)
.commitment(solana_sdk::commitment_config::CommitmentConfig::confirmed())
.fee_payer(Some(Arc::new(fee_payer)))
.transaction_builder_config(TransactionBuilderConfig {
prioritization_micro_lamports: Some(5),
compute_budget_per_instruction: Some(250_000),
})
.transaction_builder_config(
TransactionBuilderConfig::builder()
.prioritization_micro_lamports(Some(5))
.compute_budget_per_instruction(Some(250_000))
.build()
.unwrap(),
)
.build()
.unwrap())
}

View File

@ -12,6 +12,7 @@ use solana_sdk::{
instruction::{AccountMeta, Instruction},
pubkey::Pubkey,
};
use tokio::task::JoinHandle;
use tracing::*;
use warp::Filter;
@ -80,6 +81,7 @@ pub async fn runner(
interval_consume_events: u64,
interval_update_funding: u64,
interval_check_for_changes_and_abort: u64,
extra_jobs: Vec<JoinHandle<()>>,
) -> Result<(), anyhow::Error> {
let handles1 = mango_client
.context
@ -144,6 +146,7 @@ pub async fn runner(
),
serve_metrics(),
debugging_handle,
futures::future::join_all(extra_jobs),
);
Ok(())

View File

@ -8,7 +8,8 @@ use anchor_client::Cluster;
use clap::{Parser, Subcommand};
use mango_v4_client::{
keypair_from_cli, Client, FallbackOracleConfig, MangoClient, TransactionBuilderConfig,
keypair_from_cli, priority_fees_cli, Client, FallbackOracleConfig, MangoClient,
TransactionBuilderConfig,
};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
@ -63,9 +64,12 @@ struct Cli {
#[clap(long, env, default_value_t = 10)]
timeout: u64,
/// prioritize each transaction with this many microlamports/cu
#[clap(long, env, default_value = "0")]
prioritization_micro_lamports: u64,
#[clap(flatten)]
prioritization_fee_cli: priority_fees_cli::PriorityFeeArgs,
/// url to the lite-rpc websocket, optional
#[clap(long, env, default_value = "")]
lite_rpc_url: String,
}
#[derive(Subcommand, Debug, Clone)]
@ -87,6 +91,10 @@ async fn main() -> Result<(), anyhow::Error> {
};
let cli = Cli::parse_from(args);
let (prio_provider, prio_jobs) = cli
.prioritization_fee_cli
.make_prio_provider(cli.lite_rpc_url.clone())?;
let owner = Arc::new(keypair_from_cli(&cli.owner));
let rpc_url = cli.rpc_url;
@ -105,11 +113,13 @@ async fn main() -> Result<(), anyhow::Error> {
.commitment(commitment)
.fee_payer(Some(owner.clone()))
.timeout(Duration::from_secs(cli.timeout))
.transaction_builder_config(TransactionBuilderConfig {
prioritization_micro_lamports: (cli.prioritization_micro_lamports > 0)
.then_some(cli.prioritization_micro_lamports),
compute_budget_per_instruction: None,
})
.transaction_builder_config(
TransactionBuilderConfig::builder()
.priority_fee_provider(prio_provider)
.compute_budget_per_instruction(None)
.build()
.unwrap(),
)
.fallback_oracle_config(FallbackOracleConfig::Never)
.build()
.unwrap(),
@ -143,12 +153,13 @@ async fn main() -> Result<(), anyhow::Error> {
cli.interval_consume_events,
cli.interval_update_funding,
cli.interval_check_new_listings_and_abort,
prio_jobs,
)
.await
}
Command::Taker { .. } => {
let client = mango_client.clone();
taker::runner(client, debugging_handle).await
taker::runner(client, debugging_handle, prio_jobs).await
}
}
}

View File

@ -10,13 +10,15 @@ use mango_v4::{
accounts_ix::{Serum3OrderType, Serum3SelfTradeBehavior, Serum3Side},
state::TokenIndex,
};
use tokio::task::JoinHandle;
use tracing::*;
use crate::MangoClient;
pub async fn runner(
mango_client: Arc<MangoClient>,
_debugging_handle: impl Future,
debugging_handle: impl Future,
extra_jobs: Vec<JoinHandle<()>>,
) -> Result<(), anyhow::Error> {
ensure_deposit(&mango_client).await?;
ensure_oo(&mango_client).await?;
@ -53,7 +55,9 @@ pub async fn runner(
futures::join!(
futures::future::join_all(handles1),
futures::future::join_all(handles2)
futures::future::join_all(handles2),
debugging_handle,
futures::future::join_all(extra_jobs),
);
Ok(())

View File

@ -7,6 +7,7 @@ use anchor_client::Cluster;
use anyhow::Context;
use clap::Parser;
use mango_v4::state::{PerpMarketIndex, TokenIndex};
use mango_v4_client::priority_fees_cli;
use mango_v4_client::AsyncChannelSendUnlessFull;
use mango_v4_client::{
account_update_stream, chain_data, error_tracking::ErrorTracking, jupiter, keypair_from_cli,
@ -148,9 +149,12 @@ struct Cli {
#[clap(long, env, value_enum, default_value = "swap-sell-into-buy")]
tcs_mode: TcsMode,
/// prioritize each transaction with this many microlamports/cu
#[clap(long, env, default_value = "0")]
prioritization_micro_lamports: u64,
#[clap(flatten)]
prioritization_fee_cli: priority_fees_cli::PriorityFeeArgs,
/// url to the lite-rpc websocket, optional
#[clap(long, env, default_value = "")]
lite_rpc_url: String,
/// compute limit requested for liquidation instructions
#[clap(long, env, default_value = "250000")]
@ -189,20 +193,31 @@ pub fn encode_address(addr: &Pubkey) -> String {
async fn main() -> anyhow::Result<()> {
mango_v4_client::tracing_subscriber_init();
let args = if let Ok(cli_dotenv) = CliDotenv::try_parse() {
let args: Vec<std::ffi::OsString> = if let Ok(cli_dotenv) = CliDotenv::try_parse() {
dotenv::from_path(cli_dotenv.dotenv)?;
cli_dotenv.remaining_args
std::env::args_os()
.take(1)
.chain(cli_dotenv.remaining_args.into_iter())
.collect()
} else {
dotenv::dotenv().ok();
std::env::args_os().collect()
};
let cli = Cli::parse_from(args);
let liqor_owner = Arc::new(keypair_from_cli(&cli.liqor_owner));
//
// Priority fee setup
//
let (prio_provider, prio_jobs) = cli
.prioritization_fee_cli
.make_prio_provider(cli.lite_rpc_url.clone())?;
//
// Client setup
//
let liqor_owner = Arc::new(keypair_from_cli(&cli.liqor_owner));
let rpc_url = cli.rpc_url;
let ws_url = rpc_url.replace("https", "wss");
let rpc_timeout = Duration::from_secs(10);
let cluster = Cluster::Custom(rpc_url.clone(), ws_url.clone());
let commitment = CommitmentConfig::processed();
@ -214,12 +229,14 @@ async fn main() -> anyhow::Result<()> {
.jupiter_v4_url(cli.jupiter_v4_url)
.jupiter_v6_url(cli.jupiter_v6_url)
.jupiter_token(cli.jupiter_token)
.transaction_builder_config(TransactionBuilderConfig {
prioritization_micro_lamports: (cli.prioritization_micro_lamports > 0)
.then_some(cli.prioritization_micro_lamports),
// Liquidation and tcs triggers set their own budgets, this is a default for other tx
compute_budget_per_instruction: Some(250_000),
})
.transaction_builder_config(
TransactionBuilderConfig::builder()
.priority_fee_provider(prio_provider)
// Liquidation and tcs triggers set their own budgets, this is a default for other tx
.compute_budget_per_instruction(Some(250_000))
.build()
.unwrap(),
)
.override_send_transaction_urls(cli.override_send_transaction_url)
.build()
.unwrap();
@ -584,6 +601,7 @@ async fn main() -> anyhow::Result<()> {
check_changes_for_abort_job,
]
.into_iter()
.chain(prio_jobs.into_iter())
.collect();
jobs.next().await;

View File

@ -14,7 +14,7 @@ use mango_v4::{
use mango_v4_client::{chain_data, jupiter, MangoClient, TransactionBuilder};
use anyhow::Context as AnyhowContext;
use solana_sdk::{signature::Signature, signer::Signer};
use solana_sdk::signature::Signature;
use tracing::*;
use {fixed::types::I80F48, solana_sdk::pubkey::Pubkey};
@ -1168,10 +1168,8 @@ impl Context {
let fee_payer = self.mango_client.client.fee_payer();
TransactionBuilder {
instructions: vec![compute_ix],
address_lookup_tables: vec![],
payer: fee_payer.pubkey(),
signers: vec![self.mango_client.owner.clone(), fee_payer],
config: self.mango_client.client.config().transaction_builder_config,
..self.mango_client.transaction_builder().await?
}
};

View File

@ -6,8 +6,8 @@ use anchor_client::Cluster;
use clap::Parser;
use mango_v4::state::{PerpMarketIndex, TokenIndex};
use mango_v4_client::{
account_update_stream, chain_data, keypair_from_cli, snapshot_source, websocket_source, Client,
MangoClient, MangoGroupContext, TransactionBuilderConfig,
account_update_stream, chain_data, keypair_from_cli, priority_fees_cli, snapshot_source,
websocket_source, Client, MangoClient, MangoGroupContext, TransactionBuilderConfig,
};
use tracing::*;
@ -61,9 +61,12 @@ struct Cli {
#[clap(long, env, default_value = "100")]
get_multiple_accounts_count: usize,
/// prioritize each transaction with this many microlamports/cu
#[clap(long, env, default_value = "0")]
prioritization_micro_lamports: u64,
#[clap(flatten)]
prioritization_fee_cli: priority_fees_cli::PriorityFeeArgs,
/// url to the lite-rpc websocket, optional
#[clap(long, env, default_value = "")]
lite_rpc_url: String,
/// compute budget for each instruction
#[clap(long, env, default_value = "250000")]
@ -87,6 +90,10 @@ async fn main() -> anyhow::Result<()> {
};
let cli = Cli::parse_from(args);
let (prio_provider, prio_jobs) = cli
.prioritization_fee_cli
.make_prio_provider(cli.lite_rpc_url.clone())?;
let settler_owner = Arc::new(keypair_from_cli(&cli.settler_owner));
let rpc_url = cli.rpc_url;
@ -100,11 +107,11 @@ async fn main() -> anyhow::Result<()> {
commitment,
settler_owner.clone(),
Some(rpc_timeout),
TransactionBuilderConfig {
prioritization_micro_lamports: (cli.prioritization_micro_lamports > 0)
.then_some(cli.prioritization_micro_lamports),
compute_budget_per_instruction: Some(cli.compute_budget_per_instruction),
},
TransactionBuilderConfig::builder()
.compute_budget_per_instruction(Some(cli.compute_budget_per_instruction))
.priority_fee_provider(prio_provider)
.build()
.unwrap(),
);
// The representation of current on-chain account data
@ -352,6 +359,7 @@ async fn main() -> anyhow::Result<()> {
check_changes_for_abort_job,
]
.into_iter()
.chain(prio_jobs.into_iter())
.collect();
jobs.next().await;

View File

@ -284,7 +284,7 @@ impl<'a> SettleBatchProcessor<'a> {
address_lookup_tables: self.address_lookup_tables.clone(),
payer: fee_payer.pubkey(),
signers: vec![fee_payer],
config: client.config().transaction_builder_config,
config: client.config().transaction_builder_config.clone(),
}
.transaction_with_blockhash(self.blockhash)
}

View File

@ -15,6 +15,7 @@ async-channel = "1.6"
async-once-cell = { version = "0.4.2", features = ["unpin"] }
async-trait = "0.1.52"
atty = "0.2"
clap = { version = "3.1.8", features = ["derive", "env"] }
derive_builder = "0.12.0"
fixed = { workspace = true, features = ["serde", "borsh"] }
futures = "0.3.25"
@ -38,6 +39,7 @@ thiserror = "1.0.31"
reqwest = "0.11.17"
tokio = { version = "1", features = ["full"] }
tokio-stream = { version = "0.1.9"}
tokio-tungstenite = "0.17.0"
serde = "1.0.141"
serde_json = "1.0.82"
base64 = "0.13.0"

View File

@ -29,6 +29,7 @@ use crate::confirm_transaction::{wait_for_transaction_confirmation, RpcConfirmTr
use crate::context::MangoGroupContext;
use crate::gpa::{fetch_anchor_account, fetch_mango_accounts};
use crate::health_cache;
use crate::priority_fees::{FixedPriorityFeeProvider, PriorityFeeProvider};
use crate::util::PreparedInstructions;
use crate::{jupiter, util};
use solana_address_lookup_table_program::state::AddressLookupTable;
@ -53,7 +54,7 @@ use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signer::Si
pub const MAX_ACCOUNTS_PER_TRANSACTION: usize = 64;
// very close to anchor_client::Client, which unfortunately has no accessors or Clone
#[derive(Clone, Debug, Builder)]
#[derive(Clone, Builder)]
#[builder(name = "ClientBuilder", build_fn(name = "build_config"))]
pub struct ClientConfig {
/// RPC url
@ -376,7 +377,7 @@ impl MangoClient {
address_lookup_tables: vec![],
payer: payer.pubkey(),
signers: vec![owner, payer],
config: client.config.transaction_builder_config,
config: client.config.transaction_builder_config.clone(),
}
.send_and_confirm(&client)
.await?;
@ -1858,7 +1859,7 @@ impl MangoClient {
address_lookup_tables: self.mango_address_lookup_tables().await?,
payer: fee_payer.pubkey(),
signers: vec![fee_payer],
config: self.client.config.transaction_builder_config,
config: self.client.config.transaction_builder_config.clone(),
})
}
@ -1872,7 +1873,7 @@ impl MangoClient {
address_lookup_tables: vec![],
payer: fee_payer.pubkey(),
signers: vec![fee_payer],
config: self.client.config.transaction_builder_config,
config: self.client.config.transaction_builder_config.clone(),
}
.simulate(&self.client)
.await
@ -1951,14 +1952,30 @@ impl Default for FallbackOracleConfig {
}
}
#[derive(Copy, Clone, Debug, Default)]
#[derive(Clone, Default, Builder)]
pub struct TransactionBuilderConfig {
/// adds a SetComputeUnitPrice instruction in front if none exists
pub prioritization_micro_lamports: Option<u64>,
pub priority_fee_provider: Option<Arc<dyn PriorityFeeProvider>>,
/// adds a SetComputeUnitBudget instruction if none exists
pub compute_budget_per_instruction: Option<u32>,
}
impl TransactionBuilderConfig {
pub fn builder() -> TransactionBuilderConfigBuilder {
TransactionBuilderConfigBuilder::default()
}
}
impl TransactionBuilderConfigBuilder {
pub fn prioritization_micro_lamports(&mut self, cu: Option<u64>) -> &mut Self {
self.priority_fee_provider(
cu.map(|cu| {
Arc::new(FixedPriorityFeeProvider::new(cu)) as Arc<dyn PriorityFeeProvider>
}),
)
}
}
pub struct TransactionBuilder {
pub instructions: Vec<Instruction>,
pub address_lookup_tables: Vec<AddressLookupTableAccount>,
@ -2012,7 +2029,12 @@ impl TransactionBuilder {
);
}
let cu_prio = self.config.prioritization_micro_lamports.unwrap_or(0);
let cu_prio = self
.config
.priority_fee_provider
.as_ref()
.map(|provider| provider.compute_unit_fee_microlamports())
.unwrap_or(0);
if !has_compute_unit_price && cu_prio > 0 {
ixs.insert(0, ComputeBudgetInstruction::set_compute_unit_price(cu_prio));
}

View File

@ -338,7 +338,12 @@ impl<'a> JupiterV4<'a> {
address_lookup_tables,
payer,
signers: vec![self.mango_client.owner.clone()],
config: self.mango_client.client.config().transaction_builder_config,
config: self
.mango_client
.client
.config()
.transaction_builder_config
.clone(),
})
}

View File

@ -388,7 +388,12 @@ impl<'a> JupiterV6<'a> {
address_lookup_tables,
payer,
signers: vec![self.mango_client.owner.clone()],
config: self.mango_client.client.config().transaction_builder_config,
config: self
.mango_client
.client
.config()
.transaction_builder_config
.clone(),
})
}

View File

@ -15,6 +15,8 @@ pub mod gpa;
pub mod health_cache;
pub mod jupiter;
pub mod perp_pnl;
pub mod priority_fees;
pub mod priority_fees_cli;
pub mod snapshot_source;
mod util;
pub mod websocket_source;

View File

@ -0,0 +1,240 @@
use futures::{SinkExt, StreamExt};
use jsonrpc_core::{MethodCall, Notification, Params, Version};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::protocol::Message;
use tracing::*;
pub trait PriorityFeeProvider: Sync + Send {
fn compute_unit_fee_microlamports(&self) -> u64;
}
pub struct FixedPriorityFeeProvider {
pub compute_unit_fee_microlamports: u64,
}
impl FixedPriorityFeeProvider {
pub fn new(fee_microlamports: u64) -> Self {
Self {
compute_unit_fee_microlamports: fee_microlamports,
}
}
}
impl PriorityFeeProvider for FixedPriorityFeeProvider {
fn compute_unit_fee_microlamports(&self) -> u64 {
self.compute_unit_fee_microlamports
}
}
#[derive(Builder)]
pub struct EmaPriorityFeeProviderConfig {
pub percentile: u8,
#[builder(default = "0.2")]
pub alpha: f64,
pub fallback_prio: u64,
#[builder(default = "Duration::from_secs(15)")]
pub max_age: Duration,
}
impl EmaPriorityFeeProviderConfig {
pub fn builder() -> EmaPriorityFeeProviderConfigBuilder {
EmaPriorityFeeProviderConfigBuilder::default()
}
}
#[derive(Default)]
struct CuPercentileEmaPriorityFeeProviderData {
ema: f64,
last_update: Option<Instant>,
}
pub struct CuPercentileEmaPriorityFeeProvider {
data: RwLock<CuPercentileEmaPriorityFeeProviderData>,
config: EmaPriorityFeeProviderConfig,
}
impl PriorityFeeProvider for CuPercentileEmaPriorityFeeProvider {
fn compute_unit_fee_microlamports(&self) -> u64 {
let data = self.data.read().unwrap();
if let Some(last_update) = data.last_update {
if Instant::now().duration_since(last_update) > self.config.max_age {
return self.config.fallback_prio;
}
} else {
return self.config.fallback_prio;
}
data.ema as u64
}
}
impl CuPercentileEmaPriorityFeeProvider {
pub fn run(
config: EmaPriorityFeeProviderConfig,
sender: &broadcast::Sender<BlockPrioFees>,
) -> (Arc<Self>, JoinHandle<()>) {
let this = Arc::new(Self {
data: Default::default(),
config,
});
let handle = tokio::spawn({
let this_c = this.clone();
let rx = sender.subscribe();
async move { Self::run_update_job(this_c, rx).await }
});
(this, handle)
}
async fn run_update_job(provider: Arc<Self>, mut rx: broadcast::Receiver<BlockPrioFees>) {
let config = &provider.config;
loop {
let block_prios = rx.recv().await.unwrap();
let prio = match block_prios.by_cu_percentile.get(&config.percentile) {
Some(v) => *v as f64,
None => {
error!("percentile not available: {}", config.percentile);
continue;
}
};
let mut data = provider.data.write().unwrap();
data.ema = data.ema * (1.0 - config.alpha) + config.alpha * prio;
data.last_update = Some(Instant::now());
}
}
}
#[derive(Clone, Default, Debug)]
pub struct BlockPrioFees {
pub slot: u64,
// prio fee percentile in percent -> prio fee
pub percentile: HashMap<u8, u64>,
// cu percentile in percent -> median prio fee of the group
pub by_cu_percentile: HashMap<u8, u64>,
}
#[derive(serde::Deserialize)]
struct BlockPrioritizationFeesNotificationContext {
slot: u64,
}
#[derive(serde::Deserialize)]
struct BlockPrioritizationFeesNotificationValue {
by_tx: Vec<u64>,
by_tx_percentiles: Vec<f64>,
by_cu: Vec<u64>,
by_cu_percentiles: Vec<f64>,
}
#[derive(serde::Deserialize)]
struct BlockPrioritizationFeesNotificationParams {
context: BlockPrioritizationFeesNotificationContext,
value: BlockPrioritizationFeesNotificationValue,
}
fn as_block_prioritization_fees_notification(
notification_str: &str,
) -> anyhow::Result<Option<BlockPrioFees>> {
let notification: Notification = match serde_json::from_str(&notification_str) {
Ok(v) => v,
Err(_) => return Ok(None), // not a notification at all
};
if notification.method != "blockPrioritizationFeesNotification" {
return Ok(None);
}
let map = match notification.params {
Params::Map(m) => m,
_ => anyhow::bail!("unexpected params, expected map"),
};
let result = map
.get("result")
.ok_or(anyhow::anyhow!("missing params.result"))?
.clone();
let mut data = BlockPrioFees::default();
let v: BlockPrioritizationFeesNotificationParams = serde_json::from_value(result)?;
data.slot = v.context.slot;
for (percentile, prio) in v.value.by_tx_percentiles.iter().zip(v.value.by_tx.iter()) {
let int_perc: u8 = ((percentile * 100.0) as u64).try_into()?;
data.percentile.insert(int_perc, *prio);
}
for (percentile, prio) in v.value.by_cu_percentiles.iter().zip(v.value.by_cu.iter()) {
let int_perc: u8 = ((percentile * 100.0) as u64).try_into()?;
data.by_cu_percentile.insert(int_perc, *prio);
}
Ok(Some(data))
}
async fn connect_and_broadcast(
url: &str,
sender: &broadcast::Sender<BlockPrioFees>,
) -> anyhow::Result<()> {
let (ws_stream, _) = connect_async(url).await?;
let (mut write, mut read) = ws_stream.split();
// Create a JSON-RPC request
let call = MethodCall {
jsonrpc: Some(Version::V2),
method: "blockPrioritizationFeesSubscribe".to_string(),
params: Params::None,
id: jsonrpc_core::Id::Num(1),
};
let request = serde_json::to_string(&call).unwrap();
write.send(Message::Text(request)).await?;
loop {
let timeout = tokio::time::sleep(Duration::from_secs(20));
tokio::select! {
message = read.next() => {
match message {
Some(Ok(Message::Text(text))) => {
if let Some(block_prio) = as_block_prioritization_fees_notification(&text)? {
// Failure might just mean there is no receiver right now
let _ = sender.send(block_prio);
}
}
Some(Ok(Message::Ping(..))) => {}
Some(Ok(Message::Pong(..))) => {}
Some(Ok(msg @ _)) => {
anyhow::bail!("received a non-text message: {:?}", msg);
},
Some(Err(e)) => {
anyhow::bail!("error receiving message: {}", e);
}
None => {
anyhow::bail!("websocket stream closed");
}
}
},
_ = timeout => {
anyhow::bail!("timeout");
}
}
}
}
async fn connect_and_broadcast_loop(url: &str, sender: broadcast::Sender<BlockPrioFees>) {
loop {
if let Err(err) = connect_and_broadcast(url, &sender).await {
info!("recent block prio feed error, restarting: {err:?}");
}
}
}
pub fn run_broadcast_from_websocket_feed(
url: String,
) -> (broadcast::Sender<BlockPrioFees>, JoinHandle<()>) {
let (sender, _) = broadcast::channel(10);
let sender_c = sender.clone();
let handle = tokio::spawn(async move { connect_and_broadcast_loop(&url, sender_c).await });
(sender, handle)
}

View File

@ -0,0 +1,80 @@
use std::sync::Arc;
use tokio::task::JoinHandle;
use tracing::*;
use crate::priority_fees::*;
#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq)]
enum PriorityFeeStyleArg {
None,
Fixed,
LiteRpcCuPercentileEma,
}
#[derive(clap::Args, Debug, Clone)]
pub struct PriorityFeeArgs {
/// choose prio fee style
#[clap(long, env, value_enum, default_value = "none")]
prioritization_style: PriorityFeeStyleArg,
/// prioritize each transaction with this many microlamports/cu
///
/// for dynamic prio styles, this is the fallback value
#[clap(long, env, default_value = "0")]
prioritization_micro_lamports: u64,
#[clap(long, env, default_value = "50")]
prioritization_ema_percentile: u8,
#[clap(long, env, default_value = "0.2")]
prioritization_ema_alpha: f64,
}
impl PriorityFeeArgs {
pub fn make_prio_provider(
&self,
lite_rpc_url: String,
) -> anyhow::Result<(Option<Arc<dyn PriorityFeeProvider>>, Vec<JoinHandle<()>>)> {
let prio_style;
if self.prioritization_micro_lamports > 0
&& self.prioritization_style == PriorityFeeStyleArg::None
{
info!("forcing prioritization-style to fixed, since prioritization-micro-lamports was set");
prio_style = PriorityFeeStyleArg::Fixed;
} else {
prio_style = self.prioritization_style;
}
Ok(match prio_style {
PriorityFeeStyleArg::None => (None, vec![]),
PriorityFeeStyleArg::Fixed => (
Some(Arc::new(FixedPriorityFeeProvider::new(
self.prioritization_micro_lamports,
))),
vec![],
),
PriorityFeeStyleArg::LiteRpcCuPercentileEma => {
if lite_rpc_url.is_empty() {
anyhow::bail!("cannot use recent-cu-percentile-ema prioritization style without a lite-rpc url");
}
let (block_prio_broadcaster, block_prio_job) =
run_broadcast_from_websocket_feed(lite_rpc_url);
let (prio_fee_provider, prio_fee_provider_job) =
CuPercentileEmaPriorityFeeProvider::run(
EmaPriorityFeeProviderConfig::builder()
.percentile(75)
.fallback_prio(self.prioritization_micro_lamports)
.alpha(self.prioritization_ema_alpha)
.percentile(self.prioritization_ema_percentile)
.build()
.unwrap(),
&block_prio_broadcaster,
);
(
Some(prio_fee_provider),
vec![block_prio_job, prio_fee_provider_job],
)
}
})
}
}