lite-client, tests and bench

This commit is contained in:
aniketfuryrocks 2022-12-10 23:01:37 +05:30
parent 86c84df6db
commit 5f047c9c4d
No known key found for this signature in database
GPG Key ID: 9CDC12D03F4F5BB7
11 changed files with 719 additions and 293 deletions

608
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -3,7 +3,26 @@ name = "lite-rpc"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[workspace]
members = [
"lite-client",
"lite-bench-utils"
]
[[bench]]
name="tps"
harness=false
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
[dev-dependencies]
csv = "1.1.6"
serde = { version = "1", features = ["derive"]}
lite-client = { path ="./lite-client" }
lite-bench-utils = { path = "./lite-bench-utils" }
log = "0.4.17"
simplelog = "0.12.0"
[dependencies]
solana-client = { git = "https://github.com/solana-labs/solana.git" }
@ -34,6 +53,7 @@ solana-vote-program = { git = "https://github.com/solana-labs/solana.git" }
solana-rpc = { git = "https://github.com/solana-labs/solana.git" }
tokio = { version = "1.14.1", features = ["full"]}
tokio-util = { version = "0.6", features = ["codec", "compat"] }
futures = "0.3.25"
jsonrpc-core = "18.0.0"
jsonrpc-core-client = { version = "18.0.0" }
@ -43,29 +63,21 @@ jsonrpc-pubsub = "18.0.0"
clap = { version = "4.0.29", features = ["derive"] }
procinfo = "0.4.2"
base64 = "0.13.0"
base64 = "0.13.1"
bincode = "1.3.3"
bs58 = "0.4.0"
crossbeam-channel = "0.5"
dashmap = "4.0.2"
crossbeam-channel = "0.5.6"
dashmap = "5.4.0"
itertools = "0.10.5"
libc = "0.2.131"
libc = "0.2.138"
log = "0.4.17"
rayon = "1.5.3"
regex = "1.6.0"
serde = "1.0.144"
serde_derive = "1.0.103"
serde_json = "1.0.83"
soketto = "0.7"
rayon = "1.6.1"
regex = "1.7.0"
serde = "1.0.149"
serde_derive = "1.0.149"
serde_json = "1.0.89"
soketto = "0.7.1"
spl-token = { version = "=3.5.0", features = ["no-entrypoint"] }
spl-token-2022 = { version = "=0.4.3", features = ["no-entrypoint"] }
spl-token-2022 = { version = "0.5.0", features = ["no-entrypoint"] }
stream-cancel = "0.8.1"
thiserror = "1.0"
tokio-util = { version = "0.6", features = ["codec", "compat"] }
[dev-dependencies]
csv = "1.1.6"
serde = { version = "1", features = ["derive"]}
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
thiserror = "1.0.37"

123
benches/tps.rs Normal file
View File

@ -0,0 +1,123 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use lite_bench_utils::{
generate_txs,
metrics::{AvgMetric, Metric},
new_funded_payer,
};
use log::info;
use solana_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
use solana_sdk::native_token::LAMPORTS_PER_SOL;
use lite_client::{LiteClient, LOCAL_LIGHT_RPC_ADDR};
use simplelog::*;
use tokio::sync::mpsc;
const NUM_OF_TXS: usize = 10_000;
const NUM_OF_RUNS: usize = 1;
const CSV_FILE_NAME: &str = "metrics.csv";
#[tokio::main]
async fn main() {
TermLogger::init(
LevelFilter::Info,
Config::default(),
TerminalMode::Mixed,
ColorChoice::Auto,
)
.unwrap();
let lite_client = Arc::new(LiteClient(RpcClient::new(LOCAL_LIGHT_RPC_ADDR.to_string())));
let mut csv_writer = csv::Writer::from_path(CSV_FILE_NAME).unwrap();
let mut avg_metric = AvgMetric::default();
for run_num in 0..NUM_OF_RUNS {
let metric = foo(lite_client.clone()).await;
info!("Run {run_num}: Sent and Confirmed {NUM_OF_TXS} tx(s) in {metric:?}",);
avg_metric += &metric;
csv_writer.serialize(metric).unwrap();
}
let avg_metric = Metric::from(avg_metric);
info!("Avg Metric {avg_metric:?}",);
csv_writer.serialize(avg_metric).unwrap();
csv_writer.flush().unwrap();
}
async fn foo(lite_client: Arc<LiteClient>) -> Metric {
let funded_payer = new_funded_payer(&lite_client, LAMPORTS_PER_SOL * 2000)
.await
.unwrap();
let txs = generate_txs(NUM_OF_TXS, &lite_client.0, &funded_payer)
.await
.unwrap();
let mut un_confirmed_txs: HashMap<String, Option<Instant>> = HashMap::with_capacity(txs.len());
for tx in &txs {
un_confirmed_txs.insert(tx.get_signature().to_string(), None);
}
let start_time = Instant::now();
info!("Sending and Confirming {NUM_OF_TXS} tx(s)");
let send_fut = {
let lite_client = lite_client.clone();
tokio::spawn(async move {
for tx in txs {
lite_client.send_transaction(&tx).await.unwrap();
info!("Tx {}", &tx.signatures[0]);
}
info!("Sent {NUM_OF_TXS} tx(s)");
})
};
let (metrics_send, mut metrics_recv) = mpsc::channel(1);
let confirm_fut = tokio::spawn(async move {
let mut metrics = Metric::default();
while !un_confirmed_txs.is_empty() {
let mut to_remove_txs = Vec::new();
for (sig, time_elapsed_since_last_confirmed) in un_confirmed_txs.iter_mut() {
if time_elapsed_since_last_confirmed.is_none() {
*time_elapsed_since_last_confirmed = Some(Instant::now())
}
if lite_client.confirm_transaction(sig.clone()).await.value {
metrics.txs_confirmed += 1;
to_remove_txs.push(sig.clone());
} else if time_elapsed_since_last_confirmed.unwrap().elapsed()
> Duration::from_secs(3)
{
metrics.txs_un_confirmed += 1;
to_remove_txs.push(sig.clone());
}
}
for to_remove_tx in to_remove_txs {
un_confirmed_txs.remove(&to_remove_tx);
}
}
metrics.time_elapsed_sec = start_time.elapsed().as_secs_f64();
metrics.txs_sent = NUM_OF_TXS as u64;
metrics.calc_tps();
metrics_send.send(metrics).await.unwrap();
});
let (res1, res2) = tokio::join!(send_fut, confirm_fut);
res1.unwrap();
res2.unwrap();
metrics_recv.recv().await.unwrap()
}

View File

@ -0,0 +1,14 @@
[package]
name = "lite-bench-utils"
version = "0.1.0"
edition = "2021"
[dependencies]
lite-client = { path ="../lite-client" }
serde = { version = "1.0.149", features = ["derive"] }
serde_json = "1.0.89"
solana-client = { git = "https://github.com/solana-labs/solana.git" }
solana-sdk = { git = "https://github.com/solana-labs/solana.git" }
tokio = "1.14.1"
log = "0.4.17"
anyhow = "1.0.66"

View File

@ -0,0 +1,8 @@
test:
cargo test send_and_confirm_tx -- --nocapture
benchmark:
cargo bench
clean:
cargo clean

View File

@ -0,0 +1,74 @@
pub mod metrics;
use std::thread;
use std::time::Duration;
use lite_client::LiteClient;
use log::info;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::hash::Hash;
use solana_sdk::signature::Signature;
use solana_sdk::{
message::Message, pubkey::Pubkey, signature::Keypair, signer::Signer, system_instruction,
transaction::Transaction,
};
pub async fn new_funded_payer(lite_client: &LiteClient, amount: u64) -> anyhow::Result<Keypair> {
let payer = Keypair::new();
let payer_pubkey = payer.pubkey().to_string();
// request airdrop to payer
let airdrop_sig = lite_client.request_airdrop(&payer.pubkey(), amount).await?;
info!("Air Dropping {payer_pubkey} with {amount}L");
thread::sleep(Duration::from_secs(12));
//loop {
// if let Some(res) = lite_client
// .get_signature_status_with_commitment(&airdrop_sig, CommitmentConfig::finalized())
// .await?
// {
// match res {
// Ok(_) => break,
// Err(_) => bail!("Error air dropping {payer_pubkey}"),
// }
// }
//}
info!("Air Drop Successful: {airdrop_sig}");
Ok(payer)
}
pub async fn wait_till_confirmed(lite_client: &LiteClient, sig: &Signature) {
while lite_client.confirm_transaction(sig.to_string()).await.value {}
}
pub fn create_transaction(funded_payer: &Keypair, blockhash: Hash) -> Transaction {
let to_pubkey = Pubkey::new_unique();
// transfer instruction
let instruction = system_instruction::transfer(&funded_payer.pubkey(), &to_pubkey, 1_000_000);
let message = Message::new(&[instruction], Some(&funded_payer.pubkey()));
Transaction::new(&[funded_payer], message, blockhash)
}
pub async fn generate_txs(
num_of_txs: usize,
rpc_client: &RpcClient,
funded_payer: &Keypair,
) -> anyhow::Result<Vec<Transaction>> {
let mut txs = Vec::with_capacity(num_of_txs);
let blockhash = rpc_client.get_latest_blockhash().await?;
for _ in 0..num_of_txs {
txs.push(create_transaction(funded_payer, blockhash));
}
Ok(txs)
}

View File

@ -0,0 +1,56 @@
use std::ops::{AddAssign, DivAssign};
#[derive(Debug, Default, serde::Serialize)]
pub struct Metric {
pub time_elapsed_sec: f64,
pub txs_sent: u64,
pub txs_confirmed: u64,
pub txs_un_confirmed: u64,
pub tps: f64,
}
#[derive(Default)]
pub struct AvgMetric {
num_of_runs: u64,
total_metric: Metric,
}
impl Metric {
pub fn calc_tps(&mut self) {
self.tps = self.txs_confirmed as f64 / self.time_elapsed_sec
}
}
impl AddAssign<&Self> for Metric {
fn add_assign(&mut self, rhs: &Self) {
self.time_elapsed_sec += rhs.time_elapsed_sec;
self.txs_sent += rhs.txs_sent;
self.txs_confirmed += rhs.txs_confirmed;
self.txs_un_confirmed += rhs.txs_un_confirmed;
self.tps += rhs.tps
}
}
impl DivAssign<u64> for Metric {
fn div_assign(&mut self, rhs: u64) {
self.time_elapsed_sec /= rhs as f64;
self.txs_sent /= rhs;
self.txs_confirmed /= rhs;
self.txs_un_confirmed /= rhs;
self.tps /= rhs as f64;
}
}
impl AddAssign<&Metric> for AvgMetric {
fn add_assign(&mut self, rhs: &Metric) {
self.num_of_runs += 1;
self.total_metric += rhs;
}
}
impl From<AvgMetric> for Metric {
fn from(mut avg_metric: AvgMetric) -> Self {
avg_metric.total_metric /= avg_metric.num_of_runs;
avg_metric.total_metric
}
}

8
lite-client/Cargo.toml Normal file
View File

@ -0,0 +1,8 @@
[package]
name = "lite-client"
version = "0.1.0"
edition = "2021"
[dependencies]
serde_json = "1.0.89"
solana-client = { git = "https://github.com/solana-labs/solana.git" }

View File

@ -1,7 +1,11 @@
use std::ops::{Deref, DerefMut};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_client::rpc_request::RpcRequest;
use solana_client::{
nonblocking::rpc_client::RpcClient, rpc_request::RpcRequest,
rpc_response::Response as RpcResponse,
};
pub const LOCAL_LIGHT_RPC_ADDR: &str = "http://127.0.0.1:8890";
pub struct LiteClient(pub RpcClient);
@ -20,7 +24,7 @@ impl DerefMut for LiteClient {
}
impl LiteClient {
pub async fn confirm_transaction(&self, signature: String) -> bool {
pub async fn confirm_transaction(&self, signature: String) -> RpcResponse<bool> {
self.send(
RpcRequest::Custom {
method: "confirmTransaction",
@ -31,3 +35,4 @@ impl LiteClient {
.unwrap()
}
}

View File

@ -1,3 +1,8 @@
mod cli;
mod context;
mod pubsub;
mod rpc;
use std::{net::SocketAddr, sync::Arc};
use clap::Parser;
@ -16,12 +21,6 @@ use crate::{
LightRpcRequestProcessor,
},
};
mod cli;
mod client;
mod context;
mod pubsub;
mod rpc;
use cli::Args;
fn run(port: String, subscription_port: String, rpc_url: String, websocket_url: String) {

45
tests/client.rs Normal file
View File

@ -0,0 +1,45 @@
use log::info;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_client::rpc_client::SerializableTransaction;
use solana_sdk::native_token::LAMPORTS_PER_SOL;
use lite_client::{LiteClient, LOCAL_LIGHT_RPC_ADDR};
use lite_bench_utils::{generate_txs, new_funded_payer, wait_till_confirmed};
use simplelog::*;
const AMOUNT: usize = 100;
#[tokio::test]
async fn send_and_confirm_tx() {
TermLogger::init(
LevelFilter::Info,
Config::default(),
TerminalMode::Mixed,
ColorChoice::Auto,
)
.unwrap();
let lite_client = LiteClient(RpcClient::new(LOCAL_LIGHT_RPC_ADDR.to_string()));
let funded_payer = new_funded_payer(&lite_client, LAMPORTS_PER_SOL * 2)
.await
.unwrap();
let txs = generate_txs(AMOUNT, &lite_client.0, &funded_payer)
.await
.unwrap();
info!("Sending and Confirming {AMOUNT} tx(s)");
for tx in &txs {
lite_client.send_transaction(tx).await.unwrap();
info!("Tx {}", &tx.signatures[0]);
}
for tx in &txs {
let sig = tx.get_signature();
info!("Confirming {sig}");
wait_till_confirmed(&lite_client, sig).await;
}
info!("Sent and Confirmed {AMOUNT} tx(s)");
}