Cleanup banking stage in lieu of recent transaction forwarding changes (#4101)

Pankaj Garg 2019-05-01 15:13:10 -07:00, committed by GitHub
parent 5eee9e62e5
commit 3eec3cfac2
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 50 additions and 130 deletions
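
In short: BankingStage no longer threads a LeaderScheduleCache through its constructors and process_loop. The next leader now comes straight from PohRecorder::next_slot_leader, the buffered-packet helpers are renamed to say what they do (forward_buffered_packets, consume_buffered_packets, process_buffered_packets), consume_or_forward_packets decides on a plain Option&lt;Pubkey&gt; instead of a ContactInfo, unprocessed packets are always buffered for the next loop iteration, and both BankingStage and FetchStage widen their would_be_leader look-ahead to two slots (DEFAULT_TICKS_PER_SLOT * 2).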

View File

@@ -10,7 +10,6 @@ use solana::banking_stage::{create_test_recorder, BankingStage};
 use solana::blocktree::{get_tmp_ledger_path, Blocktree};
 use solana::cluster_info::ClusterInfo;
 use solana::cluster_info::Node;
-use solana::leader_schedule_cache::LeaderScheduleCache;
 use solana::packet::to_packets_chunked;
 use solana::poh_recorder::WorkingBankEntries;
 use solana::service::Service;
@@ -58,7 +57,6 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
     let (verified_sender, verified_receiver) = channel();
     let (vote_sender, vote_receiver) = channel();
     let bank = Arc::new(Bank::new(&genesis_block));
-    let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
     let dummy = system_transaction::transfer(
         &mint_keypair,
         &mint_keypair.pubkey(),
@@ -124,7 +122,6 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
         &poh_recorder,
         verified_receiver,
         vote_receiver,
-        &leader_schedule_cache,
     );
 
     poh_recorder.lock().unwrap().set_bank(&bank);
@@ -167,7 +164,6 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
     let (verified_sender, verified_receiver) = channel();
     let (vote_sender, vote_receiver) = channel();
     let bank = Arc::new(Bank::new(&genesis_block));
-    let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
     let dummy = system_transaction::transfer(
         &mint_keypair,
         &mint_keypair.pubkey(),
@@ -249,7 +245,6 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
         &poh_recorder,
         verified_receiver,
         vote_receiver,
-        &leader_schedule_cache,
     );
 
     poh_recorder.lock().unwrap().set_bank(&bank);

View File

@@ -3,7 +3,6 @@
 //! can do its processing in parallel with signature verification on the GPU.
 use crate::blocktree::Blocktree;
 use crate::cluster_info::ClusterInfo;
-use crate::contact_info::ContactInfo;
 use crate::entry;
 use crate::entry::{hash_transactions, Entry};
 use crate::leader_schedule_cache::LeaderScheduleCache;
@@ -56,7 +55,6 @@ impl BankingStage {
         poh_recorder: &Arc<Mutex<PohRecorder>>,
         verified_receiver: Receiver<VerifiedPackets>,
         verified_vote_receiver: Receiver<VerifiedPackets>,
-        leader_schedule_cache: &Arc<LeaderScheduleCache>,
     ) -> Self {
         Self::new_num_threads(
             cluster_info,
@@ -64,17 +62,15 @@
             verified_receiver,
             verified_vote_receiver,
             cmp::min(2, Self::num_threads()),
-            leader_schedule_cache,
         )
     }
 
-    pub fn new_num_threads(
+    fn new_num_threads(
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
         verified_receiver: Receiver<VerifiedPackets>,
         verified_vote_receiver: Receiver<VerifiedPackets>,
         num_threads: u32,
-        leader_schedule_cache: &Arc<LeaderScheduleCache>,
     ) -> Self {
         let verified_receiver = Arc::new(Mutex::new(verified_receiver));
         let verified_vote_receiver = Arc::new(Mutex::new(verified_vote_receiver));
@@ -98,7 +94,6 @@ impl BankingStage {
                 let cluster_info = cluster_info.clone();
                 let exit = exit.clone();
                 let mut recv_start = Instant::now();
-                let leader_schedule_cache = leader_schedule_cache.clone();
                 Builder::new()
                     .name("solana-banking-stage-tx".to_string())
                     .spawn(move || {
@@ -108,7 +103,6 @@ impl BankingStage {
                             &cluster_info,
                             &mut recv_start,
                             enable_forwarding,
-                            leader_schedule_cache,
                         );
                         exit.store(true, Ordering::Relaxed);
                     })
@@ -118,7 +112,7 @@ impl BankingStage {
         Self { bank_thread_hdls }
     }
 
-    fn forward_unprocessed_packets(
+    fn forward_buffered_packets(
         socket: &std::net::UdpSocket,
         tpu_via_blobs: &std::net::SocketAddr,
         unprocessed_packets: &[(Packets, usize, Vec<u8>)],
@@ -141,7 +135,7 @@ impl BankingStage {
         Ok(())
     }
 
-    fn process_buffered_packets(
+    fn consume_buffered_packets(
        poh_recorder: &Arc<Mutex<PohRecorder>>,
        buffered_packets: &[(Packets, usize, Vec<u8>)],
    ) -> Result<UnprocessedPackets> {
@@ -188,13 +182,13 @@ impl BankingStage {
         Ok(unprocessed_packets)
     }
 
-    fn process_or_forward_packets(
-        leader_data: Option<&ContactInfo>,
+    fn consume_or_forward_packets(
+        leader_id: Option<Pubkey>,
         bank_is_available: bool,
         would_be_leader: bool,
         my_id: &Pubkey,
     ) -> BufferedPacketsDecision {
-        leader_data.map_or(
+        leader_id.map_or(
             // If leader is not known, return the buffered packets as is
             BufferedPacketsDecision::Hold,
             // else process the packets
@@ -205,7 +199,7 @@ impl BankingStage {
                 } else if would_be_leader {
                     // If the node will be the leader soon, hold the packets for now
                     BufferedPacketsDecision::Hold
-                } else if x.id != *my_id {
+                } else if x != *my_id {
                     // If the current node is not the leader, forward the buffered packets
                     BufferedPacketsDecision::Forward
                 } else {
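
The two hunks above reduce the buffering decision to a pure function of four inputs. Below is a minimal, self-contained sketch of that logic; the stand-in `Pubkey` type and the `main` driver are illustrative additions, not the committed code:

```rust
// Sketch of the post-change decision logic, for illustration only.
// `Pubkey` here is a stand-in for solana_sdk::pubkey::Pubkey.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct Pubkey(u64);

#[derive(Debug, PartialEq)]
enum BufferedPacketsDecision {
    Consume, // this node holds a working bank: process packets locally
    Forward, // another node is the upcoming leader: send packets to it
    Hold,    // leader unknown, imminent, or us-without-a-bank: keep buffering
}

fn consume_or_forward_packets(
    leader_id: Option<Pubkey>,
    bank_is_available: bool,
    would_be_leader: bool,
    my_id: &Pubkey,
) -> BufferedPacketsDecision {
    leader_id.map_or(
        // If the leader is not known, hold the buffered packets
        BufferedPacketsDecision::Hold,
        |x| {
            if bank_is_available {
                // A working bank is present: consume the buffer here
                BufferedPacketsDecision::Consume
            } else if would_be_leader {
                // Leadership is imminent: don't forward to ourselves
                BufferedPacketsDecision::Hold
            } else if x != *my_id {
                // Someone else is the leader: forward to them
                BufferedPacketsDecision::Forward
            } else {
                // We are the scheduled leader but have no bank yet: hold
                BufferedPacketsDecision::Hold
            }
        },
    )
}

fn main() {
    let me = Pubkey(1);
    let other = Pubkey(2);
    assert_eq!(
        consume_or_forward_packets(Some(other), false, false, &me),
        BufferedPacketsDecision::Forward
    );
    assert_eq!(
        consume_or_forward_packets(None, false, false, &me),
        BufferedPacketsDecision::Hold
    );
}
```

Taking a plain `Option<Pubkey>` instead of `Option<&ContactInfo>` is what lets the caller pass the PoH recorder's scheduled leader straight in, deferring the `ContactInfo` lookup to the moment packets are actually forwarded.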
@@ -216,7 +210,7 @@ impl BankingStage {
         )
     }
 
-    fn handle_buffered_packets(
+    fn process_buffered_packets(
         socket: &std::net::UdpSocket,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
@@ -225,84 +219,59 @@ impl BankingStage {
     ) -> Result<UnprocessedPackets> {
         let rcluster_info = cluster_info.read().unwrap();
-        let decision = {
+        let (decision, next_leader) = {
             let poh = poh_recorder.lock().unwrap();
-            Self::process_or_forward_packets(
-                rcluster_info.leader_data(),
-                poh.bank().is_some(),
-                poh.would_be_leader(DEFAULT_TICKS_PER_SLOT),
-                &rcluster_info.id(),
+            let next_leader = poh.next_slot_leader(DEFAULT_TICKS_PER_SLOT, None);
+            (
+                Self::consume_or_forward_packets(
+                    next_leader,
+                    poh.bank().is_some(),
+                    poh.would_be_leader(DEFAULT_TICKS_PER_SLOT * 2),
+                    &rcluster_info.id(),
+                ),
+                next_leader,
             )
         };
 
         match decision {
             BufferedPacketsDecision::Consume => {
-                Self::process_buffered_packets(poh_recorder, buffered_packets)
+                Self::consume_buffered_packets(poh_recorder, buffered_packets)
             }
             BufferedPacketsDecision::Forward => {
                 if enable_forwarding {
-                    if let Some(leader_id) = poh_recorder
-                        .lock()
-                        .unwrap()
-                        .next_slot_leader(DEFAULT_TICKS_PER_SLOT, None)
-                    {
-                        if let Some(leader) = rcluster_info.lookup(&leader_id) {
-                            let _ = Self::forward_unprocessed_packets(
-                                &socket,
-                                &leader.tpu_via_blobs,
-                                &buffered_packets,
-                            );
-                        }
-                    }
-                }
-                Ok(vec![])
+                    next_leader.map_or(Ok(buffered_packets.to_vec()), |leader_id| {
+                        rcluster_info.lookup(&leader_id).map_or(
+                            Ok(buffered_packets.to_vec()),
+                            |leader| {
+                                let _ = Self::forward_buffered_packets(
+                                    &socket,
+                                    &leader.tpu_via_blobs,
+                                    &buffered_packets,
+                                );
+                                Ok(vec![])
+                            },
+                        )
+                    })
+                } else {
+                    Ok(vec![])
+                }
             }
             _ => Ok(buffered_packets.to_vec()),
         }
     }
 
-    fn should_buffer_packets(
-        poh_recorder: &Arc<Mutex<PohRecorder>>,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
-        leader_schedule_cache: &Arc<LeaderScheduleCache>,
-    ) -> bool {
-        let rcluster_info = cluster_info.read().unwrap();
-
-        // Buffer the packets if I am the next leader
-        // or, if it was getting sent to me
-        // or, the next leader is unknown
-        let poh = poh_recorder.lock().unwrap();
-        let leader_id = match poh.bank() {
-            Some(bank) => leader_schedule_cache
-                .slot_leader_at(bank.slot() + 1, Some(&bank))
-                .unwrap_or_default(),
-            None => {
-                if poh.would_be_leader(DEFAULT_TICKS_PER_SLOT) {
-                    rcluster_info.id()
-                } else {
-                    rcluster_info
-                        .leader_data()
-                        .map_or(rcluster_info.id(), |x| x.id)
-                }
-            }
-        };
-
-        leader_id == rcluster_info.id()
-    }
-
     pub fn process_loop(
         verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         recv_start: &mut Instant,
         enable_forwarding: bool,
-        leader_schedule_cache: Arc<LeaderScheduleCache>,
     ) {
         let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
         let mut buffered_packets = vec![];
         loop {
             if !buffered_packets.is_empty() {
-                Self::handle_buffered_packets(
+                Self::process_buffered_packets(
                     &socket,
                     poh_recorder,
                     cluster_info,
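
Worth noting in the hunk above: the nested `if let` chain becomes two chained `Option::map_or` calls, so both miss cases (no scheduled next leader, or no contact info for that leader) now fall back to `Ok(buffered_packets.to_vec())` and keep the packets buffered, where the old code's unconditional `Ok(vec![])` tail dropped them even when forwarding never happened. A simplified sketch of that fallback shape, using hypothetical stand-in types rather than the real solana-core API:

```rust
use std::net::SocketAddr;

// Hypothetical stand-ins for illustration; the real code uses Packets,
// ClusterInfo::lookup, and forward_buffered_packets.
type Packet = Vec<u8>;

fn lookup_tpu_addr(_leader_id: u64) -> Option<SocketAddr> {
    None // pretend the leader's contact info is unknown
}

// Forward only if we know both the next leader and their address;
// otherwise keep the buffer intact so a later iteration can retry.
fn forward_or_keep(next_leader: Option<u64>, buffered: &[Packet]) -> Vec<Packet> {
    next_leader.map_or(buffered.to_vec(), |leader_id| {
        lookup_tpu_addr(leader_id).map_or(buffered.to_vec(), |addr| {
            // the actual UDP send to `addr` would go here; the local
            // buffer is cleared once the packets have been handed off
            let _ = addr;
            vec![]
        })
    })
}

fn main() {
    let buffered = vec![vec![1u8, 2, 3]];
    // Leader unknown: the packets stay buffered instead of being dropped.
    assert_eq!(forward_or_keep(None, &buffered), buffered);
}
```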
@@ -330,48 +299,18 @@ impl BankingStage {
                     if unprocessed_packets.is_empty() {
                         continue;
                     }
-                    if Self::should_buffer_packets(
-                        poh_recorder,
-                        cluster_info,
-                        &leader_schedule_cache,
-                    ) {
-                        let num = unprocessed_packets
-                            .iter()
-                            .map(|(x, start, _)| x.packets.len().saturating_sub(*start))
-                            .sum();
-                        inc_new_counter_info!("banking_stage-buffered_packets", num);
-                        buffered_packets.extend_from_slice(&unprocessed_packets);
-                        continue;
-                    }
-                    if enable_forwarding {
-                        let rcluster_info = cluster_info.read().unwrap();
-                        if let Some(leader_id) = poh_recorder
-                            .lock()
-                            .unwrap()
-                            .next_slot_leader(DEFAULT_TICKS_PER_SLOT, None)
-                        {
-                            if let Some(leader) = rcluster_info.lookup(&leader_id) {
-                                let _ = Self::forward_unprocessed_packets(
-                                    &socket,
-                                    &leader.tpu_via_blobs,
-                                    &unprocessed_packets,
-                                );
-                            }
-                        }
-                    }
+                    let num = unprocessed_packets
+                        .iter()
+                        .map(|(x, start, _)| x.packets.len().saturating_sub(*start))
+                        .sum();
+                    inc_new_counter_info!("banking_stage-buffered_packets", num);
+                    buffered_packets.extend_from_slice(&unprocessed_packets);
                 }
                 Err(err) => {
                     debug!("solana-banking-stage-tx: exit due to {:?}", err);
                     break;
                 }
             }
-
-            let num = buffered_packets
-                .iter()
-                .map(|(x, start, _)| x.packets.len().saturating_sub(*start))
-                .sum();
-            inc_new_counter_info!("banking_stage-total_buffered_packets", num);
         }
     }
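
The receive side of `process_loop` is correspondingly simpler: instead of choosing between buffering and forwarding inline (via the now-deleted `should_buffer_packets`), every unprocessed packet is counted and buffered, and the consume/forward/hold choice is deferred to `process_buffered_packets` at the top of the next iteration. A schematic of the resulting loop shape, with hypothetical stand-ins rather than the real banking-stage types:

```rust
// Schematic of the simplified receive loop, for illustration only.
#[allow(dead_code)] // Consume/Forward shown for shape, not exercised below
enum Decision {
    Consume,
    Forward,
    Hold,
}

fn decide() -> Decision {
    Decision::Hold // stand-in: the real code consults PoH and cluster info
}

fn receive_and_process() -> Vec<u8> {
    vec![42] // stand-in: whatever could not be processed this pass
}

fn main() {
    let mut buffered: Vec<u8> = vec![];
    for _ in 0..3 {
        // 1) Previously buffered packets are handled first.
        if !buffered.is_empty() {
            match decide() {
                Decision::Consume => buffered.clear(), // processed locally
                Decision::Forward => buffered.clear(), // sent to next leader
                Decision::Hold => {}                   // kept for a later pass
            }
        }
        // 2) Receive; leftovers are buffered unconditionally.
        let unprocessed = receive_and_process();
        buffered.extend_from_slice(&unprocessed);
    }
    assert_eq!(buffered, vec![42, 42, 42]);
}
```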
@@ -700,7 +639,6 @@ mod tests {
     fn test_banking_stage_shutdown1() {
         let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
         let bank = Arc::new(Bank::new(&genesis_block));
-        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
         let (verified_sender, verified_receiver) = channel();
         let (vote_sender, vote_receiver) = channel();
         let ledger_path = get_tmp_ledger_path!();
@@ -717,7 +655,6 @@ mod tests {
             &poh_recorder,
             verified_receiver,
             vote_receiver,
-            &leader_schedule_cache,
         );
         drop(verified_sender);
         drop(vote_sender);
@@ -734,7 +671,6 @@ mod tests {
         let (mut genesis_block, _mint_keypair) = GenesisBlock::new(2);
         genesis_block.ticks_per_slot = 4;
         let bank = Arc::new(Bank::new(&genesis_block));
-        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
         let start_hash = bank.last_blockhash();
         let (verified_sender, verified_receiver) = channel();
         let (vote_sender, vote_receiver) = channel();
@@ -752,7 +688,6 @@ mod tests {
             &poh_recorder,
             verified_receiver,
             vote_receiver,
-            &leader_schedule_cache,
         );
         trace!("sending bank");
         sleep(Duration::from_millis(600));
@@ -781,7 +716,6 @@ mod tests {
         solana_logger::setup();
         let (genesis_block, mint_keypair) = GenesisBlock::new(10);
         let bank = Arc::new(Bank::new(&genesis_block));
-        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
         let start_hash = bank.last_blockhash();
         let (verified_sender, verified_receiver) = channel();
         let (vote_sender, vote_receiver) = channel();
@@ -799,7 +733,6 @@ mod tests {
             &poh_recorder,
             verified_receiver,
             vote_receiver,
-            &leader_schedule_cache,
         );
 
         // fund another account so we can send 2 good transactions in a single batch.
@@ -921,7 +854,6 @@ mod tests {
         let entry_receiver = {
             // start a banking_stage to eat verified receiver
             let bank = Arc::new(Bank::new(&genesis_block));
-            let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
             let blocktree = Arc::new(
                 Blocktree::open(&ledger_path)
                     .expect("Expected to be able to open database ledger"),
@@ -937,7 +869,6 @@ mod tests {
                 verified_receiver,
                 vote_receiver,
                 2,
-                &leader_schedule_cache,
             );
 
             // wait for banking_stage to eat the packets
@@ -1038,38 +969,36 @@ mod tests {
         let my_id1 = Pubkey::new_rand();
 
         assert_eq!(
-            BankingStage::process_or_forward_packets(None, true, false, &my_id),
+            BankingStage::consume_or_forward_packets(None, true, false, &my_id),
             BufferedPacketsDecision::Hold
         );
         assert_eq!(
-            BankingStage::process_or_forward_packets(None, false, false, &my_id),
+            BankingStage::consume_or_forward_packets(None, false, false, &my_id),
             BufferedPacketsDecision::Hold
         );
         assert_eq!(
-            BankingStage::process_or_forward_packets(None, false, false, &my_id1),
+            BankingStage::consume_or_forward_packets(None, false, false, &my_id1),
             BufferedPacketsDecision::Hold
         );
 
-        let mut contact_info = ContactInfo::default();
-        contact_info.id = my_id1;
         assert_eq!(
-            BankingStage::process_or_forward_packets(Some(&contact_info), false, false, &my_id),
+            BankingStage::consume_or_forward_packets(Some(my_id1.clone()), false, false, &my_id),
             BufferedPacketsDecision::Forward
         );
         assert_eq!(
-            BankingStage::process_or_forward_packets(Some(&contact_info), false, true, &my_id),
+            BankingStage::consume_or_forward_packets(Some(my_id1.clone()), false, true, &my_id),
             BufferedPacketsDecision::Hold
         );
         assert_eq!(
-            BankingStage::process_or_forward_packets(Some(&contact_info), true, false, &my_id),
+            BankingStage::consume_or_forward_packets(Some(my_id1.clone()), true, false, &my_id),
             BufferedPacketsDecision::Consume
         );
         assert_eq!(
-            BankingStage::process_or_forward_packets(Some(&contact_info), false, false, &my_id1),
+            BankingStage::consume_or_forward_packets(Some(my_id1.clone()), false, false, &my_id1),
             BufferedPacketsDecision::Hold
         );
         assert_eq!(
-            BankingStage::process_or_forward_packets(Some(&contact_info), true, false, &my_id1),
+            BankingStage::consume_or_forward_packets(Some(my_id1.clone()), true, false, &my_id1),
             BufferedPacketsDecision::Consume
         );
     }

View File

@@ -64,7 +64,7 @@ impl FetchStage {
             if poh_recorder
                 .lock()
                 .unwrap()
-                .would_be_leader(DEFAULT_TICKS_PER_SLOT)
+                .would_be_leader(DEFAULT_TICKS_PER_SLOT * 2)
             {
                 inc_new_counter_info!("fetch_stage-honor_forwards", len);
                 for packets in batch {
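
This is the same two-slot look-ahead the banking stage now uses in `consume_or_forward_packets` above: a node stops honoring forwarded packets (and starts holding its own) once it would be leader within the next two slots rather than one. A small arithmetic illustration; the constant's value below is an assumption for the example, and only the `* 2` widening comes from this diff:

```rust
// Assumed value for illustration only; the real DEFAULT_TICKS_PER_SLOT
// lives in solana-sdk's timing module and has changed across releases.
const DEFAULT_TICKS_PER_SLOT: u64 = 8;

fn main() {
    let old_window = DEFAULT_TICKS_PER_SLOT; // leader within ~1 slot
    let new_window = DEFAULT_TICKS_PER_SLOT * 2; // leader within ~2 slots
    println!("would_be_leader window: {} -> {} ticks", old_window, new_window);
}
```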

View File

@@ -255,7 +255,6 @@ impl Fullnode {
             config.sigverify_disabled,
             &blocktree,
             sender,
-            &leader_schedule_cache,
             &exit,
             &genesis_blockhash,
         );

View File

@@ -8,7 +8,6 @@ use crate::cluster_info::ClusterInfo;
 use crate::cluster_info_vote_listener::ClusterInfoVoteListener;
 use crate::entry::EntrySender;
 use crate::fetch_stage::FetchStage;
-use crate::leader_schedule_cache::LeaderScheduleCache;
 use crate::poh_recorder::{PohRecorder, WorkingBankEntries};
 use crate::service::Service;
 use crate::sigverify_stage::SigVerifyStage;
@@ -41,7 +40,6 @@ impl Tpu {
         sigverify_disabled: bool,
         blocktree: &Arc<Blocktree>,
         storage_entry_sender: EntrySender,
-        leader_schedule_cache: &Arc<LeaderScheduleCache>,
         exit: &Arc<AtomicBool>,
         genesis_blockhash: &Hash,
     ) -> Self {
@@ -74,7 +72,6 @@ impl Tpu {
             poh_recorder,
             verified_receiver,
             verified_vote_receiver,
-            leader_schedule_cache,
         );
 
         let broadcast_stage = BroadcastStage::new(