Move expensive test to integration tests
This test passes consistently when the test suite is run with a single thread. It fails consistently on MacOS when run as part of the unit-test suite. No idea why it passes in CI.
This commit is contained in:
parent
e4119268ca
commit
7c248cd2ef
173
src/tvu.rs
173
src/tvu.rs
|
@ -207,27 +207,11 @@ impl Service for Tvu {
|
|||
pub mod tests {
|
||||
use super::*;
|
||||
use crate::bank::Bank;
|
||||
use crate::blocktree::{get_tmp_ledger_path, BlocktreeConfig};
|
||||
use crate::blocktree::get_tmp_ledger_path;
|
||||
use crate::cluster_info::{ClusterInfo, Node};
|
||||
use crate::entry::next_entry_mut;
|
||||
use crate::entry::EntrySlice;
|
||||
use crate::genesis_block::GenesisBlock;
|
||||
use crate::gossip_service::GossipService;
|
||||
use crate::leader_scheduler::LeaderSchedulerConfig;
|
||||
use crate::packet::index_blobs;
|
||||
use crate::storage_stage::STORAGE_ROTATE_TEST_COUNT;
|
||||
use crate::streamer;
|
||||
use solana_sdk::system_transaction::SystemTransaction;
|
||||
use std::fs::remove_dir_all;
|
||||
use std::time::Duration;
|
||||
|
||||
fn new_gossip(
|
||||
cluster_info: Arc<RwLock<ClusterInfo>>,
|
||||
gossip: UdpSocket,
|
||||
exit: Arc<AtomicBool>,
|
||||
) -> GossipService {
|
||||
GossipService::new(&cluster_info, None, gossip, exit)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_tvu_exit() {
|
||||
|
@ -280,159 +264,4 @@ pub mod tests {
|
|||
);
|
||||
tvu.close().expect("close");
|
||||
}
|
||||
|
||||
/// Test that message sent from leader to target1 and replayed to target2
|
||||
#[test]
|
||||
fn test_replay() {
|
||||
solana_logger::setup();
|
||||
let leader = Node::new_localhost();
|
||||
let target1_keypair = Keypair::new();
|
||||
let target1 = Node::new_localhost_with_pubkey(target1_keypair.pubkey());
|
||||
let target2 = Node::new_localhost();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
|
||||
// start cluster_info_l
|
||||
let mut cluster_info_l = ClusterInfo::new(leader.info.clone());
|
||||
cluster_info_l.set_leader(leader.info.id);
|
||||
|
||||
let cref_l = Arc::new(RwLock::new(cluster_info_l));
|
||||
let dr_l = new_gossip(cref_l, leader.sockets.gossip, exit.clone());
|
||||
|
||||
// start cluster_info2
|
||||
let mut cluster_info2 = ClusterInfo::new(target2.info.clone());
|
||||
cluster_info2.insert_info(leader.info.clone());
|
||||
cluster_info2.set_leader(leader.info.id);
|
||||
let cref2 = Arc::new(RwLock::new(cluster_info2));
|
||||
let dr_2 = new_gossip(cref2, target2.sockets.gossip, exit.clone());
|
||||
|
||||
// setup some blob services to send blobs into the socket
|
||||
// to simulate the source peer and get blobs out of the socket to
|
||||
// simulate target peer
|
||||
let (s_reader, r_reader) = channel();
|
||||
let blob_sockets: Vec<Arc<UdpSocket>> =
|
||||
target2.sockets.tvu.into_iter().map(Arc::new).collect();
|
||||
|
||||
let t_receiver = streamer::blob_receiver(blob_sockets[0].clone(), exit.clone(), s_reader);
|
||||
|
||||
// simulate leader sending messages
|
||||
let (s_responder, r_responder) = channel();
|
||||
let t_responder = streamer::responder(
|
||||
"test_replay",
|
||||
Arc::new(leader.sockets.retransmit),
|
||||
r_responder,
|
||||
);
|
||||
|
||||
// TODO: Fix this test so it always works with the default
|
||||
// LeaderSchedulerConfig/BlocktreeConfig configuration
|
||||
let mut leader_scheduler_config = LeaderSchedulerConfig::default();
|
||||
leader_scheduler_config.ticks_per_slot = 64;
|
||||
let blocktree_config = BlocktreeConfig::new(leader_scheduler_config.ticks_per_slot);
|
||||
|
||||
let starting_balance = 10_000;
|
||||
let (genesis_block, mint_keypair) = GenesisBlock::new(starting_balance);
|
||||
let tvu_addr = target1.info.tvu;
|
||||
let bank = Arc::new(Bank::new(&genesis_block));
|
||||
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::new_with_bank(
|
||||
&leader_scheduler_config,
|
||||
&bank,
|
||||
)));
|
||||
assert_eq!(bank.get_balance(&mint_keypair.pubkey()), starting_balance);
|
||||
|
||||
// start cluster_info1
|
||||
let mut cluster_info1 = ClusterInfo::new(target1.info.clone());
|
||||
cluster_info1.insert_info(leader.info.clone());
|
||||
cluster_info1.set_leader(leader.info.id);
|
||||
let cref1 = Arc::new(RwLock::new(cluster_info1));
|
||||
let dr_1 = new_gossip(cref1.clone(), target1.sockets.gossip, exit.clone());
|
||||
|
||||
let mut cur_hash = Hash::default();
|
||||
let blocktree_path = get_tmp_ledger_path("test_replay");
|
||||
|
||||
let (blocktree, l_receiver) =
|
||||
Blocktree::open_with_config_signal(&blocktree_path, &blocktree_config)
|
||||
.expect("Expected to successfully open ledger");
|
||||
let vote_account_keypair = Arc::new(Keypair::new());
|
||||
let voting_keypair = VotingKeypair::new_local(&vote_account_keypair);
|
||||
let (sender, _) = channel();
|
||||
let tvu = Tvu::new(
|
||||
Some(Arc::new(voting_keypair)),
|
||||
&bank,
|
||||
0,
|
||||
0,
|
||||
cur_hash,
|
||||
&cref1,
|
||||
{
|
||||
Sockets {
|
||||
repair: target1.sockets.repair,
|
||||
retransmit: target1.sockets.retransmit,
|
||||
fetch: target1.sockets.tvu,
|
||||
}
|
||||
},
|
||||
Arc::new(blocktree),
|
||||
STORAGE_ROTATE_TEST_COUNT,
|
||||
&sender,
|
||||
&StorageState::default(),
|
||||
None,
|
||||
l_receiver,
|
||||
leader_scheduler,
|
||||
);
|
||||
|
||||
let mut alice_ref_balance = starting_balance;
|
||||
let mut msgs = Vec::new();
|
||||
let mut blob_idx = 0;
|
||||
let num_transfers = 10;
|
||||
let transfer_amount = 501;
|
||||
let bob_keypair = Keypair::new();
|
||||
for i in 0..num_transfers {
|
||||
let entry0 = next_entry_mut(&mut cur_hash, i, vec![]);
|
||||
let entry_tick0 = next_entry_mut(&mut cur_hash, i + 1, vec![]);
|
||||
|
||||
let tx0 = SystemTransaction::new_account(
|
||||
&mint_keypair,
|
||||
bob_keypair.pubkey(),
|
||||
transfer_amount,
|
||||
cur_hash,
|
||||
0,
|
||||
);
|
||||
let entry_tick1 = next_entry_mut(&mut cur_hash, i + 1, vec![]);
|
||||
let entry1 = next_entry_mut(&mut cur_hash, i + num_transfers, vec![tx0]);
|
||||
let entry_tick2 = next_entry_mut(&mut cur_hash, i + 1, vec![]);
|
||||
|
||||
alice_ref_balance -= transfer_amount;
|
||||
|
||||
let entries = vec![entry0, entry_tick0, entry_tick1, entry1, entry_tick2];
|
||||
let blobs = entries.to_shared_blobs();
|
||||
index_blobs(&blobs, &mut blob_idx, &vec![0; blobs.len()]);
|
||||
blobs
|
||||
.iter()
|
||||
.for_each(|b| b.write().unwrap().meta.set_addr(&tvu_addr));
|
||||
msgs.extend(blobs.into_iter());
|
||||
}
|
||||
|
||||
// send the blobs into the socket
|
||||
s_responder.send(msgs).expect("send");
|
||||
drop(s_responder);
|
||||
|
||||
// receive retransmitted messages
|
||||
let timer = Duration::new(1, 0);
|
||||
while let Ok(_msg) = r_reader.recv_timeout(timer) {
|
||||
trace!("got msg");
|
||||
}
|
||||
|
||||
let alice_balance = bank.get_balance(&mint_keypair.pubkey());
|
||||
assert_eq!(alice_balance, alice_ref_balance);
|
||||
|
||||
let bob_balance = bank.get_balance(&bob_keypair.pubkey());
|
||||
assert_eq!(bob_balance, starting_balance - alice_ref_balance);
|
||||
|
||||
tvu.close().expect("close");
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
dr_l.join().expect("join");
|
||||
dr_2.join().expect("join");
|
||||
dr_1.join().expect("join");
|
||||
t_receiver.join().expect("join");
|
||||
t_responder.join().expect("join");
|
||||
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
|
||||
let _ignored = remove_dir_all(&blocktree_path);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,189 @@
|
|||
use log::trace;
|
||||
use solana::bank::Bank;
|
||||
use solana::blocktree::Blocktree;
|
||||
use solana::blocktree::{get_tmp_ledger_path, BlocktreeConfig};
|
||||
use solana::cluster_info::{ClusterInfo, Node};
|
||||
use solana::entry::next_entry_mut;
|
||||
use solana::entry::EntrySlice;
|
||||
use solana::genesis_block::GenesisBlock;
|
||||
use solana::gossip_service::GossipService;
|
||||
use solana::leader_scheduler::LeaderScheduler;
|
||||
use solana::leader_scheduler::LeaderSchedulerConfig;
|
||||
use solana::packet::index_blobs;
|
||||
use solana::service::Service;
|
||||
use solana::storage_stage::StorageState;
|
||||
use solana::storage_stage::STORAGE_ROTATE_TEST_COUNT;
|
||||
use solana::streamer;
|
||||
use solana::tvu::{Sockets, Tvu};
|
||||
use solana::voting_keypair::VotingKeypair;
|
||||
use solana_sdk::hash::Hash;
|
||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||
use solana_sdk::system_transaction::SystemTransaction;
|
||||
use std::fs::remove_dir_all;
|
||||
use std::net::UdpSocket;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::channel;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::Duration;
|
||||
|
||||
fn new_gossip(
|
||||
cluster_info: Arc<RwLock<ClusterInfo>>,
|
||||
gossip: UdpSocket,
|
||||
exit: Arc<AtomicBool>,
|
||||
) -> GossipService {
|
||||
GossipService::new(&cluster_info, None, gossip, exit)
|
||||
}
|
||||
|
||||
/// Test that message sent from leader to target1 and replayed to target2
|
||||
#[test]
|
||||
fn test_replay() {
|
||||
solana_logger::setup();
|
||||
let leader = Node::new_localhost();
|
||||
let target1_keypair = Keypair::new();
|
||||
let target1 = Node::new_localhost_with_pubkey(target1_keypair.pubkey());
|
||||
let target2 = Node::new_localhost();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
|
||||
// start cluster_info_l
|
||||
let mut cluster_info_l = ClusterInfo::new(leader.info.clone());
|
||||
cluster_info_l.set_leader(leader.info.id);
|
||||
|
||||
let cref_l = Arc::new(RwLock::new(cluster_info_l));
|
||||
let dr_l = new_gossip(cref_l, leader.sockets.gossip, exit.clone());
|
||||
|
||||
// start cluster_info2
|
||||
let mut cluster_info2 = ClusterInfo::new(target2.info.clone());
|
||||
cluster_info2.insert_info(leader.info.clone());
|
||||
cluster_info2.set_leader(leader.info.id);
|
||||
let cref2 = Arc::new(RwLock::new(cluster_info2));
|
||||
let dr_2 = new_gossip(cref2, target2.sockets.gossip, exit.clone());
|
||||
|
||||
// setup some blob services to send blobs into the socket
|
||||
// to simulate the source peer and get blobs out of the socket to
|
||||
// simulate target peer
|
||||
let (s_reader, r_reader) = channel();
|
||||
let blob_sockets: Vec<Arc<UdpSocket>> = target2.sockets.tvu.into_iter().map(Arc::new).collect();
|
||||
|
||||
let t_receiver = streamer::blob_receiver(blob_sockets[0].clone(), exit.clone(), s_reader);
|
||||
|
||||
// simulate leader sending messages
|
||||
let (s_responder, r_responder) = channel();
|
||||
let t_responder = streamer::responder(
|
||||
"test_replay",
|
||||
Arc::new(leader.sockets.retransmit),
|
||||
r_responder,
|
||||
);
|
||||
|
||||
// TODO: Fix this test so it always works with the default
|
||||
// LeaderSchedulerConfig/BlocktreeConfig configuration
|
||||
let mut leader_scheduler_config = LeaderSchedulerConfig::default();
|
||||
leader_scheduler_config.ticks_per_slot = 64;
|
||||
let blocktree_config = BlocktreeConfig::new(leader_scheduler_config.ticks_per_slot);
|
||||
|
||||
let starting_balance = 10_000;
|
||||
let (genesis_block, mint_keypair) = GenesisBlock::new(starting_balance);
|
||||
let tvu_addr = target1.info.tvu;
|
||||
let bank = Arc::new(Bank::new(&genesis_block));
|
||||
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::new_with_bank(
|
||||
&leader_scheduler_config,
|
||||
&bank,
|
||||
)));
|
||||
assert_eq!(bank.get_balance(&mint_keypair.pubkey()), starting_balance);
|
||||
|
||||
// start cluster_info1
|
||||
let mut cluster_info1 = ClusterInfo::new(target1.info.clone());
|
||||
cluster_info1.insert_info(leader.info.clone());
|
||||
cluster_info1.set_leader(leader.info.id);
|
||||
let cref1 = Arc::new(RwLock::new(cluster_info1));
|
||||
let dr_1 = new_gossip(cref1.clone(), target1.sockets.gossip, exit.clone());
|
||||
|
||||
let mut cur_hash = Hash::default();
|
||||
let blocktree_path = get_tmp_ledger_path("test_replay");
|
||||
|
||||
let (blocktree, l_receiver) =
|
||||
Blocktree::open_with_config_signal(&blocktree_path, &blocktree_config)
|
||||
.expect("Expected to successfully open ledger");
|
||||
let vote_account_keypair = Arc::new(Keypair::new());
|
||||
let voting_keypair = VotingKeypair::new_local(&vote_account_keypair);
|
||||
let (sender, _) = channel();
|
||||
let tvu = Tvu::new(
|
||||
Some(Arc::new(voting_keypair)),
|
||||
&bank,
|
||||
0,
|
||||
0,
|
||||
cur_hash,
|
||||
&cref1,
|
||||
{
|
||||
Sockets {
|
||||
repair: target1.sockets.repair,
|
||||
retransmit: target1.sockets.retransmit,
|
||||
fetch: target1.sockets.tvu,
|
||||
}
|
||||
},
|
||||
Arc::new(blocktree),
|
||||
STORAGE_ROTATE_TEST_COUNT,
|
||||
&sender,
|
||||
&StorageState::default(),
|
||||
None,
|
||||
l_receiver,
|
||||
leader_scheduler,
|
||||
);
|
||||
|
||||
let mut alice_ref_balance = starting_balance;
|
||||
let mut msgs = Vec::new();
|
||||
let mut blob_idx = 0;
|
||||
let num_transfers = 10;
|
||||
let transfer_amount = 501;
|
||||
let bob_keypair = Keypair::new();
|
||||
for i in 0..num_transfers {
|
||||
let entry0 = next_entry_mut(&mut cur_hash, i, vec![]);
|
||||
let entry_tick0 = next_entry_mut(&mut cur_hash, i + 1, vec![]);
|
||||
|
||||
let tx0 = SystemTransaction::new_account(
|
||||
&mint_keypair,
|
||||
bob_keypair.pubkey(),
|
||||
transfer_amount,
|
||||
cur_hash,
|
||||
0,
|
||||
);
|
||||
let entry_tick1 = next_entry_mut(&mut cur_hash, i + 1, vec![]);
|
||||
let entry1 = next_entry_mut(&mut cur_hash, i + num_transfers, vec![tx0]);
|
||||
let entry_tick2 = next_entry_mut(&mut cur_hash, i + 1, vec![]);
|
||||
|
||||
alice_ref_balance -= transfer_amount;
|
||||
|
||||
let entries = vec![entry0, entry_tick0, entry_tick1, entry1, entry_tick2];
|
||||
let blobs = entries.to_shared_blobs();
|
||||
index_blobs(&blobs, &mut blob_idx, &vec![0; blobs.len()]);
|
||||
blobs
|
||||
.iter()
|
||||
.for_each(|b| b.write().unwrap().meta.set_addr(&tvu_addr));
|
||||
msgs.extend(blobs.into_iter());
|
||||
}
|
||||
|
||||
// send the blobs into the socket
|
||||
s_responder.send(msgs).expect("send");
|
||||
drop(s_responder);
|
||||
|
||||
// receive retransmitted messages
|
||||
let timer = Duration::new(1, 0);
|
||||
while let Ok(_msg) = r_reader.recv_timeout(timer) {
|
||||
trace!("got msg");
|
||||
}
|
||||
|
||||
let alice_balance = bank.get_balance(&mint_keypair.pubkey());
|
||||
assert_eq!(alice_balance, alice_ref_balance);
|
||||
|
||||
let bob_balance = bank.get_balance(&bob_keypair.pubkey());
|
||||
assert_eq!(bob_balance, starting_balance - alice_ref_balance);
|
||||
|
||||
tvu.close().expect("close");
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
dr_l.join().expect("join");
|
||||
dr_2.join().expect("join");
|
||||
dr_1.join().expect("join");
|
||||
t_receiver.join().expect("join");
|
||||
t_responder.join().expect("join");
|
||||
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
|
||||
let _ignored = remove_dir_all(&blocktree_path);
|
||||
}
|
Loading…
Reference in New Issue