Use PohRecorder to synchronize instead of rotate. (#3080)

This commit is contained in:
anatoly yakovenko 2019-03-03 16:44:06 -08:00 committed by GitHub
parent 2ec9bc9f05
commit 1654199b23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 486 additions and 1068 deletions

View File

@ -4,11 +4,11 @@ extern crate test;
use rand::{thread_rng, Rng};
use rayon::prelude::*;
use solana::banking_stage::BankingStage;
use solana::entry::Entry;
use solana::banking_stage::{create_test_recorder, BankingStage};
use solana::cluster_info::ClusterInfo;
use solana::cluster_info::Node;
use solana::packet::to_packets_chunked;
use solana::poh_recorder::PohRecorder;
use solana::poh_service::{PohService, PohServiceConfig};
use solana::poh_recorder::WorkingBankEntries;
use solana_runtime::bank::Bank;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::hash;
@ -17,17 +17,16 @@ use solana_sdk::signature::{KeypairUtil, Signature};
use solana_sdk::system_transaction::SystemTransaction;
use solana_sdk::timing::{DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES};
use std::iter;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use test::Bencher;
fn check_txs(receiver: &Receiver<Vec<(Entry, u64)>>, ref_tx_count: usize) {
fn check_txs(receiver: &Receiver<WorkingBankEntries>, ref_tx_count: usize) {
let mut total = 0;
loop {
let entries = receiver.recv_timeout(Duration::new(1, 0));
if let Ok(entries) = entries {
if let Ok((_, entries)) = entries {
for (entry, _) in &entries {
total += entry.transactions.len();
}
@ -41,20 +40,6 @@ fn check_txs(receiver: &Receiver<Vec<(Entry, u64)>>, ref_tx_count: usize) {
assert_eq!(total, ref_tx_count);
}
fn create_test_recorder(bank: &Arc<Bank>) -> (Arc<Mutex<PohRecorder>>, PohService) {
let exit = Arc::new(AtomicBool::new(false));
let poh_recorder = Arc::new(Mutex::new(PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
)));
let poh_service = PohService::new(
poh_recorder.clone(),
&PohServiceConfig::default(),
exit.clone(),
);
(poh_recorder, poh_service)
}
#[bench]
#[ignore]
fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
@ -117,9 +102,11 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
(x, iter::repeat(1).take(len).collect())
})
.collect();
let (poh_recorder, poh_service) = create_test_recorder(&bank);
let (_stage, signal_receiver) =
BankingStage::new(&bank, &poh_recorder, verified_receiver, std::u64::MAX);
let (poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
poh_recorder.lock().unwrap().set_bank(&bank);
let mut id = genesis_block.hash();
for _ in 0..(MAX_RECENT_BLOCKHASHES * DEFAULT_TICKS_PER_SLOT as usize) {
@ -221,9 +208,11 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
(x, iter::repeat(1).take(len).collect())
})
.collect();
let (poh_recorder, poh_service) = create_test_recorder(&bank);
let (_stage, signal_receiver) =
BankingStage::new(&bank, &poh_recorder, verified_receiver, std::u64::MAX);
let (poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
poh_recorder.lock().unwrap().set_bank(&bank);
let mut id = genesis_block.hash();
for _ in 0..(MAX_RECENT_BLOCKHASHES * DEFAULT_TICKS_PER_SLOT as usize) {

View File

@ -39,7 +39,11 @@ impl BankForks {
frozen_banks.into_iter().map(|b| (b.slot(), b)).collect()
}
pub fn active_banks(&self) -> Vec<u64> {
self.banks.iter().map(|(k, _v)| *k).collect()
self.banks
.iter()
.filter(|(_, v)| !v.is_frozen())
.map(|(k, _v)| *k)
.collect()
}
pub fn get(&self, bank_id: u64) -> Option<&Arc<Bank>> {
self.banks.get(&bank_id)
@ -60,7 +64,9 @@ impl BankForks {
// TODO: use the bank's own ID instead of receiving a parameter?
pub fn insert(&mut self, bank_id: u64, bank: Bank) {
let mut bank = Arc::new(bank);
self.banks.insert(bank_id, bank.clone());
assert_eq!(bank_id, bank.slot());
let prev = self.banks.insert(bank_id, bank.clone());
assert!(prev.is_none());
if bank_id > self.working_bank.slot() {
self.working_bank = bank.clone()
@ -70,7 +76,9 @@ impl BankForks {
// parent if we're always calling insert()
// when we construct a child bank
while let Some(parent) = bank.parent() {
self.banks.remove(&parent.slot());
if let Some(prev) = self.banks.remove(&parent.slot()) {
assert!(Arc::ptr_eq(&prev, &parent));
}
bank = parent;
}
}

View File

@ -2,11 +2,13 @@
//! to contruct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU.
use crate::cluster_info::ClusterInfo;
use crate::entry::Entry;
use crate::leader_confirmation_service::LeaderConfirmationService;
use crate::packet::Packets;
use crate::packet::SharedPackets;
use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBank};
use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntries};
use crate::poh_service::{PohService, PohServiceConfig};
use crate::result::{Error, Result};
use crate::service::Service;
use crate::sigverify_stage::VerifiedPackets;
@ -15,9 +17,10 @@ use solana_metrics::counter::Counter;
use solana_runtime::bank::{self, Bank, BankError};
use solana_sdk::timing::{self, duration_as_us, MAX_RECENT_BLOCKHASHES};
use solana_sdk::transaction::Transaction;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use std::time::Instant;
@ -30,37 +33,18 @@ pub const NUM_THREADS: u32 = 10;
/// Stores the stage's thread handle and output receiver.
pub struct BankingStage {
bank_thread_hdls: Vec<JoinHandle<UnprocessedPackets>>,
exit: Arc<AtomicBool>,
leader_confirmation_service: LeaderConfirmationService,
bank_thread_hdls: Vec<JoinHandle<()>>,
}
impl BankingStage {
/// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
#[allow(clippy::new_ret_no_self)]
pub fn new(
bank: &Arc<Bank>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
verified_receiver: Receiver<VerifiedPackets>,
max_tick_height: u64,
) -> (Self, Receiver<Vec<(Entry, u64)>>) {
let (entry_sender, entry_receiver) = channel();
let working_bank = WorkingBank {
bank: bank.clone(),
sender: entry_sender,
min_tick_height: bank.tick_height(),
max_tick_height,
};
info!(
"new working bank {} {} {}",
working_bank.min_tick_height,
working_bank.max_tick_height,
poh_recorder.lock().unwrap().poh.tick_height
);
poh_recorder.lock().unwrap().set_working_bank(working_bank);
let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver));
) -> Self {
let verified_receiver = Arc::new(Mutex::new(verified_receiver));
// Single thread to generate entries from many banks.
// This thread talks to poh_service and broadcasts the entries once they have been recorded.
@ -68,47 +52,60 @@ impl BankingStage {
let exit = Arc::new(AtomicBool::new(false));
// Single thread to compute confirmation
let leader_confirmation_service = LeaderConfirmationService::new(&bank, exit.clone());
let lcs_handle = LeaderConfirmationService::start(&poh_recorder, exit.clone());
// Many banks that process transactions in parallel.
let bank_thread_hdls: Vec<JoinHandle<UnprocessedPackets>> = (0..Self::num_threads())
let mut bank_thread_hdls: Vec<JoinHandle<()>> = (0..Self::num_threads())
.map(|_| {
let thread_verified_receiver = shared_verified_receiver.clone();
let thread_poh_recorder = poh_recorder.clone();
let thread_bank = bank.clone();
let verified_receiver = verified_receiver.clone();
let poh_recorder = poh_recorder.clone();
let cluster_info = cluster_info.clone();
let exit = exit.clone();
Builder::new()
.name("solana-banking-stage-tx".to_string())
.spawn(move || {
let mut unprocessed_packets: UnprocessedPackets = vec![];
loop {
match Self::process_packets(
&thread_bank,
&thread_verified_receiver,
&thread_poh_recorder,
) {
Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
Ok(more_unprocessed_packets) => {
unprocessed_packets.extend(more_unprocessed_packets);
}
Err(err) => {
debug!("solana-banking-stage-tx: exit due to {:?}", err);
break;
}
}
}
unprocessed_packets
Self::process_loop(&verified_receiver, &poh_recorder, &cluster_info);
exit.store(true, Ordering::Relaxed);
})
.unwrap()
})
.collect();
(
Self {
bank_thread_hdls,
exit,
leader_confirmation_service,
},
entry_receiver,
)
bank_thread_hdls.push(lcs_handle);
Self { bank_thread_hdls }
}
fn forward_unprocessed_packets(
tpu: &std::net::SocketAddr,
unprocessed_packets: UnprocessedPackets,
) -> std::io::Result<()> {
let socket = UdpSocket::bind("0.0.0.0:0")?;
for (packets, start_index) in unprocessed_packets {
let packets = packets.read().unwrap();
for packet in packets.packets.iter().skip(start_index) {
socket.send_to(&packet.data[..packet.meta.size], tpu)?;
}
}
Ok(())
}
pub fn process_loop(
verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
) {
loop {
match Self::process_packets(&verified_receiver, &poh_recorder) {
Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
Ok(unprocessed_packets) => {
if let Some(leader) = cluster_info.read().unwrap().leader_data() {
let _ = Self::forward_unprocessed_packets(&leader.tpu, unprocessed_packets);
}
}
Err(err) => {
debug!("solana-banking-stage-tx: exit due to {:?}", err);
break;
}
}
}
}
pub fn num_threads() -> u32 {
@ -190,7 +187,8 @@ impl BankingStage {
bank.unlock_accounts(&txs, &results);
let unlock_time = now.elapsed();
debug!(
"lock: {}us load_execute: {}us record: {}us commit: {}us unlock: {}us txs_len: {}",
"bank: {} lock: {}us load_execute: {}us record: {}us commit: {}us unlock: {}us txs_len: {}",
bank.slot(),
duration_as_us(&lock_time),
duration_as_us(&load_execute_time),
duration_as_us(&record_time),
@ -221,7 +219,11 @@ impl BankingStage {
);
trace!("process_transcations: {:?}", result);
if let Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) = result {
info!("process transactions: max height reached");
info!(
"process transactions: max height reached slot: {} height: {}",
bank.slot(),
bank.tick_height()
);
break;
}
result?;
@ -232,7 +234,6 @@ impl BankingStage {
/// Process the incoming packets
pub fn process_packets(
bank: &Bank,
verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
poh: &Arc<Mutex<PohRecorder>>,
) -> Result<UnprocessedPackets> {
@ -241,6 +242,7 @@ impl BankingStage {
.lock()
.unwrap()
.recv_timeout(Duration::from_millis(100))?;
let mut reqs_len = 0;
let mms_len = mms.len();
info!(
@ -262,10 +264,22 @@ impl BankingStage {
continue;
}
let bank = poh.lock().unwrap().bank();
if bank.is_none() {
unprocessed_packets.push((msgs, 0));
continue;
}
let bank = bank.unwrap();
debug!("banking-stage-tx bank {}", bank.slot());
let transactions = Self::deserialize_transactions(&msgs.read().unwrap());
reqs_len += transactions.len();
debug!("transactions received {}", transactions.len());
debug!(
"bank: {} transactions received {}",
bank.slot(),
transactions.len()
);
let (verified_transactions, verified_transaction_index): (Vec<_>, Vec<_>) =
transactions
.into_iter()
@ -283,9 +297,13 @@ impl BankingStage {
})
.unzip();
debug!("verified transactions {}", verified_transactions.len());
debug!(
"bank: {} verified transactions {}",
bank.slot(),
verified_transactions.len()
);
let processed = Self::process_transactions(bank, &verified_transactions, poh)?;
let processed = Self::process_transactions(&bank, &verified_transactions, poh)?;
if processed < verified_transactions.len() {
bank_shutdown = true;
// Collect any unprocessed transactions in this batch for forwarding
@ -313,19 +331,6 @@ impl BankingStage {
Ok(unprocessed_packets)
}
pub fn join_and_collect_unprocessed_packets(&mut self) -> UnprocessedPackets {
let mut unprocessed_packets: UnprocessedPackets = vec![];
for bank_thread_hdl in self.bank_thread_hdls.drain(..) {
match bank_thread_hdl.join() {
Ok(more_unprocessed_packets) => {
unprocessed_packets.extend(more_unprocessed_packets)
}
err => warn!("bank_thread_hdl join failed: {:?}", err),
}
}
unprocessed_packets
}
}
impl Service for BankingStage {
@ -335,51 +340,52 @@ impl Service for BankingStage {
for bank_thread_hdl in self.bank_thread_hdls {
bank_thread_hdl.join()?;
}
self.exit.store(true, Ordering::Relaxed);
self.leader_confirmation_service.join()?;
Ok(())
}
}
pub fn create_test_recorder(
bank: &Arc<Bank>,
) -> (
Arc<Mutex<PohRecorder>>,
PohService,
Receiver<WorkingBankEntries>,
) {
let exit = Arc::new(AtomicBool::new(false));
let (poh_recorder, entry_receiver) =
PohRecorder::new(bank.tick_height(), bank.last_blockhash());
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_service = PohService::new(
poh_recorder.clone(),
&PohServiceConfig::default(),
exit.clone(),
);
(poh_recorder, poh_service, entry_receiver)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cluster_info::Node;
use crate::entry::EntrySlice;
use crate::packet::to_packets;
use crate::poh_service::{PohService, PohServiceConfig};
use crate::poh_recorder::WorkingBank;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::native_program::ProgramError;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction::SystemTransaction;
use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT;
use std::sync::mpsc::channel;
use std::thread::sleep;
fn create_test_recorder(bank: &Arc<Bank>) -> (Arc<Mutex<PohRecorder>>, PohService) {
let exit = Arc::new(AtomicBool::new(false));
let poh_recorder = Arc::new(Mutex::new(PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
)));
let poh_service = PohService::new(
poh_recorder.clone(),
&PohServiceConfig::default(),
exit.clone(),
);
(poh_recorder, poh_service)
}
#[test]
fn test_banking_stage_shutdown1() {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let (verified_sender, verified_receiver) = channel();
let (poh_recorder, poh_service) = create_test_recorder(&bank);
let (banking_stage, _entry_receiver) = BankingStage::new(
&bank,
&poh_recorder,
verified_receiver,
DEFAULT_TICKS_PER_SLOT,
);
let (poh_recorder, poh_service, _entry_receiever) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
drop(verified_sender);
banking_stage.join().unwrap();
poh_service.close().unwrap();
@ -387,30 +393,33 @@ mod tests {
#[test]
fn test_banking_stage_tick() {
solana_logger::setup();
let (mut genesis_block, _mint_keypair) = GenesisBlock::new(2);
genesis_block.ticks_per_slot = 4;
let bank = Arc::new(Bank::new(&genesis_block));
let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = channel();
let (poh_recorder, poh_service) = create_test_recorder(&bank);
let (banking_stage, entry_receiver) = BankingStage::new(
&bank,
&poh_recorder,
verified_receiver,
genesis_block.ticks_per_slot - 1,
);
let (poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
poh_recorder.lock().unwrap().set_bank(&bank);
let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
trace!("sending bank");
sleep(Duration::from_millis(600));
drop(verified_sender);
poh_service.close().unwrap();
drop(poh_recorder);
trace!("getting entries");
let entries: Vec<_> = entry_receiver
.iter()
.flat_map(|x| x.into_iter().map(|e| e.0))
.flat_map(|x| x.1.into_iter().map(|e| e.0))
.collect();
trace!("done");
assert_eq!(entries.len(), genesis_block.ticks_per_slot as usize - 1);
assert!(entries.verify(&start_hash));
assert_eq!(entries[entries.len() - 1].hash, bank.last_blockhash());
banking_stage.join().unwrap();
poh_service.close().unwrap();
}
#[test]
@ -419,13 +428,11 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_block));
let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = channel();
let (poh_recorder, poh_service) = create_test_recorder(&bank);
let (banking_stage, entry_receiver) = BankingStage::new(
&bank,
&poh_recorder,
verified_receiver,
DEFAULT_TICKS_PER_SLOT,
);
let (poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
poh_recorder.lock().unwrap().set_bank(&bank);
let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
// good tx
let keypair = mint_keypair;
@ -449,11 +456,13 @@ mod tests {
.unwrap();
drop(verified_sender);
poh_service.close().expect("close");
drop(poh_recorder);
//receive entries + ticks
let entries: Vec<Vec<Entry>> = entry_receiver
.iter()
.map(|x| x.into_iter().map(|e| e.0).collect())
.map(|x| x.1.into_iter().map(|e| e.0).collect())
.collect();
assert!(entries.len() >= 1);
@ -466,10 +475,9 @@ mod tests {
});
drop(entry_receiver);
banking_stage.join().unwrap();
poh_service.close().unwrap();
}
#[test]
#[ignore] //flaky
fn test_banking_stage_entryfication() {
// In this attack we'll demonstrate that a verifier can interpret the ledger
// differently if either the server doesn't signal the ledger to add an
@ -477,13 +485,11 @@ mod tests {
let (genesis_block, mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let (verified_sender, verified_receiver) = channel();
let (poh_recorder, poh_service) = create_test_recorder(&bank);
let (banking_stage, entry_receiver) = BankingStage::new(
&bank,
&poh_recorder,
verified_receiver,
DEFAULT_TICKS_PER_SLOT,
);
let (poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
poh_recorder.lock().unwrap().set_bank(&bank);
let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
// Process a batch that includes a transaction that receives two tokens.
let alice = Keypair::new();
@ -512,13 +518,15 @@ mod tests {
verified_sender
.send(vec![(packets[0].clone(), vec![1u8])])
.unwrap();
drop(verified_sender);
banking_stage.join().unwrap();
poh_service.close().expect("close");;
drop(poh_recorder);
// Collect the ledger and feed it to a new bank.
let entries: Vec<_> = entry_receiver
.iter()
.flat_map(|x| x.into_iter().map(|e| e.0))
.flat_map(|x| x.1.into_iter().map(|e| e.0))
.collect();
// same assertion as running through the bank, really...
assert!(entries.len() >= 2);
@ -533,94 +541,22 @@ mod tests {
.for_each(|x| assert_eq!(*x, Ok(())));
}
assert_eq!(bank.get_balance(&alice.pubkey()), 1);
poh_service.close().unwrap();
}
// Test that when the max_tick_height is reached, the banking stage exits
#[test]
fn test_max_tick_height_shutdown() {
solana_logger::setup();
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let (verified_sender, verified_receiver) = channel();
let max_tick_height = 10;
let (poh_recorder, poh_service) = create_test_recorder(&bank);
let (banking_stage, _entry_receiver) =
BankingStage::new(&bank, &poh_recorder, verified_receiver, max_tick_height);
loop {
let bank_tick_height = bank.tick_height();
if bank_tick_height >= max_tick_height {
break;
}
sleep(Duration::from_millis(10));
}
drop(verified_sender);
banking_stage.join().unwrap();
poh_service.close().unwrap();
}
#[test]
fn test_returns_unprocessed_packet() {
solana_logger::setup();
let (genesis_block, mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let ticks_per_slot = 1;
let (verified_sender, verified_receiver) = channel();
let (poh_recorder, poh_service) = create_test_recorder(&bank);
let (mut banking_stage, _entry_receiver) =
BankingStage::new(&bank, &poh_recorder, verified_receiver, ticks_per_slot);
// Wait for Poh recorder to hit max height
loop {
let bank_tick_height = bank.tick_height();
if bank_tick_height >= ticks_per_slot {
break;
}
sleep(Duration::from_millis(10));
}
// Now send a transaction to the banking stage
let transaction = SystemTransaction::new_account(
&mint_keypair,
Keypair::new().pubkey(),
2,
genesis_block.hash(),
0,
);
let packets = to_packets(&[transaction]);
verified_sender
.send(vec![(packets[0].clone(), vec![1u8])])
.unwrap();
// Shut down the banking stage, it should give back the transaction
drop(verified_sender);
let unprocessed_packets = banking_stage.join_and_collect_unprocessed_packets();
assert_eq!(unprocessed_packets.len(), 1);
let (packets, start_index) = &unprocessed_packets[0];
assert_eq!(packets.read().unwrap().packets.len(), 1); // TODO: maybe compare actual packet contents too
assert_eq!(*start_index, 0);
poh_service.close().unwrap();
}
#[test]
fn test_bank_record_transactions() {
let (genesis_block, mint_keypair) = GenesisBlock::new(10_000);
let bank = Arc::new(Bank::new(&genesis_block));
let (entry_sender, entry_receiver) = channel();
let working_bank = WorkingBank {
bank: bank.clone(),
sender: entry_sender,
min_tick_height: bank.tick_height(),
max_tick_height: std::u64::MAX,
};
let poh_recorder = Arc::new(Mutex::new(PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
)));
let (poh_recorder, entry_receiver) =
PohRecorder::new(bank.tick_height(), bank.last_blockhash());
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
poh_recorder.lock().unwrap().set_working_bank(working_bank);
let pubkey = Keypair::new().pubkey();
@ -631,7 +567,7 @@ mod tests {
let mut results = vec![Ok(()), Ok(())];
BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap();
let entries = entry_receiver.recv().unwrap();
let (_, entries) = entry_receiver.recv().unwrap();
assert_eq!(entries[0].0.transactions.len(), transactions.len());
// ProgramErrors should still be recorded
@ -640,13 +576,13 @@ mod tests {
ProgramError::ResultWithNegativeTokens,
));
BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap();
let entries = entry_receiver.recv().unwrap();
let (_, entries) = entry_receiver.recv().unwrap();
assert_eq!(entries[0].0.transactions.len(), transactions.len());
// Other BankErrors should not be recorded
results[0] = Err(BankError::AccountNotFound);
BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap();
let entries = entry_receiver.recv().unwrap();
let (_, entries) = entry_receiver.recv().unwrap();
assert_eq!(entries[0].0.transactions.len(), transactions.len() - 1);
}
@ -665,38 +601,38 @@ mod tests {
0,
)];
let (entry_sender, entry_receiver) = channel();
let working_bank = WorkingBank {
bank: bank.clone(),
sender: entry_sender,
min_tick_height: bank.tick_height(),
max_tick_height: bank.tick_height() + 1,
};
let poh_recorder = Arc::new(Mutex::new(PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
)));
let (poh_recorder, entry_receiver) =
PohRecorder::new(bank.tick_height(), bank.last_blockhash());
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
poh_recorder.lock().unwrap().set_working_bank(working_bank);
BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder).unwrap();
poh_recorder.lock().unwrap().tick();
let mut need_tick = true;
let mut done = false;
// read entries until I find mine, might be ticks...
while let Ok(entries) = entry_receiver.recv() {
while let Ok((_, entries)) = entry_receiver.recv() {
for (entry, _) in entries {
if !entry.is_tick() {
trace!("got entry");
assert_eq!(entry.transactions.len(), transactions.len());
assert_eq!(bank.get_balance(&pubkey), 1);
need_tick = false;
} else {
break;
done = true;
}
}
if done {
break;
}
}
trace!("done ticking");
assert_eq!(need_tick, false);
assert_eq!(done, true);
let transactions = vec![SystemTransaction::new_move(
&mint_keypair,
@ -707,7 +643,7 @@ mod tests {
)];
assert_matches!(
BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder,),
BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder),
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
);

View File

@ -1,19 +1,18 @@
//! A stage to broadcast data from a leader node to validators
//!
use crate::blocktree::Blocktree;
use crate::cluster_info::{ClusterInfo, ClusterInfoError, NodeInfo, DATA_PLANE_FANOUT};
use crate::entry::Entry;
use crate::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT};
use crate::entry::EntrySlice;
#[cfg(feature = "erasure")]
use crate::erasure::CodingGenerator;
use crate::packet::index_blobs;
use crate::poh_recorder::WorkingBankEntries;
use crate::result::{Error, Result};
use crate::service::Service;
use crate::staking_utils;
use rayon::prelude::*;
use solana_metrics::counter::Counter;
use solana_metrics::{influxdb, submit};
use solana_runtime::bank::Bank;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::duration_as_ms;
use std::net::UdpSocket;
@ -32,7 +31,6 @@ pub enum BroadcastStageReturnType {
struct Broadcast {
id: Pubkey,
blob_index: u64,
#[cfg(feature = "erasure")]
coding_generator: CodingGenerator,
@ -41,25 +39,46 @@ struct Broadcast {
impl Broadcast {
fn run(
&mut self,
slot_height: u64,
max_tick_height: u64,
broadcast_table: &[NodeInfo],
receiver: &Receiver<Vec<(Entry, u64)>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
receiver: &Receiver<WorkingBankEntries>,
sock: &UdpSocket,
blocktree: &Arc<Blocktree>,
) -> Result<()> {
let timer = Duration::new(1, 0);
let entries = receiver.recv_timeout(timer)?;
let (bank, entries) = receiver.recv_timeout(timer)?;
let mut broadcast_table = cluster_info
.read()
.unwrap()
.sorted_tvu_peers(&staking_utils::node_stakes(&bank));
// Layer 1, leader nodes are limited to the fanout size.
broadcast_table.truncate(DATA_PLANE_FANOUT);
inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1);
let slot_height = bank.slot();
let max_tick_height = (bank.slot() + 1) * bank.ticks_per_slot() - 1;
// TODO: Fix BankingStage/BroadcastStage to operate on `slot` directly instead of
// `max_tick_height`
let mut blob_index = blocktree
.meta(bank.slot())
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0);
let now = Instant::now();
let mut num_entries = entries.len();
let mut ventries = Vec::new();
let mut last_tick = entries.last().map(|v| v.1).unwrap_or(0);
ventries.push(entries);
while let Ok(entries) = receiver.try_recv() {
while let Ok((same_bank, entries)) = receiver.try_recv() {
num_entries += entries.len();
last_tick = entries.last().map(|v| v.1).unwrap_or(0);
ventries.push(entries);
assert!(last_tick <= max_tick_height);
assert!(same_bank.slot() == bank.slot());
if last_tick == max_tick_height {
break;
}
}
inc_new_counter_info!("broadcast_service-entries_received", num_entries);
@ -75,14 +94,8 @@ impl Broadcast {
.collect();
// TODO: blob_index should be slot-relative...
index_blobs(&blobs, &self.id, &mut self.blob_index, slot_height);
let parent = {
if slot_height == 0 {
0
} else {
slot_height - 1
}
};
index_blobs(&blobs, &self.id, &mut blob_index, slot_height);
let parent = bank.parents().first().map(|bank| bank.slot()).unwrap_or(0);
for b in blobs.iter() {
b.write().unwrap().set_parent(parent);
}
@ -129,7 +142,7 @@ impl Broadcast {
influxdb::Point::new("broadcast-service")
.add_field(
"transmit-index",
influxdb::Value::Integer(self.blob_index as i64),
influxdb::Value::Integer(blob_index as i64),
)
.to_owned(),
);
@ -163,12 +176,9 @@ pub struct BroadcastStage {
impl BroadcastStage {
#[allow(clippy::too_many_arguments)]
fn run(
slot_height: u64,
bank: &Arc<Bank>,
sock: &UdpSocket,
cluster_info: &Arc<RwLock<ClusterInfo>>,
blob_index: u64,
receiver: &Receiver<Vec<(Entry, u64)>>,
receiver: &Receiver<WorkingBankEntries>,
exit_signal: &Arc<AtomicBool>,
blocktree: &Arc<Blocktree>,
) -> BroadcastStageReturnType {
@ -176,32 +186,15 @@ impl BroadcastStage {
let mut broadcast = Broadcast {
id: me.id,
blob_index,
#[cfg(feature = "erasure")]
coding_generator: CodingGenerator::new(),
};
let max_tick_height = (slot_height + 1) * bank.ticks_per_slot() - 1;
loop {
if exit_signal.load(Ordering::Relaxed) {
return BroadcastStageReturnType::ExitSignal;
}
let mut broadcast_table = cluster_info
.read()
.unwrap()
.sorted_tvu_peers(&staking_utils::node_stakes(&bank));
// Layer 1, leader nodes are limited to the fanout size.
broadcast_table.truncate(DATA_PLANE_FANOUT);
inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1);
if let Err(e) = broadcast.run(
slot_height,
max_tick_height,
&broadcast_table,
receiver,
sock,
blocktree,
) {
if let Err(e) = broadcast.run(&cluster_info, receiver, sock, blocktree) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) | Error::SendError => {
return BroadcastStageReturnType::ChannelDisconnected;
@ -234,32 +227,19 @@ impl BroadcastStage {
/// completing the cycle.
#[allow(clippy::too_many_arguments)]
pub fn new(
slot_height: u64,
bank: &Arc<Bank>,
sock: UdpSocket,
cluster_info: Arc<RwLock<ClusterInfo>>,
blob_index: u64,
receiver: Receiver<Vec<(Entry, u64)>>,
receiver: Receiver<WorkingBankEntries>,
exit_sender: Arc<AtomicBool>,
blocktree: &Arc<Blocktree>,
) -> Self {
let exit_signal = Arc::new(AtomicBool::new(false));
let blocktree = blocktree.clone();
let bank = bank.clone();
let thread_hdl = Builder::new()
.name("solana-broadcaster".to_string())
.spawn(move || {
let _exit = Finalizer::new(exit_sender);
Self::run(
slot_height,
&bank,
&sock,
&cluster_info,
blob_index,
&receiver,
&exit_signal,
&blocktree,
)
Self::run(&sock, &cluster_info, &receiver, &exit_signal, &blocktree)
})
.unwrap();
@ -282,6 +262,7 @@ mod test {
use crate::cluster_info::{ClusterInfo, Node};
use crate::entry::create_ticks;
use crate::service::Service;
use solana_runtime::bank::Bank;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT;
@ -294,14 +275,13 @@ mod test {
struct MockBroadcastStage {
blocktree: Arc<Blocktree>,
broadcast_service: BroadcastStage,
bank: Arc<Bank>,
}
fn setup_dummy_broadcast_service(
slot_height: u64,
leader_pubkey: Pubkey,
ledger_path: &str,
entry_receiver: Receiver<Vec<(Entry, u64)>>,
blob_index: u64,
entry_receiver: Receiver<WorkingBankEntries>,
) -> MockBroadcastStage {
// Make the database ledger
let blocktree = Arc::new(Blocktree::open(ledger_path).unwrap());
@ -323,11 +303,8 @@ mod test {
// Start up the broadcast stage
let broadcast_service = BroadcastStage::new(
slot_height,
&bank,
leader_info.sockets.broadcast,
cluster_info,
blob_index,
entry_receiver,
exit_sender,
&blocktree,
@ -336,6 +313,7 @@ mod test {
MockBroadcastStage {
blocktree,
broadcast_service,
bank,
}
}
@ -352,17 +330,16 @@ mod test {
let (entry_sender, entry_receiver) = channel();
let broadcast_service = setup_dummy_broadcast_service(
0,
leader_keypair.pubkey(),
&ledger_path,
entry_receiver,
0,
);
let bank = broadcast_service.bank.clone();
let ticks = create_ticks(max_tick_height - start_tick_height, Hash::default());
for (i, tick) in ticks.into_iter().enumerate() {
entry_sender
.send(vec![(tick, i as u64 + 1)])
.send((bank.clone(), vec![(tick, i as u64 + 1)]))
.expect("Expect successful send to broadcast service");
}

View File

@ -16,7 +16,7 @@ use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use crate::storage_stage::StorageState;
use crate::tpu::Tpu;
use crate::tvu::{Sockets, Tvu, TvuRotationInfo, TvuRotationReceiver};
use crate::tvu::{Sockets, Tvu};
use crate::voting_keypair::VotingKeypair;
use solana_metrics::counter::Counter;
use solana_sdk::genesis_block::GenesisBlock;
@ -26,11 +26,11 @@ use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction::SystemTransaction;
use solana_sdk::timing::timestamp;
use solana_vote_api::vote_transaction::VoteTransaction;
use std::net::UdpSocket;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex, RwLock};
use std::thread::sleep;
use std::thread::JoinHandle;
use std::thread::{spawn, Result};
use std::time::Duration;
@ -84,20 +84,15 @@ impl Default for FullnodeConfig {
}
pub struct Fullnode {
id: Pubkey,
pub id: Pubkey,
exit: Arc<AtomicBool>,
rpc_service: Option<JsonRpcService>,
rpc_pubsub_service: Option<PubSubService>,
rpc_working_bank_handle: JoinHandle<()>,
gossip_service: GossipService,
sigverify_disabled: bool,
tpu_sockets: Vec<UdpSocket>,
broadcast_socket: UdpSocket,
node_services: NodeServices,
rotation_receiver: TvuRotationReceiver,
blocktree: Arc<Blocktree>,
poh_service: PohService,
poh_recorder: Arc<Mutex<PohRecorder>>,
bank_forks: Arc<RwLock<BankForks>>,
}
impl Fullnode {
@ -129,10 +124,9 @@ impl Fullnode {
bank.tick_height(),
bank.last_blockhash(),
);
let poh_recorder = Arc::new(Mutex::new(PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
)));
let (poh_recorder, entry_receiver) =
PohRecorder::new(bank.tick_height(), bank.last_blockhash());
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, exit.clone());
info!("node info: {:?}", node.info);
@ -225,8 +219,6 @@ impl Fullnode {
};
// Setup channel for rotation indications
let (rotation_sender, rotation_receiver) = channel();
let tvu = Tvu::new(
voting_keypair_option,
&bank_forks,
@ -235,137 +227,55 @@ impl Fullnode {
sockets,
blocktree.clone(),
config.storage_rotate_count,
&rotation_sender,
&storage_state,
config.blockstream.as_ref(),
ledger_signal_receiver,
&subscriptions,
&poh_recorder,
);
let tpu = Tpu::new(id, &cluster_info);
let tpu = Tpu::new(
id,
&cluster_info,
&poh_recorder,
entry_receiver,
node.sockets.tpu,
node.sockets.broadcast,
config.sigverify_disabled,
&blocktree,
);
let exit_ = exit.clone();
let bank_forks_ = bank_forks.clone();
let rpc_service_rp = rpc_service.request_processor.clone();
let rpc_working_bank_handle = spawn(move || loop {
if exit_.load(Ordering::Relaxed) {
break;
}
let bank = bank_forks_.read().unwrap().working_bank();
trace!("rpc working bank {} {}", bank.slot(), bank.last_blockhash());
rpc_service_rp
.write()
.unwrap()
.set_bank(&bank_forks_.read().unwrap().working_bank());
let timer = Duration::from_millis(100);
sleep(timer);
});
inc_new_counter_info!("fullnode-new", 1);
Self {
id,
sigverify_disabled: config.sigverify_disabled,
gossip_service,
rpc_service: Some(rpc_service),
rpc_pubsub_service: Some(rpc_pubsub_service),
rpc_working_bank_handle,
node_services: NodeServices::new(tpu, tvu),
exit,
tpu_sockets: node.sockets.tpu,
broadcast_socket: node.sockets.broadcast,
rotation_receiver,
blocktree,
poh_service,
poh_recorder,
bank_forks,
}
}
/// Transition this node's TPU into the role dictated by `rotation_info`:
/// leader when `rotation_info.leader_id` equals our id, forwarder otherwise.
/// Also refreshes the RPC service's bank from the current working bank.
fn rotate(&mut self, rotation_info: TvuRotationInfo) {
trace!(
"{:?}: rotate for slot={} to leader={:?}",
self.id,
rotation_info.slot,
rotation_info.leader_id,
);
// Point RPC at the latest working bank before switching roles.
if let Some(ref mut rpc_service) = self.rpc_service {
// TODO: This is not the correct bank. Instead TVU should pass along the
// frozen Bank for each completed block for RPC to use from it's notion of the "best"
// available fork (until we want to surface multiple forks to RPC)
rpc_service.set_bank(&self.bank_forks.read().unwrap().working_bank());
}
if rotation_info.leader_id == self.id {
debug!("{:?} rotating to leader role", self.id);
// Fetch the bank for the slot we are rotating into; the TPU produces
// entries against this bank while we are leader.
// NOTE(review): `.get(...).unwrap()` assumes bank_forks already contains
// a bank for `rotation_info.slot` — panics otherwise.
let tpu_bank = self
.bank_forks
.read()
.unwrap()
.get(rotation_info.slot)
.unwrap()
.clone();
// Sockets are cloned because the TPU takes ownership of its copies.
self.node_services.tpu.switch_to_leader(
&tpu_bank,
&self.poh_recorder,
self.tpu_sockets
.iter()
.map(|s| s.try_clone().expect("Failed to clone TPU sockets"))
.collect(),
self.broadcast_socket
.try_clone()
.expect("Failed to clone broadcast socket"),
self.sigverify_disabled,
rotation_info.slot,
&self.blocktree,
);
} else {
// Not the leader for this slot: forward transactions to the leader.
self.node_services.tpu.switch_to_forwarder(
rotation_info.leader_id,
self.tpu_sockets
.iter()
.map(|s| s.try_clone().expect("Failed to clone TPU sockets"))
.collect(),
);
}
}
// Spawns a thread that manages node role transitions. Returns the thread's
// JoinHandle, an exit flag that can be set to request shutdown, and a
// receiver that yields `true` once the node has closed.
pub fn start(
mut self,
rotation_notifier: Option<Sender<u64>>,
) -> (JoinHandle<()>, Arc<AtomicBool>, Receiver<bool>) {
let (sender, receiver) = channel();
let exit = self.exit.clone();
let timeout = Duration::from_secs(1);
let handle = spawn(move || loop {
// Shutdown requested: close the node, notify the caller, and stop.
if self.exit.load(Ordering::Relaxed) {
debug!("node shutdown requested");
self.close().expect("Unable to close node");
let _ = sender.send(true);
break;
}
// Wait (bounded, so the exit flag is re-checked) for the TVU to
// signal a rotation.
match self.rotation_receiver.recv_timeout(timeout) {
Ok(rotation_info) => {
trace!("{:?}: rotate at slot={}", self.id, rotation_info.slot);
//TODO: this will be called by the TVU every time it votes
//instead of here
info!(
"reset PoH... {} {}",
rotation_info.tick_height, rotation_info.blockhash
);
// Re-sync PoH to the rotation point before switching roles.
self.poh_recorder
.lock()
.unwrap()
.reset(rotation_info.tick_height, rotation_info.blockhash);
let slot = rotation_info.slot;
self.rotate(rotation_info);
debug!("role transition complete");
if let Some(ref rotation_notifier) = rotation_notifier {
rotation_notifier.send(slot).unwrap();
}
}
Err(RecvTimeoutError::Timeout) => continue,
// Disconnected sender: fall through and re-check the exit flag.
_ => (),
}
});
(handle, exit, receiver)
}
/// Start the node's rotation-management thread and return a closure that,
/// when invoked, requests shutdown and blocks until the node has closed.
pub fn run(self, rotation_notifier: Option<Sender<u64>>) -> impl FnOnce() {
let (_handle, exit_flag, shutdown_receiver) = self.start(rotation_notifier);
move || {
// Signal the rotation loop to exit, then wait for its confirmation.
exit_flag.store(true, Ordering::Relaxed);
shutdown_receiver.recv().unwrap();
debug!("node shutdown complete");
}
}
// Used for notifying many nodes in parallel to exit
fn exit(&self) {
pub fn exit(&self) {
self.exit.store(true, Ordering::Relaxed);
// Need to force the poh_recorder to drop the WorkingBank,
// which contains the channel to BroadcastStage. This should be
@ -375,6 +285,7 @@ impl Fullnode {
// in motion because exit()/close() are only called by the run() loop
// which is the sole initiator of rotations.
self.poh_recorder.lock().unwrap().clear_bank();
self.poh_service.exit();
if let Some(ref rpc_service) = self.rpc_service {
rpc_service.exit();
}
@ -382,10 +293,9 @@ impl Fullnode {
rpc_pubsub_service.exit();
}
self.node_services.exit();
self.poh_service.exit()
}
fn close(self) -> Result<()> {
pub fn close(self) -> Result<()> {
self.exit();
self.join()
}
@ -418,6 +328,8 @@ impl Service for Fullnode {
type JoinReturnType = ();
fn join(self) -> Result<()> {
self.poh_service.join()?;
drop(self.poh_recorder);
if let Some(rpc_service) = self.rpc_service {
rpc_service.join()?;
}
@ -425,11 +337,10 @@ impl Service for Fullnode {
rpc_pubsub_service.join()?;
}
self.rpc_working_bank_handle.join()?;
self.gossip_service.join()?;
self.node_services.join()?;
trace!("exit node_services!");
self.poh_service.join()?;
trace!("exit poh!");
Ok(())
}
}
@ -483,12 +394,7 @@ pub fn make_active_set_entries(
#[cfg(test)]
mod tests {
use super::*;
use crate::blocktree::{create_new_tmp_ledger, tmp_copy_blocktree};
use crate::entry::make_consecutive_blobs;
use crate::streamer::responder;
use solana_sdk::hash::Hash;
use solana_sdk::timing::DEFAULT_SLOTS_PER_EPOCH;
use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT;
use crate::blocktree::create_new_tmp_ledger;
use std::fs::remove_dir_all;
#[test]
@ -551,240 +457,4 @@ mod tests {
remove_dir_all(path).unwrap();
}
}
#[test]
fn test_leader_to_leader_transition() {
solana_logger::setup();
let bootstrap_leader_keypair = Keypair::new();
let bootstrap_leader_node =
Node::new_localhost_with_pubkey(bootstrap_leader_keypair.pubkey());
// Once the bootstrap leader hits the second epoch, because there are no other choices in
// the active set, this leader will remain the leader in the second epoch. In the second
// epoch, check that the same leader knows to shut down and restart as a leader again.
// Small slot/epoch sizes keep the test fast.
let ticks_per_slot = 5;
let slots_per_epoch = 2;
let voting_keypair = Keypair::new();
let fullnode_config = FullnodeConfig::default();
let (mut genesis_block, _mint_keypair) =
GenesisBlock::new_with_leader(10_000, bootstrap_leader_keypair.pubkey(), 500);
genesis_block.ticks_per_slot = ticks_per_slot;
genesis_block.slots_per_epoch = slots_per_epoch;
let (bootstrap_leader_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
// Start the bootstrap leader
let bootstrap_leader = Fullnode::new(
bootstrap_leader_node,
&Arc::new(bootstrap_leader_keypair),
&bootstrap_leader_ledger_path,
voting_keypair,
None,
&fullnode_config,
);
let (rotation_sender, rotation_receiver) = channel();
let bootstrap_leader_exit = bootstrap_leader.run(Some(rotation_sender));
// Wait for the bootstrap leader to transition. Since there are no other nodes in the
// cluster it will continue to be the leader
assert_eq!(rotation_receiver.recv().unwrap(), 1);
bootstrap_leader_exit();
}
#[test]
#[ignore]
fn test_ledger_role_transition() {
solana_logger::setup();
let fullnode_config = FullnodeConfig::default();
let ticks_per_slot = DEFAULT_TICKS_PER_SLOT;
// Create the leader and validator nodes
let bootstrap_leader_keypair = Arc::new(Keypair::new());
let validator_keypair = Arc::new(Keypair::new());
let (bootstrap_leader_node, validator_node, bootstrap_leader_ledger_path, _, _) =
setup_leader_validator(
&bootstrap_leader_keypair,
&validator_keypair,
ticks_per_slot,
0,
);
let bootstrap_leader_info = bootstrap_leader_node.info.clone();
// The validator starts from a copy of the leader's ledger.
let validator_ledger_path = tmp_copy_blocktree!(&bootstrap_leader_ledger_path);
let ledger_paths = vec![
bootstrap_leader_ledger_path.clone(),
validator_ledger_path.clone(),
];
// Inner scope ensures both fullnodes are dropped before ledger cleanup.
{
// Test that a node knows to transition to a validator based on parsing the ledger
let bootstrap_leader = Fullnode::new(
bootstrap_leader_node,
&bootstrap_leader_keypair,
&bootstrap_leader_ledger_path,
Keypair::new(),
Some(&bootstrap_leader_info),
&fullnode_config,
);
let (rotation_sender, rotation_receiver) = channel();
let bootstrap_leader_exit = bootstrap_leader.run(Some(rotation_sender));
// First rotation is expected at the epoch boundary.
assert_eq!(rotation_receiver.recv().unwrap(), (DEFAULT_SLOTS_PER_EPOCH));
// Test that a node knows to transition to a leader based on parsing the ledger
let validator = Fullnode::new(
validator_node,
&validator_keypair,
&validator_ledger_path,
Keypair::new(),
Some(&bootstrap_leader_info),
&fullnode_config,
);
let (rotation_sender, rotation_receiver) = channel();
let validator_exit = validator.run(Some(rotation_sender));
assert_eq!(rotation_receiver.recv().unwrap(), (DEFAULT_SLOTS_PER_EPOCH));
validator_exit();
bootstrap_leader_exit();
}
// Clean up both ledgers now that the nodes have released their locks.
for path in ledger_paths {
Blocktree::destroy(&path).expect("Expected successful database destruction");
let _ignored = remove_dir_all(&path);
}
}
// TODO: Rework this test or TVU (make_consecutive_blobs sends blobs that can't be handled by
// the replay_stage)
#[test]
#[ignore]
fn test_validator_to_leader_transition() {
solana_logger::setup();
// Make leader and validator node
let ticks_per_slot = 10;
let slots_per_epoch = 4;
let leader_keypair = Arc::new(Keypair::new());
let validator_keypair = Arc::new(Keypair::new());
let fullnode_config = FullnodeConfig::default();
let (leader_node, validator_node, validator_ledger_path, ledger_initial_len, blockhash) =
setup_leader_validator(&leader_keypair, &validator_keypair, ticks_per_slot, 0);
let leader_id = leader_keypair.pubkey();
let validator_info = validator_node.info.clone();
info!("leader: {:?}", leader_id);
info!("validator: {:?}", validator_info.id);
let voting_keypair = Keypair::new();
// Start the validator
let validator = Fullnode::new(
validator_node,
&validator_keypair,
&validator_ledger_path,
voting_keypair,
Some(&leader_node.info),
&fullnode_config,
);
// Enough blobs to push the validator past the epoch boundary plus one slot.
let blobs_to_send = slots_per_epoch * ticks_per_slot + ticks_per_slot;
// Send blobs to the validator from our mock leader
let t_responder = {
let (s_responder, r_responder) = channel();
let blob_sockets: Vec<Arc<UdpSocket>> =
leader_node.sockets.tvu.into_iter().map(Arc::new).collect();
let t_responder = responder(
"test_validator_to_leader_transition",
blob_sockets[0].clone(),
r_responder,
);
let tvu_address = &validator_info.tvu;
// NOTE(review): blobs are reversed before sending — presumably to
// exercise out-of-order delivery; confirm against responder semantics.
let msgs = make_consecutive_blobs(
&leader_id,
blobs_to_send,
ledger_initial_len,
blockhash,
&tvu_address,
)
.into_iter()
.rev()
.collect();
s_responder.send(msgs).expect("send");
t_responder
};
info!("waiting for validator to rotate into the leader role");
let (rotation_sender, rotation_receiver) = channel();
let validator_exit = validator.run(Some(rotation_sender));
let rotation = rotation_receiver.recv().unwrap();
assert_eq!(rotation, blobs_to_send);
// Close the validator so that rocksdb has locks available
validator_exit();
// Re-open the ledger and verify the validator replayed past the epoch.
let (bank_forks, bank_forks_info, _, _) =
new_banks_from_blocktree(&validator_ledger_path, None);
let bank = bank_forks.working_bank();
let entry_height = bank_forks_info[0].entry_height;
assert!(bank.tick_height() >= bank.ticks_per_slot() * bank.slots_per_epoch());
assert!(entry_height >= ledger_initial_len);
// Shut down
t_responder.join().expect("responder thread join");
Blocktree::destroy(&validator_ledger_path)
.expect("Expected successful database destruction");
let _ignored = remove_dir_all(&validator_ledger_path).unwrap();
}
/// Create a leader node and a validator node sharing a genesis ledger, with
/// the validator registered in the active set.
///
/// Returns `(leader_node, validator_node, ledger_path, entry_height, blockhash)`
/// where `entry_height` counts the genesis ticks plus the active-set entries
/// and `blockhash` is the hash of the last written entry.
fn setup_leader_validator(
leader_keypair: &Arc<Keypair>,
validator_keypair: &Arc<Keypair>,
ticks_per_slot: u64,
num_ending_slots: u64,
) -> (Node, Node, String, u64, Hash) {
info!("validator: {}", validator_keypair.pubkey());
info!("leader: {}", leader_keypair.pubkey());
let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey());
let (mut genesis_block, mint_keypair) =
GenesisBlock::new_with_leader(10_000, leader_node.info.id, 500);
genesis_block.ticks_per_slot = ticks_per_slot;
let (ledger_path, blockhash) = create_new_tmp_ledger!(&genesis_block);
// Add entries so that the validator is in the active set, then finish up the slot with
// ticks (and maybe add extra slots full of empty ticks)
let (entries, _) = make_active_set_entries(
validator_keypair,
&mint_keypair,
10,
0,
&blockhash,
ticks_per_slot * (num_ending_slots + 1),
);
let blocktree = Blocktree::open_config(&ledger_path, ticks_per_slot).unwrap();
// The returned blockhash must chain from the last entry written.
let blockhash = entries.last().unwrap().hash;
let entry_height = ticks_per_slot + entries.len() as u64;
blocktree.write_entries(1, 0, 0, entries).unwrap();
(
leader_node,
validator_node,
ledger_path,
entry_height,
blockhash,
)
}
}

View File

@ -2,16 +2,16 @@
//! to generate a thread which regularly calculates the last confirmation times
//! observed by the leader
use crate::service::Service;
use crate::poh_recorder::PohRecorder;
use solana_metrics::{influxdb, submit};
use solana_runtime::bank::Bank;
use solana_sdk::timing;
use solana_vote_api::vote_state::VoteState;
use std::result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::thread::sleep;
use std::thread::{self, Builder, JoinHandle};
use std::thread::{Builder, JoinHandle};
use std::time::Duration;
#[derive(Debug, PartialEq, Eq)]
@ -20,14 +20,11 @@ pub enum ConfirmationError {
}
pub const COMPUTE_CONFIRMATION_MS: u64 = 100;
pub struct LeaderConfirmationService {
thread_hdl: JoinHandle<()>,
}
pub struct LeaderConfirmationService {}
impl LeaderConfirmationService {
fn get_last_supermajority_timestamp(
bank: &Arc<Bank>,
bank: &Bank,
last_valid_validator_timestamp: u64,
) -> result::Result<u64, ConfirmationError> {
let mut total_stake = 0;
@ -69,7 +66,7 @@ impl LeaderConfirmationService {
Err(ConfirmationError::NoValidSupermajority)
}
pub fn compute_confirmation(bank: &Arc<Bank>, last_valid_validator_timestamp: &mut u64) {
pub fn compute_confirmation(bank: &Bank, last_valid_validator_timestamp: &mut u64) {
if let Ok(super_majority_timestamp) =
Self::get_last_supermajority_timestamp(bank, *last_valid_validator_timestamp)
{
@ -90,9 +87,9 @@ impl LeaderConfirmationService {
}
/// Create a new LeaderConfirmationService for computing confirmation.
pub fn new(bank: &Arc<Bank>, exit: Arc<AtomicBool>) -> Self {
let bank = bank.clone();
let thread_hdl = Builder::new()
pub fn start(poh_recorder: &Arc<Mutex<PohRecorder>>, exit: Arc<AtomicBool>) -> JoinHandle<()> {
let poh_recorder = poh_recorder.clone();
Builder::new()
.name("solana-leader-confirmation-service".to_string())
.spawn(move || {
let mut last_valid_validator_timestamp = 0;
@ -100,21 +97,15 @@ impl LeaderConfirmationService {
if exit.load(Ordering::Relaxed) {
break;
}
Self::compute_confirmation(&bank, &mut last_valid_validator_timestamp);
// dont hold this lock too long
let maybe_bank = poh_recorder.lock().unwrap().bank();
if let Some(ref bank) = maybe_bank {
Self::compute_confirmation(bank, &mut last_valid_validator_timestamp);
}
sleep(Duration::from_millis(COMPUTE_CONFIRMATION_MS));
}
})
.unwrap();
Self { thread_hdl }
}
}
impl Service for LeaderConfirmationService {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
.unwrap()
}
}

View File

@ -3,6 +3,7 @@ use crate::client::mk_client;
use crate::cluster_info::{Node, NodeInfo};
use crate::fullnode::{Fullnode, FullnodeConfig};
use crate::gossip_service::discover;
use crate::service::Service;
use crate::thin_client::retry_get_balance;
use crate::thin_client::ThinClient;
use crate::voting_keypair::VotingKeypair;
@ -14,16 +15,14 @@ use solana_vote_api::vote_state::VoteState;
use solana_vote_api::vote_transaction::VoteTransaction;
use std::fs::remove_dir_all;
use std::io::{Error, ErrorKind, Result};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::JoinHandle;
pub struct LocalCluster {
/// Keypair with funding to particpiate in the network
pub funding_keypair: Keypair,
/// Entry point from which the rest of the network can be discovered
pub entry_point_info: NodeInfo,
fullnode_hdls: Vec<(JoinHandle<()>, Arc<AtomicBool>)>,
fullnodes: Vec<Fullnode>,
ledger_paths: Vec<String>,
}
@ -50,8 +49,7 @@ impl LocalCluster {
None,
&fullnode_config,
);
let (thread, exit, _) = leader_server.start(None);
let mut fullnode_hdls = vec![(thread, exit)];
let mut fullnodes = vec![leader_server];
let mut client = mk_client(&leader_node_info);
for _ in 0..(num_nodes - 1) {
let validator_keypair = Arc::new(Keypair::new());
@ -88,27 +86,26 @@ impl LocalCluster {
Some(&leader_node_info),
&fullnode_config,
);
let (thread, exit, _) = validator_server.start(None);
fullnode_hdls.push((thread, exit));
fullnodes.push(validator_server);
}
discover(&leader_node_info, num_nodes);
Self {
funding_keypair: mint_keypair,
entry_point_info: leader_node_info,
fullnode_hdls,
fullnodes,
ledger_paths,
}
}
pub fn exit(&self) {
for node in &self.fullnode_hdls {
node.1.store(true, Ordering::Relaxed);
for node in &self.fullnodes {
node.exit();
}
}
pub fn close(&mut self) {
self.exit();
while let Some(node) = self.fullnode_hdls.pop() {
node.0.join().expect("join");
while let Some(node) = self.fullnodes.pop() {
node.join().unwrap();
}
for path in &self.ledger_paths {
remove_dir_all(path).unwrap();

View File

@ -16,7 +16,7 @@ use crate::result::{Error, Result};
use solana_runtime::bank::Bank;
use solana_sdk::hash::Hash;
use solana_sdk::transaction::Transaction;
use std::sync::mpsc::Sender;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
#[derive(Debug, PartialEq, Eq, Clone)]
@ -26,10 +26,11 @@ pub enum PohRecorderError {
MinHeightNotReached,
}
pub type WorkingBankEntries = (Arc<Bank>, Vec<(Entry, u64)>);
/// The bank PoH is currently recording against, together with the tick-height
/// window during which entries may be flushed to it.
#[derive(Clone)]
pub struct WorkingBank {
// Bank that ticks and recorded transactions are registered against.
pub bank: Arc<Bank>,
// Channel carrying produced entries out of the recorder.
// NOTE(review): diff context suggests this field is being replaced by a
// recorder-level sender — confirm against the current PohRecorder.
pub sender: Sender<Vec<(Entry, u64)>>,
// Entries are held in the tick cache until this height is passed.
pub min_tick_height: u64,
// Once reached, the working bank is cleared (end of the bank's slot).
pub max_tick_height: u64,
}
@ -38,6 +39,7 @@ pub struct PohRecorder {
pub poh: Poh,
tick_cache: Vec<(Entry, u64)>,
working_bank: Option<WorkingBank>,
sender: Sender<WorkingBankEntries>,
}
impl PohRecorder {
@ -51,8 +53,19 @@ impl PohRecorder {
self.poh.hash();
}
/// Returns the bank of the current working bank, or `None` when the recorder
/// has no working bank set.
pub fn bank(&self) -> Option<Arc<Bank>> {
    // Clone only the Arc<Bank> (a refcount bump) rather than cloning the
    // whole WorkingBank, which would also clone its channel sender and
    // copy the tick-height fields for no benefit.
    self.working_bank.as_ref().map(|w| w.bank.clone())
}
// synchronize PoH with a bank
pub fn reset(&mut self, tick_height: u64, blockhash: Hash) {
if self.poh.hash == blockhash {
assert_eq!(self.poh.tick_height, tick_height);
info!(
"reset skipped for: {},{}",
self.poh.hash, self.poh.tick_height
);
return;
}
let mut cache = vec![];
info!(
"reset poh from: {},{} to: {},{}",
@ -66,6 +79,15 @@ impl PohRecorder {
trace!("new working bank");
self.working_bank = Some(working_bank);
}
/// Install `bank` as the working bank, recording entries for the remainder
/// of the bank's slot.
pub fn set_bank(&mut self, bank: &Arc<Bank>) {
// Last tick of this bank's slot: slots are ticks_per_slot ticks wide,
// so slot N ends at tick (N + 1) * ticks_per_slot - 1.
let max_tick_height = (bank.slot() + 1) * bank.ticks_per_slot() - 1;
let working_bank = WorkingBank {
bank: bank.clone(),
// Start flushing from the bank's current tick height.
min_tick_height: bank.tick_height(),
max_tick_height,
};
self.set_working_bank(working_bank);
}
// Flush cache will delay flushing the cache for a bank until it past the WorkingBank::min_tick_height
// On a record flush will flush the cache at the WorkingBank::min_tick_height, since a record
@ -106,12 +128,16 @@ impl PohRecorder {
for t in cache {
working_bank.bank.register_tick(&t.0.hash);
}
working_bank.sender.send(cache.to_vec())
self.sender
.send((working_bank.bank.clone(), cache.to_vec()))
} else {
Ok(())
};
if self.poh.tick_height >= working_bank.max_tick_height {
info!("poh_record: max_tick_height reached, setting working bank to None");
info!(
"poh_record: max_tick_height reached, setting working bank {} to None",
working_bank.bank.slot()
);
self.working_bank = None;
}
if e.is_err() {
@ -138,13 +164,21 @@ impl PohRecorder {
self.record_and_send_txs(mixin, txs)
}
pub fn new(tick_height: u64, last_entry_hash: Hash) -> Self {
/// A recorder to synchronize PoH with the following data structures
/// * bank - the LastId's queue is updated on `tick` and `record` events
/// * sender - the Entry channel that outputs to the ledger
pub fn new(tick_height: u64, last_entry_hash: Hash) -> (Self, Receiver<WorkingBankEntries>) {
let poh = Poh::new(last_entry_hash, tick_height);
PohRecorder {
poh,
tick_cache: vec![],
working_bank: None,
}
let (sender, receiver) = channel();
(
PohRecorder {
poh,
tick_cache: vec![],
working_bank: None,
sender,
},
receiver,
)
}
fn record_and_send_txs(&mut self, mixin: Hash, txs: Vec<Transaction>) -> Result<()> {
@ -160,9 +194,10 @@ impl PohRecorder {
transactions: txs,
};
trace!("sending entry {}", recorded_entry.is_tick());
working_bank
.sender
.send(vec![(recorded_entry, poh_entry.tick_height)])?;
self.sender.send((
working_bank.bank.clone(),
vec![(recorded_entry, poh_entry.tick_height)],
))?;
Ok(())
}
@ -186,13 +221,12 @@ mod tests {
use crate::test_tx::test_tx;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::hash;
use std::sync::mpsc::channel;
use std::sync::Arc;
#[test]
fn test_poh_recorder_no_zero_tick() {
let prev_hash = Hash::default();
let mut poh_recorder = PohRecorder::new(0, prev_hash);
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash);
poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 1);
assert_eq!(poh_recorder.tick_cache[0].1, 1);
@ -202,7 +236,7 @@ mod tests {
#[test]
fn test_poh_recorder_tick_height_is_last_tick() {
let prev_hash = Hash::default();
let mut poh_recorder = PohRecorder::new(0, prev_hash);
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash);
poh_recorder.tick();
poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 2);
@ -212,7 +246,7 @@ mod tests {
#[test]
fn test_poh_recorder_reset_clears_cache() {
let mut poh_recorder = PohRecorder::new(0, Hash::default());
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default());
poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 1);
poh_recorder.reset(0, Hash::default());
@ -224,12 +258,10 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let prev_hash = bank.last_blockhash();
let (entry_sender, _) = channel();
let mut poh_recorder = PohRecorder::new(0, prev_hash);
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash);
let working_bank = WorkingBank {
bank,
sender: entry_sender,
min_tick_height: 2,
max_tick_height: 3,
};
@ -244,12 +276,10 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let prev_hash = bank.last_blockhash();
let (entry_sender, entry_receiver) = channel();
let mut poh_recorder = PohRecorder::new(0, prev_hash);
let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash);
let working_bank = WorkingBank {
bank,
sender: entry_sender,
bank: bank.clone(),
min_tick_height: 2,
max_tick_height: 3,
};
@ -265,8 +295,9 @@ mod tests {
poh_recorder.tick();
assert_eq!(poh_recorder.poh.tick_height, 3);
assert_eq!(poh_recorder.tick_cache.len(), 0);
let e = entry_receiver.recv().expect("recv 1");
let (bank_, e) = entry_receiver.recv().expect("recv 1");
assert_eq!(e.len(), 3);
assert_eq!(bank_.slot(), bank.slot());
assert!(poh_recorder.working_bank.is_none());
}
@ -275,8 +306,7 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let prev_hash = bank.last_blockhash();
let (entry_sender, entry_receiver) = channel();
let mut poh_recorder = PohRecorder::new(0, prev_hash);
let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash);
poh_recorder.tick();
poh_recorder.tick();
@ -287,7 +317,6 @@ mod tests {
let working_bank = WorkingBank {
bank,
sender: entry_sender,
min_tick_height: 2,
max_tick_height: 3,
};
@ -296,7 +325,7 @@ mod tests {
assert_eq!(poh_recorder.poh.tick_height, 5);
assert!(poh_recorder.working_bank.is_none());
let e = entry_receiver.recv().expect("recv 1");
let (_, e) = entry_receiver.recv().expect("recv 1");
assert_eq!(e.len(), 3);
}
@ -305,12 +334,10 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let prev_hash = bank.last_blockhash();
let (entry_sender, entry_receiver) = channel();
let mut poh_recorder = PohRecorder::new(0, prev_hash);
let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash);
let working_bank = WorkingBank {
bank,
sender: entry_sender,
min_tick_height: 2,
max_tick_height: 3,
};
@ -327,12 +354,10 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let prev_hash = bank.last_blockhash();
let (entry_sender, entry_receiver) = channel();
let mut poh_recorder = PohRecorder::new(0, prev_hash);
let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash);
let working_bank = WorkingBank {
bank,
sender: entry_sender,
min_tick_height: 1,
max_tick_height: 2,
};
@ -346,10 +371,10 @@ mod tests {
assert_eq!(poh_recorder.tick_cache.len(), 0);
//tick in the cache + entry
let e = entry_receiver.recv().expect("recv 1");
let (_b, e) = entry_receiver.recv().expect("recv 1");
assert_eq!(e.len(), 1);
assert!(e[0].0.is_tick());
let e = entry_receiver.recv().expect("recv 2");
let (_b, e) = entry_receiver.recv().expect("recv 2");
assert!(!e[0].0.is_tick());
}
@ -358,12 +383,10 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let prev_hash = bank.last_blockhash();
let (entry_sender, entry_receiver) = channel();
let mut poh_recorder = PohRecorder::new(0, prev_hash);
let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash);
let working_bank = WorkingBank {
bank,
sender: entry_sender,
min_tick_height: 1,
max_tick_height: 2,
};
@ -375,7 +398,7 @@ mod tests {
let h1 = hash(b"hello world!");
assert!(poh_recorder.record(h1, vec![tx.clone()]).is_err());
let e = entry_receiver.recv().expect("recv 1");
let (_bank, e) = entry_receiver.recv().expect("recv 1");
assert_eq!(e.len(), 2);
assert!(e[0].0.is_tick());
assert!(e[1].0.is_tick());
@ -386,12 +409,10 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let prev_hash = bank.last_blockhash();
let (entry_sender, entry_receiver) = channel();
let mut poh_recorder = PohRecorder::new(0, prev_hash);
let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash);
let working_bank = WorkingBank {
bank,
sender: entry_sender,
min_tick_height: 2,
max_tick_height: 3,
};

View File

@ -117,20 +117,17 @@ mod tests {
use solana_runtime::bank::Bank;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::hash;
use std::sync::mpsc::channel;
use std::sync::mpsc::RecvError;
#[test]
fn test_poh_service() {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let prev_hash = bank.last_blockhash();
let (entry_sender, entry_receiver) = channel();
let poh_recorder = Arc::new(Mutex::new(PohRecorder::new(bank.tick_height(), prev_hash)));
let (poh_recorder, entry_receiver) = PohRecorder::new(bank.tick_height(), prev_hash);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let exit = Arc::new(AtomicBool::new(false));
let working_bank = WorkingBank {
bank: bank.clone(),
sender: entry_sender,
min_tick_height: bank.tick_height(),
max_tick_height: std::u64::MAX,
};
@ -171,7 +168,7 @@ mod tests {
let mut need_partial = true;
while need_tick || need_entry || need_partial {
for entry in entry_receiver.recv().unwrap() {
for entry in entry_receiver.recv().unwrap().1 {
let entry = &entry.0;
if entry.is_tick() {
assert!(entry.num_hashes <= HASHES_PER_TICK);
@ -199,43 +196,4 @@ mod tests {
let _ = poh_service.join().unwrap();
let _ = entry_producer.join().unwrap();
}
#[test]
fn test_poh_service_drops_working_bank() {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let prev_hash = bank.last_blockhash();
let (entry_sender, entry_receiver) = channel();
let poh_recorder = Arc::new(Mutex::new(PohRecorder::new(bank.tick_height(), prev_hash)));
let exit = Arc::new(AtomicBool::new(false));
let working_bank = WorkingBank {
bank: bank.clone(),
sender: entry_sender,
min_tick_height: bank.tick_height() + 3,
max_tick_height: bank.tick_height() + 5,
};
let poh_service = PohService::new(
poh_recorder.clone(),
&PohServiceConfig::default(),
Arc::new(AtomicBool::new(false)),
);
poh_recorder.lock().unwrap().set_working_bank(working_bank);
// all 5 ticks are expected, there is no tick 0
// First 4 ticks must be sent all at once, since bank shouldn't see them until
// the after bank's min_tick_height(3) is reached.
let entries = entry_receiver.recv().expect("recv 1");
assert_eq!(entries.len(), 4);
let entries = entry_receiver.recv().expect("recv 2");
assert_eq!(entries.len(), 1);
//WorkingBank should be dropped by the PohService thread as well
assert_eq!(entry_receiver.recv(), Err(RecvError));
exit.store(true, Ordering::Relaxed);
poh_service.exit();
let _ = poh_service.join().unwrap();
}
}

View File

@ -8,10 +8,10 @@ use crate::cluster_info::ClusterInfo;
use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice};
use crate::leader_schedule_utils;
use crate::packet::BlobError;
use crate::poh_recorder::PohRecorder;
use crate::result;
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use crate::tvu::{TvuRotationInfo, TvuRotationSender};
use solana_metrics::counter::Counter;
use solana_runtime::bank::Bank;
use solana_sdk::hash::Hash;
@ -22,7 +22,7 @@ use solana_vote_api::vote_transaction::VoteTransaction;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use std::time::Instant;
@ -62,9 +62,9 @@ impl ReplayStage {
_bank_forks_info: &[BankForksInfo],
cluster_info: Arc<RwLock<ClusterInfo>>,
exit: Arc<AtomicBool>,
to_leader_sender: &TvuRotationSender,
ledger_signal_receiver: Receiver<bool>,
subscriptions: &Arc<RpcSubscriptions>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
) -> (Self, Receiver<(u64, Pubkey)>, EntryReceiver)
where
T: 'static + KeypairUtil + Send + Sync,
@ -73,9 +73,9 @@ impl ReplayStage {
let (slot_full_sender, slot_full_receiver) = channel();
trace!("replay stage");
let exit_ = exit.clone();
let to_leader_sender = to_leader_sender.clone();
let subscriptions = subscriptions.clone();
let bank_forks = bank_forks.clone();
let poh_recorder = poh_recorder.clone();
let mut progress = HashMap::new();
@ -84,6 +84,7 @@ impl ReplayStage {
.name("solana-replay-stage".to_string())
.spawn(move || {
let _exit = Finalizer::new(exit_.clone());
let mut first_block = false;
loop {
let now = Instant::now();
// Stop getting entries if we get exit signal
@ -91,11 +92,11 @@ impl ReplayStage {
break;
}
Self::generate_new_bank_forks(&blocktree, &mut bank_forks.write().unwrap());
let live_bank_ids = bank_forks.read().unwrap().active_banks();
trace!("live banks {:?}", live_bank_ids);
let active_banks = bank_forks.read().unwrap().active_banks();
trace!("active banks {:?}", active_banks);
let mut votable: Vec<u64> = vec![];
for bank_id in live_bank_ids {
let bank = bank_forks.read().unwrap().get(bank_id).unwrap().clone();
for bank_id in &active_banks {
let bank = bank_forks.read().unwrap().get(*bank_id).unwrap().clone();
if !Self::is_tpu(&bank, my_id) {
Self::replay_blocktree_into_bank(
&bank,
@ -104,17 +105,28 @@ impl ReplayStage {
&forward_entry_sender,
)?;
}
let max_tick_height = (bank_id + 1) * bank.ticks_per_slot() - 1;
let max_tick_height = (*bank_id + 1) * bank.ticks_per_slot() - 1;
if bank.tick_height() == max_tick_height {
bank.freeze();
votable.push(bank_id);
progress.remove(&bank_id);
info!("bank frozen {}", bank.slot());
votable.push(*bank_id);
progress.remove(bank_id);
let id = leader_schedule_utils::slot_leader_at(bank.slot(), &bank);
if let Err(e) = slot_full_sender.send((bank.slot(), id)) {
info!("{} slot_full alert failed: {:?}", my_id, e);
}
}
}
let frozen = bank_forks.read().unwrap().frozen_banks();
if votable.is_empty()
&& frozen.len() == 1
&& active_banks.is_empty()
&& !first_block
{
first_block = true;
votable.extend(frozen.keys());
info!("voting on first block {:?}", votable);
}
// TODO: fork selection
// vote on the latest one for now
votable.sort();
@ -142,20 +154,31 @@ impl ReplayStage {
);
cluster_info.write().unwrap().push_vote(vote);
}
poh_recorder
.lock()
.unwrap()
.reset(parent.tick_height(), parent.last_blockhash());
if next_leader == my_id {
let frozen = bank_forks.read().unwrap().frozen_banks();
assert!(frozen.get(&next_slot).is_none());
assert!(bank_forks.read().unwrap().get(next_slot).is_none());
let tpu_bank = Bank::new_from_parent(&parent, my_id, next_slot);
bank_forks.write().unwrap().insert(next_slot, tpu_bank);
if let Some(tpu_bank) =
bank_forks.read().unwrap().get(next_slot).cloned()
{
assert_eq!(bank_forks.read().unwrap().working_bank().slot(), tpu_bank.slot());
debug!(
"poh_recorder new working bank: me: {} next_slot: {} next_leader: {}",
my_id,
tpu_bank.slot(),
next_leader
);
poh_recorder.lock().unwrap().set_bank(&tpu_bank);
}
}
debug!(
"to_leader_sender: me: {} next_slot: {} next_leader: {}",
my_id, next_slot, next_leader
);
to_leader_sender.send(TvuRotationInfo {
tick_height: parent.tick_height(),
blockhash: parent.last_blockhash(),
slot: next_slot,
leader_id: next_leader,
})?;
}
inc_new_counter_info!(
"replicate_stage-duration",
@ -166,7 +189,7 @@ impl ReplayStage {
match result {
Err(RecvTimeoutError::Timeout) => continue,
Err(_) => break,
Ok(_) => debug!("blocktree signal"),
Ok(_) => trace!("blocktree signal"),
};
}
Ok(())
@ -270,25 +293,31 @@ impl ReplayStage {
// Find the next slot that chains to the old slot
let frozen_banks = forks.frozen_banks();
let frozen_bank_ids: Vec<u64> = frozen_banks.keys().cloned().collect();
trace!("generate new forks {:?}", frozen_bank_ids);
trace!("frozen_banks {:?}", frozen_bank_ids);
let next_slots = blocktree
.get_slots_since(&frozen_bank_ids)
.expect("Db error");
trace!("generate new forks {:?}", next_slots);
for (parent_id, children) in next_slots {
let parent_bank = frozen_banks
.get(&parent_id)
.expect("missing parent in bank forks")
.clone();
for child_id in children {
let new_fork = forks.get(child_id).is_none();
if new_fork {
let leader = leader_schedule_utils::slot_leader_at(child_id, &parent_bank);
trace!("new fork:{} parent:{}", child_id, parent_id);
forks.insert(
child_id,
Bank::new_from_parent(&parent_bank, leader, child_id),
);
if frozen_banks.get(&child_id).is_some() {
trace!("child already frozen {}", child_id);
continue;
}
if forks.get(child_id).is_some() {
trace!("child already active {}", child_id);
continue;
}
let leader = leader_schedule_utils::slot_leader_at(child_id, &parent_bank);
info!("new fork:{} parent:{}", child_id, parent_id);
forks.insert(
child_id,
Bank::new_from_parent(&parent_bank, leader, child_id),
);
}
}
}
@ -305,6 +334,7 @@ impl Service for ReplayStage {
#[cfg(test)]
mod test {
use super::*;
use crate::banking_stage::create_test_recorder;
use crate::blocktree::create_new_tmp_ledger;
use crate::cluster_info::{ClusterInfo, Node};
use crate::entry::create_ticks;
@ -341,13 +371,13 @@ mod test {
// Set up the replay stage
let exit = Arc::new(AtomicBool::new(false));
let voting_keypair = Arc::new(Keypair::new());
let (to_leader_sender, _to_leader_receiver) = channel();
{
let (bank_forks, bank_forks_info, blocktree, l_receiver) =
new_banks_from_blocktree(&my_ledger_path, None);
let bank = bank_forks.working_bank();
let blocktree = Arc::new(blocktree);
let (poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank);
let (replay_stage, _slot_full_receiver, ledger_writer_recv) = ReplayStage::new(
my_keypair.pubkey(),
Some(voting_keypair.clone()),
@ -356,9 +386,9 @@ mod test {
&bank_forks_info,
cluster_info_me.clone(),
exit.clone(),
&to_leader_sender,
l_receiver,
&Arc::new(RpcSubscriptions::default()),
&poh_recorder,
);
let keypair = voting_keypair.as_ref();
@ -378,6 +408,7 @@ mod test {
replay_stage
.close()
.expect("Expect successful ReplayStage exit");
poh_service.close().unwrap();
}
let _ignored = remove_dir_all(&my_ledger_path);
}

View File

@ -498,7 +498,6 @@ mod tests {
fn test_thin_client_basic() {
solana_logger::setup();
let (server, leader_data, alice, ledger_path) = new_fullnode();
let server_exit = server.run(None);
let bob_pubkey = Keypair::new().pubkey();
info!(
@ -525,7 +524,7 @@ mod tests {
let transaction_count = client.transaction_count();
assert_eq!(transaction_count, 1);
server_exit();
server.close().unwrap();
remove_dir_all(ledger_path).unwrap();
}
@ -534,7 +533,6 @@ mod tests {
fn test_bad_sig() {
solana_logger::setup();
let (server, leader_data, alice, ledger_path) = new_fullnode();
let server_exit = server.run(None);
let bob_pubkey = Keypair::new().pubkey();
info!(
"found leader: {:?}",
@ -562,7 +560,7 @@ mod tests {
let balance = client.get_balance(&bob_pubkey);
assert_eq!(balance.unwrap(), 1001);
server_exit();
server.close().unwrap();
remove_dir_all(ledger_path).unwrap();
}
@ -570,7 +568,6 @@ mod tests {
fn test_register_vote_account() {
solana_logger::setup();
let (server, leader_data, alice, ledger_path) = new_fullnode();
let server_exit = server.run(None);
info!(
"found leader: {:?}",
poll_gossip_for_leader(leader_data.gossip, Some(5)).unwrap()
@ -626,7 +623,7 @@ mod tests {
sleep(Duration::from_millis(900));
}
server_exit();
server.close().unwrap();
remove_dir_all(ledger_path).unwrap();
}
@ -645,7 +642,6 @@ mod tests {
fn test_zero_balance_after_nonzero() {
solana_logger::setup();
let (server, leader_data, alice, ledger_path) = new_fullnode();
let server_exit = server.run(None);
let bob_keypair = Keypair::new();
info!(
@ -682,7 +678,7 @@ mod tests {
info!("Bob's balance is {:?}", bob_balance);
assert!(bob_balance.is_err(),);
server_exit();
server.close().unwrap();
remove_dir_all(ledger_path).unwrap();
}
}

View File

@ -1,29 +1,22 @@
//! The `tpu` module implements the Transaction Processing Unit, a
//! multi-stage transaction processing pipeline in software.
use crate::banking_stage::{BankingStage, UnprocessedPackets};
use crate::banking_stage::BankingStage;
use crate::blocktree::Blocktree;
use crate::broadcast_stage::BroadcastStage;
use crate::cluster_info::ClusterInfo;
use crate::cluster_info_vote_listener::ClusterInfoVoteListener;
use crate::fetch_stage::FetchStage;
use crate::poh_recorder::PohRecorder;
use crate::poh_recorder::{PohRecorder, WorkingBankEntries};
use crate::service::Service;
use crate::sigverify_stage::SigVerifyStage;
use crate::tpu_forwarder::TpuForwarder;
use solana_runtime::bank::Bank;
use solana_sdk::pubkey::Pubkey;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
pub enum TpuMode {
Leader(LeaderServices),
Forwarder(ForwarderServices),
}
pub struct LeaderServices {
fetch_stage: FetchStage,
sigverify_stage: SigVerifyStage,
@ -49,7 +42,7 @@ impl LeaderServices {
}
}
fn exit(&self) {
pub fn exit(&self) {
self.fetch_stage.close();
}
@ -67,187 +60,63 @@ impl LeaderServices {
Ok(())
}
fn close(self) -> thread::Result<()> {
self.exit();
self.join()
}
}
pub struct ForwarderServices {
tpu_forwarder: TpuForwarder,
}
impl ForwarderServices {
fn new(tpu_forwarder: TpuForwarder) -> Self {
ForwarderServices { tpu_forwarder }
}
fn exit(&self) {
self.tpu_forwarder.close();
}
fn join(self) -> thread::Result<()> {
self.tpu_forwarder.join()
}
fn close(self) -> thread::Result<()> {
pub fn close(self) -> thread::Result<()> {
self.exit();
self.join()
}
}
pub struct Tpu {
tpu_mode: Option<TpuMode>,
leader_services: LeaderServices,
exit: Arc<AtomicBool>,
id: Pubkey,
cluster_info: Arc<RwLock<ClusterInfo>>,
pub id: Pubkey,
}
impl Tpu {
pub fn new(id: Pubkey, cluster_info: &Arc<RwLock<ClusterInfo>>) -> Self {
Self {
tpu_mode: None,
exit: Arc::new(AtomicBool::new(false)),
id,
cluster_info: cluster_info.clone(),
}
}
fn mode_exit(&mut self) {
match &mut self.tpu_mode {
Some(TpuMode::Leader(svcs)) => {
svcs.exit();
}
Some(TpuMode::Forwarder(svcs)) => {
svcs.exit();
}
None => (),
}
}
fn mode_close(&mut self) {
let tpu_mode = self.tpu_mode.take();
if let Some(tpu_mode) = tpu_mode {
match tpu_mode {
TpuMode::Leader(svcs) => {
let _ = svcs.close();
}
TpuMode::Forwarder(svcs) => {
let _ = svcs.close();
}
}
}
}
fn forward_unprocessed_packets(
tpu: &std::net::SocketAddr,
unprocessed_packets: UnprocessedPackets,
) -> std::io::Result<()> {
let socket = UdpSocket::bind("0.0.0.0:0")?;
for (packets, start_index) in unprocessed_packets {
let packets = packets.read().unwrap();
for packet in packets.packets.iter().skip(start_index) {
socket.send_to(&packet.data[..packet.meta.size], tpu)?;
}
}
Ok(())
}
fn close_and_forward_unprocessed_packets(&mut self) {
self.mode_exit();
let unprocessed_packets = match self.tpu_mode.as_mut() {
Some(TpuMode::Leader(svcs)) => {
svcs.banking_stage.join_and_collect_unprocessed_packets()
}
Some(TpuMode::Forwarder(svcs)) => {
svcs.tpu_forwarder.join_and_collect_unprocessed_packets()
}
None => vec![],
};
if !unprocessed_packets.is_empty() {
let tpu = self.cluster_info.read().unwrap().leader_data().unwrap().tpu;
info!("forwarding unprocessed packets to new leader at {:?}", tpu);
Tpu::forward_unprocessed_packets(&tpu, unprocessed_packets).unwrap_or_else(|err| {
warn!("Failed to forward unprocessed transactions: {:?}", err)
});
}
self.mode_close();
}
pub fn switch_to_forwarder(&mut self, leader_id: Pubkey, transactions_sockets: Vec<UdpSocket>) {
self.close_and_forward_unprocessed_packets();
self.cluster_info.write().unwrap().set_leader(leader_id);
let tpu_forwarder = TpuForwarder::new(transactions_sockets, self.cluster_info.clone());
self.tpu_mode = Some(TpuMode::Forwarder(ForwarderServices::new(tpu_forwarder)));
}
#[allow(clippy::too_many_arguments)]
pub fn switch_to_leader(
&mut self,
bank: &Arc<Bank>,
pub fn new(
id: Pubkey,
cluster_info: &Arc<RwLock<ClusterInfo>>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
entry_receiver: Receiver<WorkingBankEntries>,
transactions_sockets: Vec<UdpSocket>,
broadcast_socket: UdpSocket,
sigverify_disabled: bool,
slot: u64,
blocktree: &Arc<Blocktree>,
) {
self.close_and_forward_unprocessed_packets();
) -> Self {
cluster_info.write().unwrap().set_leader(id);
self.cluster_info.write().unwrap().set_leader(self.id);
self.exit = Arc::new(AtomicBool::new(false));
let exit = Arc::new(AtomicBool::new(false));
let (packet_sender, packet_receiver) = channel();
let fetch_stage = FetchStage::new_with_sender(
transactions_sockets,
self.exit.clone(),
&packet_sender.clone(),
);
let cluster_info_vote_listener = ClusterInfoVoteListener::new(
self.exit.clone(),
self.cluster_info.clone(),
packet_sender,
);
let fetch_stage =
FetchStage::new_with_sender(transactions_sockets, exit.clone(), &packet_sender.clone());
let cluster_info_vote_listener =
ClusterInfoVoteListener::new(exit.clone(), cluster_info.clone(), packet_sender);
let (sigverify_stage, verified_receiver) =
SigVerifyStage::new(packet_receiver, sigverify_disabled);
// TODO: Fix BankingStage/BroadcastStage to operate on `slot` directly instead of
// `max_tick_height`
let max_tick_height = (slot + 1) * bank.ticks_per_slot() - 1;
let blob_index = blocktree
.meta(slot)
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0);
let (banking_stage, entry_receiver) =
BankingStage::new(&bank, poh_recorder, verified_receiver, max_tick_height);
let banking_stage = BankingStage::new(&cluster_info, poh_recorder, verified_receiver);
let broadcast_stage = BroadcastStage::new(
slot,
bank,
broadcast_socket,
self.cluster_info.clone(),
blob_index,
cluster_info.clone(),
entry_receiver,
self.exit.clone(),
exit.clone(),
blocktree,
);
let svcs = LeaderServices::new(
let leader_services = LeaderServices::new(
fetch_stage,
sigverify_stage,
banking_stage,
cluster_info_vote_listener,
broadcast_stage,
);
self.tpu_mode = Some(TpuMode::Leader(svcs));
Self {
leader_services,
exit,
id,
}
}
pub fn exit(&self) {
@ -258,8 +127,8 @@ impl Tpu {
self.exit.load(Ordering::Relaxed)
}
pub fn close(mut self) -> thread::Result<()> {
self.mode_close();
pub fn close(self) -> thread::Result<()> {
self.exit();
self.join()
}
}
@ -268,11 +137,6 @@ impl Service for Tpu {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
match self.tpu_mode {
Some(TpuMode::Leader(svcs)) => svcs.join()?,
Some(TpuMode::Forwarder(svcs)) => svcs.join()?,
None => (),
}
Ok(())
self.leader_services.join()
}
}

View File

@ -18,29 +18,19 @@ use crate::blockstream_service::BlockstreamService;
use crate::blocktree::Blocktree;
use crate::blocktree_processor::BankForksInfo;
use crate::cluster_info::ClusterInfo;
use crate::poh_recorder::PohRecorder;
use crate::replay_stage::ReplayStage;
use crate::retransmit_stage::RetransmitStage;
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use crate::storage_stage::{StorageStage, StorageState};
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, RwLock};
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
pub struct TvuRotationInfo {
pub tick_height: u64, // tick height, bank might not exist yet
pub blockhash: Hash, // blockhash that was voted on
pub slot: u64, // slot height to initiate a rotation
pub leader_id: Pubkey, // leader upon rotation
}
pub type TvuRotationSender = Sender<TvuRotationInfo>;
pub type TvuRotationReceiver = Receiver<TvuRotationInfo>;
pub struct Tvu {
fetch_stage: BlobFetchStage,
retransmit_stage: RetransmitStage,
@ -72,11 +62,11 @@ impl Tvu {
sockets: Sockets,
blocktree: Arc<Blocktree>,
storage_rotate_count: u64,
to_leader_sender: &TvuRotationSender,
storage_state: &StorageState,
blockstream: Option<&String>,
ledger_signal_receiver: Receiver<bool>,
subscriptions: &Arc<RpcSubscriptions>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
) -> Self
where
T: 'static + KeypairUtil + Sync + Send,
@ -124,9 +114,9 @@ impl Tvu {
&bank_forks_info,
cluster_info.clone(),
exit.clone(),
to_leader_sender,
ledger_signal_receiver,
subscriptions,
poh_recorder,
);
let blockstream_service = if blockstream.is_some() {
@ -197,6 +187,7 @@ impl Service for Tvu {
#[cfg(test)]
pub mod tests {
use super::*;
use crate::banking_stage::create_test_recorder;
use crate::blocktree::get_tmp_ledger_path;
use crate::cluster_info::{ClusterInfo, Node};
use crate::storage_stage::STORAGE_ROTATE_TEST_COUNT;
@ -228,7 +219,8 @@ pub mod tests {
let blocktree_path = get_tmp_ledger_path!();
let (blocktree, l_receiver) = Blocktree::open_with_signal(&blocktree_path)
.expect("Expected to successfully open ledger");
let (sender, _receiver) = channel();
let bank = bank_forks.working_bank();
let (poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank);
let tvu = Tvu::new(
Some(Arc::new(Keypair::new())),
&Arc::new(RwLock::new(bank_forks)),
@ -243,12 +235,13 @@ pub mod tests {
},
Arc::new(blocktree),
STORAGE_ROTATE_TEST_COUNT,
&sender,
&StorageState::default(),
None,
l_receiver,
&Arc::new(RpcSubscriptions::default()),
&poh_recorder,
);
tvu.close().expect("close");
poh_service.close().expect("close");
}
}

View File

@ -4,6 +4,7 @@ use solana::client::mk_client;
use solana::cluster_info::{Node, NodeInfo, FULLNODE_PORT_RANGE};
use solana::fullnode::{Fullnode, FullnodeConfig};
use solana::local_vote_signer_service::LocalVoteSignerService;
use solana::service::Service;
use solana::socketaddr;
use solana::thin_client::{poll_gossip_for_leader, ThinClient};
use solana::voting_keypair::{RemoteVoteSigner, VotingKeypair};
@ -17,7 +18,6 @@ use std::fs::File;
use std::io::{Error, ErrorKind, Result};
use std::net::{Ipv4Addr, SocketAddr};
use std::process::exit;
use std::sync::mpsc::channel;
use std::sync::Arc;
fn parse_identity(matches: &ArgMatches<'_>) -> (Keypair, SocketAddr) {
@ -286,9 +286,6 @@ fn main() {
&fullnode_config,
);
let (rotation_sender, rotation_receiver) = channel();
fullnode.run(Some(rotation_sender));
if !fullnode_config.voting_disabled {
let leader_node_info = loop {
info!("Looking for leader...");
@ -313,10 +310,6 @@ fn main() {
File::create(filename).unwrap_or_else(|_| panic!("Unable to create: {}", filename));
}
info!("Node initialized");
loop {
info!(
"Node rotation event: {:?}",
rotation_receiver.recv().unwrap()
);
}
fullnode.join().expect("fullnode exit");
info!("Node exiting..");
}

View File

@ -61,7 +61,6 @@ fn test_replicator_startup_basic() {
None,
&fullnode_config,
);
let leader_exit = leader.run(None);
debug!(
"leader: {:?}",
@ -91,7 +90,6 @@ fn test_replicator_startup_basic() {
Some(&leader_info),
&fullnode_config,
);
let validator_exit = validator.run(None);
let bob = Keypair::new();
@ -217,8 +215,8 @@ fn test_replicator_startup_basic() {
}
replicator.close();
validator_exit();
leader_exit();
validator.close().unwrap();
leader.close().unwrap();
}
info!("cleanup");

View File

@ -17,7 +17,6 @@ fn test_rpc_send_tx() {
solana_logger::setup();
let (server, leader_data, alice, ledger_path) = new_fullnode();
let server_exit = server.run(None);
let bob_pubkey = Keypair::new().pubkey();
let client = reqwest::Client::new();
@ -93,6 +92,6 @@ fn test_rpc_send_tx() {
assert_eq!(confirmed_tx, true);
server_exit();
server.close().unwrap();
remove_dir_all(ledger_path).unwrap();
}

View File

@ -3,6 +3,7 @@ extern crate solana;
use log::*;
use solana::bank_forks::BankForks;
use solana::banking_stage::create_test_recorder;
use solana::blocktree::{get_tmp_ledger_path, Blocktree};
use solana::blocktree_processor::BankForksInfo;
use solana::cluster_info::{ClusterInfo, Node};
@ -110,7 +111,7 @@ fn test_replay() {
.expect("Expected to successfully open ledger");
let vote_account_keypair = Arc::new(Keypair::new());
let voting_keypair = VotingKeypair::new_local(&vote_account_keypair);
let (to_leader_sender, _to_leader_receiver) = channel();
let (poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank);
let tvu = Tvu::new(
Some(Arc::new(voting_keypair)),
&Arc::new(RwLock::new(bank_forks)),
@ -125,11 +126,11 @@ fn test_replay() {
},
Arc::new(blocktree),
STORAGE_ROTATE_TEST_COUNT,
&to_leader_sender,
&StorageState::default(),
None,
ledger_signal_receiver,
&Arc::new(RpcSubscriptions::default()),
&poh_recorder,
);
let mut alice_ref_balance = starting_balance;
@ -182,6 +183,7 @@ fn test_replay() {
let bob_balance = bank.get_balance(&bob_keypair.pubkey());
assert_eq!(bob_balance, starting_balance - alice_ref_balance);
poh_service.close().expect("close");
tvu.close().expect("close");
exit.store(true, Ordering::Relaxed);
dr_l.join().expect("join");

View File

@ -20,7 +20,6 @@ fn test_wallet_deploy_program() {
pathbuf.set_extension("so");
let (server, leader_data, alice, ledger_path) = new_fullnode();
let server_exit = server.run(None);
let (sender, receiver) = channel();
run_local_drone(alice, sender);
@ -76,6 +75,6 @@ fn test_wallet_deploy_program() {
&elf
);
server_exit();
server.close().unwrap();
remove_dir_all(ledger_path).unwrap();
}

View File

@ -21,7 +21,6 @@ fn check_balance(expected_balance: u64, client: &RpcClient, pubkey: Pubkey) {
#[test]
fn test_wallet_timestamp_tx() {
let (server, leader_data, alice, ledger_path) = new_fullnode();
let server_exit = server.run(None);
let bob_pubkey = Keypair::new().pubkey();
let (sender, receiver) = channel();
@ -75,14 +74,13 @@ fn test_wallet_timestamp_tx() {
check_balance(1, &rpc_client, process_id); // contract balance
check_balance(10, &rpc_client, bob_pubkey); // recipient balance
server_exit();
server.close().unwrap();
remove_dir_all(ledger_path).unwrap();
}
#[test]
fn test_wallet_witness_tx() {
let (server, leader_data, alice, ledger_path) = new_fullnode();
let server_exit = server.run(None);
let bob_pubkey = Keypair::new().pubkey();
let (sender, receiver) = channel();
@ -133,14 +131,13 @@ fn test_wallet_witness_tx() {
check_balance(1, &rpc_client, process_id); // contract balance
check_balance(10, &rpc_client, bob_pubkey); // recipient balance
server_exit();
server.close().unwrap();
remove_dir_all(ledger_path).unwrap();
}
#[test]
fn test_wallet_cancel_tx() {
let (server, leader_data, alice, ledger_path) = new_fullnode();
let server_exit = server.run(None);
let bob_pubkey = Keypair::new().pubkey();
let (sender, receiver) = channel();
@ -191,6 +188,6 @@ fn test_wallet_cancel_tx() {
check_balance(1, &rpc_client, process_id); // contract balance
check_balance(0, &rpc_client, bob_pubkey); // recipient balance
server_exit();
server.close().unwrap();
remove_dir_all(ledger_path).unwrap();
}

View File

@ -9,7 +9,6 @@ use std::sync::mpsc::channel;
#[test]
fn test_wallet_request_airdrop() {
let (server, leader_data, alice, ledger_path) = new_fullnode();
let server_exit = server.run(None);
let (sender, receiver) = channel();
run_local_drone(alice, sender);
let drone_addr = receiver.recv().unwrap();
@ -30,6 +29,6 @@ fn test_wallet_request_airdrop() {
.unwrap();
assert_eq!(balance, 50);
server_exit();
server.close().unwrap();
remove_dir_all(ledger_path).unwrap();
}