Create nonce keypair chunks (#26670)

* extract KeypairChunks structure

* introduce durable nonce in TransactionChunkGenerator

* Introduce TimestampedTransaction with optional timestamp
This commit is contained in:
kirill lykov 2022-07-21 14:50:23 +02:00 committed by GitHub
parent 8105b761ed
commit 8465a3aa46
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 152 additions and 50 deletions

View File

@ -7,6 +7,7 @@ use {
},
log::*,
rayon::prelude::*,
solana_client::nonce_utils,
solana_metrics::{self, datapoint_info},
solana_sdk::{
clock::{DEFAULT_MS_PER_SLOT, DEFAULT_S_PER_SLOT, MAX_PROCESSING_AGE},
@ -35,57 +36,95 @@ use {
// The point at which transactions become "too old", in seconds.
const MAX_TX_QUEUE_AGE: u64 = (MAX_PROCESSING_AGE as f64 * DEFAULT_S_PER_SLOT) as u64;
pub type SharedTransactions = Arc<RwLock<VecDeque<Vec<(Transaction, u64)>>>>;
pub type TimestampedTransaction = (Transaction, Option<u64>);
pub type SharedTransactions = Arc<RwLock<VecDeque<Vec<TimestampedTransaction>>>>;
/// Split input vector of keypairs into two sets of chunks of given size
fn split_into_source_destination(
keypairs: &[Keypair],
chunk_size: usize,
) -> (Vec<Vec<&Keypair>>, Vec<VecDeque<&Keypair>>) {
let mut source_keypair_chunks: Vec<Vec<&Keypair>> = Vec::new();
let mut dest_keypair_chunks: Vec<VecDeque<&Keypair>> = Vec::new();
for chunk in keypairs.chunks_exact(2 * chunk_size) {
source_keypair_chunks.push(chunk[..chunk_size].iter().collect());
dest_keypair_chunks.push(chunk[chunk_size..].iter().collect());
}
(source_keypair_chunks, dest_keypair_chunks)
/// Keypairs split into source and destination
/// used for transfer transactions
struct KeypairChunks<'a> {
source: Vec<Vec<&'a Keypair>>,
dest: Vec<VecDeque<&'a Keypair>>,
}
struct TransactionChunkGenerator<'a> {
source_keypair_chunks: Vec<Vec<&'a Keypair>>,
dest_keypair_chunks: Vec<VecDeque<&'a Keypair>>,
impl<'a> KeypairChunks<'a> {
/// Split input vector of keypairs into two sets of chunks of given size
fn new(keypairs: &'a [Keypair], chunk_size: usize) -> Self {
let mut source_keypair_chunks: Vec<Vec<&Keypair>> = Vec::new();
let mut dest_keypair_chunks: Vec<VecDeque<&Keypair>> = Vec::new();
for chunk in keypairs.chunks_exact(2 * chunk_size) {
source_keypair_chunks.push(chunk[..chunk_size].iter().collect());
dest_keypair_chunks.push(chunk[chunk_size..].iter().collect());
}
KeypairChunks {
source: source_keypair_chunks,
dest: dest_keypair_chunks,
}
}
}
struct TransactionChunkGenerator<'a, 'b, T> {
client: Arc<T>,
account_chunks: KeypairChunks<'a>,
nonce_chunks: Option<KeypairChunks<'b>>,
chunk_index: usize,
reclaim_lamports_back_to_source_account: bool,
}
impl<'a> TransactionChunkGenerator<'a> {
fn new(gen_keypairs: &'a [Keypair], chunk_size: usize) -> Self {
let (source_keypair_chunks, dest_keypair_chunks) =
split_into_source_destination(gen_keypairs, chunk_size);
impl<'a, 'b, T> TransactionChunkGenerator<'a, 'b, T>
where
T: 'static + BenchTpsClient + Send + Sync,
{
fn new(
client: Arc<T>,
gen_keypairs: &'a [Keypair],
nonce_keypairs: Option<&'b Vec<Keypair>>,
chunk_size: usize,
) -> Self {
let account_chunks = KeypairChunks::new(gen_keypairs, chunk_size);
let nonce_chunks =
nonce_keypairs.map(|nonce_keypairs| KeypairChunks::new(nonce_keypairs, chunk_size));
TransactionChunkGenerator {
source_keypair_chunks,
dest_keypair_chunks,
client,
account_chunks,
nonce_chunks,
chunk_index: 0,
reclaim_lamports_back_to_source_account: false,
}
}
fn generate(&mut self, blockhash: Option<&Hash>) -> Vec<(Transaction, u64)> {
let tx_count = self.source_keypair_chunks.len();
/// generate transactions to transfer lamports from source to destination accounts
/// if durable nonce is used, blockhash is None
fn generate(&mut self, blockhash: Option<&Hash>) -> Vec<TimestampedTransaction> {
let tx_count = self.account_chunks.source.len();
info!(
"Signing transactions... {} (reclaim={}, blockhash={:?})",
tx_count, self.reclaim_lamports_back_to_source_account, blockhash
);
let signing_start = Instant::now();
let source_chunk = &self.source_keypair_chunks[self.chunk_index];
let dest_chunk = &self.dest_keypair_chunks[self.chunk_index];
assert!(blockhash.is_some());
let transactions = generate_system_txs(
source_chunk,
dest_chunk,
self.reclaim_lamports_back_to_source_account,
blockhash.unwrap(),
);
let source_chunk = &self.account_chunks.source[self.chunk_index];
let dest_chunk = &self.account_chunks.dest[self.chunk_index];
let transactions = if let Some(nonce_chunks) = &self.nonce_chunks {
let source_nonce_chunk = &nonce_chunks.source[self.chunk_index];
let dest_nonce_chunk: &VecDeque<&Keypair> = &nonce_chunks.dest[self.chunk_index];
generate_nonced_system_txs(
self.client.clone(),
source_chunk,
dest_chunk,
source_nonce_chunk,
dest_nonce_chunk,
self.reclaim_lamports_back_to_source_account,
)
} else {
assert!(blockhash.is_some());
generate_system_txs(
source_chunk,
dest_chunk,
self.reclaim_lamports_back_to_source_account,
blockhash.unwrap(),
)
};
let duration = signing_start.elapsed();
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
@ -109,10 +148,12 @@ impl<'a> TransactionChunkGenerator<'a> {
fn advance(&mut self) {
// Rotate destination keypairs so that the next round of transactions will have different
// transaction signatures even when blockhash is reused.
self.dest_keypair_chunks[self.chunk_index].rotate_left(1);
self.account_chunks.dest[self.chunk_index].rotate_left(1);
if let Some(nonce_chunks) = &mut self.nonce_chunks {
nonce_chunks.dest[self.chunk_index].rotate_left(1);
}
// Move on to next chunk
self.chunk_index = (self.chunk_index + 1) % self.source_keypair_chunks.len();
self.chunk_index = (self.chunk_index + 1) % self.account_chunks.source.len();
// Switch directions after transfering for each "chunk"
if self.chunk_index == 0 {
@ -168,11 +209,11 @@ where
.unwrap()
}
fn generate_chunked_transfers(
fn generate_chunked_transfers<T: 'static + BenchTpsClient + Send + Sync>(
recent_blockhash: Arc<RwLock<Hash>>,
shared_txs: &SharedTransactions,
shared_tx_active_thread_count: Arc<AtomicIsize>,
mut chunk_generator: TransactionChunkGenerator<'_>,
mut chunk_generator: TransactionChunkGenerator<'_, '_, T>,
threads: usize,
duration: Duration,
sustained: bool,
@ -265,7 +306,12 @@ where
} = config;
assert!(gen_keypairs.len() >= 2 * tx_count);
let chunk_generator = TransactionChunkGenerator::new(&gen_keypairs, tx_count);
let chunk_generator = TransactionChunkGenerator::new(
client.clone(),
&gen_keypairs,
None, // TODO(klykov): to be added in the follow up PR
tx_count,
);
let first_tx_count = loop {
match client.get_transaction_count() {
@ -377,7 +423,7 @@ fn generate_system_txs(
dest: &VecDeque<&Keypair>,
reclaim: bool,
blockhash: &Hash,
) -> Vec<(Transaction, u64)> {
) -> Vec<TimestampedTransaction> {
let pairs: Vec<_> = if !reclaim {
source.iter().zip(dest.iter()).collect()
} else {
@ -389,16 +435,70 @@ fn generate_system_txs(
.map(|(from, to)| {
(
system_transaction::transfer(from, &to.pubkey(), 1, *blockhash),
timestamp(),
Some(timestamp()),
)
})
.collect()
}
fn generate_txs(
fn get_nonce_blockhash<T: 'static + BenchTpsClient + Send + Sync>(
client: Arc<T>,
nonce_account_pubkey: Pubkey,
) -> Hash {
let nonce_account = client
.get_account(&nonce_account_pubkey)
.unwrap_or_else(|error| panic!("{:?}", error));
let nonce_data = nonce_utils::data_from_account(&nonce_account)
.unwrap_or_else(|error| panic!("{:?}", error));
nonce_data.blockhash()
}
fn generate_nonced_system_txs<T: 'static + BenchTpsClient + Send + Sync>(
client: Arc<T>,
source: &[&Keypair],
dest: &VecDeque<&Keypair>,
source_nonce: &[&Keypair],
dest_nonce: &VecDeque<&Keypair>,
reclaim: bool,
) -> Vec<TimestampedTransaction> {
let length = source.len();
let mut transactions: Vec<TimestampedTransaction> = Vec::with_capacity(length);
for i in 0..length {
let (from, to, nonce, nonce_blockhash) = if !reclaim {
(
source[i],
dest[i],
source_nonce[i],
get_nonce_blockhash(client.clone(), source_nonce[i].pubkey()),
)
} else {
(
dest[i],
source[i],
dest_nonce[i],
get_nonce_blockhash(client.clone(), dest_nonce[i].pubkey()),
)
};
transactions.push((
system_transaction::nonced_transfer(
from,
&to.pubkey(),
1,
&nonce.pubkey(),
from,
nonce_blockhash,
),
None,
));
}
transactions
}
fn generate_txs<T: 'static + BenchTpsClient + Send + Sync>(
shared_txs: &SharedTransactions,
blockhash: &Arc<RwLock<Hash>>,
chunk_generator: &mut TransactionChunkGenerator<'_>,
chunk_generator: &mut TransactionChunkGenerator<'_, '_, T>,
threads: usize,
) {
let blockhash = blockhash.read().map(|x| *x).ok();
@ -508,14 +608,16 @@ fn do_tx_transfers<T: BenchTpsClient>(
let mut min_timestamp = u64::MAX;
for tx in txs0 {
let now = timestamp();
// Transactions that are too old will be rejected by the cluster Don't bother
// Transactions without durable nonce that are too old will be rejected by the cluster Don't bother
// sending them.
if tx.1 < min_timestamp {
min_timestamp = tx.1;
}
if now > tx.1 && now - tx.1 > 1000 * MAX_TX_QUEUE_AGE {
old_transactions = true;
continue;
if let Some(tx_timestamp) = tx.1 {
if tx_timestamp < min_timestamp {
min_timestamp = tx_timestamp;
}
if now > tx_timestamp && now - tx_timestamp > 1000 * MAX_TX_QUEUE_AGE {
old_transactions = true;
continue;
}
}
transactions.push(tx.0);
}