Forwarder: Add common setup for tests (#31232)
This commit is contained in:
parent
ca1bde3591
commit
748220c9d3
|
@ -271,24 +271,83 @@ mod tests {
|
|||
unprocessed_packet_batches::{DeserializedPacket, UnprocessedPacketBatches},
|
||||
unprocessed_transaction_storage::ThreadType,
|
||||
},
|
||||
solana_ledger::{
|
||||
blockstore::Blockstore, genesis_utils::GenesisConfigInfo,
|
||||
get_tmp_ledger_path_auto_delete,
|
||||
},
|
||||
solana_gossip::cluster_info::Node,
|
||||
solana_ledger::{blockstore::Blockstore, genesis_utils::GenesisConfigInfo},
|
||||
solana_perf::packet::PacketFlags,
|
||||
solana_poh::poh_recorder::create_test_recorder,
|
||||
solana_poh::{poh_recorder::create_test_recorder, poh_service::PohService},
|
||||
solana_runtime::bank::Bank,
|
||||
solana_sdk::{
|
||||
hash::Hash, poh_config::PohConfig, signature::Keypair, signer::Signer,
|
||||
system_transaction, transaction::VersionedTransaction,
|
||||
},
|
||||
solana_streamer::recvmmsg::recv_mmsg,
|
||||
std::sync::atomic::AtomicBool,
|
||||
tempfile::TempDir,
|
||||
};
|
||||
|
||||
struct TestSetup {
|
||||
_ledger_dir: TempDir,
|
||||
bank_forks: Arc<RwLock<BankForks>>,
|
||||
poh_recorder: Arc<RwLock<PohRecorder>>,
|
||||
exit: Arc<AtomicBool>,
|
||||
poh_service: PohService,
|
||||
cluster_info: Arc<ClusterInfo>,
|
||||
local_node: Node,
|
||||
}
|
||||
|
||||
fn setup() -> TestSetup {
|
||||
let validator_keypair = Arc::new(Keypair::new());
|
||||
let genesis_config_info =
|
||||
create_slow_genesis_config_with_leader(10_000, &validator_keypair.pubkey());
|
||||
let GenesisConfigInfo { genesis_config, .. } = &genesis_config_info;
|
||||
|
||||
let bank: Bank = Bank::new_no_wallclock_throttle_for_tests(genesis_config);
|
||||
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
|
||||
let bank = bank_forks.read().unwrap().working_bank();
|
||||
|
||||
let ledger_path = TempDir::new().unwrap();
|
||||
let blockstore = Arc::new(
|
||||
Blockstore::open(ledger_path.as_ref())
|
||||
.expect("Expected to be able to open database ledger"),
|
||||
);
|
||||
let poh_config = PohConfig {
|
||||
// limit tick count to avoid clearing working_bank at
|
||||
// PohRecord then PohRecorderError(MaxHeightReached) at BankingStage
|
||||
target_tick_count: Some(bank.max_tick_height() - 1),
|
||||
..PohConfig::default()
|
||||
};
|
||||
|
||||
let (exit, poh_recorder, poh_service, _entry_receiver) =
|
||||
create_test_recorder(&bank, &blockstore, Some(poh_config), None);
|
||||
|
||||
let (local_node, cluster_info) = new_test_cluster_info(Some(validator_keypair));
|
||||
let cluster_info = Arc::new(cluster_info);
|
||||
|
||||
TestSetup {
|
||||
_ledger_dir: ledger_path,
|
||||
bank_forks,
|
||||
poh_recorder,
|
||||
exit,
|
||||
poh_service,
|
||||
cluster_info,
|
||||
local_node,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_forwarder_budget() {
|
||||
solana_logger::setup();
|
||||
let TestSetup {
|
||||
bank_forks,
|
||||
poh_recorder,
|
||||
exit,
|
||||
poh_service,
|
||||
cluster_info,
|
||||
local_node,
|
||||
..
|
||||
} = setup();
|
||||
|
||||
// Create `PacketBatch` with 1 unprocessed packet
|
||||
let tx = system_transaction::transfer(
|
||||
&Keypair::new(),
|
||||
|
@ -299,82 +358,63 @@ mod tests {
|
|||
let packet = Packet::from_data(None, tx).unwrap();
|
||||
let deserialized_packet = DeserializedPacket::new(packet).unwrap();
|
||||
|
||||
let validator_keypair = Arc::new(Keypair::new());
|
||||
let genesis_config_info =
|
||||
create_slow_genesis_config_with_leader(10_000, &validator_keypair.pubkey());
|
||||
let GenesisConfigInfo { genesis_config, .. } = &genesis_config_info;
|
||||
|
||||
let bank = Bank::new_no_wallclock_throttle_for_tests(genesis_config);
|
||||
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
|
||||
let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap());
|
||||
let ledger_path = get_tmp_ledger_path_auto_delete!();
|
||||
{
|
||||
let blockstore = Arc::new(
|
||||
Blockstore::open(ledger_path.path())
|
||||
.expect("Expected to be able to open database ledger"),
|
||||
let test_cases = vec![
|
||||
("budget-restricted", DataBudget::restricted(), 0),
|
||||
("budget-available", DataBudget::default(), 1),
|
||||
];
|
||||
for (name, data_budget, expected_num_forwarded) in test_cases {
|
||||
let forwarder = Forwarder::new(
|
||||
poh_recorder.clone(),
|
||||
bank_forks.clone(),
|
||||
cluster_info.clone(),
|
||||
Arc::new(ConnectionCache::default()),
|
||||
Arc::new(data_budget),
|
||||
);
|
||||
let unprocessed_packet_batches: UnprocessedPacketBatches =
|
||||
UnprocessedPacketBatches::from_iter(
|
||||
vec![deserialized_packet.clone()].into_iter(),
|
||||
1,
|
||||
);
|
||||
let stats = BankingStageStats::default();
|
||||
forwarder.handle_forwarding(
|
||||
&mut UnprocessedTransactionStorage::new_transaction_storage(
|
||||
unprocessed_packet_batches,
|
||||
ThreadType::Transactions,
|
||||
),
|
||||
true,
|
||||
&mut LeaderSlotMetricsTracker::new(0),
|
||||
&stats,
|
||||
&mut TracerPacketStats::new(0),
|
||||
);
|
||||
let poh_config = PohConfig {
|
||||
// limit tick count to avoid clearing working_bank at
|
||||
// PohRecord then PohRecorderError(MaxHeightReached) at BankingStage
|
||||
target_tick_count: Some(bank.max_tick_height() - 1),
|
||||
..PohConfig::default()
|
||||
};
|
||||
|
||||
let (exit, poh_recorder, poh_service, _entry_receiver) =
|
||||
create_test_recorder(&bank, &blockstore, Some(poh_config), None);
|
||||
|
||||
let (local_node, cluster_info) = new_test_cluster_info(Some(validator_keypair));
|
||||
let cluster_info = Arc::new(cluster_info);
|
||||
let recv_socket = &local_node.sockets.tpu_forwards[0];
|
||||
recv_socket
|
||||
.set_nonblocking(expected_num_forwarded == 0)
|
||||
.unwrap();
|
||||
|
||||
let test_cases = vec![
|
||||
("budget-restricted", DataBudget::restricted(), 0),
|
||||
("budget-available", DataBudget::default(), 1),
|
||||
];
|
||||
for (name, data_budget, expected_num_forwarded) in test_cases {
|
||||
let forwarder = Forwarder::new(
|
||||
poh_recorder.clone(),
|
||||
bank_forks.clone(),
|
||||
cluster_info.clone(),
|
||||
Arc::new(ConnectionCache::default()),
|
||||
Arc::new(data_budget),
|
||||
);
|
||||
let unprocessed_packet_batches: UnprocessedPacketBatches =
|
||||
UnprocessedPacketBatches::from_iter(
|
||||
vec![deserialized_packet.clone()].into_iter(),
|
||||
1,
|
||||
);
|
||||
let stats = BankingStageStats::default();
|
||||
forwarder.handle_forwarding(
|
||||
&mut UnprocessedTransactionStorage::new_transaction_storage(
|
||||
unprocessed_packet_batches,
|
||||
ThreadType::Transactions,
|
||||
),
|
||||
true,
|
||||
&mut LeaderSlotMetricsTracker::new(0),
|
||||
&stats,
|
||||
&mut TracerPacketStats::new(0),
|
||||
);
|
||||
|
||||
recv_socket
|
||||
.set_nonblocking(expected_num_forwarded == 0)
|
||||
.unwrap();
|
||||
|
||||
let mut packets = vec![Packet::default(); 2];
|
||||
let num_received = recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default();
|
||||
assert_eq!(num_received, expected_num_forwarded, "{name}");
|
||||
}
|
||||
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
poh_service.join().unwrap();
|
||||
let mut packets = vec![Packet::default(); 2];
|
||||
let num_received = recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default();
|
||||
assert_eq!(num_received, expected_num_forwarded, "{name}");
|
||||
}
|
||||
Blockstore::destroy(ledger_path.path()).unwrap();
|
||||
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
poh_service.join().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_handle_forwarding() {
|
||||
solana_logger::setup();
|
||||
let TestSetup {
|
||||
bank_forks,
|
||||
poh_recorder,
|
||||
exit,
|
||||
poh_service,
|
||||
cluster_info,
|
||||
local_node,
|
||||
..
|
||||
} = setup();
|
||||
|
||||
// packets are deserialized upon receiving, failed packets will not be
|
||||
// forwarded; Therefore need to create real packets here.
|
||||
let keypair = Keypair::new();
|
||||
|
@ -402,82 +442,55 @@ mod tests {
|
|||
),
|
||||
ThreadType::Transactions,
|
||||
);
|
||||
let connection_cache = ConnectionCache::default();
|
||||
|
||||
let validator_keypair = Arc::new(Keypair::new());
|
||||
let genesis_config_info =
|
||||
create_slow_genesis_config_with_leader(10_000, &validator_keypair.pubkey());
|
||||
let GenesisConfigInfo { genesis_config, .. } = &genesis_config_info;
|
||||
let bank = Bank::new_no_wallclock_throttle_for_tests(genesis_config);
|
||||
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
|
||||
let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap());
|
||||
let ledger_path = get_tmp_ledger_path_auto_delete!();
|
||||
{
|
||||
let blockstore = Arc::new(
|
||||
Blockstore::open(ledger_path.path())
|
||||
.expect("Expected to be able to open database ledger"),
|
||||
let test_cases = vec![
|
||||
("fwd-normal", true, vec![normal_block_hash], 2),
|
||||
("fwd-no-op", true, vec![], 2),
|
||||
("fwd-no-hold", false, vec![], 0),
|
||||
];
|
||||
|
||||
let forwarder = Forwarder::new(
|
||||
poh_recorder,
|
||||
bank_forks,
|
||||
cluster_info,
|
||||
Arc::new(connection_cache),
|
||||
Arc::new(DataBudget::default()),
|
||||
);
|
||||
for (name, hold, expected_ids, expected_num_unprocessed) in test_cases {
|
||||
let stats = BankingStageStats::default();
|
||||
forwarder.handle_forwarding(
|
||||
&mut unprocessed_packet_batches,
|
||||
hold,
|
||||
&mut LeaderSlotMetricsTracker::new(0),
|
||||
&stats,
|
||||
&mut TracerPacketStats::new(0),
|
||||
);
|
||||
let poh_config = PohConfig {
|
||||
// limit tick count to avoid clearing working_bank at
|
||||
// PohRecord then PohRecorderError(MaxHeightReached) at BankingStage
|
||||
target_tick_count: Some(bank.max_tick_height() - 1),
|
||||
..PohConfig::default()
|
||||
};
|
||||
|
||||
let (exit, poh_recorder, poh_service, _entry_receiver) =
|
||||
create_test_recorder(&bank, &blockstore, Some(poh_config), None);
|
||||
|
||||
let (local_node, cluster_info) = new_test_cluster_info(Some(validator_keypair));
|
||||
let recv_socket = &local_node.sockets.tpu_forwards[0];
|
||||
let connection_cache = ConnectionCache::default();
|
||||
recv_socket
|
||||
.set_nonblocking(expected_ids.is_empty())
|
||||
.unwrap();
|
||||
|
||||
let test_cases = vec![
|
||||
("fwd-normal", true, vec![normal_block_hash], 2),
|
||||
("fwd-no-op", true, vec![], 2),
|
||||
("fwd-no-hold", false, vec![], 0),
|
||||
];
|
||||
|
||||
let forwarder = Forwarder::new(
|
||||
poh_recorder,
|
||||
bank_forks,
|
||||
Arc::new(cluster_info),
|
||||
Arc::new(connection_cache),
|
||||
Arc::new(DataBudget::default()),
|
||||
);
|
||||
for (name, hold, expected_ids, expected_num_unprocessed) in test_cases {
|
||||
let stats = BankingStageStats::default();
|
||||
forwarder.handle_forwarding(
|
||||
&mut unprocessed_packet_batches,
|
||||
hold,
|
||||
&mut LeaderSlotMetricsTracker::new(0),
|
||||
&stats,
|
||||
&mut TracerPacketStats::new(0),
|
||||
let mut packets = vec![Packet::default(); 2];
|
||||
let num_received = recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default();
|
||||
assert_eq!(num_received, expected_ids.len(), "{name}");
|
||||
for (i, expected_id) in expected_ids.iter().enumerate() {
|
||||
assert_eq!(packets[i].meta().size, 215);
|
||||
let recv_transaction: VersionedTransaction =
|
||||
packets[i].deserialize_slice(..).unwrap();
|
||||
assert_eq!(
|
||||
recv_transaction.message.recent_blockhash(),
|
||||
expected_id,
|
||||
"{name}"
|
||||
);
|
||||
|
||||
recv_socket
|
||||
.set_nonblocking(expected_ids.is_empty())
|
||||
.unwrap();
|
||||
|
||||
let mut packets = vec![Packet::default(); 2];
|
||||
let num_received = recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default();
|
||||
assert_eq!(num_received, expected_ids.len(), "{name}");
|
||||
for (i, expected_id) in expected_ids.iter().enumerate() {
|
||||
assert_eq!(packets[i].meta().size, 215);
|
||||
let recv_transaction: VersionedTransaction =
|
||||
packets[i].deserialize_slice(..).unwrap();
|
||||
assert_eq!(
|
||||
recv_transaction.message.recent_blockhash(),
|
||||
expected_id,
|
||||
"{name}"
|
||||
);
|
||||
}
|
||||
|
||||
let num_unprocessed_packets: usize = unprocessed_packet_batches.len();
|
||||
assert_eq!(num_unprocessed_packets, expected_num_unprocessed, "{name}");
|
||||
}
|
||||
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
poh_service.join().unwrap();
|
||||
let num_unprocessed_packets: usize = unprocessed_packet_batches.len();
|
||||
assert_eq!(num_unprocessed_packets, expected_num_unprocessed, "{name}");
|
||||
}
|
||||
Blockstore::destroy(ledger_path.path()).unwrap();
|
||||
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
poh_service.join().unwrap();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue