Merge pull request #159 from grooviegermanikus/groovie/debugging-context-rpc-calls
debugging context rpc calls
This commit is contained in:
commit
1834e9495a
|
@ -1,3 +1,4 @@
|
||||||
|
use anyhow::Context;
|
||||||
use log::{info, warn};
|
use log::{info, warn};
|
||||||
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
|
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
|
||||||
use solana_rpc_client_api::config::RpcBlockConfig;
|
use solana_rpc_client_api::config::RpcBlockConfig;
|
||||||
|
@ -76,7 +77,8 @@ impl BlockProcessor {
|
||||||
rewards: Some(true),
|
rewards: Some(true),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
.await?;
|
.await
|
||||||
|
.context("failed to get block")?;
|
||||||
|
|
||||||
let Some(block_height) = block.block_height else {
|
let Some(block_height) = block.block_height else {
|
||||||
return Ok(BlockProcessorResult::invalid());
|
return Ok(BlockProcessorResult::invalid());
|
||||||
|
|
|
@ -68,7 +68,8 @@ impl BlockStore {
|
||||||
RpcRequest::GetLatestBlockhash,
|
RpcRequest::GetLatestBlockhash,
|
||||||
json!([commitment_config]),
|
json!([commitment_config]),
|
||||||
)
|
)
|
||||||
.await?;
|
.await
|
||||||
|
.context("failed to poll latest blockhash")?;
|
||||||
|
|
||||||
let processed_blockhash = response.value.blockhash;
|
let processed_blockhash = response.value.blockhash;
|
||||||
let processed_block = BlockInformation {
|
let processed_block = BlockInformation {
|
||||||
|
@ -91,7 +92,8 @@ impl BlockStore {
|
||||||
) -> anyhow::Result<(String, BlockInformation)> {
|
) -> anyhow::Result<(String, BlockInformation)> {
|
||||||
let slot = rpc_client
|
let slot = rpc_client
|
||||||
.get_slot_with_commitment(commitment_config)
|
.get_slot_with_commitment(commitment_config)
|
||||||
.await?;
|
.await
|
||||||
|
.context("failed to fetch latest slot")?;
|
||||||
|
|
||||||
let block = rpc_client
|
let block = rpc_client
|
||||||
.get_block_with_config(
|
.get_block_with_config(
|
||||||
|
@ -104,7 +106,8 @@ impl BlockStore {
|
||||||
max_supported_transaction_version: Some(0),
|
max_supported_transaction_version: Some(0),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
.await?;
|
.await
|
||||||
|
.context("failed to fetch latest blockhash")?;
|
||||||
|
|
||||||
let latest_block_hash = block.blockhash;
|
let latest_block_hash = block.blockhash;
|
||||||
let block_height = block
|
let block_height = block
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use anyhow::Context;
|
||||||
use std::{collections::VecDeque, str::FromStr, sync::Arc};
|
use std::{collections::VecDeque, str::FromStr, sync::Arc};
|
||||||
|
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
|
@ -36,7 +37,10 @@ impl LeaderSchedule {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn load_cluster_info(&self, rpc_client: Arc<RpcClient>) -> anyhow::Result<()> {
|
pub async fn load_cluster_info(&self, rpc_client: Arc<RpcClient>) -> anyhow::Result<()> {
|
||||||
let cluster_nodes = rpc_client.get_cluster_nodes().await?;
|
let cluster_nodes = rpc_client
|
||||||
|
.get_cluster_nodes()
|
||||||
|
.await
|
||||||
|
.context("failed to get cluster nodes")?;
|
||||||
cluster_nodes.iter().for_each(|x| {
|
cluster_nodes.iter().for_each(|x| {
|
||||||
if let Ok(pubkey) = Pubkey::from_str(x.pubkey.as_str()) {
|
if let Ok(pubkey) = Pubkey::from_str(x.pubkey.as_str()) {
|
||||||
self.cluster_nodes.insert(pubkey, Arc::new(x.clone()));
|
self.cluster_nodes.insert(pubkey, Arc::new(x.clone()));
|
||||||
|
@ -70,14 +74,17 @@ impl LeaderSchedule {
|
||||||
let first_slot_to_fetch = queue_end_slot + 1;
|
let first_slot_to_fetch = queue_end_slot + 1;
|
||||||
let leaders = rpc_client
|
let leaders = rpc_client
|
||||||
.get_slot_leaders(first_slot_to_fetch, last_slot_needed - first_slot_to_fetch)
|
.get_slot_leaders(first_slot_to_fetch, last_slot_needed - first_slot_to_fetch)
|
||||||
.await?;
|
.await
|
||||||
|
.context("failed to get slot leaders")?;
|
||||||
|
|
||||||
let mut leader_queue = self.leader_schedule.write().await;
|
let mut leader_queue = self.leader_schedule.write().await;
|
||||||
for i in first_slot_to_fetch..last_slot_needed {
|
for i in first_slot_to_fetch..last_slot_needed {
|
||||||
let current_leader = (i - first_slot_to_fetch) as usize;
|
let current_leader = (i - first_slot_to_fetch) as usize;
|
||||||
let leader = leaders[current_leader];
|
let leader = leaders[current_leader];
|
||||||
if !self.cluster_nodes.contains_key(&leader) {
|
if !self.cluster_nodes.contains_key(&leader) {
|
||||||
self.load_cluster_info(rpc_client.clone()).await?;
|
self.load_cluster_info(rpc_client.clone())
|
||||||
|
.await
|
||||||
|
.context("failed to load cluster info")?;
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.cluster_nodes.get(&leader) {
|
match self.cluster_nodes.get(&leader) {
|
||||||
|
|
|
@ -17,7 +17,7 @@ use solana_lite_rpc_services::{
|
||||||
tx_sender::{TxSender, TXS_IN_CHANNEL},
|
tx_sender::{TxSender, TXS_IN_CHANNEL},
|
||||||
};
|
};
|
||||||
|
|
||||||
use anyhow::bail;
|
use anyhow::{bail, Context};
|
||||||
use jsonrpsee::{core::SubscriptionResult, server::ServerBuilder, PendingSubscriptionSink};
|
use jsonrpsee::{core::SubscriptionResult, server::ServerBuilder, PendingSubscriptionSink};
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
use prometheus::{opts, register_int_counter, IntCounter};
|
use prometheus::{opts, register_int_counter, IntCounter};
|
||||||
|
@ -83,7 +83,10 @@ impl LiteBridge {
|
||||||
max_retries: usize,
|
max_retries: usize,
|
||||||
) -> anyhow::Result<Self> {
|
) -> anyhow::Result<Self> {
|
||||||
let rpc_client = Arc::new(RpcClient::new(rpc_url.clone()));
|
let rpc_client = Arc::new(RpcClient::new(rpc_url.clone()));
|
||||||
let current_slot = rpc_client.get_slot().await?;
|
let current_slot = rpc_client
|
||||||
|
.get_slot()
|
||||||
|
.await
|
||||||
|
.context("failed to get initial slot")?;
|
||||||
|
|
||||||
let tx_store = empty_tx_store();
|
let tx_store = empty_tx_store();
|
||||||
|
|
||||||
|
@ -331,6 +334,7 @@ impl LiteRpcServer for LiteBridge {
|
||||||
.rpc_client
|
.rpc_client
|
||||||
.is_blockhash_valid(&blockhash, commitment)
|
.is_blockhash_valid(&blockhash, commitment)
|
||||||
.await
|
.await
|
||||||
|
.context("failed to get blockhash validity")
|
||||||
{
|
{
|
||||||
Ok(is_valid) => is_valid,
|
Ok(is_valid) => is_valid,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
@ -407,6 +411,7 @@ impl LiteRpcServer for LiteBridge {
|
||||||
.rpc_client
|
.rpc_client
|
||||||
.request_airdrop_with_config(&pubkey, lamports, config.unwrap_or_default())
|
.request_airdrop_with_config(&pubkey, lamports, config.unwrap_or_default())
|
||||||
.await
|
.await
|
||||||
|
.context("failed to request airdrop")
|
||||||
{
|
{
|
||||||
Ok(airdrop_sig) => airdrop_sig.to_string(),
|
Ok(airdrop_sig) => airdrop_sig.to_string(),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
@ -417,6 +422,7 @@ impl LiteRpcServer for LiteBridge {
|
||||||
.rpc_client
|
.rpc_client
|
||||||
.get_latest_blockhash_with_commitment(CommitmentConfig::finalized())
|
.get_latest_blockhash_with_commitment(CommitmentConfig::finalized())
|
||||||
.await
|
.await
|
||||||
|
.context("failed to get latest blockhash")
|
||||||
{
|
{
|
||||||
self.tx_store.insert(
|
self.tx_store.insert(
|
||||||
airdrop_sig.clone(),
|
airdrop_sig.clone(),
|
||||||
|
|
|
@ -238,7 +238,11 @@ impl BlockListener {
|
||||||
// TODO insert if not exists leader_id into accountaddrs
|
// TODO insert if not exists leader_id into accountaddrs
|
||||||
|
|
||||||
// fetch cluster time from rpc
|
// fetch cluster time from rpc
|
||||||
let block_time = self.rpc_client.get_block_time(slot).await?;
|
let block_time = self
|
||||||
|
.rpc_client
|
||||||
|
.get_block_time(slot)
|
||||||
|
.await
|
||||||
|
.context("failed to get block time")?;
|
||||||
|
|
||||||
// fetch local time from blockstore
|
// fetch local time from blockstore
|
||||||
let block_info = self
|
let block_info = self
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use anyhow::bail;
|
use anyhow::{bail, Context};
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
use prometheus::{core::GenericGauge, opts, register_int_gauge};
|
use prometheus::{core::GenericGauge, opts, register_int_gauge};
|
||||||
use solana_client::nonblocking::rpc_client::RpcClient;
|
use solana_client::nonblocking::rpc_client::RpcClient;
|
||||||
|
@ -202,7 +202,8 @@ impl TpuService {
|
||||||
// setup
|
// setup
|
||||||
self.leader_schedule
|
self.leader_schedule
|
||||||
.load_cluster_info(self.rpc_client.clone())
|
.load_cluster_info(self.rpc_client.clone())
|
||||||
.await?;
|
.await
|
||||||
|
.context("failed to load initial cluster info")?;
|
||||||
self.update_current_stakes().await?;
|
self.update_current_stakes().await?;
|
||||||
self.update_leader_schedule().await?;
|
self.update_leader_schedule().await?;
|
||||||
self.update_quic_connections().await;
|
self.update_quic_connections().await;
|
||||||
|
|
Loading…
Reference in New Issue