Merge remote-tracking branch 'origin/main' into groovie/tpu-quic-integration-test-on-main

GroovieGermanikus 2023-07-20 10:45:47 +02:00
commit e48ccbea57
7 changed files with 41 additions and 26 deletions

View File

@@ -1,3 +1,4 @@
+use anyhow::Context;
 use log::{info, warn};
 use solana_rpc_client::nonblocking::rpc_client::RpcClient;
 use solana_rpc_client_api::config::RpcBlockConfig;
@@ -76,7 +77,8 @@ impl BlockProcessor {
                 rewards: Some(true),
             },
         )
-        .await?;
+        .await
+        .context("failed to get block")?;
         let Some(block_height) = block.block_height else {
             return Ok(BlockProcessorResult::invalid());
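Note: the recurring change in this commit is replacing a bare .await? with .await plus .context(...)? so that errors bubbling up through anyhow carry a human-readable cause chain. A minimal self-contained sketch of the pattern, assuming tokio and anyhow as dependencies (fetch_value is a hypothetical stand-in for the RPC calls above, not code from this repo):

    use anyhow::{Context, Result};

    // Hypothetical stand-in for a fallible async RPC call.
    async fn fetch_value() -> std::io::Result<u64> {
        Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "rpc timeout"))
    }

    async fn run() -> Result<u64> {
        // .context(...) wraps the underlying error, so the report reads
        // "failed to get block: rpc timeout" instead of a bare "rpc timeout".
        fetch_value().await.context("failed to get block")
    }

    #[tokio::main]
    async fn main() {
        if let Err(err) = run().await {
            eprintln!("{err:#}"); // the {:#} format prints the full context chain
        }
    }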

View File

@@ -68,7 +68,8 @@ impl BlockStore {
                 RpcRequest::GetLatestBlockhash,
                 json!([commitment_config]),
             )
-            .await?;
+            .await
+            .context("failed to poll latest blockhash")?;
         let processed_blockhash = response.value.blockhash;
         let processed_block = BlockInformation {
@@ -91,7 +92,8 @@ impl BlockStore {
     ) -> anyhow::Result<(String, BlockInformation)> {
         let slot = rpc_client
             .get_slot_with_commitment(commitment_config)
-            .await?;
+            .await
+            .context("failed to fetch latest slot")?;
         let block = rpc_client
             .get_block_with_config(
@@ -104,7 +106,8 @@ impl BlockStore {
                     max_supported_transaction_version: Some(0),
                 },
             )
-            .await?;
+            .await
+            .context("failed to fetch latest blockhash")?;
         let latest_block_hash = block.blockhash;
         let block_height = block
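Side note on the messages above: .context takes an eagerly built value, which is fine for static strings like these. Where a message should carry runtime data (say, the slot that failed), anyhow also offers with_context, which takes a closure so the string is only built on the error path. A hedged sketch (fetch_block is hypothetical; only the with_context call mirrors the real anyhow API):

    use anyhow::{Context, Result};

    fn fetch_block(slot: u64) -> Result<String> {
        let raw: Result<String, std::io::Error> =
            Err(std::io::Error::new(std::io::ErrorKind::NotFound, "no block"));
        // The closure runs only if raw is Err, so the Ok path allocates nothing.
        raw.with_context(|| format!("failed to fetch block for slot {slot}"))
    }

    fn main() {
        println!("{:#}", fetch_block(1234).unwrap_err());
    }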

View File

@@ -1,3 +1,4 @@
+use anyhow::Context;
 use std::{collections::VecDeque, str::FromStr, sync::Arc};
 use dashmap::DashMap;
@@ -36,7 +37,10 @@ impl LeaderSchedule {
     }
     pub async fn load_cluster_info(&self, rpc_client: Arc<RpcClient>) -> anyhow::Result<()> {
-        let cluster_nodes = rpc_client.get_cluster_nodes().await?;
+        let cluster_nodes = rpc_client
+            .get_cluster_nodes()
+            .await
+            .context("failed to get cluster nodes")?;
         cluster_nodes.iter().for_each(|x| {
             if let Ok(pubkey) = Pubkey::from_str(x.pubkey.as_str()) {
                 self.cluster_nodes.insert(pubkey, Arc::new(x.clone()));
@@ -70,14 +74,17 @@ impl LeaderSchedule {
         let first_slot_to_fetch = queue_end_slot + 1;
         let leaders = rpc_client
             .get_slot_leaders(first_slot_to_fetch, last_slot_needed - first_slot_to_fetch)
-            .await?;
+            .await
+            .context("failed to get slot leaders")?;
         let mut leader_queue = self.leader_schedule.write().await;
         for i in first_slot_to_fetch..last_slot_needed {
             let current_leader = (i - first_slot_to_fetch) as usize;
             let leader = leaders[current_leader];
             if !self.cluster_nodes.contains_key(&leader) {
-                self.load_cluster_info(rpc_client.clone()).await?;
+                self.load_cluster_info(rpc_client.clone())
+                    .await
+                    .context("failed to load cluster info")?;
             }
             match self.cluster_nodes.get(&leader) {
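The last hunk above encodes two details worth spelling out: slot leaders arrive as one Vec covering a contiguous slot range, so the leader for slot i sits at index i - first_slot_to_fetch; and an unknown leader triggers a reload of the cluster-node map before lookup (refresh-on-miss). A simplified synchronous sketch, with u64 standing in for Pubkey and fetch_cluster_nodes as a hypothetical stand-in for get_cluster_nodes:

    use std::collections::HashMap;

    // Hypothetical stand-in for the RPC call that lists cluster nodes.
    fn fetch_cluster_nodes() -> HashMap<u64, String> {
        HashMap::from([(42, "127.0.0.1:8003".to_string())])
    }

    fn main() {
        let mut cluster_nodes = fetch_cluster_nodes();
        let first_slot_to_fetch = 1000u64;
        let leaders = vec![42u64, 7u64];

        for slot in first_slot_to_fetch..first_slot_to_fetch + leaders.len() as u64 {
            // The Vec is indexed by offset from the first fetched slot.
            let leader = leaders[(slot - first_slot_to_fetch) as usize];
            // Refresh-on-miss: an unknown leader usually means the cached node
            // list is stale (e.g. a validator joined), so reload it once.
            if !cluster_nodes.contains_key(&leader) {
                cluster_nodes = fetch_cluster_nodes();
            }
            match cluster_nodes.get(&leader) {
                Some(tpu) => println!("slot {slot}: leader {leader} at {tpu}"),
                None => println!("slot {slot}: leader {leader} unknown after refresh"),
            }
        }
    }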

View File

@@ -17,7 +17,7 @@ use solana_lite_rpc_services::{
     tx_sender::{TxSender, TXS_IN_CHANNEL},
 };
-use anyhow::bail;
+use anyhow::{bail, Context};
 use jsonrpsee::{core::SubscriptionResult, server::ServerBuilder, PendingSubscriptionSink};
 use log::{error, info};
 use prometheus::{opts, register_int_counter, IntCounter};
@@ -83,7 +83,10 @@ impl LiteBridge {
         max_retries: usize,
     ) -> anyhow::Result<Self> {
         let rpc_client = Arc::new(RpcClient::new(rpc_url.clone()));
-        let current_slot = rpc_client.get_slot().await?;
+        let current_slot = rpc_client
+            .get_slot()
+            .await
+            .context("failed to get initial slot")?;
         let tx_store = empty_tx_store();
@@ -101,7 +104,7 @@ impl LiteBridge {
                 max_number_of_connections: 10,
                 unistream_timeout: Duration::from_millis(500),
                 write_timeout: Duration::from_secs(1),
-                number_of_transactions_per_unistream: 10,
+                number_of_transactions_per_unistream: 8,
             },
         };
@@ -331,6 +334,7 @@ impl LiteRpcServer for LiteBridge {
             .rpc_client
             .is_blockhash_valid(&blockhash, commitment)
             .await
+            .context("failed to get blockhash validity")
         {
             Ok(is_valid) => is_valid,
             Err(err) => {
@@ -407,6 +411,7 @@ impl LiteRpcServer for LiteBridge {
             .rpc_client
             .request_airdrop_with_config(&pubkey, lamports, config.unwrap_or_default())
             .await
+            .context("failed to request airdrop")
         {
             Ok(airdrop_sig) => airdrop_sig.to_string(),
             Err(err) => {
@@ -417,6 +422,7 @@ impl LiteRpcServer for LiteBridge {
             .rpc_client
             .get_latest_blockhash_with_commitment(CommitmentConfig::finalized())
             .await
+            .context("failed to get latest blockhash")
         {
             self.tx_store.insert(
                 airdrop_sig.clone(),
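In the handlers above, .context(...) is applied without a trailing ? and the result is inspected directly: context converts the Err side to anyhow::Error while leaving the Ok side untouched, so the existing match arms keep working. A minimal sketch of that shape (is_blockhash_valid here is a hypothetical fallible call, not the real client method):

    use anyhow::Context;

    // Hypothetical fallible call standing in for the RPC methods above.
    fn is_blockhash_valid() -> Result<bool, std::io::Error> {
        Ok(true)
    }

    fn main() {
        // .context maps the Err side to anyhow::Error before the match, so the
        // error arm logs "failed to get blockhash validity: <cause>".
        let is_valid = match is_blockhash_valid().context("failed to get blockhash validity") {
            Ok(is_valid) => is_valid,
            Err(err) => {
                eprintln!("{err:#}");
                return;
            }
        };
        println!("blockhash valid: {is_valid}");
    }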

View File

@@ -238,7 +238,11 @@ impl BlockListener {
         // TODO insert if not exists leader_id into accountaddrs
         // fetch cluster time from rpc
-        let block_time = self.rpc_client.get_block_time(slot).await?;
+        let block_time = self
+            .rpc_client
+            .get_block_time(slot)
+            .await
+            .context("failed to get block time")?;
         // fetch local time from blockstore
         let block_info = self

View File

@@ -146,19 +146,11 @@ impl ActiveConnection {
                     }
                 }
-                if txs.len() >= number_of_transactions_per_unistream - 1 {
-                    // queue getting full and a connection poll is getting slower
-                    // add more connections to the pool
-                    if connection_pool.len() < max_number_of_connections {
-                        connection_pool.add_connection().await;
-                        NB_QUIC_CONNECTIONS.inc();
-                    }
-                } else if txs.len() == 1 {
-                    // low traffic / reduce connection till minimum 1
-                    if connection_pool.len() > 1 {
-                        connection_pool.remove_connection().await;
-                        NB_QUIC_CONNECTIONS.dec();
-                    }
-                }
+                // queue getting full and a connection poll is getting slower
+                // add more connections to the pool
+                if connection_pool.len() < max_number_of_connections {
+                    connection_pool.add_connection().await;
+                    NB_QUIC_CONNECTIONS.inc();
+                }
                 let task_counter = task_counter.clone();
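Behavior change worth noting: the old code grew the pool when a batch ran close to number_of_transactions_per_unistream and shrank it back (down to one connection) on single-transaction batches; the new code only grows toward max_number_of_connections and never scales down. A toy sketch of the resulting ratchet (this ConnectionPool is a stand-in, not the crate's type):

    // Stand-in pool; the real one manages QUIC connections asynchronously.
    struct ConnectionPool {
        connections: Vec<u32>,
        max_number_of_connections: usize,
    }

    impl ConnectionPool {
        fn maybe_grow(&mut self) {
            // The shrink branch is gone, so capacity only ratchets upward
            // until it hits the configured maximum.
            if self.connections.len() < self.max_number_of_connections {
                self.connections.push(self.connections.len() as u32);
            }
        }
    }

    fn main() {
        let mut pool = ConnectionPool { connections: vec![0], max_number_of_connections: 4 };
        for _ in 0..10 {
            pool.maybe_grow();
        }
        assert_eq!(pool.connections.len(), 4); // capped, and never shrunk
    }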

View File

@@ -1,4 +1,4 @@
-use anyhow::bail;
+use anyhow::{bail, Context};
 use log::{error, info};
 use prometheus::{core::GenericGauge, opts, register_int_gauge};
 use solana_client::nonblocking::rpc_client::RpcClient;
@@ -202,7 +202,8 @@ impl TpuService {
         // setup
         self.leader_schedule
             .load_cluster_info(self.rpc_client.clone())
-            .await?;
+            .await
+            .context("failed to load initial cluster info")?;
         self.update_current_stakes().await?;
         self.update_leader_schedule().await?;
         self.update_quic_connections().await;
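Finally, the setup sequence above tags only its first phase with context while the later phases keep a bare ?. A sketch of how the same treatment would extend to the remaining phases, so a startup crash names the phase that failed (all three helpers are hypothetical stand-ins):

    use anyhow::{Context, Result};

    fn load_cluster_info() -> Result<()> { Ok(()) }
    fn update_current_stakes() -> Result<()> { Ok(()) }
    fn update_leader_schedule() -> Result<()> { Ok(()) }

    fn start() -> Result<()> {
        // Each phase carries its own message, so "failed to update stakes"
        // immediately distinguishes it from a cluster-info failure.
        load_cluster_info().context("failed to load initial cluster info")?;
        update_current_stakes().context("failed to update stakes")?;
        update_leader_schedule().context("failed to update leader schedule")?;
        Ok(())
    }

    fn main() {
        if let Err(err) = start() {
            eprintln!("{err:#}");
        }
    }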