diff --git a/src/fullnode.rs b/src/fullnode.rs index 1ce539ff1..43452e624 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -239,7 +239,7 @@ impl Fullnode { }; // Figure which node should generate the next tick - let (next_leader, next_slot, blob_index) = { + let (next_leader, next_slot) = { let next_tick = bank.tick_height() + 1; let leader_scheduler = leader_scheduler.read().unwrap(); @@ -248,28 +248,20 @@ impl Fullnode { let next_leader = leader_scheduler .get_leader_for_slot(next_slot) .expect("Leader not known after processing bank"); - let blob_index = if let Some(meta) = blocktree.meta(next_slot).expect("Database error") - { - meta.consumed - } else { - 0 - }; trace!( - "node {:?} scheduled as leader for slot {}, starting blob_index={}", + "node {:?} scheduled as leader for slot {}", next_leader, next_slot, - blob_index, ); - (next_leader, next_slot, blob_index) + (next_leader, next_slot) }; // END TODO let tvu = Tvu::new( voting_keypair_option, &bank_forks, - blob_index, entry_height, last_entry_id, &cluster_info, @@ -302,7 +294,7 @@ impl Fullnode { }; // TODO: This first rotate should come from the Tvu/ReplayStage - fullnode.rotate(&bank, next_leader, next_slot, blob_index, &last_entry_id); + fullnode.rotate(&bank, next_leader, next_slot, &last_entry_id); inc_new_counter_info!("fullnode-new", 1); fullnode } @@ -312,7 +304,6 @@ impl Fullnode { bank: &Arc, leader: Pubkey, slot: u64, - blob_index: u64, last_entry_id: &Hash, ) -> FullnodeReturnType { if leader == self.id { @@ -342,7 +333,6 @@ impl Fullnode { .expect("Failed to clone broadcast socket"), self.sigverify_disabled, slot, - blob_index, last_entry_id, &self.blocktree, &self.leader_scheduler, @@ -392,7 +382,7 @@ impl Fullnode { //self.bank_forks.write().set_working_bank_id(bank_id); let bank = self.bank_forks.read().unwrap().working_bank(); - let transition = self.rotate(&bank, leader, slot, 0, &bank.last_id()); + let transition = self.rotate(&bank, leader, slot, &bank.last_id()); debug!("role transition complete: {:?}", transition); if let Some(ref rotation_notifier) = rotation_notifier { rotation_notifier.send((transition, slot)).unwrap(); diff --git a/src/replay_stage.rs b/src/replay_stage.rs index c46aba7a7..b8508da65 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -188,7 +188,6 @@ impl ReplayStage { bank: Arc, cluster_info: Arc>, exit: Arc, - mut current_blob_index: u64, last_entry_id: Hash, to_leader_sender: &TvuRotationSender, ledger_signal_receiver: Receiver, @@ -201,6 +200,7 @@ impl ReplayStage { let to_leader_sender = to_leader_sender.clone(); let last_entry_id = Arc::new(RwLock::new(last_entry_id)); let subscriptions_ = subscriptions.clone(); + let t_replay = Builder::new() .name("solana-replay-stage".to_string()) .spawn(move || { @@ -222,7 +222,11 @@ impl ReplayStage { + leader_scheduler.num_ticks_left_in_slot(first_tick_in_current_slot), ) }; - + let mut current_blob_index = blocktree + .meta(current_slot.unwrap()) + .expect("Database error") + .map(|meta| meta.consumed) + .unwrap_or(0); let mut fees = 0; // Loop through blocktree MAX_ENTRY_RECV_PER_ITER entries at a time for each @@ -464,7 +468,6 @@ mod test { bank.clone(), Arc::new(RwLock::new(cluster_info_me)), exit.clone(), - meta.consumed, last_entry_id, &rotation_sender, l_receiver, @@ -568,7 +571,6 @@ mod test { bank.clone(), cluster_info_me.clone(), exit.clone(), - entry_height, last_entry_id, &to_leader_sender, l_receiver, @@ -694,7 +696,6 @@ mod test { bank.clone(), cluster_info_me.clone(), exit.clone(), - meta.consumed, last_entry_id, &rotation_tx, l_receiver, diff --git a/src/tpu.rs b/src/tpu.rs index 539f626a9..8c9ae9efd 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -197,7 +197,6 @@ impl Tpu { broadcast_socket: UdpSocket, sigverify_disabled: bool, slot: u64, - blob_index: u64, last_entry_id: &Hash, blocktree: &Arc, leader_scheduler: &Arc>, @@ -225,6 +224,11 @@ impl Tpu { // TODO: Fix BankingStage/BroadcastService to operate on `slot` directly instead of // `max_tick_height` let max_tick_height = (slot + 1) * leader_scheduler.read().unwrap().ticks_per_slot - 1; + let blob_index = blocktree + .meta(slot) + .expect("Database error") + .map(|meta| meta.consumed) + .unwrap_or(0); let (banking_stage, entry_receiver) = BankingStage::new( &bank, diff --git a/src/tvu.rs b/src/tvu.rs index ef0beca2b..1b9f11718 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -62,7 +62,6 @@ impl Tvu { /// # Arguments /// * `bank` - The bank state. /// * `entry_height` - Initial ledger height - /// * `blob_index` - Index of last processed blob /// * `last_entry_id` - Hash of the last entry /// * `cluster_info` - The cluster_info state. /// * `sockets` - My fetch, repair, and restransmit sockets @@ -71,7 +70,6 @@ impl Tvu { pub fn new( voting_keypair: Option>, bank_forks: &Arc>, - blob_index: u64, entry_height: u64, last_entry_id: Hash, cluster_info: &Arc>, @@ -129,7 +127,6 @@ impl Tvu { bank.clone(), cluster_info.clone(), exit.clone(), - blob_index, last_entry_id, to_leader_sender, ledger_signal_receiver, @@ -247,7 +244,6 @@ pub mod tests { Some(Arc::new(voting_keypair)), &Arc::new(RwLock::new(bank_forks)), 0, - 0, cur_hash, &cref1, { diff --git a/tests/tvu.rs b/tests/tvu.rs index e8642cc6c..a20376e64 100644 --- a/tests/tvu.rs +++ b/tests/tvu.rs @@ -114,7 +114,6 @@ fn test_replay() { Some(Arc::new(voting_keypair)), &Arc::new(RwLock::new(bank_forks)), 0, - 0, cur_hash, &cref1, {