From 4f18fc836fa47f0a456724532aa57ebed3583204 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Wed, 1 May 2019 11:37:29 -0700 Subject: [PATCH] Forward transactions to the next slot leader (#4092) - this ensures that transactions will reach in time for the next node to process them --- core/src/banking_stage.rs | 37 ++++++++++++++++++++++++++----------- core/src/poh_recorder.rs | 6 ++++++ 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 2ab237c72..60603b667 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -241,11 +241,19 @@ impl BankingStage { } BufferedPacketsDecision::Forward => { if enable_forwarding { - let _ = Self::forward_unprocessed_packets( - &socket, - &rcluster_info.leader_data().unwrap().tpu_via_blobs, - &buffered_packets, - ); + if let Some(leader_id) = poh_recorder + .lock() + .unwrap() + .next_slot_leader(DEFAULT_TICKS_PER_SLOT, None) + { + if let Some(leader) = rcluster_info.lookup(&leader_id) { + let _ = Self::forward_unprocessed_packets( + &socket, + &leader.tpu_via_blobs, + &buffered_packets, + ); + } + } } Ok(vec![]) } @@ -337,12 +345,19 @@ impl BankingStage { } if enable_forwarding { - if let Some(leader) = cluster_info.read().unwrap().leader_data() { - let _ = Self::forward_unprocessed_packets( - &socket, - &leader.tpu_via_blobs, - &unprocessed_packets, - ); + let rcluster_info = cluster_info.read().unwrap(); + if let Some(leader_id) = poh_recorder + .lock() + .unwrap() + .next_slot_leader(DEFAULT_TICKS_PER_SLOT, None) + { + if let Some(leader) = rcluster_info.lookup(&leader_id) { + let _ = Self::forward_unprocessed_packets( + &socket, + &leader.tpu_via_blobs, + &unprocessed_packets, + ); + } } } } diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index 2b8155d3d..72bb2debb 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -13,6 +13,7 @@ use crate::blocktree::Blocktree; use crate::entry::Entry; use crate::leader_schedule_cache::LeaderScheduleCache; +use crate::leader_schedule_utils; use crate::poh::Poh; use crate::result::{Error, Result}; use solana_runtime::bank::Bank; @@ -91,6 +92,11 @@ impl PohRecorder { self.working_bank.is_some() || close_to_leader_tick } + pub fn next_slot_leader(&self, ticks_per_slot: u64, bank: Option<&Bank>) -> Option { + let slot = leader_schedule_utils::tick_height_to_slot(ticks_per_slot, self.tick_height()); + self.leader_schedule_cache.slot_leader_at(slot + 1, bank) + } + pub fn hash(&mut self) { // TODO: amortize the cost of this lock by doing the loop in here for // some min amount of hashes