benchrunner service (#363)

benchrunner service
+ minor changes in bench code
This commit is contained in:
Groovie | Mango 2024-03-26 13:41:46 +01:00 committed by GitHub
parent 087cc7f204
commit 3c994597b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 1398 additions and 175 deletions

View File

@ -0,0 +1,23 @@
name: Deploy benchrunner to Fly
on:
push:
tags:
- 'production/benchrunner-*'
- 'experimental/benchrunner-*'
env:
FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }}
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup Fly
uses: superfly/flyctl-actions/setup-flyctl@master
- name: Deploy solana-lite-rpc-benchrunner
run: flyctl deploy -c cd/solana-lite-rpc-benchrunner.toml --remote-only

46
Cargo.lock generated
View File

@ -3233,6 +3233,18 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0"
[[package]]
name = "postgres-derive"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83145eba741b050ef981a9a1838c843fa7665e154383325aa8b440ae703180a2"
dependencies = [
"heck 0.4.1",
"proc-macro2",
"quote",
"syn 2.0.55",
]
[[package]]
name = "postgres-native-tls"
version = "0.5.0"
@ -3273,7 +3285,10 @@ dependencies = [
"bytes",
"chrono",
"fallible-iterator",
"postgres-derive",
"postgres-protocol",
"serde",
"serde_json",
]
[[package]]
@ -4495,6 +4510,37 @@ dependencies = [
"tokio",
]
[[package]]
name = "solana-lite-rpc-benchrunner-service"
version = "0.2.4"
dependencies = [
"anyhow",
"async-trait",
"bench",
"chrono",
"clap 4.5.3",
"futures",
"futures-util",
"itertools 0.10.5",
"lazy_static",
"log",
"native-tls",
"postgres-native-tls",
"postgres-types",
"prometheus",
"serde",
"serde_json",
"solana-lite-rpc-util",
"solana-rpc-client",
"solana-rpc-client-api",
"solana-sdk",
"solana-transaction-status",
"tokio",
"tokio-postgres",
"tokio-util",
"tracing-subscriber",
]
[[package]]
name = "solana-lite-rpc-blockstore"
version = "0.2.4"

View File

@ -12,6 +12,7 @@ members = [
"blockstore",
"prioritization_fees",
"bench",
"benchrunner-service",
"address-lookup-tables",
"accounts",
"accounts-on-demand",
@ -84,6 +85,7 @@ solana-lite-rpc-prioritization-fees = {path = "prioritization_fees", version="0.
solana-lite-rpc-address-lookup-tables = {path = "address-lookup-tables", version="0.2.4"}
solana-lite-rpc-accounts = {path = "accounts", version = "0.2.4"}
solana-lite-rpc-accounts-on-demand = {path = "accounts-on-demand", version = "0.2.4"}
bench = { path = "bench", version="0.2.4" }
async-trait = "0.1.68"
yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" }

31
Dockerfile-benchrunner Normal file
View File

@ -0,0 +1,31 @@
# syntax = docker/dockerfile:1.2
FROM rust:1.75.0 as base
RUN cargo install cargo-chef@0.1.62 --locked
RUN rustup component add rustfmt
RUN apt-get update && apt-get install -y clang cmake ssh
WORKDIR /app
FROM base AS plan
COPY . .
WORKDIR /app
RUN cargo chef prepare --recipe-path recipe.json
FROM base as build
COPY --from=plan /app/recipe.json recipe.json
RUN cargo chef cook --release --recipe-path recipe.json
COPY . .
RUN cargo build --release --bin solana-lite-rpc-benchrunner-service
FROM debian:bookworm-slim as run
RUN apt-get update && apt-get -y install ca-certificates libc6 libssl3 libssl-dev openssl
COPY openssl-legacy.cnf /etc/ssl/openssl-legacy.cnf
COPY --from=build /app/target/release/solana-lite-rpc-benchrunner-service /usr/local/bin/
ENV OPENSSL_CONF=/etc/ssl/openssl-legacy.cnf
CMD solana-lite-rpc-benchrunner-service \
--bench-interval 600000 \
--tx-count 100 \
--prio-fees 100000 --prio-fees 1000 --prio-fees 0

166
bench/src/bench1.rs Normal file
View File

@ -0,0 +1,166 @@
use crate::{helpers::BenchHelper, metrics::Metric, metrics::TxMetricData};
use dashmap::DashMap;
use log::warn;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::hash::Hash;
use solana_sdk::signature::Keypair;
use solana_sdk::signature::Signature;
use solana_sdk::slot_history::Slot;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use tokio::{
sync::{mpsc::UnboundedSender, RwLock},
time::{Duration, Instant},
};
#[derive(Clone, Debug, Copy)]
struct TxSendData {
sent_duration: Duration,
sent_instant: Instant,
sent_slot: Slot,
transaction_bytes: u64,
}
struct ApiCallerResult {
gross_send_time: Duration,
}
// called by benchrunner-service
#[allow(clippy::too_many_arguments)]
pub async fn bench(
rpc_client: Arc<RpcClient>,
tx_count: usize,
funded_payer: Keypair,
seed: u64,
block_hash: Arc<RwLock<Hash>>,
current_slot: Arc<AtomicU64>,
tx_metric_sx: UnboundedSender<TxMetricData>,
log_txs: bool,
transaction_size: TransactionSize,
cu_price_micro_lamports: u64,
) -> Metric {
let map_of_txs: Arc<DashMap<Signature, TxSendData>> = Arc::new(DashMap::new());
// transaction sender task
let api_caller_result = {
let map_of_txs = map_of_txs.clone();
let rpc_client = rpc_client.clone();
let current_slot = current_slot.clone();
tokio::spawn(async move {
let map_of_txs = map_of_txs.clone();
let n_chars = match transaction_size {
TransactionSize::Small => 10,
TransactionSize::Large => 232, // 565 is max but we need to lower that to not burn the CUs
};
let rand_strings = BenchHelper::generate_random_strings(tx_count, Some(seed), n_chars);
let bench_start_time = Instant::now();
for rand_string in &rand_strings {
let blockhash = { *block_hash.read().await };
let tx = match transaction_size {
TransactionSize::Small => BenchHelper::create_memo_tx_small(
rand_string,
&funded_payer,
blockhash,
cu_price_micro_lamports,
),
TransactionSize::Large => BenchHelper::create_memo_tx_large(
rand_string,
&funded_payer,
blockhash,
cu_price_micro_lamports,
),
};
let start_time = Instant::now();
match rpc_client.send_transaction(&tx).await {
Ok(signature) => {
map_of_txs.insert(
signature,
TxSendData {
sent_duration: start_time.elapsed(),
sent_instant: Instant::now(),
sent_slot: current_slot.load(std::sync::atomic::Ordering::Relaxed),
transaction_bytes: bincode::serialized_size(&tx).unwrap(),
},
);
}
Err(e) => {
warn!("tx send failed with error {}", e);
}
}
}
ApiCallerResult {
gross_send_time: bench_start_time.elapsed(),
}
})
};
let mut metric = Metric::default();
let confirmation_time = Instant::now();
let mut confirmed_count = 0;
while confirmation_time.elapsed() < Duration::from_secs(60)
&& !(map_of_txs.is_empty() && confirmed_count == tx_count)
{
let signatures = map_of_txs.iter().map(|x| *x.key()).collect::<Vec<_>>();
if signatures.is_empty() {
tokio::time::sleep(Duration::from_millis(1)).await;
continue;
}
if let Ok(res) = rpc_client.get_signature_statuses(&signatures).await {
for (i, signature) in signatures.iter().enumerate() {
let tx_status = &res.value[i];
if tx_status.is_some() {
let tx_data = map_of_txs.get(signature).unwrap();
let time_to_confirm = tx_data.sent_instant.elapsed();
let transaction_bytes = tx_data.transaction_bytes;
metric.add_successful_transaction(
tx_data.sent_duration,
time_to_confirm,
transaction_bytes,
);
if log_txs {
let _ = tx_metric_sx.send(TxMetricData {
signature: signature.to_string(),
sent_slot: tx_data.sent_slot,
confirmed_slot: current_slot.load(Ordering::Relaxed),
time_to_send_in_millis: tx_data.sent_duration.as_millis() as u64,
time_to_confirm_in_millis: time_to_confirm.as_millis() as u64,
});
}
drop(tx_data);
map_of_txs.remove(signature);
confirmed_count += 1;
}
}
}
}
for tx in map_of_txs.iter() {
metric.add_unsuccessful_transaction(tx.sent_duration, tx.transaction_bytes);
}
let api_caller_result = api_caller_result
.await
.expect("api caller task must succeed");
metric
.set_total_gross_send_time(api_caller_result.gross_send_time.as_micros() as f64 / 1_000.0);
metric.finalize();
metric
}
// see https://spl.solana.com/memo for sizing of transactions
// As of v1.5.1, an unsigned instruction can support single-byte UTF-8 of up to 566 bytes.
// An instruction with a simple memo of 32 bytes can support up to 12 signers.
#[derive(Debug, Clone, Copy)]
pub enum TransactionSize {
// 179 bytes, 5237 CUs
Small,
// 1186 bytes, 193175 CUs
Large,
}

View File

@ -96,6 +96,7 @@ impl BenchHelper {
funded_payer: &Keypair,
blockhash: Hash,
random_seed: Option<u64>,
cu_price_micro_lamports: u64,
) -> Vec<Transaction> {
let seed = random_seed.map_or(0, |x| x);
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(seed);
@ -103,31 +104,51 @@ impl BenchHelper {
.map(|_| {
let random_bytes: Vec<u8> = Alphanumeric.sample_iter(&mut rng).take(10).collect();
Self::create_memo_tx_small(&random_bytes, funded_payer, blockhash)
Self::create_memo_tx_small(
&random_bytes,
funded_payer,
blockhash,
cu_price_micro_lamports,
)
})
.collect()
}
pub fn create_memo_tx_small(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction {
// note: there is another version of this
pub fn create_memo_tx_small(
msg: &[u8],
payer: &Keypair,
blockhash: Hash,
cu_price_micro_lamports: u64,
) -> Transaction {
let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap();
let instruction = Instruction::new_with_bytes(memo, msg, vec![]);
let cu = compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(10000);
let price: Instruction =
compute_budget::ComputeBudgetInstruction::set_compute_unit_price(1000000);
let message = Message::new(&[cu, price, instruction], Some(&payer.pubkey()));
let cu_request: Instruction =
compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(14000);
let instructions = if cu_price_micro_lamports > 0 {
let cu_budget_ix: Instruction =
compute_budget::ComputeBudgetInstruction::set_compute_unit_price(
cu_price_micro_lamports,
);
vec![cu_request, cu_budget_ix, instruction]
} else {
vec![cu_request, instruction]
};
let message = Message::new(&instructions, Some(&payer.pubkey()));
Transaction::new(&[payer], message, blockhash)
}
pub fn create_memo_tx_large(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction {
pub fn create_memo_tx_large(
msg: &[u8],
payer: &Keypair,
blockhash: Hash,
cu_price_micro_lamports: u64,
) -> Transaction {
let accounts = (0..8).map(|_| Keypair::new()).collect_vec();
let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap();
let cu = compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(10000);
let price: Instruction =
compute_budget::ComputeBudgetInstruction::set_compute_unit_price(1000000);
let instruction = Instruction::new_with_bytes(
memo,
msg,
@ -136,7 +157,18 @@ impl BenchHelper {
.map(|keypair| AccountMeta::new_readonly(keypair.pubkey(), true))
.collect_vec(),
);
let message = Message::new(&[cu, price, instruction], Some(&payer.pubkey()));
let instructions = if cu_price_micro_lamports > 0 {
let cu_budget_ix: Instruction =
compute_budget::ComputeBudgetInstruction::set_compute_unit_price(
cu_price_micro_lamports,
);
vec![cu_budget_ix, instruction]
} else {
vec![instruction]
};
let message = Message::new(&instructions, Some(&payer.pubkey()));
let mut signers = vec![payer];
signers.extend(accounts.iter());
@ -155,7 +187,7 @@ fn transaction_size_small() {
let seed = 42;
let random_strings = BenchHelper::generate_random_strings(1, Some(seed), 10);
let rand_string = random_strings.first().unwrap();
let tx = BenchHelper::create_memo_tx_small(rand_string, &payer_keypair, blockhash);
let tx = BenchHelper::create_memo_tx_small(rand_string, &payer_keypair, blockhash, 300);
assert_eq!(bincode::serialized_size(&tx).unwrap(), 231);
}
@ -170,7 +202,7 @@ fn transaction_size_large() {
let seed = 42;
let random_strings = BenchHelper::generate_random_strings(1, Some(seed), 232);
let rand_string = random_strings.first().unwrap();
let tx = BenchHelper::create_memo_tx_large(rand_string, &payer_keypair, blockhash);
let tx = BenchHelper::create_memo_tx_large(rand_string, &payer_keypair, blockhash, 300);
assert_eq!(bincode::serialized_size(&tx).unwrap(), 1230);
assert_eq!(bincode::serialized_size(&tx).unwrap(), 1222);
}

View File

@ -21,9 +21,11 @@ use solana_transaction_status::TransactionStatus;
use std::{str::FromStr, time::Duration};
use tokio::time::Instant;
pub mod bench1;
pub mod benches;
pub mod helpers;
pub mod metrics;
pub mod service_adapter;
pub mod tx_size;
#[derive(Parser, Debug)]
@ -208,6 +210,7 @@ pub fn create_memo_tx(
}
}
// note: there is another version of this
pub fn create_memo_tx_small(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction {
let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap();

View File

@ -1,26 +1,21 @@
use bench::{
bench1,
helpers::BenchHelper,
metrics::{AvgMetric, Metric, TxMetricData},
Args,
};
use clap::Parser;
use dashmap::DashMap;
use futures::future::join_all;
use log::{error, info, warn};
use log::{error, info};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::Signature;
use bench::bench1::TransactionSize;
use solana_sdk::{
commitment_config::CommitmentConfig, hash::Hash, signature::Keypair, signer::Signer,
slot_history::Slot,
};
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use tokio::{
sync::{mpsc::UnboundedSender, RwLock},
time::{Duration, Instant},
};
use std::sync::{atomic::AtomicU64, Arc};
use tokio::{sync::RwLock, time::Duration};
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
async fn main() {
@ -36,6 +31,8 @@ async fn main() {
large_transactions,
} = Args::parse();
let cu_price_micro_lamports = 300;
let mut run_interval_ms = tokio::time::interval(Duration::from_millis(run_interval_ms));
let transaction_size = if large_transactions {
@ -103,7 +100,7 @@ async fn main() {
for seed in 0..runs {
let funded_payer = Keypair::from_bytes(funded_payer.to_bytes().as_slice()).unwrap();
tasks.push(tokio::spawn(bench(
tasks.push(tokio::spawn(bench1::bench(
rpc_client.clone(),
tx_count,
funded_payer,
@ -113,6 +110,7 @@ async fn main() {
tx_log_sx.clone(),
log_transactions,
transaction_size,
cu_price_micro_lamports,
)));
// wait for an interval
run_interval_ms.tick().await;
@ -145,145 +143,3 @@ async fn main() {
csv_writer.flush().unwrap();
}
#[derive(Clone, Debug, Copy)]
struct TxSendData {
sent_duration: Duration,
sent_instant: Instant,
sent_slot: Slot,
transaction_bytes: u64,
}
struct ApiCallerResult {
gross_send_time: Duration,
}
#[allow(clippy::too_many_arguments)]
async fn bench(
rpc_client: Arc<RpcClient>,
tx_count: usize,
funded_payer: Keypair,
seed: u64,
block_hash: Arc<RwLock<Hash>>,
current_slot: Arc<AtomicU64>,
tx_metric_sx: UnboundedSender<TxMetricData>,
log_txs: bool,
transaction_size: TransactionSize,
) -> Metric {
let map_of_txs: Arc<DashMap<Signature, TxSendData>> = Arc::new(DashMap::new());
// transaction sender task
let api_caller_result = {
let map_of_txs = map_of_txs.clone();
let rpc_client = rpc_client.clone();
let current_slot = current_slot.clone();
tokio::spawn(async move {
let map_of_txs = map_of_txs.clone();
let n_chars = match transaction_size {
TransactionSize::Small => 10,
TransactionSize::Large => 232, // 565 is max but we need to lower that to not burn the CUs
};
let rand_strings = BenchHelper::generate_random_strings(tx_count, Some(seed), n_chars);
let bench_start_time = Instant::now();
for rand_string in &rand_strings {
let blockhash = { *block_hash.read().await };
let tx = match transaction_size {
TransactionSize::Small => {
BenchHelper::create_memo_tx_small(rand_string, &funded_payer, blockhash)
}
TransactionSize::Large => {
BenchHelper::create_memo_tx_large(rand_string, &funded_payer, blockhash)
}
};
let start_time = Instant::now();
match rpc_client.send_transaction(&tx).await {
Ok(signature) => {
map_of_txs.insert(
signature,
TxSendData {
sent_duration: start_time.elapsed(),
sent_instant: Instant::now(),
sent_slot: current_slot.load(std::sync::atomic::Ordering::Relaxed),
transaction_bytes: bincode::serialized_size(&tx).unwrap(),
},
);
}
Err(e) => {
warn!("tx send failed with error {}", e);
}
}
}
ApiCallerResult {
gross_send_time: bench_start_time.elapsed(),
}
})
};
let mut metric = Metric::default();
let confirmation_time = Instant::now();
let mut confirmed_count = 0;
while confirmation_time.elapsed() < Duration::from_secs(60)
&& !(map_of_txs.is_empty() && confirmed_count == tx_count)
{
let signatures = map_of_txs.iter().map(|x| *x.key()).collect::<Vec<_>>();
if signatures.is_empty() {
tokio::time::sleep(Duration::from_millis(1)).await;
continue;
}
if let Ok(res) = rpc_client.get_signature_statuses(&signatures).await {
for (i, signature) in signatures.iter().enumerate() {
let tx_status = &res.value[i];
if tx_status.is_some() {
let tx_data = map_of_txs.get(signature).unwrap();
let time_to_confirm = tx_data.sent_instant.elapsed();
let transaction_bytes = tx_data.transaction_bytes;
metric.add_successful_transaction(
tx_data.sent_duration,
time_to_confirm,
transaction_bytes,
);
if log_txs {
let _ = tx_metric_sx.send(TxMetricData {
signature: signature.to_string(),
sent_slot: tx_data.sent_slot,
confirmed_slot: current_slot.load(Ordering::Relaxed),
time_to_send_in_millis: tx_data.sent_duration.as_millis() as u64,
time_to_confirm_in_millis: time_to_confirm.as_millis() as u64,
});
}
drop(tx_data);
map_of_txs.remove(signature);
confirmed_count += 1;
}
}
}
}
for tx in map_of_txs.iter() {
metric.add_unsuccessful_transaction(tx.sent_duration, tx.transaction_bytes);
}
let api_caller_result = api_caller_result
.await
.expect("api caller task must succeed");
metric
.set_total_gross_send_time(api_caller_result.gross_send_time.as_micros() as f64 / 1_000.0);
metric.finalize();
metric
}
// see https://spl.solana.com/memo for sizing of transactions
// As of v1.5.1, an unsigned instruction can support single-byte UTF-8 of up to 566 bytes.
// An instruction with a simple memo of 32 bytes can support up to 12 signers.
#[derive(Debug, Clone, Copy)]
enum TransactionSize {
// 179 bytes, 5237 CUs
Small,
// 1186 bytes, 193175 CUs
Large,
}

View File

@ -0,0 +1,102 @@
// adapter code for all from benchrunner-service
use crate::bench1;
use crate::bench1::TransactionSize;
use crate::metrics::{Metric, TxMetricData};
use crate::tx_size::TxSize;
use log::debug;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::hash::Hash;
use solana_sdk::signature::Keypair;
use solana_sdk::signer::Signer;
use std::fmt::Display;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::Instant;
#[derive(Debug, Clone)]
pub struct BenchConfig {
pub tx_count: usize,
pub cu_price_micro_lamports: u64,
}
impl Display for BenchConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
pub async fn bench_servicerunner(
bench_config: &BenchConfig,
rpc_addr: String,
funded_payer: Keypair,
size_tx: TxSize,
) -> Metric {
let started_at = Instant::now();
let transaction_size = match size_tx {
TxSize::Small => TransactionSize::Small,
TxSize::Large => TransactionSize::Large,
};
debug!("Payer: {}", funded_payer.pubkey());
let rpc_client = Arc::new(RpcClient::new_with_commitment(
rpc_addr.clone(),
CommitmentConfig::confirmed(),
));
let bh = rpc_client.get_latest_blockhash().await.unwrap();
let slot = rpc_client.get_slot().await.unwrap();
let block_hash: Arc<RwLock<Hash>> = Arc::new(RwLock::new(bh));
let current_slot = Arc::new(AtomicU64::new(slot));
{
// block hash updater task
let block_hash = block_hash.clone();
let rpc_client = rpc_client.clone();
let current_slot = current_slot.clone();
tokio::spawn(async move {
loop {
let bh = rpc_client.get_latest_blockhash().await;
match bh {
Ok(bh) => {
let mut lock = block_hash.write().await;
*lock = bh;
}
Err(e) => println!("blockhash update error {}", e),
}
let slot = rpc_client.get_slot().await;
match slot {
Ok(slot) => {
current_slot.store(slot, std::sync::atomic::Ordering::Relaxed);
}
Err(e) => println!("slot {}", e),
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
})
};
{
// TODO what todo
// not used unless log_txs is set to true
let (tx_log_sx_null, _tx_log_rx) = tokio::sync::mpsc::unbounded_channel::<TxMetricData>();
bench1::bench(
rpc_client.clone(),
bench_config.tx_count,
funded_payer,
started_at.elapsed().as_micros() as u64,
block_hash.clone(),
current_slot.clone(),
tx_log_sx_null,
false, // log_transactions
transaction_size,
bench_config.cu_price_micro_lamports,
)
.await
}
}

View File

@ -1,4 +1,5 @@
use serde::Deserialize;
use std::fmt::Display;
// see https://spl.solana.com/memo for sizing of transactions
// As of v1.5.1, an unsigned instruction can support single-byte UTF-8 of up to 566 bytes.
@ -11,6 +12,15 @@ pub enum TxSize {
Large,
}
impl Display for TxSize {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TxSize::Small => write!(f, "small"),
TxSize::Large => write!(f, "large"),
}
}
}
impl TxSize {
pub fn size(&self) -> usize {
match self {

0
bench/transactions.csv Normal file
View File

View File

@ -0,0 +1,39 @@
[package]
name = "solana-lite-rpc-benchrunner-service"
version = "0.2.4"
edition = "2021"
description = "Service for running recurring benchmarks"
rust-version = "1.73.0"
repository = "https://github.com/blockworks-foundation/lite-rpc"
license = "AGPL"
[dependencies]
solana-lite-rpc-util = { workspace = true }
bench = { workspace = true }
solana-sdk = { workspace = true }
solana-rpc-client = { workspace = true }
solana-transaction-status = { workspace = true }
solana-rpc-client-api = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
clap = { workspace = true }
tracing-subscriber = { workspace = true }
prometheus = { workspace = true }
lazy_static = { workspace = true }
async-trait = { workspace = true }
tokio = { version = "1.28.2", features = ["full", "fs"]}
tokio-util = "0.7"
chrono = { workspace = true }
itertools = { workspace = true }
native-tls = { workspace = true }
postgres-native-tls = { workspace = true }
postgres-types = { version = "0.2.6", features = ["derive", "with-serde_json-1"] }
tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] }

View File

@ -0,0 +1,35 @@
# Setup
### Hardware
Hardware: recommend 1024MB RAM, 2 vCPUs, small disk
### Environment Variables
| Environment Variable | Purpose | Required? | Default Value |
|----------------------|-------------------------------------------------------|---------------|---------------|
| `PG_ENABLED` | Enable writing to PostgreSQL | No | false |
| `PG_CONFIG` | PostgreSQL connection string | if PG_ENABLED | |
| `TENANT1_ID` | Technical ID for the tenant | Yes | |
| `TENANT1_RPC_ADDR` | RPC address for the target RPC node | Yes | |
| `TENANT2_.. | more tenants can be added using TENANT2, TENANT3, ... | | |
### Command-line Arguments
```
Options:
-b, --bench-interval <BENCH_INTERVAL>
interval in milliseconds to run the benchmark [default: 60000]
-n, --tx-count <TX_COUNT>
[default: 10]
-s, --size-tx <SIZE_TX>
[default: small] [possible values: small, large]
-p, --prio-fees <PRIO_FEES>
[default: 0]
```
```bash
solana-lite-rpc-benchrunner-service \
--bench-interval 600000 \
--tx-count 100 \
--prio-fees 0 --prio-fees 1000 --prio-fees 100000
```

View File

@ -0,0 +1,80 @@
use itertools::Itertools;
use solana_sdk::signature::Keypair;
#[derive(Debug, Clone)]
pub struct TenantConfig {
// technical identifier for the tenant, e.g. "solana-rpc"
pub tenant_id: String,
pub rpc_addr: String,
}
// recommend to use one payer keypair for all targets and fund that keypair with enough SOL
pub fn get_funded_payer_from_env() -> Keypair {
let keypair58_string: String = std::env::var("FUNDED_PAYER_KEYPAIR58")
.expect("need funded payer keypair on env (variable FUNDED_PAYER_KEYPAIR58)");
Keypair::from_base58_string(&keypair58_string)
}
pub fn read_tenant_configs(env_vars: Vec<(String, String)>) -> Vec<TenantConfig> {
let map = env_vars
.iter()
.filter(|(k, _)| k.starts_with("TENANT"))
.into_group_map_by(|(k, _v)| {
let tenant_counter = k
.split('_')
.next()
.expect("tenant prefix must be split by underscore (e.g. TENANT99_SOMETHING")
.replace("TENANT", "");
tenant_counter
.parse::<u32>()
.expect("tenant counter must be a number (e.g. TENANT99)")
});
let values = map
.iter()
.sorted()
.map(|(tc, v)| TenantConfig {
tenant_id: v
.iter()
.find(|(v, _)| *v == format!("TENANT{}_ID", tc))
.iter()
.exactly_one()
.expect("need ID")
.1
.to_string(),
rpc_addr: v
.iter()
.find(|(v, _)| *v == format!("TENANT{}_RPC_ADDR", tc))
.iter()
.exactly_one()
.expect("need RPC_ADDR")
.1
.to_string(),
})
.collect::<Vec<TenantConfig>>();
values
}
#[test]
fn test_env_vars() {
let env_vars = vec![
(String::from("TENANT1_ID"), String::from("solana-rpc")),
(
String::from("TENANT1_RPC_ADDR"),
String::from("http://localhost:8899"),
),
(String::from("TENANT2_ID"), String::from("lite-rpc")),
(
String::from("TENANT2_RPC_ADDR"),
String::from("http://localhost:8890"),
),
];
let tenant_configs = read_tenant_configs(env_vars);
assert_eq!(tenant_configs.len(), 2);
assert_eq!(tenant_configs[0].tenant_id, "solana-rpc");
assert_eq!(tenant_configs[0].rpc_addr, "http://localhost:8899");
assert_eq!(tenant_configs[1].tenant_id, "lite-rpc");
assert_eq!(tenant_configs[1].rpc_addr, "http://localhost:8890");
}

View File

@ -0,0 +1,16 @@
use bench::tx_size::TxSize;
use clap::Parser;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct Args {
/// interval in milliseconds to run the benchmark
#[arg(short = 'b', long, default_value_t = 60_000)]
pub bench_interval: u64,
#[arg(short = 'n', long, default_value_t = 10)]
pub tx_count: usize,
#[clap(short, long, default_value_t = TxSize::Small)]
pub size_tx: TxSize,
#[clap(short, long, default_values_t = [0])]
pub prio_fees: Vec<u64>,
}

View File

@ -0,0 +1,149 @@
mod args;
mod cli;
mod postgres;
mod prometheus;
use crate::args::{get_funded_payer_from_env, read_tenant_configs};
use crate::cli::Args;
use crate::postgres::metrics_dbstore::{
save_metrics_to_postgres, upsert_benchrun_status, BenchRunStatus,
};
use crate::postgres::postgres_session::PostgresSessionConfig;
use crate::postgres::postgres_session_cache::PostgresSessionCache;
use crate::prometheus::metrics_prometheus::publish_metrics_on_prometheus;
use crate::prometheus::prometheus_sync::PrometheusSync;
use bench::service_adapter::BenchConfig;
use clap::Parser;
use futures_util::future::join_all;
use itertools::Itertools;
use log::{debug, error, info};
use std::net::SocketAddr;
use std::str::FromStr;
use std::time::{Duration, SystemTime};
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
let Args {
bench_interval,
tx_count,
size_tx,
prio_fees,
} = Args::parse();
let postgres_config = PostgresSessionConfig::new_from_env().unwrap();
let bench_interval = Duration::from_millis(bench_interval);
let funded_payer = get_funded_payer_from_env();
let tenant_configs = read_tenant_configs(std::env::vars().collect::<Vec<(String, String)>>());
info!("Use postgres config: {:?}", postgres_config.is_some());
info!("Use prio fees: [{}]", prio_fees.iter().join(","));
info!("Start running benchmarks every {:?}", bench_interval);
info!(
"Found tenants: {}",
tenant_configs.iter().map(|tc| &tc.tenant_id).join(", ")
);
if tenant_configs.is_empty() {
error!("No tenants found (missing env vars) - exit");
return;
}
let _prometheus_task = PrometheusSync::sync(SocketAddr::from_str("[::]:9091").unwrap());
let mut jh_tenant_task = Vec::new();
// let postgres_session = Arc::new(PostgresSession::new(postgres_config.unwrap()).await);
let postgres_session = match postgres_config {
None => None,
Some(x) => {
let session_cache = PostgresSessionCache::new(x)
.await
.expect("PostgreSQL session cache");
Some(session_cache)
}
};
let bench_configs = prio_fees
.iter()
.map(|prio_fees| BenchConfig {
tx_count,
cu_price_micro_lamports: *prio_fees,
})
.collect_vec();
for tenant_config in &tenant_configs {
let funded_payer = funded_payer.insecure_clone();
let tenant_id = tenant_config.tenant_id.clone();
let postgres_session = postgres_session.clone();
let tenant_config = tenant_config.clone();
let bench_configs = bench_configs.clone();
let jh_runner = tokio::spawn(async move {
let mut interval = tokio::time::interval(bench_interval);
for run_count in 1.. {
let bench_config = bench_configs[run_count % bench_configs.len()].clone();
debug!(
"Invoke bench execution (#{}) on tenant <{}> using {}",
run_count, tenant_id, bench_config
);
let benchrun_at = SystemTime::now();
if let Some(postgres_session) = postgres_session.as_ref() {
let _dbstatus = upsert_benchrun_status(
postgres_session,
&tenant_config,
&bench_config,
benchrun_at,
BenchRunStatus::STARTED,
)
.await;
}
let metric = bench::service_adapter::bench_servicerunner(
&bench_config,
tenant_config.rpc_addr.clone(),
funded_payer.insecure_clone(),
size_tx,
)
.await;
if let Some(postgres_session) = postgres_session.as_ref() {
let _dbstatus = save_metrics_to_postgres(
postgres_session,
&tenant_config,
&bench_config,
&metric,
benchrun_at,
)
.await;
}
publish_metrics_on_prometheus(&tenant_config, &bench_config, &metric).await;
if let Some(postgres_session) = postgres_session.as_ref() {
let _dbstatus = upsert_benchrun_status(
postgres_session,
&tenant_config,
&bench_config,
benchrun_at,
BenchRunStatus::FINISHED,
)
.await;
}
debug!(
"Bench execution (#{}) done in {:?}",
run_count,
benchrun_at.elapsed().unwrap()
);
interval.tick().await;
}
});
jh_tenant_task.push(jh_runner);
} // -- END tenant loop
join_all(jh_tenant_task).await;
}

View File

@ -0,0 +1,104 @@
use crate::args::TenantConfig;
use crate::postgres::postgres_session_cache::PostgresSessionCache;
use bench::metrics::Metric;
use bench::service_adapter::BenchConfig;
use log::warn;
use postgres_types::ToSql;
use std::time::SystemTime;
#[allow(clippy::upper_case_acronyms)]
pub enum BenchRunStatus {
STARTED,
FINISHED,
}
impl BenchRunStatus {
pub fn to_db_string(&self) -> &str {
match self {
BenchRunStatus::STARTED => "STARTED",
BenchRunStatus::FINISHED => "FINISHED",
}
}
}
pub async fn upsert_benchrun_status(
postgres_session: &PostgresSessionCache,
tenant_config: &TenantConfig,
_bench_config: &BenchConfig,
benchrun_at: SystemTime,
status: BenchRunStatus,
) -> anyhow::Result<()> {
let values: &[&(dyn ToSql + Sync)] = &[
&tenant_config.tenant_id,
&benchrun_at,
&status.to_db_string(),
];
let write_result = postgres_session
.get_session()
.await?
.execute(
r#"
INSERT INTO benchrunner.bench_runs (
tenant,
ts,
status
)
VALUES ($1, $2, $3)
ON CONFLICT (tenant, ts) DO UPDATE SET status = $3
"#,
values,
)
.await;
if let Err(err) = write_result {
warn!("Failed to upsert status (err {:?}) - continue", err);
}
Ok(())
}
pub async fn save_metrics_to_postgres(
postgres_session: &PostgresSessionCache,
tenant_config: &TenantConfig,
bench_config: &BenchConfig,
metric: &Metric,
benchrun_at: SystemTime,
) -> anyhow::Result<()> {
let metricjson = serde_json::to_value(metric).unwrap();
let values: &[&(dyn ToSql + Sync)] = &[
&tenant_config.tenant_id,
&benchrun_at,
&(bench_config.cu_price_micro_lamports as i64),
&(metric.txs_sent as i64),
&(metric.txs_confirmed as i64),
&(metric.txs_un_confirmed as i64),
&(metric.average_confirmation_time_ms as f32),
&metricjson,
];
let write_result = postgres_session
.get_session()
.await?
.execute(
r#"
INSERT INTO
benchrunner.bench_metrics (
tenant,
ts,
prio_fees,
txs_sent,
txs_confirmed, txs_un_confirmed,
average_confirmation_time_ms,
metric_json
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
"#,
values,
)
.await;
if let Err(err) = write_result {
warn!("Failed to insert metrics (err {:?}) - continue", err);
}
Ok(())
}

View File

@ -0,0 +1,3 @@
pub mod metrics_dbstore;
pub mod postgres_session;
pub mod postgres_session_cache;

View File

@ -0,0 +1,214 @@
#![allow(dead_code)]
use std::env;
use std::sync::Arc;
use anyhow::Context;
use native_tls::{Certificate, Identity, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
use solana_lite_rpc_util::encoding::BinaryEncoding;
use tokio_postgres::{
config::SslMode, tls::MakeTlsConnect, types::ToSql, Client, Error, NoTls, Row, Socket,
};
#[derive(serde::Deserialize, Debug, Clone)]
pub struct PostgresSessionConfig {
pub pg_config: String,
pub ssl: Option<PostgresSessionSslConfig>,
}
#[derive(serde::Deserialize, Debug, Clone)]
pub struct PostgresSessionSslConfig {
pub ca_pem_b64: String,
pub client_pks_b64: String,
pub client_pks_pass: String,
}
impl PostgresSessionConfig {
pub fn new_from_env() -> anyhow::Result<Option<Self>> {
// pg not enabled
if env::var("PG_ENABLED").is_err() {
return Ok(None);
}
let enable_pg = env::var("PG_ENABLED").context("PG_ENABLED")?;
if enable_pg != *"true" {
return Ok(None);
}
let env_pg_config = env::var("PG_CONFIG").context("PG_CONFIG not found")?;
let ssl_config = if env_pg_config
.parse::<tokio_postgres::Config>()?
.get_ssl_mode()
.eq(&SslMode::Disable)
{
None
} else {
let env_ca_pem_b64 = env::var("CA_PEM_B64").context("CA_PEM_B64 not found")?;
let env_client_pks_b64 =
env::var("CLIENT_PKS_B64").context("CLIENT_PKS_B64 not found")?;
let env_client_pks_pass =
env::var("CLIENT_PKS_PASS").context("CLIENT_PKS_PASS not found")?;
Some(PostgresSessionSslConfig {
ca_pem_b64: env_ca_pem_b64,
client_pks_b64: env_client_pks_b64,
client_pks_pass: env_client_pks_pass,
})
};
Ok(Some(Self {
pg_config: env_pg_config,
ssl: ssl_config,
}))
}
}
#[derive(Clone)]
pub struct PostgresSession {
client: Arc<Client>,
}
impl PostgresSession {
pub async fn new_from_env() -> anyhow::Result<Self> {
let pg_session_config = PostgresSessionConfig::new_from_env()
.expect("failed to start Postgres Client")
.expect("Postgres not enabled (use PG_ENABLED)");
PostgresSession::new(pg_session_config).await
}
pub async fn new(
PostgresSessionConfig { pg_config, ssl }: PostgresSessionConfig,
) -> anyhow::Result<Self> {
let pg_config = pg_config.parse::<tokio_postgres::Config>()?;
let client = if let SslMode::Disable = pg_config.get_ssl_mode() {
Self::spawn_connection(pg_config, NoTls).await?
} else {
let PostgresSessionSslConfig {
ca_pem_b64,
client_pks_b64,
client_pks_pass,
} = ssl.as_ref().unwrap();
let ca_pem = BinaryEncoding::Base64
.decode(ca_pem_b64)
.context("ca pem decode")?;
let client_pks = BinaryEncoding::Base64
.decode(client_pks_b64)
.context("client pks decode")?;
let connector = TlsConnector::builder()
.add_root_certificate(Certificate::from_pem(&ca_pem)?)
.identity(Identity::from_pkcs12(&client_pks, client_pks_pass).context("Identity")?)
.danger_accept_invalid_hostnames(true)
.danger_accept_invalid_certs(true)
.build()?;
Self::spawn_connection(pg_config, MakeTlsConnector::new(connector)).await?
};
Ok(Self {
client: Arc::new(client),
})
}
async fn spawn_connection<T>(
pg_config: tokio_postgres::Config,
connector: T,
) -> anyhow::Result<Client>
where
T: MakeTlsConnect<Socket> + Send + 'static,
<T as MakeTlsConnect<Socket>>::Stream: Send,
{
let (client, connection) = pg_config
.connect(connector)
.await
.context("Connecting to Postgres failed")?;
tokio::spawn(async move {
log::info!("Connecting to Postgres");
if let Err(err) = connection.await {
log::error!("Connection to Postgres broke: {err:?}");
return;
}
log::debug!("Postgres thread shutting down");
});
Ok(client)
}
pub fn is_closed(&self) -> bool {
self.client.is_closed()
}
pub async fn execute(
&self,
statement: &str,
params: &[&(dyn ToSql + Sync)],
) -> Result<u64, tokio_postgres::error::Error> {
self.client.execute(statement, params).await
}
// execute statements seperated by semicolon
pub async fn execute_multiple(&self, statement: &str) -> Result<(), Error> {
self.client.batch_execute(statement).await