test: add transaction service integration test

Lou-Kamades 2024-03-29 13:15:34 -05:00
parent 0105602e05
commit 23199c3a3b
9 changed files with 411 additions and 6 deletions

Cargo.lock (generated)

@@ -4812,6 +4812,36 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "solana-lite-rpc-services-integration-test"
version = "0.1.0"
dependencies = [
"anyhow",
"bench",
"bincode",
"countmap",
"crossbeam-channel",
"dashmap 5.5.3",
"itertools 0.10.5",
"lite-rpc",
"log",
"serde",
"serde_json",
"solana-lite-rpc-address-lookup-tables",
"solana-lite-rpc-cluster-endpoints",
"solana-lite-rpc-core",
"solana-lite-rpc-prioritization-fees",
"solana-lite-rpc-services",
"solana-lite-rpc-util",
"solana-net-utils",
"solana-rpc-client",
"solana-sdk",
"solana-streamer",
"solana-transaction-status",
"tokio",
"tracing-subscriber",
]
[[package]]
name = "solana-lite-rpc-util"
version = "0.2.4"

Cargo.toml

@@ -5,6 +5,7 @@ members = [
"core",
"util",
"services",
"services-integration-test",
"lite-rpc",
"quic-forward-proxy",
"quic-forward-proxy-integration-test",

services-integration-test/Cargo.toml

@@ -0,0 +1,37 @@
[package]
name = "solana-lite-rpc-services-integration-test"
version = "0.1.0"
edition = "2021"
description = "Integration test for lite rpc services"
rust-version = "1.73.0"
repository = "https://github.com/blockworks-foundation/lite-rpc"
license = "AGPL"
publish = false
[dependencies]
bincode = { workspace = true }
bench = { path = "../bench" }
lite-rpc = { path = "../lite-rpc" }
solana-lite-rpc-address-lookup-tables = { workspace = true }
solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-util = { workspace = true }
solana-lite-rpc-services = { workspace = true }
solana-lite-rpc-prioritization-fees = { workspace = true }
solana-lite-rpc-cluster-endpoints = { workspace = true }
solana-sdk = { workspace = true }
solana-streamer = { workspace = true }
solana-transaction-status = { workspace = true }
solana-net-utils = { workspace = true }
solana-rpc-client = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
dashmap = { workspace = true }
itertools = { workspace = true }
tracing-subscriber = { workspace = true, features = ["std", "env-filter"] }
tokio = { version = "1.28.2", features = ["full", "fs"]}
[dev-dependencies]
crossbeam-channel = "0.5.6"
countmap = "0.2.0"

services-integration-test/tests/setup.rs

@@ -0,0 +1,233 @@
use dashmap::DashMap;
use solana_lite_rpc_core::{
keypair_loader::load_identity_keypair,
stores::{
block_information_store::{BlockInformation, BlockInformationStore},
cluster_info_store::ClusterInfo,
data_cache::{DataCache, SlotCache},
subscription_store::SubscriptionStore,
tx_store::TxStore,
},
structures::{
account_filter::AccountFilters, epoch::EpochCache, identity_stakes::IdentityStakes,
leaderschedule::CalculatedSchedule, produced_block::ProducedBlock,
},
types::BlockStream, AnyhowJoinHandle,
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{
commitment_config::CommitmentConfig, signature::Keypair,
signer::Signer,
};
use std::sync::Arc;
use lite_rpc::{cli::Config, service_spawner::ServiceSpawner, DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE};
use log::{debug, info, trace};
use solana_lite_rpc_cluster_endpoints::{
endpoint_stremers::EndpointStreaming,
geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig},
grpc_subscription::create_grpc_subscription,
json_rpc_leaders_getter::JsonRpcLeaderGetter,
};
use solana_lite_rpc_services::{tpu_utils::tpu_connection_path::TpuConnectionPath, transaction_replayer::TransactionReplayer, transaction_service::TransactionService};
use solana_lite_rpc_services::tpu_utils::tpu_service::{TpuService, TpuServiceConfig};
use solana_lite_rpc_services::{
quic_connection_utils::QuicConnectionParameters, tx_sender::TxSender,
};
use std::net::{SocketAddr, ToSocketAddrs};
use std::time::Duration;
use tokio::time::{timeout, Instant};
use tokio::sync::RwLock;
// TODO: refactor
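// Blocks until the stream yields a block at the requested commitment level;
// logs progress every 500ms while waiting and panics if the stream closes.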
async fn get_latest_block(
mut block_stream: BlockStream,
commitment_config: CommitmentConfig,
) -> ProducedBlock {
let started = Instant::now();
loop {
match timeout(Duration::from_millis(500), block_stream.recv()).await {
Ok(Ok(block)) => {
if block.commitment_config == commitment_config {
return block;
}
}
Err(_elapsed) => {
debug!(
"waiting for latest block ({}) ... {:.02}ms",
commitment_config.commitment,
started.elapsed().as_secs_f32() * 1000.0
);
}
Ok(Err(_error)) => {
panic!("Did not recv blocks");
}
}
}
}
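// QUIC parameters for the test TPU service; the short timeouts make failures surface quickly.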
const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameters {
connection_timeout: Duration::from_secs(2),
connection_retry_count: 10,
finalize_timeout: Duration::from_secs(2),
max_number_of_connections: 8,
unistream_timeout: Duration::from_secs(2),
write_timeout: Duration::from_secs(2),
number_of_transactions_per_unistream: 10,
unistreams_to_create_new_connection_in_percentage: 10,
};
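// Boots the full transaction-sending stack from the standard lite-rpc Config:
// gRPC geyser subscription, DataCache seeded from the first finalized block,
// then TpuService + TxSender + TransactionReplayer wired into a TransactionService.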
pub async fn setup_tx_service() -> anyhow::Result<(TransactionService, DataCache, AnyhowJoinHandle)> {
let config = Config::load().await?;
let grpc_sources = config.get_grpc_sources();
let Config {
rpc_addr,
fanout_size,
identity_keypair,
transaction_retry_after_secs,
quic_proxy_addr,
prometheus_addr,
maximum_retries_per_tx,
account_filters,
..
} = config;
let validator_identity = Arc::new(
load_identity_keypair(identity_keypair)
.await?
.unwrap_or_else(Keypair::new),
);
let retry_after = Duration::from_secs(transaction_retry_after_secs);
let tpu_connection_path = configure_tpu_connection_path(quic_proxy_addr);
trace!("tpu_connection_path: {}", tpu_connection_path);
let account_filters = if let Some(account_filters) = account_filters {
serde_json::from_str::<AccountFilters>(account_filters.as_str())
.expect("Account filters should be valid")
} else {
vec![]
};
let rpc_client = Arc::new(RpcClient::new(rpc_addr.clone()));
let timeouts = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(5),
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
};
info!("Creating geyser subscription...");
let (subscriptions, _cluster_endpoint_tasks) = create_grpc_subscription(
rpc_client.clone(),
grpc_sources
.iter()
.map(|s| {
GrpcSourceConfig::new(s.addr.clone(), s.x_token.clone(), None, timeouts.clone())
})
.collect(),
account_filters.clone(),
)?;
let EndpointStreaming {
// note: blocks_notifier will be dropped at some point
blocks_notifier,
slot_notifier,
..
} = subscriptions;
info!("Waiting for first finalized block...");
let finalized_block =
get_latest_block(blocks_notifier.resubscribe(), CommitmentConfig::finalized()).await;
info!("Got finalized block: {:?}", finalized_block.slot);
let (epoch_data, _current_epoch_info) = EpochCache::bootstrap_epoch(&rpc_client).await?;
let block_information_store =
BlockInformationStore::new(BlockInformation::from_block(&finalized_block));
let data_cache = DataCache {
block_information_store,
cluster_info: ClusterInfo::default(),
identity_stakes: IdentityStakes::new(validator_identity.pubkey()),
slot_cache: SlotCache::new(finalized_block.slot),
tx_subs: SubscriptionStore::default(),
txs: TxStore {
store: Arc::new(DashMap::new()),
},
epoch_data,
leader_schedule: Arc::new(RwLock::new(CalculatedSchedule::default())),
};
let tpu_config = TpuServiceConfig {
fanout_slots: fanout_size,
maximum_transaction_in_queue: 20000,
quic_connection_params: QUIC_CONNECTION_PARAMS,
tpu_connection_path,
};
let spawner = ServiceSpawner {
prometheus_addr,
data_cache: data_cache.clone(),
};
// init grpc leader schedule and vote account if configured.
let leader_schedule = Arc::new(JsonRpcLeaderGetter::new(rpc_client.clone(), 1024, 128));
let tpu_service: TpuService = TpuService::new(
tpu_config,
validator_identity,
leader_schedule,
data_cache.clone(),
)
.await?;
let tx_sender = TxSender::new(data_cache.clone(), tpu_service.clone());
let tx_replayer = TransactionReplayer::new(tpu_service.clone(), data_cache.clone(), retry_after);
trace!("spawning tx_service");
let (transaction_service, tx_service_jh) = spawner.spawn_tx_service(
tx_sender,
tx_replayer,
tpu_service,
DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE,
None,
maximum_retries_per_tx,
slot_notifier.resubscribe(),
);
trace!("tx_service spawned successfully");
Ok((transaction_service, data_cache, tx_service_jh))
}
// TODO: deduplicate
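// Direct QUIC unless a forward-proxy address is configured (logic duplicated from lite-rpc, hence the TODO).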
pub fn configure_tpu_connection_path(quic_proxy_addr: Option<String>) -> TpuConnectionPath {
match quic_proxy_addr {
None => {
TpuConnectionPath::QuicDirectPath
},
Some(prox_address) => {
let proxy_socket_addr = parse_host_port(prox_address.as_str()).unwrap();
TpuConnectionPath::QuicForwardProxyPath {
// e.g. "127.0.0.1:11111" or "localhost:11111"
forward_proxy_address: proxy_socket_addr,
}
}
}
}
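// Resolves `host:port` to exactly one SocketAddr; unresolvable or ambiguous hosts are rejected.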
pub fn parse_host_port(host_port: &str) -> Result<SocketAddr, String> {
let addrs: Vec<_> = host_port
.to_socket_addrs()
.map_err(|err| format!("Unable to resolve host {host_port}: {err}"))?
.collect();
if addrs.is_empty() {
Err(format!("Unable to resolve host: {host_port}"))
} else if addrs.len() > 1 {
Err(format!("Multiple addresses resolved for host: {host_port}"))
} else {
Ok(addrs[0])
}
}

services-integration-test/tests/txn_broadcast.rs

@@ -0,0 +1,78 @@
use bench::{create_memo_tx, create_rng, BenchmarkTransactionParams};
use solana_sdk::{
commitment_config::CommitmentConfig, signature::Keypair,
transaction::VersionedTransaction,
};
use log::{debug, info, trace};
use std::time::Duration;
use tokio::time::Instant;
use crate::setup::setup_tx_service;
mod setup;
const SAMPLE_SIZE: usize = 10000;
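// Assumed invocation (requires a running validator plus the RPC/gRPC endpoints from Config):
//   RUST_LOG=info cargo test --release txn_broadcast -- --nocapture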
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
/// TC 4
/// - send txs on LiteRPC broadcast channel and consume them using the Solana quic-streamer
/// - see quic_proxy_tpu_integrationtest.rs (note: not only about proxy)
/// - run cargo test (maybe need to use release build)
/// - Goal: measure performance of LiteRPC internal channel/thread structure and the TPU_service performance
pub async fn txn_broadcast() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
info!("START BENCHMARK: txn_broadcast");
debug!("spawning tx_service");
let (transaction_service, data_cache, _tx_jh) = setup_tx_service().await?;
debug!("tx_service spawned successfully");
let mut rng = create_rng(None);
let payer = Keypair::new();
let params = BenchmarkTransactionParams {
tx_size: bench::tx_size::TxSize::Small,
cu_price_micro_lamports: 1,
};
let mut i = 0;
let mut times: Vec<Duration> = vec![];
// TODO: save stats
// TODO: txn sink?
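// Send SAMPLE_SIZE memo txs, fetching a fresh confirmed blockhash each iteration
// and recording per-send latency.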
while i < SAMPLE_SIZE {
let blockhash = data_cache.block_information_store.get_latest_blockhash(CommitmentConfig::confirmed()).await;
let tx = create_memo_tx(&payer, blockhash, &mut rng, &params);
let serialized = bincode::serialize::<VersionedTransaction>(&tx)
.expect("Could not serialize VersionedTransaction");
info!("Sending txn: {:?} {:?}", tx.signatures[0], i);
let send_start = Instant::now();
transaction_service
.send_transaction(
serialized,
Some(1),
)
.await?;
let send_time = send_start.elapsed();
debug!("sent in {:?}", send_time);
times.push(send_time);
i += 1;
}
times.sort();
let median_time = times[times.len() / 2];
let total_time: Duration = times.iter().sum();
let max_time = times.iter().max().unwrap();
let min_time = times.iter().min().unwrap();
info!("avg send time: {:?}", total_time.div_f64(f64::from(SAMPLE_SIZE as u32) ));
info!("max_time: {:?}", max_time);
info!("min_time: {:?}", min_time);
info!("median_time: {:?}", median_time);
Ok(())
}

services/src/tpu_utils/tpu_service.rs

@@ -1,5 +1,6 @@
use anyhow::Context;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use tokio::time::Instant;
use super::tpu_connection_manager::TpuConnectionManager;
use crate::quic_connection_utils::QuicConnectionParameters;
@@ -7,6 +8,7 @@ use crate::tpu_utils::quic_proxy_connection_manager::QuicProxyConnectionManager;
use crate::tpu_utils::tpu_connection_path::TpuConnectionPath;
use crate::tpu_utils::tpu_service::ConnectionManager::{DirectTpu, QuicProxy};
use log::{debug, trace, warn, info};
use solana_lite_rpc_core::network_utils::log_gso_workaround;
use solana_lite_rpc_core::stores::data_cache::DataCache;
use solana_lite_rpc_core::structures::transaction_sent_info::SentTransactionInfo;
@@ -107,6 +109,7 @@ impl TpuService {
}
pub fn send_transaction(&self, transaction: &SentTransactionInfo) -> anyhow::Result<()> {
debug!("send txn tpu");
self.broadcast_sender.send(transaction.clone())?;
Ok(())
}
@@ -178,18 +181,25 @@ impl TpuService {
pub fn start(&self, slot_notifications: SlotStream) -> AnyhowJoinHandle {
let this = self.clone();
info!("ahhhhhh");
tokio::spawn(async move {
let mut slot_notifications = slot_notifications;
loop {
// info!("loop");
let notification = slot_notifications
.recv()
.await
.context("Tpu service cannot get slot notification")?;
// info!("update quic conn");
let now = Instant::now();
this.update_quic_connections(
notification.processed_slot,
notification.estimated_processed_slot,
)
.await?;
// info!("quic conn updated in: {:?}", now.elapsed());
}
})
}

services/src/transaction_replayer.rs

@@ -50,14 +50,14 @@ impl TransactionReplayer {
pub fn start_service(
&self,
sender: UnboundedSender<TransactionReplay>,
mut reciever: UnboundedReceiver<TransactionReplay>,
mut receiver: UnboundedReceiver<TransactionReplay>,
) -> AnyhowJoinHandle {
let tpu_service = self.tpu_service.clone();
let data_cache = self.data_cache.clone();
let retry_offset = self.retry_offset;
tokio::spawn(async move {
while let Some(mut tx_replay) = reciever.recv().await {
while let Some(mut tx_replay) = receiver.recv().await {
MESSAGES_IN_REPLAY_QUEUE.dec();
let now = Instant::now();
if now < tx_replay.replay_at {

services/src/transaction_service.rs

@@ -1,7 +1,8 @@
// This class will manage the lifecycle for a transaction
// It will send, replay if necessary and confirm by listening to blocks
use std::time::Duration;
use log::trace;
use std::{num::IntErrorKind, time::Duration};
use crate::{
tpu_utils::tpu_service::TpuService,
@@ -126,12 +127,15 @@ impl TransactionService {
raw_tx: Vec<u8>,
max_retries: Option<u16>,
) -> anyhow::Result<String> {
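// coarse per-stage timing probes (deltas a..f below), logged at trace level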
let s = Instant::now();
let tx = match bincode::deserialize::<VersionedTransaction>(&raw_tx) {
Ok(tx) => tx,
Err(err) => {
bail!(err.to_string());
bail!(format!("Failed to deserialize raw_tx: {}", err.to_string()));
}
};
let a = s.elapsed();
trace!("deser: {:?}", a);
let signature = tx.signatures[0];
let Some(BlockInformation {
@@ -144,11 +148,14 @@ impl TransactionService {
else {
bail!("Blockhash not found in block store".to_string());
};
let b = s.elapsed();
trace!("block info: {:?}", b-a);
if self.block_information_store.get_last_blockheight() > last_valid_blockheight {
bail!("Blockhash is expired");
}
let c = s.elapsed();
trace!("block height: {:?}", c-b);
let prioritization_fee = {
let mut prioritization_fee = 0;
for ix in tx.message.instructions() {
@@ -165,6 +172,8 @@ impl TransactionService {
}
prioritization_fee
};
let d = s.elapsed();
trace!("prio fee: {:?}", d-c);
PRIORITY_FEES_HISTOGRAM.observe(prioritization_fee as f64);
@@ -186,6 +195,8 @@ impl TransactionService {
e
);
}
let e = s.elapsed();
trace!("txn channel: {:?}", e-d);
let replay_at = Instant::now() + self.replay_offset;
// ignore error for replay service
if self
@@ -200,6 +211,9 @@ impl TransactionService {
{
MESSAGES_IN_REPLAY_QUEUE.inc();
}
let f = s.elapsed();
trace!("replay: {:?}", f-e);
trace!("toal: {:?}", f);
Ok(signature.to_string())
}
}

services/src/tx_sender.rs

@@ -65,7 +65,9 @@ impl TxSender {
}
Err(err) => {
TXS_SENT_ERRORS.inc_by(1);
warn!("{err}");
let s = transaction_info.signature;
warn!("{s} - {err}");
warn!("TXS_IN_CHANNEL: {:?}", TXS_IN_CHANNEL.get());
0
}
};