listen status from ws
This commit is contained in:
parent
8ee3d6eb14
commit
0966211555
|
@ -578,11 +578,13 @@ dependencies = [
|
||||||
"dirs",
|
"dirs",
|
||||||
"futures",
|
"futures",
|
||||||
"itertools 0.10.5",
|
"itertools 0.10.5",
|
||||||
|
"jsonrpsee-types 0.22.5",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"log",
|
"log",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
"rand_chacha 0.3.1",
|
"rand_chacha 0.3.1",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
|
"scopeguard",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"solana-lite-rpc-util",
|
"solana-lite-rpc-util",
|
||||||
|
@ -595,6 +597,7 @@ dependencies = [
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
"url",
|
"url",
|
||||||
|
"websocket-tungstenite-retry",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -2341,7 +2344,7 @@ dependencies = [
|
||||||
"jsonrpsee-http-client",
|
"jsonrpsee-http-client",
|
||||||
"jsonrpsee-proc-macros",
|
"jsonrpsee-proc-macros",
|
||||||
"jsonrpsee-server",
|
"jsonrpsee-server",
|
||||||
"jsonrpsee-types",
|
"jsonrpsee-types 0.20.3",
|
||||||
"jsonrpsee-wasm-client",
|
"jsonrpsee-wasm-client",
|
||||||
"jsonrpsee-ws-client",
|
"jsonrpsee-ws-client",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
@ -2384,7 +2387,7 @@ dependencies = [
|
||||||
"futures-timer",
|
"futures-timer",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"hyper",
|
"hyper",
|
||||||
"jsonrpsee-types",
|
"jsonrpsee-types 0.20.3",
|
||||||
"parking_lot",
|
"parking_lot",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
"rustc-hash",
|
"rustc-hash",
|
||||||
|
@ -2407,7 +2410,7 @@ dependencies = [
|
||||||
"hyper",
|
"hyper",
|
||||||
"hyper-rustls",
|
"hyper-rustls",
|
||||||
"jsonrpsee-core",
|
"jsonrpsee-core",
|
||||||
"jsonrpsee-types",
|
"jsonrpsee-types 0.20.3",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
|
@ -2440,7 +2443,7 @@ dependencies = [
|
||||||
"http",
|
"http",
|
||||||
"hyper",
|
"hyper",
|
||||||
"jsonrpsee-core",
|
"jsonrpsee-core",
|
||||||
"jsonrpsee-types",
|
"jsonrpsee-types 0.20.3",
|
||||||
"route-recognizer",
|
"route-recognizer",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
@ -2467,6 +2470,19 @@ dependencies = [
|
||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "jsonrpsee-types"
|
||||||
|
version = "0.22.5"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "150d6168405890a7a3231a3c74843f58b8959471f6df76078db2619ddee1d07d"
|
||||||
|
dependencies = [
|
||||||
|
"anyhow",
|
||||||
|
"beef",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"thiserror",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "jsonrpsee-wasm-client"
|
name = "jsonrpsee-wasm-client"
|
||||||
version = "0.20.3"
|
version = "0.20.3"
|
||||||
|
@ -2475,7 +2491,7 @@ checksum = "7c7cbb3447cf14fd4d2f407c3cc96e6c9634d5440aa1fbed868a31f3c02b27f0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"jsonrpsee-client-transport",
|
"jsonrpsee-client-transport",
|
||||||
"jsonrpsee-core",
|
"jsonrpsee-core",
|
||||||
"jsonrpsee-types",
|
"jsonrpsee-types 0.20.3",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -2487,7 +2503,7 @@ dependencies = [
|
||||||
"http",
|
"http",
|
||||||
"jsonrpsee-client-transport",
|
"jsonrpsee-client-transport",
|
||||||
"jsonrpsee-core",
|
"jsonrpsee-core",
|
||||||
"jsonrpsee-types",
|
"jsonrpsee-types 0.20.3",
|
||||||
"url",
|
"url",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -4149,6 +4165,17 @@ dependencies = [
|
||||||
"opaque-debug",
|
"opaque-debug",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "sha-1"
|
||||||
|
version = "0.10.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"cpufeatures",
|
||||||
|
"digest 0.10.7",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "sha1"
|
name = "sha1"
|
||||||
version = "0.10.6"
|
version = "0.10.6"
|
||||||
|
@ -4290,7 +4317,7 @@ dependencies = [
|
||||||
"httparse",
|
"httparse",
|
||||||
"log",
|
"log",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
"sha-1",
|
"sha-1 0.9.8",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -5085,8 +5112,8 @@ dependencies = [
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-stream",
|
"tokio-stream",
|
||||||
"tokio-tungstenite",
|
"tokio-tungstenite 0.20.1",
|
||||||
"tungstenite",
|
"tungstenite 0.20.1",
|
||||||
"url",
|
"url",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -6054,6 +6081,20 @@ dependencies = [
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tokio-tungstenite"
|
||||||
|
version = "0.17.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f714dd15bead90401d77e04243611caec13726c2408afd5b31901dfcdcb3b181"
|
||||||
|
dependencies = [
|
||||||
|
"futures-util",
|
||||||
|
"log",
|
||||||
|
"native-tls",
|
||||||
|
"tokio",
|
||||||
|
"tokio-native-tls",
|
||||||
|
"tungstenite 0.17.3",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio-tungstenite"
|
name = "tokio-tungstenite"
|
||||||
version = "0.20.1"
|
version = "0.20.1"
|
||||||
|
@ -6065,7 +6106,7 @@ dependencies = [
|
||||||
"rustls",
|
"rustls",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-rustls",
|
"tokio-rustls",
|
||||||
"tungstenite",
|
"tungstenite 0.20.1",
|
||||||
"webpki-roots 0.25.4",
|
"webpki-roots 0.25.4",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -6308,6 +6349,26 @@ version = "0.2.5"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
|
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tungstenite"
|
||||||
|
version = "0.17.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e27992fd6a8c29ee7eef28fc78349aa244134e10ad447ce3b9f0ac0ed0fa4ce0"
|
||||||
|
dependencies = [
|
||||||
|
"base64 0.13.1",
|
||||||
|
"byteorder",
|
||||||
|
"bytes",
|
||||||
|
"http",
|
||||||
|
"httparse",
|
||||||
|
"log",
|
||||||
|
"native-tls",
|
||||||
|
"rand 0.8.5",
|
||||||
|
"sha-1 0.10.1",
|
||||||
|
"thiserror",
|
||||||
|
"url",
|
||||||
|
"utf-8",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tungstenite"
|
name = "tungstenite"
|
||||||
version = "0.20.1"
|
version = "0.20.1"
|
||||||
|
@ -6598,6 +6659,23 @@ version = "0.25.4"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1"
|
checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "websocket-tungstenite-retry"
|
||||||
|
version = "0.6.0"
|
||||||
|
source = "git+https://github.com/grooviegermanikus/websocket-tungstenite-retry.git?tag=v0.8.0#2f423ed40171b023aa0d5c67287ad778958a6722"
|
||||||
|
dependencies = [
|
||||||
|
"anyhow",
|
||||||
|
"futures-util",
|
||||||
|
"log",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"tokio",
|
||||||
|
"tokio-tungstenite 0.17.2",
|
||||||
|
"tokio-util",
|
||||||
|
"tracing",
|
||||||
|
"url",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "which"
|
name = "which"
|
||||||
version = "4.4.2"
|
version = "4.4.2"
|
||||||
|
|
|
@ -13,6 +13,10 @@ name = "benchnew"
|
||||||
path = "src/benchnew.rs"
|
path = "src/benchnew.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|
||||||
|
websocket-tungstenite-retry = { git = "https://github.com/grooviegermanikus/websocket-tungstenite-retry.git", tag = "v0.8.0" }
|
||||||
|
jsonrpsee-types = "0.22.2"
|
||||||
|
|
||||||
clap = { workspace = true }
|
clap = { workspace = true }
|
||||||
csv = "1.2.1"
|
csv = "1.2.1"
|
||||||
dirs = "5.0.0"
|
dirs = "5.0.0"
|
||||||
|
@ -38,6 +42,7 @@ spl-memo = "4.0.0"
|
||||||
url = "*"
|
url = "*"
|
||||||
reqwest = "0.11.26"
|
reqwest = "0.11.26"
|
||||||
lazy_static = "1.4.0"
|
lazy_static = "1.4.0"
|
||||||
|
scopeguard = "1.2.0"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
bincode = { workspace = true }
|
bincode = { workspace = true }
|
||||||
|
|
|
@ -11,6 +11,7 @@ use crate::benches::rpc_interface::{
|
||||||
};
|
};
|
||||||
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
|
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
|
||||||
use solana_sdk::signature::{read_keypair_file, Keypair, Signature, Signer};
|
use solana_sdk::signature::{read_keypair_file, Keypair, Signature, Signer};
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug, Default, serde::Serialize)]
|
#[derive(Clone, Copy, Debug, Default, serde::Serialize)]
|
||||||
pub struct Metric {
|
pub struct Metric {
|
||||||
|
@ -28,6 +29,7 @@ pub struct Metric {
|
||||||
pub async fn confirmation_rate(
|
pub async fn confirmation_rate(
|
||||||
payer_path: &Path,
|
payer_path: &Path,
|
||||||
rpc_url: String,
|
rpc_url: String,
|
||||||
|
tx_status_websocket_addr: String,
|
||||||
tx_params: BenchmarkTransactionParams,
|
tx_params: BenchmarkTransactionParams,
|
||||||
max_timeout: Duration,
|
max_timeout: Duration,
|
||||||
txs_per_run: usize,
|
txs_per_run: usize,
|
||||||
|
@ -46,7 +48,7 @@ pub async fn confirmation_rate(
|
||||||
let mut rpc_results = Vec::with_capacity(num_of_runs);
|
let mut rpc_results = Vec::with_capacity(num_of_runs);
|
||||||
|
|
||||||
for _ in 0..num_of_runs {
|
for _ in 0..num_of_runs {
|
||||||
match send_bulk_txs_and_wait(&rpc, &payer, txs_per_run, &tx_params, max_timeout)
|
match send_bulk_txs_and_wait(&rpc, Url::parse(&tx_status_websocket_addr).expect("Invalid Url"), &payer, txs_per_run, &tx_params, max_timeout)
|
||||||
.await
|
.await
|
||||||
.context("send bulk tx and wait")
|
.context("send bulk tx and wait")
|
||||||
{
|
{
|
||||||
|
@ -72,6 +74,7 @@ pub async fn confirmation_rate(
|
||||||
|
|
||||||
pub async fn send_bulk_txs_and_wait(
|
pub async fn send_bulk_txs_and_wait(
|
||||||
rpc: &RpcClient,
|
rpc: &RpcClient,
|
||||||
|
tx_status_websocket_addr: Url,
|
||||||
payer: &Keypair,
|
payer: &Keypair,
|
||||||
num_txs: usize,
|
num_txs: usize,
|
||||||
tx_params: &BenchmarkTransactionParams,
|
tx_params: &BenchmarkTransactionParams,
|
||||||
|
@ -87,7 +90,7 @@ pub async fn send_bulk_txs_and_wait(
|
||||||
|
|
||||||
trace!("Sending {} transactions in bulk ..", txs.len());
|
trace!("Sending {} transactions in bulk ..", txs.len());
|
||||||
let tx_and_confirmations_from_rpc: Vec<(Signature, ConfirmationResponseFromRpc)> =
|
let tx_and_confirmations_from_rpc: Vec<(Signature, ConfirmationResponseFromRpc)> =
|
||||||
send_and_confirm_bulk_transactions(rpc, &txs, max_timeout)
|
send_and_confirm_bulk_transactions(rpc, tx_status_websocket_addr, payer.pubkey(), &txs, max_timeout)
|
||||||
.await
|
.await
|
||||||
.context("send and confirm bulk tx")?;
|
.context("send and confirm bulk tx")?;
|
||||||
trace!("Done sending {} transaction.", txs.len());
|
trace!("Done sending {} transaction.", txs.len());
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
use std::str::FromStr;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use crate::benches::rpc_interface::{
|
use crate::benches::rpc_interface::{
|
||||||
|
@ -13,8 +14,10 @@ use solana_rpc_client::nonblocking::rpc_client::RpcClient;
|
||||||
use solana_sdk::signature::{read_keypair_file, Signature, Signer};
|
use solana_sdk::signature::{read_keypair_file, Signature, Signer};
|
||||||
use solana_sdk::transaction::VersionedTransaction;
|
use solana_sdk::transaction::VersionedTransaction;
|
||||||
use solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair};
|
use solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair};
|
||||||
|
use solana_sdk::pubkey::Pubkey;
|
||||||
use tokio::time::{sleep, Instant};
|
use tokio::time::{sleep, Instant};
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
use crate::benches::tx_status_websocket_collector::start_tx_status_collector;
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug, Default)]
|
#[derive(Clone, Copy, Debug, Default)]
|
||||||
pub struct Metric {
|
pub struct Metric {
|
||||||
|
@ -46,6 +49,7 @@ pub async fn confirmation_slot(
|
||||||
payer_path: &Path,
|
payer_path: &Path,
|
||||||
rpc_a_url: String,
|
rpc_a_url: String,
|
||||||
rpc_b_url: String,
|
rpc_b_url: String,
|
||||||
|
tx_status_websocket_addr: String,
|
||||||
tx_params: BenchmarkTransactionParams,
|
tx_params: BenchmarkTransactionParams,
|
||||||
max_timeout: Duration,
|
max_timeout: Duration,
|
||||||
num_of_runs: usize,
|
num_of_runs: usize,
|
||||||
|
@ -66,12 +70,20 @@ pub async fn confirmation_slot(
|
||||||
|
|
||||||
let mut rng = create_rng(None);
|
let mut rng = create_rng(None);
|
||||||
let payer = read_keypair_file(payer_path).expect("payer file");
|
let payer = read_keypair_file(payer_path).expect("payer file");
|
||||||
info!("Payer: {}", payer.pubkey().to_string());
|
let payer_pubkey = payer.pubkey();
|
||||||
|
info!("Payer: {}", payer_pubkey.to_string());
|
||||||
// let mut ping_thing_tasks = vec![];
|
// let mut ping_thing_tasks = vec![];
|
||||||
|
|
||||||
|
// FIXME
|
||||||
|
// let (tx_status_map, jh_collector) = start_tx_status_collector(Url::parse(&tx_status_websocket_addr).unwrap(), payer.pubkey(), CommitmentConfig::confirmed()).await;
|
||||||
|
|
||||||
for _ in 0..num_of_runs {
|
for _ in 0..num_of_runs {
|
||||||
let rpc_a = create_rpc_client(&rpc_a_url);
|
let rpc_a = create_rpc_client(&rpc_a_url);
|
||||||
let rpc_b = create_rpc_client(&rpc_b_url);
|
let rpc_b = create_rpc_client(&rpc_b_url);
|
||||||
|
|
||||||
|
let tx_status_websocket_addr_a = Url::parse(&tx_status_websocket_addr).expect("Invalid URL");
|
||||||
|
let tx_status_websocket_addr_b = Url::parse(&tx_status_websocket_addr).expect("Invalid URL");
|
||||||
|
|
||||||
// measure network time to reach the respective RPC endpoints,
|
// measure network time to reach the respective RPC endpoints,
|
||||||
// used to mitigate the difference in distance by delaying the txn sending
|
// used to mitigate the difference in distance by delaying the txn sending
|
||||||
let time_a = rpc_roundtrip_duration(&rpc_a).await?.as_secs_f64();
|
let time_a = rpc_roundtrip_duration(&rpc_a).await?.as_secs_f64();
|
||||||
|
@ -95,13 +107,13 @@ pub async fn confirmation_slot(
|
||||||
let a_task = tokio::spawn(async move {
|
let a_task = tokio::spawn(async move {
|
||||||
sleep(Duration::from_secs_f64(a_delay)).await;
|
sleep(Duration::from_secs_f64(a_delay)).await;
|
||||||
debug!("(A) sending tx {}", rpc_a_tx.signatures[0]);
|
debug!("(A) sending tx {}", rpc_a_tx.signatures[0]);
|
||||||
send_and_confirm_transaction(&rpc_a, rpc_a_tx, max_timeout).await
|
send_and_confirm_transaction(&rpc_a, tx_status_websocket_addr_a, payer_pubkey, rpc_a_tx, max_timeout).await
|
||||||
});
|
});
|
||||||
|
|
||||||
let b_task = tokio::spawn(async move {
|
let b_task = tokio::spawn(async move {
|
||||||
sleep(Duration::from_secs_f64(b_delay)).await;
|
sleep(Duration::from_secs_f64(b_delay)).await;
|
||||||
debug!("(B) sending tx {}", rpc_b_tx.signatures[0]);
|
debug!("(B) sending tx {}", rpc_b_tx.signatures[0]);
|
||||||
send_and_confirm_transaction(&rpc_b, rpc_b_tx, max_timeout).await
|
send_and_confirm_transaction(&rpc_b, tx_status_websocket_addr_b, payer_pubkey, rpc_b_tx, max_timeout).await
|
||||||
});
|
});
|
||||||
|
|
||||||
let (a, b) = tokio::join!(a_task, b_task);
|
let (a, b) = tokio::join!(a_task, b_task);
|
||||||
|
@ -156,11 +168,13 @@ async fn create_tx(
|
||||||
|
|
||||||
async fn send_and_confirm_transaction(
|
async fn send_and_confirm_transaction(
|
||||||
rpc: &RpcClient,
|
rpc: &RpcClient,
|
||||||
|
tx_status_websocket_addr: Url,
|
||||||
|
payer_pubkey: Pubkey,
|
||||||
tx: VersionedTransaction,
|
tx: VersionedTransaction,
|
||||||
max_timeout: Duration,
|
max_timeout: Duration,
|
||||||
) -> anyhow::Result<ConfirmationResponseFromRpc> {
|
) -> anyhow::Result<ConfirmationResponseFromRpc> {
|
||||||
let result_vec: Vec<(Signature, ConfirmationResponseFromRpc)> =
|
let result_vec: Vec<(Signature, ConfirmationResponseFromRpc)> =
|
||||||
send_and_confirm_bulk_transactions(rpc, &[tx], max_timeout).await?;
|
send_and_confirm_bulk_transactions(rpc, tx_status_websocket_addr, payer_pubkey, &[tx], max_timeout).await?;
|
||||||
assert_eq!(result_vec.len(), 1, "expected 1 result");
|
assert_eq!(result_vec.len(), 1, "expected 1 result");
|
||||||
let (_sig, confirmation_response) = result_vec.into_iter().next().unwrap();
|
let (_sig, confirmation_response) = result_vec.into_iter().next().unwrap();
|
||||||
|
|
||||||
|
|
|
@ -2,3 +2,4 @@ pub mod api_load;
|
||||||
pub mod confirmation_rate;
|
pub mod confirmation_rate;
|
||||||
pub mod confirmation_slot;
|
pub mod confirmation_slot;
|
||||||
pub mod rpc_interface;
|
pub mod rpc_interface;
|
||||||
|
mod tx_status_websocket_collector;
|
||||||
|
|
|
@ -2,7 +2,7 @@ use anyhow::{bail, Context, Error};
|
||||||
use futures::future::join_all;
|
use futures::future::join_all;
|
||||||
use futures::TryFutureExt;
|
use futures::TryFutureExt;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use log::{debug, trace, warn};
|
use log::{debug, info, trace, warn};
|
||||||
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
|
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
|
||||||
use solana_rpc_client::rpc_client::SerializableTransaction;
|
use solana_rpc_client::rpc_client::SerializableTransaction;
|
||||||
use solana_rpc_client_api::client_error::ErrorKind;
|
use solana_rpc_client_api::client_error::ErrorKind;
|
||||||
|
@ -14,10 +14,19 @@ use solana_sdk::transaction::VersionedTransaction;
|
||||||
use solana_transaction_status::TransactionConfirmationStatus;
|
use solana_transaction_status::TransactionConfirmationStatus;
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::iter::zip;
|
use std::iter::zip;
|
||||||
|
use std::str::FromStr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::time::Instant;
|
use dashmap::mapref::multiple::RefMulti;
|
||||||
|
use scopeguard::defer;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use serde_json::json;
|
||||||
|
use solana_rpc_client_api::response::{Response, RpcBlockUpdate, RpcResponseContext, SlotUpdate};
|
||||||
|
use solana_sdk::pubkey::Pubkey;
|
||||||
|
use tokio::time::{Instant, timeout};
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
use websocket_tungstenite_retry::websocket_stable::WsMessage;
|
||||||
|
use crate::benches::tx_status_websocket_collector::start_tx_status_collector;
|
||||||
|
|
||||||
pub fn create_rpc_client(rpc_url: &Url) -> RpcClient {
|
pub fn create_rpc_client(rpc_url: &Url) -> RpcClient {
|
||||||
RpcClient::new_with_commitment(rpc_url.to_string(), CommitmentConfig::confirmed())
|
RpcClient::new_with_commitment(rpc_url.to_string(), CommitmentConfig::confirmed())
|
||||||
|
@ -28,7 +37,7 @@ pub enum ConfirmationResponseFromRpc {
|
||||||
// RPC error on send_transaction
|
// RPC error on send_transaction
|
||||||
SendError(Arc<ErrorKind>),
|
SendError(Arc<ErrorKind>),
|
||||||
// (sent slot at confirmed commitment, confirmed slot, ..., ...)
|
// (sent slot at confirmed commitment, confirmed slot, ..., ...)
|
||||||
// transaction_confirmation_status is "confirmed" or "finalized"
|
// transaction_confirmation_status is "confirmed" (finalized is not reported by blockSubscribe websocket
|
||||||
Success(Slot, Slot, TransactionConfirmationStatus, Duration),
|
Success(Slot, Slot, TransactionConfirmationStatus, Duration),
|
||||||
// timout waiting for confirmation status
|
// timout waiting for confirmation status
|
||||||
Timeout(Duration),
|
Timeout(Duration),
|
||||||
|
@ -36,6 +45,8 @@ pub enum ConfirmationResponseFromRpc {
|
||||||
|
|
||||||
pub async fn send_and_confirm_bulk_transactions(
|
pub async fn send_and_confirm_bulk_transactions(
|
||||||
rpc_client: &RpcClient,
|
rpc_client: &RpcClient,
|
||||||
|
tx_status_websocket_addr: Url,
|
||||||
|
payer_pubkey: Pubkey,
|
||||||
txs: &[VersionedTransaction],
|
txs: &[VersionedTransaction],
|
||||||
max_timeout: Duration,
|
max_timeout: Duration,
|
||||||
) -> anyhow::Result<Vec<(Signature, ConfirmationResponseFromRpc)>> {
|
) -> anyhow::Result<Vec<(Signature, ConfirmationResponseFromRpc)>> {
|
||||||
|
@ -53,6 +64,10 @@ pub async fn send_and_confirm_bulk_transactions(
|
||||||
min_context_slot: None,
|
min_context_slot: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// note: we get confirmed but never finaliized
|
||||||
|
let (tx_status_map, jh_collector) = start_tx_status_collector(tx_status_websocket_addr.clone(), payer_pubkey, CommitmentConfig::confirmed()).await;
|
||||||
|
defer!(jh_collector.abort());
|
||||||
|
|
||||||
let started_at = Instant::now();
|
let started_at = Instant::now();
|
||||||
trace!(
|
trace!(
|
||||||
"Sending {} transactions via RPC (retries=off) ..",
|
"Sending {} transactions via RPC (retries=off) ..",
|
||||||
|
@ -99,15 +114,16 @@ pub async fn send_and_confirm_bulk_transactions(
|
||||||
trace!("- tx_fail {}", tx_sig.get_signature());
|
trace!("- tx_fail {}", tx_sig.get_signature());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
let elapsed = started_at.elapsed();
|
||||||
debug!(
|
debug!(
|
||||||
"{} transactions sent successfully in {:.02}ms",
|
"{} transactions sent successfully in {:.02}ms",
|
||||||
num_sent_ok,
|
num_sent_ok,
|
||||||
started_at.elapsed().as_secs_f32() * 1000.0
|
elapsed.as_secs_f32() * 1000.0
|
||||||
);
|
);
|
||||||
debug!(
|
debug!(
|
||||||
"{} transactions failed to send in {:.02}ms",
|
"{} transactions failed to send in {:.02}ms",
|
||||||
num_sent_failed,
|
num_sent_failed,
|
||||||
started_at.elapsed().as_secs_f32() * 1000.0
|
elapsed.as_secs_f32() * 1000.0
|
||||||
);
|
);
|
||||||
|
|
||||||
if num_sent_failed > 0 {
|
if num_sent_failed > 0 {
|
||||||
|
@ -132,75 +148,63 @@ pub async fn send_and_confirm_bulk_transactions(
|
||||||
|
|
||||||
let started_at = Instant::now();
|
let started_at = Instant::now();
|
||||||
let timeout_at = started_at + max_timeout;
|
let timeout_at = started_at + max_timeout;
|
||||||
|
// "poll" the status dashmap
|
||||||
'polling_loop: for iteration in 1.. {
|
'polling_loop: for iteration in 1.. {
|
||||||
let iteration_ends_at = started_at + Duration::from_millis(iteration * 400);
|
let iteration_ends_at = started_at + Duration::from_millis(iteration * 100);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
pending_status_set.len() + result_status_map.len(),
|
pending_status_set.len() + result_status_map.len(),
|
||||||
num_sent_ok,
|
num_sent_ok,
|
||||||
"Items must move between pending+result"
|
"Items must move between pending+result"
|
||||||
);
|
);
|
||||||
let tx_batch = pending_status_set.iter().cloned().collect_vec();
|
// let tx_batch = pending_status_set.iter().cloned().collect_vec();
|
||||||
debug!(
|
// debug!(
|
||||||
"Request status for batch of remaining {} transactions in iteration {}",
|
// "Request status for batch of remaining {} transactions in iteration {}",
|
||||||
tx_batch.len(),
|
// tx_batch.len(),
|
||||||
iteration
|
// iteration
|
||||||
);
|
// );
|
||||||
|
|
||||||
let status_started_at = Instant::now();
|
// let status_started_at = Instant::now();
|
||||||
let mut batch_status = Vec::new();
|
// let mut batch_status = Vec::new();
|
||||||
// "Too many inputs provided; max 256"
|
// // "Too many inputs provided; max 256"
|
||||||
for chunk in tx_batch.chunks(256) {
|
// for chunk in tx_batch.chunks(256) {
|
||||||
// fail hard if not possible to poll status
|
// // fail hard if not possible to poll status
|
||||||
let chunk_responses = rpc_client
|
// let chunk_responses = rpc_client
|
||||||
.get_signature_statuses(chunk)
|
// .get_signature_statuses(chunk)
|
||||||
.await
|
// .await
|
||||||
.expect("get signature statuses");
|
// .expect("get signature statuses");
|
||||||
batch_status.extend(chunk_responses.value);
|
// batch_status.extend(chunk_responses.value);
|
||||||
}
|
// }
|
||||||
if status_started_at.elapsed() > Duration::from_millis(500) {
|
// if status_started_at.elapsed() > Duration::from_millis(500) {
|
||||||
warn!(
|
// warn!(
|
||||||
"SLOW get_signature_statuses for {} transactions took {:?}",
|
// "SLOW get_signature_statuses for {} transactions took {:?}",
|
||||||
tx_batch.len(),
|
// tx_batch.len(),
|
||||||
status_started_at.elapsed()
|
// status_started_at.elapsed()
|
||||||
);
|
// );
|
||||||
}
|
// }
|
||||||
let elapsed = started_at.elapsed();
|
let elapsed = started_at.elapsed();
|
||||||
|
|
||||||
for (tx_sig, status_response) in zip(tx_batch, batch_status) {
|
for multi in tx_status_map.iter() {
|
||||||
match status_response {
|
// note that we will see tx_sigs we did not send
|
||||||
Some(tx_status) => {
|
let (tx_sig, confirmed_slot) = multi.pair();
|
||||||
trace!(
|
|
||||||
"Some signature status {:?} received for {} after {:.02}ms",
|
// status is confirmed or finalized
|
||||||
tx_status.confirmation_status,
|
if pending_status_set.remove(&tx_sig) {
|
||||||
tx_sig,
|
trace!("take status for sig {:?} and confirmed_slot: {:?} from websocket source", tx_sig, confirmed_slot);
|
||||||
elapsed.as_secs_f32() * 1000.0
|
let prev_value = result_status_map.insert(
|
||||||
);
|
tx_sig.clone(),
|
||||||
if !tx_status.satisfies_commitment(CommitmentConfig::confirmed()) {
|
ConfirmationResponseFromRpc::Success(
|
||||||
continue 'polling_loop;
|
send_slot,
|
||||||
}
|
*confirmed_slot,
|
||||||
// status is confirmed or finalized
|
// note: this is not optimal
|
||||||
pending_status_set.remove(&tx_sig);
|
TransactionConfirmationStatus::Confirmed,
|
||||||
let prev_value = result_status_map.insert(
|
elapsed,
|
||||||
tx_sig,
|
),
|
||||||
ConfirmationResponseFromRpc::Success(
|
);
|
||||||
send_slot,
|
assert!(prev_value.is_none(), "Must not override existing value");
|
||||||
tx_status.slot,
|
|
||||||
tx_status.confirmation_status(),
|
|
||||||
elapsed,
|
|
||||||
),
|
|
||||||
);
|
|
||||||
assert!(prev_value.is_none(), "Must not override existing value");
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
// None: not yet processed by the cluster
|
|
||||||
trace!(
|
|
||||||
"No signature status was received for {} after {:.02}ms - continue waiting",
|
|
||||||
tx_sig,
|
|
||||||
elapsed.as_secs_f32() * 1000.0
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
} // -- END for tx_status_map loop
|
||||||
|
|
||||||
|
|
||||||
if pending_status_set.is_empty() {
|
if pending_status_set.is_empty() {
|
||||||
debug!(
|
debug!(
|
||||||
|
|
|
@ -0,0 +1,80 @@
|
||||||
|
use std::str::FromStr;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
use dashmap::{DashMap, DashSet};
|
||||||
|
use log::info;
|
||||||
|
use serde_json::json;
|
||||||
|
use solana_rpc_client_api::response::{Response, RpcBlockUpdate};
|
||||||
|
use solana_sdk::clock::Slot;
|
||||||
|
use solana_sdk::commitment_config::CommitmentConfig;
|
||||||
|
use solana_sdk::pubkey::Pubkey;
|
||||||
|
use solana_sdk::signature::Signature;
|
||||||
|
use tokio::task::AbortHandle;
|
||||||
|
use tokio::time::sleep;
|
||||||
|
use url::Url;
|
||||||
|
use websocket_tungstenite_retry::websocket_stable;
|
||||||
|
use websocket_tungstenite_retry::websocket_stable::WsMessage;
|
||||||
|
|
||||||
|
// TODO void race condition when this is not started up fast enough to catch transaction
|
||||||
|
// returns map of transaction signatures to the slot they were confirmed
|
||||||
|
pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commitment_config: CommitmentConfig)
|
||||||
|
-> (Arc<DashMap<Signature, Slot>>, AbortHandle) {
|
||||||
|
// e.g. "commitment"
|
||||||
|
let commitment_str = format!("{:?}", commitment_config);
|
||||||
|
|
||||||
|
let mut web_socket_slots = websocket_stable::StableWebSocket::new_with_timeout(
|
||||||
|
ws_url,
|
||||||
|
json!({
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"id": "1",
|
||||||
|
"method": "blockSubscribe",
|
||||||
|
"params": [
|
||||||
|
{
|
||||||
|
"mentionsAccountOrProgram": payer_pubkey.to_string(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"commitment": commitment_str,
|
||||||
|
"encoding": "base64",
|
||||||
|
"showRewards": false,
|
||||||
|
"transactionDetails": "signatures"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}),
|
||||||
|
Duration::from_secs(10),
|
||||||
|
).await
|
||||||
|
.expect("Failed to connect to websocket");
|
||||||
|
|
||||||
|
|
||||||
|
let mut channel = web_socket_slots.subscribe_message_channel();
|
||||||
|
|
||||||
|
let observed_transactions: Arc<DashMap<Signature, Slot>> = Arc::new(DashMap::with_capacity(64));
|
||||||
|
|
||||||
|
let observed_transactions_write = observed_transactions.clone();
|
||||||
|
let jh = tokio::spawn(async move {
|
||||||
|
let started_at = Instant::now();
|
||||||
|
|
||||||
|
while let Ok(msg) = channel.recv().await {
|
||||||
|
// TOOD use this to know when we are subscribed
|
||||||
|
info!("SOME MESSGE FROM SUbsCRIPTN");
|
||||||
|
if let WsMessage::Text(payload) = msg {
|
||||||
|
let ws_result: jsonrpsee_types::SubscriptionResponse<Response<RpcBlockUpdate>> =
|
||||||
|
serde_json::from_str(&payload).unwrap();
|
||||||
|
let block_update = ws_result.params.result;
|
||||||
|
let slot = block_update.value.slot;
|
||||||
|
if let Some(tx_sigs_from_block) = block_update.value.block.and_then(|b| b.signatures) {
|
||||||
|
for tx_sig in tx_sigs_from_block {
|
||||||
|
let tx_sig = Signature::from_str(&tx_sig).unwrap();
|
||||||
|
info!("Transaction signature: {} _> slot {}", tx_sig, slot);
|
||||||
|
info!("status map size: {} - up {:?}", observed_transactions_write.len(), started_at.elapsed());
|
||||||
|
observed_transactions_write.entry(tx_sig).or_insert(slot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// FIXME avoid race condition
|
||||||
|
sleep(Duration::from_secs(3)).await;
|
||||||
|
|
||||||
|
(observed_transactions, jh.abort_handle())
|
||||||
|
}
|
|
@ -7,10 +7,12 @@ use solana_rpc_client::nonblocking::rpc_client::RpcClient;
|
||||||
use solana_sdk::signature::Keypair;
|
use solana_sdk::signature::Keypair;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
pub async fn benchnew_confirmation_rate_servicerunner(
|
pub async fn benchnew_confirmation_rate_servicerunner(
|
||||||
bench_config: &BenchConfig,
|
bench_config: &BenchConfig,
|
||||||
rpc_addr: String,
|
rpc_addr: String,
|
||||||
|
tx_status_websocket_addr: String,
|
||||||
funded_payer: Keypair,
|
funded_payer: Keypair,
|
||||||
) -> confirmation_rate::Metric {
|
) -> confirmation_rate::Metric {
|
||||||
let rpc = Arc::new(RpcClient::new(rpc_addr));
|
let rpc = Arc::new(RpcClient::new(rpc_addr));
|
||||||
|
@ -21,6 +23,7 @@ pub async fn benchnew_confirmation_rate_servicerunner(
|
||||||
let max_timeout = Duration::from_secs(60);
|
let max_timeout = Duration::from_secs(60);
|
||||||
let result = send_bulk_txs_and_wait(
|
let result = send_bulk_txs_and_wait(
|
||||||
&rpc,
|
&rpc,
|
||||||
|
Url::parse(&tx_status_websocket_addr).expect("Invalid URL"),
|
||||||
&funded_payer,
|
&funded_payer,
|
||||||
bench_config.tx_count,
|
bench_config.tx_count,
|
||||||
&tx_params,
|
&tx_params,
|
||||||
|
|
|
@ -6,6 +6,7 @@ pub struct TenantConfig {
|
||||||
// technical identifier for the tenant, e.g. "solana-rpc"
|
// technical identifier for the tenant, e.g. "solana-rpc"
|
||||||
pub tenant_id: String,
|
pub tenant_id: String,
|
||||||
pub rpc_addr: String,
|
pub rpc_addr: String,
|
||||||
|
pub tx_status_ws_addr: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
// recommend to use one payer keypair for all targets and fund that keypair with enough SOL
|
// recommend to use one payer keypair for all targets and fund that keypair with enough SOL
|
||||||
|
|
|
@ -23,6 +23,7 @@ use std::str::FromStr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, SystemTime};
|
use std::time::{Duration, SystemTime};
|
||||||
use tokio::sync::OnceCell;
|
use tokio::sync::OnceCell;
|
||||||
|
use solana_lite_rpc_util::obfuscate_rpcurl;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
|
@ -42,10 +43,13 @@ async fn main() {
|
||||||
let funded_payer = Arc::new(get_funded_payer_from_env());
|
let funded_payer = Arc::new(get_funded_payer_from_env());
|
||||||
|
|
||||||
let tenant_configs = read_tenant_configs(std::env::vars().collect::<Vec<(String, String)>>());
|
let tenant_configs = read_tenant_configs(std::env::vars().collect::<Vec<(String, String)>>());
|
||||||
|
// this should point to a reliable websocket RPC node
|
||||||
|
let tx_status_websocket_addr: String = std::env::var("TX_STATUS_WS_ADDR").expect("need TX_STATUS_WS_ADDR env var");
|
||||||
|
|
||||||
info!("Use postgres config: {:?}", postgres_config.is_some());
|
info!("Use postgres config: {:?}", postgres_config.is_some());
|
||||||
info!("Use prio fees: [{}]", prio_fees.iter().join(","));
|
info!("Use prio fees: [{}]", prio_fees.iter().join(","));
|
||||||
info!("Start running benchmarks every {:?}", bench_interval);
|
info!("Start running benchmarks every {:?}", bench_interval);
|
||||||
|
info!("Use websocket for tx status: {:?}", obfuscate_rpcurl(&tx_status_websocket_addr));
|
||||||
info!(
|
info!(
|
||||||
"Found tenants: {}",
|
"Found tenants: {}",
|
||||||
tenant_configs.iter().map(|tc| &tc.tenant_id).join(", ")
|
tenant_configs.iter().map(|tc| &tc.tenant_id).join(", ")
|
||||||
|
@ -87,6 +91,7 @@ async fn main() {
|
||||||
let tenant_id = tenant_config.tenant_id.clone();
|
let tenant_id = tenant_config.tenant_id.clone();
|
||||||
let postgres_session = postgres_session.clone();
|
let postgres_session = postgres_session.clone();
|
||||||
let tenant_config = tenant_config.clone();
|
let tenant_config = tenant_config.clone();
|
||||||
|
let tx_status_websocket_addr = tx_status_websocket_addr.clone();
|
||||||
let bench_configs = bench_configs.clone();
|
let bench_configs = bench_configs.clone();
|
||||||
let jh_runner = tokio::spawn(async move {
|
let jh_runner = tokio::spawn(async move {
|
||||||
let mut interval = tokio::time::interval(bench_interval);
|
let mut interval = tokio::time::interval(bench_interval);
|
||||||
|
@ -103,6 +108,7 @@ async fn main() {
|
||||||
0 => Box::new(BenchRunnerConfirmationRateImpl {
|
0 => Box::new(BenchRunnerConfirmationRateImpl {
|
||||||
benchrun_at,
|
benchrun_at,
|
||||||
tenant_config: tenant_config.clone(),
|
tenant_config: tenant_config.clone(),
|
||||||
|
tx_status_websocket_addr: tx_status_websocket_addr.clone(),
|
||||||
bench_config: bench_config.clone(),
|
bench_config: bench_config.clone(),
|
||||||
funded_payer: funded_payer.clone(),
|
funded_payer: funded_payer.clone(),
|
||||||
metric: OnceCell::new(),
|
metric: OnceCell::new(),
|
||||||
|
@ -237,6 +243,7 @@ impl BenchTrait for BenchRunnerConfirmationRateImpl {}
|
||||||
struct BenchRunnerConfirmationRateImpl {
|
struct BenchRunnerConfirmationRateImpl {
|
||||||
pub benchrun_at: SystemTime,
|
pub benchrun_at: SystemTime,
|
||||||
pub tenant_config: TenantConfig,
|
pub tenant_config: TenantConfig,
|
||||||
|
pub tx_status_websocket_addr: String,
|
||||||
pub bench_config: BenchConfig,
|
pub bench_config: BenchConfig,
|
||||||
pub funded_payer: Arc<Keypair>,
|
pub funded_payer: Arc<Keypair>,
|
||||||
pub metric: OnceCell<confirmation_rate::Metric>,
|
pub metric: OnceCell<confirmation_rate::Metric>,
|
||||||
|
@ -248,6 +255,7 @@ impl BenchRunner for BenchRunnerConfirmationRateImpl {
|
||||||
let metric = bench::service_adapter_new::benchnew_confirmation_rate_servicerunner(
|
let metric = bench::service_adapter_new::benchnew_confirmation_rate_servicerunner(
|
||||||
&self.bench_config,
|
&self.bench_config,
|
||||||
self.tenant_config.rpc_addr.clone(),
|
self.tenant_config.rpc_addr.clone(),
|
||||||
|
self.tx_status_websocket_addr.clone(),
|
||||||
self.funded_payer.insecure_clone(),
|
self.funded_payer.insecure_clone(),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
Loading…
Reference in New Issue