From 18397eeab7758b8c346b79e1e8190b4e3962dab2 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 19 Jul 2023 09:33:10 +0200 Subject: [PATCH 1/2] add context to rpc calls --- core/src/block_processor.rs | 4 +++- core/src/block_store.rs | 9 ++++++--- core/src/leader_schedule.rs | 10 +++++++--- lite-rpc/src/bridge.rs | 8 ++++++-- services/src/block_listenser.rs | 3 ++- services/src/tpu_utils/tpu_service.rs | 5 +++-- 6 files changed, 27 insertions(+), 12 deletions(-) diff --git a/core/src/block_processor.rs b/core/src/block_processor.rs index c8d55192..db5d3c8b 100644 --- a/core/src/block_processor.rs +++ b/core/src/block_processor.rs @@ -14,6 +14,7 @@ use solana_transaction_status::{ UiTransactionStatusMeta, }; use std::sync::Arc; +use anyhow::Context; use crate::block_store::{BlockInformation, BlockStore}; @@ -76,7 +77,8 @@ impl BlockProcessor { rewards: Some(true), }, ) - .await?; + .await + .context("failed to get block")?; let Some(block_height) = block.block_height else { return Ok(BlockProcessorResult::invalid()); diff --git a/core/src/block_store.rs b/core/src/block_store.rs index d4a86ced..1118a462 100644 --- a/core/src/block_store.rs +++ b/core/src/block_store.rs @@ -68,7 +68,8 @@ impl BlockStore { RpcRequest::GetLatestBlockhash, json!([commitment_config]), ) - .await?; + .await + .context("failed to poll latest blockhash")?; let processed_blockhash = response.value.blockhash; let processed_block = BlockInformation { @@ -91,7 +92,8 @@ impl BlockStore { ) -> anyhow::Result<(String, BlockInformation)> { let slot = rpc_client .get_slot_with_commitment(commitment_config) - .await?; + .await + .context("failed to fetch latest slot")?; let block = rpc_client .get_block_with_config( @@ -104,7 +106,8 @@ impl BlockStore { max_supported_transaction_version: Some(0), }, ) - .await?; + .await + .context("failed to fetch latest blockhash")?; let latest_block_hash = block.blockhash; let block_height = block diff --git a/core/src/leader_schedule.rs b/core/src/leader_schedule.rs index e2dd222b..0f80ff31 100644 --- a/core/src/leader_schedule.rs +++ b/core/src/leader_schedule.rs @@ -1,4 +1,5 @@ use std::{collections::VecDeque, str::FromStr, sync::Arc}; +use anyhow::Context; use dashmap::DashMap; use log::warn; @@ -36,7 +37,8 @@ impl LeaderSchedule { } pub async fn load_cluster_info(&self, rpc_client: Arc) -> anyhow::Result<()> { - let cluster_nodes = rpc_client.get_cluster_nodes().await?; + let cluster_nodes = rpc_client.get_cluster_nodes().await + .context("failed to get cluster nodes")?; cluster_nodes.iter().for_each(|x| { if let Ok(pubkey) = Pubkey::from_str(x.pubkey.as_str()) { self.cluster_nodes.insert(pubkey, Arc::new(x.clone())); @@ -70,14 +72,16 @@ impl LeaderSchedule { let first_slot_to_fetch = queue_end_slot + 1; let leaders = rpc_client .get_slot_leaders(first_slot_to_fetch, last_slot_needed - first_slot_to_fetch) - .await?; + .await + .context("failed to get slot leaders")?; let mut leader_queue = self.leader_schedule.write().await; for i in first_slot_to_fetch..last_slot_needed { let current_leader = (i - first_slot_to_fetch) as usize; let leader = leaders[current_leader]; if !self.cluster_nodes.contains_key(&leader) { - self.load_cluster_info(rpc_client.clone()).await?; + self.load_cluster_info(rpc_client.clone()).await + .context("failed to load cluster info")?; } match self.cluster_nodes.get(&leader) { diff --git a/lite-rpc/src/bridge.rs b/lite-rpc/src/bridge.rs index 436e5a9b..46869ef5 100644 --- a/lite-rpc/src/bridge.rs +++ b/lite-rpc/src/bridge.rs @@ -17,7 +17,7 @@ use solana_lite_rpc_services::{ tx_sender::{TxSender, TXS_IN_CHANNEL}, }; -use anyhow::bail; +use anyhow::{bail, Context}; use jsonrpsee::{core::SubscriptionResult, server::ServerBuilder, PendingSubscriptionSink}; use log::{error, info}; use prometheus::{opts, register_int_counter, IntCounter}; @@ -83,7 +83,8 @@ impl LiteBridge { max_retries: usize, ) -> anyhow::Result { let rpc_client = Arc::new(RpcClient::new(rpc_url.clone())); - let current_slot = rpc_client.get_slot().await?; + let current_slot = rpc_client.get_slot().await + .context("failed to get initial slot")?; let tx_store = empty_tx_store(); @@ -331,6 +332,7 @@ impl LiteRpcServer for LiteBridge { .rpc_client .is_blockhash_valid(&blockhash, commitment) .await + .context("failed to get blockhash validity") { Ok(is_valid) => is_valid, Err(err) => { @@ -407,6 +409,7 @@ impl LiteRpcServer for LiteBridge { .rpc_client .request_airdrop_with_config(&pubkey, lamports, config.unwrap_or_default()) .await + .context("failed to request airdrop") { Ok(airdrop_sig) => airdrop_sig.to_string(), Err(err) => { @@ -417,6 +420,7 @@ impl LiteRpcServer for LiteBridge { .rpc_client .get_latest_blockhash_with_commitment(CommitmentConfig::finalized()) .await + .context("failed to get latest blockhash") { self.tx_store.insert( airdrop_sig.clone(), diff --git a/services/src/block_listenser.rs b/services/src/block_listenser.rs index 2f7af353..f5258dd1 100644 --- a/services/src/block_listenser.rs +++ b/services/src/block_listenser.rs @@ -238,7 +238,8 @@ impl BlockListener { // TODO insert if not exists leader_id into accountaddrs // fetch cluster time from rpc - let block_time = self.rpc_client.get_block_time(slot).await?; + let block_time = self.rpc_client.get_block_time(slot).await + .context("failed to get block time")?; // fetch local time from blockstore let block_info = self diff --git a/services/src/tpu_utils/tpu_service.rs b/services/src/tpu_utils/tpu_service.rs index f646c3ae..7b925033 100644 --- a/services/src/tpu_utils/tpu_service.rs +++ b/services/src/tpu_utils/tpu_service.rs @@ -1,4 +1,4 @@ -use anyhow::bail; +use anyhow::{bail, Context}; use log::{error, info}; use prometheus::{core::GenericGauge, opts, register_int_gauge}; use solana_client::nonblocking::rpc_client::RpcClient; @@ -202,7 +202,8 @@ impl TpuService { // setup self.leader_schedule .load_cluster_info(self.rpc_client.clone()) - .await?; + .await + .context("failed to load initial cluster info")?; self.update_current_stakes().await?; self.update_leader_schedule().await?; self.update_quic_connections().await; From 37e44aa9b3b199444bbdb165bb439631c8ab9117 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 19 Jul 2023 09:33:52 +0200 Subject: [PATCH 2/2] code format --- core/src/block_processor.rs | 2 +- core/src/leader_schedule.rs | 9 ++++++--- lite-rpc/src/bridge.rs | 4 +++- services/src/block_listenser.rs | 5 ++++- 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/core/src/block_processor.rs b/core/src/block_processor.rs index db5d3c8b..86d80b36 100644 --- a/core/src/block_processor.rs +++ b/core/src/block_processor.rs @@ -1,3 +1,4 @@ +use anyhow::Context; use log::{info, warn}; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_rpc_client_api::config::RpcBlockConfig; @@ -14,7 +15,6 @@ use solana_transaction_status::{ UiTransactionStatusMeta, }; use std::sync::Arc; -use anyhow::Context; use crate::block_store::{BlockInformation, BlockStore}; diff --git a/core/src/leader_schedule.rs b/core/src/leader_schedule.rs index 0f80ff31..cb3c52ad 100644 --- a/core/src/leader_schedule.rs +++ b/core/src/leader_schedule.rs @@ -1,5 +1,5 @@ -use std::{collections::VecDeque, str::FromStr, sync::Arc}; use anyhow::Context; +use std::{collections::VecDeque, str::FromStr, sync::Arc}; use dashmap::DashMap; use log::warn; @@ -37,7 +37,9 @@ impl LeaderSchedule { } pub async fn load_cluster_info(&self, rpc_client: Arc) -> anyhow::Result<()> { - let cluster_nodes = rpc_client.get_cluster_nodes().await + let cluster_nodes = rpc_client + .get_cluster_nodes() + .await .context("failed to get cluster nodes")?; cluster_nodes.iter().for_each(|x| { if let Ok(pubkey) = Pubkey::from_str(x.pubkey.as_str()) { @@ -80,7 +82,8 @@ impl LeaderSchedule { let current_leader = (i - first_slot_to_fetch) as usize; let leader = leaders[current_leader]; if !self.cluster_nodes.contains_key(&leader) { - self.load_cluster_info(rpc_client.clone()).await + self.load_cluster_info(rpc_client.clone()) + .await .context("failed to load cluster info")?; } diff --git a/lite-rpc/src/bridge.rs b/lite-rpc/src/bridge.rs index 46869ef5..561954ff 100644 --- a/lite-rpc/src/bridge.rs +++ b/lite-rpc/src/bridge.rs @@ -83,7 +83,9 @@ impl LiteBridge { max_retries: usize, ) -> anyhow::Result { let rpc_client = Arc::new(RpcClient::new(rpc_url.clone())); - let current_slot = rpc_client.get_slot().await + let current_slot = rpc_client + .get_slot() + .await .context("failed to get initial slot")?; let tx_store = empty_tx_store(); diff --git a/services/src/block_listenser.rs b/services/src/block_listenser.rs index f5258dd1..2b911b2b 100644 --- a/services/src/block_listenser.rs +++ b/services/src/block_listenser.rs @@ -238,7 +238,10 @@ impl BlockListener { // TODO insert if not exists leader_id into accountaddrs // fetch cluster time from rpc - let block_time = self.rpc_client.get_block_time(slot).await + let block_time = self + .rpc_client + .get_block_time(slot) + .await .context("failed to get block time")?; // fetch local time from blockstore