Forward transactions to the next slot leader (#4092)

- this ensures that transactions will reach in time for the next node to process them
This commit is contained in:
Pankaj Garg 2019-05-01 11:37:29 -07:00 committed by GitHub
parent 950d8494ba
commit 4f18fc836f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 11 deletions

View File

@ -241,11 +241,19 @@ impl BankingStage {
} }
BufferedPacketsDecision::Forward => { BufferedPacketsDecision::Forward => {
if enable_forwarding { if enable_forwarding {
let _ = Self::forward_unprocessed_packets( if let Some(leader_id) = poh_recorder
&socket, .lock()
&rcluster_info.leader_data().unwrap().tpu_via_blobs, .unwrap()
&buffered_packets, .next_slot_leader(DEFAULT_TICKS_PER_SLOT, None)
); {
if let Some(leader) = rcluster_info.lookup(&leader_id) {
let _ = Self::forward_unprocessed_packets(
&socket,
&leader.tpu_via_blobs,
&buffered_packets,
);
}
}
} }
Ok(vec![]) Ok(vec![])
} }
@ -337,12 +345,19 @@ impl BankingStage {
} }
if enable_forwarding { if enable_forwarding {
if let Some(leader) = cluster_info.read().unwrap().leader_data() { let rcluster_info = cluster_info.read().unwrap();
let _ = Self::forward_unprocessed_packets( if let Some(leader_id) = poh_recorder
&socket, .lock()
&leader.tpu_via_blobs, .unwrap()
&unprocessed_packets, .next_slot_leader(DEFAULT_TICKS_PER_SLOT, None)
); {
if let Some(leader) = rcluster_info.lookup(&leader_id) {
let _ = Self::forward_unprocessed_packets(
&socket,
&leader.tpu_via_blobs,
&unprocessed_packets,
);
}
} }
} }
} }

View File

@ -13,6 +13,7 @@
use crate::blocktree::Blocktree; use crate::blocktree::Blocktree;
use crate::entry::Entry; use crate::entry::Entry;
use crate::leader_schedule_cache::LeaderScheduleCache; use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::leader_schedule_utils;
use crate::poh::Poh; use crate::poh::Poh;
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
@ -91,6 +92,11 @@ impl PohRecorder {
self.working_bank.is_some() || close_to_leader_tick self.working_bank.is_some() || close_to_leader_tick
} }
pub fn next_slot_leader(&self, ticks_per_slot: u64, bank: Option<&Bank>) -> Option<Pubkey> {
let slot = leader_schedule_utils::tick_height_to_slot(ticks_per_slot, self.tick_height());
self.leader_schedule_cache.slot_leader_at(slot + 1, bank)
}
pub fn hash(&mut self) { pub fn hash(&mut self) {
// TODO: amortize the cost of this lock by doing the loop in here for // TODO: amortize the cost of this lock by doing the loop in here for
// some min amount of hashes // some min amount of hashes