Bugfixes and making tests work

This commit is contained in:
Godmode Galactus 2023-04-04 17:19:42 +02:00
parent bf5841f438
commit b1343f9835
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
8 changed files with 69 additions and 54 deletions

1
Cargo.lock generated
View File

@ -381,6 +381,7 @@ dependencies = [
"dirs",
"log",
"rand 0.8.5",
"rand_chacha 0.3.1",
"serde",
"serde_json",
"solana-rpc-client",

View File

@ -16,4 +16,5 @@ tracing-subscriber = { workspace = true }
csv = "1.2.1"
dirs = "5.0.0"
rand = "0.8.5"
rand_chacha = "0.3.1"

View File

@ -1,7 +1,7 @@
use std::str::FromStr;
use std::{str::FromStr, time::Duration};
use anyhow::Context;
use rand::{distributions::Alphanumeric, prelude::Distribution};
use rand::{distributions::Alphanumeric, prelude::Distribution, SeedableRng};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{
commitment_config::CommitmentConfig,
@ -14,8 +14,10 @@ use solana_sdk::{
system_instruction,
transaction::Transaction,
};
use tokio::time::Instant;
const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr";
const WAIT_LIMIT_IN_SECONDS: u64 = 60;
pub struct BenchHelper;
@ -40,7 +42,11 @@ impl BenchHelper {
sig: &Signature,
commitment_config: CommitmentConfig,
) -> anyhow::Result<()> {
let instant = Instant::now();
loop {
if instant.elapsed() > Duration::from_secs(WAIT_LIMIT_IN_SECONDS) {
return Err(anyhow::Error::msg("Timedout waiting"));
}
if let Some(err) = rpc_client
.get_signature_status_with_commitment(sig, commitment_config)
.await?
@ -68,13 +74,13 @@ impl BenchHelper {
num_of_txs: usize,
funded_payer: &Keypair,
blockhash: Hash,
random_seed: Option<u64>,
) -> Vec<Transaction> {
let seed = random_seed.map_or(0, |x| x);
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(seed);
(0..num_of_txs)
.map(|_| {
let random_bytes: Vec<u8> = Alphanumeric
.sample_iter(rand::thread_rng())
.take(10)
.collect();
let random_bytes: Vec<u8> = Alphanumeric.sample_iter(&mut rng).take(10).collect();
Self::create_memo_tx(&random_bytes, funded_payer, blockhash)
})

View File

@ -62,7 +62,7 @@ async fn bench(rpc_client: Arc<RpcClient>, tx_count: usize) -> Metric {
let funded_payer = BenchHelper::get_payer().await.unwrap();
let blockhash = rpc_client.get_latest_blockhash().await.unwrap();
let txs = BenchHelper::generate_txs(tx_count, &funded_payer, blockhash);
let txs = BenchHelper::generate_txs(tx_count, &funded_payer, blockhash, None);
let mut un_confirmed_txs: HashMap<Signature, Option<Instant>> =
HashMap::with_capacity(txs.len());

View File

@ -243,6 +243,11 @@ impl BlockListener {
TXS_CONFIRMED.inc();
}
info!(
"got transaction {} confrimation level {}",
sig, commitment_config.commitment
);
tx_status.value_mut().status = Some(TransactionStatus {
slot,
confirmations: None,

View File

@ -9,8 +9,8 @@ use std::{
};
use dashmap::DashMap;
use log::{error, info};
use prometheus::{core::GenericGauge, register_int_gauge, opts};
use log::{error, info, trace};
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use quinn::{
ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime,
TransportConfig,
@ -21,8 +21,6 @@ use tokio::{
time::timeout,
};
use crate::DEFAULT_TX_BATCH_SIZE;
use super::rotating_queue::RotatingQueue;
pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
@ -110,7 +108,7 @@ impl ActiveConnection {
) {
NB_QUIC_TASKS.inc();
let mut already_connected = false;
let mut connection: Option<(Connection, quinn::SendStream)> = None;
let mut connection: Option<Connection> = None;
let mut transaction_reciever = transaction_reciever;
let mut exit_oneshot_channel = exit_oneshot_channel;
loop {
@ -138,8 +136,15 @@ impl ActiveConnection {
continue;
}
};
let (conn, mut send_stream) = match connection {
Some(conn) => conn,
let mut send_stream = match &connection {
Some(conn) => {
let unistream = conn.open_uni().await;
if let Err(e) = unistream {
error!("error opening a unistream for {} error {}", identity, e);
continue;
}
unistream.unwrap()
},
None => {
let conn = if already_connected {
info!("making make_connection_0rtt");
@ -157,7 +162,9 @@ impl ActiveConnection {
error!("error opening a unistream for {} error {}", identity, e);
continue;
}
(conn, unistream.unwrap())
connection = Some(conn);
unistream.unwrap()
},
Err(e) => {
error!("Could not connect to {} because of error {}", identity, e);
@ -167,44 +174,33 @@ impl ActiveConnection {
}
};
let mut length = 1;
let mut tx_batch = tx;
while length < DEFAULT_TX_BATCH_SIZE {
match transaction_reciever.try_recv() {
Ok(mut tx) => {
length += 1;
tx_batch.append(&mut tx);
},
_ => {
break;
}
}
}
info!("Sending {} {} transactions", identity, length);
if let Err(e) = send_stream.write_all(tx_batch.as_slice()).await {
trace!("Sending {} transaction", identity);
if let Err(e) = send_stream.write_all(tx.as_slice()).await {
error!(
"Error while writing transaction for {} error {}",
identity, e
);
}
connection = Some((conn, send_stream));
if let Err(e) = send_stream.finish().await {
error!(
"Error finishing for {}, error {}",
identity, e,
)
}
},
Err(_) => {
// timed out
if let Some((_,stream)) = &mut connection {
info!("finishing {}", identity);
if let Some(_) = &mut connection {
NB_QUIC_CONNECTIONS.dec();
let _ = stream.finish().await;
connection = None;
}
}
}
},
_ = exit_oneshot_channel.recv() => {
if let Some((_,stream)) = &mut connection {
if let Some(_) = &mut connection {
NB_QUIC_CONNECTIONS.dec();
let _ = stream.finish().await;
connection = None;
}
@ -213,9 +209,8 @@ impl ActiveConnection {
};
}
if let Some((_, stream)) = &mut connection {
if let Some(_) = &mut connection {
NB_QUIC_CONNECTIONS.dec();
let _ = stream.finish().await;
}
NB_QUIC_TASKS.dec();
}

View File

@ -96,21 +96,21 @@ impl TxSender {
let txs_sent = self.txs_sent_store.clone();
for (sig, _) in &sigs_and_slots {
info!("sending transaction {}", sig);
txs_sent.insert(sig.to_owned(), TxProps::default());
}
let forwarded_slot = tpu_client.get_estimated_slot();
let transaction_batch_size = txs.len() as u64;
let mut quic_responses = vec![];
for tx in txs {
let quic_response = match tpu_client.send_transaction(tx) {
Ok(_) => {
TXS_SENT.inc_by(transaction_batch_size);
TXS_SENT.inc_by(1);
1
}
Err(err) => {
TXS_SENT_ERRORS.inc_by(transaction_batch_size);
TXS_SENT_ERRORS.inc_by(1);
warn!("{err}");
0
}
@ -167,6 +167,10 @@ impl TxSender {
{
Ok(value) => match value {
Some((sig, tx, slot)) => {
if self.txs_sent_store.contains_key(&sig) {
// duplicate transaction
continue;
}
TXS_IN_CHANNEL.dec();
sigs_and_slots.push((sig, slot));
txs.push(tx);

View File

@ -1,7 +1,7 @@
use bench::helpers::BenchHelper;
use lite_rpc::DEFAULT_LITE_RPC_ADDR;
use log::info;
use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
#[tokio::test]
@ -13,15 +13,17 @@ async fn send_and_confirm_txs_get_signature_statuses() {
let funded_payer = BenchHelper::get_payer().await.unwrap();
let blockhash = rpc_client.get_latest_blockhash().await.unwrap();
let tx = &BenchHelper::generate_txs(1, &funded_payer, blockhash)[0];
let sig = tx.get_signature();
let txs = BenchHelper::generate_txs(5, &funded_payer, blockhash, Some(1));
let signatures = txs.iter().map(|x| x.signatures[0]).collect::<Vec<_>>();
rpc_client.send_transaction(tx).await.unwrap();
info!("{sig}");
BenchHelper::wait_till_signature_status(&rpc_client, sig, CommitmentConfig::confirmed())
.await
.unwrap();
for tx in txs {
rpc_client.send_transaction(&tx).await.unwrap();
}
for sig in signatures {
BenchHelper::wait_till_signature_status(&rpc_client, &sig, CommitmentConfig::confirmed())
.await
.unwrap();
}
}
#[tokio::test]
@ -31,10 +33,11 @@ async fn send_and_confirm_tx_rpc_client() {
let funded_payer = BenchHelper::get_payer().await.unwrap();
let blockhash = rpc_client.get_latest_blockhash().await.unwrap();
let tx = &BenchHelper::generate_txs(1, &funded_payer, blockhash)[0];
let sig = tx.get_signature();
let txs = BenchHelper::generate_txs(5, &funded_payer, blockhash, Some(2));
rpc_client.send_and_confirm_transaction(tx).await.unwrap();
for tx in txs {
rpc_client.send_and_confirm_transaction(&tx).await.unwrap();
info!("Sent and Confirmed {sig}");
info!("Sent and Confirmed {}", tx.signatures[0]);
}
}