Replace get_tmp_ledger_path!() with self cleaning version (#33702)

This macro is used widely in tests to create a ledger path for opening a
Blockstore. The files are left on disk unless the test remembers to call
Blockstore::destroy() on the directory. Instead of requiring that cleanup,
use the get_tmp_ledger_path_auto_delete!() macro, which creates a TempDir
that deletes itself automatically when it goes out of scope.
steviez 2023-10-21 11:38:31 +02:00 committed by GitHub
parent 5a963529a8
commit 56ccffdaa5
18 changed files with 2081 additions and 2211 deletions
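
The change is mechanical across every touched test. A minimal sketch of the
before/after pattern (the wrapper function names and test bodies here are
hypothetical; the macros and Blockstore calls are the ones this commit
exercises):

use solana_ledger::{
    blockstore::Blockstore,
    get_tmp_ledger_path, get_tmp_ledger_path_auto_delete,
};

// Before: the macro yields a plain path, so the test must scope the
// Blockstore and explicitly destroy the directory afterwards.
fn old_pattern() {
    let ledger_path = get_tmp_ledger_path!();
    {
        let blockstore = Blockstore::open(&ledger_path).unwrap();
        // ... exercise `blockstore` ...
    }
    Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}

// After: the macro yields a TempDir that removes the directory when it is
// dropped at the end of the test; no scoping or explicit destroy needed.
fn new_pattern() {
    let ledger_path = get_tmp_ledger_path_auto_delete!();
    let blockstore = Blockstore::open(ledger_path.path()).unwrap();
    // ... exercise `blockstore` ...
}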

View File

@@ -14,7 +14,7 @@ use {
solana_ledger::{
blockstore::Blockstore,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
- get_tmp_ledger_path,
+ get_tmp_ledger_path_auto_delete,
leader_schedule_cache::LeaderScheduleCache,
},
solana_measure::measure::Measure,
@@ -410,216 +410,212 @@ fn main() {
}
}
- let ledger_path = get_tmp_ledger_path!();
- {
- let blockstore = Arc::new(
- Blockstore::open(&ledger_path).expect("Expected to be able to open database ledger"),
- );
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Arc::new(
+ Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger"),
+ );
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder(
bank.clone(),
blockstore.clone(),
None,
Some(leader_schedule_cache),
);
let (banking_tracer, tracer_thread) =
BankingTracer::new(matches.is_present("trace_banking").then_some((
&blockstore.banking_trace_path(),
exit.clone(),
BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
)))
.unwrap();
let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote();
let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote();
let (gossip_vote_sender, gossip_vote_receiver) = banking_tracer.create_channel_gossip_vote();
let cluster_info = {
let keypair = Arc::new(Keypair::new());
let node = Node::new_localhost_with_pubkey(&keypair.pubkey());
ClusterInfo::new(node.info, keypair, SocketAddrSpace::Unspecified)
};
let cluster_info = Arc::new(cluster_info);
let tpu_disable_quic = matches.is_present("tpu_disable_quic");
let connection_cache = match tpu_disable_quic {
false => ConnectionCache::new_quic(
"connection_cache_banking_bench_quic",
DEFAULT_TPU_CONNECTION_POOL_SIZE,
),
true => ConnectionCache::with_udp(
"connection_cache_banking_bench_udp",
DEFAULT_TPU_CONNECTION_POOL_SIZE,
),
};
let banking_stage = BankingStage::new_thread_local_multi_iterator(
&cluster_info,
&poh_recorder,
non_vote_receiver,
tpu_vote_receiver,
gossip_vote_receiver,
num_banking_threads,
None,
replay_vote_sender,
None,
Arc::new(connection_cache),
bank_forks.clone(),
&Arc::new(PrioritizationFeeCache::new(0u64)),
);
// This is so that the signal_receiver does not go out of scope after the closure.
// If it is dropped before poh_service, then poh_service will error when
// calling send() on the channel.
let signal_receiver = Arc::new(signal_receiver);
let mut total_us = 0;
let mut tx_total_us = 0;
let base_tx_count = bank.transaction_count();
let mut txs_processed = 0;
let collector = solana_sdk::pubkey::new_rand();
let mut total_sent = 0;
for current_iteration_index in 0..iterations {
trace!("RUNNING ITERATION {}", current_iteration_index);
let now = Instant::now();
let mut sent = 0;
let packets_for_this_iteration = &all_packets[current_iteration_index % num_chunks];
for (packet_batch_index, packet_batch) in
packets_for_this_iteration.packet_batches.iter().enumerate()
{
sent += packet_batch.len();
trace!(
"Sending PacketBatch index {}, {}",
packet_batch_index,
timestamp(),
);
non_vote_sender
.send(BankingPacketBatch::new((vec![packet_batch.clone()], None)))
.unwrap();
}
for tx in &packets_for_this_iteration.transactions {
loop {
if bank.get_signature_status(&tx.signatures[0]).is_some() {
break;
}
if poh_recorder.read().unwrap().bank().is_none() {
break;
}
sleep(Duration::from_millis(5));
}
}
// Check whether the txs have been processed by the bank. Returns once all
// transactions are processed: FALSE indicates the bank is still active,
// TRUE indicates the bank expired before receiving all txs.
if check_txs(
&signal_receiver,
packets_for_this_iteration.transactions.len(),
&poh_recorder,
) {
eprintln!(
"[iteration {}, tx sent {}, slot {} expired, bank tx count {}]",
current_iteration_index,
sent,
bank.slot(),
bank.transaction_count(),
);
tx_total_us += duration_as_us(&now.elapsed());
let mut poh_time = Measure::start("poh_time");
poh_recorder
.write()
.unwrap()
.reset(bank.clone(), Some((bank.slot(), bank.slot() + 1)));
poh_time.stop();
let mut new_bank_time = Measure::start("new_bank");
let new_slot = bank.slot() + 1;
let new_bank = Bank::new_from_parent(bank, &collector, new_slot);
new_bank_time.stop();
let mut insert_time = Measure::start("insert_time");
bank_forks.write().unwrap().insert(new_bank);
bank = bank_forks.read().unwrap().working_bank();
insert_time.stop();
// set cost tracker limits to MAX so it will not filter out TXs
bank.write_cost_tracker().unwrap().set_limits(
std::u64::MAX,
std::u64::MAX,
std::u64::MAX,
);
assert!(poh_recorder.read().unwrap().bank().is_none());
poh_recorder
.write()
.unwrap()
.set_bank_for_test(bank.clone());
assert!(poh_recorder.read().unwrap().bank().is_some());
debug!(
"new_bank_time: {}us insert_time: {}us poh_time: {}us",
new_bank_time.as_us(),
insert_time.as_us(),
poh_time.as_us(),
);
} else {
eprintln!(
"[iteration {}, tx sent {}, slot {} active, bank tx count {}]",
current_iteration_index,
sent,
bank.slot(),
bank.transaction_count(),
);
tx_total_us += duration_as_us(&now.elapsed());
}
// This signature clear may not actually clear the signatures
// in this chunk, but since we rotate between CHUNKS then
// we should clear them by the time we come around again to re-use that chunk.
bank.clear_signatures();
total_us += duration_as_us(&now.elapsed());
total_sent += sent;
if current_iteration_index % num_chunks == 0 {
let last_blockhash = bank.last_blockhash();
for packets_for_single_iteration in all_packets.iter_mut() {
packets_for_single_iteration.refresh_blockhash(last_blockhash);
}
}
}
txs_processed += bank_forks
.read()
.unwrap()
.working_bank()
.transaction_count();
debug!("processed: {} base: {}", txs_processed, base_tx_count);
eprintln!("[total_sent: {}, base_tx_count: {}, txs_processed: {}, txs_landed: {}, total_us: {}, tx_total_us: {}]",
total_sent, base_tx_count, txs_processed, (txs_processed - base_tx_count), total_us, tx_total_us);
eprintln!(
"{{'name': 'banking_bench_total', 'median': '{:.2}'}}",
(1000.0 * 1000.0 * total_sent as f64) / (total_us as f64),
);
eprintln!(
"{{'name': 'banking_bench_tx_total', 'median': '{:.2}'}}",
(1000.0 * 1000.0 * total_sent as f64) / (tx_total_us as f64),
);
eprintln!(
"{{'name': 'banking_bench_success_tx_total', 'median': '{:.2}'}}",
(1000.0 * 1000.0 * (txs_processed - base_tx_count) as f64) / (total_us as f64),
);
drop(non_vote_sender);
drop(tpu_vote_sender);
drop(gossip_vote_sender);
exit.store(true, Ordering::Relaxed);
banking_stage.join().unwrap();
debug!("waited for banking_stage");
poh_service.join().unwrap();
sleep(Duration::from_secs(1));
debug!("waited for poh_service");
if let Some(tracer_thread) = tracer_thread {
tracer_thread.join().unwrap().unwrap();
}
- }
- let _unused = Blockstore::destroy(&ledger_path);
}

View File

@@ -2,7 +2,7 @@ use {
futures_util::StreamExt,
rand::Rng,
serde_json::{json, Value},
- solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path},
+ solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path_auto_delete},
solana_pubsub_client::{nonblocking, pubsub_client::PubsubClient},
solana_rpc::{
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
@@ -233,8 +233,8 @@ fn test_block_subscription() {
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
// setup Blockstore
- let ledger_path = get_tmp_ledger_path!();
- let blockstore = Blockstore::open(&ledger_path).unwrap();
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let blockstore = Arc::new(blockstore);
// populate ledger with test txs

View File

@@ -29,7 +29,7 @@ use {
blockstore::Blockstore,
blockstore_processor::process_entries_for_tests,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
- get_tmp_ledger_path,
+ get_tmp_ledger_path_auto_delete,
},
solana_perf::{
packet::{to_packet_batches, Packet},
@@ -83,49 +83,46 @@ fn check_txs(receiver: &Arc<Receiver<WorkingBankEntry>>, ref_tx_count: usize) {
fn bench_consume_buffered(bencher: &mut Bencher) {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100_000);
let bank = Arc::new(Bank::new_for_benches(&genesis_config));
- let ledger_path = get_tmp_ledger_path!();
- {
- let blockstore = Arc::new(
- Blockstore::open(&ledger_path).expect("Expected to be able to open database ledger"),
- );
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Arc::new(
+ Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger"),
+ );
let (exit, poh_recorder, poh_service, _signal_receiver) =
create_test_recorder(bank, blockstore, None, None);
let recorder = poh_recorder.read().unwrap().new_recorder();
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
let tx = test_tx();
let transactions = vec![tx; 4194304];
let batches = transactions
.iter()
.filter_map(|transaction| {
let packet = Packet::from_data(None, transaction).ok().unwrap();
DeserializedPacket::new(packet).ok()
})
.collect::<Vec<_>>();
let batches_len = batches.len();
let mut transaction_buffer = UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::from_iter(batches, 2 * batches_len),
ThreadType::Transactions,
);
let (s, _r) = unbounded();
let committer = Committer::new(None, s, Arc::new(PrioritizationFeeCache::new(0u64)));
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
// This tests the performance of buffering packets.
// If the packet buffers are copied, performance will be poor.
bencher.iter(move || {
consumer.consume_buffered_packets(
&bank_start,
&mut transaction_buffer,
&BankingStageStats::default(),
&mut LeaderSlotMetricsTracker::new(0),
);
});
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
- }
- let _unused = Blockstore::destroy(&ledger_path);
}
fn make_accounts_txs(txes: usize, mint_keypair: &Keypair, hash: Hash) -> Vec<Transaction> {
@@ -279,95 +276,92 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
packet_batches
});
- let ledger_path = get_tmp_ledger_path!();
- {
- let blockstore = Arc::new(
- Blockstore::open(&ledger_path).expect("Expected to be able to open database ledger"),
- );
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Arc::new(
+ Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger"),
+ );
let (exit, poh_recorder, poh_service, signal_receiver) =
create_test_recorder(bank.clone(), blockstore, None, None);
let cluster_info = {
let keypair = Arc::new(Keypair::new());
let node = Node::new_localhost_with_pubkey(&keypair.pubkey());
ClusterInfo::new(node.info, keypair, SocketAddrSpace::Unspecified)
};
let cluster_info = Arc::new(cluster_info);
let (s, _r) = unbounded();
let _banking_stage = BankingStage::new(
BlockProductionMethod::ThreadLocalMultiIterator,
&cluster_info,
&poh_recorder,
non_vote_receiver,
tpu_vote_receiver,
gossip_vote_receiver,
None,
s,
None,
Arc::new(ConnectionCache::new("connection_cache_test")),
bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
);
let chunk_len = verified.len() / CHUNKS;
let mut start = 0;
// This is so that the signal_receiver does not go out of scope after the closure.
// If it is dropped before poh_service, then poh_service will error when
// calling send() on the channel.
let signal_receiver = Arc::new(signal_receiver);
let signal_receiver2 = signal_receiver;
bencher.iter(move || {
let now = Instant::now();
let mut sent = 0;
if let Some(vote_packets) = &vote_packets {
tpu_vote_sender
.send(BankingPacketBatch::new((
vote_packets[start..start + chunk_len].to_vec(),
None,
)))
.unwrap();
gossip_vote_sender
.send(BankingPacketBatch::new((
vote_packets[start..start + chunk_len].to_vec(),
None,
)))
.unwrap();
}
for v in verified[start..start + chunk_len].chunks(chunk_len / num_threads) {
debug!(
"sending... {}..{} {} v.len: {}",
start,
start + chunk_len,
timestamp(),
v.len(),
);
for xv in v {
sent += xv.len();
}
non_vote_sender
.send(BankingPacketBatch::new((v.to_vec(), None)))
.unwrap();
}
check_txs(&signal_receiver2, txes / CHUNKS);
// This signature clear may not actually clear the signatures
// in this chunk, but since we rotate between CHUNKS then
// we should clear them by the time we come around again to re-use that chunk.
bank.clear_signatures();
trace!(
"time: {} checked: {} sent: {}",
duration_as_us(&now.elapsed()),
txes / CHUNKS,
sent,
);
start += chunk_len;
start %= verified.len();
});
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
- }
- let _unused = Blockstore::destroy(&ledger_path);
}
#[bench]

View File

@@ -1518,7 +1518,7 @@ pub mod test {
vote_simulator::VoteSimulator,
},
itertools::Itertools,
- solana_ledger::{blockstore::make_slot_entries, get_tmp_ledger_path},
+ solana_ledger::{blockstore::make_slot_entries, get_tmp_ledger_path_auto_delete},
solana_runtime::bank::Bank,
solana_sdk::{
account::{Account, AccountSharedData, ReadableAccount, WritableAccount},
@@ -2928,36 +2928,33 @@
#[test]
fn test_reconcile_blockstore_roots_with_tower_normal() {
solana_logger::setup();
- let blockstore_path = get_tmp_ledger_path!();
- {
- let blockstore = Blockstore::open(&blockstore_path).unwrap();
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let (shreds, _) = make_slot_entries(1, 0, 42, /*merkle_variant:*/ true);
blockstore.insert_shreds(shreds, None, false).unwrap();
let (shreds, _) = make_slot_entries(3, 1, 42, /*merkle_variant:*/ true);
blockstore.insert_shreds(shreds, None, false).unwrap();
let (shreds, _) = make_slot_entries(4, 1, 42, /*merkle_variant:*/ true);
blockstore.insert_shreds(shreds, None, false).unwrap();
assert!(!blockstore.is_root(0));
assert!(!blockstore.is_root(1));
assert!(!blockstore.is_root(3));
assert!(!blockstore.is_root(4));
let mut tower = Tower::default();
tower.vote_state.root_slot = Some(4);
reconcile_blockstore_roots_with_external_source(
ExternalRootSource::Tower(tower.root()),
&blockstore,
&mut blockstore.last_root(),
)
.unwrap();
assert!(!blockstore.is_root(0));
assert!(blockstore.is_root(1));
assert!(!blockstore.is_root(3));
assert!(blockstore.is_root(4));
- }
- Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
@@ -2966,61 +2963,55 @@
external root (Tower(4))!?")]
fn test_reconcile_blockstore_roots_with_tower_panic_no_common_root() {
solana_logger::setup();
- let blockstore_path = get_tmp_ledger_path!();
- {
- let blockstore = Blockstore::open(&blockstore_path).unwrap();
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let (shreds, _) = make_slot_entries(1, 0, 42, /*merkle_variant:*/ true);
blockstore.insert_shreds(shreds, None, false).unwrap();
let (shreds, _) = make_slot_entries(3, 1, 42, /*merkle_variant:*/ true);
blockstore.insert_shreds(shreds, None, false).unwrap();
let (shreds, _) = make_slot_entries(4, 1, 42, /*merkle_variant:*/ true);
blockstore.insert_shreds(shreds, None, false).unwrap();
blockstore.set_roots(std::iter::once(&3)).unwrap();
assert!(!blockstore.is_root(0));
assert!(!blockstore.is_root(1));
assert!(blockstore.is_root(3));
assert!(!blockstore.is_root(4));
let mut tower = Tower::default();
tower.vote_state.root_slot = Some(4);
reconcile_blockstore_roots_with_external_source(
ExternalRootSource::Tower(tower.root()),
&blockstore,
&mut blockstore.last_root(),
)
.unwrap();
- }
- Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
fn test_reconcile_blockstore_roots_with_tower_nop_no_parent() {
solana_logger::setup();
- let blockstore_path = get_tmp_ledger_path!();
- {
- let blockstore = Blockstore::open(&blockstore_path).unwrap();
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let (shreds, _) = make_slot_entries(1, 0, 42, /*merkle_variant:*/ true);
blockstore.insert_shreds(shreds, None, false).unwrap();
let (shreds, _) = make_slot_entries(3, 1, 42, /*merkle_variant:*/ true);
blockstore.insert_shreds(shreds, None, false).unwrap();
assert!(!blockstore.is_root(0));
assert!(!blockstore.is_root(1));
assert!(!blockstore.is_root(3));
let mut tower = Tower::default();
tower.vote_state.root_slot = Some(4);
assert_eq!(blockstore.last_root(), 0);
reconcile_blockstore_roots_with_external_source(
ExternalRootSource::Tower(tower.root()),
&blockstore,
&mut blockstore.last_root(),
)
.unwrap();
assert_eq!(blockstore.last_root(), 0);
- }
- Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]

View File

@@ -918,7 +918,10 @@ mod test {
cluster_info::{ClusterInfo, Node},
contact_info::{ContactInfo, Protocol},
},
- solana_ledger::{blockstore::make_many_slot_entries, get_tmp_ledger_path, shred::Nonce},
+ solana_ledger::{
+ blockstore::make_many_slot_entries, get_tmp_ledger_path,
+ get_tmp_ledger_path_auto_delete, shred::Nonce,
+ },
solana_runtime::{accounts_background_service::AbsRequestSender, bank_forks::BankForks},
solana_sdk::{
hash::Hash,
@@ -1938,8 +1941,8 @@ mod test {
..
} = ManageAncestorHashesState::new(bank_forks);
- let ledger_path = get_tmp_ledger_path!();
- let blockstore = Blockstore::open(&ledger_path).unwrap();
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Blockstore::open(ledger_path.path()).unwrap();
// Create invalid packet with fewer bytes than the size of the nonce
let mut packet = Packet::default();

View File

@@ -864,7 +864,7 @@ mod test {
make_chaining_slot_entries, make_many_slot_entries, make_slot_entries, Blockstore,
},
genesis_utils::{create_genesis_config, GenesisConfigInfo},
- get_tmp_ledger_path,
+ get_tmp_ledger_path_auto_delete,
shred::max_ticks_per_n_shreds,
},
solana_runtime::bank::Bank,
@@ -884,289 +884,270 @@ mod test {
#[test]
pub fn test_repair_orphan() {
- let blockstore_path = get_tmp_ledger_path!();
- {
- let blockstore = Blockstore::open(&blockstore_path).unwrap();
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Blockstore::open(ledger_path.path()).unwrap();
// Create some orphan slots
let (mut shreds, _) = make_slot_entries(1, 0, 1, /*merkle_variant:*/ true);
let (shreds2, _) = make_slot_entries(5, 2, 1, /*merkle_variant:*/ true);
shreds.extend(shreds2);
blockstore.insert_shreds(shreds, None, false).unwrap();
let mut repair_weight = RepairWeight::new(0);
assert_eq!(
repair_weight.get_best_weighted_repairs(
&blockstore,
&HashMap::new(),
&EpochSchedule::default(),
MAX_ORPHANS,
MAX_REPAIR_LENGTH,
MAX_UNKNOWN_LAST_INDEX_REPAIRS,
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
),
vec![
ShredRepairType::Orphan(2),
ShredRepairType::HighestShred(0, 0)
]
);
- }
- Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_repair_empty_slot() {
- let blockstore_path = get_tmp_ledger_path!();
- {
- let blockstore = Blockstore::open(&blockstore_path).unwrap();
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let (shreds, _) = make_slot_entries(2, 0, 1, /*merkle_variant:*/ true);
// Write this shred to slot 2, should chain to slot 0, which we haven't received
// any shreds for
blockstore.insert_shreds(shreds, None, false).unwrap();
let mut repair_weight = RepairWeight::new(0);
// Check that repair tries to patch the empty slot
assert_eq!(
repair_weight.get_best_weighted_repairs(
&blockstore,
&HashMap::new(),
&EpochSchedule::default(),
MAX_ORPHANS,
MAX_REPAIR_LENGTH,
MAX_UNKNOWN_LAST_INDEX_REPAIRS,
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
),
vec![ShredRepairType::HighestShred(0, 0)]
);
- }
- Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_generate_repairs() {
- let blockstore_path = get_tmp_ledger_path!();
- {
- let blockstore = Blockstore::open(&blockstore_path).unwrap();
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let nth = 3;
let num_slots = 2;
// Create some shreds
let (mut shreds, _) = make_many_slot_entries(0, num_slots, 150);
let num_shreds = shreds.len() as u64;
let num_shreds_per_slot = num_shreds / num_slots;
// write every nth shred
let mut shreds_to_write = vec![];
let mut missing_indexes_per_slot = vec![];
for i in (0..num_shreds).rev() {
let index = i % num_shreds_per_slot;
// get_best_repair_shreds only returns missing shreds in
// between shreds received; So this should either insert the
// last shred in each slot, or exclude missing shreds after the
// last inserted shred from expected repairs.
if index % nth == 0 || index + 1 == num_shreds_per_slot {
shreds_to_write.insert(0, shreds.remove(i as usize));
} else if i < num_shreds_per_slot {
missing_indexes_per_slot.insert(0, index);
}
}
blockstore
.insert_shreds(shreds_to_write, None, false)
.unwrap();
let expected: Vec<ShredRepairType> = (0..num_slots)
.flat_map(|slot| {
missing_indexes_per_slot
.iter()
.map(move |shred_index| ShredRepairType::Shred(slot, *shred_index))
})
.collect();
let mut repair_weight = RepairWeight::new(0);
sleep_shred_deferment_period();
assert_eq!(
repair_weight.get_best_weighted_repairs(
&blockstore,
&HashMap::new(),
&EpochSchedule::default(),
MAX_ORPHANS,
MAX_REPAIR_LENGTH,
MAX_UNKNOWN_LAST_INDEX_REPAIRS,
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
),
expected
);
assert_eq!(
repair_weight.get_best_weighted_repairs(
&blockstore,
&HashMap::new(),
&EpochSchedule::default(),
MAX_ORPHANS,
expected.len() - 2,
MAX_UNKNOWN_LAST_INDEX_REPAIRS,
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
)[..],
expected[0..expected.len() - 2]
);
);
- }
- Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_generate_highest_repair() {
- let blockstore_path = get_tmp_ledger_path!();
- {
- let blockstore = Blockstore::open(&blockstore_path).unwrap();
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let num_entries_per_slot = 100;
// Create some shreds
let (mut shreds, _) = make_slot_entries(
0, // slot
0, // parent_slot
num_entries_per_slot as u64,
true, // merkle_variant
);
let num_shreds_per_slot = shreds.len() as u64;
// Remove last shred (which is also last in slot) so that slot is not complete
shreds.pop();
blockstore.insert_shreds(shreds, None, false).unwrap();
// We didn't get the last shred for this slot, so ask for the highest shred for that slot
let expected: Vec<ShredRepairType> =
vec![ShredRepairType::HighestShred(0, num_shreds_per_slot - 1)];
sleep_shred_deferment_period();
let mut repair_weight = RepairWeight::new(0);
assert_eq!(
repair_weight.get_best_weighted_repairs(
&blockstore,
&HashMap::new(),
&EpochSchedule::default(),
MAX_ORPHANS,
MAX_REPAIR_LENGTH,
MAX_UNKNOWN_LAST_INDEX_REPAIRS,
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
),
expected
);
- }
- Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_repair_range() {
- let blockstore_path = get_tmp_ledger_path!();
- {
- let blockstore = Blockstore::open(&blockstore_path).unwrap();
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let slots: Vec<u64> = vec![1, 3, 5, 7, 8];
let num_entries_per_slot = max_ticks_per_n_shreds(1, None) + 1;
let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot);
for (mut slot_shreds, _) in shreds.into_iter() {
slot_shreds.remove(0);
blockstore.insert_shreds(slot_shreds, None, false).unwrap();
}
// Iterate through all possible combinations of start..end (inclusive on both
// sides of the range)
for start in 0..slots.len() {
for end in start..slots.len() {
let repair_slot_range = RepairSlotRange {
start: slots[start],
end: slots[end],
};
let expected: Vec<ShredRepairType> = (repair_slot_range.start
..=repair_slot_range.end)
.map(|slot_index| {
if slots.contains(&slot_index) {
ShredRepairType::Shred(slot_index, 0)
} else {
ShredRepairType::HighestShred(slot_index, 0)
}
})
.collect();
// Iterate through all possible combinations of start..end (inclusive on both
// sides of the range)
for start in 0..slots.len() {
for end in start..slots.len() {
let repair_slot_range = RepairSlotRange {
start: slots[start],
end: slots[end],
};
let expected: Vec<ShredRepairType> = (repair_slot_range.start
..=repair_slot_range.end)
.map(|slot_index| {
if slots.contains(&slot_index) {
ShredRepairType::Shred(slot_index, 0)
} else {
ShredRepairType::HighestShred(slot_index, 0)
}
})
.collect();
sleep_shred_deferment_period();
assert_eq!(
RepairService::generate_repairs_in_range(
&blockstore,
std::usize::MAX,
&repair_slot_range,
),
expected
);
}
}
- }
- Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_repair_range_highest() {
- let blockstore_path = get_tmp_ledger_path!();
- {
- let blockstore = Blockstore::open(&blockstore_path).unwrap();
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let num_entries_per_slot = 10;
let num_slots = 1;
let start = 5;
// Create some shreds in slots 0..num_slots
for i in start..start + num_slots {
let parent = if i > 0 { i - 1 } else { 0 };
let (shreds, _) = make_slot_entries(
i, // slot
parent,
num_entries_per_slot as u64,
true, // merkle_variant
);
blockstore.insert_shreds(shreds, None, false).unwrap();
}
let end = 4;
let expected: Vec<ShredRepairType> = vec![
ShredRepairType::HighestShred(end - 2, 0),
ShredRepairType::HighestShred(end - 1, 0),
ShredRepairType::HighestShred(end, 0),
];
let repair_slot_range = RepairSlotRange { start: 2, end };
assert_eq!(
RepairService::generate_repairs_in_range(
&blockstore,
std::usize::MAX,
&repair_slot_range,
),
expected
);
- }
- Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_generate_duplicate_repairs_for_slot() {
- let blockstore_path = get_tmp_ledger_path!();
- let blockstore = Blockstore::open(&blockstore_path).unwrap();
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let dead_slot = 9;
// SlotMeta doesn't exist, should make no repairs
@@ -1203,8 +1184,8 @@ mod test {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
- let blockstore_path = get_tmp_ledger_path!();
- let blockstore = Blockstore::open(&blockstore_path).unwrap();
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let cluster_slots = ClusterSlots::default();
let cluster_info = Arc::new(new_test_cluster_info());
let identity_keypair = cluster_info.keypair().clone();

View File

@@ -1423,7 +1423,7 @@ mod tests {
blockstore::make_many_slot_entries,
blockstore_processor::fill_blockstore_slot_with_ticks,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
- get_tmp_ledger_path,
+ get_tmp_ledger_path_auto_delete,
shred::{max_ticks_per_n_shreds, Shred, ShredFlags},
},
solana_perf::packet::{deserialize_from_with_limit, Packet},
@@ -1853,64 +1853,60 @@ mod tests {
fn run_highest_window_request(slot: Slot, num_slots: u64, nonce: Nonce) {
let recycler = PacketBatchRecycler::default();
solana_logger::setup();
- let ledger_path = get_tmp_ledger_path!();
- {
- let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let rv = ServeRepair::run_highest_window_request(
&recycler,
&socketaddr_any!(),
&blockstore,
0,
0,
nonce,
);
assert!(rv.is_none());
let _ = fill_blockstore_slot_with_ticks(
&blockstore,
max_ticks_per_n_shreds(1, None) + 1,
slot,
slot - num_slots + 1,
Hash::default(),
);
let index = 1;
let rv = ServeRepair::run_highest_window_request(
&recycler,
&socketaddr_any!(),
&blockstore,
slot,
index,
nonce,
)
.expect("packets");
let request = ShredRepairType::HighestShred(slot, index);
verify_responses(&request, rv.iter());
let rv: Vec<Shred> = rv
.into_iter()
.filter_map(|p| {
assert_eq!(repair_response::nonce(p).unwrap(), nonce);
Shred::new_from_serialized_shred(p.data(..).unwrap().to_vec()).ok()
})
.collect();
assert!(!rv.is_empty());
let index = blockstore.meta(slot).unwrap().unwrap().received - 1;
assert_eq!(rv[0].index(), index as u32);
assert_eq!(rv[0].slot(), slot);
let rv = ServeRepair::run_highest_window_request(
&recycler,
&socketaddr_any!(),
&blockstore,
slot,
index + 1,
nonce,
);
assert!(rv.is_none());
- }
- Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
@@ -1922,48 +1918,44 @@ mod tests {
fn run_window_request(slot: Slot, nonce: Nonce) {
let recycler = PacketBatchRecycler::default();
solana_logger::setup();
- let ledger_path = get_tmp_ledger_path!();
- {
- let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let rv = ServeRepair::run_window_request(
&recycler,
&socketaddr_any!(),
&blockstore,
slot,
0,
nonce,
);
assert!(rv.is_none());
let shred = Shred::new_from_data(slot, 1, 1, &[], ShredFlags::empty(), 0, 2, 0);
blockstore
.insert_shreds(vec![shred], None, false)
.expect("Expect successful ledger write");
let index = 1;
let rv = ServeRepair::run_window_request(
&recycler,
&socketaddr_any!(),
&blockstore,
slot,
index,
nonce,
)
.expect("packets");
let request = ShredRepairType::Shred(slot, index);
verify_responses(&request, rv.iter());
let rv: Vec<Shred> = rv
.into_iter()
.filter_map(|p| {
assert_eq!(repair_response::nonce(p).unwrap(), nonce);
Shred::new_from_serialized_shred(p.data(..).unwrap().to_vec()).ok()
})
.collect();
assert_eq!(rv[0].index(), 1);
assert_eq!(rv[0].slot(), slot);
- }
- Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
fn new_test_cluster_info() -> ClusterInfo {
@@ -2095,122 +2087,114 @@ mod tests {
fn run_orphan(slot: Slot, num_slots: u64, nonce: Nonce) {
solana_logger::setup();
let recycler = PacketBatchRecycler::default();
- let ledger_path = get_tmp_ledger_path!();
- {
- let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let rv =
ServeRepair::run_orphan(&recycler, &socketaddr_any!(), &blockstore, slot, 0, nonce);
assert!(rv.is_none());
// Create slots [slot, slot + num_slots) with 5 shreds apiece
let (shreds, _) = make_many_slot_entries(slot, num_slots, 5);
blockstore
.insert_shreds(shreds, None, false)
.expect("Expect successful ledger write");
// We don't have slot `slot + num_slots`, so we don't know how to service this request
let rv = ServeRepair::run_orphan(
&recycler,
&socketaddr_any!(),
&blockstore,
slot + num_slots,
5,
nonce,
);
assert!(rv.is_none());
// For an orphan request for `slot + num_slots - 1`, we should return the highest shreds
// from slots in the range [slot, slot + num_slots - 1]
let rv: Vec<_> = ServeRepair::run_orphan(
&recycler,
&socketaddr_any!(),
&blockstore,
slot + num_slots - 1,
5,
nonce,
)
.expect("run_orphan packets")
.iter()
.cloned()
.collect();
// Verify responses
let request = ShredRepairType::Orphan(slot);
verify_responses(&request, rv.iter());
let expected: Vec<_> = (slot..slot + num_slots)
.rev()
.filter_map(|slot| {
let index = blockstore.meta(slot).unwrap().unwrap().received - 1;
repair_response::repair_response_packet(
&blockstore,
slot,
index,
&socketaddr_any!(),
nonce,
)
})
.collect();
assert_eq!(rv, expected);
- }
- Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn run_orphan_corrupted_shred_size() {
solana_logger::setup();
let recycler = PacketBatchRecycler::default();
- let ledger_path = get_tmp_ledger_path!();
- {
- let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
// Create slots [1, 2] with 1 shred apiece
let (mut shreds, _) = make_many_slot_entries(1, 2, 1);
assert_eq!(shreds[0].slot(), 1);
assert_eq!(shreds[0].index(), 0);
// TODO: The test previously relied on corrupting shred payload
// size which we no longer want to expose. Current test no longer
// covers packet size check in repair_response_packet_from_bytes.
shreds.remove(0);
blockstore
.insert_shreds(shreds, None, false)
.expect("Expect successful ledger write");
let nonce = 42;
// Make sure repair response is corrupted
assert!(repair_response::repair_response_packet(
&blockstore,
1,
0,
&socketaddr_any!(),
nonce,
)
.is_none());
// Orphan request for slot 2 should only return slot 1 since
// calling `repair_response_packet` on slot 1's shred will
// be corrupted
let rv: Vec<_> =
ServeRepair::run_orphan(&recycler, &socketaddr_any!(), &blockstore, 2, 5, nonce)
.expect("run_orphan packets")
.iter()
.cloned()
.collect();
// Verify responses
let expected = vec![repair_response::repair_response_packet(
&blockstore,
2,
0,
&socketaddr_any!(),
nonce,
)
.unwrap()];
assert_eq!(rv, expected);
- }
- Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
@@ -2223,95 +2207,92 @@ mod tests {
solana_logger::setup();
let recycler = PacketBatchRecycler::default();
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let slot = 0;
let num_slots = MAX_ANCESTOR_RESPONSES as u64;
let nonce = 10;
// Create slots [slot, slot + num_slots) with 5 shreds apiece
let (shreds, _) = make_many_slot_entries(slot, num_slots, 5);
blockstore
.insert_shreds(shreds, None, false)
.expect("Expect successful ledger write");
// We don't have slot `slot + num_slots`, so we return empty
let rv = ServeRepair::run_ancestor_hashes(
&recycler,
&socketaddr_any!(),
&blockstore,
slot + num_slots,
nonce,
)
.expect("run_ancestor_hashes packets");
assert_eq!(rv.len(), 1);
let packet = &rv[0];
let ancestor_hashes_response = deserialize_ancestor_hashes_response(packet);
match ancestor_hashes_response {
AncestorHashesResponse::Hashes(hashes) => {
assert!(hashes.is_empty());
}
_ => {
panic!("unexpected response: {:?}", &ancestor_hashes_response);
}
}
// `slot + num_slots - 1` is not marked duplicate confirmed, so the
// response should be empty
let rv = ServeRepair::run_ancestor_hashes(
&recycler,
&socketaddr_any!(),
&blockstore,
slot + num_slots - 1,
nonce,
)
.expect("run_ancestor_hashes packets");
assert_eq!(rv.len(), 1);
let packet = &rv[0];
let ancestor_hashes_response = deserialize_ancestor_hashes_response(packet);
match ancestor_hashes_response {
AncestorHashesResponse::Hashes(hashes) => {
assert!(hashes.is_empty());
}
_ => {
panic!("unexpected response: {:?}", &ancestor_hashes_response);
}
}
// Set duplicate confirmed
let mut expected_ancestors = Vec::with_capacity(num_slots as usize);
expected_ancestors.resize(num_slots as usize, (0, Hash::default()));
for (i, duplicate_confirmed_slot) in (slot..slot + num_slots).enumerate() {
let frozen_hash = Hash::new_unique();
expected_ancestors[num_slots as usize - i - 1] =
(duplicate_confirmed_slot, frozen_hash);
blockstore.insert_bank_hash(duplicate_confirmed_slot, frozen_hash, true);
}
let rv = ServeRepair::run_ancestor_hashes(
&recycler,
&socketaddr_any!(),
&blockstore,
slot + num_slots - 1,
nonce,
)
.expect("run_ancestor_hashes packets");
assert_eq!(rv.len(), 1);
let packet = &rv[0];
let ancestor_hashes_response = deserialize_ancestor_hashes_response(packet);
match ancestor_hashes_response {
AncestorHashesResponse::Hashes(hashes) => {
assert_eq!(hashes, expected_ancestors);
}
_ => {
panic!("unexpected response: {:?}", &ancestor_hashes_response);
}
}
}
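
The expected_ancestors indexing above reads backwards at first glance: an ancestor-hashes response lists slots starting from the requested slot and walking toward the root, so the loop stores slot i's hash at the mirrored position. A small std-only sketch of that ordering (a u64 stands in for Hash):

    fn main() {
        let num_slots = 5u64;
        let mut expected: Vec<(u64, u64)> = vec![(0, 0); num_slots as usize];
        for (i, slot) in (0..num_slots).enumerate() {
            let fake_hash = slot * 100; // stand-in for Hash::new_unique()
            // The response runs from the requested slot back toward the
            // root, so slot `i` lands at the mirrored index.
            expected[num_slots as usize - i - 1] = (slot, fake_hash);
        }
        assert_eq!(expected.first(), Some(&(4, 400)));
        assert_eq!(expected.last(), Some(&(0, 0)));
    }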
#[test]


@ -4078,7 +4078,7 @@ pub(crate) mod tests {
blockstore::{entries_to_test_shreds, make_slot_entries, BlockstoreError},
create_new_tmp_ledger,
genesis_utils::{create_genesis_config, create_genesis_config_with_leader},
get_tmp_ledger_path, get_tmp_ledger_path_auto_delete,
shred::{Shred, ShredFlags, LEGACY_SHRED_DATA_CAPACITY},
},
solana_rpc::{
@ -6405,9 +6405,10 @@ pub(crate) mod tests {
let mut vote_simulator = VoteSimulator::new(1);
vote_simulator.fill_bank_forks(forks, &HashMap::<Pubkey, Vec<u64>>::new(), true);
let (bank_forks, mut progress) = (vote_simulator.bank_forks, vote_simulator.progress);
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Arc::new(
Blockstore::open(ledger_path.path())
.expect("Expected to be able to open database ledger"),
);
let mut tower = Tower::new_for_tests(8, 2.0 / 3.0);
@ -6552,9 +6553,10 @@ pub(crate) mod tests {
vote_simulator.fill_bank_forks(forks, &validator_votes, true);
let (bank_forks, mut progress) = (vote_simulator.bank_forks, vote_simulator.progress);
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Arc::new(
Blockstore::open(ledger_path.path())
.expect("Expected to be able to open database ledger"),
);
let mut tower = Tower::new_for_tests(8, 0.67);

View File

@ -8,7 +8,7 @@ use {
solana_entry::entry::{create_ticks, Entry},
solana_ledger::{
blockstore::{entries_to_test_shreds, Blockstore},
get_tmp_ledger_path_auto_delete,
},
solana_sdk::{clock::Slot, hash::Hash},
std::path::Path,
@ -23,8 +23,6 @@ fn bench_write_shreds(bench: &mut Bencher, entries: Vec<Entry>, ledger_path: &Pa
let shreds = entries_to_test_shreds(&entries, 0, 0, true, 0, /*merkle_variant:*/ true);
blockstore.insert_shreds(shreds, None, false).unwrap();
});
}
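
bench_write_shreds re-shreds the same entries on every Bencher::iter() pass, so only the shredding and insert are timed while setup stays outside the closure. A rough std-only analogue of that harness shape (the real harness is nightly's test::Bencher, not this helper):

    use std::time::Instant;

    // Rough analogue of Bencher::iter(): run the closure many times and
    // report the mean, so setup done outside the closure is not measured.
    fn bench_iter<F: FnMut()>(iterations: u32, mut f: F) {
        let start = Instant::now();
        for _ in 0..iterations {
            f();
        }
        println!("{:?}/iter", start.elapsed() / iterations);
    }

    fn main() {
        let entries: Vec<u64> = (0..1024).collect(); // setup, not measured
        bench_iter(100, || {
            // measured work: stand-in for shredding + insert_shreds()
            let _sum: u64 = entries.iter().sum();
        });
    }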
// Insert some shreds into the ledger in preparation for read benchmarks
@ -59,28 +57,28 @@ fn setup_read_bench(
#[bench]
#[ignore]
fn bench_write_small(bench: &mut Bencher) {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let num_entries = 32 * 1024;
let entries = create_ticks(num_entries, 0, Hash::default());
bench_write_shreds(bench, entries, ledger_path.path());
}
// Write big shreds to the ledger
#[bench]
#[ignore]
fn bench_write_big(bench: &mut Bencher) {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let num_entries = 32 * 1024;
let entries = create_ticks(num_entries, 0, Hash::default());
bench_write_shreds(bench, entries, ledger_path.path());
}
#[bench]
#[ignore]
fn bench_read_sequential(bench: &mut Bencher) {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore =
Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger");
// Insert some big and small shreds into the ledger
let num_small_shreds = 32 * 1024;
@ -98,16 +96,14 @@ fn bench_read_sequential(bench: &mut Bencher) {
let _ = blockstore.get_data_shred(slot, i % total_shreds);
}
});
}
#[bench]
#[ignore]
fn bench_read_random(bench: &mut Bencher) {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore =
Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger");
// Insert some big and small shreds into the ledger
let num_small_shreds = 32 * 1024;
@ -129,36 +125,32 @@ fn bench_read_random(bench: &mut Bencher) {
let _ = blockstore.get_data_shred(slot, *i as u64);
}
});
}
#[bench]
#[ignore]
fn bench_insert_data_shred_small(bench: &mut Bencher) {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore =
Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger");
let num_entries = 32 * 1024;
let entries = create_ticks(num_entries, 0, Hash::default());
bench.iter(move || {
let shreds = entries_to_test_shreds(&entries, 0, 0, true, 0, /*merkle_variant:*/ true);
blockstore.insert_shreds(shreds, None, false).unwrap();
});
}
#[bench]
#[ignore]
fn bench_insert_data_shred_big(bench: &mut Bencher) {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore =
Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger");
let num_entries = 32 * 1024;
let entries = create_ticks(num_entries, 0, Hash::default());
bench.iter(move || {
let shreds = entries_to_test_shreds(&entries, 0, 0, true, 0, /*merkle_variant:*/ true);
blockstore.insert_shreds(shreds, None, false).unwrap();
});
}
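
One subtlety these converted benches illustrate: the TempDir must stay bound to a variable for as long as the Blockstore uses the directory, with only a borrowed &Path handed out (as in bench_write_shreds(bench, entries, ledger_path.path())). A hedged sketch of the hazard, assuming the tempfile crate:

    use std::path::Path;
    use tempfile::TempDir;

    // Stand-in for Blockstore::open(): only needs a borrowed &Path.
    fn open_ledger(path: &Path) {
        assert!(path.exists(), "ledger directory should still exist");
    }

    fn main() {
        // Correct: `ledger_path` owns the TempDir for the whole scope.
        let ledger_path = TempDir::new().unwrap();
        open_ledger(ledger_path.path());

        // Hazard (do not do this): an unnamed TempDir temporary is dropped
        // at the end of the statement, deleting the directory before use:
        //     let path = TempDir::new().unwrap().path().to_path_buf();
        //     open_ledger(&path); // would panic: directory already removed
    }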


@ -7,7 +7,7 @@ use {
solana_ledger::{
blockstore::Blockstore,
blockstore_db::{columns as cf, LedgerColumn},
get_tmp_ledger_path_auto_delete,
},
solana_runtime::bank::RewardType,
solana_sdk::{clock::Slot, pubkey},
@ -86,22 +86,22 @@ fn bench_read_rewards<F, G>(
#[bench]
fn bench_serialize_write_bincode(bencher: &mut Bencher) {
let ledger_path = get_tmp_ledger_path_auto_delete!();
bench_write_rewards(bencher, ledger_path.path(), write_bincode_rewards);
}
#[bench]
fn bench_serialize_write_protobuf(bencher: &mut Bencher) {
let ledger_path = get_tmp_ledger_path_auto_delete!();
bench_write_rewards(bencher, ledger_path.path(), write_protobuf_rewards);
}
#[bench]
fn bench_read_bincode(bencher: &mut Bencher) {
let ledger_path = get_tmp_ledger_path_auto_delete!();
bench_read_rewards(
bencher,
ledger_path.path(),
write_bincode_rewards,
read_bincode_rewards,
);
@ -109,10 +109,10 @@ fn bench_read_bincode(bencher: &mut Bencher) {
#[bench]
fn bench_read_protobuf(bencher: &mut Bencher) {
let ledger_path = get_tmp_ledger_path_auto_delete!();
bench_read_rewards(
bencher,
ledger_path.path(),
write_protobuf_rewards,
read_protobuf_rewards,
);

File diff suppressed because it is too large
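
Suppressed diff aside, every change in this commit follows from the difference between the two macro shapes. A simplified sketch of those shapes; this is an assumption-laden stand-in for illustration (the real solana_ledger macros also fold the caller's function name into the directory name), not the crate's actual implementation:

    // Old shape: returns a bare PathBuf; nothing deletes the directory
    // unless the test later calls Blockstore::destroy() itself.
    macro_rules! get_tmp_ledger_path {
        () => {{
            let path = std::env::temp_dir().join(format!("ledger-{}", line!()));
            std::fs::create_dir_all(&path).unwrap();
            path
        }};
    }

    // New shape: returns a tempfile::TempDir whose Drop impl removes the
    // directory, so cleanup happens when the binding goes out of scope.
    macro_rules! get_tmp_ledger_path_auto_delete {
        () => {
            tempfile::Builder::new()
                .prefix("ledger")
                .tempdir()
                .unwrap()
        };
    }

    fn main() {
        let old_style = get_tmp_ledger_path!(); // PathBuf, lingers on disk
        let new_style = get_tmp_ledger_path_auto_delete!(); // TempDir
        assert!(old_style.exists() && new_style.path().exists());
        std::fs::remove_dir_all(&old_style).unwrap(); // manual cleanup
    } // `new_style` dropped here; its directory is removed automatically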


@ -385,7 +385,7 @@ mod tests {
solana_ledger::{
blockstore::Blockstore,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
get_tmp_ledger_path_auto_delete,
leader_schedule_cache::LeaderScheduleCache,
},
solana_measure::measure::Measure,
@ -404,171 +404,168 @@ mod tests {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config));
let prev_hash = bank.last_blockhash();
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path())
.expect("Expected to be able to open database ledger");
let default_target_tick_duration =
timing::duration_as_us(&PohConfig::default().target_tick_duration);
let target_tick_duration = Duration::from_micros(default_target_tick_duration);
let poh_config = PohConfig {
hashes_per_tick: Some(clock::DEFAULT_HASHES_PER_TICK),
target_tick_duration,
target_tick_count: None,
};
let exit = Arc::new(AtomicBool::new(false));
let ticks_per_slot = bank.ticks_per_slot();
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let blockstore = Arc::new(blockstore);
let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new(
bank.tick_height(),
prev_hash,
bank.clone(),
Some((4, 4)),
ticks_per_slot,
&Pubkey::default(),
blockstore,
&leader_schedule_cache,
&poh_config,
exit.clone(),
);
let poh_recorder = Arc::new(RwLock::new(poh_recorder));
let ticks_per_slot = bank.ticks_per_slot();
let bank_slot = bank.slot();
// specify RUN_TIME to run in a benchmark-like mode
// to calibrate batch size
let run_time = std::env::var("RUN_TIME")
.map(|x| x.parse().unwrap())
.unwrap_or(0);
let is_test_run = run_time == 0;
let entry_producer = {
let poh_recorder = poh_recorder.clone();
let exit = exit.clone();
Builder::new()
.name("solPohEntryProd".to_string())
.spawn(move || {
let now = Instant::now();
let mut total_us = 0;
let mut total_times = 0;
let h1 = hash(b"hello world!");
let tx = VersionedTransaction::from(test_tx());
loop {
// send some data
let mut time = Measure::start("record");
let _ =
poh_recorder
.write()
.unwrap()
.record(bank_slot, h1, vec![tx.clone()]);
time.stop();
total_us += time.as_us();
total_times += 1;
if is_test_run && thread_rng().gen_ratio(1, 4) {
sleep(Duration::from_millis(200));
}
if exit.load(Ordering::Relaxed) {
info!(
"spent:{}ms record: {}ms entries recorded: {}",
now.elapsed().as_millis(),
total_us / 1000,
total_times,
);
break;
}
}
})
.unwrap()
};
let hashes_per_batch = std::env::var("HASHES_PER_BATCH")
.map(|x| x.parse().unwrap())
.unwrap_or(DEFAULT_HASHES_PER_BATCH);
let poh_service = PohService::new(
poh_recorder.clone(),
&poh_config,
exit.clone(),
0,
DEFAULT_PINNED_CPU_CORE,
hashes_per_batch,
record_receiver,
);
poh_recorder.write().unwrap().set_bank_for_test(bank);
// get some events
let mut hashes = 0;
let mut need_tick = true;
let mut need_entry = true;
let mut need_partial = true;
let mut num_ticks = 0;
let time = Instant::now();
while run_time != 0 || need_tick || need_entry || need_partial {
let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
if entry.is_tick() {
num_ticks += 1;
assert!(
entry.num_hashes <= poh_config.hashes_per_tick.unwrap(),
"{} <= {}",
entry.num_hashes,
poh_config.hashes_per_tick.unwrap()
);
if entry.num_hashes == poh_config.hashes_per_tick.unwrap() {
need_tick = false;
} else {
need_partial = false;
}
hashes += entry.num_hashes;
assert_eq!(hashes, poh_config.hashes_per_tick.unwrap());
hashes = 0;
} else {
assert!(entry.num_hashes >= 1);
need_entry = false;
hashes += entry.num_hashes;
}
if run_time != 0 {
if time.elapsed().as_millis() > run_time {
break;
}
} else {
assert!(
time.elapsed().as_secs() < 60,
"Test should not run for this long! {}s tick {} entry {} partial {}",
time.elapsed().as_secs(),
need_tick,
need_entry,
need_partial,
);
}
}
info!(
"target_tick_duration: {} ticks_per_slot: {}",
poh_config.target_tick_duration.as_nanos(),
ticks_per_slot
);
let elapsed = time.elapsed();
info!(
"{} ticks in {}ms {}us/tick",
num_ticks,
elapsed.as_millis(),
elapsed.as_micros() / num_ticks
);
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
entry_producer.join().unwrap();
}
}
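
The RUN_TIME/HASHES_PER_BATCH handling in that test is a small idiom worth noting: an env var, when set, flips the test into a benchmark-like calibration mode. A minimal standalone sketch of the same idiom, using std only:

    use std::time::Instant;

    fn main() {
        // RUN_TIME=500 cargo run -> calibration mode for ~500ms;
        // unset -> a single quick test-mode pass.
        let run_time: u128 = std::env::var("RUN_TIME")
            .map(|x| x.parse().unwrap())
            .unwrap_or(0);
        let is_test_run = run_time == 0;

        let start = Instant::now();
        let mut iterations = 0u64;
        loop {
            iterations += 1; // stand-in for the work being calibrated
            if is_test_run || start.elapsed().as_millis() > run_time {
                break;
            }
        }
        println!("{iterations} iterations in {:?}", start.elapsed());
    }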


@ -108,7 +108,8 @@ mod test {
super::*,
solana_gossip::contact_info::ContactInfo,
solana_ledger::{
blockstore::Blockstore, get_tmp_ledger_path_auto_delete,
leader_schedule_cache::LeaderScheduleCache,
},
solana_runtime::{
bank::Bank,
@ -128,123 +129,120 @@ mod test {
#[test]
fn test_get_leader_tpus() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let validator_vote_keypairs0 = ValidatorVoteKeypairs::new_rand();
let validator_vote_keypairs1 = ValidatorVoteKeypairs::new_rand();
let validator_vote_keypairs2 = ValidatorVoteKeypairs::new_rand();
let validator_keypairs = vec![
&validator_vote_keypairs0,
&validator_vote_keypairs1,
&validator_vote_keypairs2,
];
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts(
1_000_000_000,
&validator_keypairs,
vec![10_000; 3],
);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
let (poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
bank.last_blockhash(),
bank.clone(),
Some((2, 2)),
bank.ticks_per_slot(),
&Pubkey::default(),
Arc::new(blockstore),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&PohConfig::default(),
Arc::new(AtomicBool::default()),
);
let node_keypair = Arc::new(Keypair::new());
let cluster_info = Arc::new(ClusterInfo::new(
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
node_keypair,
SocketAddrSpace::Unspecified,
));
let validator0_socket = (
SocketAddr::from((Ipv4Addr::LOCALHOST, 1111)),
SocketAddr::from((Ipv4Addr::LOCALHOST, 1111 + QUIC_PORT_OFFSET)),
);
let validator1_socket = (
SocketAddr::from((Ipv4Addr::LOCALHOST, 2222)),
SocketAddr::from((Ipv4Addr::LOCALHOST, 2222 + QUIC_PORT_OFFSET)),
);
let validator2_socket = (
SocketAddr::from((Ipv4Addr::LOCALHOST, 3333)),
SocketAddr::from((Ipv4Addr::LOCALHOST, 3333 + QUIC_PORT_OFFSET)),
);
let recent_peers: HashMap<_, _> = vec![
(
validator_vote_keypairs0.node_keypair.pubkey(),
validator0_socket,
),
(
validator_vote_keypairs1.node_keypair.pubkey(),
validator1_socket,
),
(
validator_vote_keypairs2.node_keypair.pubkey(),
validator2_socket,
),
]
.iter()
.cloned()
.collect();
let leader_info = ClusterTpuInfo {
cluster_info,
poh_recorder: Arc::new(RwLock::new(poh_recorder)),
recent_peers: recent_peers.clone(),
};
let slot = bank.slot();
let first_leader =
solana_ledger::leader_schedule_utils::slot_leader_at(slot, &bank).unwrap();
assert_eq!(
leader_info.get_leader_tpus(1, Protocol::UDP),
vec![&recent_peers.get(&first_leader).unwrap().0]
);
let second_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
slot + NUM_CONSECUTIVE_LEADER_SLOTS,
&bank,
)
.unwrap();
let mut expected_leader_sockets = vec![
&recent_peers.get(&first_leader).unwrap().0,
&recent_peers.get(&second_leader).unwrap().0,
];
expected_leader_sockets.dedup();
assert_eq!(
leader_info.get_leader_tpus(2, Protocol::UDP),
expected_leader_sockets
);
let third_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
slot + (2 * NUM_CONSECUTIVE_LEADER_SLOTS),
&bank,
)
.unwrap();
let mut expected_leader_sockets = vec![
&recent_peers.get(&first_leader).unwrap().0,
&recent_peers.get(&second_leader).unwrap().0,
&recent_peers.get(&third_leader).unwrap().0,
];
expected_leader_sockets.dedup();
assert_eq!(
leader_info.get_leader_tpus(3, Protocol::UDP),
expected_leader_sockets
);
for x in 4..8 {
assert!(leader_info.get_leader_tpus(x, Protocol::UDP).len() <= recent_peers.len());
}
}
}
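
The dedup assertions in that test rely on leaders holding NUM_CONSECUTIVE_LEADER_SLOTS slots in a row, so consecutive windows can map to the same node. A hedged sketch of the lookup-then-dedup shape, with a hypothetical slot_leader helper standing in for leader_schedule_utils::slot_leader_at():

    const NUM_CONSECUTIVE_LEADER_SLOTS: u64 = 4;

    // Hypothetical stand-in for the leader schedule: leaders rotate every
    // NUM_CONSECUTIVE_LEADER_SLOTS slots.
    fn slot_leader(slot: u64, leaders: &[String]) -> &String {
        &leaders[(slot / NUM_CONSECUTIVE_LEADER_SLOTS) as usize % leaders.len()]
    }

    fn main() {
        let leaders: Vec<String> = vec!["A".into(), "A".into(), "B".into()];
        let slot = 0;
        // Collect the leaders of the next three leader windows, then dedup
        // consecutive repeats the way get_leader_tpus() collapses identical
        // sockets.
        let mut upcoming: Vec<&String> = (0..3u64)
            .map(|i| slot_leader(slot + i * NUM_CONSECUTIVE_LEADER_SLOTS, &leaders))
            .collect();
        upcoming.dedup();
        assert_eq!(upcoming, [&"A".to_string(), &"B".to_string()]);
    }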


@ -586,7 +586,7 @@ mod tests {
crate::rpc::{create_validator_exit, tests::new_test_cluster_info},
solana_ledger::{
genesis_utils::{create_genesis_config, GenesisConfigInfo},
get_tmp_ledger_path_auto_delete,
},
solana_rpc_client_api::config::RpcContextConfig,
solana_runtime::bank::Bank,
@ -618,8 +618,8 @@ mod tests {
solana_net_utils::find_available_port_in_range(ip_addr, (10000, 65535)).unwrap(),
);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
@ -719,8 +719,8 @@ mod tests {
#[test]
fn test_is_file_get_path() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let bank_forks = create_bank_forks();
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
@ -728,13 +728,13 @@ mod tests {
let bank_forks = create_bank_forks();
let rrm = RpcRequestMiddleware::new(
ledger_path.path().to_path_buf(),
None,
bank_forks.clone(),
health.clone(),
);
let rrm_with_snapshot_config = RpcRequestMiddleware::new(
ledger_path.path().to_path_buf(),
Some(SnapshotConfig::default()),
bank_forks,
health,
@ -829,15 +829,14 @@ mod tests {
fn test_process_file_get() {
let runtime = Runtime::new().unwrap();
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let genesis_path = ledger_path.path().join(DEFAULT_GENESIS_ARCHIVE);
let bank_forks = create_bank_forks();
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let rrm = RpcRequestMiddleware::new(
ledger_path.path().to_path_buf(),
None,
bank_forks,
RpcHealth::stub(optimistically_confirmed_bank, blockstore),
@ -872,7 +871,7 @@ mod tests {
{
std::fs::remove_file(&genesis_path).unwrap();
{
let mut file = std::fs::File::create(ledger_path.path().join("wrong")).unwrap();
file.write_all(b"wrong file").unwrap();
}
symlink::symlink_file("wrong", &genesis_path).unwrap();
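
Note why the middleware constructor calls changed shape here: the old macro returned a PathBuf, which is Clone, while TempDir is not, so callers now copy the path out explicitly with .path().to_path_buf(). A quick sketch of the distinction, with middleware_new as a hypothetical stand-in for a constructor that wants an owned PathBuf:

    use std::path::PathBuf;
    use tempfile::TempDir;

    // Hypothetical stand-in for RpcRequestMiddleware::new().
    fn middleware_new(ledger_path: PathBuf) -> PathBuf {
        ledger_path
    }

    fn main() {
        let ledger_path = TempDir::new().unwrap();
        // TempDir deliberately does not implement Clone (two owners would
        // race to delete the same directory on drop), so an owned copy of
        // the *path* is taken instead of the old `ledger_path.clone()`:
        let _rrm = middleware_new(ledger_path.path().to_path_buf());
        let _rrm2 = middleware_new(ledger_path.path().to_path_buf());
        assert!(ledger_path.path().exists());
    }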


@ -1259,6 +1259,7 @@ pub(crate) mod tests {
rpc_pubsub_service,
},
serial_test::serial,
solana_ledger::get_tmp_ledger_path_auto_delete,
solana_rpc_client_api::config::{
RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
@ -1473,8 +1474,8 @@ pub(crate) mod tests {
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let blockstore = Arc::new(blockstore);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
@ -1593,8 +1594,8 @@ pub(crate) mod tests {
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let blockstore = Arc::new(blockstore);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
@ -1711,8 +1712,8 @@ pub(crate) mod tests {
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let blockstore = Arc::new(blockstore);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let max_complete_rewards_slot = Arc::new(AtomicU64::default());


@ -230,7 +230,7 @@ pub(crate) mod tests {
nonce_info::{NonceFull, NoncePartial},
rent_debits::RentDebits,
},
solana_ledger::{genesis_utils::create_genesis_config, get_tmp_ledger_path_auto_delete},
solana_runtime::bank::{Bank, TransactionBalancesSet},
solana_sdk::{
account_utils::StateMut,
@ -339,9 +339,9 @@ pub(crate) mod tests {
let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config));
let (transaction_status_sender, transaction_status_receiver) = unbounded();
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path())
.expect("Expected to be able to open database ledger");
let blockstore = Arc::new(blockstore);
let transaction = build_test_transaction_legacy();


@ -505,7 +505,7 @@ pub mod test {
solana_ledger::{
blockstore::Blockstore,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
get_tmp_ledger_path_auto_delete,
shred::{max_ticks_per_n_shreds, ProcessShredsStats, ReedSolomonCache, Shredder},
},
solana_runtime::bank::Bank,
@ -590,8 +590,8 @@ pub mod test {
#[test]
fn test_duplicate_retransmit_signal() {
// Setup
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let (transmit_sender, transmit_receiver) = unbounded();
let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
@ -694,66 +694,62 @@ pub mod test {
#[test]
fn test_broadcast_ledger() {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path_auto_delete!();
// Create the leader scheduler
let leader_keypair = Arc::new(Keypair::new());
let (entry_sender, entry_receiver) = unbounded();
let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
let broadcast_service = setup_dummy_broadcast_service(
leader_keypair,
ledger_path.path(),
entry_receiver,
retransmit_slots_receiver,
);
let start_tick_height;
let max_tick_height;
let ticks_per_slot;
let slot;
{
let bank = broadcast_service.bank;
start_tick_height = bank.tick_height();
max_tick_height = bank.max_tick_height();
ticks_per_slot = bank.ticks_per_slot();
slot = bank.slot();
let ticks = create_ticks(max_tick_height - start_tick_height, 0, Hash::default());
for (i, tick) in ticks.into_iter().enumerate() {
entry_sender
.send((bank.clone(), (tick, i as u64 + 1)))
.expect("Expect successful send to broadcast service");
}
}
trace!(
"[broadcast_ledger] max_tick_height: {}, start_tick_height: {}, ticks_per_slot: {}",
max_tick_height,
start_tick_height,
ticks_per_slot,
);
let mut entries = vec![];
for _ in 0..10 {
entries = broadcast_service
.blockstore
.get_slot_entries(slot, 0)
.expect("Expect entries to be present");
if entries.len() >= max_tick_height as usize {
break;
}
sleep(Duration::from_millis(1000));
}
assert_eq!(entries.len(), max_tick_height as usize);
drop(entry_sender);
drop(retransmit_slots_sender);
broadcast_service
.broadcast_service
.join()
.expect("Expect successful join of broadcast service");
}
}
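
The entry check in that test uses a bounded poll loop rather than a single long sleep: probe, check, sleep, retry, up to a fixed number of attempts. A generic std-only sketch of that retry shape:

    use std::{thread::sleep, time::Duration};

    // Polls `probe` up to `attempts` times, returning early once `done`
    // says the result is complete -- the same shape as the
    // get_slot_entries loop above.
    fn poll_until<T>(
        attempts: usize,
        interval: Duration,
        mut probe: impl FnMut() -> T,
        done: impl Fn(&T) -> bool,
    ) -> T {
        let mut last = probe();
        for _ in 1..attempts {
            if done(&last) {
                break;
            }
            sleep(interval);
            last = probe();
        }
        last
    }

    fn main() {
        let mut n = 0;
        let result = poll_until(
            10,
            Duration::from_millis(1),
            || {
                n += 1; // stand-in for re-reading the blockstore
                n
            },
            |v| *v >= 3,
        );
        assert_eq!(result, 3);
    }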


@ -503,7 +503,7 @@ mod test {
solana_gossip::cluster_info::{ClusterInfo, Node},
solana_ledger::{
blockstore::Blockstore, genesis_utils::create_genesis_config, get_tmp_ledger_path,
get_tmp_ledger_path_auto_delete, shred::max_ticks_per_n_shreds,
},
solana_runtime::bank::Bank,
solana_sdk::{
@ -815,9 +815,10 @@ mod test {
bs.current_slot_and_parent = Some((1, 0));
let entries = create_ticks(10_000, 1, solana_sdk::hash::Hash::default());
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Arc::new(
Blockstore::open(ledger_path.path())
.expect("Expected to be able to open database ledger"),
);
let mut stats = ProcessShredsStats::default();