fixing tests, cargo fmt

This commit is contained in:
godmodegalactus 2023-09-20 17:01:23 +02:00
parent 7f3d3303b7
commit 255b9de7f5
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
11 changed files with 68 additions and 36 deletions

View File

@ -159,15 +159,18 @@ impl BlockInformationStore {
self.blocks.len()
}
pub async fn is_blockhash_valid(&self, blockhash: &String, commitment_config: CommitmentConfig,) -> (bool, Slot) {
pub async fn is_blockhash_valid(
&self,
blockhash: &String,
commitment_config: CommitmentConfig,
) -> (bool, Slot) {
let latest_block = self.get_latest_block(commitment_config).await;
match self.blocks.get(blockhash) {
Some(block_information) => {
(latest_block.block_height <= block_information.last_valid_blockheight, latest_block.slot)
},
None => {
(false, latest_block.slot)
}
Some(block_information) => (
latest_block.block_height <= block_information.last_valid_blockheight,
latest_block.slot,
),
None => (false, latest_block.slot),
}
}
}

View File

@ -1,8 +1,8 @@
use std::sync::{atomic::AtomicU64, Arc};
use solana_sdk::hash::Hash;
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
use solana_sdk::slot_history::Slot;
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
use crate::{
stores::{
@ -11,7 +11,8 @@ use crate::{
},
structures::{
identity_stakes::IdentityStakes,
slot_notification::{AtomicSlot, SlotNotification}, transaction_sent_info::SentTransactionInfo,
slot_notification::{AtomicSlot, SlotNotification},
transaction_sent_info::SentTransactionInfo,
},
};
@ -47,13 +48,23 @@ impl DataCache {
self.tx_subs.clean(ttl_duration);
}
pub async fn check_if_confirmed_or_expired_blockheight(&self, sent_transaction_info: &SentTransactionInfo) -> bool {
self.txs.is_transaction_confirmed(&sent_transaction_info.signature) || self.block_store.get_latest_block(CommitmentConfig::processed()).await.block_height > sent_transaction_info.last_valid_block_height
pub async fn check_if_confirmed_or_expired_blockheight(
&self,
sent_transaction_info: &SentTransactionInfo,
) -> bool {
self.txs
.is_transaction_confirmed(&sent_transaction_info.signature)
|| self
.block_store
.get_latest_block(CommitmentConfig::processed())
.await
.block_height
> sent_transaction_info.last_valid_block_height
}
pub fn new_for_tests() -> Self {
Self {
block_store: BlockInformationStore::new(BlockInformation{
block_store: BlockInformationStore::new(BlockInformation {
block_height: 0,
blockhash: Hash::new_unique().to_string(),
cleanup_slot: 1000,

View File

@ -4,4 +4,4 @@ pub mod block_information_store;
pub mod cluster_info_store;
pub mod data_cache;
pub mod subscription_store;
pub mod tx_store;
pub mod tx_store;

View File

@ -7,4 +7,4 @@ pub mod produced_block;
pub mod proxy_request_format;
pub mod rotating_queue;
pub mod slot_notification;
pub mod transaction_sent_info;
pub mod transaction_sent_info;

View File

@ -8,4 +8,4 @@ pub struct SentTransactionInfo {
pub slot: Slot,
pub transaction: WireTransaction,
pub last_valid_block_height: u64,
}
}

View File

@ -21,9 +21,7 @@ use solana_rpc_client_api::{
config::{RpcContextConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig},
response::{Response as RpcResponse, RpcBlockhash, RpcResponseContext, RpcVersionInfo},
};
use solana_sdk::{
commitment_config::CommitmentConfig, pubkey::Pubkey, slot_history::Slot,
};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, slot_history::Slot};
use solana_transaction_status::TransactionStatus;
use std::{str::FromStr, sync::Arc};
use tokio::net::ToSocketAddrs;
@ -189,7 +187,11 @@ impl LiteRpcServer for LiteBridge {
let commitment = config.unwrap_or_default().commitment.unwrap_or_default();
let commitment = CommitmentConfig { commitment };
let (is_valid, slot) = self.data_cache.block_store.is_blockhash_valid(&blockhash, commitment).await;
let (is_valid, slot) = self
.data_cache
.block_store
.is_blockhash_valid(&blockhash, commitment)
.await;
Ok(RpcResponse {
context: RpcResponseContext {

View File

@ -5,8 +5,9 @@ use log::{debug, error, info, trace, warn};
use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters;
use solana_lite_rpc_core::solana_utils::SerializableTransaction;
use solana_lite_rpc_core::stores::tx_store::empty_tx_store;
use solana_lite_rpc_core::stores::data_cache::DataCache;
use solana_lite_rpc_core::structures::identity_stakes::IdentityStakesData;
use solana_lite_rpc_core::structures::transaction_sent_info::SentTransactionInfo;
use solana_lite_rpc_services::tpu_utils::tpu_connection_manager::TpuConnectionManager;
use solana_sdk::hash::Hash;
use solana_sdk::instruction::Instruction;
@ -514,7 +515,7 @@ async fn start_literpc_client_direct_mode(
connections_to_keep,
identity_stakes,
// note: tx_store is useless in this scenario as it is never changed; it's only used to check for duplicates
empty_tx_store().clone(),
DataCache::new_for_tests(),
QUIC_CONNECTION_PARAMS,
)
.await;
@ -523,7 +524,7 @@ async fn start_literpc_client_direct_mode(
let raw_sample_tx = build_raw_sample_tx(i);
trace!(
"broadcast transaction {} to {} receivers: {}",
raw_sample_tx.0,
raw_sample_tx.signature,
broadcast_sender.receiver_count(),
format!("hi {}", i)
);
@ -632,7 +633,7 @@ async fn start_literpc_client_proxy_mode(
let raw_sample_tx = build_raw_sample_tx(i);
trace!(
"broadcast transaction {} to {} receivers: {}",
raw_sample_tx.0,
raw_sample_tx.signature,
broadcast_sender.receiver_count(),
format!("hi {}", i)
);
@ -731,16 +732,22 @@ impl SolanaQuicStreamer {
const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr";
pub fn build_raw_sample_tx(i: u32) -> (String, Vec<u8>) {
pub fn build_raw_sample_tx(i: u32) -> SentTransactionInfo {
let payer_keypair = Keypair::from_base58_string(
"rKiJ7H5UUp3JR18kNyTF1XPuwPKHEM7gMLWHZPWP5djrW1vSjfwjhvJrevxF9MPmUmN9gJMLHZdLMgc9ao78eKr",
);
let tx = build_sample_tx(&payer_keypair, i);
let raw_tx = bincode::serialize::<VersionedTransaction>(&tx).expect("failed to serialize tx");
let transaction =
bincode::serialize::<VersionedTransaction>(&tx).expect("failed to serialize tx");
(tx.get_signature().to_string(), raw_tx)
SentTransactionInfo {
signature: tx.get_signature().to_string(),
slot: 1,
transaction,
last_valid_block_height: 300,
}
}
// "hi 1234 " -> 1234

View File

@ -6,7 +6,10 @@ use solana_lite_rpc_core::{
quic_connection::{PooledConnection, QuicConnectionPool},
quic_connection_utils::{QuicConnectionParameters, QuicConnectionUtils},
stores::data_cache::DataCache,
structures::{identity_stakes::IdentityStakesData, rotating_queue::RotatingQueue, transaction_sent_info::SentTransactionInfo},
structures::{
identity_stakes::IdentityStakesData, rotating_queue::RotatingQueue,
transaction_sent_info::SentTransactionInfo,
},
};
use solana_sdk::pubkey::Pubkey;
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;

View File

@ -2,7 +2,10 @@ use crate::tpu_utils::tpu_service::TpuService;
use anyhow::{bail, Context};
use log::error;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use solana_lite_rpc_core::{stores::tx_store::TxStore, AnyhowJoinHandle, structures::transaction_sent_info::SentTransactionInfo};
use solana_lite_rpc_core::{
stores::tx_store::TxStore, structures::transaction_sent_info::SentTransactionInfo,
AnyhowJoinHandle,
};
use std::time::Duration;
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender},
@ -76,8 +79,7 @@ impl TransactionReplayer {
continue;
}
// ignore reset error
let _ =
tpu_service.send_transaction(&tx_replay.transaction);
let _ = tpu_service.send_transaction(&tx_replay.transaction);
if tx_replay.replay_count < tx_replay.max_replay {
tx_replay.replay_count += 1;

View File

@ -9,7 +9,10 @@ use crate::{
tx_sender::TxSender,
};
use anyhow::bail;
use solana_lite_rpc_core::{solana_utils::SerializableTransaction, types::SlotStream, structures::transaction_sent_info::SentTransactionInfo};
use solana_lite_rpc_core::{
solana_utils::SerializableTransaction, structures::transaction_sent_info::SentTransactionInfo,
types::SlotStream,
};
use solana_lite_rpc_core::{
stores::block_information_store::{BlockInformation, BlockInformationStore},
structures::notifications::NotificationSender,
@ -128,7 +131,7 @@ impl TransactionService {
else {
bail!("Blockhash not found in block store".to_string());
};
let max_replay = max_retries.map_or(self.max_retries, |x| x as usize);
let transaction_info = SentTransactionInfo {
signature: signature.to_string(),

View File

@ -13,7 +13,10 @@ use tokio::sync::mpsc::Receiver;
use crate::tpu_utils::tpu_service::TpuService;
use solana_lite_rpc_core::{
stores::{data_cache::DataCache, tx_store::TxProps},
structures::{notifications::{NotificationMsg, NotificationSender, TransactionNotification}, transaction_sent_info::SentTransactionInfo},
structures::{
notifications::{NotificationMsg, NotificationSender, TransactionNotification},
transaction_sent_info::SentTransactionInfo,
},
AnyhowJoinHandle,
};
@ -90,9 +93,7 @@ impl TxSender {
transaction_info.signature.clone(),
TxProps::new(transaction_info.last_valid_block_height),
);
let quic_response = match tpu_client.send_transaction(
transaction_info
) {
let quic_response = match tpu_client.send_transaction(transaction_info) {
Ok(_) => {
TXS_SENT.inc_by(1);
1