This commit is contained in:
Lou-Kamades 2024-03-28 10:04:16 -05:00
parent 0105602e05
commit 4da3158842
No known key found for this signature in database
GPG Key ID: 87A166E4D7C01F30
11 changed files with 858 additions and 21 deletions

Cargo.lock generated (30 lines changed)
View File

@@ -4812,6 +4812,36 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "solana-lite-rpc-services-integration-test"
version = "0.1.0"
dependencies = [
"anyhow",
"bench",
"bincode",
"countmap",
"crossbeam-channel",
"dashmap 5.5.3",
"itertools 0.10.5",
"lite-rpc",
"log",
"serde",
"serde_json",
"solana-lite-rpc-address-lookup-tables",
"solana-lite-rpc-cluster-endpoints",
"solana-lite-rpc-core",
"solana-lite-rpc-prioritization-fees",
"solana-lite-rpc-services",
"solana-lite-rpc-util",
"solana-net-utils",
"solana-rpc-client",
"solana-sdk",
"solana-streamer",
"solana-transaction-status",
"tokio",
"tracing-subscriber",
]
[[package]]
name = "solana-lite-rpc-util"
version = "0.2.4"

View File

@@ -5,6 +5,7 @@ members = [
"core",
"util",
"services",
"services-integration-test",
"lite-rpc",
"quic-forward-proxy",
"quic-forward-proxy-integration-test",

View File

@@ -54,10 +54,10 @@ fn create_grpc_multiplex_processed_block_task(
loop {
// recv loop
if last_tick.elapsed() > Duration::from_millis(800) {
warn!(
"(soft_realtime) slow multiplex loop interation: {:?}",
last_tick.elapsed()
);
// warn!(
// "(soft_realtime) slow multiplex loop interation: {:?}",
// last_tick.elapsed()
// );
}
last_tick = Instant::now();
@@ -344,16 +344,16 @@ pub fn create_grpc_multiplex_blocks_subscription(
}
if let Err(e) = producedblock_sender.send(processed_block.clone()) {
warn!("produced block channel has no receivers {e:?}");
// warn!("produced block channel has no receivers {e:?}");
}
if confirmed_block_not_yet_processed.remove(&processed_block.blockhash) {
if let Err(e) = producedblock_sender.send(processed_block.to_confirmed_block()) {
warn!("produced block channel has no receivers while trying to send confirmed block {e:?}");
// warn!("produced block channel has no receivers while trying to send confirmed block {e:?}");
}
}
if finalized_block_not_yet_processed.remove(&processed_block.blockhash) {
if let Err(e) = producedblock_sender.send(processed_block.to_finalized_block()) {
warn!("produced block channel has no receivers while trying to send confirmed block {e:?}");
// warn!("produced block channel has no receivers while trying to send confirmed block {e:?}");
}
}
recent_processed_blocks.insert(processed_block.blockhash, processed_block);
@@ -365,7 +365,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
trace!("got processed blockinfo {} with blockhash {}",
blockinfo_processed.slot, blockhash);
if let Err(e) = blockinfo_sender.send(blockinfo_processed) {
warn!("Processed blockinfo channel has no receivers {e:?}");
// warn!("Processed blockinfo channel has no receivers {e:?}");
}
},
blockinfo_confirmed = block_info_reciever_confirmed.recv() => {
@@ -375,7 +375,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
trace!("got confirmed blockinfo {} with blockhash {}",
blockinfo_confirmed.slot, blockhash);
if let Err(e) = blockinfo_sender.send(blockinfo_confirmed) {
warn!("Confirmed blockinfo channel has no receivers {e:?}");
// warn!("Confirmed blockinfo channel has no receivers {e:?}");
}
if let Some(cached_processed_block) = recent_processed_blocks.get(&blockhash) {
@@ -383,7 +383,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
debug!("got confirmed blockinfo {} with blockhash {}",
confirmed_block.slot, confirmed_block.blockhash.clone());
if let Err(e) = producedblock_sender.send(confirmed_block) {
warn!("confirmed block channel has no receivers {e:?}");
// warn!("confirmed block channel has no receivers {e:?}");
}
} else {
confirmed_block_not_yet_processed.insert(blockhash);
@@ -400,7 +400,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
trace!("got finalized blockinfo {} with blockhash {}",
blockinfo_finalized.slot, blockhash);
if let Err(e) = blockinfo_sender.send(blockinfo_finalized) {
warn!("Finalized blockinfo channel has no receivers {e:?}");
// warn!("Finalized blockinfo channel has no receivers {e:?}");
}
if let Some(cached_processed_block) = recent_processed_blocks.remove(&blockhash) {
@@ -409,7 +409,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
debug!("got finalized blockinfo {} with blockhash {}",
finalized_block.slot, finalized_block.blockhash.clone());
if let Err(e) = producedblock_sender.send(finalized_block) {
warn!("Finalized block channel has no receivers {e:?}");
// warn!("Finalized block channel has no receivers {e:?}");
}
} else if startup_completed {
// this warning is ok for first few blocks when we start lrpc
@@ -505,7 +505,7 @@ pub fn create_grpc_multiplex_processed_slots_subscription(
})
.context("Send slot to channel");
if send_result.is_err() {
warn!("Slot channel receiver is closed - aborting");
info!("Slot channel receiver is closed - aborting");
bail!("Slot channel receiver is closed - aborting");
}
@@ -519,18 +519,18 @@ pub fn create_grpc_multiplex_processed_slots_subscription(
}
Ok(Some(Message::Connecting(attempt))) => {
if attempt > 1 {
warn!(
info!(
"Multiplexed geyser slot stream performs reconnect attempt {}",
attempt
);
}
}
Ok(None) => {
warn!("Multiplexed geyser source stream slot terminated - reconnect");
info!("Multiplexed geyser source stream slot terminated - reconnect");
break 'recv_loop;
}
Err(_elapsed) => {
warn!("Multiplexed geyser slot stream timeout - reconnect");
info!("Multiplexed geyser slot stream timeout - reconnect");
// throttle
sleep(Duration::from_millis(1500)).await;
break 'recv_loop;

View File

@@ -0,0 +1,37 @@
[package]
name = "solana-lite-rpc-services-integration-test"
version = "0.1.0"
edition = "2021"
description = "Integration test for lite rpc services"
rust-version = "1.73.0"
repository = "https://github.com/blockworks-foundation/lite-rpc"
license = "AGPL"
publish = false
[dependencies]
bincode = { workspace = true }
bench = { path = "../bench" }
lite-rpc = { path = "../lite-rpc" }
solana-lite-rpc-address-lookup-tables = { workspace = true }
solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-util = { workspace = true }
solana-lite-rpc-services = { workspace = true }
solana-lite-rpc-prioritization-fees = { workspace = true }
solana-lite-rpc-cluster-endpoints = { workspace = true }
solana-sdk = { workspace = true }
solana-streamer = { workspace = true }
solana-transaction-status = { workspace = true }
solana-net-utils = { workspace = true }
solana-rpc-client = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
dashmap = { workspace = true }
itertools = { workspace = true }
tracing-subscriber = { workspace = true, features = ["std", "env-filter"] }
tokio = { version = "1.28.2", features = ["full", "fs"]}
[dev-dependencies]
crossbeam-channel = "0.5.6"
countmap = "0.2.0"

View File

@@ -0,0 +1,232 @@
use dashmap::DashMap;
use solana_lite_rpc_core::{
keypair_loader::load_identity_keypair,
stores::{
block_information_store::{BlockInformation, BlockInformationStore},
cluster_info_store::ClusterInfo,
data_cache::{DataCache, SlotCache},
subscription_store::SubscriptionStore,
tx_store::TxStore,
},
structures::{
account_filter::AccountFilters, epoch::EpochCache, identity_stakes::IdentityStakes,
leaderschedule::CalculatedSchedule, produced_block::ProducedBlock,
},
types::{BlockStream, SlotStream}, AnyhowJoinHandle,
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{
commitment_config::CommitmentConfig, hash::hash, nonce::state::Data, signature::Keypair,
signer::Signer, transaction::VersionedTransaction,
};
use std::{process::abort, sync::Arc};
use lite_rpc::{cli::Config, service_spawner::ServiceSpawner, DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE};
use log::{debug, info, trace, warn};
use solana_lite_rpc_cluster_endpoints::{
endpoint_stremers::EndpointStreaming,
geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig},
grpc_subscription::create_grpc_subscription,
json_rpc_leaders_getter::JsonRpcLeaderGetter,
};
use solana_lite_rpc_services::{tpu_utils::tpu_connection_path::TpuConnectionPath, transaction_replayer::TransactionReplayer, transaction_service::TransactionService};
use solana_lite_rpc_services::tpu_utils::tpu_service::{TpuService, TpuServiceConfig};
use solana_lite_rpc_services::{
quic_connection_utils::QuicConnectionParameters, tx_sender::TxSender,
};
use std::net::{SocketAddr, ToSocketAddrs};
use std::time::Duration;
use tokio::time::{timeout, Instant};
use tokio::{sync::RwLock, task::JoinHandle};
// TODO: refactor
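/// Waits on `block_stream` until it yields a block at the requested commitment level.
/// Logs a debug line roughly every 500ms while waiting; panics if the stream errors.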
async fn get_latest_block(
mut block_stream: BlockStream,
commitment_config: CommitmentConfig,
) -> ProducedBlock {
let started = Instant::now();
loop {
match timeout(Duration::from_millis(500), block_stream.recv()).await {
Ok(Ok(block)) => {
if block.commitment_config == commitment_config {
return block;
}
}
Err(_elapsed) => {
debug!(
"waiting for latest block ({}) ... {:.02}ms",
commitment_config.commitment,
started.elapsed().as_secs_f32() * 1000.0
);
}
Ok(Err(_error)) => {
panic!("Did not recv blocks");
}
}
}
}
const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameters {
connection_timeout: Duration::from_secs(2),
connection_retry_count: 10,
finalize_timeout: Duration::from_secs(2),
max_number_of_connections: 8,
unistream_timeout: Duration::from_secs(2),
write_timeout: Duration::from_secs(2),
number_of_transactions_per_unistream: 10,
unistreams_to_create_new_connection_in_percentage: 10,
};
pub async fn setup_tx_service() -> anyhow::Result<(TransactionService, AnyhowJoinHandle)> {
let config = Config::load().await?;
let grpc_sources = config.get_grpc_sources();
let Config {
rpc_addr,
fanout_size,
identity_keypair,
transaction_retry_after_secs,
quic_proxy_addr,
prometheus_addr,
maximum_retries_per_tx,
account_filters,
..
} = config;
let validator_identity = Arc::new(
load_identity_keypair(identity_keypair)
.await?
.unwrap_or_else(Keypair::new),
);
let retry_after = Duration::from_secs(transaction_retry_after_secs);
let tpu_connection_path = configure_tpu_connection_path(quic_proxy_addr);
let account_filters = if let Some(account_filters) = account_filters {
serde_json::from_str::<AccountFilters>(account_filters.as_str())
.expect("Account filters should be valid")
} else {
vec![]
};
let rpc_client = Arc::new(RpcClient::new(rpc_addr.clone()));
let timeouts = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(5),
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
};
info!("Creating geyser subscription...");
let (subscriptions, _cluster_endpoint_tasks) = create_grpc_subscription(
rpc_client.clone(),
grpc_sources
.iter()
.map(|s| {
GrpcSourceConfig::new(s.addr.clone(), s.x_token.clone(), None, timeouts.clone())
})
.collect(),
account_filters.clone(),
)?;
let EndpointStreaming {
// note: blocks_notifier will be dropped at some point
blocks_notifier,
slot_notifier,
..
} = subscriptions;
info!("Waiting for first finalized block...");
let finalized_block =
get_latest_block(blocks_notifier.resubscribe(), CommitmentConfig::finalized()).await;
info!("Got finalized block: {:?}", finalized_block.slot);
let (epoch_data, _current_epoch_info) = EpochCache::bootstrap_epoch(&rpc_client).await?;
let block_information_store =
BlockInformationStore::new(BlockInformation::from_block(&finalized_block));
let data_cache = DataCache {
block_information_store,
cluster_info: ClusterInfo::default(),
identity_stakes: IdentityStakes::new(validator_identity.pubkey()),
slot_cache: SlotCache::new(finalized_block.slot),
tx_subs: SubscriptionStore::default(),
txs: TxStore {
store: Arc::new(DashMap::new()),
},
epoch_data,
leader_schedule: Arc::new(RwLock::new(CalculatedSchedule::default())),
};
let tpu_config = TpuServiceConfig {
fanout_slots: fanout_size,
maximum_transaction_in_queue: 20000,
quic_connection_params: QUIC_CONNECTION_PARAMS,
tpu_connection_path,
};
let spawner = ServiceSpawner {
prometheus_addr,
data_cache: data_cache.clone(),
};
// init grpc leader schedule if the vote account is configured.
let leader_schedule = Arc::new(JsonRpcLeaderGetter::new(rpc_client.clone(), 1024, 128));
trace!("spawning TpuService");
let tpu_service: TpuService = TpuService::new(
tpu_config,
validator_identity,
leader_schedule,
data_cache.clone(),
)
.await?;
trace!("TpuService spawned successfully");
let tx_sender = TxSender::new(data_cache.clone(), tpu_service.clone());
let tx_replayer = TransactionReplayer::new(tpu_service.clone(), data_cache, retry_after);
trace!("spawning tx_service");
let (transaction_service, tx_service_jh) = spawner.spawn_tx_service(
tx_sender,
tx_replayer,
tpu_service,
DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE,
None,
maximum_retries_per_tx,
slot_notifier.resubscribe(),
);
trace!("tx_service spawned successfully");
Ok((transaction_service, tx_service_jh))
}
// TODO: deduplicate
pub fn configure_tpu_connection_path(quic_proxy_addr: Option<String>) -> TpuConnectionPath {
match quic_proxy_addr {
None => TpuConnectionPath::QuicDirectPath,
Some(prox_address) => {
let proxy_socket_addr = parse_host_port(prox_address.as_str()).unwrap();
TpuConnectionPath::QuicForwardProxyPath {
// e.g. "127.0.0.1:11111" or "localhost:11111"
forward_proxy_address: proxy_socket_addr,
}
}
}
}
pub fn parse_host_port(host_port: &str) -> Result<SocketAddr, String> {
let addrs: Vec<_> = host_port
.to_socket_addrs()
.map_err(|err| format!("Unable to resolve host {host_port}: {err}"))?
.collect();
if addrs.is_empty() {
Err(format!("Unable to resolve host: {host_port}"))
} else if addrs.len() > 1 {
Err(format!("Multiple addresses resolved for host: {host_port}"))
} else {
Ok(addrs[0])
}
}
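A quick illustration of the resolution rules above (hypothetical values, not part of this commit): any input that `ToSocketAddrs` resolves to exactly one address is accepted, anything ambiguous is rejected.

assert!(parse_host_port("127.0.0.1:11111").is_ok());
// A dual-stack hostname may resolve to both 127.0.0.1 and ::1, in which
// case the "Multiple addresses resolved" branch returns Err:
let _maybe_err = parse_host_port("localhost:11111");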

View File

@@ -0,0 +1,290 @@
use anyhow::bail;
use bench::{create_memo_tx, create_rng, BenchmarkTransactionParams};
use dashmap::DashMap;
use solana_lite_rpc_core::{
keypair_loader::load_identity_keypair,
solana_utils::SerializableTransaction,
stores::{
block_information_store::{BlockInformation, BlockInformationStore},
cluster_info_store::ClusterInfo,
data_cache::{DataCache, SlotCache},
subscription_store::SubscriptionStore,
tx_store::TxStore,
},
structures::{
account_filter::AccountFilters, epoch::EpochCache, identity_stakes::IdentityStakes,
leaderschedule::CalculatedSchedule, produced_block::ProducedBlock,
transaction_sent_info::SentTransactionInfo,
},
types::{BlockStream, SlotStream},
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{
commitment_config::CommitmentConfig, hash::hash, nonce::state::Data, signature::Keypair,
signer::Signer, transaction::VersionedTransaction,
};
use std::{process::abort, sync::Arc};
use lite_rpc::cli::Config;
use log::{debug, info, trace, warn};
use solana_lite_rpc_cluster_endpoints::{
endpoint_stremers::EndpointStreaming,
geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig},
grpc_subscription::create_grpc_subscription,
json_rpc_leaders_getter::JsonRpcLeaderGetter,
};
use solana_lite_rpc_services::tpu_utils::tpu_connection_path::TpuConnectionPath;
use solana_lite_rpc_services::tpu_utils::tpu_service::{TpuService, TpuServiceConfig};
use solana_lite_rpc_services::{
quic_connection_utils::QuicConnectionParameters, tx_sender::TxSender,
};
use std::net::{SocketAddr, ToSocketAddrs};
use std::time::Duration;
use tokio::time::{timeout, Instant};
use tokio::{sync::RwLock, task::JoinHandle};
// TODO: refactor
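/// Waits on `block_stream` until it yields a block at the requested commitment level.
/// Logs a debug line roughly every 500ms while waiting; panics if the stream errors.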
async fn get_latest_block(
mut block_stream: BlockStream,
commitment_config: CommitmentConfig,
) -> ProducedBlock {
let started = Instant::now();
loop {
match timeout(Duration::from_millis(500), block_stream.recv()).await {
Ok(Ok(block)) => {
if block.commitment_config == commitment_config {
return block;
}
}
Err(_elapsed) => {
debug!(
"waiting for latest block ({}) ... {:.02}ms",
commitment_config.commitment,
started.elapsed().as_secs_f32() * 1000.0
);
}
Ok(Err(_error)) => {
panic!("Did not recv blocks");
}
}
}
}
const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameters {
connection_timeout: Duration::from_secs(2),
connection_retry_count: 10,
finalize_timeout: Duration::from_secs(2),
max_number_of_connections: 8,
unistream_timeout: Duration::from_secs(2),
write_timeout: Duration::from_secs(2),
number_of_transactions_per_unistream: 10,
unistreams_to_create_new_connection_in_percentage: 10,
};
#[tokio::test]
/// - Goal: measure TPU_service performance
pub async fn tpu_service() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
info!("START BENCHMARK: tpu_service");
let (tpu_service, data_cache, slot_stream) = setup_tpu_service().await?;
// let mut slot_receiver = slot_stream.subscribe();
let tpu_jh = tpu_service.start(slot_stream.resubscribe());
info!("before unwrap");
let _ = tpu_jh.await.unwrap();
info!("after unwrap");
// let mut receiver = tpu_service.broadcast_sender.subscribe();
// let handle = tokio::spawn(async move {
// loop {
// tokio::select! {
// r = receiver.recv() => {
// match r {
// Ok(_) => {} //info!("{:?}", msg)
// Err(e) => {
// warn!("recv error: {:?}", e);
// abort();
// }
// }
// }
// // s = slot_receiver.recv() => {
// // match s {
// // Ok(_) => {debug!("slot")},
// // Err(e) => warn!("recv error: {:?}", e),
// // }
// // }
// }
// }
// });
// info!("tpu serv inited");
// let mut rng = create_rng(None);
// let payer = Keypair::new();
// let mut blockhash = hash(&[1, 2, 3]);
// let prio_fee = 1u64;
// let params = BenchmarkTransactionParams {
// tx_size: bench::tx_size::TxSize::Small,
// cu_price_micro_lamports: prio_fee,
// };
// let mut i = 0;
// while i < 3 {
// blockhash = hash(&blockhash.as_ref());
// let tx = create_memo_tx(&payer, blockhash, &mut rng, &params);
// let sent_tx = SentTransactionInfo {
// signature: *tx.get_signature(),
// slot: data_cache.slot_cache.get_current_slot(),
// transaction: bincode::serialize::<VersionedTransaction>(&tx)
// .expect("Could not serialize VersionedTransaction"),
// last_valid_block_height: data_cache.block_information_store.get_last_blockheight(),
// prioritization_fee: prio_fee,
// };
// info!("Sending txn: {:?}", i);
// tpu_service.send_transaction(&sent_tx)?;
// info!("Txn sent success");
// i += 1;
// }
// handle.await?;
Ok(())
}
async fn setup_tpu_service() -> anyhow::Result<(TpuService, DataCache, SlotStream)> {
let config = Config::load().await?;
let grpc_sources = config.get_grpc_sources();
let Config {
rpc_addr,
fanout_size,
identity_keypair,
quic_proxy_addr,
account_filters,
..
} = config;
let validator_identity = Arc::new(
load_identity_keypair(identity_keypair)
.await?
.unwrap_or_else(Keypair::new),
);
let tpu_connection_path = configure_tpu_connection_path(quic_proxy_addr);
let account_filters = if let Some(account_filters) = account_filters {
serde_json::from_str::<AccountFilters>(account_filters.as_str())
.expect("Account filters should be valid")
} else {
vec![]
};
let rpc_client = Arc::new(RpcClient::new(rpc_addr.clone()));
info!("RPC: {:?}", rpc_addr.clone());
let timeouts = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(5),
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
};
info!("Creating geyser subscription...");
let (subscriptions, _) = create_grpc_subscription(
rpc_client.clone(),
grpc_sources
.iter()
.map(|s| {
GrpcSourceConfig::new(s.addr.clone(), s.x_token.clone(), None, timeouts.clone())
})
.collect(),
account_filters.clone(),
)?;
let EndpointStreaming {
// note: blocks_notifier will be dropped at some point
blocks_notifier,
slot_notifier,
..
} = subscriptions;
info!("Waiting for first finalized block...");
let finalized_block =
get_latest_block(blocks_notifier.resubscribe(), CommitmentConfig::finalized()).await;
info!("Got finalized block: {:?}", finalized_block.slot);
let (epoch_data, _current_epoch_info) = EpochCache::bootstrap_epoch(&rpc_client).await?;
let block_information_store =
BlockInformationStore::new(BlockInformation::from_block(&finalized_block));
let data_cache = DataCache {
block_information_store,
cluster_info: ClusterInfo::default(),
identity_stakes: IdentityStakes::new(validator_identity.pubkey()),
slot_cache: SlotCache::new(finalized_block.slot),
tx_subs: SubscriptionStore::default(),
txs: TxStore {
store: Arc::new(DashMap::new()),
},
epoch_data,
leader_schedule: Arc::new(RwLock::new(CalculatedSchedule::default())),
};
let tpu_config = TpuServiceConfig {
fanout_slots: fanout_size,
maximum_transaction_in_queue: 20000,
quic_connection_params: QUIC_CONNECTION_PARAMS,
tpu_connection_path,
};
// init grpc leader schedule if the vote account is configured.
let leader_schedule = Arc::new(JsonRpcLeaderGetter::new(rpc_client.clone(), 1024, 128));
trace!("spawning TpuService");
let tpu_service: TpuService = TpuService::new(
tpu_config,
validator_identity,
leader_schedule,
data_cache.clone(),
)
.await?;
trace!("TpuService created successfully");
Ok((tpu_service, data_cache, slot_notifier))
}
// TODO: deduplicate
fn configure_tpu_connection_path(quic_proxy_addr: Option<String>) -> TpuConnectionPath {
match quic_proxy_addr {
None => TpuConnectionPath::QuicDirectPath,
Some(prox_address) => {
let proxy_socket_addr = parse_host_port(prox_address.as_str()).unwrap();
TpuConnectionPath::QuicForwardProxyPath {
// e.g. "127.0.0.1:11111" or "localhost:11111"
forward_proxy_address: proxy_socket_addr,
}
}
}
}
fn parse_host_port(host_port: &str) -> Result<SocketAddr, String> {
let addrs: Vec<_> = host_port
.to_socket_addrs()
.map_err(|err| format!("Unable to resolve host {host_port}: {err}"))?
.collect();
if addrs.is_empty() {
Err(format!("Unable to resolve host: {host_port}"))
} else if addrs.len() > 1 {
Err(format!("Multiple addresses resolved for host: {host_port}"))
} else {
Ok(addrs[0])
}
}

View File

@@ -0,0 +1,123 @@
// use anyhow::bail;
// use bench::{create_memo_tx, create_rng, BenchmarkTransactionParams};
// use log::{debug, info, trace, warn};
// use solana_lite_rpc_core::{
// solana_utils::SerializableTransaction,
// structures::transaction_sent_info::SentTransactionInfo,
// types::{BlockStream, SlotStream},
// AnyhowJoinHandle,
// };
// use solana_rpc_client::nonblocking::rpc_client::RpcClient;
// use solana_sdk::{
// commitment_config::CommitmentConfig, hash::hash, nonce::state::Data, signature::Keypair,
// signer::Signer, transaction::VersionedTransaction,
// };
// use solana_lite_rpc_services::tpu_utils::tpu_service::{TpuService, TpuServiceConfig};
// use solana_lite_rpc_services::{
// quic_connection_utils::QuicConnectionParameters, tx_sender::TxSender,
// };
// use solana_lite_rpc_services::{
// tpu_utils::tpu_connection_path::TpuConnectionPath, transaction_replayer::TransactionReplay,
// };
// use std::net::{SocketAddr, ToSocketAddrs};
// use std::time::Duration;
// use tokio::time::{timeout, Instant};
// use tokio::{sync::RwLock, task::JoinHandle};
// use crate::setup::setup_services;
// mod setup;
// #[tokio::test(flavor = "multi_thread", worker_threads = 10)]
// /// - Goal: measure replay_service performance
// pub async fn transaction_replay() -> anyhow::Result<()> {
// tracing_subscriber::fmt::init();
// info!("START BENCHMARK: tpu_service");
// let (_tx_sender, replay_service, tpu_service, data_cache, slot_stream) =
// setup_services().await?;
// let (replay_sender, mut replay_receiver) = tokio::sync::mpsc::unbounded_channel();
// let replay_channel = replay_sender.clone();
// let retry_offset = replay_service.retry_offset;
// // let r2 = replay_receiver.re();
// let mut rng = create_rng(None);
// let payer = Keypair::new();
// let mut blockhash = hash(&[1, 2, 3]);
// let prio_fee = 1u64;
// let params = BenchmarkTransactionParams {
// tx_size: bench::tx_size::TxSize::Small,
// cu_price_micro_lamports: prio_fee,
// };
// let tpu_jh = tpu_service.start(slot_stream.resubscribe());
// let replay_jh = replay_service.start_service(replay_sender, replay_receiver);
// // let replay_jh = tokio::spawn( async move {
// // while let Some(mut tx_replay) = replay_receiver.recv().await {
// // info!("received");
// // }
// // });
// info!("replayer spawned");
// tokio::spawn(async move {
// let mut i = 0;
// loop {
// while i < 10000 {
// blockhash = hash(&blockhash.as_ref());
// let tx = create_memo_tx(&payer, blockhash, &mut rng, &params);
// let sent_tx = SentTransactionInfo {
// signature: *tx.get_signature(),
// slot: data_cache.slot_cache.get_current_slot(),
// transaction: bincode::serialize::<VersionedTransaction>(&tx)
// .expect("Could not serialize VersionedTransaction"),
// last_valid_block_height: data_cache
// .block_information_store
// .get_last_blockheight(),
// prioritization_fee: prio_fee,
// };
// let replay_at = Instant::now() + retry_offset;
// let r = TransactionReplay {
// transaction: sent_tx,
// replay_count: 1,
// max_replay: 1,
// replay_at,
// };
// trace!("Sending replay: {:?}", i);
// replay_channel.send(r).unwrap();
// trace!("replay send success");
// i += 1
// }
// tokio::time::sleep(Duration::from_millis(500)).await;
// }
// });
// let handle: AnyhowJoinHandle = tokio::spawn(async move {
// tokio::select! {
// res = tpu_jh => {
// bail!("Tpu Service {res:?}")
// },
// res = replay_jh => {
// bail!("Replay Service {res:?}")
// },
// // res = send_jh => {
// // bail!("Replay Send task {res:?}")
// // }
// }
// });
// handle.await?;
// Ok(())
// }

View File

@@ -0,0 +1,103 @@
use std::sync::Arc;
use bench::{create_memo_tx, create_rng, BenchmarkTransactionParams};
use dashmap::DashMap;
use itertools::Itertools;
use serde::Serialize;
use solana_lite_rpc_core::{
keypair_loader::load_identity_keypair,
stores::{
block_information_store::{BlockInformation, BlockInformationStore},
cluster_info_store::ClusterInfo,
data_cache::{DataCache, SlotCache},
subscription_store::SubscriptionStore,
tx_store::TxStore,
},
structures::{
account_filter::AccountFilters, epoch::EpochCache, identity_stakes::IdentityStakes,
leaderschedule::CalculatedSchedule, produced_block::ProducedBlock,
},
types::BlockStream,
AnyhowJoinHandle,
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{
commitment_config::CommitmentConfig, hash::hash, signature::Keypair, signer::Signer,
transaction::VersionedTransaction,
};
use lite_rpc::{cli::Config, service_spawner::ServiceSpawner};
use lite_rpc::{DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE, MAX_NB_OF_CONNECTIONS_WITH_LEADERS};
use log::{debug, info, trace};
use solana_lite_rpc_address_lookup_tables::address_lookup_table_store::AddressLookupTableStore;
use solana_lite_rpc_cluster_endpoints::{
endpoint_stremers::EndpointStreaming,
geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig},
grpc_subscription::create_grpc_subscription,
json_rpc_leaders_getter::JsonRpcLeaderGetter,
};
use solana_lite_rpc_core::traits::address_lookup_table_interface::AddressLookupTableInterface;
use solana_lite_rpc_prioritization_fees::account_prio_service::AccountPrioService;
use solana_lite_rpc_services::quic_connection_utils::QuicConnectionParameters;
use solana_lite_rpc_services::tpu_utils::tpu_service::{TpuService, TpuServiceConfig};
use solana_lite_rpc_services::transaction_replayer::TransactionReplayer;
use solana_lite_rpc_services::tx_sender::TxSender;
use solana_lite_rpc_services::{
data_caching_service::DataCachingService, tpu_utils::tpu_connection_path::TpuConnectionPath,
transaction_service::TransactionService,
};
use solana_lite_rpc_prioritization_fees::start_block_priofees_task;
use std::net::{SocketAddr, ToSocketAddrs};
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::{
io::AsyncReadExt,
time::{timeout, Instant},
};
use crate::setup::setup_tx_service;
mod setup;
#[tokio::test]
/// TC 4
/// - send txs on LiteRPC broadcast channel and consume them using the Solana quic-streamer
/// - see quic_proxy_tpu_integrationtest.rs (note: not only about proxy)
/// - run cargo test (maybe need to use release build)
/// - Goal: measure performance of LiteRPC internal channel/thread structure and the TPU_service performance
pub async fn txn_broadcast() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
info!("START BENCHMARK: txn_broadcast");
debug!("spawning tx_service");
let (transaction_service, tx_jh) = setup_tx_service().await?;
debug!("tx_service spawned successfully");
let mut rng = create_rng(None);
let payer = Keypair::new();
let mut blockhash = hash(&[1, 2, 3]);
let params = BenchmarkTransactionParams {
tx_size: bench::tx_size::TxSize::Small,
cu_price_micro_lamports: 1,
};
let mut i = 0;
while i < 3 {
blockhash = hash(&blockhash.as_ref());
let tx = create_memo_tx(&payer, blockhash, &mut rng, &params);
info!("Sending txn: {:?}", i);
transaction_service
.send_transaction(
bincode::serialize::<VersionedTransaction>(&tx)
.expect("Could not serialize VersionedTransaction"),
Some(1),
)
.await?;
info!("Txn sent success");
i += 1;
}
Ok(())
}
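To grow this smoke test into the throughput measurement its doc comment describes, one option (a sketch, not part of this commit, assuming the send path is what should be timed) is to run a larger send loop against the tokio `Instant` already imported in this file:

let started = Instant::now();
for i in 0..1_000 {
    blockhash = hash(&blockhash.as_ref());
    let tx = create_memo_tx(&payer, blockhash, &mut rng, &params);
    // same send path as the loop above, one tx at a time
    transaction_service
        .send_transaction(
            bincode::serialize::<VersionedTransaction>(&tx)
                .expect("Could not serialize VersionedTransaction"),
            Some(1),
        )
        .await?;
    trace!("sent txn {}", i);
}
info!("sent 1000 txs in {:?}", started.elapsed());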

View File

@@ -1,5 +1,7 @@
use anyhow::Context;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use tokio::sync::broadcast::Receiver;
use tokio::time::Instant;
use super::tpu_connection_manager::TpuConnectionManager;
use crate::quic_connection_utils::QuicConnectionParameters;
@@ -7,6 +9,7 @@ use crate::tpu_utils::quic_proxy_connection_manager::QuicProxyConnectionManager;
use crate::tpu_utils::tpu_connection_path::TpuConnectionPath;
use crate::tpu_utils::tpu_service::ConnectionManager::{DirectTpu, QuicProxy};
use log::{debug, trace, warn, info};
use solana_lite_rpc_core::network_utils::log_gso_workaround;
use solana_lite_rpc_core::stores::data_cache::DataCache;
use solana_lite_rpc_core::structures::transaction_sent_info::SentTransactionInfo;
@@ -107,6 +110,7 @@ impl TpuService {
}
pub fn send_transaction(&self, transaction: &SentTransactionInfo) -> anyhow::Result<()> {
debug!("send txn tpu");
self.broadcast_sender.send(transaction.clone())?;
Ok(())
}
@@ -178,18 +182,25 @@ impl TpuService {
pub fn start(&self, slot_notifications: SlotStream) -> AnyhowJoinHandle {
let this = self.clone();
info!("ahhhhhh");
tokio::spawn(async move {
let mut slot_notifications = slot_notifications;
loop {
// info!("loop");
let notification = slot_notifications
.recv()
.await
.context("Tpu service cannot get slot notification")?;
// info!("update quic conn");
let now = Instant::now();
this.update_quic_connections(
notification.processed_slot,
notification.estimated_processed_slot,
)
.await?;
// info!("quic conn updated in: {:?}", now.elapsed());
}
})
}

View File

@@ -1,6 +1,6 @@
use crate::tpu_utils::tpu_service::TpuService;
use anyhow::{bail, Context};
use log::error;
use log::{error, info};
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use solana_lite_rpc_core::{
stores::data_cache::DataCache, structures::transaction_sent_info::SentTransactionInfo,
@@ -50,14 +50,17 @@ impl TransactionReplayer {
pub fn start_service(
&self,
sender: UnboundedSender<TransactionReplay>,
mut reciever: UnboundedReceiver<TransactionReplay>,
mut receiver: UnboundedReceiver<TransactionReplay>,
) -> AnyhowJoinHandle {
let tpu_service = self.tpu_service.clone();
let data_cache = self.data_cache.clone();
let retry_offset = self.retry_offset;
info!("replay inner spawn");
tokio::spawn(async move {
while let Some(mut tx_replay) = reciever.recv().await {
info!("async moved");
while let Some(mut tx_replay) = receiver.recv().await {
MESSAGES_IN_REPLAY_QUEUE.dec();
let now = Instant::now();
if now < tx_replay.replay_at {

View File

@@ -1,6 +1,7 @@
// This class will manage the lifecycle for a transaction
// It will send, replay if necessary and confirm by listening to blocks
use log::debug;
use std::time::Duration;
use crate::{
@@ -126,14 +127,17 @@ impl TransactionService {
raw_tx: Vec<u8>,
max_retries: Option<u16>,
) -> anyhow::Result<String> {
debug!("send txn service");
let tx = match bincode::deserialize::<VersionedTransaction>(&raw_tx) {
Ok(tx) => tx,
Err(err) => {
bail!(err.to_string());
bail!("Failed to deserialize raw_tx: {}", err);
}
};
let signature = tx.signatures[0];
debug!("sig: {:?}", signature);
let Some(BlockInformation {
slot,
last_valid_blockheight,
@@ -148,7 +152,7 @@ impl TransactionService {
if self.block_information_store.get_last_blockheight() > last_valid_blockheight {
bail!("Blockhash is expired");
}
debug!("a");
let prioritization_fee = {
let mut prioritization_fee = 0;
for ix in tx.message.instructions() {
@@ -167,6 +171,7 @@ impl TransactionService {
};
PRIORITY_FEES_HISTOGRAM.observe(prioritization_fee as f64);
debug!("b");
let max_replay = max_retries.map_or(self.max_retries, |x| x as usize);
let transaction_info = SentTransactionInfo {
@@ -187,6 +192,8 @@ impl TransactionService {
);
}
let replay_at = Instant::now() + self.replay_offset;
debug!("c");
// ignore error for replay service
if self
.replay_channel