From 43b9cac3a1760b5bd3952e164ae89442662b3d44 Mon Sep 17 00:00:00 2001 From: Christian Kamm Date: Fri, 19 Jan 2024 16:35:30 +0100 Subject: [PATCH] Rust client: Allow sending transactions to multiple rpcs (#853) --- bin/cli/src/main.rs | 16 +-- bin/cli/src/save_snapshot.rs | 6 +- bin/liquidator/src/main.rs | 10 +- bin/liquidator/src/trigger_tcs.rs | 2 +- bin/service-mango-crank/src/main.rs | 2 +- bin/service-mango-fills/src/main.rs | 2 +- bin/service-mango-orderbook/src/main.rs | 2 +- bin/service-mango-pnl/src/main.rs | 4 +- bin/settler/src/main.rs | 4 +- bin/settler/src/settle.rs | 13 +-- lib/client/src/client.rs | 139 +++++++++++++++++++----- lib/client/src/jupiter/v4.rs | 12 +- lib/client/src/jupiter/v6.rs | 18 +-- 13 files changed, 156 insertions(+), 74 deletions(-) diff --git a/bin/cli/src/main.rs b/bin/cli/src/main.rs index 9dd1c5d84..a27e0245c 100644 --- a/bin/cli/src/main.rs +++ b/bin/cli/src/main.rs @@ -133,16 +133,16 @@ enum Command { impl Rpc { fn client(&self, override_fee_payer: Option<&str>) -> anyhow::Result { let fee_payer = keypair_from_cli(override_fee_payer.unwrap_or(&self.fee_payer)); - Ok(Client::new( - anchor_client::Cluster::from_str(&self.url)?, - solana_sdk::commitment_config::CommitmentConfig::confirmed(), - Arc::new(fee_payer), - None, - TransactionBuilderConfig { + Ok(Client::builder() + .cluster(anchor_client::Cluster::from_str(&self.url)?) + .commitment(solana_sdk::commitment_config::CommitmentConfig::confirmed()) + .fee_payer(Some(Arc::new(fee_payer))) + .transaction_builder_config(TransactionBuilderConfig { prioritization_micro_lamports: Some(5), compute_budget_per_instruction: Some(250_000), - }, - )) + }) + .build() + .unwrap()) } } diff --git a/bin/cli/src/save_snapshot.rs b/bin/cli/src/save_snapshot.rs index 50124b875..4c0c375eb 100644 --- a/bin/cli/src/save_snapshot.rs +++ b/bin/cli/src/save_snapshot.rs @@ -23,10 +23,10 @@ pub async fn save_snapshot( } fs::create_dir_all(out_path).unwrap(); - let rpc_url = client.cluster.url().to_string(); - let ws_url = client.cluster.ws_url().to_string(); + let rpc_url = client.config().cluster.url().to_string(); + let ws_url = client.config().cluster.ws_url().to_string(); - let group_context = MangoGroupContext::new_from_rpc(&client.rpc_async(), mango_group).await?; + let group_context = MangoGroupContext::new_from_rpc(client.rpc_async(), mango_group).await?; let oracles_and_vaults = group_context .tokens diff --git a/bin/liquidator/src/main.rs b/bin/liquidator/src/main.rs index e45d79ca3..205b05e74 100644 --- a/bin/liquidator/src/main.rs +++ b/bin/liquidator/src/main.rs @@ -93,6 +93,9 @@ struct Cli { #[clap(short, long, env)] rpc_url: String, + #[clap(long, env, value_delimiter = ';')] + override_send_transaction_url: Option>, + #[clap(long, env)] liqor_mango_account: Pubkey, @@ -207,7 +210,7 @@ async fn main() -> anyhow::Result<()> { .cluster(cluster.clone()) .commitment(commitment) .fee_payer(Some(liqor_owner.clone())) - .timeout(Some(rpc_timeout)) + .timeout(rpc_timeout) .jupiter_v4_url(cli.jupiter_v4_url) .jupiter_v6_url(cli.jupiter_v6_url) .jupiter_token(cli.jupiter_token) @@ -217,6 +220,7 @@ async fn main() -> anyhow::Result<()> { // Liquidation and tcs triggers set their own budgets, this is a default for other tx compute_budget_per_instruction: Some(250_000), }) + .override_send_transaction_urls(cli.override_send_transaction_url) .build() .unwrap(); @@ -225,7 +229,7 @@ async fn main() -> anyhow::Result<()> { // Reading accounts from chain_data let account_fetcher = Arc::new(chain_data::AccountFetcher { chain_data: chain_data.clone(), - rpc: client.rpc_async(), + rpc: client.new_rpc_async(), }); let mango_account = account_fetcher @@ -238,7 +242,7 @@ async fn main() -> anyhow::Result<()> { warn!("rebalancing on delegated accounts will be unable to free token positions reliably, withdraw dust manually"); } - let group_context = MangoGroupContext::new_from_rpc(&client.rpc_async(), mango_group).await?; + let group_context = MangoGroupContext::new_from_rpc(client.rpc_async(), mango_group).await?; let mango_oracles = group_context .tokens diff --git a/bin/liquidator/src/trigger_tcs.rs b/bin/liquidator/src/trigger_tcs.rs index 00ad4aadd..5f1a1caa6 100644 --- a/bin/liquidator/src/trigger_tcs.rs +++ b/bin/liquidator/src/trigger_tcs.rs @@ -1168,7 +1168,7 @@ impl Context { address_lookup_tables: vec![], payer: fee_payer.pubkey(), signers: vec![self.mango_client.owner.clone(), fee_payer], - config: self.mango_client.client.transaction_builder_config, + config: self.mango_client.client.config().transaction_builder_config, } }; diff --git a/bin/service-mango-crank/src/main.rs b/bin/service-mango-crank/src/main.rs index 1a36dc6e2..bb0c144a1 100644 --- a/bin/service-mango-crank/src/main.rs +++ b/bin/service-mango-crank/src/main.rs @@ -78,7 +78,7 @@ async fn main() -> anyhow::Result<()> { ); let group_pk = Pubkey::from_str(&config.mango_group).unwrap(); let group_context = - Arc::new(MangoGroupContext::new_from_rpc(&client.rpc_async(), group_pk).await?); + Arc::new(MangoGroupContext::new_from_rpc(client.rpc_async(), group_pk).await?); let perp_queue_pks: Vec<_> = group_context .perp_markets diff --git a/bin/service-mango-fills/src/main.rs b/bin/service-mango-fills/src/main.rs index 067ea32aa..77dd666b0 100644 --- a/bin/service-mango-fills/src/main.rs +++ b/bin/service-mango-fills/src/main.rs @@ -373,7 +373,7 @@ async fn main() -> anyhow::Result<()> { ); let group_context = Arc::new( MangoGroupContext::new_from_rpc( - &client.rpc_async(), + client.rpc_async(), Pubkey::from_str(&config.mango_group).unwrap(), ) .await?, diff --git a/bin/service-mango-orderbook/src/main.rs b/bin/service-mango-orderbook/src/main.rs index 47abce5e2..e2691d89d 100644 --- a/bin/service-mango-orderbook/src/main.rs +++ b/bin/service-mango-orderbook/src/main.rs @@ -357,7 +357,7 @@ async fn main() -> anyhow::Result<()> { ); let group_context = Arc::new( MangoGroupContext::new_from_rpc( - &client.rpc_async(), + client.rpc_async(), Pubkey::from_str(&config.mango_group).unwrap(), ) .await?, diff --git a/bin/service-mango-pnl/src/main.rs b/bin/service-mango-pnl/src/main.rs index 2b3d69b56..c2f2f385c 100644 --- a/bin/service-mango-pnl/src/main.rs +++ b/bin/service-mango-pnl/src/main.rs @@ -265,7 +265,7 @@ async fn main() -> anyhow::Result<()> { ); let group_context = Arc::new( MangoGroupContext::new_from_rpc( - &client.rpc_async(), + client.rpc_async(), Pubkey::from_str(&config.pnl.mango_group).unwrap(), ) .await?, @@ -273,7 +273,7 @@ async fn main() -> anyhow::Result<()> { let chain_data = Arc::new(RwLock::new(chain_data::ChainData::new())); let account_fetcher = Arc::new(chain_data::AccountFetcher { chain_data: chain_data.clone(), - rpc: client.rpc_async(), + rpc: client.new_rpc_async(), }); let metrics_tx = metrics::start(config.metrics, "pnl".into()); diff --git a/bin/settler/src/main.rs b/bin/settler/src/main.rs index 5c039b4c0..4c6e06ade 100644 --- a/bin/settler/src/main.rs +++ b/bin/settler/src/main.rs @@ -112,7 +112,7 @@ async fn main() -> anyhow::Result<()> { // Reading accounts from chain_data let account_fetcher = Arc::new(chain_data::AccountFetcher { chain_data: chain_data.clone(), - rpc: client.rpc_async(), + rpc: client.new_rpc_async(), }); let mango_account = account_fetcher @@ -120,7 +120,7 @@ async fn main() -> anyhow::Result<()> { .await?; let mango_group = mango_account.fixed.group; - let group_context = MangoGroupContext::new_from_rpc(&client.rpc_async(), mango_group).await?; + let group_context = MangoGroupContext::new_from_rpc(client.rpc_async(), mango_group).await?; let mango_oracles = group_context .tokens diff --git a/bin/settler/src/settle.rs b/bin/settler/src/settle.rs index 6a5485cc7..51b91d5fd 100644 --- a/bin/settler/src/settle.rs +++ b/bin/settler/src/settle.rs @@ -6,8 +6,7 @@ use mango_v4::accounts_zerocopy::KeyedAccountSharedData; use mango_v4::health::HealthType; use mango_v4::state::{OracleAccountInfos, PerpMarket, PerpMarketIndex}; use mango_v4_client::{ - chain_data, health_cache, prettify_solana_client_error, MangoClient, PreparedInstructions, - TransactionBuilder, + chain_data, health_cache, MangoClient, PreparedInstructions, TransactionBuilder, }; use solana_sdk::address_lookup_table_account::AddressLookupTableAccount; use solana_sdk::commitment_config::CommitmentConfig; @@ -273,7 +272,7 @@ impl<'a> SettleBatchProcessor<'a> { address_lookup_tables: self.address_lookup_tables.clone(), payer: fee_payer.pubkey(), signers: vec![fee_payer], - config: client.transaction_builder_config, + config: client.config().transaction_builder_config, } .transaction_with_blockhash(self.blockhash) } @@ -286,13 +285,7 @@ impl<'a> SettleBatchProcessor<'a> { let tx = self.transaction()?; self.instructions.clear(); - let send_result = self - .mango_client - .client - .rpc_async() - .send_transaction_with_config(&tx, self.mango_client.client.rpc_send_transaction_config) - .await - .map_err(prettify_solana_client_error); + let send_result = self.mango_client.client.send_transaction(&tx).await; if let Err(err) = send_result { info!("error while sending settle batch: {}", err); diff --git a/lib/client/src/client.rs b/lib/client/src/client.rs index 9716df468..fd051d26e 100644 --- a/lib/client/src/client.rs +++ b/lib/client/src/client.rs @@ -12,8 +12,9 @@ use anchor_spl::associated_token::get_associated_token_address; use anchor_spl::token::Token; use fixed::types::I80F48; -use futures::{stream, StreamExt, TryStreamExt}; +use futures::{stream, StreamExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; +use tracing::*; use mango_v4::accounts_ix::{Serum3OrderType, Serum3SelfTradeBehavior, Serum3Side}; use mango_v4::accounts_zerocopy::KeyedAccountSharedData; @@ -24,6 +25,7 @@ use mango_v4::state::{ use solana_address_lookup_table_program::state::AddressLookupTable; use solana_client::nonblocking::rpc_client::RpcClient as RpcClientAsync; +use solana_client::rpc_client::SerializableTransaction; use solana_client::rpc_config::RpcSendTransactionConfig; use solana_client::rpc_response::RpcSimulateTransactionResult; use solana_sdk::address_lookup_table_account::AddressLookupTableAccount; @@ -51,7 +53,8 @@ pub const MAX_ACCOUNTS_PER_TRANSACTION: usize = 64; // very close to anchor_client::Client, which unfortunately has no accessors or Clone #[derive(Clone, Debug, Builder)] -pub struct Client { +#[builder(name = "ClientBuilder", build_fn(name = "build_config"))] +pub struct ClientConfig { /// RPC url /// /// Defaults to Cluster::Mainnet, using the public crowded mainnet-beta rpc endpoint. @@ -70,8 +73,8 @@ pub struct Client { /// /// This timeout applies to rpc requests. Note that the timeout for transaction /// confirmation is configured separately in rpc_confirm_transaction_config. - #[builder(default = "Some(Duration::from_secs(60))")] - pub timeout: Option, + #[builder(default = "Duration::from_secs(60)")] + pub timeout: Duration, #[builder(default)] pub transaction_builder_config: TransactionBuilderConfig, @@ -92,9 +95,19 @@ pub struct Client { #[builder(default = "\"\".into()")] pub jupiter_token: String, + + /// If set, don't use `cluster` for sending transactions and send to all + /// addresses configured here instead. + #[builder(default = "None")] + pub override_send_transaction_urls: Option>, } impl ClientBuilder { + pub fn build(&self) -> Result { + let config = self.build_config()?; + Ok(Client::new_from_config(config)) + } + pub fn default_rpc_send_transaction_config() -> RpcSendTransactionConfig { RpcSendTransactionConfig { preflight_commitment: Some(CommitmentLevel::Processed), @@ -109,6 +122,11 @@ impl ClientBuilder { } } } +pub struct Client { + config: ClientConfig, + rpc_async: RpcClientAsync, + send_transaction_rpc_asyncs: Vec, +} impl Client { pub fn builder() -> ClientBuilder { @@ -127,35 +145,101 @@ impl Client { .cluster(cluster) .commitment(commitment) .fee_payer(Some(fee_payer)) - .timeout(timeout) + .timeout(timeout.unwrap_or(Duration::from_secs(30))) .transaction_builder_config(transaction_builder_config) .build() .unwrap() } - pub fn rpc_async(&self) -> RpcClientAsync { - let url = self.cluster.url().to_string(); - if let Some(timeout) = self.timeout.as_ref() { - RpcClientAsync::new_with_timeout_and_commitment(url, *timeout, self.commitment) - } else { - RpcClientAsync::new_with_commitment(url, self.commitment) + pub fn new_from_config(config: ClientConfig) -> Self { + Self { + rpc_async: RpcClientAsync::new_with_timeout_and_commitment( + config.cluster.url().to_string(), + config.timeout, + config.commitment, + ), + send_transaction_rpc_asyncs: config + .override_send_transaction_urls + .clone() + .unwrap_or_else(|| vec![config.cluster.url().to_string()]) + .into_iter() + .map(|url| { + RpcClientAsync::new_with_timeout_and_commitment( + url, + config.timeout, + config.commitment, + ) + }) + .collect_vec(), + config, } } + pub fn config(&self) -> &ClientConfig { + &self.config + } + + pub fn rpc_async(&self) -> &RpcClientAsync { + &self.rpc_async + } + + /// Sometimes clients don't want to borrow the Client instance and just pass on RpcClientAsync + pub fn new_rpc_async(&self) -> RpcClientAsync { + let url = self.config.cluster.url().to_string(); + RpcClientAsync::new_with_timeout_and_commitment( + url, + self.config.timeout, + self.config.commitment, + ) + } + // TODO: this function here is awkward, since it (intentionally) doesn't use MangoClient::account_fetcher pub async fn rpc_anchor_account( &self, address: &Pubkey, ) -> anyhow::Result { - fetch_anchor_account(&self.rpc_async(), address).await + fetch_anchor_account(self.rpc_async(), address).await } pub fn fee_payer(&self) -> Arc { - self.fee_payer + self.config + .fee_payer .as_ref() .expect("fee payer must be set") .clone() } + + /// Sends a transaction via the configured cluster (or all override_send_transaction_urls). + /// + /// Returns the tx signature if at least one send returned ok. + /// Note that a success does not mean that the transaction is confirmed. + pub async fn send_transaction( + &self, + tx: &impl SerializableTransaction, + ) -> anyhow::Result { + let futures = self.send_transaction_rpc_asyncs.iter().map(|rpc| { + rpc.send_transaction_with_config(tx, self.config.rpc_send_transaction_config) + .map_err(prettify_solana_client_error) + }); + let mut results = futures::future::join_all(futures).await; + + // If all fail, return the first + let successful_sends = results.iter().filter(|r| r.is_ok()).count(); + if successful_sends == 0 { + results.remove(0)?; + } + + // Otherwise just log errors + for (result, rpc) in results.iter().zip(self.send_transaction_rpc_asyncs.iter()) { + if let Err(err) = result { + info!( + rpc = rpc.url(), + successful_sends, "one of the transaction sends failed: {err:?}", + ) + } + } + return Ok(*tx.get_signature()); + } } // todo: might want to integrate geyser, websockets, or simple http polling for keeping data fresh @@ -193,7 +277,7 @@ impl MangoClient { group: Pubkey, owner: &Keypair, ) -> anyhow::Result> { - fetch_mango_accounts(&client.rpc_async(), mango_v4::ID, group, owner.pubkey()).await + fetch_mango_accounts(client.rpc_async(), mango_v4::ID, group, owner.pubkey()).await } pub async fn find_or_create_account( @@ -287,7 +371,7 @@ impl MangoClient { address_lookup_tables: vec![], payer: payer.pubkey(), signers: vec![owner, payer], - config: client.transaction_builder_config, + config: client.config.transaction_builder_config, } .send_and_confirm(&client) .await?; @@ -301,7 +385,7 @@ impl MangoClient { account: Pubkey, owner: Arc, ) -> anyhow::Result { - let rpc = client.rpc_async(); + let rpc = client.new_rpc_async(); let account_fetcher = Arc::new(CachedAccountFetcher::new(Arc::new(RpcAccountFetcher { rpc, }))); @@ -1679,7 +1763,7 @@ impl MangoClient { address_lookup_tables: self.mango_address_lookup_tables().await?, payer: fee_payer.pubkey(), signers: vec![self.owner.clone(), fee_payer], - config: self.client.transaction_builder_config, + config: self.client.config.transaction_builder_config, } .send_and_confirm(&self.client) .await @@ -1695,7 +1779,7 @@ impl MangoClient { address_lookup_tables: self.mango_address_lookup_tables().await?, payer: fee_payer.pubkey(), signers: vec![fee_payer], - config: self.client.transaction_builder_config, + config: self.client.config.transaction_builder_config, } .send_and_confirm(&self.client) .await @@ -1711,7 +1795,7 @@ impl MangoClient { address_lookup_tables: vec![], payer: fee_payer.pubkey(), signers: vec![fee_payer], - config: self.client.transaction_builder_config, + config: self.client.config.transaction_builder_config, } .simulate(&self.client) .await @@ -1730,7 +1814,7 @@ impl MangoClient { match MangoGroupContext::new_from_rpc(&rpc_async, mango_client.group()).await { Ok(v) => v, Err(e) => { - tracing::warn!("could not fetch context to check for changes: {e:?}"); + warn!("could not fetch context to check for changes: {e:?}"); continue; } }; @@ -1775,9 +1859,9 @@ impl TransactionSize { #[derive(Copy, Clone, Debug, Default)] pub struct TransactionBuilderConfig { - // adds a SetComputeUnitPrice instruction in front if none exists + /// adds a SetComputeUnitPrice instruction in front if none exists pub prioritization_micro_lamports: Option, - // adds a SetComputeUnitBudget instruction if none exists + /// adds a SetComputeUnitBudget instruction if none exists pub compute_budget_per_instruction: Option, } @@ -1870,9 +1954,7 @@ impl TransactionBuilder { pub async fn send(&self, client: &Client) -> anyhow::Result { let rpc = client.rpc_async(); let tx = self.transaction(&rpc).await?; - rpc.send_transaction_with_config(&tx, client.rpc_send_transaction_config) - .await - .map_err(prettify_solana_client_error) + client.send_transaction(&tx).await } pub async fn simulate(&self, client: &Client) -> anyhow::Result { @@ -1885,15 +1967,12 @@ impl TransactionBuilder { let rpc = client.rpc_async(); let tx = self.transaction(&rpc).await?; let recent_blockhash = tx.message.recent_blockhash(); - let signature = rpc - .send_transaction_with_config(&tx, client.rpc_send_transaction_config) - .await - .map_err(prettify_solana_client_error)?; + let signature = client.send_transaction(&tx).await?; wait_for_transaction_confirmation( &rpc, &signature, recent_blockhash, - &client.rpc_confirm_transaction_config, + &client.config.rpc_confirm_transaction_config, ) .await?; Ok(signature) diff --git a/lib/client/src/jupiter/v4.rs b/lib/client/src/jupiter/v4.rs index 29cf2be06..770ddf3e9 100644 --- a/lib/client/src/jupiter/v4.rs +++ b/lib/client/src/jupiter/v4.rs @@ -107,7 +107,10 @@ impl<'a> JupiterV4<'a> { let response = self .mango_client .http_client - .get(format!("{}/quote", self.mango_client.client.jupiter_v4_url)) + .get(format!( + "{}/quote", + self.mango_client.client.config().jupiter_v4_url + )) .query(&[ ("inputMint", input_mint.to_string()), ("outputMint", output_mint.to_string()), @@ -158,7 +161,10 @@ impl<'a> JupiterV4<'a> { let swap_response = self .mango_client .http_client - .post(format!("{}/swap", self.mango_client.client.jupiter_v4_url)) + .post(format!( + "{}/swap", + self.mango_client.client.config().jupiter_v4_url + )) .json(&SwapRequest { route: route.clone(), user_public_key: self.mango_client.owner.pubkey().to_string(), @@ -330,7 +336,7 @@ impl<'a> JupiterV4<'a> { address_lookup_tables, payer, signers: vec![self.mango_client.owner.clone()], - config: self.mango_client.client.transaction_builder_config, + config: self.mango_client.client.config().transaction_builder_config, }) } diff --git a/lib/client/src/jupiter/v6.rs b/lib/client/src/jupiter/v6.rs index 3b4ab074e..09ccd6cf1 100644 --- a/lib/client/src/jupiter/v6.rs +++ b/lib/client/src/jupiter/v6.rs @@ -194,15 +194,15 @@ impl<'a> JupiterV6<'a> { ), ), ]; - let client = &self.mango_client.client; - if !client.jupiter_token.is_empty() { - query_args.push(("token", client.jupiter_token.clone())); + let config = self.mango_client.client.config(); + if !config.jupiter_token.is_empty() { + query_args.push(("token", config.jupiter_token.clone())); } let response = self .mango_client .http_client - .get(format!("{}/quote", client.jupiter_v6_url)) + .get(format!("{}/quote", config.jupiter_v6_url)) .query(&query_args) .send() .await @@ -267,15 +267,15 @@ impl<'a> JupiterV6<'a> { .context("building health accounts")?; let mut query_args = vec![]; - let client = &self.mango_client.client; - if !client.jupiter_token.is_empty() { - query_args.push(("token", client.jupiter_token.clone())); + let config = self.mango_client.client.config(); + if !config.jupiter_token.is_empty() { + query_args.push(("token", config.jupiter_token.clone())); } let swap_response = self .mango_client .http_client - .post(format!("{}/swap-instructions", client.jupiter_v6_url)) + .post(format!("{}/swap-instructions", config.jupiter_v6_url)) .query(&query_args) .json(&SwapRequest { user_public_key: owner.to_string(), @@ -386,7 +386,7 @@ impl<'a> JupiterV6<'a> { address_lookup_tables, payer, signers: vec![self.mango_client.owner.clone()], - config: self.mango_client.client.transaction_builder_config, + config: self.mango_client.client.config().transaction_builder_config, }) }