delete leader rotation signal from banking stage

This commit is contained in:
Carl 2019-02-15 22:10:21 -08:00 committed by Greg Fitzgerald
parent 4e3d71c2c9
commit f33c6eb95f
6 changed files with 28 additions and 68 deletions

View File

@ -100,7 +100,6 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
(x, iter::repeat(1).take(len).collect())
})
.collect();
let (to_leader_sender, _to_leader_recvr) = channel();
let (_stage, signal_receiver) = BankingStage::new(
&bank,
verified_receiver,
@ -108,7 +107,6 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
&genesis_block.last_id(),
std::u64::MAX,
genesis_block.bootstrap_leader_id,
&to_leader_sender,
);
let mut id = genesis_block.last_id();
@ -209,7 +207,6 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
(x, iter::repeat(1).take(len).collect())
})
.collect();
let (to_leader_sender, _to_leader_recvr) = channel();
let (_stage, signal_receiver) = BankingStage::new(
&bank,
verified_receiver,
@ -217,7 +214,6 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
&genesis_block.last_id(),
std::u64::MAX,
genesis_block.bootstrap_leader_id,
&to_leader_sender,
);
let mut id = genesis_block.last_id();

View File

@ -14,7 +14,6 @@ use crate::poh_service::{PohService, PohServiceConfig};
use crate::result::{Error, Result};
use crate::service::Service;
use crate::sigverify_stage::VerifiedPackets;
use crate::tpu::TpuRotationSender;
use bincode::deserialize;
use log::Level;
use solana_sdk::hash::Hash;
@ -51,7 +50,6 @@ impl BankingStage {
last_entry_id: &Hash,
max_tick_height: u64,
leader_id: Pubkey,
to_validator_sender: &TpuRotationSender,
) -> (Self, Receiver<Vec<Entry>>) {
let (entry_sender, entry_receiver) = channel();
let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver));
@ -61,8 +59,7 @@ impl BankingStage {
// Single thread to generate entries from many banks.
// This thread talks to poh_service and broadcasts the entries once they have been recorded.
// Once an entry has been recorded, its last_id is registered with the bank.
let poh_service =
PohService::new(poh_recorder.clone(), config, to_validator_sender.clone());
let poh_service = PohService::new(poh_recorder.clone(), config);
// Single thread to compute confirmation
let compute_confirmation_service = ComputeLeaderConfirmationService::new(
@ -357,7 +354,6 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let (verified_sender, verified_receiver) = channel();
let (to_validator_sender, _) = channel();
let (banking_stage, _entry_receiver) = BankingStage::new(
&bank,
verified_receiver,
@ -365,7 +361,6 @@ mod tests {
&bank.last_id(),
DEFAULT_TICKS_PER_SLOT,
genesis_block.bootstrap_leader_id,
&to_validator_sender,
);
drop(verified_sender);
banking_stage.join().unwrap();
@ -377,7 +372,6 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_block));
let start_hash = bank.last_id();
let (verified_sender, verified_receiver) = channel();
let (to_validator_sender, _) = channel();
let (banking_stage, entry_receiver) = BankingStage::new(
&bank,
verified_receiver,
@ -385,7 +379,6 @@ mod tests {
&bank.last_id(),
DEFAULT_TICKS_PER_SLOT,
genesis_block.bootstrap_leader_id,
&to_validator_sender,
);
sleep(Duration::from_millis(500));
drop(verified_sender);
@ -403,7 +396,6 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_block));
let start_hash = bank.last_id();
let (verified_sender, verified_receiver) = channel();
let (to_validator_sender, _) = channel();
let (banking_stage, entry_receiver) = BankingStage::new(
&bank,
verified_receiver,
@ -411,7 +403,6 @@ mod tests {
&bank.last_id(),
DEFAULT_TICKS_PER_SLOT,
genesis_block.bootstrap_leader_id,
&to_validator_sender,
);
// good tx
@ -458,7 +449,6 @@ mod tests {
let (genesis_block, mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let (verified_sender, verified_receiver) = channel();
let (to_validator_sender, _) = channel();
let (banking_stage, entry_receiver) = BankingStage::new(
&bank,
verified_receiver,
@ -466,7 +456,6 @@ mod tests {
&bank.last_id(),
DEFAULT_TICKS_PER_SLOT,
genesis_block.bootstrap_leader_id,
&to_validator_sender,
);
// Process a batch that includes a transaction that receives two tokens.
@ -522,7 +511,6 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let (verified_sender, verified_receiver) = channel();
let (to_validator_sender, to_validator_receiver) = channel();
let max_tick_height = 10;
let (banking_stage, _entry_receiver) = BankingStage::new(
&bank,
@ -531,9 +519,16 @@ mod tests {
&bank.last_id(),
max_tick_height,
genesis_block.bootstrap_leader_id,
&to_validator_sender,
);
assert_eq!(to_validator_receiver.recv().unwrap(), max_tick_height);
loop {
let bank_tick_height = bank.tick_height();
if bank_tick_height >= max_tick_height {
break;
}
sleep(Duration::from_millis(10));
}
drop(verified_sender);
banking_stage.join().unwrap();
}
@ -545,7 +540,6 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_block));
let ticks_per_slot = 1;
let (verified_sender, verified_receiver) = channel();
let (to_validator_sender, to_validator_receiver) = channel();
let (mut banking_stage, _entry_receiver) = BankingStage::new(
&bank,
verified_receiver,
@ -553,11 +547,16 @@ mod tests {
&bank.last_id(),
ticks_per_slot,
genesis_block.bootstrap_leader_id,
&to_validator_sender,
);
// Wait for Poh recorder to hit max height
assert_eq!(to_validator_receiver.recv().unwrap(), ticks_per_slot);
loop {
let bank_tick_height = bank.tick_height();
if bank_tick_height >= leader_scheduler_config.ticks_per_slot {
break;
}
sleep(Duration::from_millis(10));
}
// Now send a transaction to the banking stage
let transaction = SystemTransaction::new_account(

View File

@ -14,7 +14,7 @@ use crate::rpc_pubsub_service::PubSubService;
use crate::rpc_service::JsonRpcService;
use crate::service::Service;
use crate::storage_stage::StorageState;
use crate::tpu::{Tpu, TpuRotationReceiver, TpuRotationSender};
use crate::tpu::{Tpu, TpuRotationReceiver};
use crate::tvu::{Sockets, Tvu};
use crate::voting_keypair::VotingKeypair;
use log::Level;
@ -104,7 +104,6 @@ pub struct Fullnode {
tpu_sockets: Vec<UdpSocket>,
broadcast_socket: UdpSocket,
node_services: NodeServices,
rotation_sender: TpuRotationSender,
rotation_receiver: TpuRotationReceiver,
blocktree: Arc<Blocktree>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
@ -279,7 +278,6 @@ impl Fullnode {
exit,
tpu_sockets: node.sockets.tpu,
broadcast_socket: node.sockets.broadcast,
rotation_sender,
rotation_receiver,
blocktree,
leader_scheduler,
@ -371,7 +369,6 @@ impl Fullnode {
max_tick_height,
blob_index,
last_entry_id,
&self.rotation_sender,
&self.blocktree,
&self.leader_scheduler,
);
@ -920,14 +917,8 @@ mod tests {
// Wait for convergence
converge(&leader_node_info, 2);
info!("Wait for leader -> validator transition");
let rotation_signal = leader
.rotation_receiver
.recv()
.expect("signal for leader -> validator transition");
debug!("received rotation signal: {:?}", rotation_signal);
// Re-send the rotation signal, it'll be received again once the tvu is unpaused
leader.rotation_sender.send(rotation_signal).expect("send");
// Wait for Tpu bank to progress while the Tvu bank is stuck
sleep(Duration::from_millis(1000));
info!("Make sure the tvu bank has not reached the last tick for the slot (the last tick is ticks_per_slot - 1)");
{

View File

@ -1,11 +1,9 @@
//! The `poh_service` module implements a service that records the passing of
//! "ticks", a measure of time in the PoH stream
use crate::poh_recorder::{PohRecorder, PohRecorderError};
use crate::result::Error;
use crate::poh_recorder::PohRecorder;
use crate::result::Result;
use crate::service::Service;
use crate::tpu::TpuRotationSender;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::sleep;
@ -46,11 +44,7 @@ impl PohService {
self.join()
}
pub fn new(
poh_recorder: PohRecorder,
config: PohServiceConfig,
to_validator_sender: TpuRotationSender,
) -> Self {
pub fn new(poh_recorder: PohRecorder, config: PohServiceConfig) -> Self {
// PohService is a headless producer, so when it exits it should notify the banking stage.
// Since channel are not used to talk between these threads an AtomicBool is used as a
// signal.
@ -61,12 +55,7 @@ impl PohService {
.name("solana-poh-service-tick_producer".to_string())
.spawn(move || {
let mut poh_recorder_ = poh_recorder;
let return_value = Self::tick_producer(
&mut poh_recorder_,
config,
&poh_exit_,
&to_validator_sender,
);
let return_value = Self::tick_producer(&mut poh_recorder_, config, &poh_exit_);
poh_exit_.store(true, Ordering::Relaxed);
return_value
})
@ -82,18 +71,13 @@ impl PohService {
poh: &mut PohRecorder,
config: PohServiceConfig,
poh_exit: &AtomicBool,
to_validator_sender: &TpuRotationSender,
) -> Result<()> {
let max_tick_height = poh.max_tick_height();
loop {
match config {
PohServiceConfig::Tick(num) => {
for _ in 1..num {
let res = poh.hash();
if let Err(e) = res {
if let Error::PohRecorderError(PohRecorderError::MaxHeightReached) = e {
to_validator_sender.send(max_tick_height)?;
}
return Err(e);
}
}
@ -104,9 +88,6 @@ impl PohService {
}
let res = poh.tick();
if let Err(e) = res {
if let Error::PohRecorderError(PohRecorderError::MaxHeightReached) = e {
to_validator_sender.send(max_tick_height)?;
}
return Err(e);
}
if poh_exit.load(Ordering::Relaxed) {
@ -164,11 +145,9 @@ mod tests {
};
const HASHES_PER_TICK: u64 = 2;
let (sender, _) = channel();
let poh_service = PohService::new(
poh_recorder,
PohServiceConfig::Tick(HASHES_PER_TICK as usize),
sender,
);
// get some events

View File

@ -290,14 +290,11 @@ impl ReplayStage {
bank.tick_height(),
&leader_scheduler_,
);
if leader_id != last_leader_id {
if my_id == leader_id {
to_leader_sender.send(current_tick_height).unwrap();
} else {
// TODO: Remove this soon once we boot the leader from ClusterInfo
cluster_info.write().unwrap().set_leader(leader_id);
}
if my_id == leader_id || my_id == last_leader_id {
to_leader_sender.send(current_tick_height).unwrap();
} else {
// TODO: Remove this soon once we boot the leader from ClusterInfo
cluster_info.write().unwrap().set_leader(leader_id);
}
// Check for any slots that chain to this one

View File

@ -203,7 +203,6 @@ impl Tpu {
max_tick_height: u64,
blob_index: u64,
last_entry_id: &Hash,
to_validator_sender: &TpuRotationSender,
blocktree: &Arc<Blocktree>,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) {
@ -234,7 +233,6 @@ impl Tpu {
last_entry_id,
max_tick_height,
self.id,
&to_validator_sender,
);
let broadcast_service = BroadcastService::new(