diff --git a/Cargo.lock b/Cargo.lock index 0c187aa5..5a781fcf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4812,6 +4812,36 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "solana-lite-rpc-services-integration-test" +version = "0.1.0" +dependencies = [ + "anyhow", + "bench", + "bincode", + "countmap", + "crossbeam-channel", + "dashmap 5.5.3", + "itertools 0.10.5", + "lite-rpc", + "log", + "serde", + "serde_json", + "solana-lite-rpc-address-lookup-tables", + "solana-lite-rpc-cluster-endpoints", + "solana-lite-rpc-core", + "solana-lite-rpc-prioritization-fees", + "solana-lite-rpc-services", + "solana-lite-rpc-util", + "solana-net-utils", + "solana-rpc-client", + "solana-sdk", + "solana-streamer", + "solana-transaction-status", + "tokio", + "tracing-subscriber", +] + [[package]] name = "solana-lite-rpc-util" version = "0.2.4" diff --git a/Cargo.toml b/Cargo.toml index 9cc87b93..0b661dde 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "core", "util", "services", + "services-integration-test", "lite-rpc", "quic-forward-proxy", "quic-forward-proxy-integration-test", diff --git a/services-integration-test/Cargo.toml b/services-integration-test/Cargo.toml new file mode 100644 index 00000000..2301e576 --- /dev/null +++ b/services-integration-test/Cargo.toml @@ -0,0 +1,37 @@ +[package] +name = "solana-lite-rpc-services-integration-test" +version = "0.1.0" +edition = "2021" +description = "Integration test for lite rpc services" +rust-version = "1.73.0" +repository = "https://github.com/blockworks-foundation/lite-rpc" +license = "AGPL" +publish = false + +[dependencies] +bincode = { workspace = true } +bench = { path = "../bench" } +lite-rpc = { path = "../lite-rpc" } +solana-lite-rpc-address-lookup-tables = { workspace = true } +solana-lite-rpc-core = { workspace = true } +solana-lite-rpc-util = { workspace = true } +solana-lite-rpc-services = { workspace = true } +solana-lite-rpc-prioritization-fees = { workspace = true } +solana-lite-rpc-cluster-endpoints = { workspace = true } +solana-sdk = { workspace = true } +solana-streamer = { workspace = true } +solana-transaction-status = { workspace = true } +solana-net-utils = { workspace = true } +solana-rpc-client = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +anyhow = { workspace = true } +log = { workspace = true } +dashmap = { workspace = true } +itertools = { workspace = true } +tracing-subscriber = { workspace = true, features = ["std", "env-filter"] } +tokio = { version = "1.28.2", features = ["full", "fs"]} + +[dev-dependencies] +crossbeam-channel = "0.5.6" +countmap = "0.2.0" diff --git a/services-integration-test/tests/setup/mod.rs b/services-integration-test/tests/setup/mod.rs new file mode 100644 index 00000000..011dc718 --- /dev/null +++ b/services-integration-test/tests/setup/mod.rs @@ -0,0 +1,233 @@ +use dashmap::DashMap; +use solana_lite_rpc_core::{ + keypair_loader::load_identity_keypair, + stores::{ + block_information_store::{BlockInformation, BlockInformationStore}, + cluster_info_store::ClusterInfo, + data_cache::{DataCache, SlotCache}, + subscription_store::SubscriptionStore, + tx_store::TxStore, + }, + structures::{ + account_filter::AccountFilters, epoch::EpochCache, identity_stakes::IdentityStakes, + leaderschedule::CalculatedSchedule, produced_block::ProducedBlock, + }, + types::BlockStream, AnyhowJoinHandle, +}; +use solana_rpc_client::nonblocking::rpc_client::RpcClient; +use solana_sdk::{ + commitment_config::CommitmentConfig, signature::Keypair, + signer::Signer, +}; +use std::sync::Arc; + +use lite_rpc::{cli::Config, service_spawner::ServiceSpawner, DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE}; +use log::{debug, info, trace}; +use solana_lite_rpc_cluster_endpoints::{ + endpoint_stremers::EndpointStreaming, + geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig}, + grpc_subscription::create_grpc_subscription, + json_rpc_leaders_getter::JsonRpcLeaderGetter, +}; +use solana_lite_rpc_services::{tpu_utils::tpu_connection_path::TpuConnectionPath, transaction_replayer::TransactionReplayer, transaction_service::TransactionService}; +use solana_lite_rpc_services::tpu_utils::tpu_service::{TpuService, TpuServiceConfig}; +use solana_lite_rpc_services::{ + quic_connection_utils::QuicConnectionParameters, tx_sender::TxSender, +}; + +use std::net::{SocketAddr, ToSocketAddrs}; +use std::time::Duration; +use tokio::time::{timeout, Instant}; +use tokio::sync::RwLock; + +// TODO: refactor +async fn get_latest_block( + mut block_stream: BlockStream, + commitment_config: CommitmentConfig, +) -> ProducedBlock { + let started = Instant::now(); + loop { + match timeout(Duration::from_millis(500), block_stream.recv()).await { + Ok(Ok(block)) => { + if block.commitment_config == commitment_config { + return block; + } + } + Err(_elapsed) => { + debug!( + "waiting for latest block ({}) ... {:.02}ms", + commitment_config.commitment, + started.elapsed().as_secs_f32() * 1000.0 + ); + } + Ok(Err(_error)) => { + panic!("Did not recv blocks"); + } + } + } +} + +const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameters { + connection_timeout: Duration::from_secs(2), + connection_retry_count: 10, + finalize_timeout: Duration::from_secs(2), + max_number_of_connections: 8, + unistream_timeout: Duration::from_secs(2), + write_timeout: Duration::from_secs(2), + number_of_transactions_per_unistream: 10, + unistreams_to_create_new_connection_in_percentage: 10, +}; + +pub async fn setup_tx_service() -> anyhow::Result<(TransactionService, DataCache, AnyhowJoinHandle)> { + let config = Config::load().await?; + let grpc_sources = config.get_grpc_sources(); + + let Config { + rpc_addr, + fanout_size, + identity_keypair, + transaction_retry_after_secs, + quic_proxy_addr, + prometheus_addr, + maximum_retries_per_tx, + account_filters, + .. + } = config; + + let validator_identity = Arc::new( + load_identity_keypair(identity_keypair) + .await? + .unwrap_or_else(Keypair::new), + ); + + let retry_after = Duration::from_secs(transaction_retry_after_secs); + let tpu_connection_path = configure_tpu_connection_path(quic_proxy_addr); + trace!("tpu_connection_path: {}", tpu_connection_path); + + let account_filters = if let Some(account_filters) = account_filters { + serde_json::from_str::(account_filters.as_str()) + .expect("Account filters should be valid") + } else { + vec![] + }; + + let rpc_client = Arc::new(RpcClient::new(rpc_addr.clone())); + + let timeouts = GrpcConnectionTimeouts { + connect_timeout: Duration::from_secs(5), + request_timeout: Duration::from_secs(5), + subscribe_timeout: Duration::from_secs(5), + receive_timeout: Duration::from_secs(5), + }; + + info!("Creating geyser subscription..."); + let (subscriptions, _cluster_endpoint_tasks) = create_grpc_subscription( + rpc_client.clone(), + grpc_sources + .iter() + .map(|s| { + GrpcSourceConfig::new(s.addr.clone(), s.x_token.clone(), None, timeouts.clone()) + }) + .collect(), + account_filters.clone(), + )?; + + let EndpointStreaming { + // note: blocks_notifier will be dropped at some point + blocks_notifier, + slot_notifier, + .. + } = subscriptions; + + info!("Waiting for first finalized block..."); + let finalized_block = + get_latest_block(blocks_notifier.resubscribe(), CommitmentConfig::finalized()).await; + info!("Got finalized block: {:?}", finalized_block.slot); + + let (epoch_data, _current_epoch_info) = EpochCache::bootstrap_epoch(&rpc_client).await?; + + let block_information_store = + BlockInformationStore::new(BlockInformation::from_block(&finalized_block)); + + let data_cache = DataCache { + block_information_store, + cluster_info: ClusterInfo::default(), + identity_stakes: IdentityStakes::new(validator_identity.pubkey()), + slot_cache: SlotCache::new(finalized_block.slot), + tx_subs: SubscriptionStore::default(), + txs: TxStore { + store: Arc::new(DashMap::new()), + }, + epoch_data, + leader_schedule: Arc::new(RwLock::new(CalculatedSchedule::default())), + }; + + let tpu_config = TpuServiceConfig { + fanout_slots: fanout_size, + maximum_transaction_in_queue: 20000, + quic_connection_params: QUIC_CONNECTION_PARAMS, + tpu_connection_path, + }; + + let spawner = ServiceSpawner { + prometheus_addr, + data_cache: data_cache.clone(), + }; + //init grpc leader schedule and vote account is configured. + let leader_schedule = Arc::new(JsonRpcLeaderGetter::new(rpc_client.clone(), 1024, 128)); + + let tpu_service: TpuService = TpuService::new( + tpu_config, + validator_identity, + leader_schedule, + data_cache.clone(), + ) + .await?; + + let tx_sender = TxSender::new(data_cache.clone(), tpu_service.clone()); + let tx_replayer = TransactionReplayer::new(tpu_service.clone(), data_cache.clone(), retry_after); + + trace!("spawning tx_service"); + let (transaction_service, tx_service_jh) = spawner.spawn_tx_service( + tx_sender, + tx_replayer, + tpu_service, + DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE, + None, + maximum_retries_per_tx, + slot_notifier.resubscribe(), + ); + trace!("tx_service spawned successfully"); + + Ok((transaction_service, data_cache, tx_service_jh)) +} + +// TODO: deduplicate +pub fn configure_tpu_connection_path(quic_proxy_addr: Option) -> TpuConnectionPath { + match quic_proxy_addr { + None => { + TpuConnectionPath::QuicDirectPath + }, + Some(prox_address) => { + let proxy_socket_addr = parse_host_port(prox_address.as_str()).unwrap(); + TpuConnectionPath::QuicForwardProxyPath { + // e.g. "127.0.0.1:11111" or "localhost:11111" + forward_proxy_address: proxy_socket_addr, + } + } + } +} + +pub fn parse_host_port(host_port: &str) -> Result { + let addrs: Vec<_> = host_port + .to_socket_addrs() + .map_err(|err| format!("Unable to resolve host {host_port}: {err}"))? + .collect(); + if addrs.is_empty() { + Err(format!("Unable to resolve host: {host_port}")) + } else if addrs.len() > 1 { + Err(format!("Multiple addresses resolved for host: {host_port}")) + } else { + Ok(addrs[0]) + } +} diff --git a/services-integration-test/tests/txn_broadcast.rs b/services-integration-test/tests/txn_broadcast.rs new file mode 100644 index 00000000..191ebe25 --- /dev/null +++ b/services-integration-test/tests/txn_broadcast.rs @@ -0,0 +1,78 @@ +use bench::{create_memo_tx, create_rng, BenchmarkTransactionParams}; +use solana_sdk::{ + commitment_config::CommitmentConfig, signature::Keypair, + transaction::VersionedTransaction, +}; + +use log::{debug, info, trace}; + +use std::time::Duration; +use tokio::time::Instant; + +use crate::setup::setup_tx_service; + +mod setup; + +const SAMPLE_SIZE: usize = 10000; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +/// TC 4 +///- send txs on LiteRPC broadcast channel and consume them using the Solana quic-streamer +/// - see quic_proxy_tpu_integrationtest.rs (note: not only about proxy) +/// - run cargo test (maybe need to use release build) +/// - Goal: measure performance of LiteRPC internal channel/thread structure and the TPU_service performance +pub async fn txn_broadcast() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + info!("START BENCHMARK: txn_broadcast"); + + debug!("spawning tx_service"); + let (transaction_service, data_cache, _tx_jh) = setup_tx_service().await?; + debug!("tx_service spawned successfully"); + + let mut rng = create_rng(None); + let payer = Keypair::new(); + let params = BenchmarkTransactionParams { + tx_size: bench::tx_size::TxSize::Small, + cu_price_micro_lamports: 1, + }; + + let mut i = 0; + + let mut times: Vec = vec![]; + + // TODO: save stats + // TODO: txn sink? + while i < SAMPLE_SIZE { + let blockhash = data_cache.block_information_store.get_latest_blockhash(CommitmentConfig::confirmed()).await; + let tx = create_memo_tx(&payer, blockhash, &mut rng, ¶ms); + let serialized = bincode::serialize::(&tx) + .expect("Could not serialize VersionedTransaction"); + + info!("Sending txn: {:?} {:?}", tx.signatures[0], i); + let send_start = Instant::now(); + transaction_service + .send_transaction( + serialized, + Some(1), + ) + .await?; + let send_time = send_start.elapsed(); + debug!("sent in {:?}", send_time); + times.push(send_time); + i += 1; + } + + + times.sort(); + let median_time = times[times.len() / 2]; + let total_time: Duration = times.iter().sum(); + let max_time = times.iter().max().unwrap(); + let min_time = times.iter().min().unwrap(); + + info!("avg send time: {:?}", total_time.div_f64(f64::from(SAMPLE_SIZE as u32) )); + info!("max_time: {:?}", max_time); + info!("min_time: {:?}", min_time); + info!("median_time: {:?}", median_time); + + Ok(()) +} diff --git a/services/src/tpu_utils/tpu_service.rs b/services/src/tpu_utils/tpu_service.rs index 087a71f0..596aecc4 100644 --- a/services/src/tpu_utils/tpu_service.rs +++ b/services/src/tpu_utils/tpu_service.rs @@ -1,5 +1,6 @@ use anyhow::Context; use prometheus::{core::GenericGauge, opts, register_int_gauge}; +use tokio::time::Instant; use super::tpu_connection_manager::TpuConnectionManager; use crate::quic_connection_utils::QuicConnectionParameters; @@ -7,6 +8,7 @@ use crate::tpu_utils::quic_proxy_connection_manager::QuicProxyConnectionManager; use crate::tpu_utils::tpu_connection_path::TpuConnectionPath; use crate::tpu_utils::tpu_service::ConnectionManager::{DirectTpu, QuicProxy}; +use log::{debug, trace, warn, info}; use solana_lite_rpc_core::network_utils::log_gso_workaround; use solana_lite_rpc_core::stores::data_cache::DataCache; use solana_lite_rpc_core::structures::transaction_sent_info::SentTransactionInfo; @@ -107,6 +109,7 @@ impl TpuService { } pub fn send_transaction(&self, transaction: &SentTransactionInfo) -> anyhow::Result<()> { + debug!("send txn tpu"); self.broadcast_sender.send(transaction.clone())?; Ok(()) } @@ -178,18 +181,25 @@ impl TpuService { pub fn start(&self, slot_notifications: SlotStream) -> AnyhowJoinHandle { let this = self.clone(); + info!("ahhhhhh"); tokio::spawn(async move { let mut slot_notifications = slot_notifications; loop { + + // info!("loop"); let notification = slot_notifications .recv() .await .context("Tpu service cannot get slot notification")?; + // info!("update quic conn"); + let now = Instant::now(); this.update_quic_connections( notification.processed_slot, notification.estimated_processed_slot, ) .await?; + + // info!("quic conn updated in: {:?}", now.elapsed()); } }) } diff --git a/services/src/transaction_replayer.rs b/services/src/transaction_replayer.rs index 9766bc11..f58c838d 100644 --- a/services/src/transaction_replayer.rs +++ b/services/src/transaction_replayer.rs @@ -50,14 +50,14 @@ impl TransactionReplayer { pub fn start_service( &self, sender: UnboundedSender, - mut reciever: UnboundedReceiver, + mut receiver: UnboundedReceiver, ) -> AnyhowJoinHandle { let tpu_service = self.tpu_service.clone(); let data_cache = self.data_cache.clone(); let retry_offset = self.retry_offset; tokio::spawn(async move { - while let Some(mut tx_replay) = reciever.recv().await { + while let Some(mut tx_replay) = receiver.recv().await { MESSAGES_IN_REPLAY_QUEUE.dec(); let now = Instant::now(); if now < tx_replay.replay_at { diff --git a/services/src/transaction_service.rs b/services/src/transaction_service.rs index 2c00efd6..2fb544fd 100644 --- a/services/src/transaction_service.rs +++ b/services/src/transaction_service.rs @@ -1,7 +1,8 @@ // This class will manage the lifecycle for a transaction // It will send, replay if necessary and confirm by listening to blocks -use std::time::Duration; +use log::trace; +use std::{num::IntErrorKind, time::Duration}; use crate::{ tpu_utils::tpu_service::TpuService, @@ -126,12 +127,15 @@ impl TransactionService { raw_tx: Vec, max_retries: Option, ) -> anyhow::Result { + let s = Instant::now(); let tx = match bincode::deserialize::(&raw_tx) { Ok(tx) => tx, Err(err) => { - bail!(err.to_string()); + bail!(format!("Failed to deserialize raw_tx: {}", err.to_string())); } }; + let a = s.elapsed(); + trace!("deser: {:?}", a); let signature = tx.signatures[0]; let Some(BlockInformation { @@ -144,11 +148,14 @@ impl TransactionService { else { bail!("Blockhash not found in block store".to_string()); }; + let b = s.elapsed(); + trace!("block info: {:?}", b-a); if self.block_information_store.get_last_blockheight() > last_valid_blockheight { bail!("Blockhash is expired"); } - + let c = s.elapsed(); + trace!("block height: {:?}", c-b); let prioritization_fee = { let mut prioritization_fee = 0; for ix in tx.message.instructions() { @@ -165,6 +172,8 @@ impl TransactionService { } prioritization_fee }; + let d = s.elapsed(); + trace!("prio fee: {:?}", d-c); PRIORITY_FEES_HISTOGRAM.observe(prioritization_fee as f64); @@ -186,6 +195,8 @@ impl TransactionService { e ); } + let e = s.elapsed(); + trace!("txn channel: {:?}", e-d); let replay_at = Instant::now() + self.replay_offset; // ignore error for replay service if self @@ -200,6 +211,9 @@ impl TransactionService { { MESSAGES_IN_REPLAY_QUEUE.inc(); } + let f = s.elapsed(); + trace!("replay: {:?}", f-e); + trace!("toal: {:?}", f); Ok(signature.to_string()) } } diff --git a/services/src/tx_sender.rs b/services/src/tx_sender.rs index de006615..af1c9ce0 100644 --- a/services/src/tx_sender.rs +++ b/services/src/tx_sender.rs @@ -65,7 +65,9 @@ impl TxSender { } Err(err) => { TXS_SENT_ERRORS.inc_by(1); - warn!("{err}"); + let s = transaction_info.signature; + warn!("{s} - {err}"); + warn!("TXS_IN_CHANNEL: {:?}", TXS_IN_CHANNEL.get()); 0 } };