Added an integration test for transitioning a leader to a validator, verifying that the Tpu pipeline can exit and a Tvu can be restarted. Fixed the Tpu and BroadcastStage so that exiting later stages in the pipeline also causes earlier stages to exit.

Carl 2018-09-14 14:34:32 -07:00 committed by Greg Fitzgerald
parent 6d27751365
commit bfe64f5f6e
8 changed files with 253 additions and 61 deletions
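The mechanism this commit introduces is easiest to see in isolation. Below is a minimal, self-contained sketch (not code from this commit): when a downstream stage exits, its channel receiver is dropped, so the upstream stage's send fails and its loop can break instead of blocking forever.

use std::sync::mpsc::channel;
use std::thread;

fn main() {
    let (sender, receiver) = channel::<u64>();

    // Downstream stage: exits after one item, dropping its receiver.
    let downstream = thread::spawn(move || {
        receiver.recv().unwrap();
    });

    // Upstream stage: a failed send means the pipeline is shutting down,
    // which is the role Error::SendError plays in the stages below.
    let upstream = thread::spawn(move || loop {
        if sender.send(1u64).is_err() {
            break;
        }
    });

    downstream.join().unwrap();
    upstream.join().unwrap();
}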

View File

@ -49,7 +49,10 @@ impl BankingStage {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => error!("{:?}", e),
Error::SendError => {
break;
}
_ => println!("BANKING ERROR {:?}", e),
}
}
}).unwrap();
@ -108,7 +111,10 @@ impl BankingStage {
debug!("process_transactions");
let results = bank.process_transactions(transactions);
let transactions = results.into_iter().filter_map(|x| x.ok()).collect();
signal_sender.send(Signal::Transactions(transactions))?;
match signal_sender.send(Signal::Transactions(transactions)) {
Err(_) => return Err(Error::SendError),
_ => (),
}
debug!("done process_transactions");
packet_recycler.recycle(msgs, "process_transactions");

View File

@ -12,7 +12,7 @@ use rayon::prelude::*;
use result::{Error, Result};
use service::Service;
use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
@ -148,6 +148,24 @@ fn broadcast(
Ok(())
}
// Implement a destructor for the BroadcastStage thread to signal it exited
// even on panics
struct Finalizer {
exit_sender: Arc<AtomicBool>,
}
impl Finalizer {
fn new(exit_sender: Arc<AtomicBool>) -> Self {
Finalizer { exit_sender }
}
}
// Implement a destructor for Finalizer.
impl Drop for Finalizer {
fn drop(&mut self) {
self.exit_sender.clone().store(true, Ordering::Relaxed);
}
}
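The point of routing the signal through a destructor is that Drop runs even when the thread panics, so the exit flag is always set. A small self-contained sketch (assumed names, not part of this commit) demonstrating that guarantee:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

struct Finalizer {
    exit_sender: Arc<AtomicBool>,
}

impl Drop for Finalizer {
    fn drop(&mut self) {
        self.exit_sender.store(true, Ordering::Relaxed);
    }
}

fn main() {
    let exit = Arc::new(AtomicBool::new(false));
    let thread_exit = exit.clone();
    let handle = thread::spawn(move || {
        let _finalizer = Finalizer { exit_sender: thread_exit };
        panic!("stage died"); // unwinding still runs Drop
    });
    assert!(handle.join().is_err()); // the panic is observed on join
    assert!(exit.load(Ordering::Relaxed)); // but the flag was still set
}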
pub struct BroadcastStage {
thread_hdl: JoinHandle<BroadcastStageReturnType>,
}
@ -216,6 +234,13 @@ impl BroadcastStage {
/// * `window` - Cache of blobs that we have broadcast
/// * `recycler` - Blob recycler.
/// * `receiver` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
/// * `exit_sender` - Set to true when this stage exits, allowing the rest of the Tpu to
/// exit cleanly. Otherwise, when a Tpu stage closes, it only closes the stages that come
/// after it; the stages that come before could be blocked on a receive and would never
/// notice that they need to exit. Now, if any stage of the Tpu closes, it will lead to
/// closing the WriteStage (because WriteStage is the last stage in the pipeline), which
/// will then close the BroadcastStage, which will then close the FetchStage in the Tpu,
/// and then the rest of the Tpu, completing the cycle.
pub fn new(
sock: UdpSocket,
crdt: Arc<RwLock<Crdt>>,
@ -223,13 +248,17 @@ impl BroadcastStage {
entry_height: u64,
recycler: BlobRecycler,
receiver: Receiver<Vec<Entry>>,
exit_sender: Arc<AtomicBool>,
) -> Self {
let thread_hdl = Builder::new()
.name("solana-broadcaster".to_string())
.spawn(move || Self::run(&sock, &crdt, &window, entry_height, &recycler, &receiver))
.spawn(move || {
let _exit = Finalizer::new(exit_sender);
Self::run(&sock, &crdt, &window, entry_height, &recycler, &receiver)
})
.unwrap();
BroadcastStage { thread_hdl }
(BroadcastStage { thread_hdl })
}
}
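The cascade described in the doc comment above can be modeled as a toy three-stage pipeline (assumed stage names and a simplified channel layout, not the real signatures): the last stage returns and drops its channel ends, the stage consuming its output observes the disconnect and sets the exit flag on the way out, and the first stage, which is never blocked on these channels, polls the flag.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread;

fn main() {
    let exit = Arc::new(AtomicBool::new(false));
    let (fetch_tx, fetch_rx) = channel::<u64>();
    let (write_tx, write_rx) = channel::<u64>();

    // "FetchStage": not blocked on these channels, so it polls `exit`.
    let fetch_exit = exit.clone();
    let fetch = thread::spawn(move || {
        let mut n = 0u64;
        while !fetch_exit.load(Ordering::Relaxed) {
            if fetch_tx.send(n).is_err() {
                break;
            }
            n += 1;
        }
    });

    // "WriteStage": the last pipeline stage; here it stops after 10 entries.
    let write = thread::spawn(move || {
        for _ in 0..10 {
            match fetch_rx.recv() {
                Ok(e) => {
                    if write_tx.send(e).is_err() {
                        break;
                    }
                }
                Err(_) => break,
            }
        }
        // Returning drops fetch_rx and write_tx, disconnecting both channels.
    });

    // "BroadcastStage": sets `exit` on the way out (the Finalizer's job),
    // which finally stops the polling loop in "FetchStage".
    let broadcast_exit = exit.clone();
    let broadcast = thread::spawn(move || {
        while write_rx.recv().is_ok() {}
        broadcast_exit.store(true, Ordering::Relaxed);
    });

    fetch.join().unwrap();
    write.join().unwrap();
    broadcast.join().unwrap();
}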
@ -252,6 +281,7 @@ mod tests {
use service::Service;
use signature::{Keypair, KeypairUtil, Pubkey};
use std::cmp;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, RwLock};
use window::{new_window_from_entries, SharedWindow};
@ -293,7 +323,7 @@ mod tests {
let shared_window = Arc::new(RwLock::new(window));
let (entry_sender, entry_receiver) = channel();
let exit_sender = Arc::new(AtomicBool::new(false));
// Start up the broadcast stage
let broadcast_stage = BroadcastStage::new(
leader_info.sockets.broadcast,
@ -302,6 +332,7 @@ mod tests {
entry_height,
blob_recycler.clone(),
entry_receiver,
exit_sender,
);
(
@ -375,15 +406,13 @@ mod tests {
};
}
let highest_index = find_highest_window_index(&shared_window);
// TODO: 2 * LEADER_ROTATION_INTERVAL - 1 due to the same bug in
// index_blobs() as mentioned above
assert_eq!(highest_index, 2 * LEADER_ROTATION_INTERVAL - 1);
// Make sure the threads closed cleanly
assert_eq!(
broadcast_stage.join().unwrap(),
BroadcastStageReturnType::LeaderRotation
);
let highest_index = find_highest_window_index(&shared_window);
assert_eq!(highest_index, 2 * LEADER_ROTATION_INTERVAL - 1);
}
}

View File

@ -11,7 +11,7 @@ use packet::BlobRecycler;
use rpc::{JsonRpcService, RPC_PORT};
use rpu::Rpu;
use service::Service;
use signature::{Keypair, KeypairUtil};
use signature::{Keypair, KeypairUtil, Pubkey};
use std::net::UdpSocket;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering};
@ -44,6 +44,10 @@ impl LeaderServices {
self.broadcast_stage.join()?;
self.tpu.join()
}
pub fn exit(&self) -> () {
self.tpu.exit();
}
}
pub struct ValidatorServices {
@ -58,6 +62,10 @@ impl ValidatorServices {
pub fn join(self) -> Result<()> {
self.tvu.join()
}
pub fn exit(&self) -> () {
//TODO: implement exit for Tvu
}
}
pub enum FullnodeReturnType {
@ -298,8 +306,7 @@ impl Fullnode {
let tick_duration = None;
// TODO: To light up PoH, uncomment the following line:
//let tick_duration = Some(Duration::from_millis(1000));
let (tpu, entry_receiver) = Tpu::new(
let (tpu, entry_receiver, tpu_exit) = Tpu::new(
keypair.clone(),
&bank,
&crdt,
@ -310,7 +317,6 @@ impl Fullnode {
.map(|s| s.try_clone().expect("Failed to clone transaction sockets"))
.collect(),
&blob_recycler,
exit.clone(),
ledger_path,
sigverify_disabled,
entry_height,
@ -326,6 +332,7 @@ impl Fullnode {
entry_height,
blob_recycler.clone(),
entry_receiver,
tpu_exit,
);
let leader_state = LeaderServices::new(tpu, broadcast_stage);
node_role = Some(NodeRole::Leader(leader_state));
@ -370,6 +377,7 @@ impl Fullnode {
}
}
self.rpu.set_new_bank(self.bank.clone());
let tvu = Tvu::new(
self.keypair.clone(),
&self.bank,
@ -414,6 +422,12 @@ impl Fullnode {
//used for notifying many nodes in parallel to exit
pub fn exit(&self) {
self.exit.store(true, Ordering::Relaxed);
match self.node_role {
Some(NodeRole::Leader(ref leader_services)) => leader_services.exit(),
Some(NodeRole::Validator(ref validator_services)) => validator_services.exit(),
_ => (),
}
}
pub fn close(self) -> Result<(Option<FullnodeReturnType>)> {
@ -421,6 +435,13 @@ impl Fullnode {
self.join()
}
// TODO: only used for testing, get rid of this once we have actual
// leader scheduling
pub fn set_scheduled_leader(&self, leader_id: Pubkey, entry_height: u64) {
let mut wcrdt = self.crdt.write().unwrap();
wcrdt.set_scheduled_leader(entry_height, leader_id);
}
fn new_bank_from_ledger(ledger_path: &str) -> (Bank, u64, Vec<Entry>) {
let bank = Bank::new_default(false);
let entries = read_ledger(ledger_path, true).expect("opening ledger");

View File

@ -9,7 +9,7 @@ use result::{Error, Result};
use service::Service;
use std::net::SocketAddr;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Instant;
use streamer::{self, BlobReceiver, BlobSender};
@ -17,7 +17,7 @@ use timing;
pub struct RequestStage {
thread_hdl: JoinHandle<()>,
pub request_processor: Arc<RequestProcessor>,
pub request_processor: Arc<RwLock<RequestProcessor>>,
}
impl RequestStage {
@ -78,19 +78,18 @@ impl RequestStage {
Ok(())
}
pub fn new(
request_processor: RequestProcessor,
request_processor: Arc<RwLock<RequestProcessor>>,
packet_receiver: Receiver<SharedPackets>,
packet_recycler: PacketRecycler,
blob_recycler: BlobRecycler,
) -> (Self, BlobReceiver) {
let request_processor = Arc::new(request_processor);
let request_processor_ = request_processor.clone();
let (blob_sender, blob_receiver) = channel();
let thread_hdl = Builder::new()
.name("solana-request-stage".to_string())
.spawn(move || loop {
if let Err(e) = Self::process_request_packets(
&request_processor_,
&request_processor_.read().unwrap(),
&packet_receiver,
&blob_sender,
&packet_recycler,

View File

@ -28,16 +28,18 @@ use packet::{BlobRecycler, PacketRecycler};
use request_processor::RequestProcessor;
use request_stage::RequestStage;
use service::Service;
use std::mem;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
use streamer;
pub struct Rpu {
request_stage: RequestStage,
thread_hdls: Vec<JoinHandle<()>>,
request_processor: Arc<RwLock<RequestProcessor>>,
}
impl Rpu {
@ -58,9 +60,9 @@ impl Rpu {
packet_sender,
);
let request_processor = RequestProcessor::new(bank.clone());
let request_processor = Arc::new(RwLock::new(RequestProcessor::new(bank.clone())));
let (request_stage, blob_receiver) = RequestStage::new(
request_processor,
request_processor.clone(),
packet_receiver,
packet_recycler.clone(),
blob_recycler.clone(),
@ -74,11 +76,18 @@ impl Rpu {
);
let thread_hdls = vec![t_receiver, t_responder];
Rpu {
thread_hdls,
request_stage,
request_processor,
}
}
pub fn set_new_bank(&self, new_bank: Arc<Bank>) {
let mut w_request_processor = self.request_processor.write().unwrap();
mem::replace(&mut *w_request_processor, RequestProcessor::new(new_bank));
}
}
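set_new_bank swaps the RequestProcessor in place, so the clone of the Arc held by the RequestStage observes the new bank on its next read. A minimal illustration of mem::replace through an RwLock write guard, using a stand-in type rather than the real RequestProcessor:

use std::mem;
use std::sync::{Arc, RwLock};

fn main() {
    let shared = Arc::new(RwLock::new(String::from("old bank")));
    let held_elsewhere = shared.clone(); // e.g. the clone held by RequestStage

    {
        let mut guard = shared.write().unwrap();
        // Swap the value in place; the old one is returned and dropped here.
        let _old = mem::replace(&mut *guard, String::from("new bank"));
    }

    // Every holder of the Arc now observes the replacement.
    assert_eq!(*held_elsewhere.read().unwrap(), "new bank");
}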
impl Service for Rpu {

View File

@ -65,10 +65,15 @@ impl SigVerifyStage {
);
let verified_batch = Self::verify_batch(batch, sigverify_disabled);
sendr
match sendr
.lock()
.expect("lock in fn verify_batch in tpu")
.send(verified_batch)?;
.send(verified_batch)
{
Err(_) => return Err(Error::SendError),
_ => (),
}
let total_time_ms = timing::duration_as_ms(&now.elapsed());
let total_time_s = timing::duration_as_s(&now.elapsed());
@ -105,6 +110,9 @@ impl SigVerifyStage {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::SendError => {
break;
}
_ => error!("{:?}", e),
}
}

View File

@ -36,7 +36,7 @@ use service::Service;
use signature::Keypair;
use sigverify_stage::SigVerifyStage;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Receiver;
use std::sync::{Arc, RwLock};
use std::thread;
@ -53,6 +53,7 @@ pub struct Tpu {
banking_stage: BankingStage,
record_stage: RecordStage,
write_stage: WriteStage,
exit: Arc<AtomicBool>,
}
impl Tpu {
@ -63,16 +64,16 @@ impl Tpu {
tick_duration: Option<Duration>,
transactions_sockets: Vec<UdpSocket>,
blob_recycler: &BlobRecycler,
exit: Arc<AtomicBool>,
ledger_path: &str,
sigverify_disabled: bool,
entry_height: u64,
) -> (Self, Receiver<Vec<Entry>>) {
) -> (Self, Receiver<Vec<Entry>>, Arc<AtomicBool>) {
let exit = Arc::new(AtomicBool::new(false));
let mut packet_recycler = PacketRecycler::default();
packet_recycler.set_name("tpu::Packet");
let (fetch_stage, packet_receiver) =
FetchStage::new(transactions_sockets, exit, &packet_recycler);
FetchStage::new(transactions_sockets, exit.clone(), &packet_recycler);
let (sigverify_stage, verified_receiver) =
SigVerifyStage::new(packet_receiver, sigverify_disabled);
@ -103,8 +104,13 @@ impl Tpu {
banking_stage,
record_stage,
write_stage,
exit: exit.clone(),
};
(tpu, entry_forwarder)
(tpu, entry_forwarder, exit)
}
pub fn exit(&self) -> () {
self.exit.store(true, Ordering::Relaxed);
}
pub fn close(self) -> thread::Result<Option<TpuReturnType>> {

View File

@ -5,9 +5,9 @@ extern crate chrono;
extern crate serde_json;
extern crate solana;
use solana::crdt::{Crdt, Node, NodeInfo};
use solana::crdt::{Crdt, Node, NodeInfo, LEADER_ROTATION_INTERVAL};
use solana::entry::Entry;
use solana::fullnode::Fullnode;
use solana::fullnode::{Fullnode, FullnodeReturnType};
use solana::hash::Hash;
use solana::ledger::LedgerWriter;
use solana::logger;
@ -30,28 +30,34 @@ use std::thread::sleep;
use std::thread::Builder;
use std::time::{Duration, Instant};
fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
//lets spy on the network
fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc<RwLock<Crdt>>, Pubkey) {
let exit = Arc::new(AtomicBool::new(false));
let mut spy = Node::new_localhost();
let daddr = "0.0.0.0:0".parse().unwrap();
let me = spy.info.id.clone();
spy.info.contact_info.tvu = daddr;
spy.info.contact_info.rpu = daddr;
spy.info.contact_info.tvu = spy.sockets.replicate[0].local_addr().unwrap();
spy.info.contact_info.rpu = spy.sockets.transaction[0].local_addr().unwrap();
let mut spy_crdt = Crdt::new(spy.info).expect("Crdt::new");
spy_crdt.insert(&leader);
spy_crdt.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_crdt));
let spy_crdt_ref = Arc::new(RwLock::new(spy_crdt));
let spy_window = Arc::new(RwLock::new(default_window()));
let recycler = BlobRecycler::default();
let ncp = Ncp::new(
&spy_ref,
&spy_crdt_ref,
spy_window,
recycler,
None,
spy.sockets.gossip,
exit.clone(),
);
(ncp, spy_crdt_ref, me)
}
fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
//lets spy on the network
let (ncp, spy_ref, me) = make_spy_node(leader);
//wait for the network to converge
let mut converged = false;
let mut rv = vec![];
@ -85,15 +91,16 @@ fn tmp_ledger_path(name: &str) -> String {
format!("/tmp/tmp-ledger-{}-{}", name, keypair.pubkey())
}
fn genesis(name: &str, num: i64) -> (Mint, String) {
fn genesis(name: &str, num: i64) -> (Mint, String, Vec<Entry>) {
let mint = Mint::new(num);
let path = tmp_ledger_path(name);
let mut writer = LedgerWriter::open(&path, true).unwrap();
writer.write_entries(mint.create_entries()).unwrap();
let entries = mint.create_entries();
writer.write_entries(entries.clone()).unwrap();
(mint, path)
(mint, path, entries)
}
fn tmp_copy_ledger(from: &str, name: &str) -> String {
@ -131,7 +138,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
let bob_pubkey = Keypair::new().pubkey();
let mut ledger_paths = Vec::new();
let (alice, leader_ledger_path) = genesis("multi_node_ledger_window", 10_000);
let (alice, leader_ledger_path, _) = genesis("multi_node_ledger_window", 10_000);
ledger_paths.push(leader_ledger_path.clone());
// make a copy at zero
@ -151,7 +158,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
// Send leader some tokens to vote
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &leader_pubkey, None).unwrap();
send_tx_and_retry_get_balance(&leader_data, &alice, &leader_pubkey, 500, None).unwrap();
info!("leader balance {}", leader_balance);
// start up another validator from zero, converge and then check
@ -173,7 +180,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
// another transaction with leader
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, None).unwrap();
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, None).unwrap();
info!("bob balance on leader {}", leader_balance);
assert_eq!(leader_balance, 500);
@ -211,7 +218,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
let bob_pubkey = Keypair::new().pubkey();
let mut ledger_paths = Vec::new();
let (alice, leader_ledger_path) = genesis("multi_node_validator_catchup_from_zero", 10_000);
let (alice, leader_ledger_path, _) = genesis("multi_node_validator_catchup_from_zero", 10_000);
ledger_paths.push(leader_ledger_path.clone());
let zero_ledger_path = tmp_copy_ledger(
@ -224,7 +231,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
// Send leader some tokens to vote
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &leader_pubkey, None).unwrap();
send_tx_and_retry_get_balance(&leader_data, &alice, &leader_pubkey, 500, None).unwrap();
info!("leader balance {}", leader_balance);
let mut nodes = vec![server];
@ -251,7 +258,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
assert_eq!(servers.len(), N + 1);
//verify leader can do transfer
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, None).unwrap();
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, None).unwrap();
assert_eq!(leader_balance, 500);
//verify validator has the same balance
let mut success = 0usize;
@ -284,7 +291,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
let servers = converge(&leader_data, N + 2);
let mut leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, None).unwrap();
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, None).unwrap();
info!("leader balance {}", leader_balance);
loop {
let mut client = mk_client(&leader_data);
@ -335,13 +342,13 @@ fn test_multi_node_basic() {
let bob_pubkey = Keypair::new().pubkey();
let mut ledger_paths = Vec::new();
let (alice, leader_ledger_path) = genesis("multi_node_basic", 10_000);
let (alice, leader_ledger_path, _) = genesis("multi_node_basic", 10_000);
ledger_paths.push(leader_ledger_path.clone());
let server = Fullnode::new(leader, &leader_ledger_path, leader_keypair, None, false);
// Send leader some tokens to vote
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &leader_pubkey, None).unwrap();
send_tx_and_retry_get_balance(&leader_data, &alice, &leader_pubkey, 500, None).unwrap();
info!("leader balance {}", leader_balance);
let mut nodes = vec![server];
@ -364,7 +371,7 @@ fn test_multi_node_basic() {
assert_eq!(servers.len(), N + 1);
//verify leader can do transfer
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, None).unwrap();
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, None).unwrap();
assert_eq!(leader_balance, 500);
//verify validator has the same balance
let mut success = 0usize;
@ -393,17 +400,17 @@ fn test_boot_validator_from_file() -> result::Result<()> {
let leader_keypair = Keypair::new();
let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let bob_pubkey = Keypair::new().pubkey();
let (alice, leader_ledger_path) = genesis("boot_validator_from_file", 100_000);
let (alice, leader_ledger_path, _) = genesis("boot_validator_from_file", 100_000);
let mut ledger_paths = Vec::new();
ledger_paths.push(leader_ledger_path.clone());
let leader_data = leader.info.clone();
let leader_fullnode = Fullnode::new(leader, &leader_ledger_path, leader_keypair, None, false);
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap();
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(500)).unwrap();
assert_eq!(leader_balance, 500);
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(1000)).unwrap();
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(1000)).unwrap();
assert_eq!(leader_balance, 1000);
let keypair = Keypair::new();
@ -446,7 +453,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
// ledger (currently up to WINDOW_SIZE entries)
logger::setup();
let (alice, ledger_path) = genesis(
let (alice, ledger_path, _) = genesis(
"leader_restart_validator_start_from_old_ledger",
100_000 + 500 * solana::window_service::MAX_REPAIR_BACKOFF as i64,
);
@ -456,7 +463,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
// lengthen the ledger
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap();
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(500)).unwrap();
assert_eq!(leader_balance, 500);
// create a "stale" ledger by copying current ledger
@ -471,7 +478,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
// lengthen the ledger
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(1000)).unwrap();
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(1000)).unwrap();
assert_eq!(leader_balance, 1000);
// restart the leader
@ -498,7 +505,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
let mut client = mk_client(&validator_data);
for _ in 0..solana::window_service::MAX_REPAIR_BACKOFF {
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(expected))
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(expected))
.unwrap();
assert_eq!(leader_balance, expected);
@ -538,7 +545,7 @@ fn test_multi_node_dynamic_network() {
let leader_pubkey = leader_keypair.pubkey().clone();
let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let bob_pubkey = Keypair::new().pubkey();
let (alice, leader_ledger_path) = genesis("multi_node_dynamic_network", 10_000_000);
let (alice, leader_ledger_path, _) = genesis("multi_node_dynamic_network", 10_000_000);
let mut ledger_paths = Vec::new();
ledger_paths.push(leader_ledger_path.clone());
@ -553,6 +560,7 @@ fn test_multi_node_dynamic_network() {
&leader_data,
&alice_arc.read().unwrap(),
&leader_pubkey,
500,
None,
).unwrap();
info!("leader balance {}", leader_balance);
@ -710,6 +718,111 @@ fn test_multi_node_dynamic_network() {
}
}
#[test]
fn test_leader_to_validator_transition() {
logger::setup();
// Make a dummy address to be the sink for this test's mock transactions
let bob_pubkey = Keypair::new().pubkey();
// Initialize the leader ledger. Make a mint and a genesis entry
// in the leader ledger
let (mint, leader_ledger_path, entries) = genesis(
"test_leader_to_validator_transition",
(3 * LEADER_ROTATION_INTERVAL) as i64,
);
let genesis_height = entries.len() as u64;
let mut ledger_paths = Vec::new();
ledger_paths.push(leader_ledger_path.clone());
// Start the leader node
let leader_keypair = Keypair::new();
let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_info = leader_node.info.clone();
let mut leader = Fullnode::new(
leader_node,
&leader_ledger_path,
leader_keypair,
None,
false,
);
// Set the next leader to be Bob
leader.set_scheduled_leader(bob_pubkey, LEADER_ROTATION_INTERVAL);
// Make an extra node for our leader to broadcast to,
// who won't vote and mess with our leader's entry count
let (ncp, spy_node, me) = make_spy_node(&leader_info);
// Wait for the leader to see the spy node
let mut converged = false;
for _ in 0..30 {
let num = spy_node.read().unwrap().convergence();
let mut v: Vec<NodeInfo> = spy_node
.read()
.unwrap()
.table
.values()
.into_iter()
.filter(|x| x.id != me)
.filter(|x| Crdt::is_valid_address(&x.contact_info.rpu))
.cloned()
.collect();
// The spy should see two nodes on the network (itself and the leader), and
// exactly one node other than itself: the leader
if num >= 2 as u64 && v.len() >= 1 {
converged = true;
break;
}
sleep(Duration::new(1, 0));
}
assert!(converged);
let extra_transactions = std::cmp::max(LEADER_ROTATION_INTERVAL / 4, 1);
// Push the leader "extra_transactions" past the LEADER_ROTATION_INTERVAL entry height,
// and make sure the leader stops there.
assert!(genesis_height < LEADER_ROTATION_INTERVAL);
for i in genesis_height..(LEADER_ROTATION_INTERVAL + extra_transactions) {
let expected_balance = std::cmp::min(
LEADER_ROTATION_INTERVAL - genesis_height,
i - genesis_height);
send_tx_and_retry_get_balance(
&leader_info,
&mint,
&bob_pubkey,
1,
Some(expected_balance as i64),
);
}
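To make the arithmetic concrete, suppose LEADER_ROTATION_INTERVAL is 20 and genesis_height is 3 (illustrative values, not ones the test pins down): extra_transactions is max(20 / 4, 1) = 5, the loop runs i = 3..25, and each iteration transfers 1 token. The expected balance min(20 - 3, i - 3) therefore ramps 0, 1, ..., 17 and then stays pinned at 17, because the leader stops processing transactions once it reaches the rotation height.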
// Wait for leader to shut down tpu and restart tvu
match leader.handle_role_transition().unwrap() {
Some(FullnodeReturnType::LeaderRotation) => (),
_ => panic!("Expected reason for exit to be leader rotation"),
}
// Query the now-validator to make sure it has the proper balances in its bank
// after the transition, even though we submitted "extra_transactions"
// transactions earlier
let mut leader_client = mk_client(&leader_info);
let expected_bal = LEADER_ROTATION_INTERVAL - genesis_height;
let bal = leader_client
.poll_get_balance(&bob_pubkey)
.expect("Expected success when polling newly transitioned validator for balance")
as u64;
assert_eq!(bal, expected_bal);
// Shut down
ncp.close().unwrap();
leader.close().unwrap();
}
fn mk_client(leader: &NodeInfo) -> ThinClient {
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
requests_socket
@ -751,6 +864,7 @@ fn send_tx_and_retry_get_balance(
leader: &NodeInfo,
alice: &Mint,
bob_pubkey: &Pubkey,
transfer_amount: i64,
expected: Option<i64>,
) -> Option<i64> {
let mut client = mk_client(leader);
@ -758,7 +872,7 @@ fn send_tx_and_retry_get_balance(
let last_id = client.get_last_id();
info!("executing leader transfer");
let _sig = client
.transfer(500, &alice.keypair(), *bob_pubkey, &last_id)
.transfer(transfer_amount, &alice.keypair(), *bob_pubkey, &last_id)
.unwrap();
retry_get_balance(&mut client, bob_pubkey, expected)
}